來源:blog.csdn.net/qq330983778/article/details/99341671
業務流程
首先我們分析下這個流程
- 用戶提交任務,首先將任務推送至延遲佇列中,
- 延遲佇列接收到任務后,首先將任務推送至job pool中,然后計算其執行時間,
- 然后生成延遲任務(僅僅包含任務id)放入某個桶中
- 時間組件時刻輪詢各個桶,當時間到達的時候從job pool中獲得任務元資訊,
- 監測任務的合法性如果已經洗掉則pass,繼續輪詢,如果任務合法則再次計算時間
- 如果合法則計算時間,如果時間合法:根據topic將任務放入對應的ready queue,然后從bucket中移除,如果時間不合法,則重新計算時間再次放入bucket,并移除之前的bucket中的內容
- 消費端輪詢對應topic的ready queue,獲取job后做自己的業務邏輯,與此同時,服務端將已經被消費端獲取的job按照其設定的TTR,重新計算執行時間,并將其放入bucket,
- 完成消費后,發送finish訊息,服務端根據job id洗掉對應資訊,

物件
我們現在可以了解到中間存在的幾個組件
- 延遲佇列,為Redis延遲佇列,實作訊息傳遞
- Job pool 任務池保存job元資訊,根據文章描述使用K/V的資料結構,key為ID,value為job
- Delay Bucket 用來保存業務的延遲任務,文章中描述使用輪詢方式放入某一個Bucket可以知道其并沒有使用topic來區分,個人這里默認使用順序插入
- Timer 時間組件,負責掃描各個Bucket,根據文章描述存在多個Timer,但是同一個Timer同一時間只能掃描一個Bucket
- Ready Queue 負責存放需要被完成的任務,但是根據描述根據Topic的不同存在多個Ready Queue,
其中Timer負責輪詢,Job pool、Delay Bucket、Ready Queue都是不同職責的集合,
任務狀態
- ready:可執行狀態,
- delay:不可執行狀態,等待時鐘周期,
- reserved:已被消費者讀取,但沒有完成消費,
- deleted:已被消費完成或者已被洗掉,
對外提供的介面

額外的內容
- 首先根據狀態狀態描述,finish和delete操作都是將任務設定成deleted狀態,
- 根據文章描述的操作,在執行finish或者delete的操作的時候任務已經從元資料中移除,此時deleted狀態可能只存在極短時間,所以實際實作中就直接洗掉了,
- 文章中并沒有說明回應超時后如何處理,所以個人現在將其重新投入了待處理佇列,
- 文章中因為使用了集群,所以使用redis的setnx鎖來保證多個時間回圈處理多個桶的時候不會出現重復回圈,這里因為是簡單的實作,所以就很簡單的每個桶設定一個時間佇列處理,也是為了方便簡單處理,關于分布式鎖可以看我之前的文章里面有描述,
實作
現在我們根據設計內容完成設計,這一塊設計我們分四步完成
任務及相關物件
目前需要兩個物件,一個是任務物件(job)一個負責保存任務參考的物件(delay job),Spring Boot 基礎就不介紹了,推薦下這個實戰教程:
https://github.com/javastacks/spring-boot-best-practice
任務物件
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
/**
* 延遲任務的唯一標識,用于檢索任務
*/
@JsonSerialize(using = ToStringSerializer.class)
private Long id;
/**
* 任務型別(具體業務型別)
*/
private String topic;
/**
* 任務的延遲時間
*/
private long delayTime;
/**
* 任務的執行超時時間
*/
private long ttrTime;
/**
* 任務具體的訊息內容,用于處理具體業務邏輯用
*/
private String message;
/**
* 重試次數
*/
private int retryCount;
/**
* 任務狀態
*/
private JobStatus status;
}
任務參考物件
@Data
@AllArgsConstructor
public class DelayJob implements Serializable {
/**
* 延遲任務的唯一標識
*/
private long jodId;
/**
* 任務的執行時間
*/
private long delayDate;
/**
* 任務型別(具體業務型別)
*/
private String topic;
public DelayJob(Job job) {
this.jodId = job.getId();
this.delayDate = System.currentTimeMillis() + job.getDelayTime();
this.topic = job.getTopic();
}
public DelayJob(Object value, Double score) {
this.jodId = Long.parseLong(String.valueOf(value));
this.delayDate = System.currentTimeMillis() + score.longValue();
}
}
容器
目前我們需要完成三個容器的創建,Job任務池、延遲任務容器、待完成任務容器
job任務池,為普通的K/V結構,提供基礎的操作
@Component
@Slf4j
public class JobPool {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "job.pool";
private BoundHashOperations getPool () {
BoundHashOperations ops = redisTemplate.boundHashOps(NAME);
return ops;
}
/**
* 添加任務
* @param job
*/
public void addJob (Job job) {
log.info("任務池添加任務:{}", JSON.toJSONString(job));
getPool().put(job.getId(),job);
return ;
}
/**
* 獲得任務
* @param jobId
* @return
*/
public Job getJob(Long jobId) {
Object o = getPool().get(jobId);
if (o instanceof Job) {
return (Job) o;
}
return null;
}
/**
* 移除任務
* @param jobId
*/
public void removeDelayJob (Long jobId) {
log.info("任務池移除任務:{}",jobId);
// 移除任務
getPool().delete(jobId);
}
}
延遲任務,使用可排序的ZSet保存資料,提供取出最小值等操作
@Slf4j
@Component
public class DelayBucket {
@Autowired
private RedisTemplate redisTemplate;
private static AtomicInteger index = new AtomicInteger(0);
@Value("${thread.size}")
private int bucketsSize;
private List <String> bucketNames = new ArrayList <>();
@Bean
public List <String> createBuckets() {
for (int i = 0; i < bucketsSize; i++) {
bucketNames.add("bucket" + i);
}
return bucketNames;
}
/**
* 獲得桶的名稱
* @return
*/
private String getThisBucketName() {
int thisIndex = index.addAndGet(1);
int i1 = thisIndex % bucketsSize;
return bucketNames.get(i1);
}
/**
* 獲得桶集合
* @param bucketName
* @return
*/
private BoundZSetOperations getBucket(String bucketName) {
return redisTemplate.boundZSetOps(bucketName);
}
/**
* 放入延時任務
* @param job
*/
public void addDelayJob(DelayJob job) {
log.info("添加延遲任務:{}", JSON.toJSONString(job));
String thisBucketName = getThisBucketName();
BoundZSetOperations bucket = getBucket(thisBucketName);
bucket.add(job,job.getDelayDate());
}
/**
* 獲得最新的延期任務
* @return
*/
public DelayJob getFirstDelayTime(Integer index) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 1);
if (CollectionUtils.isEmpty(set)) {
return null;
}
ZSetOperations.TypedTuple typedTuple = (ZSetOperations.TypedTuple) set.toArray()[0];
Object value = https://www.cnblogs.com/javastack/p/typedTuple.getValue();
if (value instanceof DelayJob) {
return (DelayJob) value;
}
return null;
}
/**
* 移除延時任務
* @param index
* @param delayJob
*/
public void removeDelayTime(Integer index,DelayJob delayJob) {
String name = bucketNames.get(index);
BoundZSetOperations bucket = getBucket(name);
bucket.remove(delayJob);
}
}
待完成任務,內部使用topic進行細分,每個topic對應一個list集合
@Component
@Slf4j
public class ReadyQueue {
@Autowired
private RedisTemplate redisTemplate;
private String NAME = "process.queue";
private String getKey(String topic) {
return NAME + topic;
}
/**
* 獲得佇列
* @param topic
* @return
*/
private BoundListOperations getQueue (String topic) {
BoundListOperations ops = redisTemplate.boundListOps(getKey(topic));
return ops;
}
/**
* 設定任務
* @param delayJob
*/
public void pushJob(DelayJob delayJob) {
log.info("執行佇列添加任務:{}",delayJob);
BoundListOperations listOperations = getQueue(delayJob.getTopic());
listOperations.leftPush(delayJob);
}
/**
* 移除并獲得任務
* @param topic
* @return
*/
public DelayJob popJob(String topic) {
BoundListOperations listOperations = getQueue(topic);
Object o = listOperations.leftPop();
if (o instanceof DelayJob) {
log.info("執行佇列取出任務:{}", JSON.toJSONString((DelayJob) o));
return (DelayJob) o;
}
return null;
}
}
輪詢處理
設定了執行緒池為每個bucket設定一個輪詢操作
@Component
public class DelayTimer implements ApplicationListener <ContextRefreshedEvent> {
@Autowired
private DelayBucket delayBucket;
@Autowired
private JobPool jobPool;
@Autowired
private ReadyQueue readyQueue;
@Value("${thread.size}")
private int length;
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
ExecutorService executorService = new ThreadPoolExecutor(
length,
length,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue <Runnable>());
for (int i = 0; i < length; i++) {
executorService.execute(
new DelayJobHandler(
delayBucket,
jobPool,
readyQueue,
i));
}
}
}
測驗請求
/**
* 測驗用請求
* @author daify
**/
@RestController
@RequestMapping("delay")
public class DelayController {
@Autowired
private JobService jobService;
/**
* 添加
* @param request
* @return
*/
@RequestMapping(value = "https://www.cnblogs.com/javastack/p/add",method = RequestMethod.POST)
public String addDefJob(Job request) {
DelayJob delayJob = jobService.addDefJob(request);
return JSON.toJSONString(delayJob);
}
/**
* 獲取
* @return
*/
@RequestMapping(value = "https://www.cnblogs.com/javastack/p/pop",method = RequestMethod.GET)
public String getProcessJob(String topic) {
Job process = jobService.getProcessJob(topic);
return JSON.toJSONString(process);
}
/**
* 完成一個執行的任務
* @param jobId
* @return
*/
@RequestMapping(value = "https://www.cnblogs.com/javastack/p/finish",method = RequestMethod.DELETE)
public String finishJob(Long jobId) {
jobService.finishJob(jobId);
return "success";
}
@RequestMapping(value = "https://www.cnblogs.com/javastack/p/delete",method = RequestMethod.DELETE)
public String deleteJob(Long jobId) {
jobService.deleteJob(jobId);
return "success";
}
}
測驗
添加延遲任務
通過postman請求:localhost:8000/delay/add

此時這條延時任務被添加進了執行緒池中
2019-08-12 21:21:36.589 INFO 21444 --- [nio-8000-exec-6] d.samples.redis.delay.container.JobPool : 任務池添加任務:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
2019-08-12 21:21:36.609 INFO 21444 --- [nio-8000-exec-6] d.s.redis.delay.container.DelayBucket : 添加延遲任務:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
根據設定10秒鐘之后任務會被添加至ReadyQueue中
2019-08-12 21:21:46.744 INFO 21444 --- [pool-1-thread-4] d.s.redis.delay.container.ReadyQueue : 執行佇列添加任務:DelayJob(jodId=3, delayDate=1565616106609, topic=test)
獲得任務
這時候我們請求localhost:8000/delay/pop
這個時候任務被回應,修改狀態的同時設定其超時時間,然后放置在DelayBucket中
2019-08-09 19:36:02.342 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.ReadyQueue : 執行佇列取出任務:{"delayDate":1565321728704,"jodId":1,"topic":"測驗"}
2019-08-09 19:36:02.364 INFO 58456 --- [nio-8000-exec-3] d.samples.redis.delay.container.JobPool : 任務池添加任務:{"delayTime":10000,"id":1,"message":"延遲10秒,超時30秒","retryCount":0,"status":"RESERVED","topic":"測驗","ttrTime":30000}
2019-08-09 19:36:02.384 INFO 58456 --- [nio-8000-exec-3] d.s.redis.delay.container.DelayBucket : 添加延遲任務:{"delayDate":1565321792364,"jodId":1,"topic":"測驗"}
按照設計在30秒后,任務假如沒有被消費將會重新放置在ReadyQueue中
2019-08-12 21:21:48.239 INFO 21444 --- [nio-8000-exec-7] d.s.redis.delay.container.ReadyQueue : 執行佇列取出任務:{"delayDate":1565616106609,"jodId":3,"topic":"test"}
2019-08-12 21:21:48.261 INFO 21444 --- [nio-8000-exec-7] d.samples.redis.delay.container.JobPool : 任務池添加任務:{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"RESERVED","topic":"test","ttrTime":10000}
任務的洗掉/消費
現在我們請求:localhost:8000/delay/delete

此時在Job pool中此任務將會被移除,此時元資料已經不存在,但任務還在DelayBucket中回圈,然而在回圈中當檢測到元資料已經不存的話此延時任務會被移除,
2019-08-12 21:21:54.880 INFO 21444 --- [nio-8000-exec-8] d.samples.redis.delay.container.JobPool : 任務池移除任務:3
2019-08-12 21:21:59.104 INFO 21444 --- [pool-1-thread-5] d.s.redis.delay.handler.DelayJobHandler : 移除不存在任務:{"delayDate":1565616118261,"jodId":3,"topic":"test"}
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.別在再滿屏的 if/ else 了,試試策略模式,真香!!
3.臥槽!Java 中的 xx ≠ null 是什么新語法?
4.Spring Boot 2.5 重磅發布,黑暗模式太炸了!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/302614.html
標籤:Java
