一、訊息佇列MQ概述
MQ全稱為Message Queue,訊息佇列是應用程式和應用程式之間的通信方法,
1.1 為什么使用MQ
在專案中,可將一些無需即時回傳且耗時的操作提取出來,進行異步處理,而這種異步處理的方式大大的節省了服務器的請求回應時間,從而提高了系統的吞吐量,
開發中訊息佇列通常有如下優點:
**(1) 異步提速: **異步處理任務,將不需要同步處理且耗時長的操作交由訊息佇列進行異步處理,大大提高了應用程式的處理時間,
**(2) 應用解耦: **應用程式解耦合,MQ充當中介,生產方通過MQ與消費方互動,它將應用程式解耦合了
**(3) 削峰填谷: **在訪問量劇增的情況下,應用仍然需要繼續發揮作用,但是這樣的突發流量并不常見;如果為以能處理這類峰值訪問為標準,來投入資源隨時待命無疑是巨大的浪費,使用MQ能夠使關鍵組件頂住突發的訪問壓力,而不會因為突發的超負荷請求而完全崩潰,
**(4) 可恢復性: **系統的一部分組件失效時,不會影響到整個系統,MQ降低了程式間的耦合度,所以即使一個處理訊息的掛掉,加入佇列中的訊息仍然可以在系統恢復后被再次被改程式處理,
(5) 排序保證: 訊息佇列可以控制資料處理的順序,因為訊息佇列本身使用的是佇列這個資料結構,FIFO(先進先出),在一些場景中,資料處理的順序很重要,比如商品下單、搶票、秒殺…等,
1.2. 訊息佇列產品
市場上常見的訊息佇列有如下:

1.3. AMQP 和 JMS
Dubbo協議:Dubbo 預設協議采用單一長連接和 NIO 異步通訊,適合于小資料量大并發的服務呼叫,以及服務消費者機器數遠大于服務提供者機器數的情況,
HTTP協議(HyperText Transfer Protocol,超文本傳輸協議)是因特網上應用最為廣泛的一種網路傳輸協議,所有的WWW檔案都必須遵守這個標準,
MQ是訊息通信的模型;實作MQ的大致有兩種主流方式:AMQP、JMS,
1.3.1. AMQP
AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,
1.3.2. JMS
JMS即Java訊息服務(JavaMessage Service)應用程式介面,是一個Java平臺中關于面向訊息中間件的API,用于在兩個應用程式之間,或分布式系統中發送訊息,進行異步通信,
1.3.3. AMQP 與 JMS 區別
JMS是定義了統一的介面,來對訊息操作進行統一;AMQP是通過規定協議來統一資料互動的格式JMS限定了必須使用Java語言;AMQP只是協議,不規定實作方式,因此是跨語言的,JMS規定了兩種訊息模式;而AMQP的訊息模式更加豐富.
| JMS | AMQP | |
|---|---|---|
| 定義 | Java api | Wire-protocol |
| 跨語言 | 否 | 是 |
| 跨平臺 | 否 | 是 |
1.4. RabbitMQ
RabbitMQ是由erlang語言開發,基于AMQP(Advanced Message Queue 高級訊息佇列協議)協議實作的訊息佇列產品,它是一種應用程式之間的通信方法,訊息佇列在分布式系統開發中應用非常廣泛,
RabbitMQ官方地址:http://www.rabbitmq.com/
RabbitMQ提供了6種模式:Hello Word簡單模式,work作業模式,Publish/Subscribe發布與訂閱模式,Routing路由模式,Topics主題模式(通配符模式),RPC遠程呼叫模式(遠程呼叫,不太算MQ;不作介紹)
官網對應模式介紹:https://www.rabbitmq.com/getstarted.html
應用場景:
1、雙十一商品秒殺/搶票功能實作
我們在雙11的時候,當我們凌晨大量的秒殺和搶購商品,然后去結算的時候,就會發現,界面會提醒我們,讓我們稍等,以及一些友好的圖片文字提醒,而不是像前幾年的時代,動不動就頁面卡死,報錯等來呈現給用戶,
2、積分兌換(積分可用于多平臺)
積分兌換模塊,有一個公司多個部門都要用到這個模塊,這時候就可以通過訊息佇列解耦這個特性來實作, 各部門系統做各部門的事,但是他們都可以用這個積分系統進行商品的兌換等,其他模塊與積分模塊完全解耦,
3、大平臺用戶注冊
發送郵件、用戶大資料分析操作等 基于同步變異步功能實作
用戶注冊真實操作步驟:
- 用戶注冊選擇的興趣標簽,根據用戶的屬性,行為進行用戶分析,計算出推薦內容
- 注冊后可能需要發送郵件給用戶
- 發送短信給用戶
- 發送給用戶指南的系統通知
- …等等
正常情況注冊,不出現高并發,假如有大量的用戶注冊,發生了高并發,就會出現如下情況:
郵件介面承受不住,或是分析資訊時的大量計算使 cpu 滿載,這將會出現,雖然用戶資料記錄很快的添加到資料庫中了,但是執行流程卻卡在發郵件或分析用戶資訊的情況,導致請求的回應時間大幅增長,甚至出現超時,這就有點不劃算了,面對這種情況一般也是將這些操作放入訊息佇列(生產者消費者模型),訊息佇列慢慢的進行處理,同時可以很快的完成注冊請求,不會影響用戶使用其他功能,
1.5 相關定義:
- **Connection:**publisher/consumer 和 broker 之間的 TCP 連接
- Channel: 訊息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務
- Exchange: 訊息交換機,它指定訊息按什么規則,路由到哪個佇列
- Queue: 訊息佇列載體,每個訊息都會被投入到一個或多個佇列
- VHost: 虛擬主機,一個broker里可以開設多個vhost,用作不同用戶的權限分離,
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-X9lFeYaO-1646011259625)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191226053108367.png)]
由Exchange、Queue、RoutingKey三個才能決定一個訊息從Exchange到Queue的唯一的線路,
二、安裝及配置RabbitMQ
2.1. docker中安裝RabbitMQ
- 下載鏡像
docker pull rabbitmq:management
- 創建容器
docker run -di --name=my_rabbitmq -p 5671:5671 -p 5672:5672 -p4369:4369 -p 15671:15671 -p 15672:15672 -p 25672:25672 rabbitmq:management
解釋如下:
15672 (if management plugin is enabled.管理界面 )
15671 management監聽埠
5672, 5671 (AMQP 0-9-1 without and with TLS 訊息佇列協議是一個訊息協議)
4369 (epmd) epmd 代表 Erlang 埠映射守護行程
25672 (Erlang distribution)
- 訪問后臺
瀏覽器中輸入地址
http://192.168.200.128:15672/
- 設定容器開機自動啟動
docker update --restart=always 容器ID
2.2. 用戶以及Virtual Hosts配置
2.2.1. 用戶角色
RabbitMQ在安裝好后,可以訪問http://192.168.200.128:15672;其自帶了guest/guest的用戶名和密碼;如果需要創建自定義用戶;那么也可以登錄管理界面后,如下操作:
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-saPtEctI-1646011259627)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191224130727629.png)]](https://img.uj5u.com/2022/03/01/302354010713202.png)

角色說明:
-
超級管理員(administrator):可登陸管理控制臺,可查看所有的資訊,并且可以對用戶,策略(policy)進行操作,
-
監控者(monitoring):可登陸管理控制臺,同時可以查看rabbitmq節點的相關資訊(行程數,記憶體使用情況,磁盤使用情況等)
-
策略制定者(policymaker):可登陸管理控制臺, 同時可以對策略(policy)進行管理,但無法查看節點及其相關資訊(上圖紅框標識的部分),
-
普通管理者(management):僅可登陸管理控制臺,無法看到節點資訊,也無法對策略進行管理,
-
其他無法登陸管理控制臺,通常就是普通的生產者和消費者,
2.2.2. Virtual Hosts配置
RabbitMQ的權限管理;在RabbitMQ中可以虛擬訊息服務器Virtual Host,每個Virtual Hosts相當于一個相對獨立的RabbitMQ服務器,每個VirtualHost之間是相互隔離的,exchange、queue、message不能互通,Virtual Name一般以/開頭,
-
創建Virtual Hosts
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-3jM3gVYP-1646011259627)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191224130914131.png)]](https://img.uj5u.com/2022/03/01/302354010713204.png)
-
設定Virtual Hosts權限
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-eX6CwigY-1646011259627)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191224131458585.png)]](https://img.uj5u.com/2022/03/01/302354010713205.png)

2.2.3 添加佇列
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-zX6nS33q-1646011259628)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191224131854869.png)]](https://img.uj5u.com/2022/03/01/302354010713207.png)
持久化:如果選durable,則佇列訊息自動持久化到磁盤上,如果選transient,則不會持久化;
自動洗掉:默認值no,如果yes,則在訊息佇列沒有使用的情況下,佇列自行洗掉,
2.2.4 添加交換機
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-EZz0e1Qg-1646011259628)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191224132241282.png)]](https://img.uj5u.com/2022/03/01/302354010713208.png)
自動洗掉:默認值no,如果是yes,則在將所有佇列與交換機取消系結之后,交換機將自動洗掉,
交換機型別:
- fanout:廣播型別
- direct:路由型別
- topic:通配符型別,基于訊息的路由鍵路由
- headers:通配符型別,基于訊息的header路由
內部交換器:默認值no,如果是yes,訊息無法直接發送到該交換機,必須通過交換機的轉發才能到達次交換機,本交換機只能與交換機系結,
2.2.5 佇列與交換機進行系結


三、Spring Boot整合RabbitMQ
3.1. 簡介
在spring boot專案中,只需要引入start-amqp起步依賴,即可整合RabbitMQ成功;我們基于SpringBoot封裝的RabbitTemplate模板物件,可以非常方便的發送訊息,接收訊息(使用注解),
amqp的官方GitHub地址:https://github.com/spring-projects/spring-amqp
一般在開發程序中,我們有兩個角色:
[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-LDbo5j8z-1646011259630)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191224123350539.png)]
3.2. 搭建步驟:
1、創建父工程:
2、生產者工程:
- 創建SpringBoot工程:rabbitmq-producer
- 勾選起步依賴坐標:spring for RabbitMQ
- 配置RabbitMQ:服務host地址及埠、虛擬主機、服務賬戶密碼
3、消費者工程:
- 創建SpringBoot工程:rabbitmq-consumer
- 勾選起步依賴坐標:spring for RabbitMQ
- 配置RabbitMQ:服務host地址及埠、虛擬主機、服務賬戶密碼
3.3. 搭建程序:
3.3.1 創建父工程:
創建生產者工程:rabbitmq-producer(創建程序不在贅述)
創建消費者工程:rabbitmq-consumer(創建程序不在贅述)

3.3.2 搭建生產者工程
1、創建工程
創建SpringBoot的生產者工程:rabbitmq-producer

2、起步依賴坐標
pom.xml檔案內容為如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>rabbitmq-producer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-producer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
</pluginRepositories>
</project>
3、啟動類
@SpringBootApplication
public class SpringbootRabbitmqProducerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqProducerApplication.class, args);
}
}
4、配置RabbitMQ
1)組態檔application.properties,內容如下:
# RabbitMQ 服務host地址
spring.rabbitmq.host=192.168.200.128
# 埠
spring.rabbitmq.port=5672
# 虛擬主機地址
spring.rabbitmq.virtual-host=/wwg
# rabbit服務的用戶名
spring.rabbitmq.username=admin
# rabbit服務的密碼
spring.rabbitmq.password=admin
3.3.3 搭建消費者工程
1、創建工程
創建SpringBoot的消費者工程:rabbitmq-consumer

2、勾選起步依賴坐標
pom.xml檔案內容為如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>rabbitmq-consumer</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>rabbitmq-consumer</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<repositories>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<releases>
<enabled>false</enabled>
</releases>
</pluginRepository>
</pluginRepositories>
</project>
3、啟動類
@SpringBootApplication
public class SpringbootRabbitmqConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootRabbitmqConsumerApplication.class, args);
}
}
4、配置RabbitMQ
application.properties,內容如下:
# RabbitMQ 服務host地址
spring.rabbitmq.host=192.168.200.128
# 埠
spring.rabbitmq.port=5672
# 虛擬主機地址
spring.rabbitmq.virtual-host=/wwg
# rabbit服務的用戶名
spring.rabbitmq.username=admin
# rabbit服務的密碼
spring.rabbitmq.password=admin
四、RabbitMQ五種作業模式【重要】
4.1 Hello World簡單模式
4.1.1 什么是簡單模式

4.1.2 RabbitMQ管理界面操作
-
創建simple_queue佇列用于演示Hello World簡單模式
-
點擊
simple_queue可以進入到這個queue的管理界面 -
點擊
Get Message按鈕可以獲取查看佇列中的訊息
4.1.3 生產者代碼
- rabbitmq-producer專案測驗代碼如下:
package com.sg.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo01TestSimpleQueue {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
//測驗簡單模式
public void testSimple() {
//向訊息佇列發送一條簡單訊息
/**
* 引數1:訊息佇列名稱
* 引數2:訊息內容
*/
rabbitTemplate.convertAndSend("queue1", "hello !");
}
}
4.1.4 消費者代碼
- rabbitmq-consumer專案創建監聽器:
package com.sg.rabbitmq.simple;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消費者,接收訊息佇列訊息監聽器
* 必須將當前監聽器物件注入Spring的容器中
*/
@Component
public class SimpleListener {
@RabbitListener(queues = "queue1")
public void simpleHandler(String msg){
System.out.println("=====接收訊息====>"+msg);
}
}
然后啟動SpringbootRabbitmqConsumerApplication, 就可以接收到RabbitMQ服務器發送來的訊息


4.2 Work queues作業佇列模式
4.2.1 什么是作業佇列模式

4.2.2 RabbitMQ管理界面操作
- 創建
work_queue佇列用于演示work作業佇列模式
4.2.3 生產者代碼
rabbitmq-producer專案測驗代碼如下:
package com.sg.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo02TestWorkQueue {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
//測驗作業佇列模式(一個生產者多個消費者) 測驗結果: 每條訊息只能被一個消費者消費(競爭關系)
public void testWorkQueue() {
for (int i = 0; i < 1000; i++) {
rabbitTemplate.convertAndSend("work_queue", "hello 【" + i + "】");
}
}
}
4.2.4 消費者代碼
- rabbitmq-consumer專案創建監聽器1:
package com.sg.rabbitmq.work;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class WorkListener1 {
@RabbitListener(queues = "work_queue")
public void work_queue1(String msg, Channel channel, Message message) throws IOException {
System.out.println("=====接收訊息1====>"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
@RabbitListener(queues = "work_queue")
public void work_queue2(String msg){
System.out.println("=====接收訊息2====>"+msg);
}
}

注意:兩個消費者為競爭關系,消費者1收到的訊息消費者2不會收到
4.3 三種模式概覽
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-JhH9ghBw-1646011259632)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191205102917088.png)]](https://img.uj5u.com/2022/03/01/3023540107132019.png)
而在訂閱模型中,多了一個exchange角色,而且程序略有變化:
- P:生產者,也就是要發送訊息的程式,但是不再發送到佇列中,而是發給X(交換機)
- C:消費者,訊息的接受者,會一直等待訊息到來,
- Queue:訊息佇列,接收訊息、快取訊息,
- Exchange:交換機,圖中的X,一方面,接收生產者發送的訊息,另一方面,知道如何處理訊息,例如遞交給某個特別佇列、遞交給所有佇列、或是將訊息丟棄,到底如何操作,取決于Exchange的型別,
Exchange有常見以下3種型別:
- Fanout:廣播 將訊息交給所有系結到交換機的佇列,沒有路由鍵,訊息佇列需要系結交換機,fanout 型別交換機轉發訊息是最快的,廣播模式說的就是Fanout型別交換機
- Direct:定向 把訊息定向交給訊息佇列,通過路由鍵(routing key)指定,訊息佇列需要系結交換機,要求,發送訊息的路由鍵與交換機的路由鍵完全匹配,路由模式說的就是 direct 型別交換機,
- Topic:主題(通配符) 把訊息通過通配方式交給訊息佇列,通過路由鍵(routing key)指定,訊息佇列需要系結交換機,要求發送訊息的路由鍵滿足交換機路由鍵的通配符條件,通配符模式說的就是topic模式交換機,
- 其中通配符模式使用了2個通配符,
#號和*號, - 符號
#匹配一個或多個單詞,用.分隔 - 符號
*匹配不多不少一個單詞 - 因此
audit.#能夠匹配到audit.irs.corporate,但是audit.*只會匹配到audit.irs,
- 其中通配符模式使用了2個通配符,
注意:Exchange(交換機)只負責轉發訊息,不具備存盤訊息的能力,因此如果沒有任何佇列與Exchange系結,或者沒有符合路由規則的佇列,那么訊息會丟失
4.4 Publish/Subscribe發布與訂閱模式
4.4.1 什么是發布訂閱模式
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-5RWsB0lT-1646011259632)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191205102917088-1577312524234.png)]](https://img.uj5u.com/2022/03/01/3023540107132020.png)
4.4.2 RabbitMQ管理界面操作
- 創建兩個佇列 fanout_queue1和 fanout_queue2(創建步驟請看上面)

-
創建Exchange交換器
fanout_exchange(創建步驟請看上面)

-
將創建的fanout_exchange交換器和 fanout_queue1, fanout_queue2佇列系結(系結步驟請看上面)

4.4.3 生產者代碼
- rabbitmq-producer專案測驗代碼如下:
package com.sg.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 目標:將訊息發送給交換機,通過交換機廣播給訊息佇列,路由鍵為空字串
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo03TestPublishAndSubscribe {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
//測驗廣播方式 測驗結果: 兩個訊息佇列均會接收全部訊息
public void testFanout() {
for (int i = 0; i < 100; i++) {
/**
* 引數1:交換機名稱
* 引數2:路由鍵
* 引數3:訊息內容
*/
rabbitTemplate.convertAndSend("fanout_ex", "", "hello 【" + i + "】!");
}
}
}
4.4.4 消費者代碼
- rabbitmq-consumer專案創建監聽器:
package com.sg.rabbitmq.pubandsub;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class PubAndSubListener1 {
@RabbitListener(queues = "fanout_queue1")
public void fanout1(String msg, Channel channel, Message message) throws IOException {
System.out.println("=====fanout_queue1====>"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
@RabbitListener(queues = "fanout_queue2")
public void fanout2(String msg){
System.out.println("=====fanout_queue2====>"+msg);
}
}
結果:fanout_queue1和fanout_queue2均會收到相同資料

4.5 Routing路由模式
4.5.1 什么是路由模式

4.5.2 RabbitMQ管理界面操作
創建兩個佇列分別叫做 routing_queue1 和 routing_queue2
創建交換器 routing_exchange , 型別為 direct , 用于演示路由模式

設定系結: 將創建的交換器routing_exchange 和 routing_queue1, routing_queue2 系結在一起, 路由鍵Routing Key分別為 info 和 error;


4.5.3 生產者代碼
- rabbitmq-producer專案測驗代碼如下:
package com.sg.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 目標:將訊息發送給交換機,通過交換機路由給指定的訊息佇列,通過路由鍵類指定
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo04TestRoutingModel {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
//測驗路由方式 測驗結果: 根據routingKey的設定,不同的消費者進行消費
public void testDirect() {
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
rabbitTemplate.convertAndSend("routing_ex", "info", "hello 【" + i + "】!");
} else {
rabbitTemplate.convertAndSend("routing_ex", "error", "hello 【" + i + "】!");
}
}
}
}
4.5.4 消費者代碼
- rabbitmq-consumer專案創建監聽器:
package com.sg.rabbitmq.routing;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路由模式:訊息佇列接收監聽器1,接收來自路由模式發送的訊息
*/
@Component
@RabbitListener(queues = "routing_queue1")
public class RoutingListener1 {
@RabbitHandler
public void routingHandler(String msg){
System.out.println("=====路由模式訊息接收監聽器【1】=====>"+msg);
}
}
- rabbitmq-consumer專案創建監聽器:
package com.sg.rabbitmq.routing;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 路由模式:訊息佇列接收監聽器2,接收來自路由模式發送的訊息
*/
@Component
public class RoutingListener2 {
@RabbitListener(queues = "routing_queue2")
public void routingInfo(String msg, Channel channel, Message message) throws IOException {
System.out.println("=====routingInfo====>"+msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
@RabbitListener(queues = "routing_queue1")
public void routingError(String msg){
System.out.println("=====routingError====>"+msg);
}
}
結果:根據routing_key訊息被不同消費者消費

4.6 Topics通配符模式(主題模式)
4.6.1 什么是通配符(主題)模式
Topic型別與Direct相比,都是可以根據RoutingKey把訊息路由到不同的佇列,只不過Topic型別Exchange可以讓佇列在系結Routing key的時候使用通配符!
Routingkey: 一般都是有一個或多個單詞組成,多個單詞之間以”.”分割,例如:item.insert
通配符規則:
#:匹配一個或多個詞,多個詞用點號分隔
*:匹配不多不少恰好1個詞
舉例:
item.#: 能夠匹配item.insert.abc.bbc或者item.insert
**item.*:**只能匹配item.insert
![[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-3BRwhftI-1646011259635)(RabbitMQ%E8%AE%B2%E4%B9%89.assets/image-20191205104428234.png)]](https://img.uj5u.com/2022/03/01/3023540107132031.png)
4.6.2 RabbitMQ管理界面操作
創建佇列 topic_queue1 和 topic_queue1!

創建交換器 topic_exchange , type型別為 topic

-
設定系結:
?
topic_queue1系結的Routing Key路由鍵為item.*?
topic_queue2系結的Routing Key路由鍵為item.#

4.6.3 生產者代碼
- rabbitmq-producer專案測驗代碼如下:
package com.sg.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 目標:將訊息發送給交換機,通過交換機路由給指定的訊息佇列,路由鍵使用通配符
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class Demo05TestTopicModel {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
//測驗Topic方式
public void testTopic() {
//向通配符交換機發送訊息
rabbitTemplate.convertAndSend("topic_ex", "item.insert", "hello 路由鍵item.insert");
rabbitTemplate.convertAndSend("topic_ex", "item.insert.abc", "hello 路由鍵:item.insert.abc");
}
}
4.6.4 消費者代碼
- rabbitmq-consumer專案創建監聽器:
package com.sg.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 通配符模式:訊息佇列接收監聽器1,接收來自通配符模式發送的訊息
*
*/
@Component
@RabbitListener(queues = "topic_queue1")
public class TopicListener1 {
@RabbitListener(queues = "topic_queue1")
public void topic1(String msg){
System.out.println("=====topic*====>"+msg);
}
@RabbitListener(queues = "topic_queue2")
public void topic2(String msg){
System.out.println("=====topic#====>"+msg);
}
}
結果:
item.#: 能夠匹配item.insert.abc.bbc或者item.insert
**item.*:**只能匹配item.insert

## 4.7 模式總結RabbitMQ
作業模式:
- **簡單模式 HelloWorld : **
- 角色:一個生產者、一個佇列、一個消費者,不需要交換機(默認交換機)
- **作業佇列模式 Work Queue:**
- 角色:一個生產者、多個佇列、多個消費者(競爭關系),不需要交換機(默認交換機)
- **發布訂閱模式 Publish/subscribe: **
- 角色:一個生產者、一個交換機、多個佇列、多個消費者
- 交換機型別為**==fanout==**,并且交換機和佇列要進行系結,不設定路由鍵(routing key)
- 當發送訊息到交換機后,交換機會將訊息廣播發送到系結的佇列
- **路由模式 Routing: **
- 角色:一個生產者、一個交換機、多個佇列、多個消費者
- 交換機型別為**==direct==**,并且交換機和佇列要進行系結,并且需要設定路由鍵(routing key)
- 當發送訊息到交換機后,交換機會根據路由鍵(routing key)將訊息發送到佇列
- **通配符模式 Topic: **
- 角色:一個生產者、一個交換機、多個佇列、多個消費者
- 交換機型別為**==topic==**,并且交換機和佇列要進行系結,并且需要設定路由鍵(routing key)
- 路由鍵字串使用通配符,`*`和`#`,*好匹配一個單詞,#匹配一個或多個單詞,單詞用`.`分隔
- 當發送訊息到交換機后,交換機會根據路由鍵(routing key)加通配符將訊息發送到佇列
4.7 模式總結RabbitMQ
作業模式:
- **簡單模式 HelloWorld : **
- 角色:一個生產者、一個佇列、一個消費者,不需要交換機(默認交換機)
- 作業佇列模式 Work Queue:
- 角色:一個生產者、多個佇列、多個消費者(競爭關系),不需要交換機(默認交換機)
- **發布訂閱模式 Publish/subscribe: **
- 角色:一個生產者、一個交換機、多個佇列、多個消費者
- 交換機型別為**fanout**,并且交換機和佇列要進行系結,不設定路由鍵(routing key)
- 當發送訊息到交換機后,交換機會將訊息廣播發送到系結的佇列
- **路由模式 Routing: **
- 角色:一個生產者、一個交換機、多個佇列、多個消費者
- 交換機型別為**direct**,并且交換機和佇列要進行系結,并且需要設定路由鍵(routing key)
- 當發送訊息到交換機后,交換機會根據路由鍵(routing key)將訊息發送到佇列
- **通配符模式 Topic: **
- 角色:一個生產者、一個交換機、多個佇列、多個消費者
- 交換機型別為**topic**,并且交換機和佇列要進行系結,并且需要設定路由鍵(routing key)
- 路由鍵字串使用通配符,
*和#,*好匹配一個單詞,#匹配一個或多個單詞,單詞用.分隔 - 當發送訊息到交換機后,交換機會根據路由鍵(routing key)加通配符將訊息發送到佇列
第一次寫博客希望對大家有所幫助,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/434521.html
標籤:其他
