RabbitMQ死信佇列和延時佇列
? RabbitMQ本身是具有死信佇列和死信交換機屬性的,延時佇列 可以通過死信佇列和死信交換機來實作,在電商行業中,通常都會有一個需求:訂單超時未支付,自動取消該訂單,那么通過RabbitMQ實作的延時佇列就是實作該需求的一種方式,
1、死信佇列
? 死信顧名思義,就是死掉的資訊,英文是Dead Letter,死信交換機(Dead-Letter-Exchange)和普通交換機沒有區別,都是可以接受資訊并轉發到與之系結并能路由到的佇列,區別在于死信交換機是轉發死信的,而和該死信交換機系結的佇列就是死信佇列,說的再通俗一點,死信交換機和死信佇列其實都只是普通的交換機和佇列,只不過接受、轉發的資訊是死信,其他操作并沒有區別,
1.1 死信的條件
? 稱為死信的資訊,需要如下幾個條件:
- 訊息被消費者拒絕(通過basic.reject 或者 back.nack),并且設定 requeue=false,
- 訊息過期,因為佇列設定了TTL(Time To Live)時間,
- 訊息被丟棄,因為超過了佇列的長度限制,
這時以上幾個條件的方式基本上都有2種:1)rabbitmqctl命令列設定policy(策略)引數; 2)硬編碼,也就是在代碼中設定,
1.2 消費者拒絕
1.2.1 編碼方式
? 硬編碼就是在代碼中撰寫業務佇列宣告時對應的引數:
x-dead-letter-exchange:死信交換機,必須x-dead-letter-routing-key:死信交換機轉發到死信佇列的路由鍵,可選
Producer:
// producer
public class RejectProducer {
// 定義業務交換機
public static final String ORDER_X = "order.exchange";
// main方法
public static void main(String[] args) throws IOException {
// 獲取連接
Connection connection = RabbitMQUtil.getConnection();
// 獲取通道
Channel channel = RabbitMQUtil.getChannel(connection);
// 發訊息,路由鍵分別為:d.order.123、d.other.123、d
// 1)d.order.123:業務consumer會收到訊息,order死信佇列也會收到訊息
// 2)d.other.123:業務consumer會收到訊息,other死信佇列也會收到訊息
// 3)d:只有業務consumer會收到訊息
channel.basicPublish(ORDER_X, "d.order.123", null, "hello my friend".getBytes(StandardCharsets.UTF_8));
// 關閉資源
RabbitMQUtil.close(channel, connection);
}
}
Consumer:
/**
* 死信佇列條件:消費者拒絕
* 1)basic.reject(tag, requeue) 表示拒絕接受訊息,第二個引數表示是否重新入隊,如果為true,可能造成死回圈,需要注意
* 2)basic.nack(tag, multi, requeue) multi可以同時拒絕多條,requeue和reject相同,nack表示未ack的資料,會有單獨的標識
*/
public class RejectConsumer {
public static final String DEAD_LETTER_X = "dead.letter.exchange";
public static final String DEAD_LETTER_Q_1 = "dead.letter.queue.order";
public static final String DEAD_LETTER_Q_2 = "dead.letter.queue.other";
public static final String ORDER_X = "order.exchange";
public static final String ORDER_Q = "order.queue";
public static void main(String[] args) throws IOException {
// reject和nack方式
rejectAndNack();
}
public static void rejectAndNack() throws IOException {
// 獲取連接
Connection connection = RabbitMQUtil.getConnection();
// 獲取通道
Channel channel = RabbitMQUtil.getChannel(connection);
// 宣告死信佇列和死信交換機
declareOrderDLX(channel);
declareOtherDLX(channel);
// 宣告業務交換機
channel.exchangeDeclare(ORDER_X, "topic", false, true, null);
// 設定業務引數
Map<String, Object> arguments = new HashMap<>();
// 設定死信交換機,一個死信交換機系結了兩個死信佇列,根據路由鍵來區分訊息的轉發
arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
// 設定死信佇列路由key:
// 如果這里設定了路由鍵,則publish過來的訊息再轉發到死信交換機時,以該路由鍵轉發到死信佇列,
// 如果未設定該引數,則按照publish時的路由鍵轉發到死信佇列
// arguments.put("x-dead-letter-routing-key", "d.other");
// 宣告業務佇列
channel.queueDeclare(ORDER_Q, false, false, true, arguments);
// 系結業務佇列和業務交換機
channel.queueBind(ORDER_Q, ORDER_X, "#");
// 監聽業務佇列訊息
channel.basicConsume(ORDER_Q, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("order consumer: " + new String(body, StandardCharsets.UTF_8));
// 拒絕
channel.basicReject(envelope.getDeliveryTag(), false);
System.out.println("order properties: " + envelope.toString());
}
});
}
// 宣告order死信佇列
public static void declareOrderDLX(Channel channel) throws IOException {
// 宣告交換機,只是為了模擬,設定的duriable=false,autodelete=true
channel.exchangeDeclare(DEAD_LETTER_X, "topic", false, true, null);
// 宣告佇列
channel.queueDeclare(DEAD_LETTER_Q_1, false, false, true, null);
// 系結交換機和佇列,只接受 *.order.# 規則的訊息
channel.queueBind(DEAD_LETTER_Q_1, DEAD_LETTER_X, "*.order.#");
// 監聽訊息
channel.basicConsume(DEAD_LETTER_Q_1, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("dead letter consumer 【order】: " + new String(body, StandardCharsets.UTF_8));
System.out.println("dead letter properties 【order】:" + envelope.toString());
}
});
}
// 宣告other死信佇列
public static void declareOtherDLX(Channel channel) throws IOException {
// 宣告交換機
channel.exchangeDeclare(DEAD_LETTER_X, "topic", false, true, null);
// 宣告佇列
channel.queueDeclare(DEAD_LETTER_Q_2, false, false, true, null);
// 系結交換機和佇列,只接受 *.other.# 規則的訊息
channel.queueBind(DEAD_LETTER_Q_2, DEAD_LETTER_X, "*.other.#");
// 監聽訊息
channel.basicConsume(DEAD_LETTER_Q_2, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("dead letter consumer 【other】: " + new String(body, StandardCharsets.UTF_8));
System.out.println("dead letter properties 【other】:" + envelope.toString());
}
});
}
}
總結:
1) consumer方使用basic.reject或者basic.nack都會將訊息轉發到匹配的死信佇列(requeue=false),區別在于basic.reject相比basic.nack少一個引數mutil,表示是否批量back,而且nack的數量可以再web端看到,
2) 使用x-letter-dead-exchange設定死信交換機,這個是必須設定的,x-letter-dead-routing-key設定死信交換機轉發到死信佇列的路由鍵,相當于重新定義了publish的路由鍵,該引數可選,可以根據具體業務判斷是否需要設定,
1.1.2 策略方式
策略方式需要在rabbitmq的服務器上執行如下命令:
rabbitmqctl set_policy {策略名稱} ".*" '{"dead-letter-exchange":"my-dlx"}' --apply-to queues
例如:
rabbitmqctl set_policy dlx "dead.*" '{"dead-letter-exchange":"test-dead-letter-exchange"}' --apply-to queues
表示給所有的 以dead.開頭 的佇列設定死信交換機 test-dead-letter-exchange ,策略名字為 dlx,然后我們在rabbitmq的web界面新建一個名字為test-dead-letter-exchange的exchange,并且新建名為dead.order.queue和dead.other.queue的queue,系結test-dead-letter-exchange,路由鍵分別為:order.#和other.#,

說明:由于我們要模擬的死信轉發到死信佇列的情況,所以這兩個新建的queue都設定了ttl為10000ms,也就是10s,

我們看到訊息在10s之后成功到了dead.order.queue,說明我們的配置生效,這里我將這個程序畫個圖:

1.3 設定過期時間
檔案:https://www.rabbitmq.com/ttl.html
? 我們可以給 佇列 或者 訊息 設定過期時間,佇列 的過期時間,類似于 autoDelete 引數,表示佇列在指定時長內如果沒有使用的話會被洗掉,佇列沒有使用者,佇列最近未重新宣告(重新宣告續訂租約),以及basic.get至少在過期期間未被呼叫,例如,這可以用于RPC樣式的回復佇列,其中可以創建許多可能永遠不會被耗盡的佇列,訊息 的過期時間我們可以在發訊息時設定在訊息體,也可以給這個佇列設定一個訊息過期時間,其實就是兩種方式,一種設定在佇列上,另一種是設定在訊息上,
1.3.1 編碼方式
1)設定訊息體過期時間
// 設定訊息屬性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("2000") // 設定訊息過期時間,2s,既是所在佇列沒有過期時間也可以
.build();
// 發訊息
channel.basicPublish("hello", "order.123", properties, "hello my friend".getBytes(StandardCharsets.UTF_8));
2)設定佇列的訊息過期時間
// 設定引數
Map<String, Object> arguments = new HashMap<>();
// 設定佇列的過期時間,5s
arguments.put("x-message-ttl", 5000);
// 設定死信佇列
arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
// 宣告佇列
channel.queueDeclare(ORDER_Q, false, false, true, arguments);
我們可以看到,創建好佇列之后會有個TTL標識,x-message-ttl標識該佇列設定的訊息過期時間為5s,

3)設定佇列的過期時間
// 設定佇列引數
Map<String, Object> arguments = new HashMap<>();
// 設定佇列中訊息的過期時間,5s
arguments.put("x-message-ttl", 5000);
// 設定佇列的過期時間,10s如果佇列未使用(未操作),則洗掉佇列
arguments.put("x-expires", 10000);
// 宣告佇列
channel.queueDeclare(ORDER_Q, true, false, false, arguments);
注意:無論佇列中是否存在訊息,如果沒有操作佇列,就會被自動洗掉,
1.3.2 策略方式
1)設定佇列的訊息過期時間
rabbitmqctl set_policy --vhost /adu TTL ".*" '{"message-ttl":60000}' --apply-to queues
表示在 /adu虛擬主機下增加一個名稱為 TTL 策略,設所有佇列 message-ttl訊息過期時間60s,
2)設定佇列的過期時間
rabbitmqctl set_policy --vhost /adu expiry ".*" '{"expires":1800000}' --apply-to queues
表示在/adu虛擬主機下增加一個名稱為 expiry 的策略,設定所有的佇列過期時間為180s,
1.4 超過佇列長度
? 默認情況下,佇列沒有長度限制(但是總歸是有硬碟和記憶體的限制的),我們可以顯示的設定佇列的長度,可以是訊息的數量限制,也可以是佇列總訊息內容的占用記憶體長度,或者兩種都設定,一個佇列的最大長度,可以使用 策略 或者 編碼 的方式進行設定,或者在創建佇列時web界面設定,如果 策略 方式和 編碼 方式都設定了,則 值更小的 會生效,
? 如果佇列設定了佇列長度限制,那么當佇列中的訊息達到最大長度時,默認的 溢位 規則為 丟棄最老的訊息(佇列頭部),我們可以改變這個規則,使用 overflow 引數來配置,overflow 可選值為 x-reject-publish或者x-reject-publish-dls ,兩者都表示拒絕接受新訊息,區別在于 reject-publish-dlx 也會導致死信拒絕訊息1,
此處有個疑問:reject-publish-dlx和reject-publish的區別問題,針對官網的翻譯,我覺得
reject-publish-dlx是與之系結的死信佇列不會收到訊息,reject-publish相反會收到訊息,但是做實驗的時候剛好和我理解的相反?熟悉的鐵子們回復說一下哈呀,
1.4.1 編碼方式
? 使用x-max-length和x-max-length-bytes引數設定,
// 設定引數
Map<String, Object> arguments = new HashMap<>();
// 設定佇列最大長度,5條訊息
arguments.put("x-max-length", 5);
// 佇列溢位的策略:drop-head(默認)、reject-publish、reject-publish-dlx
// arguments.put("x-overflow", "reject-publish-dlx");
arguments.put("x-overflow", "reject-publish");
// 設定死信佇列
arguments.put("x-dead-letter-exchange", DEAD_LETTER_X);
// 宣告佇列
channel.queueDeclare(ORDER_Q, false, false, true, arguments);
1.4.2 策略方式
rabbitmqctl set_policy --vhost /adu limit "^five_msg" '{"dead-letter-exchange":"test-dead-letter-exchange","max-length":5,"overflow":"reject-publish-dlx"}' --apply-to queues
表示在 /adu虛擬主機下增加一個名稱為 limit 策略,設所有以 five_msg 開頭的佇列 訊息最多為5個、訊息溢位策略為拒絕、設定死信交換機 ,
設定
max-length-bytes也是同樣的方式,
2、延時佇列
? 延時佇列,顧名思義就是存放延時訊息的佇列,也就是說消費者在一定的延時后才會收到訊息,典型的應用場景就是如上所述的訂單超時未支付自動取消,
2.1 借助死信佇列實作
? 其實在介紹完 死信佇列 之后,就能大概看出來如何使用 死信佇列 來實作延時佇列了,就是使用訊息的TTL 屬性,將過期的訊息轉發到死信佇列中,業務監聽死信佇列的訊息就行了,這種情況適合給佇列設定訊息過期時間的情況,就是佇列中所有的訊息都是同一個過期時間,到期按照順序轉發到死信佇列中,不會有問題,
? 如果訊息的過期時間是在發訊息的時候設定在訊息體上的,可能會出問題,比如按順序發送msg1和msg2兩條訊息,msg1的過期時間為5s,msg2的過期時間為2s,正常理解下,結果肯定是msg2先到死信佇列被消費,但是結果卻是兩條訊息都在5s時轉發到死信佇列被消費,其實比較好理解,因為佇列的特性就是 先進先出 ,即使msg2先到了過期時間,但是msg1在它之前阻塞,只有msg1被消費了,msg2才能到隊頭被消費, 我們畫個圖:

2.2 借助RabbitMQ插件實作
? rabbitmq提供了一個插件 rabbitmq_delayed_message_exchange 讓我們能夠實作 延遲佇列 的效果,同時能夠解決 通過死信佇列實作延遲佇列 出現的訊息阻塞問題,該插件從RabbitMQ的3.6.12開始支持,要確認當前自己的rabbitmq版本是否支持該插件,
2.2.1 下載插件
下載地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
? 下載該插件后,將 rabbitmq_delayed_message_exchange-3.9.0.ez 包放到 RabbitMQ安裝目錄的plugins 目錄下:

2.2.2 啟用插件
執行控制臺命令,重啟rabbitmq服務:
# 1.列出所有插件
rabbitmq-plugins list
# 2.啟用rabbitmq_delayed_message_exchange
rabbitmq-plubins enable rabbitmq_delayed_message_exchange
# 3.重啟服務(好像可以不用重啟)
systemctl restart rabbitmq-server.service
? 在此之后,web界面的 exchanges 便可以創建type為 x-delayed-message 的交換機,或者在代碼中宣告該型別的交換機,要是用其延時功能,需要在發訊息的時候加一個 header :x-delay=xxx ,表示延時xxx毫秒,
// 宣告延時交換機,type=x-delayed-message,x-delayed-type=direct|fanout|topic
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "topic"); // 相當于之前exchange的type
channel.exchangeDeclare(EXCHANGE_NAME, "x-delayed-message", false, true, args);
// 發送訊息,x-delay,值為過期時間
Map<String, Object> headers = new HashMap<>();
headers.put("x-delay", 5000); // 5s
AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
.headers(headers)
.build();
// 發送訊息
channel.basicPublish(EXCHANGE_NAME, "other.save", props, "i am 5s message".getBytes(StandardCharsets.UTF_8));

3、總結
? 上面我們提到了使用RabbitMQ實作延時佇列功的方案:1)借助本事的死信佇列實作,監聽死信佇列;2)借助插件實作,優缺點如下:
- 死信佇列實作方式,需要在佇列上設定訊息過期時間,不靈活;需要再多用一個死信佇列,占用空間;rabbitmq本事自帶死信佇列,實作方便,
- 插件實作方式,需要下載安裝插件,要考慮版本兼容性;代碼邏輯簡單,容易上手,
? 回到我們開頭的需求:訂單支付超時自動取消,這個功能主要就是需要一個延時佇列,那通過rabbitmq實作延時佇列只是一種方式,還可以通過其他方式實作,比如Java的 DelayQueue 、Quartz定時任務、Redis的zset、時間輪 等都可以實作,具體方案還是要結合專案和具體方式的優缺點來選擇,比如專案中使用到了RabbitMQ,那使用RabbitMQ實作延遲佇列就是比較好的方式,那具體選擇插件方式還是死信佇列方式,還需要看專案中對該功能的靈活程度來選擇,
參考:
- https://www.rabbitmq.com/dlx.html
- https://www.cnblogs.com/williamwsj/p/8108970.html
- https://www.jianshu.com/p/256d2eaf1786
- https://www.rabbitmq.com/community-plugins.html
- https://blog.csdn.net/zhenghongcs/article/details/106700446
更多文章訪問:一名不愿透露姓名的程式員
最后:因小的才疏學淺,如有問題,請不吝指出,感謝感謝~
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/395064.html
標籤:其他
上一篇:使Flink SQL Kafka Source支持獨立設定并行度
下一篇:Spark統計每天新增用戶
