1.原始碼獲取地址
文章末尾有源代碼地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章節主要實作訊息的延遲消費,在學習延遲消費之前必須先了解RabbitMQ兩個基本概念,訊息的TTL和死信Exchange,通過這兩者的組合來實作訊息的延遲消費,
不想看原理講解的,直接通過標題6看代碼實作
2.訊息的TTL(Time To Live)
訊息的TTL就是訊息的存活時間,RabbitMQ可以對佇列和訊息分別設定TTL,對佇列設定就是佇列沒有消費者連著的保留時間,也可以對每一個單獨的訊息做單獨的設定,超過了這個時間,我們認為這個訊息就死了,稱之為死信,
3.死信交換器 Dead Letter Exchanges
- 一個訊息在滿足如下條件下,會進死信路由,記住這里是路由而不是佇列,一個路由可以對應很多佇列,
- 一個訊息被Consumer拒收了,并且reject方法的引數里requeue是false,也就是說不會被再次放在佇列里,被其他消費者使用,
- 上面的訊息的TTL到了,訊息過期了
- 佇列的長度限制滿了,排在前面的訊息會被丟棄或者扔到死信路由上, 死信交換器(Dead Letter Exchange)其實就是一種普通的exchange,和創建其他exchange沒有兩樣,只是在某一個設定Dead Letter Exchange的佇列中有訊息過期了,會自動觸發訊息的轉發,發送到Dead Letter Exchange中去,
4.實作延遲消費原理

- 大概原理:首先發送訊息到死信佇列,死信佇列設定ttl過期時間,到期之后會自動將訊息發送到一般佇列實作訊息的消費
- 實作步驟如下
- 創建死信交換器
- 創建死信佇列
- 將死信佇列與死信交換機系結,不能是任意系結了,而是要指定一個RoutingKey(路由key)訊息的發送方在向 Exchange發送訊息時,也必須指定訊息的RoutingKey,Exchange不再把訊息交給每一個系結的佇列,而是根據訊息的Routing Key進行判斷,只有佇列的Routingkey與訊息的Routing key完全一致,才會接收到訊息.
- 創建正常交換器
- 創建正常佇列
- 將正常佇列系結到正常交換器
5.基于案例實作訊息的延遲消費
這里我們以最熟悉的12306購票為例進行案例場景的分析,12306購票步驟如下:
- 首先登錄12306根據日期和起點站等條件進行搶票下訂單
- 搶到票訂單處于未支付狀態,并提示支付時間30分鐘內

- 這里就可以使用到延時佇列,在下訂單完成的時候將訂單號發送到MQ的死信佇列,并設定30分鐘過期,30分鐘以后死信佇列的資料會轉發到正常佇列,從正常佇列中獲取到下訂單的訂單號,然后我們根據訂單號查詢訂單的支付狀態,如果已經支付我們不做任何操作,如果未支付取消訂單,關閉支付狀態,將票回滾到票池供其他用戶購買
6.代碼實作
-
在RabbitMQConfig中創建佇列、交換機以及系結關系
@Configuration public class RabbitMQConfig { /** * 測驗發送訊息到MQ * @return */ @Bean public Queue testHello() { return new Queue(SysConstant.QUEUE_TEST_HELLO); } /** * 死信交換機 * @return */ @Bean public DirectExchange sysOrderDelayExchange() { return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE); } /** * 死信佇列 * @return */ @Bean public Queue sysOrderDelayQueue() { Map<String, Object> map = new HashMap<String, Object>(16); map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); //指定死信送往的交換機 map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); //指定死信的routingkey return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map); } /** * 給死信佇列系結死信交換機 * @return */ @Bean public Binding sysOrderDelayBinding() { return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY); } /** * 死信接收交換機,用于接收死信佇列的訊息 * @return */ @Bean public DirectExchange sysOrderReceiveExchange() { return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); } /** * 死信接收佇列 * @return */ @Bean public Queue sysOrderReceiveQueue() { return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE); } /** * 死信接收交換機系結接收死信佇列消費佇列 * @return */ @Bean public Binding sysOrdeReceiveBinding() { return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY); } } -
發送延時訊息到死信交換器方法 @Service public class MsgService { @Autowired private RabbitTemplate rabbitTemplate; /** * 發送延時訊息到mq * @param exchange 死信交換機 * @param routeKey 路由key * @param data 發送資料 * @param delayTime 過期時間,單位毫秒 */ public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) { rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> { message.getMessageProperties().setExpiration(delayTime + ""); return message; }); } } -
監聽佇列訊息ReceiveMsgListener類
/** * 獲取到的延時訊息 * 這里接收到訊息進行對應的業務處理(例如:取消訂單,關閉支付,回滾庫存等 ...) * @param msg */ @RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE) @RabbitHandler public void getdelayMsg(String msg) { log.info("MQ接收訊息時間:{},訊息內容:{}", DateUtil.formatDateTime(DateUtil.date()),msg); log.info("------->這里實作訂單關閉、支付關閉、回滾庫存業務邏輯..."); } -
創建Controller向佇列發送訊息,設定過期時間10秒 @RestController @RequestMapping("mq") @Slf4j public class MQController { @Autowired private MsgService msgService; @GetMapping("sendMsg") public String sendMsg() { log.info("發送延時訊息時間:" + DateUtil.formatDateTime(DateUtil.date())); OrderInfo orderInfo = new OrderInfo(); orderInfo.setOrderId(IdUtil.fastSimpleUUID()); orderInfo.setOrderState("待支付"); orderInfo.setPayMoney(999.88); msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);//1分鐘 return JSONUtil.toJsonStr("發送延時訊息成功"); } } -
啟動服務,可以看到MQ中創建對應的佇列和交換器


- 控制臺日志可以看到發送訊息與消費訊息間隔時間是10s

7.更多MQ技術檔案獲取
https://www.sunnyblog.top/index.html?tagId=1264009609236971520
詳細開發技術檔案盡在 點擊這里查看技術檔案 ;更多技術文章: https://www.sunnyblog.top;任何疑問加QQ群咨詢:534073451
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/180695.html
標籤:Java
