前言 : rabbitMQ為了防止訊息不丟失的情況,可以使用事物訊息,但是性能下降250倍,為此引入確認機制

如上圖所示:
一、publisher confirmCallBack確認模式
springboot開啟rabbitmq可靠抵達 ——confirmCallBack
spring:
rabbitmq:
publisher-confirm-type: correlated
當我們的publisher 到達 broker (服務器時候) ,回傳confirmCallback,當訊息沒有抵達broker的時候回傳true,并會給出失敗原因,
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 使用json序列化 將訊息轉為json
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* MyRabbitConfig 方法創建完成之后 執行 postConstruct
*/
@PostConstruct
public void initRabbitTemplate(){
/**
* 設定確認回呼
*/
rabbitTemplate. setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要 訊息到達 broker(服務器) 就回傳true ,如果不抵達回傳false
* @param correlationData 訊息關聯資料(通過id關聯)
* @param b 訊息是否成功收到
* @param s 失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
}
});
}
}
二、publisher returnCallBack 確認模式
springboot開啟rabbitmq可靠抵達 —— returnCallBack
當我們開啟publisher-returns 時候,將 spring.rabbitmq.template.mandatory 開啟
作用:只要訊息抵達佇列 ,以異步方式優先回呼 rerunConfirm
spring:
rabbitmq:
publisher-returns: true
#只要訊息抵達佇列 ,以異步方式優先回呼 rerunConfirm
template:
mandatory: true
設定broker 抵達 queue 時候的returnCallBack回呼
只要訊息沒有投遞給指定的佇列,就觸發這個失敗回呼
/**
* 第二步:設定broker 抵達 queue 時候的returnCallBack回呼
*/
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* 只要訊息沒有投遞給指定的佇列,就觸發這個失敗回呼
*
* @param returnedMessage 回傳訊息失敗的物體
*/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
}
});
貼上RetrunedMessage物體回傳的內容(此處為rabbitMQ自帶物體,非自建)
public class ReturnedMessage {
//投遞失敗的訊息詳細資訊
private final Message message;
//回復的狀態碼
private final int replyCode;
//回復的文本內容
private final String replyText;
//當時這個訊息發給哪個交換機
private final String exchange;
//當時這個訊息用哪個路由鍵
private final String routingKey;
public ReturnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
this.message = message;
this.replyCode = replyCode;
this.replyText = replyText;
this.exchange = exchange;
this.routingKey = routingKey;
}
public Message getMessage() {
return this.message;
}
public int getReplyCode() {
return this.replyCode;
}
public String getReplyText() {
return this.replyText;
}
public String getExchange() {
return this.exchange;
}
public String getRoutingKey() {
return this.routingKey;
}
public String toString() {
return "ReturnedMessage [message=" + this.message + ", replyCode=" + this.replyCode + ", replyText=" + this.replyText + ", exchange=" + this.exchange + ", routingKey=" + this.routingKey + "]";
}
}
路由鍵系結為 :
/**
* 系結交換機和佇列
*/
@Test
void bindingQueueExchange(){
/**
* String destination 【目的地】
* Binding.DestinationType destinationType, 【目的地系結型別】
* String exchange, 【交換機】
* String routingKey, 【路由鍵】
* @Nullable Map<String, Object> argument 【自定義引數】
*/
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE,exchange,"chendazui.#",null);
amqpAdmin.declareBinding(binding);
log.info("[{}]系結成功","chendazui-binding");
}
當我們將路由鍵修改后:
@Test
void sendMessage(){
MessageUtil messageUtil = new MessageUtil();
messageUtil.setCode("1");
messageUtil.setMsg("我到了");
messageUtil.setOrder("陳大嘴的訂單");
messageUtil.setUser("陳大嘴");
messageUtil.setDateTime(new Date());
/**
* 發送訊息物體類必須要序列化
*/
rabbitTemplate.convertAndSend(exchange,"chendazui1.#",messageUtil);
log.info("[{}]訊息已經發出",messageUtil);
}
這個時候我們測驗結果如下:
當前指向的交換機==>chendazui-exchange投遞失敗訊息詳情(Body:'{"code":"1","msg":"我到了","order":"陳大嘴的訂單","user":"陳大嘴","dateTime":1634780951481}' MessageProperties [headers={__TypeId__=com.example.demo.util.MessageUtil}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])回復的文本內容NO_ROUTE路由鍵chendazui1.#回復的狀態碼=>312
這時候我們returnCallBack機制捕捉到失敗訊息,訊息未抵達佇列queue,
相關代碼地址:
RabbitMQ Demo: rabbitmq 代碼示例
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/330167.html
標籤:其他
