RabbitMQ
整合RabbitMQ
/**
* 使用RabbitMQ
* 1、引入ampq場景,RabbitAutoConfiguration 就會自動生效
* 2、給容器中自動配置了
* RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
* 所有的屬性都是在
* @EnableConfigurationProperties(RabbitProperties.class)
* @ConfigurationProperties(prefix = "spring.rabbitmq")
* public class RabbitProperties
* 3、給組態檔中配置 spring.rabbitmq 資訊
* 4、@EnableRabbit 開啟功能
* 5、監聽訊息:使用 @RabbitListener,必須有 @EnableRabbit
* @RabbitListener:類 + 方法上
* @RabbitHandler: 只能標在方法上
*/
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
# rabbit 組態檔
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
測驗
package com.atguigu.gulimall.order;
import com.atguigu.gulimall.order.entity.OrderReturnApplyEntity;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.Date;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class GulimallOrderApplicationTests {
@Autowired
AmqpAdmin amqpAdmin;
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 1、創建Exchange[hello.java.exchange]、Queue、Binding
* - 使用 AmqpAdmin 進行創建
*
* 2、如何收發訊息 -> RabbitTemplate
* 如果發送的訊息是個物件,使用序列化機制,將物件寫出去,物件實作 Serializable 介面
* 自定義序列化添加配置
* @Configuration
* public class MyRabbitConfig {
* @Bean
* public MessageConverter messageConverter() {
* return new Jackson2JsonMessageConverter();
* }
* }
*/
@Test
public void sendMessageTest() {
String msg = "Hello World";
OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
orderReturnApplyEntity.setId(1L);
orderReturnApplyEntity.setSkuName("華為");
orderReturnApplyEntity.setCreateTime(new Date());
rabbitTemplate.convertAndSend("hello.java.exchange", "hello.java", orderReturnApplyEntity);
log.info("訊息發送完成:{}", orderReturnApplyEntity);
}
@Test
public void createExchange() {
//amqpAdmin
/**
* DirectExchange
* public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)
*/
DirectExchange exchange = new DirectExchange("hello.java.exchange", true,false);
amqpAdmin.declareExchange(exchange);
log.info("Exchange[{}]創建成功", "hello.java.exchange");
}
@Test
public void createQueue() {
/**
* public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete)
*/
Queue queue = new Queue("hello-java-queue", true, false,true);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]創建成功", "hello-java-queue");
}
@Test
public void createBinding() {
/**
* public Binding(String destination【目的地】,
* DestinationType destinationType【目的地型別】,
* String exchange【交換機】,
* String routingKey【路由鍵】,
* Map<String, Object> arguments)【引數】
* 將 exchange 指定交換機和 destination目的地進行系結,使用routingKey作為指定路由鍵
*/
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello.java.exchange","hello.java",null);
amqpAdmin.declareBinding(binding);
log.info("Binding == 創建成功");
}
}
測驗監聽訊息
/**
* queues:宣告需要監聽的所欲佇列
*
* org.springframework.amqp.core.Message;
*
* 引數可以寫以下型別
* 1、Message message;原生訊息詳細資訊,頭 + 體
* 2、T<發送的訊息的型別> OrderReturnApplyEntity content
* 3、Channel channel:當前傳輸資料的通道
*
* Queue:可以很多人都來監聽,只要收到訊息,佇列洗掉訊息,而且只有一個人收到此訊息
* 1、訂單服務啟動多個:同一個訊息,只能有一個客戶端收到
* 2、只有一個訊息完全處理完,方法運行結束,我們就可以接受到下一個訊息
*/
//@RabbitListener(queues = {"hello-java-queue"})
@RabbitHander
public void receiveMessage(Message message, OrderReturnReasonEntity content) {
System.out.println("接收到訊息....:"+ message + "===>內容;" + content + "型別是:" + message.getClass());
byte[] body = message.getBody();
//訊息頭屬性資訊
MessageProperties properties = message.getMessageProperties();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("訊息處理完成=》" + content.getName());
}
@RabbitListener
簡介:
1.用于標注在監聽類或監聽方法上,接收訊息,需要指定監聽的佇列(陣列)
2.使用該注解之前,需要在啟動類加上該注解:@EnableRabbit
3.@RabbitListener即可以標注在方法上又可以標注在類上
標注在類上:表示該類是監聽類,使得@RabbitHandler注解生效
標注在方法上:表示該方法時監聽方法,會監聽指定佇列獲得訊息
4.一般只標注在方法上,并配合@RabbitHandler使用,多載的方式接收不同訊息物件
@RabbitHandler
作用:
配合@RabbitListener,使用方法多載的方法接收不同的訊息型別
簡介:
1.用于標注在監聽方法上,接收訊息,不需要指定監聽的佇列
2.使用該注解之前,需要在啟動類加上該注解:@EnableRabbit
3.@RabbitListener只可以標注在方法,多載的方式接收不同訊息物件
發送端訊息確認配置
1、配置
2、定制 RabbitTemplate,設定確認回呼
# rabbit 組態檔
spring.rabbitmq.host=192.168.106.101
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# 開啟發送端確認
spring.rabbitmq.publisher-confirms=true
#開啟發送端訊息抵達確認
spring.rabbitmq.publisher-returns=true
#只要抵達佇列,以異步發送優先回呼returnconfirm
spring.rabbitmq.template.mandatory=true
# 手動ack訊息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
package com.atguigu.gulimall.order.config;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制 rabbitTemplate
* 1、服務收到訊息就回呼
* 1、spring.rabbitmq.publisher-confirms=true
* 2、設定確認回呼ConfirmCallback
* 2、訊息正確地打佇列進行回呼
* 1、spring.rabbitmq.publisher-returns=true
* spring.rabbitmq.template.mandatory=true
* 2、設定訊息抵達佇列的回呼
* 3、消費端確認【保證每一個訊息被正確消費,此時才可以讓broker洗掉】
* 1、默認是自動確認,只要訊息接受到,自動確認,服務端就會移除這個訊息
* 2、手動確認默認,只要沒有明確告訴MQ,貨物被簽收,沒有ACK,訊息一直是unacked狀態,
* 即使Cosumer宕機,訊息也不會丟失,會重新變成Ready,等待下一次新的consumer鏈接發給他
* 3、如果手動確認:Channel channel -> long deliveryTag = properties.getDeliveryTag(); -> channel.basicAck(deliveryTag, false);
* channel.basicAck(deliveryTag, false); 簽收
* channel.basicNack(deliveryTag, false, true); 拒簽
*/
@PostConstruct // MyRabbitConfig 物件創建完成以后執行這個方法
public void initRabbitTemplate(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要抵達服務器,ack就確認為true
* @param correlationData 當前訊息的唯一關聯資料(訊息的唯一id)
* @param ack 是否成功或者失敗
* @param cause 失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm..." + correlationData + "==> ack:" + ack + "==> cause:" + cause);
}
});
//設定訊息抵達佇列的回呼
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要訊息沒有投遞給指定的佇列,就觸發失敗回呼
* @param message 投遞失敗的訊息詳細資訊
* @param replyCode 回復的狀態碼
* @param replyText 回復的文本內容
* @param exchange 訊息發給那個交換機
* @param routingKey 當時這個訊息使用哪個路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("Fail Message:" + message + "==> replyTest:" + replyText + "==>exchange" + exchange + "==>routingKey:" + routingKey);
}
});
}
}
/**
* queues:宣告需要監聽的所欲佇列
* <p>
* org.springframework.amqp.core.Message;
* <p>
* 引數可以寫以下型別
* 1、Message message;原生訊息詳細資訊,頭 + 體
* 2、T<發送的訊息的型別> OrderReturnApplyEntity content
* 3、Channel channel:當前傳輸資料的通道
* <p>
* Queue:可以很多人都來監聽,只要收到訊息,佇列洗掉訊息,而且只有一個人收到此訊息
* 1、訂單服務啟動多個:同一個訊息,只能有一個客戶端收到
* 2、只有一個訊息完全處理完,方法運行結束,我們就可以接受到下一個訊息
*/
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws IOException {
//System.out.println("接收到訊息....:"+ message + "===>內容;" + content + "型別是:" + message.getClass());
System.out.println("接收到訊息....:" + content);
byte[] body = message.getBody();
//訊息頭屬性資訊
MessageProperties properties = message.getMessageProperties();
/*try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}*/
System.out.println("訊息處理完成=》" + content.getName());
long deliveryTag = properties.getDeliveryTag();
System.out.println("deliverTag: " + deliveryTag);
if (deliveryTag % 2 == 0) {
//識訓
// 簽識訓取,非批量模式
channel.basicAck(deliveryTag, false);
} else {
//requeue 重新入隊
//basicNack(long deliveryTag, boolean multiple, boolean requeue)
channel.basicNack(deliveryTag, false, true);
System.out.println("沒有簽收的貨物....." + deliveryTag);
}
}
最終整合
1.匯入mq依賴
<!--amqp高級訊息佇列協議,rabbitmq實作-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.ware模塊匯入配置
spring:
rabbitmq:
host: 192.168.56.10
port: 5672
# 虛擬主機
virtual-host: /
# 開啟發送端發送確認,無論是否到達broker都會觸發回呼【發送端確認機制+本地事務表】
publisher-confirm-type: correlated
# 開啟發送端抵達佇列確認,訊息未被佇列接收時觸發回呼【發送端確認機制+本地事務表】
publisher-returns: true
# 訊息在沒有被佇列接收時是否強行退回
template:
mandatory: true
# 消費者手動確認模式,關閉自動確認,否則會訊息丟失
listener:
simple:
acknowledge-mode: manual
3.添加注解
// 開啟rabbit
@EnableRabbit
4.創建配置類
/**
* @Author: wanzenghui
* @Date: 2021/12/15 0:04
*/
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
// 使用json序列化器來序列化訊息,發送訊息時,訊息物件會被序列化成json格式
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1、服務收到訊息就會回呼
* 1、spring.rabbitmq.publisher-confirms: true
* 2、設定確認回呼
* 2、訊息正確抵達佇列就會進行回呼
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、設定確認回呼ReturnCallback
* <p>
* 3、消費端確認(保證每個訊息都被正確消費,此時才可以broker洗掉這個訊息)
*/
@PostConstruct // (MyRabbitConfig物件創建完成以后,執行這個方法)
public void initRabbitTemplate() {
/**
* 發送訊息觸發confirmCallback回呼
* @param correlationData:當前訊息的唯一關聯資料(如果發送訊息時未指定此值,則回呼時回傳null)
* @param ack:訊息是否成功收到(ack=true,訊息抵達Broker)
* @param cause:失敗的原因
*/
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("發送訊息觸發confirmCallback回呼" +
"\ncorrelationData =https://www.cnblogs.com/loongnuts/archive/2022/11/10/==>" + correlationData +
"\nack ===> " + ack + "" +
"\ncause ===> " + cause);
System.out.println("=================================================");
});
/**
* 訊息未到達佇列觸發returnCallback回呼
* 只要訊息沒有投遞給指定的佇列,就觸發這個失敗回呼
* @param message:投遞失敗的訊息詳細資訊
* @param replyCode:回復的狀態碼
* @param replyText:回復的文本內容
* @param exchange:接收訊息的交換機
* @param routingKey:接收訊息的路由鍵
*/
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// 需要修改資料庫 訊息的狀態【后期定期重發訊息】
System.out.println("訊息未到達佇列觸發returnCallback回呼" +
"\nmessage ===> " + message +
"\nreplyCode ===> " + replyCode +
"\nreplyText ===> " + replyText +
"\nexchange ===> " + exchange +
"\nroutingKey ===> " + routingKey);
System.out.println("==================================================");
});
}
}
5.創建ware解鎖庫存的延時佇列、死信佇列、交換機、系結關系
/**
* 創建佇列,交換機,延時佇列,系結關系 的configuration
* 1.Broker中的Queue、Exchange、Binding不存在的情況下,會自動創建(在RabbitMQ),不會重復創建覆寫
* 2.懶加載,只有第一次使用的時候才會創建(例如監聽佇列)
*/
@Configuration
public class MyRabbitMQConfig {
/**
* 用于首次創建佇列、交換機、系結關系的監聽
* @param message
*/
@RabbitListener(queues = "stock.release.stock.queue")
public void handle(Message message) {
}
/**
* 交換機
* Topic,可以系結多個佇列
*/
@Bean
public Exchange stockEventExchange() {
//String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
return new TopicExchange("stock-event-exchange", true, false);
}
/**
* 死信佇列
*/
@Bean
public Queue stockReleaseStockQueue() {
//String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
return new Queue("stock.release.stock.queue", true, false, false);
}
/**
* 延時佇列
*/
@Bean
public Queue stockDelay() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
// 訊息過期時間 2分鐘
arguments.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue", true, false, false,arguments);
}
/**
* 系結:交換機與死信佇列
*/
@Bean
public Binding stockLocked() {
//String destination, DestinationType destinationType, String exchange, String routingKey,
// Map<String, Object> arguments
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
/**
* 系結:交換機與延時佇列
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/531453.html
標籤:其他
下一篇:淺談PHP設計模式的模板方法模式
