前言
MQ(Message Queue)就是訊息佇列,其有點有很多:解耦、異步、削峰等等,本文來聊一下RabbitMQ的一些概念以及使用,
RabbitMq
案例
Springboot整合RabbitMQ簡單案例
基本概念

- Exchange:訊息交換機,它指定訊息按什么規則,路由到哪個佇列,
- Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列,
- Binding:系結,它的作用就是把exchange和queue按照路由規則系結起來,
- Routing Key:路由關鍵字,exchange根據這個關鍵字進行訊息投遞,
- Producer:訊息生產者,就是投遞訊息的程式,
- Consumer:訊息消費者,就是接受訊息的程式,
發布訊息到RabbitMQ需要經過兩步:
- producer → exchange
- exchange 根據 exchange 的型別和 routing key 確定將訊息投遞到哪個佇列
作業流程
了解了RabbitMQ的一些概念,我們來捋捋使用RabbitMQ的流程:
- 創建Exchange
- 創建Queue
- 將Queue系結進Exchange中(此處會設定routing key)
- 生產者發布訊息
- 消費者訂閱訊息
交換機(Exchange)
交換機可以系結佇列,系結時可以給佇列指定路由(Routing key)和引數(Arguments)
所有的訊息發送都是經過交換機轉發到佇列的,而不是直接到佇列中
交換機型別:
-
direct
根據確定的路由(routing key)轉發訊息到佇列中(一條訊息可以發到多個佇列,只要路由相同)
-
fanout
路由無效,只要和該交換機系結的佇列,都能接收到訊息
-
topic
允許路由使用*和#來進行模糊匹配
*表示一個單詞
表示任意數量(零個或多個)單詞
例如:如果佇列的路由為com.# 那么往交換機發訊息是,路由填com.ccc 佇列就可以收到訊息
-
headers
忽略路由,由引數(Arguments)來確定轉發的佇列
訊息過期時間TTL
有兩種方式設定TTL,創建佇列時設定整個佇列的TTL或者在發送訊息時單獨設定每條訊息的TTL,訊息存活時間取兩者的最小值,
-
創建佇列時設定
是訊息的存活時間,不是佇列的存活時間,別搞混了,
@Bean public Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); // 設定佇列中的訊息5秒過期 return new Queue("queueName",true, false, false, args); } -
發送訊息時設定
public void makeOrder(String userid,String productid,int num){ String exchangeName = "ttl_exchange"; String routingKey = "ttlmessage"; //給訊息設定過期時間 MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){ public Message postProcessMessage(Message message){ // 設定訊息5秒過期 message.getMessageProperties().setExpiration("5000"); return message; } } rabbitTemplate.convertAndSend(exchangeName,routingKey,"message",messagePostProcessor); }
死信佇列
死信佇列也是一個正常佇列,只是當系結了死信佇列的佇列滿足相應條件,就會將滿足條件的訊息轉移到死信佇列中,
進入死信佇列的條件:
- 訊息被拒絕
- 訊息過期(超時)
- 佇列達到最大長度
死信佇列的配置:
-
按照正常步驟定義一個佇列(交換機、佇列、系結)
-
給需要系結死信佇列的佇列添加x-dead-letter-exchange(死信佇列的交換機)和x-dead-letter-routing-key(死信佇列的路由)引數
@Bean public Queue queue(){ Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", "死信佇列交換機名稱"); args.put("x-dead-letter-routing-key", "死信佇列路由"); return new Queue("queueName",true, false, false, args); }
如何保證MQ訊息正確送達與消費
可靠性生產和推送
步驟:
- 發送訊息前資料庫保存MQ訊息發送日志
- MQ訊息發送后使用回呼更新日志狀態
實作:
上面我們講了,發布訊息到RabbitMQ需要經過兩步:
producer → exchange
exchange 根據 exchange 的型別和 routing key 確定將訊息投遞到哪個佇列
所以,發布訊息的確認也分兩個部分,以下是確認步驟:
-
修改MQ應答機制(yml)
spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 # 發送訊息確認,producer -> exchange publisher-confirm-type: correlated # 發送訊息確認,exchange -> queue publisher-returns: true -
新增mq的回呼方法
/** * PostConstruct注解好多人以為是Spring提供的,其實是Java自己的注解, * Java中該注解的說明:@PostConstruct該注解被用來修飾一個非靜態的void()方法, * 被@PostConstruct修飾的方法會在服務器加載Servlet的時候運行,并且只會被服務器執行一次, * PostConstruct在建構式之后執行,init()方法之前執行, * Constructor(構造方法) -> @Autowired(依賴注入) -> @PostConstruct(注釋的方法) */ @PostConstruct private void regCallBack() { // producer -> exchange 成功或失敗都會觸發此回呼 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { // 這個id是在訊息發送的時候傳入的 String id = correlationData.getId(); // 如果ack為true代表訊息被mq成功接收 if (!ack) { // 應答失敗,修改日志狀態 System.out.println("exchange 應答失敗,做失敗處理!"); } else { // 應答成功,修改日志狀態 System.out.println("exchange 成功處理"); } } }); // 這個回呼只有exchange -> queue 失敗時才會觸發 rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("exchange -> queue 發送失敗"); } }); } -
修改MQ發送訊息的方法,增加日志id的傳遞
String correlationId = "這是日志id"; rabbitTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 消費者需要correlationId才做這個處理 message.getMessageProperties().setCorrelationId(correlationId); return message; } }, new CorrelationData(correlationId)); // 如果消費者不需要獲取correlationId,則用下面這種即可 rabbitTemplate.convertAndSend(exchange, routeKey, msg, new CorrelationData(correlationId));
可靠性消費
步驟:
- 開啟手動應答
- 監聽器增加手動應答邏輯
實作:
-
開啟手動應答
spring: rabbitmq: username: rmq password: 123456 virtual-host: / host: localhost port: 5672 listener: simple: acknowledge-mode: manual # 將自動應答ack模式改成手動應答acknowledge-mode有三種型別:
- nome:不進行ack,rabbitmq默認消費者正確處理所有請求
- munual:手動確認
- auto:自動確認訊息(默認型別),如果消費者拋出例外,則訊息重回佇列,
-
監聽器增加手動應答邏輯
@RabbitListener(queues = {"佇列名字"}) public void messageConsumer(String orderMsg, Channel channel, @Headers Map<String,Object> headers) throws Exception{ // 需要producer做相應處理,consumer才能拿到correlationId String correlationId = messages.getMessageProperties().getCorrelationId(); System.out.println("訊息為:" + orderMsg); long tag = Long.parseLong(headers.get(AmqpHeaders.DELIVERY_TAG).toString()); try { // 消費成功,進行確認 channel.basicAck(tag, false); } catch (IOException e) { // 消費失敗,重發 // requeue代表是否重發,為false則直接將訊息丟棄,有死信就進入死信佇列 channel.basicNack(tag, false, true); } }
總結
本文介紹了RabbitMQ的一些概念和簡單使用,有不少東西其實是沒有講清楚的,比如publisher-confirm-type和acknowledge-mode的幾種型別的區別等等,主要是在官方檔案找不到相關的細致描述,查檔案的能力還是有待提高,,,
參考資料
RabbitMq 技術檔案 - 騰訊云開發者社區-騰訊云 (tencent.com)
Spring AMQP
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/538300.html
標籤:Java
上一篇:java 基礎——陣列
