目錄
一、需求背景之為什么要有超時關單
二、所以多數訂單業務都是會有這個功能,那如何設計呢?
?三、RabbitMQ死信佇列-延遲訊息知識點回顧
3.1什么是rabbitmq的死信佇列
3.2什么是rabbitmq的死信交換機
3.3訊息有哪幾種情況成為死信
3.4什么是延遲佇列
四、代碼環節
4.1 Rabbitmq死信佇列配置
4.2延遲訊息的發送
4.3關單消費者處理
一、需求背景之為什么要有超時關單
原因一:第三方支付平臺的支付連接都是有時效性,創建訂單后,需要再一定的時間內支付完成
微信支付、支付寶支付等
也可以不關閉訂單,做訂單二次支付的操作,但業務鏈路會更加復雜
原因二:電商業務里面還會涉及到商品庫存的鎖定和釋放
二、所以多數訂單業務都是會有這個功能,那如何設計呢?
三、RabbitMQ死信佇列-延遲訊息知識點回顧
3.1什么是RabbitMQ的死信佇列
沒有被及時消費的訊息存放的佇列
3.2什么是rabbitmq的死信交換機
Dead Letter Exchange(死信交換機,縮寫:DLX)當訊息成為死信后,會被重新發送到另一個交換機,這個交換機就是DLX死信交換機

3.3訊息有哪幾種情況成為死信
消費者拒收訊息(basic.reject/ basic.nack),并且沒有重新入隊 requeue=false
訊息在佇列中未被消費,且超過佇列或者訊息本身的過期時間TTL(time-to-live)
佇列的訊息長度達到極限
結果:訊息成為死信后,如果該佇列系結了死信交換機,則訊息會被死信交換機重新路由到死信佇列
3.4什么是延遲佇列
一種帶有延遲功能的訊息佇列,Producer 將訊息發送到訊息佇列 服務端,但并不期望這條訊息立馬投遞,而是推遲到在當前時間點之后的某一個時間投遞到 Consumer 進行消費,該訊息即定時訊息
業界的一些實作方式:
.定時任務高精度輪訓
.redis監聽key過期
.jdk自帶的DelayQueue
.采用RocketMQ自帶延遲訊息功能
.RabbitMQ本身是不支持延遲佇列的, 結合死信佇列的特性,就可以做到延遲訊息
四、代碼環節
4.1 Rabbitmq死信佇列配置
package net.wnn.config;
import lombok.Data;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* * 自定義訊息佇列配置,
* * 發送 關單訊息-》延遲exchange-》order.close.delay.queue-》死信exchange-》order.close.queue
**/
@Configuration
@Data
public class RabbitMQConfig {
/**
* 交換機
*/
private String orderEventExchange="order.event.exchange";
/**
* 延遲佇列,不能被消費者監聽
*/
private String orderCloseDelayQueue = "order.close.delay.queue";
/**
* 關單佇列,延遲佇列的訊息過期后轉發的佇列,用于被消費者監聽
*/
private String orderCloseQueue = "order.close.queue";
/**
* 進入到延遲佇列的routingKey
*/
private String orderCloseDelayRoutingKey = "order.close.delay.routing.key";
/**
* 進入死信佇列的routingKey,訊息過期進入死信佇列的key
*/
private String orderCloseRoutingKey = "order.close.routing.key";
/**
* 過期時間,毫秒單位,臨時改為1分鐘過期
*/
private Integer ttl = 1000 * 60;
/**
* 訊息轉換器
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 創建交換機,topic型別,一般一個業務一個交換機
* @return
*/
@Bean
public Exchange orderEventExchange(){
return new TopicExchange(orderEventExchange,true,false);
}
/**
* 延遲佇列getOrderEventExchange
* @return
*/
@Bean
public Queue orderCloseDelayQueue(){
Map<String,Object> args = new HashMap<>(3);
args.put("x-dead-letter-exchange",orderEventExchange);
args.put("x-dead-letter-routing-key",orderCloseRoutingKey);
args.put("x-message-ttl",ttl);
return new Queue(orderCloseDelayQueue,true,false,false,args);
}
/**
* 死信佇列,是一個普通佇列,用于被監聽
* @return
*/
@Bean
public Queue orderCloseQueue(){
return new Queue(orderCloseQueue,true,false,false);
}
/**
* 第一個佇列 即延遲佇列和交換機建立系結關系
* @return
*/
@Bean
public Binding orderCloseDelayBinding(){
return new Binding(orderCloseDelayQueue,
Binding.DestinationType.QUEUE,orderEventExchange,orderCloseDelayRoutingKey,null);
}
/**
* 死信佇列和死信交換機建立系結關系
* @return
*/
@Bean
public Binding orderCloseBinding(){
return new Binding(orderCloseQueue,
Binding.DestinationType.QUEUE,orderEventExchange,orderCloseRoutingKey,null);
}
}
##----------rabbit配置--------------
spring.rabbitmq.host=120.79.xxx.xxx
spring.rabbitmq.port=5672
#需要手工創建虛擬主機
spring.rabbitmq.virtual-host=dev
spring.rabbitmq.username=admin
spring.rabbitmq.password=password
#訊息確認方式,manual(手動ack) 和auto(自動ack); 訊息消費重試到達指定次數進到例外交換機和例外佇列,需要改為自動ack確認訊息
spring.rabbitmq.listener.simple.acknowledge-mode=auto
4.2延遲訊息的發送
@Autowired
private RabbitMQConfig rabbitMQConfig;
@Autowired
private RabbitTemplate rabbitTemplate;
//發送延遲訊息
EventMessage eventMessage = EventMessage.builder()
.eventMessageType(EventMessageType.PRODUCT_ORDER_NEW.name())
.accountNo(loginUser.getAccountNo())
.bizId(orderOutTradeNo)
.build();
rabbitTemplate.convertAndSend(rabbitMQConfig.getOrderEventExchange(),rabbitMQConfig.getOrderCloseDelayRoutingKey(),eventMessage);
4.3關單消費者處理
package net.wnn.listener;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import net.wnn.enums.BizCodeEnum;
import net.wnn.exception.BizException;
import net.wnn.model.EventMessage;
import net.wnn.service.ProductOrderService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
@RabbitListener(queuesToDeclare = {@Queue("order.close.queue")})
public class ProductOrderMQListener {
@Autowired
private ProductOrderService productOrderService;
@RabbitHandler
public void productOrderHandler(EventMessage eventMessage, Message message, Channel channel){
log.info("監聽到訊息ProductOrderMQListener messsage訊息內容:{}",message);
try{
//關閉訂單 業務邏輯
productOrderService.closeProductOrder(eventMessage);
}catch (Exception e){
log.error("消費者失敗:{}",eventMessage);
throw new BizException(BizCodeEnum.MQ_CONSUME_EXCEPTION);
}
log.info("消費成功:{}",eventMessage);
}
}
監聽到延遲的關單訊息后,根據實際業務進行查詢訂單、支付等確定是否要進行關單操作
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/423728.html
標籤:其他
上一篇:Hive執行show databases出現RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata
下一篇:Spark基礎入門

