第二节开始!
设计
经过第一节的学习,我们已经有了相应的了解,下面我们来代码实现:
首先清楚这几个点,我们存放的
对象有哪些
,以及容器
对象:Job实体类
, DelayJob对象 延时任务实体类
容器:
Job池,用于存放Job对象(Job Pool) (Hash结构:k->jobId v->job)
Job桶,用于存放Job延时任务对象,还有任务的状态,重试次数等等(Job Bucket) (ZSet结构: 大key-> bucketName 小key-> job score -> 延时终点时间戳(用于排序)核心)
Job队列,用于处理Timer从桶中轮询出来的对象,并且由topic路由 ;(List结构:k -> 业务Id+topic v -> DelayJob对象)(Lpush Rpop)
处理器:
延时任务处理器:处理DelayJob对象,定时任务执行,处理演示任务和超时任务(重置状态继续执行)
timer 从桶中轮询出来DelayJob对象,并且由topic路由到指定的Job队列处理
业务流程
首先我们分析下这个流程
用户提交任务。首先将任务推送至延迟队列中。
延迟队列接收到任务后,首先将任务推送至 job pool 中,然后计算其执行时间。
然后生成延迟任务(仅仅包含任务 id)放入某个桶中
时间组件时刻轮询各个桶,当时间到达的时候从 job pool 中获得任务元信息。
监测任务的合法性如果已经删除则 pass。继续轮询。如果任务合法则再次计算时间
如果合法则计算时间,如果时间合法:根据 topic 将任务放入对应的 ready queue,然后从 bucket 中移除。如果时间不合法,则重新计算时间再次放入 bucket,并移除之前的 bucket 中的内容
消费端轮询对应 topic 的 ready queue。获取 job 后做自己的业务逻辑。与此同时,服务端将已经被消费端获取的 job 按照其设定的 TTR,重新计算执行时间,并将其放入 bucket。
完成消费后,发送 finish 消息,服务端根据 job id 删除对应信息。
我们现在可以了解到中间存在的几个组件
延迟队列,为 Redis 延迟队列。实现消息传递
Job pool 任务池保存 job 元信息。根据文章描述使用 K/V 的数据结构,key 为 ID,value 为 job
Delay Bucket 用来保存业务的延迟任务。文章中描述使用轮询方式放入某一个 Bucket 可以知道其并没有使用 topic 来区分,个人这里默认使用顺序插入
Timer 时间组件,负责扫描各个 Bucket。根据文章描述存在多个 Timer,但是同一个 Timer 同一时间只能扫描一个 Bucket
Ready Queue 负责存放需要被完成的任务,但是根据描述根据 Topic 的不同存在多个 Ready Queue。
其中 Timer 负责轮询,Job pool、Delay Bucket、Ready Queue 都是不同职责的集合。
任务状态
ready:可执行状态,
delay:不可执行状态,等待时钟周期。
reserved:已被消费者读取,但没有完成消费。
deleted:已被消费完成或者已被删除。
对外提供的接口
接口 | 描述 | 数据 |
---|---|---|
add | 添加任务 | Job数据 |
pop | 取出待处理任务 | topic就是任务分组 |
finish | 完成任务 | 任务ID |
delete | 删除任务 | 任务ID |
额外的内容
首先根据状态状态描述,finish 和 delete 操作都是将任务设置成 deleted 状态。
根据文章描述的操作,在执行 finish 或者 delete 的操作的时候任务已经从元数据中移除,此时 deleted 状态可能只存在极短时间,所以实际实现中就直接删除了。
文章中并没有说明响应超时后如何处理,所以个人现在将其重新投入了待处理队列。
文章中因为使用了集群,所以使用 redis 的 setnx 锁来保证多个时间循环处理多个桶的时候不会出现重复循环。这里因为是简单的实现,所以就很简单的每个桶设置一个时间队列处理。也是为了方便简单处理。关于分布式锁可以看我之前的文章里面有描述。
实现
现在我们根据设计内容完成设计。这一块设计我们分四步完成
任务及相关对象
目前需要两个对象,一个是任务对象(job)一个负责保存任务引用的对象**(delay job)** 任务对象
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
/**
* 延迟任务的唯一标识,用于检索任务
*/
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
/**
* 任务类型(具体业务类型)
*/
private String topic;
/**
* 任务的延迟时间
*/
private long delayTime;
/**
* 任务的执行超时时间
*/
private long ttrTime;
/**
* 任务具体的消息内容,用于处理具体业务逻辑用
*/
private String message;
/**
* 重试次数
*/
private int retryCount;
/**
* 任务状态
*/
private JobStatus status;
}
任务引用对象
@Data
@AllArgsConstructor
public class DelayJob implements Serializable {
/**
* 延迟任务的唯一标识
*/
private long jodId;
/**
* 任务的执行时间
*/
private long delayDate;
/**
* 任务类型(具体业务类型)
*/
private String topic;
public DelayJob(Job job) {
this.jodId = job.getId();
this.delayDate = System.currentTimeMillis() + job.getDelayTime();
this.topic = job.getTopic();
}
public DelayJob(Object value, Double score) {
this.jodId = Long.parseLong(String.valueOf(value));
this.delayDate = System.currentTimeMillis() + score.longValue();
}
}
容器
目前我们需要完成三个容器的创建,Job 任务池、延迟任务容器、待完成任务容器 job 任务池,为普通的** K/V **结构,提供基础的操作
@Component
@Slf4j
public class JobPool {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "job.pool";
private BoundHashOperations getPool () {
BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
return ops;
}
/**
* 添加任务
* @param job
*/
public void addJob (Job job) {
log.info("任务池添加任务:{}", JSON.toJSONString(job));
getPool().put(job.getId(),job);
return ;
}
/**
* 获得任务
* @param jobId
* @return
*/
public Job getJob(Long jobId) {
Object o = getPool().get(jobId);
if (o instanceof Job) {
return (Job) o;
}
return null;
}
/**
* 移除任务
* @param jobId
*/
public void removeDelayJob (Long jobId) {
log.info("任务池移除任务:{}",jobId);
// 移除任务
getPool().delete(jobId);
}
}
延迟任务,使用可排序的** ZSet 保存数据**,提供取出最小值等操作
@Slf4j
@Component
public class DelayBucket {
@Autowired
private RedisTemplate redisTemplate;
private static AtomicInteger index = new AtomicInteger(0);
@Value("${thread.size}")
private int bucketsSize;
private List <String> bucketNames = new ArrayList <>();
@Bean
public List <String> createBuckets() {
for (int i = 0; i < bucketsSize; i++) {
bucketNames.add("bucket" + i);
}
return bucketNames;
}
/**
* 获得桶的名称
* @return
*/
private String getThisBucketName() {
int thisIndex = index.addAndGet(1);
int i1 = thisIndex % bucketsSize;
return bucketNames.get(i1);
}
/**
* 获得桶集合
* @param bucketName
* @return
*/
private BoundZSetOperations getBucket(String bucketName) {
return redisTemplate.boundZSetOps(bucketName);
}
/**
* 放入延时任务
* @param job
*/
public void addDelayJob(DelayJob job) {
log.info("添加延迟任务:{}", JSON.toJSONString(job));
String thisBucketName = getThisBucketName();
BoundZSetOperations bucket = getBucket(thisBucketName);
bucket.add(job,job.getDelayDate());
}
/**
* 获得最新的延期任务
* @return
*/
public DelayJob getFirstDelayTime(Integer index) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
if (CollectionUtils.isEmpty(set)) {
return null;
}
ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
Object value = typedTuple.getValue();
if (value instanceof DelayJob) {
return (DelayJob) value;
}
return null;
}
/**
* 移除延时任务
* @param index
* @param delayJob
*/
public void removeDelayTime(Integer index,DelayJob delayJob) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
bucket.remove(delayJob);
}
}
待完成任务,内部使用 topic 进行细分,每个 topic 对应一个** list 集合**
@Component
@Slf4j
public class ReadyQueue {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "process.queue";
private String getKey(String topic) {
return NAME + topic;
}
/**
* 获得队列
* @param topic
* @return
*/
private BoundListOperations getQueue (String topic) {
BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
return ops;
}
/**
* 设置任务
* @param delayJob
*/
public void pushJob(DelayJob delayJob) {
log.info("执行队列添加任务:{}",delayJob);
BoundListOperations listOperations = getQueue(delayJob.getTopic());
listOperations.leftPush(delayJob);
}
/**
* 移除并获得任务
* @param topic
* @return
*/
public DelayJob popJob(String topic) {
BoundListOperations listOperations = getQueue(topic);
Object o = listOperations.leftPop();
if (o instanceof DelayJob) {
log.info("执行队列取出任务:{}", JSON.toJSONString((DelayJob) o));
return (DelayJob) o;
}
return null;
}
}
轮询处理
设置了线程池为每个 bucket 设置一个轮询操作
@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
@Autowired
private DelayBucket delayBucket;
@Autowired
private JobPool jobPool;
@Autowired
private ReadyQueue readyQueue;
@Value("${thread.size}")
private int length;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ExecutorService executorService = new ThreadPoolExecutor(
length,
length,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue <Runnable>());
for (int i = 0; i < length; i++) {
executorService.execute(
new DelayJobHandler(
delayBucket,
jobPool,
readyQueue,
i));
}
}
}
处理任务Handler(重置超时任务状态,执行延时任务;主要是负责状态的流转)
@Slf4j
@Data
@AllArgsConstructor
public class DelayJobHandler implements Runnable{
/**
* 延迟队列
*/
private DelayBucket delayBucket;
/**
* 任务池
*/
private JobPool jobPool;
private ReadyQueue readyQueue;
/**
* 索引
*/
private int index;
/**
*/
@Override
public void run() {
log.info("定时任务开始执行");
while (true) {
try {
DelayJob delayJob = delayBucket.getFirstDelayTime(index);
//没有任务
if (delayJob == null) {
sleep();
continue;
}
// 发现延时任务
// 延迟时间没到
if (delayJob.getDelayDate() > System.currentTimeMillis()) {
sleep();
continue;
}
Job job = jobPool.getJob(delayJob.getJodId());
//延迟任务元数据不存在
if (job == null) {
log.info("移除不存在任务:{}", JSON.toJSONString(delayJob));
delayBucket.removeDelayTime(index,delayJob);
continue;
}
JobStatus status = job.getStatus();
if (JobStatus.RESERVED.equals(status)) {
log.info("处理超时任务:{}", JSON.toJSONString(job));
// 超时任务
processTtrJob(delayJob,job);
} else {
log.info("处理延时任务:{}", JSON.toJSONString(job));
// 延时任务
processDelayJob(delayJob,job);
}
} catch (Exception e) {
log.error("扫描DelayBucket出错:",e.getStackTrace());
sleep();
}
}
}
/**
* 处理ttr的任务
*/
private void processTtrJob(DelayJob delayJob,Job job) {
job.setStatus(JobStatus.DELAY);
// 修改任务池状态
jobPool.addJob(job);
// 移除delayBucket中的任务
delayBucket.removeDelayTime(index,delayJob);
Long delayDate = System.currentTimeMillis() + job.getDelayTime();
delayJob.setDelayDate(delayDate);
// 再次添加到任务中
delayBucket.addDelayJob(delayJob);
}
/**
* 处理延时任务
*/
private void processDelayJob(DelayJob delayJob,Job job) {
job.setStatus(JobStatus.READY);
// 修改任务池状态
jobPool.addJob(job);
// 设置到待处理任务
readyQueue.pushJob(delayJob);
// 移除delayBucket中的任务
delayBucket.removeDelayTime(index,delayJob);
}
private void sleep(){
try {
Thread.sleep(DelayConfig.SLEEP_TIME);
} catch (InterruptedException e){
log.error("",e);
}
}
}
测试请求
/**
* 测试用请求
* @author daify
* @date 2019-07-29 10:26
**/
@RestController
@RequestMapping("delay")
public class DelayController {
@Autowired
private JobService jobService;
/**
* 添加
* @param request
* @return
*/
@RequestMapping(value = "add",method = RequestMethod.POST)
public String addDefJob(Job request) {
DelayJob delayJob = jobService.addDefJob(request);
return JSON.toJSONString(delayJob);
}
/**
* 获取
* @return
*/
@RequestMapping(value = "pop",method = RequestMethod.GET)
public String getProcessJob(String topic) {
Job process = jobService.getProcessJob(topic);
return JSON.toJSONString(process);
}
/**
* 完成一个执行的任务
* @param jobId
* @return
*/
@RequestMapping(value = "finish",method = RequestMethod.DELETE)
public String finishJob(Long jobId) {
jobService.finishJob(jobId);
return "success";
}
@RequestMapping(value = "delete",method = RequestMethod.DELETE)
public String deleteJob(Long jobId) {
jobService.deleteJob(jobId);
return "success";
}
}
测试
添加延迟任务
通过 postman 请求:localhost:8000/delay/addTest
此时这条延时任务被添加进了线程池中
2019-08-12 21
36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag
3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21
36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
根据设置 10 秒钟之后任务会被添加至 ReadyQueue 中
2019-08-12 21
46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 执行队列添加任务:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
获得任务 这时候我们请求 localhost:8000/delay/pop
这个时候任务被响应,修改状态的同时设置其超时时间,然后放置在 DelayBucket 中
2019-08-09 19
02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565321728704,"jodId":1,"topic":"测试"}
2019-08-09 19
02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":1,"message":"延迟10秒,超时30秒","retryCount":0,"status":"RESERVED","topic":"测试","ttrTime":30000}
2019-08-09 19
02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延迟任务:{"delayDate":1565321792364,"jodId":1,"topic":"测试"}
按照设计在 30 秒后,任务假如没有被消费将会重新放置在 ReadyQueue 中
2019-08-12 21
48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 执行队列取出任务:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21
48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任务池添加任务:{"delayTime":10000,"id":3,"message":"tag
3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
任务的删除 / 消费
现在我们请求:localhost:8000/delay/delete
此时在 Job pool 中此任务将会被移除,此时元数据已经不存在,但任务还在 DelayBucket 中循环,然而在循环中当检测到元数据已经不存的话此延时任务会被移除。
2019-08-12 21
54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任务池移除任务:3
2019-08-12 21
59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任务:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
控制台打印结果:
评论( 0 )