延時佇列
- RabbitMq延時佇列實際應用場景
- 基于死信佇列的延時佇列
- 基于延時交換機的延時佇列
- 延時佇列圖解
- SpringBoot偽代碼
- 配置類
- 生產者
- 訊息的TTL和佇列的TTL
- 總結
RabbitMq延時佇列實際應用場景
比如,boss讓你開發一個30分鐘客戶不付款就取消訂單的場景;
如果在促銷活動期間,肯定會有大量的未付款的訂單資料,如果用輪訓,或者是redis失效key來作為處理方案,肯定會被CTO罵死;因為,如果高頻次輪訓,雖然能保證比較及時的取消訂單,但是無疑給系統造成了很大壓力,如果用redis失效key來做,那么redis也會承受很高的壓力,此時,就需要mq這樣的削峰填谷、異步處理中間件幫忙處理此種場景,
將訂單資料放入延時佇列,到達過期時間,佇列資料發送給消費者,進行取消訂單操作,即保證了服務器的穩定性,也能保證客戶過期訂單被較為及時的處理掉,
基于死信佇列的延時佇列

如圖所示,先宣告不同過期時間(TTL)的業務佇列, 過期時間到了,業務佇列將資料發送給死信佇列,消費者就可以通過過期時間,來延時處理業務資料;
但是這種方案的缺點是,不夠靈活,如果業務每次的過期時間都是不同的,并且需要是自定義過期時間的,那么久需要新建更多不同過期時間的業務佇列,所以基于死信佇列的延時佇列局限性還是非常大的,
RabbitMq為了解決這個問題,提供了專門的延時交換機
基于延時交換機的延時佇列
要想試用此種延時佇列,需要到RabbitMq官網下載插件;
https://www.rabbitmq.com/community-plugins.html
下載rabbitmq_delayed_message_exchange插件,然后解壓放置到RabbitMQ的插件目錄,進入
RabbitMQ的安裝目錄下的plgins目錄,執行下面命令讓該插件生效,并且重啟RabbitMQ
# 安裝插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 重啟RabbitMq
rabbitmq-server restart
延時佇列圖解
在我們自定義的delay交換機中,這是一種新的交換型別,該型別訊息支持延遲投遞機制訊息傳遞后并不會立即投遞到目標佇列中,而是存盤在mnesia(一個分布式資料系統)表中,當達到投遞時間時,才投遞到目標佇列中,

SpringBoot偽代碼
配置類
/**
* 配置類
*/
@Configuration
public class RabbitConfig {
@Bean
public Queue delayedQueue() {
return new Queue("delayed.queue", true, false);
}
/**
* 自定義交換機,
*/
@Bean
public CustomExchange delayedExchange(){
Map<String, Object> args = newHashMap<>();
//自定義交換機的型別
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed.exchange", "x-delayed-message", true, false, args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue")Queue queue,
@Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(queue)
.to(delayedExchange)
.with("delayed.routingkey")
.noargs();
}
}
生產者
@GetMapping("sendDelayMsg")
public void sendMsg(@RequestParam String message, @RequestParam Integer delayTime) {
rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingkey", message,
correlationData ->{
correlationData.getMessageProperties()
.setDelay(delayTime);
return correlationData;
});
log.info("當前時間:{},發送一條延遲{}毫秒的資訊給佇列delayed.queue,訊息內容:{}",
new Date(),
delayTime,
message);
}
消費者無特殊配置,此處省略;
訊息的TTL和佇列的TTL
佇列TTL:
如果設定了佇列的TTL屬性,那么一旦訊息過期,就會被佇列丟棄(如果配置了死信佇列被丟到死信佇列中),
訊息TTL:
訊息即使過期,也不一定會被馬上丟棄,因為訊息是否過期是在即將投遞到消費者之前判定的,如果當前佇列有嚴重的訊息積壓情況,則已過期的訊息也許還能存活較長時間;另外,還需要注意的一點是,如果不設定TTL,表示訊息永遠不會過期,如果將TTL設定為0,則表示除非此時可以直接投遞該訊息到消費者,否則該訊息將會被丟棄,
總結
當然,技術服務與業務;延時佇列還有很多其它選擇,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的時間輪,這些方式各有特點,看需要適用的場景
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/350858.html
標籤:其他
上一篇:Hadoop資料倉庫之資料治理
下一篇:分布式中間件之Kafka
