一、五種模式詳解
1.簡單模式(Queue模式)
當生產端發送訊息到交換機,交換機根據訊息屬性發送到佇列,消費者監聽系結佇列實作訊息的接收和消費邏輯撰寫.簡單模式下,強調的一個佇列queue只被一個消費者監聽消費.
1.1 結構

- 生產者:生成訊息,發送到交換機
- 交換機:根據訊息屬性,將訊息發送給佇列
- 消費者:監聽這個佇列,發現訊息后,獲取訊息執行消費邏輯
1.2應用場景
常見的應用場景就是一發,一接的結構
例如:
- 手機短信
- 郵件單發
2.爭搶模式(Work模式)
強調的也是后端佇列與消費者系結的結構
2.1結構

- 生產者:發送訊息到交換機
- 交換機:根據訊息屬性將訊息發送給佇列
- 消費者:多個消費者,同時系結監聽一個佇列,之間形成了爭搶訊息的效果
2.2應用場景
- 搶紅包
- 資源分配系統
3.路由模式(Route模式 Direct定向)
從路由模式開始,關心的就是訊息如何到達的佇列,路由模式需要使用的交換機型別就是路由交換機(direct)
3.1 結構

- 生產端:發送訊息,在訊息中處理訊息內容,攜帶一個routingkey
- 交換機:接收訊息,根據訊息的routingkey去計算匹配后端佇列的routingkey
- 佇列:存盤交換機發送的訊息
- 消費端:簡單模式 作業爭搶
3.2應用場景
- 短信
- 聊天工具
- 郵箱,,
手機號/郵箱地址,都可以是路由key
4.發布訂閱模式(Pulish/Subscribe模式 Fanout廣播)
不計算路由的一種特殊交換機
4.1結構

4.2應用場景
- 訊息推送
- 廣告
5.主題模式(Topics模式 Tpoic通配符)
路由key值是一種多級路徑,中國.四川.成都.武侯區
5.1結構

-
生產端:攜帶路由key,發送訊息到交換機
-
佇列:系結交換機和路由不一樣,不是一個具體的路由key,而可以使用*和#代替一個范圍
| * | 字串,只能表示一級 |
| --- | --- |
| # | 多級字串 | -
交換機:根據匹配規則,將路由key對應發送到佇列
-
訊息路由key:
- 北京市.朝陽區.酒仙橋
- 北京市.#: 匹配true
- 上海市.浦東區.*: 沒匹配false
- 新疆.烏魯木齊.#
5.2 應用場景
- 做物流分揀的多級傳遞.
6.完整結構

二、代碼實作
1.創建SpringBoot工程
1.1 工程基本資訊

1.2 依賴資訊

1.3 組態檔applicasion.properties
# 應用名稱
spring.application.name=springboot-demo
# Actuator Web 訪問埠
management.server.port=8801
management.endpoints.jmx.exposure.include=*
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
# 應用服務 WEB 訪問埠
server.port=8801
######################### RabbitMQ配置 ########################
# RabbitMQ主機
spring.rabbitmq.host=127.0.0.1
# RabbitMQ虛擬主機
spring.rabbitmq.virtual-host=demo
# RabbitMQ服務埠
spring.rabbitmq.port=5672
# RabbitMQ服務用戶名
spring.rabbitmq.username=admin
# RabbitMQ服務密碼
spring.rabbitmq.password=admin
# RabbitMQ服務發布確認屬性配置
## NONE值是禁用發布確認模式,是默認值
## CORRELATED值是發布訊息成功到交換器后會觸發回呼方法
## SIMPLE值經測驗有兩種效果,其一效果和CORRELATED值一樣會觸發回呼方法,其二在發布訊息成功后使用rabbitTemplate呼叫waitForConfirms或waitForConfirmsOrDie方法等待broker節點回傳發送結果,根據回傳結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果回傳false則會關閉channel,則接下來無法發送訊息到broker;
spring.rabbitmq.publisher-confirm-type=simple
# RabbitMQ服務開啟訊息發送確認
spring.rabbitmq.publisher-returns=true
######################### simple模式配置 ########################
# RabbitMQ服務 訊息接收確認模式
## NONE:不確認
## AUTO:自動確認
## MANUAL:手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 指定最小的消費者數量
spring.rabbitmq.listener.simple.concurrency=1
# 指定最大的消費者數量
spring.rabbitmq.listener.simple.max-concurrency=1
# 開啟支持重試
spring.rabbitmq.listener.simple.retry.enabled=true
2.簡單模式
2.1 創建SimpleQueueConfig 簡單佇列配置類
package com.gmtgo.demo.simple;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class SimpleQueueConfig {
/**
* 定義簡單佇列名.
*/
private final String simpleQueue = "queue_simple";
@Bean
public Queue simpleQueue() {
return new Queue(simpleQueue);
}
}
2.2 撰寫生產者
package com.gmtgo.demo.simple;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 5; i++) {
String message = "簡單訊息" + i;
log.info("我是生產資訊:{}", message);
rabbitTemplate.convertAndSend( "queue_simple", message);
}
}
}
2.3 撰寫消費者
package com.gmtgo.demo.simple;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class SimpleConsumers {
@RabbitListener(queues = "queue_simple")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊:{}", new String(message.getBody()));
}
}
2.4 撰寫訪問類
package com.gmtgo.demo.simple;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "https://www.cnblogs.com/rabbitMq")
public class SimpleRabbitMqController {
@Autowired
private SimpleProducer simpleProducer;
@RequestMapping(value = "https://www.cnblogs.com/simpleQueueTest")
public String simpleQueueTest() {
simpleProducer.sendMessage();
return "success";
}
}
2.5 測驗啟動專案訪問 simpleQueueTest
- 訪問地址:http://127.0.0.1:8801/rabbitMq/simpleQueueTest
- 結果:

3.Work佇列
3.1 撰寫作業配置
package com.gmtgo.demo.work;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class WorkQueueConfig {
/**
* 佇列名.
*/
private final String work = "work_queue";
@Bean
public Queue workQueue() {
return new Queue(work);
}
}
3.2 撰寫生產者
package com.gmtgo.demo.work;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class WorkProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 10; i++) {
String message = "作業訊息" + i;
log.info("我是生產資訊:{}", message);
rabbitTemplate.convertAndSend("work_queue", message);
}
}
}
3.3 撰寫消費者1
package com.gmtgo.demo.work;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class WorkConsumers1 {
@RabbitListener(queues = "work_queue")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊1:{}", new String(message.getBody()));
}
}
3.4 撰寫消費者2
package com.gmtgo.demo.work;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class WorkConsumers2 {
@RabbitListener(queues = "work_queue")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊2:{}", new String(message.getBody()));
}
}
3.5 撰寫測驗方法
package com.gmtgo.demo.work;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/rabbitMq")
public class WorkRabbitMqController {
@Autowired
private WorkProducer workProducer;
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/workQueueTest")
public String workQueueTest() {
workProducer.sendMessage();
return "success";
}
}
3.6 測驗啟動專案訪問 workQueueTest
- 訪問地址http://127.0.0.1:8801/rabbitMq/workQueueTest
- 結果:

控制臺列印,發現10條訊息 偶數條消費者1獲取,奇數條消費者2獲取,并且平均分配,
當然通過代碼實作按需分配,即誰的性能強,誰優先原則,實作負載均衡,
配置可控分配數
4. 發布訂閱模式(Publish/Subscibe模式)
訂閱模式–多個消費者監聽不同的佇列,但佇列都系結同一個交換機
4.1 撰寫訂閱配置類
package com.gmtgo.demo.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class FanoutQueueConfig {
/**
* 宣告佇列名.
*/
private final String fanout1 = "fanout_queue_1";
private final String fanout2 = "fanout_queue_2";
/**
* 宣告交換機的名字.
*/
private final String fanoutExchange = "fanoutExchange";
/**
* 宣告佇列.
*
* @return
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(fanout1);
}
@Bean
public Queue fanoutQueue2() {
return new Queue(fanout2);
}
/**
* 宣告交換機.
*/
@Bean
public FanoutExchange exchange() {
return new FanoutExchange(fanoutExchange);
}
/**
* 佇列系結交換機,也可在可視化工具中進行系結.
*
* @return
*/
@Bean
public Binding bindingFanoutQueue1(Queue fanoutQueue1, FanoutExchange exchange) {
return BindingBuilder.bind(fanoutQueue1).to(exchange);
}
@Bean
public Binding bindingFanoutQueue2(Queue fanoutQueue2, FanoutExchange exchange) {
return BindingBuilder.bind(fanoutQueue2).to(exchange);
}
}
4.2 撰寫訂閱生產者
package com.gmtgo.demo.fanout;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class FanoutProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage() {
for (int i = 0; i < 5; i++) {
String message = "訂閱模式訊息" + i;
log.info("我是生產資訊:{}", message);
rabbitTemplate.convertAndSend("fanoutExchange", "", message);
}
}
}
4.3 撰寫訂閱消費者1
package com.gmtgo.demo.fanout;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class FanoutConsumers1 {
@RabbitListener(queues = "fanout_queue_1")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊1:{}", new String(message.getBody()));
}
}
4.4 撰寫訂閱消費者2
package com.gmtgo.demo.fanout;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class FanoutConsumers2 {
@RabbitListener(queues = "fanout_queue_2")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊2:{}", new String(message.getBody()));
}
}
4.5 撰寫測驗方法
package com.gmtgo.demo.fanout;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/rabbitMq")
public class FanoutRabbitMqController {
@Autowired
private FanoutProducer fanoutProducer;
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/fanoutQueueTest")
public String fanoutQueueTest() {
fanoutProducer.sendMessage();
return "success";
}
}
3.6 測驗啟動專案訪問 fanoutQueueTest
- 訪問地址http://127.0.0.1:8801/rabbitMq/fanoutQueueTest
- 結果:

控制臺列印 ,發現兩個系結了不同佇列的消費者都接受到了同一條訊息查看RabbitMq 服務器:



5. 路由模式(Route模式 Direct定向)
5.1 撰寫路由配置類
package com.gmtgo.demo.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class DirectQueueConfig {
/**
* 宣告佇列名.
*/
private final String direct1 = "direct_queue_1";
private final String direct2 = "direct_queue_2";
/**
* 宣告交換機的名字.
*/
private final String directExchange = "directExchange";
/**
* 宣告佇列.
*
* @return
*/
@Bean
public Queue directQueue1() {
return new Queue(direct1);
}
@Bean
public Queue directQueue2() {
return new Queue(direct2);
}
/**
* 宣告路由交換機.
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange(directExchange);
}
/**
* 佇列系結交換機,指定routingKey,也可在可視化工具中進行系結.
*
* @return
*/
@Bean
Binding bindingDirectExchange1(Queue directQueue1, DirectExchange exchange) {
return BindingBuilder.bind(directQueue1).to(exchange).with("update");
}
/**
* 佇列系結交換機,指定routingKey,也可在可視化工具中進行系結.
*
* @return
*/
@Bean
Binding bindingDirectExchange2(Queue directQueue2, DirectExchange exchange) {
return BindingBuilder.bind(directQueue2).to(exchange).with("add");
}
}
5.2 撰寫生產者
package com.gmtgo.demo.direct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class DirectProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageA() {
for (int i = 0; i < 5; i++) {
String message = "路由模式--routingKey=update訊息" + i;
log.info("我是生產資訊:{}", message);
rabbitTemplate.convertAndSend("directExchange", "update", message);
}
}
public void sendMessageB() {
for (int i = 0; i < 5; i++) {
String message = "路由模式--routingKey=add訊息" + i;
log.info("我是生產資訊:{}", message);
rabbitTemplate.convertAndSend("directExchange", "add", message);
}
}
}
5.3 撰寫消費者1
package com.gmtgo.demo.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class DirectConsumers1 {
@RabbitListener(queues = "direct_queue_1")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊1:{}", new String(message.getBody()));
}
}
5.4 撰寫消費者2
package com.gmtgo.demo.direct;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class DirectConsumers2 {
@RabbitListener(queues = "direct_queue_2")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊2:{}", new String(message.getBody()));
}
}
5.5 撰寫訪問類
package com.gmtgo.demo.direct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/rabbitMq")
public class DirectRabbitMqController {
@Autowired
private DirectProducer directProducer;
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/directQueueTest1")
public String directQueueTest1() {
directProducer.sendMessageA();
return "success";
}
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/directQueueTest2")
public String directQueueTest2() {
directProducer.sendMessageB();
return "success";
}
}
5.6 測驗啟動專案訪問directQueueTest1 , directQueueTest2
- 訪問地址http://127.0.0.1:8801/rabbitMq/directQueueTest1
- 訪問地址http://127.0.0.1:8801/rabbitMq/directQueueTest2
- 結果:
- directQueueTest1:
- directQueueTest2:
- directQueueTest1:
6. 主題模式(Topics模式 Tpoic通配符)
6.1 撰寫路由配置類
package com.gmtgo.demo.topic;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 大帥
*/
@Configuration
public class TopicQueueConfig {
/**
* 宣告佇列名.
*/
private final String topic1 = "topic_queue_1";
private final String topic2 = "topic_queue_2";
/**
* 宣告交換機的名字.
*/
private final String topicExchange = "topicExchange";
/**
* 宣告佇列.
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(topic1);
}
@Bean
public Queue topicQueue2() {
return new Queue(topic2);
}
/**
* 宣告路由交換機.
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(topicExchange);
}
/**
* 佇列系結交換機,指定routingKey,也可在可視化工具中進行系結.
*
* @return
*/
@Bean
Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA");
}
/**
* 佇列系結交換機,指定routingKey,也可在可視化工具中進行系結.
* 系結的routing key 也可以使用通配符:
* *:匹配不多不少一個詞
* #:匹配一個或多個詞
*
* @return
*/
@Bean
Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#");
}
}
6.2 撰寫生產者
package com.gmtgo.demo.topic;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author 大帥
*/
@Slf4j
@Component
public class TopicProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessageA() {
for (int i = 0; i < 5; i++) {
String message = "通配符模式--routingKey=topic.keyA訊息" + i;
log.info("我是生產資訊:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.keyA", message);
}
}
public void sendMessageB() {
for (int i = 0; i < 5; i++) {
String message = "通配符模式--routingKey=topic.#訊息" + i;
log.info("我是生產資訊:{}", message);
rabbitTemplate.convertAndSend("topicExchange", "topic.keyD.keyE", message);
}
}
}
6.3 撰寫消費者1
package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class TopicConsumers1 {
@RabbitListener(queues = "topic_queue_1")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊1:{}",new String(message.getBody()));
}
}
6.4 撰寫消費者2
package com.gmtgo.demo.topic;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author 大帥
*/
@Slf4j
@Component
public class TopicConsumers2 {
@RabbitListener(queues = "topic_queue_2")
public void readMessage(Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("我是消費資訊2:{}",new String(message.getBody()));
}
}
6.5 撰寫訪問類
package com.gmtgo.demo.topic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 大帥
*/
@RestController
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/rabbitMq")
public class TopicRabbitMqController {
@Autowired
private TopicProducer topicProducer;
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/topicQueueTest1")
public String topicQueueTest1() {
topicProducer.sendMessageA();
return "success";
}
@RequestMapping(value = "https://www.cnblogs.com/cgqplus/p/topicQueueTest2")
public String topicQueueTest2() {
topicProducer.sendMessageB();
return "success";
}
}
6.6 測驗啟動專案訪問topicQueueTest1 , topicQueueTest2
- 訪問地址http://127.0.0.1:8801/rabbitMq/topicQueueTest1
- 訪問地址http://127.0.0.1:8801/rabbitMq/topicQueueTest2
- 結果:
- topicQueueTest1,兩個消費者都能消費
- topicQueueTest2,只有消費者2 可以消費

- topicQueueTest1,兩個消費者都能消費
至此,五種佇列的實作已結束!
7. 實作生產者訊息確認
7.1 組態檔
######################### RabbitMQ配置 ########################
# RabbitMQ主機
spring.rabbitmq.host=127.0.0.1
# RabbitMQ虛擬主機
spring.rabbitmq.virtual-host=demo
# RabbitMQ服務埠
spring.rabbitmq.port=5672
# RabbitMQ服務用戶名
spring.rabbitmq.username=admin
# RabbitMQ服務密碼
spring.rabbitmq.password=admin
# RabbitMQ服務發布確認屬性配置
## NONE值是禁用發布確認模式,是默認值
## CORRELATED值是發布訊息成功到交換器后會觸發回呼方法
## SIMPLE值經測驗有兩種效果,其一效果和CORRELATED值一樣會觸發回呼方法,其二在發布訊息成功后使用rabbitTemplate呼叫waitForConfirms或waitForConfirmsOrDie方法等待broker節點回傳發送結果,根據回傳結果來判定下一步的邏輯,要注意的點是waitForConfirmsOrDie方法如果回傳false則會關閉channel,則接下來無法發送訊息到broker;
spring.rabbitmq.publisher-confirm-type=simple
# 連接超時時間
spring.rabbitmq.connection-timeout=20000
# RabbitMQ服務開啟訊息發送確認
spring.rabbitmq.publisher-returns=true
######################### simple模式配置 ########################
# RabbitMQ服務 訊息接收確認模式
## NONE:不確認
## AUTO:自動確認
## MANUAL:手動確認
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 指定最小的消費者數量
spring.rabbitmq.listener.simple.concurrency=1
# 指定最大的消費者數量
spring.rabbitmq.listener.simple.max-concurrency=1
# 每次只消費一個訊息
spring.rabbitmq.listener.simple.prefetch=1
# 開啟支持重試
spring.rabbitmq.listener.simple.retry.enabled=true
# 啟用強制資訊,默認為false
spring.rabbitmq.template.mandatory=true
7.2 撰寫訊息發送確認類 RabbitConfirmCallback
package com.gmtgo.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author 大帥
*/
@Slf4j
public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("=======ConfirmCallback=========");
log.info("correlationData {} " , correlationData);
log.info("ack = {}" , ack);
log.info("cause = {}" , cause);
log.info("=======ConfirmCallback=========");
}
}
7.3 撰寫訊息發送交換機回傳機制RabbitConfirmReturnCallBack
package com.gmtgo.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* @author 大帥
*/
@Slf4j
public class RabbitConfirmReturnCallBack implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("--------------ReturnCallback----------------");
log.info("message = " + message);
log.info("replyCode = {}", replyCode);
log.info("replyText = {}", replyText);
log.info("exchange = {}", exchange);
log.info("routingKey = {}", routingKey);
log.info("--------------ReturnCallback----------------");
}
}
7.4 RabbitMQ配置
在我們的rabbit佇列配置類里設定RabbitTemplate
舉例:
package com.gmtgo.demo.topic;
import com.gmtgo.demo.config.RabbitConfirmCallback;
import com.gmtgo.demo.config.RabbitConfirmReturnCallBack;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @author 大帥
*/
@Configuration
public class TopicQueueConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void initRabbitTemplate() {
// 設定生產者訊息確認
rabbitTemplate.setConfirmCallback(new RabbitConfirmCallback());
rabbitTemplate.setReturnCallback(new RabbitConfirmReturnCallBack());
}
/**
* 宣告佇列名.
*/
private final String topic1 = "topic_queue_1";
private final String topic2 = "topic_queue_2";
/**
* 宣告交換機的名字.
*/
private final String topicExchange = "topicExchange";
/**
* 宣告佇列.
*
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(topic1);
}
@Bean
public Queue topicQueue2() {
return new Queue(topic2);
}
/**
* 宣告路由交換機.
*
* @return
*/
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(topicExchange);
}
/**
* 佇列系結交換機,指定routingKey,也可在可視化工具中進行系結.
*
* @return
*/
@Bean
Binding bindingTopicExchange1(Queue topicQueue1, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue1).to(exchange).with("topic.keyA");
}
/**
* 佇列系結交換機,指定routingKey,也可在可視化工具中進行系結.
* 系結的routing key 也可以使用通配符:
* *:匹配不多不少一個詞
* #:匹配一個或多個詞
*
* @return
*/
@Bean
Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange exchange) {
return BindingBuilder.bind(topicQueue2).to(exchange).with("topic.#");
}
}
啟動專案發送訊息,訊息被正常消費,confim回呼回傳ack=true如果我們將exchange修改,發送到一個不存在的exchange中,會怎么樣呢?
會發現confirm回呼為false,列印出結果為不存在topicExchange1111的交換機

如果我們在消費端處理邏輯時出錯會怎么樣呢?修改消費端代碼我們在消費時讓它報錯

confirm回呼為true,但是在rabbitmq的web界面會發現存在5條沒有消費的訊息

如果我們把
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
中最后一個引數改為false呢,會發現在web管理界面沒有未被消費的訊息,說明這條訊息已經被摒棄,
實際開發中,到底是打回到佇列呢還是摒棄,要看自己的需求,但是打回佇列應該有次數限制,不然會陷入死回圈,
繼續測驗,將routingKey修改為一個沒有的key,
7.5 結論
- 如果訊息沒有到exchange,則confirm回呼,ack=false
- 如果訊息到達exchange,則confirm回呼,ack=true
- exchange到queue成功,則不回呼return
- exchange到queue失敗,則回呼return
8. 專案示例代碼:
下載地址:springboot-rabbitmq-demo_1619322789961
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/280153.html
標籤:Java
上一篇:簡單聊聊記憶體逃逸 | 劍指offer - golang
下一篇:ArryaList原始碼淺析



