什么是死信
在 RabbitMQ 中充當主角的就是訊息,在不同場景下,訊息會有不同地表現,
死信就是訊息在特定場景下的一種表現形式,這些場景包括:
- 訊息被拒絕訪問,即 RabbitMQ回傳 nack 的信號時
- 訊息的 TTL 過期時
- 訊息佇列達到最大長度
- 訊息不能入隊時,
上述場景經常產生死信,即訊息在這些場景中時,被稱為死信,
什么是死信佇列
死信佇列就是用于儲存死信的訊息佇列,在死信佇列中,有且只有死信構成,不會存在其余型別的訊息,
死信佇列在 RabbitMQ 中并不會單獨存在,往往死信佇列都會系結這一個普通的訊息佇列,當所系結的訊息佇列中,有訊息變成死信了,那么這個訊息就會重新被交換機路由到指定的死信佇列中去,我們可以通過對這個死信佇列進行監聽,從而手動的去對這一訊息進行補償,
那么,我們到底如何來使用死信佇列呢?
死信佇列基本使用
在 RabbitMQ 中,死信佇列的標識為 x-dead-letter-exchange ,通過觀察死信佇列的標識,我們不難發現,其標識最后為 exchange ,即 RabbitMQ 中的交換機,RabbitMQ 中的死信佇列就是由死信交換機而得出的,
要想使用死信佇列,我們需要首先宣告一個普通的訊息佇列,并將死信佇列的標識系結到這個普通的訊息佇列上, 這個程序需要我們在生產端進行配置,代碼如下所示:
// 使用 ConnectionFactory 創建了一個客戶端連接 RabbitMQ Server 的連接,
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("xx");
connectionFactory.setPort("5672");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
// 使用建立好的連接,來創建了一個頻道 channel ,
Channel channel = connection.createChanel();
// 宣告了一個普通佇列的額外引數的 Map ,
// 這個 Map 的 key 就是死信佇列的標識,value 就是我們后續宣告的真正的死信交換機的名稱,
Map<String, Object> argumentsMap = new HashMap();
argumentsMap.put("x-dead-letter-exchange", "dlx_exchange");
/*
使用 channel 的 exchangeDeclare 方法和 queueDeclare 方法,
分別宣告了一個名為 dlx_common_exchange 的交換機和名為 dlx_common_queue 的普通訊息佇列,
之所以名稱中有 common ,是因為要對這個交換機和佇列做一個標識,表示該交換機和佇列是系結了死信佇列的,
*/
channel.exchangeDeclare("dlx_common_exchange", "direct", true, false, null);
channel.queueDeclare("dlx_common_queue", true, false, false, argumentsMap);
/*
使用 channel 的 queueBind 方法來講宣告的普通交換機和訊息佇列進行系結,并且制定了 routingKey ,
這樣訊息就可以經 dlx_common_exchange 根據 routingKey 來路由到 dlx_common_queue 中,
*/
channel.queueBind("dlx_common_queue", "dlx_common_exchange", routingKey);
宣告了要系結死信佇列的普通佇列之后,最后我們需要宣告真正的死信佇列
// 使用 chanel 的 exchangeDeclare 方法來宣告了一個名為 dlx_exchange 的交換機,
channel.exchangeDeclare("dlx_exchange", "direct", true, false, null);
// 使用 channel 的 queueDeclare 方法來宣告了一個名為 dlx_queue 的佇列,
channel.queueDeclare("dlx_queue", true, false, false, null);
// 使用 channel 的 queueBind 方法,來將 dlx_exchange 的交換機與 dlx_queue 佇列進行了系結,
channel.queueBind("dlx_queue", "dlx_exchange", routingKey);
MQ處理訊息失敗了怎么辦
在生產環境中,使用MQ的時候設計兩個佇列:
- 一個是業務佇列,專門用來處理訊息;
- 一個是死信佇列,用來處理例外情況,
比如說消費訊息的時候,資料庫故障了,此時無法將資料落盤,那么消費者每次消費一條訊息,嘗試落盤持久化的時候,都會遇到資料庫報錯,此時消費者就可以把這條訊息拒絕訪問,或者標志位處理失敗!
- 一旦標志這條訊息處理失敗了之后,MQ就會把這條訊息轉入提前設定好的一個死信佇列中,
- 在資料庫故障期間,所有訊息全部處理失敗,全部會轉入死信佇列,
- 然后消費者會專門有一個后臺執行緒,監控資料庫是否正常,能否請求的,不停的監視,
- 一旦發現對方恢復正常,這個后臺執行緒就從死信佇列消費出來處理失敗的訂單,重新處理邏輯
- 死信佇列的使用,其實就是MQ在生產實踐中非常重要的一環,也就是架構設計必須要考慮的,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/356930.html
標籤:其他
上一篇:Flink簡介和開發總結
