主頁 > 後端開發 > RabbitMQ快速使用代碼手冊

RabbitMQ快速使用代碼手冊

2023-06-17 07:27:13 後端開發

本篇博客的內容為RabbitMQ在開發程序中的快速上手使用,側重于代碼部分,幾乎沒有相關概念的介紹,相關概念請參考以下csdn博客,兩篇都是我找的精華帖,供大家學習,本篇博客也持續更新~~~
內容代碼部分由于word轉md格式有些問題,可以直接查看我的有道云筆記,鏈接:https://note.youdao.com/s/Ab7Cjiu

參考檔案

csdn博客:

基礎部分:https://blog.csdn.net/qq_35387940/article/details/100514134

高級部分:https://blog.csdn.net/weixin_49076273/article/details/124991012

application.yml

server:

port: 8021

spring:

#給專案來個名字

application:

name: rabbitmq-provider

#配置rabbitMq 服務器

rabbitmq:

host: 127.0.0.1

port: 5672

username: root

password: root

#虛擬host 可以不設定,使用server默認host

virtual-host: JCcccHost

#確認訊息已發送到交換機(Exchange)

#publisher-confirms: true

publisher-confirm-type: correlated

#確認訊息已發送到佇列(Queue)

publisher-returns: true

完善更多資訊

spring:

rabbitmq:

host: localhost

port: 5672

virtual-host: /

username: guest

password: guest

publisher-confirm-type: correlated

publisher-returns: true

template:

mandatory: true

retry:

#發布重試,默認false

enabled: true

#重試時間 默認1000ms

initial-interval: 1000

#重試最大次數 最大3

max-attempts: 3

#重試最大間隔時間

max-interval: 10000

#重試的時間隔乘數,比如配2,0
第一次等于10s,第二次等于20s,第三次等于40s

multiplier: 1

listener:

\# 默認配置是simple

type: simple

simple:

\# 手動ack Acknowledge mode of container. auto none

acknowledge-mode: manual

#消費者呼叫程式執行緒的最小數量

concurrency: 10

#消費者最大數量

max-concurrency: 10

#限制消費者每次只處理一條資訊,處理完在繼續下一條

prefetch: 1

#啟動時是否默認啟動容器

auto-startup: true

#被拒絕時重新進入佇列

default-requeue-rejected: true

相關注解說明

@RabbitListener 注解是指定某方法作為訊息消費的方法,例如監聽某 Queue
里面的訊息,

@RabbitListener標注在方法上,直接監聽指定的佇列,此時接收的引數需要與發送市型別一致,

\@Component

public class PointConsumer {

//監聽的佇列名

\@RabbitListener(queues = \"point.to.point\")

public void processOne(String name) {

System.out.println(\"point.to.point:\" + name);

}

}

@RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用

@RabbitListener 標注在類上面表示當有收到訊息的時候,就交給
@RabbitHandler 的方法處理,根據接受的引數型別進入具體的方法中,

\@Component

\@RabbitListener(queues = \"consumer_queue\")

public class Receiver {

\@RabbitHandler

public void processMessage1(String message) {

System.out.println(message);

}

\@RabbitHandler

public void processMessage2(byte\[\] message) {

System.out.println(new String(message));

}

}

@Payload

可以獲取訊息中的 body 資訊

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body) {

System.out.println(\"body:\"+body);

}

@Header,@Headers

可以獲得訊息中的 headers 資訊

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body, \@Header String token)
{

System.out.println(\"body:\"+body);

System.out.println(\"token:\"+token);

}

\@RabbitListener(queues = \"debug\")

public void processMessage1(@Payload String body, \@Headers
Map\<String,Object\> headers) {

System.out.println(\"body:\"+body);

System.out.println(\"Headers:\"+headers);

}

快速使用

配置xml檔案

<dependency\>

\<groupId\>org.springframework.boot\</groupId\>

\<artifactId\>spring-boot-starter-amqp\</artifactId\>

\</dependency\>

配置exchange、queue

注解快速創建版本

\@Configuration

public class RabbitmqConfig {

//創建交換機

//通過ExchangeBuilder能創建direct、topic、Fanout型別的交換機

\@Bean(\"bootExchange\")

public Exchange bootExchange() {

return
ExchangeBuilder.topicExchange(\"zx_topic_exchange\").durable(true).build();

}

//創建佇列

\@Bean(\"bootQueue\")

public Queue bootQueue() {

return QueueBuilder.durable(\"zx_queue\").build();

}

/\*\*

\* 將佇列與交換機系結

\*

\* \@param queue

\* \@param exchange

\* \@return

\*/

\@Bean

public Binding bindQueueExchange(@Qualifier(\"bootQueue\") Queue queue,
\@Qualifier(\"bootExchange\") Exchange exchange) {

return
BindingBuilder.bind(queue).to(exchange).with(\"boot.#\").noargs();

}

}

Direct

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class DirectRabbitConfig {

//佇列 起名:TestDirectQueue

\@Bean

public Queue TestDirectQueue() {

//
durable:是否持久化,默認是false,持久化佇列:會被存盤在磁盤上,當訊息代理重啟時仍然存在,暫存佇列:當前連接有效

//
exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后佇列即被洗掉,此參考優先級高于durable

//
autoDelete:是否自動洗掉,當沒有生產者或者消費者使用此佇列,該佇列會自動洗掉,

// return new Queue(\"TestDirectQueue\",true,true,false);

//一般設定一下佇列的持久化就好,其余兩個就是默認false

return new Queue(\"TestDirectQueue\",true);

}

//Direct交換機 起名:TestDirectExchange

\@Bean

DirectExchange TestDirectExchange() {

// return new DirectExchange(\"TestDirectExchange\",true,true);

return new DirectExchange(\"TestDirectExchange\",true,false);

}

//系結 將佇列和交換機系結, 并設定用于匹配鍵:TestDirectRouting

\@Bean

Binding bindingDirect() {

return
BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(\"TestDirectRouting\");

}

\@Bean

DirectExchange lonelyDirectExchange() {

return new DirectExchange(\"lonelyDirectExchange\");

}

}

Fanout

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class FanoutRabbitConfig {

/\*\*

\* 創建三個佇列 :fanout.A fanout.B fanout.C

\* 將三個佇列都系結在交換機 fanoutExchange 上

\* 因為是扇型交換機, 路由鍵無需配置,配置也不起作用

\*/

\@Bean

public Queue queueA() {

return new Queue(\"fanout.A\");

}

\@Bean

public Queue queueB() {

return new Queue(\"fanout.B\");

}

\@Bean

public Queue queueC() {

return new Queue(\"fanout.C\");

}

\@Bean

FanoutExchange fanoutExchange() {

return new FanoutExchange(\"fanoutExchange\");

}

\@Bean

Binding bindingExchangeA() {

return BindingBuilder.bind(queueA()).to(fanoutExchange());

}

\@Bean

Binding bindingExchangeB() {

return BindingBuilder.bind(queueB()).to(fanoutExchange());

}

\@Bean

Binding bindingExchangeC() {

return BindingBuilder.bind(queueC()).to(fanoutExchange());

}

}

Topic

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.Queue;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@Configuration

public class TopicRabbitConfig {

//系結鍵

public final static String man = \"topic.man\";

public final static String woman = \"topic.woman\";

\@Bean

public Queue firstQueue() {

return new Queue(TopicRabbitConfig.man);

}

\@Bean

public Queue secondQueue() {

return new Queue(TopicRabbitConfig.woman);

}

\@Bean

TopicExchange exchange() {

return new TopicExchange(\"topicExchange\");

}

//將firstQueue和topicExchange系結,而且系結的鍵值為topic.man

//這樣只要是訊息攜帶的路由鍵是topic.man,才會分發到該佇列

\@Bean

Binding bindingExchangeMessage() {

return BindingBuilder.bind(firstQueue()).to(exchange()).with(man);

}

//將secondQueue和topicExchange系結,而且系結的鍵值為用上通配路由鍵規則topic.#

// 這樣只要是訊息攜帶的路由鍵是以topic.開頭,都會分發到該佇列

\@Bean

Binding bindingExchangeMessage2() {

return
BindingBuilder.bind(secondQueue()).to(exchange()).with(\"topic.#\");

}

}

生產者發送訊息

直接發送給佇列

//指定訊息佇列的名字,直接發送訊息到訊息佇列中

\@Test

public void testSimpleQueue() {

// 佇列名稱

String queueName = \"simple.queue\";

// 訊息

String message = \"hello, spring amqp!\";

// 發送訊息

rabbitTemplate.convertAndSend(queueName, message);

}

發送給交換機,然后走不同的模式

////指定交換機的名字,將訊息發送給交換機,然后不同模式下,訊息佇列根據key得到訊息

\@Test

public void testSendDirectExchange() {

// 交換機名稱,有三種型別

String exchangeName = \"itcast.direct\";

// 訊息

String message =
\"紅色警報!日本亂排核廢水,導致海洋生物變異,驚現哥斯拉!\";

// 發送訊息,red為佇列的key,因此此佇列會得到訊息

rabbitTemplate.convertAndSend(exchangeName, \"red\", message);

}

也可以將發送的訊息封裝到HashMap中然后發送給交換機

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.GetMapping;

import org.springframework.web.bind.annotation.RestController;

import java.time.LocalDateTime;

import java.time.format.DateTimeFormatter;

import java.util.HashMap;

import java.util.Map;

import java.util.UUID;

/\*\*

\* \@Author : JCccc

\* \@CreateTime : 2019/9/3

\* \@Description :

\*\*/

\@RestController

public class SendMessageController {

\@Autowired

RabbitTemplate rabbitTemplate;
//使用RabbitTemplate,這提供了接收/發送等等方法

\@GetMapping(\"/sendDirectMessage\")

public String sendDirectMessage() {

String messageId = String.valueOf(UUID.randomUUID());

String messageData = https://www.cnblogs.com/"test message, hello!\";

String createTime =
LocalDateTime.now().format(DateTimeFormatter.ofPattern(\"yyyy-MM-dd
HH:mm:ss\"));

Map\<String,Object\> map=new HashMap\<\>();

map.put(\"messageId\",messageId);

map.put(\"messageData\",messageData);

map.put(\"createTime\",createTime);

//將訊息攜帶系結鍵值:TestDirectRouting 發送到交換機TestDirectExchange

rabbitTemplate.convertAndSend(\"TestDirectExchange\",
\"TestDirectRouting\", map);

return \"ok\";

}

}

消費者接收訊息

//使用注解@RabbitListener定義當前方法監聽RabbitMQ中指定名稱的訊息佇列,

\@Component

public class MessageListener {

\@RabbitListener(queues = \"direct_queue\")

public void receive(String id){

System.out.println(\"已完成短信發送業務(rabbitmq direct),id:\"+id);

}

}

引數用Map接收也可以

\@Component

\@RabbitListener(queues = \"TestDirectQueue\")//監聽的佇列名稱
TestDirectQueue

public class DirectReceiver {

\@RabbitHandler

public void process(Map testMessage) {

System.out.println(\"DirectReceiver消費者收到訊息 : \" +
testMessage.toString());

}

}

高級特性

訊息可靠性傳遞

有confirm和return兩種

在application.yml中添加以下配置項:

server:

port: 8021

spring:

#給專案來個名字

application:

name: rabbitmq-provider

#配置rabbitMq 服務器

rabbitmq:

host: 127.0.0.1

port: 5672

username: root

password: root

#虛擬host 可以不設定,使用server默認host

virtual-host: JCcccHost

#確認訊息已發送到交換機(Exchange)

#publisher-confirms: true

publisher-confirm-type: correlated

#確認訊息已發送到佇列(Queue)

publisher-returns: true

有兩種配置方法:

寫到配置類中

寫到工具類或者普通類中,但是這個類得實作那兩個介面

寫法一

撰寫訊息確認回呼函式

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.connection.ConnectionFactory;

import org.springframework.amqp.rabbit.connection.CorrelationData;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration

public class RabbitConfig {

\@Bean

public RabbitTemplate createRabbitTemplate(ConnectionFactory
connectionFactory){

RabbitTemplate rabbitTemplate = new RabbitTemplate();

rabbitTemplate.setConnectionFactory(connectionFactory);

//設定開啟Mandatory,才能觸發回呼函式,無論訊息推送結果怎么樣都強制呼叫回呼函式

rabbitTemplate.setMandatory(true);

rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

\@Override

public void confirm(CorrelationData correlationData, boolean ack, String
cause) {

System.out.println(\"ConfirmCallback:
\"+\"相關資料:\"+correlationData);

System.out.println(\"ConfirmCallback: \"+\"確認情況:\"+ack);

System.out.println(\"ConfirmCallback: \"+\"原因:\"+cause);

}

});

rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

\@Override

public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {

System.out.println(\"ReturnCallback: \"+\"訊息:\"+message);

System.out.println(\"ReturnCallback: \"+\"回應碼:\"+replyCode);

System.out.println(\"ReturnCallback: \"+\"回應資訊:\"+replyText);

System.out.println(\"ReturnCallback: \"+\"交換機:\"+exchange);

System.out.println(\"ReturnCallback: \"+\"路由鍵:\"+routingKey);

}

});

return rabbitTemplate;

}

}

寫法二

\@Component

\@Slf4j

public class SmsRabbitMqUtils implements RabbitTemplate.ConfirmCallback,
RabbitTemplate.ReturnsCallback {

\@Resource

private RedisTemplate\<String, String\> redisTemplate;

\@Resource

private RabbitTemplate rabbitTemplate;

private String finalId = null;

private SmsDTO smsDTO = null;

/\*\*

\* 發布者確認的回呼

\*

\* \@param correlationData 回呼的相關資料,

\* \@param b ack為真,nack為假

\* \@param s 一個可選的原因,用于nack,如果可用,否則為空,

\*/

\@Override

public void confirm(CorrelationData correlationData, boolean b, String
s) {

// 訊息發送成功,將redis中訊息的狀態(status)修改為1

if (b) {

redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX +
finalId, \"status\", 1);

} else {

// 發送失敗,放入redis失敗集合中,并洗掉集合資料

log.error(\"短信訊息投送失敗:{}\--\>{}\", correlationData, s);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);

redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);

}

}

/\*\*

\* 發生例外時的訊息回傳提醒

\*

\* \@param returnedMessage

\*/

\@Override

public void returnedMessage(ReturnedMessage returnedMessage) {

log.error(\"發生例外,回傳訊息回呼:{}\", returnedMessage);

// 發送失敗,放入redis失敗集合中,并洗掉集合資料

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + finalId);

redisTemplate.opsForHash().put(RedisConstant.MQ_PRODUCER, finalId,
this.smsDTO);

}

\@PostConstruct

public void init() {

rabbitTemplate.setConfirmCallback(this);

rabbitTemplate.setReturnsCallback(this);

}

}

訊息確認機制

手動確認

yml配置

#手動確認 manual

listener:

simple:

acknowledge-mode: manual

寫法一

首先在消費者專案中創建MessageListenerConfig

import com.elegant.rabbitmqconsumer.receiver.MyAckReceiver;

import org.springframework.amqp.core.AcknowledgeMode;

import org.springframework.amqp.core.Queue;

import
org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

import
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration

public class MessageListenerConfig {

\@Autowired

private CachingConnectionFactory connectionFactory;

\@Autowired

private MyAckReceiver myAckReceiver;//訊息接收處理類

\@Bean

public SimpleMessageListenerContainer simpleMessageListenerContainer() {

SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer(connectionFactory);

container.setConcurrentConsumers(1);

container.setMaxConcurrentConsumers(1);

container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //
RabbitMQ默認是自動確認,這里改為手動確認訊息

//設定一個佇列

container.setQueueNames(\"TestDirectQueue\");

//如果同時設定多個如下: 前提是佇列都是必須已經創建存在的

//
container.setQueueNames(\"TestDirectQueue\",\"TestDirectQueue2\",\"TestDirectQueue3\");

//另一種設定佇列的方法,如果使用這種情況,那么要設定多個,就使用addQueues

//container.setQueues(new Queue(\"TestDirectQueue\",true));

//container.addQueues(new Queue(\"TestDirectQueue2\",true));

//container.addQueues(new Queue(\"TestDirectQueue3\",true));

container.setMessageListener(myAckReceiver);

return container;

}

}

然后創建手動確認監聽類MyAckReceiver(手動確認模式需要實作ChannelAwareMessageListener)

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;

import java.io.ObjectInputStream;

import java.util.Map;

\@Component

public class MyAckReceiver implements ChannelAwareMessageListener {

\@Override

public void onMessage(Message message, Channel channel) throws Exception
{

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

byte\[\] body = message.getBody();

ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));

Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();

String messageId = msgMap.get(\"messageId\");

String messageData = https://www.cnblogs.com/Changes404/archive/2023/06/16/msgMap.get(/"messageData\");

String createTime = msgMap.get(\"createTime\");

ois.close();

System.out.println(\" MyAckReceiver messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"消費的主題訊息來自:\"+message.getMessageProperties().getConsumerQueue());

channel.basicAck(deliveryTag, true);
//第二個引數,手動確認可以被批處理,當該引數為 true 時,則可以一次性確認
delivery_tag 小于等于傳入值的所有訊息

//channel.basicReject(deliveryTag,
true);//第二個引數,true會重新放回佇列,所以需要自己根據業務邏輯判斷什么時候使用拒絕

} catch (Exception e) {

channel.basicReject(deliveryTag, false);

e.printStackTrace();

}

}

}

如果想實作不同的佇列,有不同的監聽確認處理機制,做不同的業務處理,那么這樣做:

首先需要在配置類中系結佇列,然后只需要根據訊息來自不同的佇列名進行區分處理即可

import com.rabbitmq.client.Channel;

import org.springframework.amqp.core.Message;

import
org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

import org.springframework.stereotype.Component;

import java.io.ByteArrayInputStream;

import java.io.ObjectInputStream;

import java.util.Map;

\@Component

public class MyAckReceiver implements ChannelAwareMessageListener {

\@Override

public void onMessage(Message message, Channel channel) throws Exception
{

long deliveryTag = message.getMessageProperties().getDeliveryTag();

try {

byte\[\] body = message.getBody();

ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(body));

Map\<String,String\> msgMap = (Map\<String,String\>) ois.readObject();

String messageId = msgMap.get(\"messageId\");

String messageData = https://www.cnblogs.com/Changes404/archive/2023/06/16/msgMap.get(/"messageData\");

String createTime = msgMap.get(\"createTime\");

ois.close();

if
(\"TestDirectQueue\".equals(message.getMessageProperties().getConsumerQueue())){

System.out.println(\"消費的訊息來自的佇列名為:\"+message.getMessageProperties().getConsumerQueue());

System.out.println(\"訊息成功消費到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"執行TestDirectQueue中的訊息的業務處理流程\...\...\");

}

if
(\"fanout.A\".equals(message.getMessageProperties().getConsumerQueue())){

System.out.println(\"消費的訊息來自的佇列名為:\"+message.getMessageProperties().getConsumerQueue());

System.out.println(\"訊息成功消費到 messageId:\"+messageId+\"
messageData:\"+messageData+\" createTime:\"+createTime);

System.out.println(\"執行fanout.A中的訊息的業務處理流程\...\...\");

}

channel.basicAck(deliveryTag, true);

//channel.basicReject(deliveryTag, true);//為true會重新放回佇列

} catch (Exception e) {

channel.basicReject(deliveryTag, false);

e.printStackTrace();

}

}

}

寫法二

\@Component

\@Slf4j

public class SendSmsListener {

\@Resource

private RedisTemplate\<String, String\> redisTemplate;

\@Resource

private SendSmsUtils sendSmsUtils;

/\*\*

\* 監聽發送短信普通佇列

\* \@param smsDTO

\* \@param message

\* \@param channel

\* \@throws IOException

\*/

\@RabbitListener(queues = SMS_QUEUE_NAME)

public void sendSmsListener(SmsDTO smsDTO, Message message, Channel
channel) throws IOException {

String messageId = message.getMessageProperties().getMessageId();

int retryCount = (int)
redisTemplate.opsForHash().get(RedisConstant.SMS_MESSAGE_PREFIX +
messageId, \"retryCount\");

if (retryCount \> 3) {

//重試次數大于3,直接放到死信佇列

log.error(\"短信訊息重試超過3次:{}\", messageId);

//basicReject方法拒絕deliveryTag對應的訊息,第二個引數是否requeue,true則重新入佇列,否則丟棄或者進入死信佇列,

//該方法reject后,該消費者還是會消費到該條被reject的訊息,

channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);

return;

}

try {

String phoneNum = smsDTO.getPhoneNum();

String code = smsDTO.getCode();

if(StringUtils.isAnyBlank(phoneNum,code)){

throw new RuntimeException(\"sendSmsListener引數為空\");

}

// 發送訊息

SendSmsResponse sendSmsResponse = sendSmsUtils.sendSmsResponse(phoneNum,
code);

SendStatus\[\] sendStatusSet = sendSmsResponse.getSendStatusSet();

SendStatus sendStatus = sendStatusSet\[0\];

if(!\"Ok\".equals(sendStatus.getCode()) \|\|!\"send
success\".equals(sendStatus.getMessage())){

throw new RuntimeException(\"發送驗證碼失敗\");

}

//手動確認訊息

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

log.info(\"短信發送成功:{}\",smsDTO);

redisTemplate.delete(RedisConstant.SMS_MESSAGE_PREFIX + messageId);

} catch (Exception e) {

redisTemplate.opsForHash().put(RedisConstant.SMS_MESSAGE_PREFIX+messageId,\"retryCount\",retryCount+1);

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

/\*\*

\* 監聽到發送短信死信佇列

\* \@param sms

\* \@param message

\* \@param channel

\* \@throws IOException

\*/

\@RabbitListener(queues = SMS_DELAY_QUEUE_NAME)

public void smsDelayQueueListener(SmsDTO sms, Message message, Channel
channel) throws IOException {

try{

log.error(\"監聽到死信佇列訊息==\>{}\",sms);

channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

}catch (Exception e){

channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);

}

}

}

消費端限流

#配置RabbitMQ

spring:

rabbitmq:

host: 192.168.126.3

port: 5672

username: guest

password: guest

virtual-host: /

#開啟自動確認 none 手動確認 manual

listener:

simple:

#消費端限流機制必須開啟手動確認

acknowledge-mode: manual

#消費端最多拉取的訊息條數,簽收后不滿該條數才會繼續拉取

prefetch: 5

訊息存活時間TTL

可以設定佇列的存活時間,也可以設定具體訊息的存活時間

設定佇列中所有訊息的存活時間

return QueueBuilder

.durable(QUEUE_NAME)//佇列持久化

.ttl(10000)//設定佇列的所有訊息存活10s

.build();

即在創建佇列時,設定存活時間

設定某條訊息的存活時間

//發送訊息,并設定該訊息的存活時間

\@Test

public void testSendMessage()

{

//1.創建訊息屬性

MessageProperties messageProperties = new MessageProperties();

//2.設定存活時間

messageProperties.setExpiration(\"10000\");

//3.創建訊息物件

Message message = new
Message(\"sendMessage\...\".getBytes(),messageProperties);

//4.發送訊息

rabbitTemplate.convertAndSend(\"my_topic_exchange1\",\"my_routing\",message);

}

若設定中間的訊息的存活時間,當過期時,該訊息不會被移除,但是該訊息已經不會被消費了,需要等到該訊息到隊里頂端才會被移除,因為佇列是頭出,尾進,故而要移除它需要等到它在頂端時才可以,

在佇列設定存活時間,也在單條訊息設定存活時間,則以時間短的為準

死信佇列

死信佇列和普通佇列沒有任何區別,只需要將普通佇列需要系結死信交換機和死信佇列就能夠實作功能

import org.springframework.amqp.core.\*;

import org.springframework.beans.factory.annotation.Qualifier;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

\@Configuration//Rabbit配置類

public class RabbitConfig4 {

private final String DEAD_EXCHANGE = \"dead_exchange\";

private final String DEAD_QUEUE = \"dead_queue\";

private final String NORMAL_EXCHANGE = \"normal_exchange\";

private final String NORMAL_QUEUE = \"normal_queue\";

//創建死信交換機

\@Bean(DEAD_EXCHANGE)

public Exchange deadExchange()

{

return ExchangeBuilder

.topicExchange(DEAD_EXCHANGE)//交換機型別 ;引數為名字
topic為通配符模式的交換機

.durable(true)//是否持久化,true即存到磁盤,false只在記憶體上

.build();

}

//創建死信佇列

\@Bean(DEAD_QUEUE)

public Queue deadQueue()

{

return QueueBuilder

.durable(DEAD_QUEUE)//佇列持久化

//.maxPriority(10)//設定佇列的最大優先級,最大可以設定255,但官網推薦不超過10,太高比較浪費資源

.build();

}

//死信交換機系結死信佇列

\@Bean

//@Qualifier注解,使用名稱裝配進行使用

public Binding bindDeadQueue(@Qualifier(DEAD_EXCHANGE) Exchange
exchange, \@Qualifier(DEAD_QUEUE) Queue queue)

{

return BindingBuilder

.bind(queue)

.to(exchange)

.with(\"dead_routing\")

.noargs();

}

//創建普通交換機

\@Bean(NORMAL_EXCHANGE)

public Exchange normalExchange()

{

return ExchangeBuilder

.topicExchange(NORMAL_EXCHANGE)//交換機型別 ;引數為名字
topic為通配符模式的交換機

.durable(true)//是否持久化,true即存到磁盤,false只在記憶體上

.build();

}

//創建普通佇列

\@Bean(NORMAL_QUEUE)

public Queue normalQueue()

{

return QueueBuilder

.durable(NORMAL_QUEUE)//佇列持久化

//.maxPriority(10)//設定佇列的最大優先級,最大可以設定255,但官網推薦不超過10,太高比較浪費資源

.deadLetterExchange(DEAD_EXCHANGE)//系結死信交換機

.deadLetterRoutingKey(\"dead_routing\")//死信佇列路由關鍵字

.ttl(10000)//訊息存活10s

.maxLength(10)//佇列最大長度為10

.build();

}

//普通交換機系結普通佇列

\@Bean

//@Qualifier注解,使用名稱裝配進行使用

public Binding bindNormalQueue(@Qualifier(NORMAL_EXCHANGE) Exchange
exchange, \@Qualifier(NORMAL_QUEUE) Queue queue)

{

return BindingBuilder

.bind(queue)

.to(exchange)

.with(\"my_routing\")

.noargs();

}

}

延遲佇列

RabbitMQ并未實作延遲佇列功能,所以可以通過死信佇列實作延遲佇列的功能

即給普通佇列設定存活時間30分鐘,過期后發送至死信佇列,在死信消費者監聽死信佇列訊息,查看訂單狀態,是否支付,未支付則取消訂單,回退庫存即可,

消費者監聽延遲佇列

\@Component

public class ExpireOrderConsumer {

//監聽過期訂單佇列

\@RabbitListener(queues = \"expire_queue\")

public void listenMessage(String orderId)

{

//模擬處理資料庫等業務

System.out.println(\"查詢\"+orderId+\"號訂單的狀態,如果已支付無需處理,如果未支付則回退庫存\");

}

}

控制層代碼

\@RestController

public class OrderController {

\@Autowired

private RabbitTemplate rabbitTemplate;

\@RequestMapping(value = https://www.cnblogs.com/"/place/{orderId}\",method =
RequestMethod.GET)

public String placeOrder(@PathVariable String orderId)

{

//模擬service層處理

System.out.println(\"處理訂單資料\...\");

//將訂單id發送到訂單佇列

rabbitTemplate.convertAndSend(\"order_exchange\",\"order_routing\",orderId);

return \"下單成功,修改庫存\";

}

}

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/555344.html

標籤:其他

上一篇:HTTP請求:requests的進階使用方法淺析

下一篇:返回列表

標籤雲
其他(161110) Python(38236) JavaScript(25498) Java(18244) C(15237) 區塊鏈(8271) C#(7972) AI(7469) 爪哇(7425) MySQL(7254) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5875) 数组(5741) R(5409) Linux(5347) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4599) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2436) ASP.NET(2404) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1984) 功能(1967) HtmlCss(1967) Web開發(1951) C++(1941) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1881) .NETCore(1863) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • RabbitMQ快速使用代碼手冊

    本篇博客的內容為RabbitMQ在開發程序中的快速上手使用,側重于代碼部分,幾乎沒有相關概念的介紹,相關概念請參考以下csdn博客,兩篇都是我找的精華帖,供大家學習。本篇博客也持續更新~~~ ......

    uj5u.com 2023-06-17 07:27:13 more
  • HTTP請求:requests的進階使用方法淺析

    上篇文章講解了requests模塊的基礎使用,其中有get、put、post等多種請求方式,使用data、json等格式做為請求引數,在請求體中添加請求頭部資訊的常見資訊,如:headers、cookies,以及對請求回應的處理方法。接下來講解一下requests的高級用法。 ......

    uj5u.com 2023-06-17 07:27:08 more
  • 逍遙自在學C語言 | 指標的基礎用法

    ## 前言 在C語言中,指標是一項重要的概念,它允許我們直接訪問和操作記憶體地址。 可以說,指標是C語言一大優勢。用得好,你寫程式如同趙子龍百萬軍中取上將首級;用得不好,則各種問題層出不窮,有種雙拳難敵四手的感覺。 本文將介紹指標的基礎知識,包括指標的定義、初始化、訪問和運算。 ## 一、人物簡介 - ......

    uj5u.com 2023-06-17 07:26:58 more
  • Python 自動化測驗的配置層實作方式對標與落地

    Python中什么是組態檔,組態檔如何使用,有哪些支持的組態檔等內容,話不多說,讓我們一起看看吧~ ## 1 什么是組態檔? 組態檔是用于配置計算機程式的引數和初始化設定的檔案,如果沒有這些配置程式可能無法運行或是影響運行(運行速度、便捷性等),使用組態檔的好處在于,部分內容以及環境運行 ......

    uj5u.com 2023-06-17 07:26:51 more
  • 內網環境下批量安裝python庫

    最近組里安排了新內網,又要配環境。 眾所周知,內網安裝python庫需要先到www.pypi.org找到對應版本的包,然后再下載whl檔案,上傳到內網,再用`pip install "檔案地址"`去安裝。 這樣就會出現一個問題,鬼知道這個包需要的前置依賴是什么,pip會自動檢查前置依賴,然后自動從源 ......

    uj5u.com 2023-06-17 07:26:43 more
  • Servlet重要類及其方法的應用

    # Servlet重要類及其方法的應用 ## Servlet重要類及其方法的應用 ### 1.1 HttpServlet的一些方法介紹 ```java // this.getInitParameter();得到初始化引數 // this.getServletConfig();得到servlet配置就 ......

    uj5u.com 2023-06-17 07:26:31 more
  • 前端學習C語言 - 陣列和位元組序

    ## 陣列 本篇主要介紹:`一維二維陣列`、`字符陣列`、`陣列名和初始化注意點`以及`位元組序`。 ### 一維陣列 #### 初始化 有以下幾種方式對陣列初始化: ```c // 定義一個有5個元素的陣列,未初始化 int a[5]; // 定義一個有5個元素的陣列,將第一個初始化0,后面幾個元素 ......

    uj5u.com 2023-06-17 07:26:26 more
  • springBoot 自動裝配

    1.前言 自動裝配則是 SpringBoot 的核心,自動裝配是如何實作的呢?為什么我們只要引入一個 starter 組件依賴就能實作自動裝配呢,接下來就讓我們一起來探討下 SpringBoot 的自動裝配機制 2.自動裝配原理 提到自動裝配,那么你首先得知道spring的SPI(servicepr ......

    uj5u.com 2023-06-17 07:26:13 more
  • [ARM 匯編]進階篇—存盤訪問指令—2.3.2 多資料傳輸指令

    在 ARM 匯編中,多資料傳輸指令用于一次性從存盤器中加載多個資料到暫存器組,或將暫存器組中的多個資料存盤到存盤器。這些指令通常用于高效地處理陣列、結構體等資料結構。在本節中,我們將詳細介紹 ARM 匯編中的多資料傳輸指令,并通過實體幫助你更好地理解和掌握這些指令。 1. 加載多個資料到暫存器組(L ......

    uj5u.com 2023-06-17 07:26:02 more
  • 成為Spring Boot大師:推薦一門精選視頻課程

    Spring Boot是Java生態系統中備受追捧的開發框架之一,它簡化了Java應用程式的搭建和配置程序,使開發者能夠更快速、高效地構建強大的應用程式。如果你希望在Spring Boot領域中邁向專家級水平,并且想要通過一門優質的視頻課程來加速你的學習程序,我們向你推薦以下精選課程: 鏈接:[ht ......

    uj5u.com 2023-06-17 07:25:42 more