目錄
1、匯入Demo工程
2、生產者訊息確認
2.1 修改配置
2.2 定義Return回呼
2.3 發送訊息
2.4 小結
3、訊息持久化?
3.1 交換機持久化
3.2 佇列持久化
3.3 訊息持久化
3.4 小結
4、消費者訊息確認
4.1.演示none模式
4.2.演示auto模式
5、消費失敗重試機制
5.1 本地重試
5.2.失敗策略
6、總結
7、專案工程
訊息從發送,到消費者接收,會經歷多個程序:

其中的每一步都可能導致訊息丟失,常見的丟失原因包括:
發送時丟失:
生產者發送的訊息未送達exchange
訊息到達exchange后未到達queue
MQ宕機,queue將訊息丟失
consumer接收到訊息后未消費就宕機
針對這些問題,RabbitMQ分別給出了解決方案:
生產者確認機制
mq持久化
消費者確認機制
失敗重試機制
本篇博客就來帶大家解決訊息的可靠性,
1、匯入Demo工程
匯入Demo課程,
https://gitee.com/boring-yingjie/rabbit-mq-message-confirmation.git

把組態檔application.yml配置修改完畢,
然后我們在consumer 的config包的通用配置類創建一個佇列,

運行consumer的啟動類,

2、生產者訊息確認
生產者確認機制:
RabbitMQ提供了publisher confirm機制來避免訊息發送到MQ程序中丟失,訊息發送到MQ以后,會回傳一個結果給發送者,表示訊息是否處理成功,結果有兩種請求: 1. publisher-confirm,發送者確認
訊息成功投遞到交換機,回傳Ack(acknowledge 告知已收到),
訊息未投遞到交換機,回傳Nack(未收到),
2. publisher-return,發送者回執 訊息投遞到交換機了,但是沒有路由到佇列,回傳ACK,及路由失敗原因,

注:
確認機制發送訊息時,需要給每個訊息設定一個全域唯一id以區分不同訊息,避免ACK沖突,
2.1 修改配置
首先,修改publisher服務中的application.yml檔案,添加下面的內容:
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true

配置說明:
publish-confirm-type:開啟publisher-confirm,這里支持兩種型別:
simple:同步等待confirm結果,直到超時(可能會導致訊息堵塞,不推薦),
correlated:異步回呼,定義ConfirmCallback,MQ回傳結果時會回呼這個ConfirmCallback,(推薦)
publish-returns:開啟publish-return功能,同樣是基于callback機制,不過是定義ReturnCallback,
template.mandatory:定義訊息路由失敗時的策略,true,則呼叫ReturnCallback;false:則直接丟棄訊息,
2.2 定義Return回呼
每個RabbitTemplate只能配置一個ReturnCallback,因此需要在專案加載時配置
修改publisher服務,添加一個:
package com.jie.mq.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @description:生產者通用配置
* @author: jie
* @time: 2022/2/25 15:34
*/
@Slf4j
@Configuration
public class CommonConfig implements ApplicationContextAware {
/**
* @description:每個RabbitTemplate只能配置一個ReturnCallback,因此需要在專案加載時配置
* @author: jie
* @time: 2022/2/25 15:35
*/
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
// 獲取RabbitTemplate
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 設定ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 投遞失敗,記錄日志
log.info("訊息發送到佇列失敗,回應碼{},失敗原因{},交換機{},路由鍵{},訊息{}",
replyCode, replyText, exchange, routingKey, message.toString());
// 如果有業務需要,可以重發訊息
});
}
}
2.3 發送訊息
我們發送訊息通過一個單元測驗來發,
這里面有一個最簡單的訊息發送代碼,

我們要去為amq.topic這個交換機系結一下simple.queue這個佇列,這里我用的是手動的方式,大家可以選擇使用代碼的方式,
打開瀏覽器
點擊進去

系結完成,回到代碼區,我現在發送訊息,符合要求,那一定能發送成功,所以我們要修改一下代碼,
@Test
public void testSendMessage2SimpleQueue() throws InterruptedException {
String routingKey = "simple.test";
//1、準備訊息
String message = "hello, spring amqp!";
//2、準備CorrelationData
//2.1 訊息ID
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
//2.2 準備ConfirmCallback
//成功回呼
correlationData.getFuture().addCallback(confirm -> {
//判斷結果
if(confirm.isAck()){
//ACK 訊息成功
log.error("訊息成功投遞到交換機!訊息ID:{}",correlationData.getId());
}else {
//NACK 訊息失敗
log.error("訊息投遞到交換機失敗!訊息ID:{}",correlationData.getId());
//重發訊息
}
//失敗回呼
}, throwable -> {
//記錄日志
log.error("訊息發送失敗!",throwable);
});
//3、發送訊息
rabbitTemplate.convertAndSend("amq.topic", routingKey, message,correlationData);
}
我們可以先運行代碼查看控制訊息,
這個是成功的情況,接下來演示一下失敗的情況,比如訊息根本沒有到達交換機,可能是交換機名稱填錯了,

還有一種就是交換機到達了,沒有到達佇列,比如佇列的名稱填錯了,

2.4 小結
SpringAMQP中處理訊息確認的幾種情況:
publisher-comfirm: 訊息成功發送到exchange,回傳Ack, 訊息發送失敗,沒有到達交換機,回傳Nack, 訊息發送程序中出現例外,沒有收到回執,
訊息成功發送到exchange,但沒有路由到queue
呼叫ReturnCallback,
3、訊息持久化
生產者確認可以確保訊息投遞到RabbitMQ的佇列中,但是訊息發送到RabbitMQ以后,如果突然宕機,也可能導致訊息丟失,
要想確保訊息在RabbitMQ中安全保存,必須開啟訊息持久化機制,
交換機持久化
佇列持久化
訊息持久化
3.1 交換機持久化
RabbitMQ中交換機默認是非持久化的,mq重啟后就丟失,
SpringAMQP中可以通過代碼指定交換機持久化:
@Bean
public DirectExchange simpleExchange(){
// 三個引數:交換機名稱、是否持久化、當沒有queue與其系結時是否自動洗掉
return new DirectExchange("simple.direct", true, false);
}

啟動服務,![]()
注:
事實上,默認情況下,由SpringAMQP宣告的交換機都是持久化的,
3.2 佇列持久化
RabbitMQ中佇列默認是非持久化的,mq重啟后就丟失,
SpringAMQP中可以通過代碼指定交換機持久化:
@Bean
public Queue simpleQueue(){
// 使用QueueBuilder構建佇列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
注:
事實上,默認情況下,由SpringAMQP宣告的佇列都是持久化的,
可以在RabbitMQ控制臺看到持久化的佇列都會帶上D的標示:
3.3 訊息持久化
利用SpringAMQP發送訊息時,可以設定訊息的屬性(MessageProperties),指定delivery-mode:
1:非持久化
2:持久化
用java代碼指定:
@Test
public void testDurableMessage() {
//準備訊息
Message message = MessageBuilder.withBody("hell,spring".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 2.發送訊息
rabbitTemplate.convertAndSend("simple.queue",message);
}
注:默認情況下,SpringAMQP發出的任何訊息都是持久化的,不用特意指定,
3.4 小結
默認情況下,由SpringAMQP宣告的交換機和佇列還有發出的訊息都是默認持久化的,
4、消費者訊息確認
RabbitMQ是閱后即焚機制,RabbitMQ確認訊息被消費者消費后會立刻洗掉,
而RabbitMQ是通過消費者回執來確認消費者是否成功處理訊息的:消費者獲取訊息后,應該向RabbitMQ發送ACK回執,表明自己已經處理訊息,
生產者訊息確認可以確保訊息投遞到佇列當中,而訊息的持久化可以保證不會因為MQ的宕機而導致訊息的丟失,經過這兩個我們可以保證訊息能投遞到消費者,
但是這個消費者是不是一定能消費這個訊息呢?如果訊息投遞到消費者的那一刻,消費者掛了,那這樣訊息還是沒有消費,訊息就丟失了,
RabbitMQ支持消費者確認機制,即:消費者處理訊息后可以向MQ發送Ack回執,MQ收到Ack回執后才會洗掉該訊息,而SpringAMQP則允許配置三種確認模式:
manual:手動ack,需要在業務代碼結束后,呼叫api發送ack,
auto:自動ack,由spring監測listener代碼是否出現例外,沒有例外則回傳ack;拋出例外則回傳nack,
none:關閉ack,MQ假定消費者獲取訊息后會成功處理,因此訊息投遞后立即被洗掉,
由此可知:
-
none模式下,訊息投遞是不可靠的,可能丟失,
-
auto模式類似事務機制,出現例外時回傳nack,訊息回滾到mq;沒有例外,回傳ack,
-
manual:自己根據業務情況,判斷什么時候該ack,
一般,我們都是使用默認的auto即可,
4.1.演示none模式
修改consumer服務的application.yml檔案,添加下面內容:
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: none # 關閉ack

我們在這個佇列先發留有一條訊息,
修改consumer服務的SpringRabbitListener類中的方法,模擬一個訊息處理例外:
我們打個斷點用 Debug運行,再回到瀏覽器查看,發現訊息已經沒了,
我們回到代碼區往下執行,
測驗可以發現,當訊息處理拋例外時,訊息依然被RabbitMQ洗掉了,
4.2.演示auto模式
再次把確認機制修改為auto:
然后我們再往佇列發送一條訊息,
再次用Debug運行程式,
可以發現此時訊息狀態為unacked(未確定狀態),這什么意思呢?就是我還沒收到Ack,我在等著你給我發呢,所以訊息還沒有洗掉,
拋出例外后,因為Spring會自動回傳nack,所以訊息恢復至Ready狀態,并且沒有被RabbitMQ洗掉,
如果我們把斷點去掉,就會發生一個恐怖的事情,它就會進去一個死回圈,它發現你投遞失敗了,又給你重新投遞,一直重復這個操作,
這個顯然也不太好,但是最起碼訊息不會丟失,
5、消費失敗重試機制
當消費者出現例外后,訊息會不斷requeue(重入隊)到佇列,再重新發送給消費者,然后再次例外,再次requeue,無限回圈,導致mq的訊息處理飆升,帶來不必要的壓力:
5.1 本地重試
我們可以利用Spring的retry機制,在消費者出現例外時利用本地重試,而不是無限制的requeue到mq佇列,
修改consumer服務的application.yml檔案,添加內容:
retry:
enabled: true # 開啟消費者失敗重試
initial-interval: 1000 # 初識的失敗等待時長為1秒
multiplier: 1 # 失敗的等待時長倍數,下次等待時長 = multiplier * initial-interval
max-attempts: 3 # 最大重試次數,事不過三
stateless: true # true無狀態;false有狀態,如果業務中包含事務,這里改為false
重啟consumer服務,重復之前的測驗,可以發現:
在重試3次后,SpringAMQP會拋出例外,說明本地重試觸發了,
查看RabbitMQ控制臺,發現訊息被洗掉了,說明最后SpringAMQP回傳的是ack,mq洗掉訊息了

5.2.失敗策略
在之前的測驗中,達到最大重試次數后,訊息會被丟棄,這是由Spring內部機制決定的,
在開啟重試模式后,重試次數耗盡,如果訊息依然失敗,則需要有MessageRecovery介面來處理,它包含三種不同的實作:
RejectAndDontRequeueRecoverer:重試耗盡后,直接reject,丟棄訊息,默認就是這種方式,
ImmediateRequeueMessageRecoverer:重試耗盡后,回傳nack,訊息重新入隊,
RepublishMessageRecoverer:重試耗盡后,將失敗訊息投遞到指定的交換機,
比較好的一種處理方案是RepublishMessageRecoverer,失敗后將訊息投遞到一個指定的,專門存放例外訊息的佇列,后續由人工集中處理,
在consumer服務中定義處理失敗訊息的交換機和佇列
package com.jie.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ErrorMessageConfig {
/**
* @description:例外訊息交換機
* @author: jie
* @time: 2022/2/26 18:39
*/
@Bean
public DirectExchange errorMessageExchange() {
return new DirectExchange("error.direct");
}
/**
* @description:佇列
* @author: jie
* @time: 2022/2/26 18:39
*/
@Bean
public Queue errorQueue(){
return new Queue("err.queue");
}
/**
* @description:將交換機和佇列系結
* @author: jie
* @time: 2022/2/26 18:39
*/
@Bean
public Binding errorMessageBinding(){
return BindingBuilder
.bind(errorQueue())
.to(errorMessageExchange())
.with("error");
}
/**
* @description:例外訊息處理器
* @author: jie
* @time: 2022/2/26 18:41
*/
@Bean
public MessageRecoverer republishMessageReciverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
}
}
重新啟動后我們向simple.queue發送訊息,

6、總結
如何確保RabbitMQ訊息的可靠性?
開啟生產者確認機制,確保生產者的訊息能到達佇列,
開啟持久化功能,確保訊息未消費前在佇列中不會丟失,
開啟消費者確認機制為auto,由spring確認訊息處理成功后完成ack,
開啟消費者失敗重試機制,并設定MessageRecoverer,多次重試失敗后將訊息投遞到例外交換機,交由人工處理,
7、專案工程
https://gitee.com/boring-yingjie/rabbit-mq-message-confirmation.git
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/433394.html
標籤:其他
上一篇:零拷貝機制在檔案傳輸中的使用手法
