1.前言說明
前面一篇博客中提到了使用原生java代碼進行測驗RabbitMQ實作多種交換機型別的佇列場景,但是在專案中我們一般使用SpringBoot專案,而且RabbitMQ天生對于Spring的支持是非常良好的,所以這里基于SpringBoot我搭建了一個模擬購買商品訂單下單并發送訊息使用RabbitMQ訊息佇列的場景來分析實作不同模式下的場景,
也是對于SpringBoot整合RabbitMQ的一種總結,
使用到的模型如下圖所示,在下訂單處理的同時,采用訊息佇列生產者向MQ訊息中間件中生產訊息發送給對應的佇列,創建消費者來消費佇列中的訊息呼叫服務,

2.基于SpringBoot配置類構建訊息佇列
專案構建我采用的是IDEA中Spring Initializr構建器創建的SpringBoot Maven專案,這部分主要是使用到了Spring RabbitMQ與SpringBoot Web的依賴組件,
由于原生支持,在IDEA中勾選對應的選項即可,非常簡單,無需考慮多余的Maven Repository引入,
創建SpringBoot專案主要有springboot-order-rabbitmq-consumer與springboot-order-rabbitmq-producer兩個Module,

這里還是簡單說明一下pom.xml與application.yml配置:
pom.xml
<dependencies> <!--rabbitmq starter依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies>View Code
application.yml
# 服務埠
server:
port: 8080
# 配置rabbitmq服務
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
host: 127.0.0.1 #基于本地windows RabbitMQ測驗,云服務填寫對應地址即可
port: 5672
View Code
2.1.生產者配置類
RabbitMQ中訊息佇列模式主要常用的模式就是:fanout、direct、topic模式,這里我主要講解fanout與direct進行配置類構建生產者消費者,
整合生成訊息佇列(交換機、Queues及系結關系、Routing key)可以從生產者端也可從消費者端進行,
主要構建方式有兩種:
①配置類生成交換機與佇列
②注解形式系結交換機佇列關系(topic使用注解方式構建)
這里先說第一種配置類方式:
使用配置類生成訊息生產者佇列主要配置類說明:
主要配置類XxxTypeRabbitConfig
//注意:XxxType表示是交換機型別:可以是Fanout/Direct/Topic/Headers @Configuration public class XxxTypeRabbitConfig { //使用注入方式宣告對應的Queue @Bean public Queue emailQueue() { // durable:是否持久化,默認是false,持久化佇列:會被存盤在磁盤上,當訊息代理重啟時仍然存在,暫存佇列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后佇列即被洗掉,此參考優先級高于durable // autoDelete:是否自動洗掉,當沒有生產者或者消費者使用此佇列,該佇列會自動洗掉, //一般設定一下佇列的持久化就好,其余兩個就是默認false return new Queue("email.xxxType.queue", true); } @Bean public Queue smsQueue() { return new Queue("sms.xxxType.queue", true); } @Bean public Queue weixinQueue() { return new Queue("weixin.xxxType.queue", true); } //宣告交換機,不同的交換機型別不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public XxxTypeExchange xxxTypeOrderExchange() { return new XxxTypeExchange("xxxType_order_exchange", true, false); } //系結關系:將佇列和交換機系結, 并設定用于匹配鍵:routingKey @Bean public Binding bindingXxxType1() { return BindingBuilder .bind(weixinQueue()) //系結哪個Queue .to(fanoutOrderExchange()); //是哪個交換機 } @Bean public Binding bindingXxxType2() { return BindingBuilder.bind(smsQueue()).to(xxxTypeOrderExchange()); } @Bean public Binding bindingXxxType3() { return BindingBuilder.bind(emailQueue()).to(xxxTypeOrderExchange()); } }View Code
訊息發送類,主要給創建的佇列填充訊息,這里主要用到RabbitTemplate類呼叫convertAndSend方法進行對應交換機訊息佇列的發送:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //XxxType型別交換機 public void makeOrderXxxType(Long userId, Long productId, int num) { exchangeName = "xxxType_order_exchange"; routeKey = ""; // 1: 模擬用戶下單 String orderNumer = UUID.randomUUID().toString(); // 2: 根據商品id productId 去查詢商品的庫存 // int numstore = productSerivce.getProductNum(productId); // 3:判斷庫存是否充足 // if(num > numstore ){ return "商品庫存不足..."; } // 4: 下單邏輯 // orderService.saveOrder(order); // 5: 下單成功要扣減庫存 // 6: 下單完成以后 System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單資訊給RabbitMQ xxxType rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }View Code
2.2.Fanout模式訊息生產者
①創建交換機與佇列生成配置類,注意fanout這里系結Queues的時候不要設定routing key,是采用廣播訂閱發送的方式:
/** * @Description: fanout交換機型別就是對應的訊息采用廣播訂閱模式,訂閱系結交換機的佇列都應該收到訊息 * @Author: fengye * @Date: 2021/4/16 14:29 */ @Configuration public class FanoutRabbitConfig { //使用注入方式宣告對應的Queue @Bean public Queue emailQueue() { // durable:是否持久化,默認是false,持久化佇列:會被存盤在磁盤上,當訊息代理重啟時仍然存在,暫存佇列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后佇列即被洗掉,此參考優先級高于durable // autoDelete:是否自動洗掉,當沒有生產者或者消費者使用此佇列,該佇列會自動洗掉, //一般設定一下佇列的持久化就好,其余兩個就是默認false return new Queue("email.fanout.queue", true); } @Bean public Queue smsQueue() { return new Queue("sms.fanout.queue", true); } @Bean public Queue weixinQueue() { return new Queue("weixin.fanout.queue", true); } //宣告交換機,不同的交換機型別不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public FanoutExchange fanoutOrderExchange() { return new FanoutExchange("fanout_order_exchange", true, false); } //系結關系:將佇列和交換機系結, 并設定用于匹配鍵:routingKey @Bean public Binding bindingFanout1() { return BindingBuilder .bind(weixinQueue()) //系結哪個Queue .to(fanoutOrderExchange()); //是哪個交換機 } @Bean public Binding bindingFanout2() { return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()); } @Bean public Binding bindingFanout3() { return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()); } }
②訊息佇列發送到Queue,使用OrderService進行發送,主要用到了RabbitTemplate:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //Fanout型別交換機 public void makeOrderFanout(Long userId, Long productId, int num) { exchangeName = "fanout_order_exchange"; routeKey = ""; // 1: 模擬用戶下單 String orderNumer = UUID.randomUUID().toString(); // 2: 根據商品id productId 去查詢商品的庫存 // int numstore = productSerivce.getProductNum(productId); // 3:判斷庫存是否充足 // if(num > numstore ){ return "商品庫存不足..."; } // 4: 下單邏輯 // orderService.saveOrder(order); // 5: 下單成功要扣減庫存 // 6: 下單完成以后 System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單資訊給RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
③生產者方啟動測驗類向fanout_order_exchange交換機佇列發送訊息,存盤到訊息佇列中:
@SpringBootTest class RabbitmqApplicationTests { @Autowired private OrderService orderService; @Test void fanoutTest() throws InterruptedException { for (int i = 0; i < 10; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 10; orderService.makeOrderFanout(userId, productId, num); } } }
運行結果:



生成佇列并存盤10條訊息,
2.3.Fanout模式訊息消費者
①配置類實作訊息消費者佇列比較簡單,主要就是使用@RabbitListener系結對應的佇列,并使用@RabbitHandler接收訊息對應中的引數資訊即可,注意選擇合適的資料型別接收:

對應訊息佇列類配置:
//通過@RabbitListener系結佇列接收訊息 @RabbitListener(queues = {"weixin.fanout.queue"}) @Component public class FanoutDuanxinConsumer { //佇列中的訊息會通過@RabbitHandler注解注入到方法引數中,就可以獲取到佇列中的訊息 @RabbitHandler public void reviceMessage(String message){ System.out.println("weixin fanout----接收到了訂單資訊是:->" + message); } } @RabbitListener(queues = {"email.fanout.queue"}) @Component public class FanoutEmailConsumer { @RabbitHandler public void reviceMessage(String message){ System.out.println("email fanout----接收到了訂單資訊是:->" + message); } } @RabbitListener(queues = {"sms.fanout.queue"}) @Component public class FanoutSMSConsumer { @RabbitHandler public void reviceMessage(String message){ System.out.println("sms fanout----接收到了訂單資訊是:->" + message); } }
啟動訊息接收者consumer SpringBoot專案:


可以看到訊息佇列存盤訊息已被消費,控制臺列印出了對應的訊息資訊,
2.4.Direct模式訊息生產者
Direct模式訊息生產者基于配置類構建與Fanout一樣,這里簡單說明一下配置類的增加的代碼就行:
修改XxxTypeConfig基類為DirectExchange:
/** * @Description: direct交換機型別采用routing key與Queue進行系結,通過key不同一對一進行訊息傳遞 * @Author: fengye * @Date: 2021/4/16 14:29 */ @Configuration public class DirectRabbitConfig { //使用注入方式宣告對應的Queue @Bean public Queue emailQueue() { // durable:是否持久化,默認是false,持久化佇列:會被存盤在磁盤上,當訊息代理重啟時仍然存在,暫存佇列:當前連接有效 // exclusive:默認也是false,只能被當前創建的連接使用,而且當連接關閉后佇列即被洗掉,此參考優先級高于durable // autoDelete:是否自動洗掉,當沒有生產者或者消費者使用此佇列,該佇列會自動洗掉, //一般設定一下佇列的持久化就好,其余兩個就是默認false return new Queue("email.direct.queue", true); } @Bean public Queue smsQueue() { return new Queue("sms.direct.queue", true); } @Bean public Queue weixinQueue() { return new Queue("weixin.direct.queue", true); } //宣告交換機,不同的交換機型別不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public DirectExchange directOrderExchange() { return new DirectExchange("direct_order_exchange", true, false); } //系結關系:將佇列和交換機系結, 并設定用于匹配鍵:routingKey @Bean public Binding bindingFanout1() { return BindingBuilder .bind(weixinQueue()) //系結哪個Queue .to(directOrderExchange()) //是哪個交換機 .with("weixin"); //對應什么key } @Bean public Binding bindingFanout2() { return BindingBuilder.bind(smsQueue()).to(directOrderExchange()).with("sms"); } @Bean public Binding bindingFanout3() { return BindingBuilder.bind(emailQueue()).to(directOrderExchange()).with("email"); } }
對應訊息發送Service類:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //Direct型別交換機 public void makeOrderDirect(Long userId, Long productId, int num) { exchangeName = "direct_order_exchange"; routeKey = "weixin"; // 1: 模擬用戶下單 String orderNumer = UUID.randomUUID().toString(); // 2: 根據商品id productId 去查詢商品的庫存 // int numstore = productSerivce.getProductNum(productId); // 3:判斷庫存是否充足 // if(num > numstore ){ return "商品庫存不足..."; } // 4: 下單邏輯 // orderService.saveOrder(order); // 5: 下單成功要扣減庫存 // 6: 下單完成以后 System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單資訊給RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
執行測驗類進行測驗:
@SpringBootTest class RabbitmqApplicationTests { @Autowired private OrderService orderService; @Test void directTest() throws InterruptedException { for (int i = 0; i < 10; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 10; orderService.makeOrderDirect(userId, productId, num); } } }
運行結果:
可以看到DirectQueue訊息佇列已經生成并存盤到對應的weixin路由Key的佇列中:



2.5.Direct模式訊息消費者

①創建對應的訊息佇列消費者類,使用@RabbitListener、@RabbitHandler進行監聽并系結訊息獲取結果,這部分與上面的Fanout模式消費者是一樣的:
//通過@RabbitListener系結佇列接收訊息 @RabbitListener(queues = {"weixin.direct.queue"}) @Component public class DirectDuanxinConsumer { //佇列中的訊息會通過@RabbitHandler注解注入到方法引數中,就可以獲取到佇列中的訊息 @RabbitHandler public void reviceMessage(String message){ System.out.println("duanxin direct queue----接收到了訂單資訊是:->" + message); } } @RabbitListener(queues = {"email.direct.queue"}) @Component public class DirectEmailConsumer { @RabbitHandler public void reviceMessage(String message){ System.out.println("email direct----接收到了訂單資訊是:->" + message); } } @RabbitListener(queues = {"sms.direct.queue"}) @Component public class DirectSMSConsumer { @RabbitHandler public void reviceMessage(String message){ System.out.println("sms direct----接收到了訂單資訊是:->" + message); } }
②啟動SpringBoot專案進行消費測驗:


可以看到訊息佇列中系結weixin端佇列收到了10條訊息,
3.基于SpringBoot注解類構建訊息佇列
使用注解方式實作訊息佇列主要是從消費者進行交換機與Queues佇列的系結關系建立,并使用@Component進行注入,可以比較簡單地處理交換機與佇列之間的系結關系,隨SpringBoot專案一啟動就同時創建Exchange與Queues佇列的關系,
下面總的說一下主要的注解:
//通過@RabbitListener系結佇列接收訊息 // bindings其實就是用來確定佇列和交換機系結關系 @RabbitListener(bindings = @QueueBinding( //佇列名字,系結對應的佇列接收訊息 value = https://www.cnblogs.com/yif0118/archive/2021/04/17/@Queue(value ="weixin.xxxType.queue", autoDelete = "false"), //交換機名字,必須和生產者中交換機名相同;指定系結的交換機型別 exchange = @Exchange(value = "https://www.cnblogs.com/yif0118/archive/2021/04/17/xxxType_order_exchange", type = ExchangeTypes.XXXType), key = "com.#" ))
//佇列中的訊息會通過@RabbitHandler注解注入到方法引數中,就可以獲取到佇列中的訊息 @RabbitHandler
3.1.Topic模式訊息消費者
topic模式這里從訊息消費者Springboot專案入手,優先創建出RabbitMQ上的訊息佇列與交換機進行系結,基于@RabbitListener與@QueueBinding會隨專案啟動自動創建訊息佇列:

//通過@RabbitListener系結佇列接收訊息 // bindings其實就是用來確定佇列和交換機系結關系 @RabbitListener(bindings = @QueueBinding( //佇列名字,系結對應的佇列接收訊息 value = https://www.cnblogs.com/yif0118/archive/2021/04/17/@Queue(value ="weixin.topic.queue", autoDelete = "false"), //交換機名字,必須和生產者中交換機名相同;指定系結的交換機型別 exchange = @Exchange(value = "https://www.cnblogs.com/yif0118/archive/2021/04/17/topic_order_exchange", type = ExchangeTypes.TOPIC), key = "com.#" )) @Component public class TopicDuanxinConsumer { //佇列中的訊息會通過@RabbitHandler注解注入到方法引數中,就可以獲取到佇列中的訊息 @RabbitHandler public void reviceMessage(String message){ System.out.println("duanxin topic----接收到了訂單資訊是:->" + message); } } @RabbitListener(bindings = @QueueBinding( //佇列名字,系結對應的佇列接收訊息 value = https://www.cnblogs.com/yif0118/archive/2021/04/17/@Queue(value ="email.topic.queue", autoDelete = "false"), //交換機名字,必須和生產者中交換機名相同;指定系結的交換機型別 exchange = @Exchange(value = "https://www.cnblogs.com/yif0118/archive/2021/04/17/topic_order_exchange", type = ExchangeTypes.TOPIC), key = "#.order.#" )) @Component public class TopicEmailConsumer { //佇列中的訊息會通過@RabbitHandler注解注入到方法引數中,就可以獲取到佇列中的訊息 @RabbitHandler public void reviceMessage(String message){ System.out.println("email topic----接收到了訂單資訊是:->" + message); } } @RabbitListener(bindings = @QueueBinding( //佇列名字,系結對應的佇列接收訊息 value = https://www.cnblogs.com/yif0118/archive/2021/04/17/@Queue(value ="sms.topic.queue", autoDelete = "false"), //交換機名字,必須和生產者中交換機名相同;指定系結的交換機型別 exchange = @Exchange(value = "https://www.cnblogs.com/yif0118/archive/2021/04/17/topic_order_exchange", type = ExchangeTypes.TOPIC), key = "*.course.*" )) @Component public class TopicSMSConsumer { //佇列中的訊息會通過@RabbitHandler注解注入到方法引數中,就可以獲取到佇列中的訊息 @RabbitHandler public void reviceMessage(String message){ System.out.println("sms topic----接收到了訂單資訊是:->" + message); } }
啟動SpringBoot消費者專案,進行驗證:



3.2.Topic模式訊息生產者
使用注解配置無需再創建對應的配置類Config來系結Exchange與Queues的關系了,
直接使用Sevice呼叫服務發送訊息即可,
①服務呼叫、向佇列中發送訊息:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //Topic型別交換機 public void makeOrderTopic(Long userId, Long productId, int num) { exchangeName = "topic_order_exchange"; routeKey = "com.course.user"; // 1: 模擬用戶下單 String orderNumer = UUID.randomUUID().toString(); // 2: 根據商品id productId 去查詢商品的庫存 // int numstore = productSerivce.getProductNum(productId); // 3:判斷庫存是否充足 // if(num > numstore ){ return "商品庫存不足..."; } // 4: 下單邏輯 // orderService.saveOrder(order); // 5: 下單成功要扣減庫存 // 6: 下單完成以后 System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單資訊給RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
②服務測驗:
@SpringBootTest class RabbitmqApplicationTests { @Autowired private OrderService orderService; @Test void topicTest() throws InterruptedException { for (int i = 0; i < 10; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 10; orderService.makeOrderTopic(userId, productId, num); } } }
訊息發送:

消費方consumer服務(消費者服務不停止)接收訊息:

本博客示例涉及代碼均已上傳至Github:
RabbitMQStudy
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/277289.html
標籤:其他
