在RocketMQ 重復消費問題 | 訂單系統核心流程引入冪等性機制一文中,我們討論了訊息重復消費的問題,比較好的方案是采用在消費側使用業務判斷法來保證介面的冪等性,這樣就能避免訊息重復消費的問題,
今天要討論的是消費者代碼執行程序中出現例外,我們應該如何處理?
手動提交 offset
首先來看一段代碼,Consumer 類是一個消費者類,它我們為它注冊了一個監聽器,在處理完訊息之后,會將訊息的狀態回傳給 RocketMQ,執行成功回傳的是訊息狀態是 CONSUME_SUCCESS,
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");
// 設定 NameServer 地址
consumer.setNamesrvAddr("");
// 訂閱 Topic
consumer.subscribe("TopicTest", "*");
// 這次回呼介面,接收訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 對訊息的處理,比如發放優惠券、積分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
畫一張圖來表示向 RocketMQ 提交訊息狀態的流程,如圖所示:

訊息者業務代碼出現例外怎么辦?
再來看一下消費者的代碼中監聽器的部分,它說如果訊息處理成功,那么就回傳訊息狀態為 CONSUME_SUCCESS,也有可能發放優惠券、積分等操作出現了例外,比如說資料庫掛掉了,這個時候應該怎么處理呢?
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 對訊息的處理,比如發放優惠券、積分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
我們可以把代碼改一改,捕獲例外之后回傳訊息的狀態為 RECONSUME_LATER 表示稍后重試,
// 這次回呼介面,接收訊息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
try {
// 對訊息的處理,比如發放優惠券、積分等
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 萬一發生資料庫宕機等例外,回傳稍后重試訊息的狀態
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
重試佇列
這個時候,訊息會進入到 RocketMQ 的重試佇列中,
- 比如說消費者所屬的訊息組名稱為
AAAConsumerGroup - 其重試佇列名稱就叫做
%RETRY%AAAConsumerGroup - 重試佇列中的訊息過一段時間會再次發送給消費者,如果還是無法正常執行會再次進入重試佇列
- 默認重試16次,還是無法執行,訊息就會從重試佇列進入到死信佇列

死信佇列
- 重試佇列中的訊息重試16次任然無法執行,將會進入到死信佇列
- 死信佇列的名字是
%DLQ%AAAConsumerGroup - 死信佇列中的訊息可以后臺開一個執行緒,訂閱
%DLQ%AAAConsumerGroup,并不停重試

總結

本文從消費者的業務代碼出現例外講起,介紹了 RocketMQ 的重試佇列和死信佇列:
- 代碼正常執行回傳訊息狀態為
CONSUME_SUCCESS,執行例外回傳RECONSUME_LATER - 狀態為
RECONSUME_LATER的訊息會進入到重試佇列,重試佇列的名稱為%RETRY% + ConsumerGroupName; - 重試16次訊息任然沒有處理成功,訊息就會進入到死信佇列
%DLQ% + ConsumerGroupName;
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/156478.html
標籤:Java
下一篇:基數排序(Java)
