主頁 >  其他 > 一文搞懂 RabbitMQ 延時佇列(訂單定時取消為例)

一文搞懂 RabbitMQ 延時佇列(訂單定時取消為例)

2022-02-25 08:01:50 其他

1. 死信及死信佇列

1.1 什么是死信

一般來說,生產者將訊息投遞到佇列中,消費者從佇列取出訊息進行消費,但某些時候由于特定的原因導致佇列中的某些訊息無法被消費,這樣的訊息如果沒有后續的處理,就變成了死信(Dead Letter),所有的死信都會放到死信佇列中,

為什么為有死信?訊息變成死信一般是以下三種情況:

  1. 訊息被拒絕,即basicReject/basicNack,并且設定 requeue 引數為 false,這種情況一般訊息丟失 ,
  2. 訊息過期(TTL),TTL全稱為Time-To-Live,表示的是訊息的有效期,默認情況下 Rabbit 中的訊息不過期,但是可以設定佇列的過期時間和訊息的過期時間以達到訊息過期的效果 ,訊息如果在佇列中一直沒有被消費并且存在時間超過了TTL,訊息就會變成了"死信" ,后續無法再被消費,
  3. 佇列達到最大長度,一般當設定了最大佇列長度或大小并達到最大值時,

1.2 死信交換器 DLX

在訊息的拒絕操作都是在requeue = true情形下,如果為 false 可以發現當發生例外確認后,訊息丟失了,這肯定是不能容忍的,所以提出了死信交換器(dead-letter-exchange)的概念,

死信交換器仍然只是一個普通的交換器,創建時并沒有特別要求和操作,在創建佇列的時候,宣告該交換器將用作保存被拒絕的訊息即可,相關的引數是 x-dead-letter-exchange,當這個佇列中有死信時,RabbitMQ 就會自動的將這個訊息重新發布到設定的 Exchange 上去,進而被路由到另一個佇列,

舉個栗子

1、生產者生產 3 條訊息

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class DlxProducer {

    public final static String EXCHANGE_NAME = "dlx_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連接
        Connection connection = RabbitMQUtils.getConnection();
        // 創建一個信道
        Channel channel = connection.createChannel();
        // 指定轉發
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String[] routekeys = {"rabbit", "cat", "dog"};
        for (int i = 0; i < 3; i++) {
            String routekey = routekeys[i % 3];
            String msg = "Hello,RabbitMq" + (i + 1);
            channel.basicPublish(EXCHANGE_NAME, routekey, null, msg.getBytes());
            System.out.println("Sent " + routekey + ":" + msg);
        }
        // 關閉頻道和連接
        channel.close();
        connection.close();
    }
}

2、普通消費者消費訊息,但是不能消費全部的訊息,并把不能消費得訊息投遞到死信佇列,如果是我們還想做點其他事情,我們可以在死信交換的時候改變死信訊息的路由鍵,具體的相關的引數是 x-dead-letter-routing-key

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * 類說明:普通的消費者,但是自己無法消費的訊息,將投入死信佇列
 */
public class NormalDlxConsumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連接
        Connection connection = RabbitMQUtils.getConnection();
        // 創建一個信道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(DlxProducer.EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        //系結死信交換器
        //宣告一個佇列,并系結死信交換器
        String queueName = "dlx_queue";
        Map<String, Object> argos = new HashMap<String, Object>();
        argos.put("x-dead-letter-exchange", DlxConsumer.DLX_EXCHANGE_NAME);
        //死信路由鍵,會替換訊息原來的路由鍵
        //args.put("x-dead-letter-routing-key", "dead");
        channel.queueDeclare(queueName, false, true, false, argos);
        //系結,將佇列和交換器通過路由鍵進行系結
        channel.queueBind(queueName, DlxProducer.EXCHANGE_NAME, "#");
        System.out.println("waiting for message........");
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                //如果是cat的訊息確認
                if (envelope.getRoutingKey().equals("cat")) {
                    System.out.println("Received[" + envelope.getRoutingKey() + "]" + message);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                } else {
                    //如果是其他的訊息拒絕(queue=false),成為死信訊息
                    System.out.println("Will reject[" + envelope.getRoutingKey() + "]" + message);
                    channel.basicReject(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(queueName, false, consumer);
    }
}

3、申明一個消費者,負責消費死信佇列

mport com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 類說明:普通的消費者,負責消費死信佇列dlx_accept
 */
public class DlxConsumer {

    public final static String DLX_EXCHANGE_NAME = "dlx_accept";

    public static void main(String[] args) throws IOException, TimeoutException {
        //建立連接
        Connection connection = RabbitMQUtils.getConnection();
        // 創建一個信道
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
        String queueName = "dlx_accept";
        channel.queueDeclare(queueName, false, false, false, null);
        channel.queueBind(queueName, DLX_EXCHANGE_NAME, "#");
        System.out.println("waiting for message........");
        //宣告了一個死信消費者
        final Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                                       Envelope envelope,
                                       AMQP.BasicProperties properties,
                                       byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("Received dead letter[" + envelope.getRoutingKey() + "]" + message);
            }
        };
        //消費者正式開始在指定佇列上消費訊息
        channel.basicConsume(queueName, true, consumer);
    }
}

測驗結果:

image-20211019141510180

DLX和備用交換器的區別

  1. 備用交換器是主交換器無法路由訊息,那么訊息將被路由到這個新的備用交換器,而死信交換器則是接收過期或者被拒絕的訊息,
  2. 備用交換器是在宣告主交換器時發生聯系,而死信交換器則宣告佇列時發生聯系,

場景分析:備用交換器一般是用于生產者生產訊息時,確保訊息可以盡量進入 RabbitMQ,而死信交換器主要是用于消費者消費訊息產生死信的場景(比如訊息過期,佇列滿了,訊息拒絕且不重新投遞),

2. 什么是延時佇列

延時佇列,首先,它是一種佇列,佇列意味著內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另一端取出,

其次,延時佇列,最重要的特性就體現在它的延時屬性上,跟普通的佇列不一樣的是,普通佇列中的元素總是等著希望被早點取出處理,而延時佇列中的元素則是希望被在指定時間得到取出和處理,所以延時佇列中的元素是都是帶時間屬性的,通常來說是需要被處理的訊息或者任務,

簡單來說,延時佇列就是用來存放需要在指定時間被處理的元素的佇列,

RabbitMQ 是沒有延時屬性可以設定的,但是可以通過DLX+TTL的方式來實作 RabbitMQ 的延時佇列,

3. 延時佇列的使用場景

那么什么時候需要用延時佇列呢?考慮一下以下場景:

  1. 訂單在十分鐘之內未支付則自動取消,
  2. 新創建的店鋪,如果在十天內都沒有上傳過商品,則自動發送訊息提醒,
  3. 賬單在一周內未支付,則自動結算,
  4. 用戶注冊成功后,如果三天內沒有登陸則進行短信提醒,
  5. 用戶發起退款,如果三天內沒有得到處理則通知相關運營人員,
  6. 預定會議后,需要在預定的時間點前十分鐘通知各個與會人員參加會議,

這些場景都有一個特點,需要在某個事件發生之后或者之前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鐘之后檢查該訂單支付狀態,然后將未支付的訂單進行關閉;發生店鋪創建事件,十天后檢查該店鋪上新商品數,然后通知上新數為 0 的商戶;發生賬單生成事件,檢查賬單支付狀態,然后自動結算未支付的賬單;發生新用戶注冊事件,三天后檢查新注冊用戶的活動資料,然后通知沒有任何活動記錄的用戶;發生退款事件,在三天之后檢查該訂單是否已被處理,如仍未被處理,則發送訊息給相關運營人員;發生預定會議事件,判斷離會議開始是否只有十分鐘了,如果是,則通知各個與會人員,

看起來似乎使用定時任務,一直輪詢資料,每秒查一次,取出需要被處理的資料,然后處理不就完事了嗎?如果資料量比較少,確實可以這樣做,比如:對于“如果賬單一周內未支付則進行自動結算”這樣的需求,如果對于時間不是嚴格限制,而是寬松意義上的一周,那么每天晚上跑個定時任務檢查一下所有未支付的賬單,確實也是一個可行的方案,但對于資料量比較大,并且時效性較強的場景,如:“訂單十分鐘內未支付則關閉“,短期內未支付的訂單資料可能會有很多,活動期間甚至會達到百萬甚至千萬級別,對這么龐大的資料量仍舊使用輪詢的方式顯然是不可取的,很可能在一秒內無法完成所有訂單的檢查,同時會給資料庫帶來很大壓力,無法滿足業務要求而且性能低下,

4. 延時佇列的實作

在 RabbitMQ 中一般采用的是 TTL+DLX 的方式來實作延時佇列,DLX 上面已經介紹了,通過在創建佇列的時候設定佇列的x-dead-letter-exchange屬性,而 TTL 也同樣可以設定屬性x-message-ttl,如下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 5000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

這樣所有被投遞到該佇列的訊息都最多不會存活超過 5s,

集齊了延時佇列的兩大要素,現在來看看一條延時訊息的處理程序,我們以機票訂單為例,一般機票下單之后會要求在 30 分鐘之內支付,如果 30 分鐘之后沒有支付則取消該訂單(為什么要取消,因為你站位了又不支付,不能影響別人買票),在我之前的專案中處理方式就是通過定時任務直接查庫來處理的,因為當時的訂單量并不是很大,

當下單后我們把訂單資訊發送到 MQ 的延時佇列中,并設定 30 分鐘過期,30 分鐘以后延時佇列的資料在轉發到死信佇列中去,然后我們從死信佇列中獲取訂單資訊,并判斷它的支付狀態,如果已經支付,不做任何處理,如果未支付,則取消訂單,

4.1 實作原理

生產者發送訊息到延遲佇列,對延遲佇列或訊息設定過期時間(TTL),過期之后通過死信交換機(DLX)把訊息重新發送到需要消費的佇列(死信佇列)中去進行消費,

image-20220224192355290

  1. 創建死信佇列;
  2. 創建死信交換機;
  3. 死信佇列和死信交換機系結;
  4. 創建延時佇列,通過TTL+DLX,并配置x-dead-letter-exchangex-message-ttl屬性;
  5. 創建延時交換機;
  6. 延時佇列和延時交換機系結,

4.2 配置佇列和交換機

常量

public class DelayConstant {

    /**
     * 延遲佇列 TTL 名稱
     */
    public static final String ORDER_DELAY_QUEUE = "ticket.order.delay.queue";

    /**
     * 延時佇列
     * 延時訊息就是發送到該交換機的
     */
    public static final String ORDER_DELAY_EXCHANGE = "ticket.order.delay.exchange";

    /**
     * routing key 名稱 路由鍵
     * 具體延時訊息發送在該 routingKey 的
     */
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";

    /**
     * 死信佇列
     */
    public static final String DEAD_ORDER_QUEUE_NAME = "dead.ticket.order.queue";

    /**
     * 死信佇列交換機 DLX,dead letter發送到的 exchange
     */
    public static final String DEAD_ORDER_EXCHANGE_NAME = "dead.ticket.order.exchange";

    /**
     * 路由
     */
    public static final String DEAD_ORDER_ROUTING_KEY = "dead.order";
}

佇列配置

@Configuration
public class DelayRabbitConfig {


    /**
     * 1.死信佇列
     */
    @Bean
    public Queue orderQueue() {
        return new Queue(DelayConstant.DEAD_ORDER_QUEUE_NAME, true);
    }

    /**
     * 2.死信交換機
     * 通過死信交換機把死信訊息發送到指定的佇列中去
     * 將路由鍵和某模式進行匹配,此時佇列需要系結要一個模式上,
     */
    @Bean
    public TopicExchange orderTopicExchange() {
        return new TopicExchange(DelayConstant.DEAD_ORDER_EXCHANGE_NAME);
    }

    /**
     * 3.死信佇列(系結交換機)
     */
    @Bean
    public Binding orderBinding() {
        // TODO 如果要讓延遲佇列之間有關聯,這里的 routingKey 和 系結的交換機很關鍵
        return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(DelayConstant.DEAD_ORDER_ROUTING_KEY);
    }


    /**
     * 4.延時佇列配置
     * <p>
     * 1、第一種方式是直接設定 Queue 延遲時間 但如果直接給佇列設定過期時間,這種做法不是很靈活,(當然二者是兼容的,默認是時間小的優先)
     * params.put("x-message-ttl", 5 * 1000);
     * 2、第二種就是每次發送訊息動態設定延遲時間,這樣我們可以靈活控制
     */
    @Bean
    public Queue delayOrderQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange 宣告了佇列里的死信轉發到的DLX名稱,即死信訊息轉發到那個佇列
        params.put("x-dead-letter-exchange", DelayConstant.DEAD_ORDER_EXCHANGE_NAME);
        // x-dead-letter-routing-key 宣告了這些死信在轉發時攜帶的 routing-key 名稱,
        params.put("x-dead-letter-routing-key", DelayConstant.DEAD_ORDER_ROUTING_KEY);
        return new Queue(DelayConstant.ORDER_DELAY_QUEUE, true, false, false, params);
    }

    /**
     * 5.延時佇列系結到交換機上,要求該訊息與一個特定的路由鍵完全匹配,
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(DelayConstant.ORDER_DELAY_EXCHANGE);
    }

    /**
     * 6.延時佇列系結交換機
     */
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(DelayConstant.ORDER_DELAY_ROUTING_KEY);
    }
}

4.3 創建訂單訊息

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order implements Serializable {

    /**
     * 訂單id
     */
    private String orderId;

    /**
     * 訂單名稱
     */
    private String name;


    /**
     * 訂單狀態 0:未支付,1:已支付,2:訂單已取消
     */
    private Integer orderStatus;

    /**
     * 下單時間
     */
    private Date orderTime;

    /**
     * 訂單金額
     */
    private BigDecimal amount;
}

4.4 訊息生產者

import com.javatv.bean.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @description : 訊息生產者
 */
@Component
@Slf4j
public class RabbitmqOrderProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void publish(Order order, String messageId, String exchangeName, String key) {
        /* 確認的回呼 確認訊息是否到達 Broker 服務器 其實就是是否到達交換器
         * 如果發送時候指定的交換器不存在 ack 就是 false 代表訊息不可達
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
            if (!ack) {
                System.out.println("進行對應的訊息補償機制");
            }
        });
        /* 訊息失敗的回呼
         * 例如訊息已經到達交換器上,但路由鍵匹配任何系結到該交換器的佇列,會觸發這個回呼,此時 replyText: NO_ROUTE
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
                    message, replyCode, replyText, exchange, routingKey);
        });
        // 在實際中ID 應該是全域唯一 能夠唯一標識訊息 訊息不可達的時候觸發ConfirmCallback回呼方法時可以獲取該值,進行對應的錯誤處理
        CorrelationData correlationData = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend(exchangeName, key, order, message -> {
            /**
             * 如果配置了 params.put("x-message-ttl", 60 * 1000 * 30);
             * 那么這一句也可以省略,具體根據業務需要是宣告 Queue 的時候就指定好延遲時間還是在發送自己控制時間
             * 這里為了演示設定為 10 s
             */
            message.getMessageProperties().setExpiration(1000 * 10 + "");
            return message;
        }, correlationData);
    }
}

4.5 訊息消費者

import com.javatv.bean.Order;
import com.javatv.constant.DelayConstant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class DelayConsumer {

    @RabbitListener(queues = {DelayConstant.DEAD_ORDER_QUEUE_NAME})
    public void orderDelayQueue(Order order, Message message, Channel channel) {
        System.out.println("###########################################");
        System.out.println("【orderDelayQueue 監聽的訊息】 - 【消費時間】 - ["
                +new Date()+"]- 【訂單內容】 - ["+order.toString()+"]");

        if(order.getOrderStatus() == 0) {
            order.setOrderStatus(2);
            System.out.println("【該訂單未支付,取消訂單】" + order.toString());
        } else if(order.getOrderStatus() == 1) {
            System.out.println("【該訂單已完成支付】");
        } else if(order.getOrderStatus() == 2) {
            System.out.println("【該訂單已取消】");
        }
        System.out.println("###########################################");
    }
}

4.6 測驗

我們模擬兩條訂單訊息,如下:

@Test
public void sendDelay() {
    Order order1 = new Order();
    String id1 = String.valueOf(Math.round(Math.random() * 10000));
    order1.setOrderId(id1);
    order1.setOrderStatus(0);
    order1.setName("杭州-北京");
    Order order2 = new Order();
    String id2 = String.valueOf(Math.round(Math.random() * 10000));
    order2.setOrderId(id2);
    order2.setOrderStatus(0);
    order2.setName("北京-深圳");
    orderProducer.publish(order1, id1, DelayConstant.ORDER_DELAY_EXCHANGE, DelayConstant.ORDER_DELAY_ROUTING_KEY);
    orderProducer.publish(order2, id2, DelayConstant.ORDER_DELAY_EXCHANGE, DelayConstant.ORDER_DELAY_ROUTING_KEY);
}

當發布訊息后,我們在客戶端去查看資料:

1、剛開始存在于延時佇列里面,如下:

image-20220224194621196

2、當訊息過期之后,則存在死信佇列中,如下:

image-20220224194712936

然后我們在開啟消費者服務,監控如下:

image-20220224195058492

4.7 不足之處

在正常情況下,我們等待的時間都是一樣的,假如都是 30 分鐘,但如果一個佇列存在不同的延時訊息怎么辦?

第一種就是不同的時間用不同的佇列;

第二種如果是同一佇列的話則存在以下問題:

如果我發送兩條延時訊息,第 1 條延時時間設為 60 s,第二條訊息設為 2 s,且第 1 條訊息優先發送,常規訊息處理應該是第 2 條訊息先過期并進入死信佇列然后進行消費,但實際情況是 RabbitMQ 只會檢查第 1 條訊息是否過期,如果過期則丟到死信佇列,如果第 1 條訊息的延時時長很長,而第二個訊息的延時時長很短,則第二個訊息并不會優先得到執行,(可自行測驗一下,這里不演示)

5. 延時佇列插件

對于上面存在的問題,如果不能實作在訊息粒度上添加 TTL,并使其在設定的 TTL 時間及時死亡,就無法設計成一個通用的延時佇列,

5.1 插件安裝

RabbitMQ 3.5.7版本以后支持延遲插件,通過插件可以很好的解決上面的問題,進入插件官網:

https://www.rabbitmq.com/community-plugins.html

找到 rabbitmq_delayed_message_exchange插件,然后解壓放置到RabbitMQ的插件目錄,

image-20220224220256019

以前都是一個壓縮包,現在直接是解壓好的檔案,下載之后把.ez檔案拷貝到 RabbitMQ 安裝目錄下的 plugins 目錄中,如下:

image-20220224221617367

然后進入 RabbitMQ 的安裝目錄下的 sbin 目錄,執行下面命令讓該插件生效,然后重啟RabbitMQ,

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

執行成功后如下:

image-20220224221856186

5.2 配置佇列和交換機

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class DelayMessageConfig {
 
	public static final String DELAY_EXCHANGE_NAME = "plugin.delay.exchange";
	
	public static final String DELAY_QUEUE_NAME = "plugin.delay.queue";
	
	public static final String ROUTING_KRY = "plugin.delay.queue";
	
	/**
	 * 宣告一個延遲佇列
	 * @return
	 */
	@Bean
	Queue delayQueue(){
		return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
	}

	/**
	 * 宣告一個交換機
	 * @return
	 */
	@Bean
	CustomExchange delayExchange(){
		
		Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
		return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true,false, args);
		
	}
	/**
	 * 系結
	 * @param delayQueue
	 * @param delayExchange
	 * @return
	 */
	@Bean
	Binding queueBinding(Queue delayQueue, CustomExchange delayExchange){
		
	    return BindingBuilder.bind(delayQueue).to(delayExchange).with(ROUTING_KRY).noargs();
		
	}
}

5.3 訊息生產者

import com.javatv.bean.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @description : 訊息生產者
 */
@Component
@Slf4j
public class RabbitmqDelayOrderProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    /**
     *
     * @param order 訊息
     * @param messageId 唯一id
     * @param exchangeName 交換機
     * @param key 路由鍵
     * @param delayTime 延遲時間
     */
    public void publish(Order order, String messageId, String exchangeName, String key,Integer delayTime) {
        /* 確認的回呼 確認訊息是否到達 Broker 服務器 其實就是是否到達交換器
         * 如果發送時候指定的交換器不存在 ack 就是 false 代表訊息不可達
         */
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            log.info("correlationData:{} , ack:{}", correlationData.getId(), ack);
            if (!ack) {
                System.out.println("進行對應的訊息補償機制");
            }
        });
        /* 訊息失敗的回呼
         * 例如訊息已經到達交換器上,但路由鍵匹配任何系結到該交換器的佇列,會觸發這個回呼,此時 replyText: NO_ROUTE
         */
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            log.info("message:{}; replyCode: {}; replyText: {} ; exchange:{} ; routingKey:{}",
                    message, replyCode, replyText, exchange, routingKey);
        });
        // 在實際中ID 應該是全域唯一 能夠唯一標識訊息 訊息不可達的時候觸發ConfirmCallback回呼方法時可以獲取該值,進行對應的錯誤處理
        CorrelationData correlationData = new CorrelationData(messageId);
        rabbitTemplate.convertAndSend(exchangeName, key, order, message -> {
            // 設定延遲時間
            message.getMessageProperties().setDelay(delayTime);
            return message;
        }, correlationData);
    }
}

5.4 訊息消費者

import com.javatv.bean.Order;
import com.javatv.constant.DelayConstant;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class DelayPluginConsumer {

    @RabbitListener(queues = {"plugin.delay.queue"})
    public void orderDelayQueue(Order order, Message message, Channel channel) {
        System.out.println("###########################################");
        System.out.println("【orderDelayQueue 監聽的訊息】 - 【消費時間】 - ["
                +new Date()+"]- 【訂單內容】 - ["+order.toString()+"]");

        if(order.getOrderStatus() == 0) {
            order.setOrderStatus(2);
            System.out.println("【該訂單未支付,取消訂單】" + order.toString());
        } else if(order.getOrderStatus() == 1) {
            System.out.println("【該訂單已完成支付】");
        } else if(order.getOrderStatus() == 2) {
            System.out.println("【該訂單已取消】");
        }
        System.out.println("###########################################");
    }
}

5.5 測驗

@Test
public void sendDelayPlugin() {
    Order order1 = new Order();
    order1.setOrderId("1");
    order1.setOrderStatus(0);
    order1.setName("杭州-北京");
    
    Order order2 = new Order();
    order2.setOrderId("2");
    order2.setOrderStatus(0);
    order2.setName("北京-深圳");
    
    // 發送一條延遲 60s 的訊息
    delayOrderProducer.publish(order1, "1", DelayPluginConfig.DELAY_EXCHANGE_NAME, DelayPluginConfig.ROUTING_KRY,1000 * 60);
    // 發送一條延遲 5s 的訊息
    delayOrderProducer.publish(order2, "2", DelayPluginConfig.DELAY_EXCHANGE_NAME, DelayPluginConfig.ROUTING_KRY,1000 * 5);
}

執行測驗類的 5s 內延遲佇列中是沒有訊息的,如下:

image-20220224224931433

在 5s 之后存在第 1 條訊息,如下:

image-20220224224640228

在 60s 之后存在第 2 條訊息,如下:

image-20220224224735212

然后我們在開啟消費者的情況下,也不會因為延遲訊息時間長的訊息沒進入佇列而不消費延時時間短的訊息,如下:

image-20220224225206594

6. 總結

如果你仔細看了這篇文章,你會發現在自己通過TTL+DLX實作延時佇列的時候,我把消費之前的訊息定義在為延時佇列,而過期的訊息存入的佇列我把它稱為了死信佇列,因為這條訊息是死信訊息,但插件定義的延遲佇列來放死信訊息,其實兩者的定義并不沖突,都是可以理解的,我的意思是這樣的牛角不要鉆,

在學習延時佇列的時候也查閱了部分博客,看到了這樣一個問題:

RabbitMQ 的延時任務和訊息確認機制沖突嗎?即使訊息發送成功了,setReturnCallback 這個回呼還是被觸發了,

就我目前演示的版本,setReturnCallback是沒有被觸發的,不知道是否之前的版本有過這樣的問題就不得而知了,

原始碼:https://gitee.com/javatv/advanced-way/tree/master/spring-boot-rabbitmq

參考:https://www.cnblogs.com/mfrank/p/11260355.html

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/432167.html

標籤:其他

上一篇:實時計算知識,最詳細的整理

下一篇:Flink RPC原始碼流程

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more