什么是延時佇列
指訊息進入佇列后不會立即被消費,可以被延遲一定的時間,再進行消費.RabbitMQ沒有提供延遲佇列功能,但是可以使用TTL+DLX來實作延遲佇列效果
使用場景
電商平臺下單后,30分鐘未支付,取消訂單回滾庫存;新用戶注冊成功一周后,發送問候短信等等.
延時佇列實作
模擬電商平臺下單后,30分鐘后未支付,取消訂單回滾庫存

創建配置類
@Configuration
public class DelayConfig {
/**
* 創建一個正常的佇列
*
* @return
*/
@Bean
public Queue createNormalQueue() {
return QueueBuilder.durable("order_queue").build();
}
/**
* 創建一個死信佇列
*
* @return
*/
@Bean
public Queue createDeadQueue() {
return QueueBuilder.durable("order_dead_queue")
.withArgument("x-dead-letter-exchange", "order_dead_exchange") //設定死信交換機
.withArgument("x-dead-letter-routing-key", "order_dead")//設定死信路由key
.withArgument("x-message-ttl", 30000)// 佇列中訊息30秒過期
.build();
}
/**
* 創建一個正常的交換機
*
* @return
*/
@Bean
public DirectExchange createNormalExchange() {
return new DirectExchange("order_exchange");
}
/**
* 創建死信交換機
*
* @return
*/
@Bean
public DirectExchange createDeadExchange() {
return new DirectExchange("order_dead_exchange");
}
/**
* 創建系結:將正常佇列系結到死信交換機上面
*
* @return
*/
@Bean
public Binding createDeadBinding(@Qualifier(value = "createNormalQueue") Queue queue,
@Qualifier(value = "createDeadExchange") DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order_dead");
}
/**
* 創建系結:將死信佇列系結到正常的交換機上面
*
* @return
*/
@Bean
public Binding binding(@Qualifier(value = "createDeadQueue")Queue queue,
@Qualifier(value = "createNormalExchange")DirectExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order");
}
}
創建監聽類
@Component
public class DelayListener {
@RabbitListener(queues = "order_queue")
public void listener(Message message, Channel channel, String msg) throws IOException {
// 模擬業務代碼執行
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(simpleDateFormat.format(new Date()) + "收到訊息:" + msg);
System.out.println("檢查訂單是否付款操作開始::沒有支付就取消訂單,回滾庫存");
// 簽收訊息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
創建controller用于測驗
@RestController
public class DelayController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping(value = "/send")
public void send(){
// 模擬業務代碼執行
String orderId = UUID.randomUUID().toString().replace("-","");
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(simpleDateFormat.format(new Date())+"創建訂單:"+orderId);
// 通過正常的交換機和routingKey把orderId發送到死信佇列
rabbitTemplate.convertAndSend("order_exchange","order",orderId);
}
}
注意
- 為了方便測驗,我在配置類中的死信佇列訊息過期時間設定的是30秒,再真實的場景中根據自己的需求來就好了.
- 發送訊息要發送給order_dead_queue(死信佇列),監聽要監聽order_queue(正常佇列)
測驗
http://localhost:18081/send:再發出資訊后,延遲了30秒后,消費到了資訊

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/339081.html
標籤:其他
