場景
開發中經常需要用到定時任務,對于商城來說,定時任務尤其多,比如優惠券定時過期、訂單定時關閉、微信支付2小時未支付關閉訂單等等,都需要用到定時任務,但是定時任務本身有一個問題,
一般來說我們都是通過定時輪詢查詢資料庫來判斷是否有任務需要執行,也就是說不管怎么樣,我們需要先查詢資料庫,而且有些任務對時間準確要求比較高的,需要每秒查詢一次,對于系統小倒是無所謂,如果系統本身就大而且資料也多的情況下,這就不大現實了,所以需要其他方式的,當然實作的方式有多種多樣的,比如Redis實作定時佇列、基于優先級佇列的JDK延遲佇列、時間輪等,
因為我們專案中本身就使用到了Rabbitmq,所以基于方便開發和維護的原則,我們使用了Rabbitmq延遲佇列來實作定時任務,不知道rabbitmq是什么的和不知道springboot怎么集成Rabbitmq的可以查看公眾號Java技術堆疊之前的文章Spring boot集成RabbitMQ,
Spring Boot 基礎教程和示例代碼:https://github.com/javastacks/spring-boot-best-practice
Rabbitmq延遲佇列
Rabbitmq本身是沒有延遲佇列的,只能通過Rabbitmq本身佇列的特性來實作,想要Rabbitmq實作延遲佇列,需要使用Rabbitmq的死信交換機(Exchange)和訊息的存活時間TTL(Time To Live)
死信交換機
一個訊息在滿足如下條件下,會進死信交換機,記住這里是交換機而不是佇列,一個交換機可以對應很多佇列,
一個訊息被Consumer拒收了,并且reject方法的引數里requeue是false,也就是說不會被再次放在佇列里,被其他消費者使用,
上面的訊息的TTL到了,訊息過期了,
佇列的長度限制滿了,排在前面的訊息會被丟棄或者扔到死信路由上,
死信交換機就是普通的交換機,只是因為我們把過期的訊息扔進去,所以叫死信交換機,并不是說死信交換機是某種特定的交換機
訊息TTL(訊息存活時間)
訊息的TTL就是訊息的存活時間,RabbitMQ可以對佇列和訊息分別設定TTL,對佇列設定就是佇列沒有消費者連著的保留時間,也可以對每一個單獨的訊息做單獨的設定,超過了這個時間,我們認為這個訊息就死了,稱之為死信,如果佇列設定了,訊息也設定了,那么會取小的,所以一個訊息如果被路由到不同的佇列中,這個訊息死亡的時間有可能不一樣(不同的佇列設定),這里單講單個訊息的TTL,因為它才是實作延遲任務的關鍵,
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
可以通過設定訊息的expiration欄位或者x-message-ttl屬性來設定時間,兩者是一樣的效果,只是expiration欄位是字串引數,所以要寫個int型別的字串:當上面的訊息扔到佇列中后,過了60秒,如果沒有被消費,它就死了,不會被消費者消費到,這個訊息后面的,沒有“死掉”的訊息對頂上來,被消費者消費,死信在佇列中并不會被洗掉和釋放,它會被統計到佇列的訊息數中去
處理流程圖

創建交換機(Exchanges)和佇列(Queues)
創建死信交換機

如圖所示,就是創建一個普通的交換機,這里為了方便區分,把交換機的名字取為:delay
創建自動過期訊息佇列
這個佇列的主要作用是讓訊息定時過期的,比如我們需要2小時候關閉訂單,我們就需要把訊息放進這個佇列里面,把訊息過期時間設定為2小時

創建一個一個名為delay_queue1的自動過期的佇列,當然圖片上面的引數并不會讓訊息自動過期,因為我們并沒有設定x-message-ttl引數,如果整個佇列的訊息有訊息都是相同的,可以設定,這里為了靈活,所以并沒有設定,另外兩個引數x-dead-letter-exchange代表訊息過期后,訊息要進入的交換機,這里配置的是delay,也就是死信交換機,x-dead-letter-routing-key是配置訊息過期后,進入死信交換機的routing-key,跟發送訊息的routing-key一個道理,根據這個key將訊息放入不同的佇列
創建訊息處理佇列
這個佇列才是真正處理訊息的佇列,所有進入這個佇列的訊息都會被處理

訊息佇列的名字為delay_queue2
訊息佇列系結到交換機
進入交換機詳情頁面,將創建的2個佇列(delayqueue1和delayqueue2)系結到交換機上面

自動過期訊息佇列的routing key 設定為delay
系結delayqueue2

delay*queue2 的key要設定為創建自動過期的佇列的x-dead-letter-routing-key引數,這樣當訊息過期的時候就可以自動把訊息放入delay_queue2這個佇列中了
系結后的管理頁面如下圖:

當然這個系結也可以使用代碼來實作,只是為了直觀表現,所以本文使用的管理平臺來操作
發送訊息
String msg = "hello word"; MessageProperties messageProperties = newMessageProperties(); messageProperties.setExpiration("6000"); messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes()); Message message = newMessage(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("delay", "delay",message);
主要的代碼就是
messageProperties.setExpiration("6000");
設定了讓訊息6秒后過期
注意:因為要讓訊息自動過期,所以一定不能設定delay_queue1的監聽,不能讓這個佇列里面的訊息被接受到,否則訊息一旦被消費,就不存在過期了
接收訊息
接收訊息配置好delay_queue2的監聽就好了
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DelayQueue {
/** 訊息交換機的名字*/
public static final String EXCHANGE = "delay";
/** 佇列key1*/
public static final String ROUTINGKEY1 = "delay";
/** 佇列key2*/
public static final String ROUTINGKEY2 = "delay_key";
/**
* 配置鏈接資訊
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必須要設定
return connectionFactory;
}
/**
* 配置訊息交換機
* 針對消費者配置
FanoutExchange: 將訊息分發到所有的系結佇列,無routingkey的概念
HeadersExchange :通過添加屬性key-value匹配
DirectExchange:按照routingkey分發到指定佇列
TopicExchange:多關鍵字匹配
*/
@Bean
public DirectExchange defaultExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**
* 配置訊息佇列2
* 針對消費者配置
* @return
*/
@Bean
public Queue queue() {
return new Queue("delay_queue2", true); //佇列持久
}
/**
* 將訊息佇列2與交換機系結
* 針對消費者配置
* @return
*/
@Bean
@Autowired
public Binding binding() {
return BindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受訊息的監聽,這個監聽會接受訊息佇列1的訊息
* 針對消費者配置
* @return
*/
@Bean
@Autowired
public SimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
container.setMessageListener(new ChannelAwareMessageListener() {
public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
byte[] body = message.getBody();
System.out.println("delay_queue2 收到訊息 : " + new String(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //確認訊息成功消費
}
});
return container;
}
}
在訊息監聽中處理需要定時處理的任務就好了,因為Rabbitmq能發送訊息,所以可以把任務特征碼發過來,比如關閉訂單就把訂單id發過來,這樣就避免了需要查詢一下那些訂單需要關閉而加重MySQL負擔了,畢竟一旦訂單量大的話,查詢本身也是一件很費IO的事情
總結
基于Rabbitmq實作定時任務,就是將訊息設定一個過期時間,放入一個沒有讀取的佇列中,讓訊息過期后自動轉入另外一個佇列中,監控這個佇列訊息的監聽處來處理定時任務具體的操作,
原文鏈接:https://blog.csdn.net/wantnrun/article/details/80401641
著作權宣告:本文為CSDN博主「RayeWang」的原創文章,遵循CC 4.0 BY-SA著作權協議,轉載請附上原文出處鏈接及本宣告,
近期熱文推薦:
1.1,000+ 道 Java面試題及答案整理(2021最新版)
2.終于靠開源專案弄到 IntelliJ IDEA 激活碼了,真香!
3.阿里 Mock 工具正式開源,干掉市面上所有 Mock 工具!
4.Spring Cloud 2020.0.0 正式發布,全新顛覆性版本!
5.《Java開發手冊(嵩山版)》最新發布,速速下載!
覺得不錯,別忘了隨手點贊+轉發哦!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/288034.html
標籤:Java
上一篇:你知道什么是CI/CD嗎?不懂?五分鐘讓你徹底理解!
下一篇:自寫一個生成ID的工具類
