1.消費發送的機制
1.1訊息發送我們都知道會先發送到交換機上,然后再根據定的路由規則,由交換機將訊息路由到不同的 Queue(佇列)中,再由不同的消費者去消費,如下圖

所以我們就應該保證訊息成功到達交換機 和對列,如果都做到了納悶我們訊息就發送成功了對吧
2.常見的方案
2.1開啟事務機制
2.2 發送方確認機制
3.MQ事務機制確保訊息可靠性(不建議)
3.1 在RabbitMQ的配置類中,準備一個事務管理器(這里我只貼出MQ配置類關于事務的)
/**
* 事務管理器
* @param connectionFactory
* @return
*/
@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}
3.2 在生產者(Product)打上@Transactional注解(事務) 然后我們在發送訊息的時候呼叫 setChannelTransacted 方法設定為 true 開啟事務模式
rabbitTemplate.setChannelTransacted(true);
3.3 這個時候我們的事務就已經OK了,我們模擬一下,在發送訊息的時候制造一個錯誤
@Autowired
RabbitTemplate rabbitTemplate;
@RequestMapping("/direct")
@Transactional
public String faSon(){
rabbitTemplate.setChannelTransacted(true);
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT, "direct", "direct");
int i = 1/0;
return "消費者訊息發送成功";
}
由上圖我們可以看見我們制造了一個int i = 1/0的錯誤,所以當我們去發送訊息的時候,我們后臺拋出了例外,我們的對列也沒有訊息投送過來


這個時候可以看出來我們的事務啟效果了,接下來我們修改下代碼,不開啟事務機制,
把事務管理器注銷,然后在發送者這邊修改代碼
public String faSon(){
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT, "direct", "direct");
int i = 1/0;
return "消費者訊息發送成功";
}
這個時候可以看出來我們已經去掉了事務,然后我們啟動,訪問在去看看效果

這里后臺肯定會報錯拋例外的,主要是我們去看看MQ的可視化頁面

可以從上圖看出,我們的訊息依然發送成功了,說明我們這個訊息不可靠,盡管我們后臺出錯了,也發了,所以我們就可以通過事務來解決了訊息的可靠性,但是不建議,為什么了,因為事務模式其實效率有點低,這并非一個最佳解決方案,
4.發送方確認機制
4.1配置yml
publisher-confirm-type: correlated #到達交換器的確認回呼 none:表示禁用發布確認模式,默認即此,correlated:表示成功發布訊息到交換器后會觸發的回呼方法, simple:類似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的呼叫,
publisher-returns: true #訊息到達佇列的回呼
4.2 然后寫配置類
Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
public static final String DIRECT = "DIRECT";
public static final String TEST_QUEUE= "TopicExchange";
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 宣告交換機
*/
@Bean
public DirectExchange getExchange(){
return new DirectExchange(DIRECT,true,false);
}
/**
* 宣告對列
*/
@Bean
public Queue getQueue(){
return new Queue(TEST_QUEUE,true,false,false);
}
/**
* 系結交換機對對列
*/
@Bean
public Binding bindingQueueExchange(){
return BindingBuilder.bind(getQueue()).to(getExchange()).with("direct");
}
/**
* 訊息到達交換機回呼
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack){
System.out.println("訊息成功到達交換器");
}else {
System.out.println("訊息發送失敗");
}
}
/**
* 訊息到達對列的回呼
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("訊息未成功路由到佇列");
}
}
這里我們實作了RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 兩個介面,這兩個介面,前者的回呼叫來確定訊息到達交換器,后者則會在訊息路由到佇列失敗時被呼叫,
然后在配置類中定義 initRabbitTemplate 方法并添加 @PostConstruct 注解,在該方法中為 rabbitTemplate 分別配置這兩個 Callback,
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
接下來我們就去發送訊息,我們先嘗試發送到一個沒有的交換機上面看看

我們后臺列印了出結果

由此可見我們發送到交換機失敗了,確認機制起了作用,我們在發到沒有的對列看看效果

我們后臺效果看出了發送到對列失敗了,我們的回呼監聽到了,這就是 publisher-confirm 模式,相比于事務,這種模式下的訊息吞吐量會得到極大的提升
5.重試機制
5.1自帶的重試機制
前面所說的事務機制和發送方確認機制,都是發送方確認訊息發送成功的辦法,如果發送方一開始就連不上 MQ,那么 Spring Boot 中也有相應的重試機制,但是這個重試機制就和 MQ 本身沒有關系了,這是利用 Spring 中的 retry 機制來完成的,具體配置如下:
template:
retry:
enabled: true #開啟重試機制
initial-interval: 1000ms #重試起始間隔時間
max-interval: 10 #最大重試間隔時間,
max-attempts: 10 #重試次數
multiplier: 2 #間隔時間乘數,(這里配置間隔時間乘數為 2,則第一次間隔時間 1 秒,第二次重試間隔時間 2 秒,第三次 4 秒,以此類推)
再次啟動 Spring Boot 專案,然后關掉 MQ,此時嘗試發送訊息,就會發送失敗,進而導致自動重試,

好了,今天就先寫到這來了,下一章會講如何確保訊息的成功訊息,嘿嘿覺得可以的老鐵點個贊!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423732.html
標籤:其他
上一篇:點擊曝光日志的資料處理
