1. 死信及死信佇列
1.1 什么是死信
一般來說,生產者將訊息投遞到佇列中,消費者從佇列取出訊息進行消費,但某些時候由于特定的原因導致佇列中的某些訊息無法被消費,這樣的訊息如果沒有后續的處理,就變成了死信(Dead Letter),所有的死信都會放到死信佇列中,
為什么為有死信?訊息變成死信一般是以下三種情況:
- 訊息被拒絕,即
basicReject/basicNack,并且設定 requeue 引數為 false,這種情況一般訊息丟失 , - 訊息過期(TTL),TTL全稱為Time-To-Live,表示的是訊息的有效期,默認情況下 Rabbit 中的訊息不過期,但是可以設定佇列的過期時間和訊息的過期時間以達到訊息過期的效果 ,訊息如果在佇列中一直沒有被消費并且存在時間超過了TTL,訊息就會變成了"死信" ,后續無法再被消費,
- 佇列達到最大長度,一般當設定了最大佇列長度或大小并達到最大值時,
1.2 死信交換器 DLX
在訊息的拒絕操作都是在requeue = true情形下,如果為 false 可以發現當發生例外確認后,訊息丟失了,這肯定是不能容忍的,所以提出了死信交換器(dead-letter-exchange)的概念,
死信交換器仍然只是一個普通的交換器,創建時并沒有特別要求和操作,在創建佇列的時候,宣告該交換器將用作保存被拒絕的訊息即可,相關的引數是 x-dead-letter-exchange,當這個佇列中有死信時,RabbitMQ 就會自動的將這個訊息重新發布到設定的 Exchange 上去,進而被路由到另一個佇列,
舉個栗子
1、生產者生產 3 條訊息
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class DlxProducer {
public final static String EXCHANGE_NAME = "dlx_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接
Connection connection = RabbitMQUtils.getConnection();
// 創建一個信道
Channel channel = connection.createChannel();
// 指定轉發
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String[] routekeys = {"rabbit", "cat", "dog"};
for (int i = 0; i < 3; i++) {
String routekey = routekeys[i % 3];
String msg = "Hello,RabbitMq" + (i + 1);
channel.basicPublish(EXCHANGE_NAME, routekey, null, msg.getBytes());
System.out.println("Sent " + routekey + ":" + msg);
}
// 關閉頻道和連接
channel.close();
connection.close();
}
}
2、普通消費者消費訊息,但是不能消費全部的訊息,并把不能消費得訊息投遞到死信佇列,如果是我們還想做點其他事情,我們可以在死信交換的時候改變死信訊息的路由鍵,具體的相關的引數是 x-dead-letter-routing-key,
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* 類說明:普通的消費者,但是自己無法消費的訊息,將投入死信佇列
*/
public class NormalDlxConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接
Connection connection = RabbitMQUtils.getConnection();
// 創建一個信道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DlxProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//系結死信交換器
//宣告一個佇列,并系結死信交換器
String queueName = "dlx_queue";
Map<String, Object> argos = new HashMap<String, Object>();
argos.put("x-dead-letter-exchange", DlxConsumer.DLX_EXCHANGE_NAME);
//死信路由鍵,會替換訊息原來的路由鍵
//args.put("x-dead-letter-routing-key", "dead");
channel.queueDeclare(queueName, false, true, false, argos);
//系結,將佇列和交換器通過路由鍵進行系結
channel.queueBind(queueName, DlxProducer.EXCHANGE_NAME, "#");
System.out.println("waiting for message........");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
//如果是cat的訊息確認
if (envelope.getRoutingKey().equals("cat")) {
System.out.println("Received[" + envelope.getRoutingKey() + "]" + message);
channel.basicAck(envelope.getDeliveryTag(), false);
} else {
//如果是其他的訊息拒絕(queue=false),成為死信訊息
System.out.println("Will reject[" + envelope.getRoutingKey() + "]" + message);
channel.basicReject(envelope.getDeliveryTag(), false);
}
}
};
channel.basicConsume(queueName, false, consumer);
}
}
3、申明一個消費者,負責消費死信佇列
mport com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 類說明:普通的消費者,負責消費死信佇列dlx_accept
*/
public class DlxConsumer {
public final static String DLX_EXCHANGE_NAME = "dlx_accept";
public static void main(String[] args) throws IOException, TimeoutException {
//建立連接
Connection connection = RabbitMQUtils.getConnection();
// 創建一個信道
Channel channel = connection.createChannel();
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = "dlx_accept";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, DLX_EXCHANGE_NAME, "#");
System.out.println("waiting for message........");
//宣告了一個死信消費者
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received dead letter[" + envelope.getRoutingKey() + "]" + message);
}
};
//消費者正式開始在指定佇列上消費訊息
channel.basicConsume(queueName, true, consumer);
}
}
測驗結果:

DLX和備用交換器的區別
- 備用交換器是主交換器無法路由訊息,那么訊息將被路由到這個新的備用交換器,而死信交換器則是接收過期或者被拒絕的訊息,
- 備用交換器是在宣告主交換器時發生聯系,而死信交換器則宣告佇列時發生聯系,
場景分析:備用交換器一般是用于生產者生產訊息時,確保訊息可以盡量進入 RabbitMQ,而死信交換器主要是用于消費者消費訊息產生死信的場景(比如訊息過期,佇列滿了,訊息拒絕且不重新投遞),
2. 什么是延時佇列
延時佇列,首先,它是一種佇列,佇列意味著內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出,
其次,延時佇列,最重要的特性就體現在它的延時屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等著希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務,
簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列,
RabbitMQ 是沒有延時屬性可以設定的,但是可以通過DLX+TTL的方式來實作 RabbitMQ 的延時佇列,
3. 延時佇列的使用場景
那么什么時候需要用延時佇列呢?考慮一下以下場景:
- 訂單在十分鐘之內未支付則自動取消,
- 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送訊息提醒,
- 賬單在一周內未支付,則自動結算,
- 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒,
- 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員,
- 預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議,
這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鐘之后檢查該訂單支付狀態,然后將未支付的訂單進行關閉;發生店鋪創建事件,十天后檢查該店鋪上新商品數,然后通知上新數為 0 的商戶;發生賬單生成事件,檢查賬單支付狀態,然后自動結算未支付的賬單;發生新用戶注冊事件,三天后檢查新注冊用戶的活動資料,然后通知沒有任何活動記錄的用戶;發生退款事件,在三天之后檢查該訂單是否已被處理,如仍未被處理,則發送訊息給相關運營人員;發生預定會議事件,判斷離會議開始是否只有十分鐘了,如果是,則通知各個與會人員,
看起來似乎使用定時任務,一直輪詢資料,每秒查一次,取出需要被處理的資料,然后處理不就完事了嗎?如果資料量比較少,確實可以這樣做,比如:對于“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對于時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案,但對于資料量比較大,并且時效性較強的場景,如:“訂單十分鐘內未支付則關閉“,短期內未支付的訂單資料可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的資料量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給資料庫帶來很大壓力,無法滿足業務要求而且性能低下,
4. 延時佇列的實作
在 RabbitMQ 中一般采用的是 TTL+DLX 的方式來實作延時佇列,DLX 上面已經介紹了,通過在創建佇列的時候設定佇列的x-dead-letter-exchange屬性,而 TTL 也同樣可以設定屬性x-message-ttl,如下:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
這樣所有被投遞到該佇列的訊息都最多不會存活超過 5s,
集齊了延時佇列的兩大要素,現在來看看一條延時訊息的處理程序,我們以機票訂單為例,一般機票下單之后會要求在 30 分鐘之內支付,如果 30 分鐘之后沒有支付則取消該訂單(為什么要取消,因為你站位了又不支付,不能影響別人買票),在我之前的專案中處理方式就是通過定時任務直接查庫來處理的,因為當時的訂單量并不是很大,
當下單后我們把訂單資訊發送到 MQ 的延時佇列中,并設定 30 分鐘過期,30 分鐘以后延時佇列的資料在轉發到死信佇列中去,然后我們從死信佇列中獲取訂單資訊,并判斷它的支付狀態,如果已經支付,不做任何處理,如果未支付,則取消訂單,
4.1 實作原理
生產者發送訊息到延遲佇列,對延遲佇列或訊息設定過期時間(TTL),過期之后通過死信交換機(DLX)把訊息重新發送到需要消費的佇列(死信佇列)中去進行消費,

- 創建死信佇列;
- 創建死信交換機;
- 死信佇列和死信交換機系結;
- 創建延時佇列,通過
TTL+DLX,并配置x-dead-letter-exchange和x-message-ttl屬性; - 創建延時交換機;
- 延時佇列和延時交換機系結,
4.2 配置佇列和交換機
常量
public class DelayConstant {
/**
* 延遲佇列 TTL 名稱
*/
public static final String ORDER_DELAY_QUEUE = "ticket.order.delay.queue";
/**
* 延時佇列
* 延時訊息就是發送到該交換機的
*/
public static final String ORDER_DELAY_EXCHANGE = "ticket.order.delay.exchange";
/**
* routing key 名稱 路由鍵
* 具體延時訊息發送在該 routingKey 的
*/
public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
/**
* 死信佇列
*/
public static final String DEAD_ORDER_QUEUE_NAME = "dead.ticket.order.queue";
/**
* 死信佇列交換機 DLX,dead letter發送到的 exchange
*/
public static final String DEAD_ORDER_EXCHANGE_NAME = "dead.ticket.order.exchange";
/**
* 路由
*/
public static final String DEAD_ORDER_ROUTING_KEY = "dead.order";
}
佇列配置
@Configuration
public class DelayRabbitConfig {
/**
* 1.死信佇列
*/
@Bean
public Queue orderQueue() {
return new Queue(DelayConstant.DEAD_ORDER_QUEUE_NAME, true);
}
/**
* 2.死信交換機
* 通過死信交換機把死信訊息發送到指定的佇列中去
* 將路由鍵和某模式進行匹配,此時佇列需要系結要一個模式上,
*/
@Bean
public TopicExchange orderTopicExchange() {
return new TopicExchange(DelayConstant.DEAD_ORDER_EXCHANGE_NAME);
}
/**
* 3.死信佇列(系結交換機)
*/
@Bean
public Binding orderBinding() {
// TODO 如果要讓延遲佇列之間有關聯,這里的 routingKey 和 系結的交換機很關鍵
return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(DelayConstant.DEAD_ORDER_ROUTING_KEY);
}
/**
* 4.延時佇列配置
* <p>
* 1、第一種方式是直接設定 Queue 延遲時間 但如果直接給佇列設定過期時間,這種做法不是很靈活,(當然二者是兼容的,默認是時間小的優先)
* params.put("x-message-ttl", 5 * 1000);
* 2、第二種就是每次發送訊息動態設定延遲時間,這樣我們可以靈活控制
*/
@Bean
public Queue delayOrderQueue() {
Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 宣告了佇列里的死信轉發到的DLX名稱,即死信訊息轉發到那個佇列
params.put("x-dead-letter-exchange", DelayConstant.DEAD_ORDER_EXCHANGE_NAME);
// x-dead-letter-routing-key 宣告了這些死信在轉發時攜帶的 routing-key 名稱,
params.put("x-dead-letter-routing-key", DelayConstant.DEAD_ORDER_ROUTING_KEY);
return new Queue(DelayConstant.ORDER_DELAY_QUEUE, true, false, false, params);
}
/**
* 5.延時佇列系結到交換機上,要求該訊息與一個特定的路由鍵完全匹配,
* @return DirectExchange
*/
@Bean
public DirectExchange orderDelayExchange() {
return new DirectExchange(DelayConstant.ORDER_DELAY_EXCHANGE);
}
/**
* 6.延時佇列系結交換機
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(DelayConstant.ORDER_DELAY_ROUTING_KEY);
}
}
4.3 創建訂單訊息
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {
/**
* 訂單id
*/
private String orderId;
/**
* 訂單名稱
*/
private String name;
/**
* 訂單狀態 0:未支付,1:已支付,2:訂單已取消
*/
private Integer orderStatus;
/**
* 下單時間
*/
private Date orderTime;
/**
* 訂單金額
*/
private BigDecimal amount;
}
4.4 訊息生產者
import com.javatv.bean.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @description : 訊息生產者
*/
@Component
@Slf4j
public class RabbitmqOrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void publish(Order order, String messageId, String exchangeName, String key) {
/* 確認的回呼 確認訊息是否到達 Broker 服務器 其實就是是否到達交換器
* 如果發送時候指定的交換器不存在 ack 就是 false 代表訊息不可達
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
if (!ack) {
System.out.println("進行對應的訊息補償機制");
}
});
/* 訊息失敗的回呼
* 例如訊息已經到達交換器上,但路由鍵匹配任何系結到該交換器的佇列,會觸發這個回呼,此時 replyText: NO_ROUTE
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
message, replyCode, replyText, exchange, routingKey);
});
// 在實際中ID 應該是全域唯一 能夠唯一標識訊息 訊息不可達的時候觸發ConfirmCallback回呼方法時可以獲取該值,進行對應的錯誤處理
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend(exchangeName, key, order, message -> {
/**
* 如果配置了 params.put("x-message-ttl", 60 * 1000 * 30);
* 那么這一句也可以省略,具體根據業務需要是宣告 Queue 的時候就指定好延遲時間還是在發送自己控制時間
* 這里為了演示設定為 10 s
*/
message.getMessageProperties().setExpiration(1000 * 10 + "");
return message;
}, correlationData);
}
}
4.5 訊息消費者
import com.javatv.bean.Order;
import com.javatv.constant.DelayConstant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DelayConsumer {
@RabbitListener(queues = {DelayConstant.DEAD_ORDER_QUEUE_NAME})
public void orderDelayQueue(Order order, Message message, Channel channel) {
System.out.println("###########################################");
System.out.println("【orderDelayQueue 監聽的訊息】 - 【消費時間】 - ["
+new Date()+"]- 【訂單內容】 - ["+order.toString()+"]");
if(order.getOrderStatus() == 0) {
order.setOrderStatus(2);
System.out.println("【該訂單未支付,取消訂單】" + order.toString());
} else if(order.getOrderStatus() == 1) {
System.out.println("【該訂單已完成支付】");
} else if(order.getOrderStatus() == 2) {
System.out.println("【該訂單已取消】");
}
System.out.println("###########################################");
}
}
4.6 測驗
我們模擬兩條訂單訊息,如下:
@Test
public void sendDelay() {
Order order1 = new Order();
String id1 = String.valueOf(Math.round(Math.random() * 10000));
order1.setOrderId(id1);
order1.setOrderStatus(0);
order1.setName("杭州-北京");
Order order2 = new Order();
String id2 = String.valueOf(Math.round(Math.random() * 10000));
order2.setOrderId(id2);
order2.setOrderStatus(0);
order2.setName("北京-深圳");
orderProducer.publish(order1, id1, DelayConstant.ORDER_DELAY_EXCHANGE, DelayConstant.ORDER_DELAY_ROUTING_KEY);
orderProducer.publish(order2, id2, DelayConstant.ORDER_DELAY_EXCHANGE, DelayConstant.ORDER_DELAY_ROUTING_KEY);
}
當發布訊息后,我們在客戶端去查看資料:
1、剛開始存在于延時佇列里面,如下:

2、當訊息過期之后,則存在死信佇列中,如下:

然后我們在開啟消費者服務,監控如下:

4.7 不足之處
在正常情況下,我們等待的時間都是一樣的,假如都是 30 分鐘,但如果一個佇列存在不同的延時訊息怎么辦?
第一種就是不同的時間用不同的佇列;
第二種如果是同一佇列的話則存在以下問題:
如果我發送兩條延時訊息,第 1 條延時時間設為 60 s,第二條訊息設為 2 s,且第 1 條訊息優先發送,常規訊息處理應該是第 2 條訊息先過期并進入死信佇列然后進行消費,但實際情況是 RabbitMQ 只會檢查第 1 條訊息是否過期,如果過期則丟到死信佇列,如果第 1 條訊息的延時時長很長,而第二個訊息的延時時長很短,則第二個訊息并不會優先得到執行,(可自行測驗一下,這里不演示)
5. 延時佇列插件
對于上面存在的問題,如果不能實作在訊息粒度上添加 TTL,并使其在設定的 TTL 時間及時死亡,就無法設計成一個通用的延時佇列,
5.1 插件安裝
RabbitMQ 3.5.7版本以后支持延遲插件,通過插件可以很好的解決上面的問題,進入插件官網:
https://www.rabbitmq.com/community-plugins.html
找到 rabbitmq_delayed_message_exchange插件,然后解壓放置到RabbitMQ的插件目錄,

以前都是一個壓縮包,現在直接是解壓好的檔案,下載之后把.ez檔案拷貝到 RabbitMQ 安裝目錄下的 plugins 目錄中,如下:

然后進入 RabbitMQ 的安裝目錄下的 sbin 目錄,執行下面命令讓該插件生效,然后重啟RabbitMQ,
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
執行成功后如下:

5.2 配置佇列和交換機
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DelayMessageConfig {
public static final String DELAY_EXCHANGE_NAME = "plugin.delay.exchange";
public static final String DELAY_QUEUE_NAME = "plugin.delay.queue";
public static final String ROUTING_KRY = "plugin.delay.queue";
/**
* 宣告一個延遲佇列
* @return
*/
@Bean
Queue delayQueue(){
return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
}
/**
* 宣告一個交換機
* @return
*/
@Bean
CustomExchange delayExchange(){
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true,false, args);
}
/**
* 系結
* @param delayQueue
* @param delayExchange
* @return
*/
@Bean
Binding queueBinding(Queue delayQueue, CustomExchange delayExchange){
return BindingBuilder.bind(delayQueue).to(delayExchange).with(ROUTING_KRY).noargs();
}
}
5.3 訊息生產者
import com.javatv.bean.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @description : 訊息生產者
*/
@Component
@Slf4j
public class RabbitmqDelayOrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
*
* @param order 訊息
* @param messageId 唯一id
* @param exchangeName 交換機
* @param key 路由鍵
* @param delayTime 延遲時間
*/
public void publish(Order order, String messageId, String exchangeName, String key,Integer delayTime) {
/* 確認的回呼 確認訊息是否到達 Broker 服務器 其實就是是否到達交換器
* 如果發送時候指定的交換器不存在 ack 就是 false 代表訊息不可達
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
if (!ack) {
System.out.println("進行對應的訊息補償機制");
}
});
/* 訊息失敗的回呼
* 例如訊息已經到達交換器上,但路由鍵匹配任何系結到該交換器的佇列,會觸發這個回呼,此時 replyText: NO_ROUTE
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
message, replyCode, replyText, exchange, routingKey);
});
// 在實際中ID 應該是全域唯一 能夠唯一標識訊息 訊息不可達的時候觸發ConfirmCallback回呼方法時可以獲取該值,進行對應的錯誤處理
CorrelationData correlationData = new CorrelationData(messageId);
rabbitTemplate.convertAndSend(exchangeName, key, order, message -> {
// 設定延遲時間
message.getMessageProperties().setDelay(delayTime);
return message;
}, correlationData);
}
}
5.4 訊息消費者
import com.javatv.bean.Order;
import com.javatv.constant.DelayConstant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class DelayPluginConsumer {
@RabbitListener(queues = {"plugin.delay.queue"})
public void orderDelayQueue(Order order, Message message, Channel channel) {
System.out.println("###########################################");
System.out.println("【orderDelayQueue 監聽的訊息】 - 【消費時間】 - ["
+new Date()+"]- 【訂單內容】 - ["+order.toString()+"]");
if(order.getOrderStatus() == 0) {
order.setOrderStatus(2);
System.out.println("【該訂單未支付,取消訂單】" + order.toString());
} else if(order.getOrderStatus() == 1) {
System.out.println("【該訂單已完成支付】");
} else if(order.getOrderStatus() == 2) {
System.out.println("【該訂單已取消】");
}
System.out.println("###########################################");
}
}
5.5 測驗
@Test
public void sendDelayPlugin() {
Order order1 = new Order();
order1.setOrderId("1");
order1.setOrderStatus(0);
order1.setName("杭州-北京");
Order order2 = new Order();
order2.setOrderId("2");
order2.setOrderStatus(0);
order2.setName("北京-深圳");
// 發送一條延遲 60s 的訊息
delayOrderProducer.publish(order1, "1", DelayPluginConfig.DELAY_EXCHANGE_NAME, DelayPluginConfig.ROUTING_KRY,1000 * 60);
// 發送一條延遲 5s 的訊息
delayOrderProducer.publish(order2, "2", DelayPluginConfig.DELAY_EXCHANGE_NAME, DelayPluginConfig.ROUTING_KRY,1000 * 5);
}
執行測驗類的 5s 內延遲佇列中是沒有訊息的,如下:

在 5s 之后存在第 1 條訊息,如下:

在 60s 之后存在第 2 條訊息,如下:

然后我們在開啟消費者的情況下,也不會因為延遲訊息時間長的訊息沒進入佇列而不消費延時時間短的訊息,如下:

6. 總結
如果你仔細看了這篇文章,你會發現在自己通過TTL+DLX實作延時佇列的時候,我把消費之前的訊息定義在為延時佇列,而過期的訊息存入的佇列我把它稱為了死信佇列,因為這條訊息是死信訊息,但插件定義的延遲佇列來放死信訊息,其實兩者的定義并不沖突,都是可以理解的,我的意思是這樣的牛角不要鉆,
在學習延時佇列的時候也查閱了部分博客,看到了這樣一個問題:
RabbitMQ 的延時任務和訊息確認機制沖突嗎?即使訊息發送成功了,
setReturnCallback這個回呼還是被觸發了,
就我目前演示的版本,setReturnCallback是沒有被觸發的,不知道是否之前的版本有過這樣的問題就不得而知了,
原始碼:https://gitee.com/javatv/advanced-way/tree/master/spring-boot-rabbitmq
參考:https://www.cnblogs.com/mfrank/p/11260355.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/432167.html
標籤:其他
上一篇:實時計算知識,最詳細的整理
下一篇:Flink RPC原始碼流程
