本篇博客的內容為RabbitMQ在開發程序中的快速上手使用,側重于代碼部分,幾乎沒有相關概念的介紹,相關概念請參考以下csdn博客,兩篇都是我找的精華帖,供大家學習,本篇博客也持續更新~~~
內容代碼部分由于word轉md格式有些問題,可以直接查看我的有道云筆記,鏈接:https://note.youdao.com/s/Ab7Cjiu
參考檔案
csdn博客:
基礎部分:https://blog.csdn.net/qq_35387940/article/details/100514134
高級部分:https://blog.csdn.net/weixin_49076273/article/details/124991012
application.yml
server:
port: 8021
spring:
#給專案來個名字
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虛擬host 可以不設定,使用server默認host
virtual-host: JCcccHost
#確認訊息已發送到交換機(Exchange)
#publisher-confirms: true
publisher-confirm-type: correlated
#確認訊息已發送到佇列(Queue)
publisher-returns: true
完善更多資訊
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
retry:
#發布重試,默認false
enabled: true
#重試時間 默認1000ms
initial-interval: 1000
#重試最大次數 最大3
max-attempts: 3
#重試最大間隔時間
max-interval: 10000
#重試的時間隔乘數,比如配2,0
第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 1
listener:
\# 默認配置是simple
type: simple
simple:
\# 手動ack Acknowledge mode of container. auto none
acknowledge-mode: manual
#消費者呼叫程式執行緒的最小數量
concurrency: 10
#消費者最大數量
max-concurrency: 10
#限制消費者每次只處理一條資訊,處理完在繼續下一條
prefetch: 1
#啟動時是否默認啟動容器
auto-startup: true
#被拒絕時重新進入佇列
default-requeue-rejected: true
相關注解說明
@RabbitListener 注解是指定某方法作為訊息消費的方法,例如監聽某 Queue
里面的訊息,
@RabbitListener標注在方法上,直接監聽指定的佇列,此時接收的引數需要與發送市型別一致,
\@Component
public class PointConsumer {
//監聽的佇列名
\@RabbitListener(queues = \"point.to.point\")
public void processOne(String name) {
System.out.println(\"point.to.point:\" + name);
}
}
@RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
@RabbitListener 標注在類上面表示當有收到訊息的時候,就交給
@RabbitHandler 的方法處理,根據接受的引數型別進入具體的方法中,
\@Component
\@RabbitListener(queues = \"consumer_queue\")
public class Receiver {
\@RabbitHandler
public void processMessage1(String message) {
System.out.println(message);
}
\@RabbitHandler
public void processMessage2(byte\[\] message) {
System.out.println(new String(message));
}
}
@Payload
可以獲取訊息中的 body 資訊
\@RabbitListener(queues = \"debug\")
public void processMessage1(@Payload String body) {
System.out.println(\"body:\"+body);
}
@Header,@Headers
可以獲得訊息中的 headers 資訊
\@RabbitListener(queues = \"debug\")
public void processMessage1(@Payload String body, \@Header String token)
{
System.out.println(\"body:\"+body);
System.out.println(\"token:\"+token);
}
\@RabbitListener(queues = \"debug\")
public void processMessage1(@Payload String body, \@Headers
Map\<String,Object\> headers) {
System.out.println(\"body:\"+body);
System.out.println(\"Headers:\"+headers);
}
快速使用
配置xml檔案
<dependency\>
\<groupId\>org.springframework.boot\</groupId\>
\<artifactId\>spring-boot-starter-amqp\</artifactId\>
\</dependency\>
配置exchange、queue
注解快速創建版本
\@Configuration
public class RabbitmqConfig {
//創建交換機
//通過ExchangeBuilder能創建direct、topic、Fanout型別的交換機
\@Bean(\"bootExchange\")
public Exchange bootExchange() {
return
ExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();
}
//創建佇列
\@Bean(\"bootQueue\")
public Queue bootQueue() {
return QueueBuilder.durable(\"zx_queue\").build();
}
/\*\*
\* 將佇列與交換機系結
\*
\* \@param queue
\* \@param exchange
\* \@return
\*/
\@Bean
public Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,
\@Qualifier(\"bootExchange\") Exchange exchange) {
return
BindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();
}
}
Direct
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/\*\*
\* \@Author : JCccc
\* \@CreateTime : 2019/9/3
\* \@Description :
\*\*/
\@Configuration
public class DirectRabbitConfig {
//佇列 起名:TestDirectQueue
\@Bean
public Queue TestDirectQueue() {
//
durable:是否持久化,默認是false,持久化佇列:會被存盤在磁盤上,當訊息代理重啟時仍然存在,暫存佇列:當前連接有效
//
exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后佇列即被洗掉,此參考優先級高于durable
//
autoDelete:是否自動洗掉,當沒有生產者或者消費者使用此佇列,該佇列會自動洗掉,
// return new Queue(\"TestDirectQueue\",true,true,false);
//一般設定一下佇列的持久化就好,其余兩個就是默認false
return new Queue(\"TestDirectQueue\",true);
}
//Direct交換機 起名:TestDirectExchange
\@Bean
DirectExchange TestDirectExchange() {
// return new DirectExchange(\"TestDirectExchange\",true,true);
return new DirectExchange(\"TestDirectExchange\",true,false);
}
//系結 將佇列和交換機系結, 并設定用于匹配鍵:TestDirectRouting
\@Bean
Binding bindingDirect() {
return
BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");
}
\@Bean
DirectExchange lonelyDirectExchange() {
return new DirectExchange(\"lonelyDirectExchange\");
}
}
Fanout
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/\*\*
\* \@Author : JCccc
\* \@CreateTime : 2019/9/3
\* \@Description :
\*\*/
\@Configuration
public class FanoutRabbitConfig {
/\*\*
\* 創建三個佇列 :fanout.A fanout.B fanout.C
\* 將三個佇列都系結在交換機 fanoutExchange 上
\* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用
\*/
\@Bean
public Queue queueA() {
return new Queue(\"fanout.A\");
}
\@Bean
public Queue queueB() {
return new Queue(\"fanout.B\");
}
\@Bean
public Queue queueC() {
return new Queue(\"fanout.C\");
}
\@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(\"fanoutExchange\");
}
\@Bean
Binding bindingExchangeA() {
return BindingBuilder.bind(queueA()).to(fanoutExchange());
}
\@Bean
Binding bindingExchangeB() {
return BindingBuilder.bind(queueB()).to(fanoutExchange());
}
\@Bean
Binding bindingExchangeC() {
return BindingBuilder.bind(queueC()).to(fanoutExchange());
}
}
Topic
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/\*\*
\* \@Author : JCccc
\* \@CreateTime : 2019/9/3
\* \@Description :
\*\*/
\@Configuration
public class TopicRabbitConfig {
//系結鍵
public final static String man = \"topic.man\";
public final static String woman = \"topic.woman\";
\@Bean
public Queue firstQueue() {
return new Queue(TopicRabbitConfig.man);
}
\@Bean
public Queue secondQueue() {
return new Queue(TopicRabbitConfig.woman);
}
\@Bean
TopicExchange exchange() {
return new TopicExchange(\"topicExchange\");
}
//將firstQueue和topicExchange系結,而且系結的鍵值為topic.man
//這樣只要是訊息攜帶的路由鍵是topic.man,才會分發到該佇列
\@Bean
Binding bindingExchangeMessage() {
return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);
}
//將secondQueue和topicExchange系結,而且系結的鍵值為用上通配路由鍵規則topic.#
// 這樣只要是訊息攜帶的路由鍵是以topic.開頭,都會分發到該佇列
\@Bean
Binding bindingExchangeMessage2() {
return
BindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");
}
}
生產者發送訊息
直接發送給佇列
//指定訊息佇列的名字,直接發送訊息到訊息佇列中
\@Test
public void testSimpleQueue() {
// 佇列名稱
String queueName = \"simple.queue\";
// 訊息
String message = \"hello, spring amqp!\";
// 發送訊息
rabbitTemplate.convertAndSend(queueName, message);
}
發送給交換機,然后走不同的模式
////指定交換機的名字,將訊息發送給交換機,然后不同模式下,訊息佇列根據key得到訊息
\@Test
public void testSendDirectExchange() {
// 交換機名稱,有三種型別
String exchangeName = \"itcast.direct\";
// 訊息
String message =
\"紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!\";
// 發送訊息,red為佇列的key,因此此佇列會得到訊息
rabbitTemplate.convertAndSend(exchangeName, \"red\", message);
}
也可以將發送的訊息封裝到HashMap中然后發送給交換機
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
/\*\*
\* \@Author : JCccc
\* \@CreateTime : 2019/9/3
\* \@Description :
\*\*/
\@RestController
public class SendMessageController {
\@Autowired
RabbitTemplate rabbitTemplate;
//使用RabbitTemplate,這提供了接收/發送等等方法
\@GetMapping(\"/sendDirectMessage\")
public String sendDirectMessage() {
String messageId = String.valueOf(UUID.randomUUID());
String messageData = https://www.cnblogs.com/"test message, hello!\";
String createTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd
HH:mm:ss\"));
Map\<String,Object\> map=new HashMap\<\>();
map.put(\"messageId\",messageId);
map.put(\"messageData\",messageData);
map.put(\"createTime\",createTime);
//將訊息攜帶系結鍵值:TestDirectRouting 發送到交換機TestDirectExchange
rabbitTemplate.convertAndSend(\"TestDirectExchange\",
\"TestDirectRouting\", map);
return \"ok\";
}
}
消費者接收訊息
//使用注解@RabbitListener定義當前方法監聽RabbitMQ中指定名稱的訊息佇列,
\@Component
public class MessageListener {
\@RabbitListener(queues = \"direct_queue\")
public void receive(String id){
System.out.println(\"已完成短信發送業務(rabbitmq direct),id:\"+id);
}
}
引數用Map接收也可以
\@Component
\@RabbitListener(queues = \"TestDirectQueue\")//監聽的佇列名稱
TestDirectQueue
public class DirectReceiver {
\@RabbitHandler
public void process(Map testMessage) {
System.out.println(\"DirectReceiver消費者收到訊息 : \" +
testMessage.toString());
}
}
高級特性
訊息可靠性傳遞
有confirm和return兩種
在application.yml中添加以下配置項:
server:
port: 8021
spring:
#給專案來個名字
application:
name: rabbitmq-provider
#配置rabbitMq 服務器
rabbitmq:
host: 127.0.0.1
port: 5672
username: root
password: root
#虛擬host 可以不設定,使用server默認host
virtual-host: JCcccHost
#確認訊息已發送到交換機(Exchange)
#publisher-confirms: true
publisher-confirm-type: correlated
#確認訊息已發送到佇列(Queue)
publisher-returns: true
有兩種配置方法:
寫到配置類中
寫到工具類或者普通類中,但是這個類得實作那兩個介面
寫法一
撰寫訊息確認回呼函式
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
\@Configuration
public class RabbitConfig {
\@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory
connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//設定開啟Mandatory,才能觸發回呼函式,無論訊息推送結果怎么樣都強制呼叫回呼函式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
\@Override
public void confirm(CorrelationData correlationData, boolean ack, String
cause) {
System.out.println(\"ConfirmCallback:
\"+\"相關資料:\"+correlationData);
System.out.println(\"ConfirmCallback: \"+\"確認情況:\"+ack);
System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
\@Override
public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {
System.out.println(\"ReturnCallback: \"+\"訊息:\"+message);
System.out.println(\"ReturnCallback: \"+\"回應碼:\"+replyCode);
System.out.println(\"ReturnCallback: \"+\"回應資訊:\"+replyText);
System.out.println(\"ReturnCallback: \"+\"交換機:\"+exchange);
System.out.println(\"ReturnCallback: \"+\"路由鍵:\"+routingKey);
}
});
return rabbitTemplate;
}
}
寫法二
\@Component
\@Slf4j
public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnsCallback {
\@Resource
private RedisTemplate\<String, String\> redisTemplate;
\@Resource
private RabbitTemplate rabbitTemplate;
private String finalId = null;
private SmsDTO smsDTO = null;
/\*\*
\* 發布者確認的回呼
\*
\* \@param correlationData 回呼的相關資料,
\* \@param b ack為真,nack為假
\* \@param s 一個可選的原因,用于nack,如果可用,否則為空,
\*/
\@Override
public void confirm(CorrelationData correlationData, boolean b, String
s) {
// 訊息發送成功,將redis中訊息的狀態(status)修改為1
if (b) {
redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +
finalId, \"status\", 1);
} else {
// 發送失敗,放入redis失敗集合中,并洗掉集合資料
log.error(\"短信訊息投送失敗:{}\--\>{}\", correlationData, s);
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);
}
}
/\*\*
\* 發生例外時的訊息回傳提醒
\*
\* \@param returnedMessage
\*/
\@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error(\"發生例外,回傳訊息回呼:{}\", returnedMessage);
// 發送失敗,放入redis失敗集合中,并洗掉集合資料
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);
redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);
}
\@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
}
訊息確認機制
手動確認
yml配置
#手動確認 manual
listener:
simple:
acknowledge-mode: manual
寫法一
首先在消費者專案中創建MessageListenerConfig
import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import
org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
\@Configuration
public class MessageListenerConfig {
\@Autowired
private CachingConnectionFactory connectionFactory;
\@Autowired
private MyAckReceiver myAckReceiver;//訊息接收處理類
\@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //
RabbitMQ默認是自動確認,這里改為手動確認訊息
//設定一個佇列
container.setQueueNames(\"TestDirectQueue\");
//如果同時設定多個如下: 前提是佇列都是必須已經創建存在的
//
container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");
//另一種設定佇列的方法,如果使用這種情況,那么要設定多個,就使用addQueues
//container.setQueues(new Queue(\"TestDirectQueue\",true));
//container.addQueues(new Queue(\"TestDirectQueue2\",true));
//container.addQueues(new Queue(\"TestDirectQueue3\",true));
container.setMessageListener(myAckReceiver);
return container;
}
}
然后創建手動確認監聽類MyAckReceiver(手動確認模式需要實作ChannelAwareMessageListener)
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
\@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
\@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
byte\[\] body = message.getBody();
ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));
Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
String messageId = msgMap.get(\"messageId\");
String messageData = https://www.cnblogs.com/Changes404/archive/2023/06/16/msgMap.get(/"messageData\");
String createTime = msgMap.get(\"createTime\");
ois.close();
System.out.println(\" MyAckReceiver messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);
System.out.println(\"消費的主題訊息來自:\"+message.getMessageProperties().getConsumerQueue());
channel.basicAck(deliveryTag, true);
//第二個引數,手動確認可以被批處理,當該引數為 true 時,則可以一次性確認
delivery_tag 小于等于傳入值的所有訊息
//channel.basicReject(deliveryTag,
true);//第二個引數,true會重新放回佇列,所以需要自己根據業務邏輯判斷什么時候使用拒絕
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
如果想實作不同的佇列,有不同的監聽確認處理機制,做不同的業務處理,那么這樣做:
首先需要在配置類中系結佇列,然后只需要根據訊息來自不同的佇列名進行區分處理即可
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.Map;
\@Component
public class MyAckReceiver implements ChannelAwareMessageListener {
\@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
byte\[\] body = message.getBody();
ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));
Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();
String messageId = msgMap.get(\"messageId\");
String messageData = https://www.cnblogs.com/Changes404/archive/2023/06/16/msgMap.get(/"messageData\");
String createTime = msgMap.get(\"createTime\");
ois.close();
if
(\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println(\"消費的訊息來自的佇列名為:\"+message.getMessageProperties().getConsumerQueue());
System.out.println(\"訊息成功消費到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);
System.out.println(\"執行TestDirectQueue中的訊息的業務處理流程\...\...\");
}
if
(\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){
System.out.println(\"消費的訊息來自的佇列名為:\"+message.getMessageProperties().getConsumerQueue());
System.out.println(\"訊息成功消費到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);
System.out.println(\"執行fanout.A中的訊息的業務處理流程\...\...\");
}
channel.basicAck(deliveryTag, true);
//channel.basicReject(deliveryTag, true);//為true會重新放回佇列
} catch (Exception e) {
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
}
寫法二
\@Component
\@Slf4j
public class SendSmsListener {
\@Resource
private RedisTemplate\<String, String\> redisTemplate;
\@Resource
private SendSmsUtils sendSmsUtils;
/\*\*
\* 監聽發送短信普通佇列
\* \@param smsDTO
\* \@param message
\* \@param channel
\* \@throws IOException
\*/
\@RabbitListener(queues = SMS_QUEUE_NAME)
public void sendSmsListener(SmsDTO smsDTO, Message message, Channel
channel) throws IOException {
String messageId = message.getMessageProperties().getMessageId();
int retryCount = (int)
redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +
messageId, \"retryCount\");
if (retryCount \> 3) {
//重試次數大于3,直接放到死信佇列
log.error(\"短信訊息重試超過3次:{}\", messageId);
//basicReject方法拒絕deliveryTag對應的訊息,第二個引數是否requeue,true則重新入佇列,否則丟棄或者進入死信佇列,
//該方法reject后,該消費者還是會消費到該條被reject的訊息,
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
return;
}
try {
String phoneNum = smsDTO.getPhoneNum();
String code = smsDTO.getCode();
if(StringUtils.isAnyBlank(phoneNum,code)){
throw new RuntimeException(\"sendSmsListener引數為空\");
}
// 發送訊息
SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,
code);
SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();
SendStatus sendStatus = sendStatusSet\[0\];
if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"send
success\".equals(sendStatus.getMessage())){
throw new RuntimeException(\"發送驗證碼失敗\");
}
//手動確認訊息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
log.info(\"短信發送成功:{}\",smsDTO);
redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);
} catch (Exception e) {
redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
/\*\*
\* 監聽到發送短信死信佇列
\* \@param sms
\* \@param message
\* \@param channel
\* \@throws IOException
\*/
\@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)
public void smsDelayQueueListener(SmsDTO sms, Message message, Channel
channel) throws IOException {
try{
log.error(\"監聽到死信佇列訊息==\>{}\",sms);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
消費端限流
#配置RabbitMQ
spring:
rabbitmq:
host: 192.168.126.3
port: 5672
username: guest
password: guest
virtual-host: /
#開啟自動確認 none 手動確認 manual
listener:
simple:
#消費端限流機制必須開啟手動確認
acknowledge-mode: manual
#消費端最多拉取的訊息條數,簽收后不滿該條數才會繼續拉取
prefetch: 5
訊息存活時間TTL
可以設定佇列的存活時間,也可以設定具體訊息的存活時間
設定佇列中所有訊息的存活時間
return QueueBuilder
.durable(QUEUE_NAME)//佇列持久化
.ttl(10000)//設定佇列的所有訊息存活10s
.build();
即在創建佇列時,設定存活時間
設定某條訊息的存活時間
//發送訊息,并設定該訊息的存活時間
\@Test
public void testSendMessage()
{
//1.創建訊息屬性
MessageProperties messageProperties = new MessageProperties();
//2.設定存活時間
messageProperties.setExpiration(\"10000\");
//3.創建訊息物件
Message message = new
Message(\"sendMessage\...\".getBytes(),messageProperties);
//4.發送訊息
rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);
}
若設定中間的訊息的存活時間,當過期時,該訊息不會被移除,但是該訊息已經不會被消費了,需要等到該訊息到隊里頂端才會被移除,因為佇列是頭出,尾進,故而要移除它需要等到它在頂端時才可以,
在佇列設定存活時間,也在單條訊息設定存活時間,則以時間短的為準
死信佇列
死信佇列和普通佇列沒有任何區別,只需要將普通佇列需要系結死信交換機和死信佇列就能夠實作功能
import org.springframework.amqp.core.\*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
\@Configuration//Rabbit配置類
public class RabbitConfig4 {
private final String DEAD_EXCHANGE = \"dead_exchange\";
private final String DEAD_QUEUE = \"dead_queue\";
private final String NORMAL_EXCHANGE = \"normal_exchange\";
private final String NORMAL_QUEUE = \"normal_queue\";
//創建死信交換機
\@Bean(DEAD_EXCHANGE)
public Exchange deadExchange()
{
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)//交換機型別 ;引數為名字
topic為通配符模式的交換機
.durable(true)//是否持久化,true即存到磁盤,false只在記憶體上
.build();
}
//創建死信佇列
\@Bean(DEAD_QUEUE)
public Queue deadQueue()
{
return QueueBuilder
.durable(DEAD_QUEUE)//佇列持久化
//.maxPriority(10)//設定佇列的最大優先級,最大可以設定255,但官網推薦不超過10,太高比較浪費資源
.build();
}
//死信交換機系結死信佇列
\@Bean
//@Qualifier注解,使用名稱裝配進行使用
public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange
exchange, \@Qualifier(DEAD_QUEUE) Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with(\"dead_routing\")
.noargs();
}
//創建普通交換機
\@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange()
{
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)//交換機型別 ;引數為名字
topic為通配符模式的交換機
.durable(true)//是否持久化,true即存到磁盤,false只在記憶體上
.build();
}
//創建普通佇列
\@Bean(NORMAL_QUEUE)
public Queue normalQueue()
{
return QueueBuilder
.durable(NORMAL_QUEUE)//佇列持久化
//.maxPriority(10)//設定佇列的最大優先級,最大可以設定255,但官網推薦不超過10,太高比較浪費資源
.deadLetterExchange(DEAD_EXCHANGE)//系結死信交換機
.deadLetterRoutingKey(\"dead_routing\")//死信佇列路由關鍵字
.ttl(10000)//訊息存活10s
.maxLength(10)//佇列最大長度為10
.build();
}
//普通交換機系結普通佇列
\@Bean
//@Qualifier注解,使用名稱裝配進行使用
public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange
exchange, \@Qualifier(NORMAL_QUEUE) Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with(\"my_routing\")
.noargs();
}
}
延遲佇列
RabbitMQ并未實作延遲佇列功能,所以可以通過死信佇列實作延遲佇列的功能
即給普通佇列設定存活時間30分鐘,過期后發送至死信佇列,在死信消費者監聽死信佇列訊息,查看訂單狀態,是否支付,未支付則取消訂單,回退庫存即可,
消費者監聽延遲佇列
\@Component
public class ExpireOrderConsumer {
//監聽過期訂單佇列
\@RabbitListener(queues = \"expire_queue\")
public void listenMessage(String orderId)
{
//模擬處理資料庫等業務
System.out.println(\"查詢\"+orderId+\"號訂單的狀態,如果已支付無需處理,如果未支付則回退庫存\");
}
}
控制層代碼
\@RestController
public class OrderController {
\@Autowired
private RabbitTemplate rabbitTemplate;
\@RequestMapping(value = https://www.cnblogs.com/"/place/{orderId}\",method =
RequestMethod.GET)
public String placeOrder(@PathVariable String orderId)
{
//模擬service層處理
System.out.println(\"處理訂單資料\...\");
//將訂單id發送到訂單佇列
rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);
return \"下單成功,修改庫存\";
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555344.html
標籤:其他
下一篇:返回列表
