目錄
1、什么是死信交換機
2、TTL
2.1 Demo
1、準備接收超時死信的死信交換機
2、宣告一個佇列,并且指定TTL
3、發送訊息
4、小結
3、延遲佇列
3.1 安裝DelayExchange插件
1、上傳插件
2、安裝插件
3.2 DelayExchange原理
3.3 使用DelayExchange
3.4 發送訊息
3.5 小結
4、專案Demo地址
本篇博客帶大家研究MQ的延遲訊息問題,在此之前先了解一下死信交換機,
1、什么是死信交換機
首先我們要知道什么是死信?
當一個佇列中的訊息滿足下列情況之一時,可以成為死信(dead letter):
消費者使用basic.reject或 basic.nack宣告消費失敗,并且訊息的requeue引數設定為false,
訊息是一個過期訊息,超時無人消費,
要投遞的佇列訊息堆積滿了,最早的訊息可能成為死信,
一般呢?一旦訊息變成死信是會被我們丟棄的,但是有了死信交換機就不一樣了,
如果這個包含死信的佇列配置了
dead-letter-exchange屬性,指定了一個交換機,那么佇列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX),
其實呢,所謂的死信交換機就是一個普通交換機,只不過是某個佇列用dead-letter-exchange這個屬性系結到一起了,當這個佇列出現了死信,就會丟到我們這個死信交換機里了,就有點像垃圾桶一樣的了,
如圖,一個訊息被消費者拒絕了,變成了死信:
因為simple.queue系結了死信交換機 dl.direct,因此死信會投遞給這個交換機:
如果這個死信交換機也系結了一個佇列,則訊息最侄訓進入這個存放死信的佇列:

另外,佇列將死信投遞給死信交換機時,必須知道兩個資訊:
死信交換機名稱
死信交換機與死信佇列系結的RoutingKey
這樣才能確保投遞的訊息能到達死信交換機,并且正確的路由到死信佇列,
小結:
什么樣的訊息會成為死信?
訊息被消費者reject或者回傳nack,
訊息超時未消費,
佇列滿了,
2、TTL
TTL,也就是Time-To-Live,如果一個佇列中的訊息TTL結束仍未消費,則會變為死信,TTL超時分為兩種情況:
訊息所在的佇列設定了超時時間
訊息本身設定了超時時間

2.1 Demo
1、準備接收超時死信的死信交換機
在consumer服務的SpringRabbitListener中,定義一個新的消費者,并且宣告 死信交換機、死信佇列:
/**
* @description:注解方式宣告死信交換機、死信佇列
* @author: jie
* @time: 2022/3/5 10:30
*/
@RabbitListener(bindings = @QueueBinding(
//佇列,持久化為true
value = @Queue(name = "dl.ttl.queue", durable = "true"),
//交換機
exchange = @Exchange(name = "dl.ttl.direct"),
//Routing Key
key = "ttl"
))
public void listenDlQueue(String msg){
log.info("接收到 dl.ttl.queue的延遲訊息:{}", msg);
}
2、宣告一個佇列,并且指定TTL
package com.jie.mq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TTLMessageConfig {
/**
* @description:交換機
* @author: jie
* @time: 2022/3/5 10:36
*/
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange("ttl.direct");
}
/**
* @description:佇列
* @author: jie
* @time: 2022/3/5 10:38
*/
@Bean
public Queue ttlQueue(){
return QueueBuilder
//指定佇列名稱,并持久化
.durable("ttl.queue")
//設定佇列的超時時間,10秒
.ttl(10000)
//指定死信交換機
.deadLetterExchange("dl.ttl.direct")
//設定RoutingKey
.deadLetterRoutingKey("dl")
.build();
}
/**
* @description:將佇列和交換機系結
* @author: jie
* @time: 2022/3/5 10:41
*/
@Bean
public Binding ttlBinding(){
return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");
}
}
3、發送訊息
3.1 不指定TTL,
@Test
public void testTTLQueue() {
// 創建訊息
String message = "hello, ttl queue";
// 訊息ID,需要封裝到CorrelationData中
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
// 發送訊息
rabbitTemplate.convertAndSend("ttl.direct", "ttl", message, correlationData);
// 記錄日志
log.error("發送訊息成功");
}
訊息發送時間:

接受訊息的時間:![]()
因為佇列的TTL值是10000ms,也就是10秒,可以看到訊息發送與接收之間的時差剛好是10秒,
我們這個是基于佇列去設定延遲時間,我們給佇列設定了10秒鐘,我們也可以給訊息設定延遲,
3.2 指定TTL
public void testTTLMessage() {
//準備訊息
Message message = MessageBuilder
.withBody("hell,TTL".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
//設定延遲時間
.setExpiration("5000")
.build();
// 2.發送訊息
rabbitTemplate.convertAndSend("ttl.direct","ttl",message);
//3、記錄日志
log.info("訊息已經成功發送!");
}
這里呢?我們的佇列是10秒,而訊息是5秒,到底是哪個優先呢?還是15秒呢?
訊息發送時間:![]()
訊息接受時間:![]()
這里我們可以看出,當兩個都有延遲的時候,它會以較短的時間為準,
4、小結
訊息超時的兩種方式是?
給佇列設定ttl屬性,進入佇列后超過ttl時間的訊息變為死信
給訊息設定ttl屬性,佇列接收到訊息超過ttl時間后變為死信
如何實作發送一個訊息20秒后消費者才收到訊息?
給訊息的目標佇列指定死信交換機
將消費者監聽的佇列系結到死信交換機
發送訊息時給訊息設定超時時間為20秒
3、延遲佇列
概念:
利用TTL結合死信交換機,我們實作了訊息發出后,消費者延遲收到訊息的效果,這種訊息模式就稱為延遲佇列(Delay Queue)模式,
延遲佇列的使用場景包括:
1、延遲發送短信,
2、用戶下單,如果用戶在15 分鐘內未支付,則自動取消,
3、預約作業會議,20分鐘后自動通知所有參會人員,
因為延遲佇列的需求非常多,所以RabbitMQ的官方也推出了一個插件,原生支持延遲佇列效果,
這個插件就是DelayExchange插件,參考RabbitMQ的插件串列頁面:Community Plugins — RabbitMQ
使用方式可以參考官網地址:Scheduling Messages with RabbitMQ | RabbitMQ - Blog
3.1 安裝DelayExchange插件
官方的安裝指南地址為:Scheduling Messages with RabbitMQ | RabbitMQ - Blog
上述檔案是基于linux原生安裝RabbitMQ,然后安裝插件,
因為我是基于Docker安裝RabbitMQ,所以下面我會講解基于Docker來安裝RabbitMQ插件,
RabbitMQ有一個官方的插件社區,地址為:Community Plugins — RabbitMQ
其中包含各種各樣的插件,包括我們要使用的DelayExchange插件:

下載好后,就會獲得一個ez檔案,
![]()
1、上傳插件
因為我們是基于Docker安裝,所以需要先查看RabbitMQ的插件目錄對應的資料卷,
我之前設定的RabbitMQ的資料卷名稱為mq-plugins,所以我使用下面命令查看資料卷:
docker volume inspect mq-plugins
可以得到下面結果:
接下來,將插件上傳到這個目錄即可:
2、安裝插件
最后就是安裝了,需要進入MQ容器內部來執行安裝,我的容器名為mq,所以執行下面命令:
docker exec -it mq bash
執行時,請將其中的 -it 后面的mq替換為你自己的容器名.
進入容器內部后,執行下面命令開啟插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

3.2 DelayExchange原理
DelayExchange的本質還是官方的三種交換機,只是添加了延遲功能,因此使用時只需要宣告一個交換機,交換機的型別可以是任意型別,然后設定delayed屬性為true即可,
接收訊息,
判斷訊息是否具備x-delay屬性,
如果有x-delay屬性,說明是延遲訊息,持久化到硬碟,讀取x-delay值,作為延遲時間,
回傳routing not found結果給訊息發送者,
x-delay時間到期后,重新投遞訊息到指定佇列,
3.3 使用DelayExchange
1、基于注解的方式(推薦)
2、基于Bean的方式
3.4 發送訊息
發送訊息時,一定要攜帶x-delay屬性,指定延遲的時間:
發送訊息時間:![]()
接受訊息時間:![]()
相差五秒,說明是有用的,
3.5 小結
延遲佇列插件的使用步驟包括哪些?
?宣告一個交換機,添加delayed屬性為true
?發送訊息時,添加x-delay頭,值為超時時間
4、專案Demo地址
無聊的英杰/RabbitMQ延遲訊息問題
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/438082.html
標籤:其他
