RabbitMQ生產方式和考慮訊息可靠性投遞及其他問題
- 3種Exchange(交換機)與對應生產方式
- Queue佇列多載引數
- RabbitMQ生產方式
- Work queues(作業模式)
- Routing(路由模式)
- Publish/Subscribe(訂閱模式)
- Topics(主題模式)
- 訊息投遞流程
- 考慮以下問題
- 如何保證訊息沒有重復消費
- 訊息可靠性投遞
- 生產端
- confirm確認模式(保證訊息從生產者投遞到交換機)
- return退回模式(保證訊息從交換機投遞到佇列)
- 消費端
- 怎么保證訊息傳遞的順序
- 一致性問題

3種Exchange(交換機)與對應生產方式
- direct
將訊息發送到與交換機系結且對應routingkey的佇列
生產方式有:Work queues(作業模式)、Routing(路由模式) - fanout
將訊息發送到與交換機系結的佇列,不需要routingkey
生產方式有:Publish/Subscribe(訂閱模式) - topic
將訊息發送到與交換機系結的佇列,routingkey模糊匹配
生產方式有:Topics(主題模式)
Queue佇列多載引數
durable:是否持久化,true 交換機或者佇列會存到本地檔案資料庫,當mq重啟,依然在,false的話,重啟或者斷電,就沒了;默認true
autoDelete:是否自動洗掉,true當沒有Consumer消費者時候,自動洗掉掉;默認false
exclusive:是否獨占:true的話只有一個消費者監聽這個佇列,默認false
/**
* 定義一個direct佇列
*
* @return
*/
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE);
}
/**
* 定義一個帶參direct佇列
*
* boolean durable 是否持久化
* boolean autoDelete 是否自動洗掉
* boolean exclusive 是否獨占
* @return
*/
@Bean
public Queue directQueue2() {
return new Queue(DIRECT_QUEUE, true, false, false);
}
RabbitMQ生產方式
-
Work queues(作業模式)
一個或者多個消費者共同消費一個佇列中的訊息(佇列中的每一個訊息只可能被其中一個消費者消費)
-
Routing(路由模式)
跟訂閱模式類似,只不過在訂閱模式的基礎上加上了型別,訂閱模式是分發到所有系結到交換機的佇列,路由模式
-
Publish/Subscribe(訂閱模式)
生產者生產的訊息,所有訂閱過的消費者都能夠接收到訊息(佇列中的每一個訊息可以被多個消費者消費)
-
Topics(主題模式)
主題模式和路由模式很像,路由模式是精確匹配,主題模式是模糊匹配,* 匹配一個單詞,# 匹配零個或者多個單詞
訊息投遞流程
Producer(生產者)=>Channel(網路信道)=>Exchange(交換機)=>Queue(佇列,到此會進行存盤,即完成投遞)=>Channel=>Consumer(消費者)
考慮以下問題
本文著重講解訊息可靠性投遞
-
如何保證訊息沒有重復消費
保證消費者的冪等性 -
訊息可靠性投遞
生產端
confirm確認模式和return退回模式(一般是一起使用)confirm確認模式(保證訊息從生產者投遞到交換機)
生產者訊息投遞后,如果Broker收到訊息,則會給生產者一個應答(ack),生產者進行接收應答,用來確定這條訊息是否正常的發送到Broker實作如下:
1.組態檔:spring.rabbitmq.publisher-confirm-type=correlatedserver: port: 80 spring: rabbitmq: host: 192.168.31.6 port: 5672 username: linhy password: 123456 virtual-host: / # confirm 確認模式 publisher-confirm-type: correlated2.訊息發送類實作RabbitTemplate.ConfirmCallback介面,實作confirm方法
3.初始化RabbitTemplate,rabbitTemplate.setConfirmCallback(this);
4.發送訊息new一個CorrelationData物件將主鍵ID放入,將該物件一起發送(confirm方法引數是一個訊息唯一標識,即CorrelationData物件,所以需要將主鍵ID放入)
5.判斷confirm方法ack引數進行補發操作
訊息發送類具體代碼示例public class RabbitMqServiceImpl implements RabbitMqService, RabbitTemplate.ConfirmCallback { @Resource private AmqpTemplate amqpTemplate; @Resource private RabbitTemplate rabbitTemplate; /** * 初始化confirm 確認模式 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); } /** * String exchange 交換機名稱 * String routingKey 路由Key * Object object 具體發送的訊息 * * @param message */ @Override public void sendMessage(String message) { CorrelationData correlationData = new CorrelationData(); correlationData.setId("12050"); rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.ACK_DIRECT_ROUTING_KEY, message, correlationData);// 訊息推送失敗return } /** * confirm只能保證訊息從生產者投遞到交換機后,進入佇列前 * * @param correlationData 訊息唯一標識 * @param ack 交換機是否成功收到訊息 true成功 false失敗 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("訊息已被推送=========>" + correlationData); if (ack) { System.out.println("交換機接受訊息成功" + cause); } else { System.out.println("交換機接受訊息失敗" + cause); // 訊息補發操作 System.out.println("失敗id:" + correlationData.getId()); } } }return退回模式(保證訊息從交換機投遞到佇列)
用于處理交換機不存在或指定的路由key路由不到的情況實作如下:
1.組態檔:publisher-returns: true(默認為false,即訊息路由不可達時訊息丟棄;true訊息回退)server: port: 80 spring: rabbitmq: host: 192.168.31.6 port: 5672 username: linhy password: 123456 virtual-host: / # confirm 確認模式 publisher-confirm-type: correlated # return 退回模式 publisher-returns: true2.訊息發送類實作RabbitTemplate.ReturnCallback介面,實作returnedMessage方法
3.初始化RabbitTemplate,rabbitTemplate.setReturnCallback(this);
4.獲取主鍵ID進行日志紀錄或其他操作(returnedMessage方法引數是訊息主體,生產者發送時可將主鍵ID作為訊息主體發送)
訊息發送類具體代碼示例public class RabbitMqServiceImpl implements RabbitMqService, RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Resource private AmqpTemplate amqpTemplate; @Resource private RabbitTemplate rabbitTemplate; /** * 初始化confirm 確認模式和return 退回模式 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * String exchange 交換機名稱 * String routingKey 路由Key * Object object 具體發送的訊息 * * @param message */ @Override public void sendMessage(String message) { CorrelationData correlationData = new CorrelationData(); correlationData.setId("12050"); rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.ACK_DIRECT_ROUTING_KEY, message, correlationData);// 訊息推送失敗return } /** * confirm只能保證訊息從生產者投遞到交換機后,進入佇列前 * * @param correlationData 訊息唯一標識 * @param ack 交換機是否成功收到訊息 true成功 false失敗 * @param cause 失敗原因 */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("訊息已被推送=========>" + correlationData); if (ack) { System.out.println("交換機接受訊息成功" + cause); } else { System.out.println("交換機接受訊息失敗" + cause); // 訊息補發操作 System.out.println("失敗id:" + correlationData.getId()); } } /** * returnedMessage能保證訊息從交換機投遞到佇列 * * @param message 訊息主體 * @param replyCode 回傳code * @param replyText 回傳資訊 * @param exchange 交換機 * @param routingKey 路由key */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("訊息推送失敗,return方法被執行..."); System.out.println("訊息主體:" + new String(message.getBody())); System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); } }消費端
消費端手動ACK確認實作如下:
1.確認方法改為手動確認,組態檔:acknowledge-mode: manualserver: port: 81 spring: rabbitmq: host: 192.168.31.6 port: 5672 username: admin password: admin virtual-host: / # 消費端手動ack確認佇列訊息 listener: simple: acknowledge-mode: manual2.消費類增加Channel物件引數和deliveryTag引數
3.channel.basicAck(deliveryTag, true)確認佇列訊息,channel.拒簽,是否回到佇列
訊息接收類具體代碼示例
Servicepublic interface RabbitMqService { /** * 消費端手動ack確認佇列訊息 * * @param message 訊息 * @param channel 管道 * @param deliveryTag 標號 */ public void receiveSubMessageAck(String message, Channel channel, long deliveryTag); }Service實作
public class RabbitMqServiceImpl implements RabbitMqService { @Resource private AmqpTemplate amqpTemplate; @Override @RabbitListener(queues = {RabbitMQConfig.ACK_QUEUE}) public void receiveSubMessageAck(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) { try { System.out.println("接收到的mq訊息:" + message); // 處理業務 System.out.println("開始處理業務=========>"); // 消費者手動確認佇列訊息(批量確認) if (deliveryTag == 5) { channel.basicAck(deliveryTag, true);// long deliveryTag 標號(當前訊息在佇列中的的索引), boolean multiple 是否批量處理 } // 消費者手動確認佇列訊息(單個確認,低效) // channel.basicAck(deliveryTag, true); } catch (Exception e) { // 報錯,消費者進行拒簽 e.printStackTrace(); try { channel.basicNack(deliveryTag, false, false);//long deliveryTag 標號, boolean multiple 是否批量處理, boolean requeue 是否回到佇列(false直接丟棄) } catch (IOException ioException) { ioException.printStackTrace(); } } } } -
怎么保證訊息傳遞的順序
開啟多個佇列,訊息投遞到同一個佇列,一個消費者消費 -
一致性問題
一致性問題即事務問題,可使用nacos注冊中心+seata解決分布式事務問題,下次更新
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/379505.html
標籤:其他
