我是廖志偉,一名Java開發工程師、幕后大佬社區創始人、Java領域優質創作者、CSDN博客專家,擁有多年一線研發經驗,研究過各種常見框架及中間件的底層原始碼,對于大型分布式、微服務、三高架構(高性能、高并發、高可用)有過實踐架構經驗,
博主:java_wxid
社區:幕后大佬
文章目錄
- 一、RabbitMQ的基本概念,以及6種作業模式,訊息確認機制
- 二、6 種作業模式
- 一、理論
- 二、代碼
- 三、訊息確認機制:confirm狀態和return狀態
- 一、理論
- 二、代碼
- 二、Spring整合RabbitMQ(簡單模式,廣播模式,路由模式,通配符模式,訊息可靠性投遞,防止訊息丟失,TTL,死信佇列,延遲佇列,訊息積壓,訊息冪等性)
- 一、專案代碼
- 1.生產者
- 1.專案架構圖:
- 2.pom.xml依賴:
- 3.spring-rabbitmq-producer.xml:
- 4.rabbitmq.properties:
- 5.ProducerTest:
- 2.消費者
- 1.專案架構圖
- 2.pom.xml依賴:
- 3.spring-rabbitmq-consumer.xml
- 4.rabbitmq.properties
- 5.ConsumerTest
- 6.FanoutListener
- 7.FanoutListener2
- 8.SpringDirectQueue
- 9.SpringQueueListener
- 10.TopicListenerOne
- 11.TopicListenerTwo
- 12.TopicListenerThree
- 二、專案演示
- 演示簡單模式:
- 演示廣播模式:
- 演示路由模式:
- 演示通配符模式:
- 三、訊息可靠性投遞
- 1.rabbitmq 整個訊息投遞的路徑
- 2.實作訊息可靠性投遞的步驟
- 3.具體實作可靠訊息投遞的代碼
- 生產者
- 消費者
- 4.具體實作可靠訊息投遞的演示
- 正常發訊息Demo演示
- 例外發訊息Demo演示
- 四、訊息在消費端限流
- 1.限流示例圖
- 2.實作步驟
- 3.具體實作消費端限流代碼
- 消費者
- 生成者
- 4.具體實作消費端限流Demo演示
- 啟動消費者
- 啟動生產者
- 查看消費者控制臺日志
- 例外情況,消費未進行簽收
- 五、TTL
- 1.業務場景
- 2.定義
- 3.實作步驟
- 4.通過RabbitMQ管理控制臺頁面實作Demo
- 1.創建訊息
- 2.創建交換機
- 3.將交換機和訊息系結
- 4.發送訊息
- 5.通過代碼實作TTL
- 六、死信佇列
- 1.定義
- 2.訊息成為死信的三種情況
- 3.佇列系結死信交換機
- 4.代碼實作
- 七、延遲佇列
- 1.定義
- 2.場景
- 3.具體實作
- 1.生產者
- 2.消費者
- 4.Demo演示
- 八、訊息積壓
- 1.場景
- 2.解決方案
- 九、訊息冪等性
- 1.定義
- 2.解決方案
- 三、Springboot集成RabbitMQ(直連模式,作業佇列模式,發布訂閱模式,路由模式,通配符模式
- 一、集成步驟
- 一、生產者:
- 二、消費者:
- 二、實作步驟
- 1.專案架構圖
- 2.創建專案
- 1.pom依賴
- 2.application.properties配置
- 3.config配置
- HelloWorldConfig
- FanoutConfig
- WorkConfig
- DirectConfig
- TopicConfig
- 4.消費端component
- 5.生產者controller
- 三、演示步驟
- 1.啟動專案
- 2.呼叫介面演示
- 1.直連模式
- 1.介面呼叫
- 2.控制臺列印
- 2.作業佇列模式
- 1.介面呼叫
- 2.控制臺列印
- 3.發布訂閱模式(交換機型別:fanout)
- 1.介面呼叫
- 2.控制臺列印
- 4.路由作業模式(交換機型別:direct)
- 1.介面呼叫
- 2.控制臺列印
- 5.通配符模式(交換機型別:topic)
- 1.介面呼叫
- 2.控制臺列印
- 四、RabbitMQ集群搭建
- 一、準備作業
- 二、集群搭建
- 1.集群搭建步驟
- 2.集群搭建負載均衡-HAProxy搭建
- 1.執行安裝
- 2.haproxy.cfg配置詳解
本文的大概內容:
一、RabbitMQ的基本概念,以及6種作業模式,訊息確認機制
RabbitMQ 簡介:RabbitMQ 基于 AMQP 標準,采用 Erlang 語言開發的訊息中間件,
基本概念:
- Producer:作為訊息的生成者,
- Consumer:作為訊息的消費者,
- Connection:訊息的發布方或者訊息的消費方 和broker 之間的 TCP 連接,
- Channel:Channel 是在 connection內部建立的邏輯連接,如果應用程式支持多執行緒,通常每個thread創建單獨的 channel 進行通訊,AMQP method包含了channel id 幫助客戶端和message broker 識別 channel,所以 channel之間是完全隔離的,減少了作業系統建立 TCP connection 的開銷,
- Broker:接收和分發訊息的應用,RabbitMQ服務就是Message Broker,
- Virtual host:虛擬機,出于多租戶和安全因素設計的,把 AMQP的基本組件劃分到一個虛擬的分組中,可以類比mysql資料庫會創建很多庫,庫和庫之間是獨立的,當多個不同的用戶使用同一個RabbitMQserver 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等,
- Queue:佇列,訊息佇列,接收訊息、快取訊息,訊息最終被送到這里等待 consumer 取走,
- Binding:系結,exchange 和 queue 之間的虛擬連接,binding 中可以包含 routing key,Binding 資訊被保存到 exchange 中的查詢表中,用于 message 的分發依據
- Exchange:交換機,message 到達 broker 的第一站,根據分發規則,匹配查詢表中的 routing key,分發訊息到queue 中去,
交換機常用的型別有:
Fanout:廣播,將訊息交給所有系結到交換機的佇列
Direct:定向,把訊息交給符合指定routing key 的佇列
Topic:通配符,把訊息交給符合routing pattern(路由模式) 的佇列
二、6 種作業模式
一、理論
RabbitMQ 提供了 6 種作業模式,簡單模式、work queues、Publish/Subscribe
發布與訂閱模式、Routing 路由模式、Topics 主題模式、RPC 遠程呼叫模式(遠程呼叫,不太算訊息佇列)
簡單模式:一個生產者生產訊息發送到佇列里面,一個消費者從佇列里面拿訊息,進行消費訊息,一個生產者、一個消費者,不需要設定交換機(使用默認的交換機)

說明:類似一個郵箱,可以快取訊息;生產者向其中投遞訊息,消費者從其中取出訊息,
Work queues 作業佇列模式:一個生產者生產訊息發送到佇列里面,一個或者多個消費者從佇列里面拿訊息,進行消費訊息,一個生產者、多個消費者(競爭關系),不需要設定交換機(使用默認的交換機)

說明:對于任務過重或任務較多情況使用作業佇列可以提高任務處理的速度,應用場景:過年過節12306搶票,發短信給用戶,可以接入多個短信服務進行發送,提供任務的處理速度,
Pub/Sub 訂閱模式 :一個生產者生產訊息發送到交換機里面,由交換機處理訊息,佇列與交換機的任意系結,將訊息指派給某個佇列,一個或者多個消費者從佇列里面拿訊息,進行消費訊息,需要設定型別為 fanout 的交換機,并且交換機和佇列進行系結,當發送訊息到交換機后,交換機會將訊息發送到系結的佇列,

說明:Exchange(交換機)只負責轉發訊息,不具備存盤訊息的能力,因此如果沒有任何佇列與 Exchange 系結,或者沒有符合路由規則的佇列,那么訊息會丟失!
Routing 路由模式:一個生產者生產訊息發送到交換機里面,并且指定一個路由key,佇列與交換機的系結是通過路由key進行系結的,消費者在消費的時候需要根據路由key從交換機里面拿訊息,進行消費訊息,需要設定型別為 direct 的交換機,交換機和佇列進行系結,并且指定 routing key,當發送訊息到交換機后,交換機會根據 routing key 將訊息發送到對應的佇列,

說明:Routing 模式要求佇列在系結交換機時要指定 routing key,訊息會轉發到符合 routing key 的佇列,
Topics 通配符模式:一個生產者生產訊息發送到交換機里面,并且使用通配符的形式(類似mysql里面的模糊查詢,比如想獲取一批帶有item前綴的資料),佇列與交換機的系結是通過通配符進行系結的,消費者在消費的時候需要根據根據通配符從交換機里面拿訊息,進行消費訊息,需要設定型別為 topic 的交換機,交換機和佇列進行系結,并且指定通配符方式的 routing key,當發送訊息到交換機后,交換機會根據 routing key 將訊息發送到對應的佇列
說明:通配符規則:# 匹配一個或多個詞,* 匹配不多不少恰好1個詞,例如:Lazy.# 能夠匹配 Lazy.insert.content或者 Lazy.insert,Lazy.* 只能匹配Lazy.insert,
二、代碼

創建一個Maven工程,引入pom依賴:
<dependencies>
<!--rabbitmq客戶端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.3.0</version>
</dependency>
<!--json轉換工具包-->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>
</dependencies>
創建一個連接Rabbitmq的工具類:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class RabbitUtils {
private static ConnectionFactory connectionFactory = new ConnectionFactory();
static {
connectionFactory.setHost("你的rabbitmq的ip地址");
connectionFactory.setPort(5672);//RabbitMQ的默認埠號,根據實際情況修改
connectionFactory.setUsername("你的rabbitmq的用戶名稱");
connectionFactory.setPassword("你的rabbitmq的用戶密碼");
connectionFactory.setVirtualHost("你的rabbitmq的虛擬機");
}
public static Connection getConnection(){
Connection conn = null;
try {
conn = connectionFactory.newConnection();
return conn;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
簡單模式:
為了區分好理解,我每個模式都去創建一個虛擬機,這里我先去rabbitMq管控頁面創建一個虛擬機

修改工具類的虛擬機:

生產者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
public static void main(String[] args) throws Exception {
//獲取TCP長連接
Connection conn = RabbitUtils.getConnection();
//創建通信“通道”,相當于TCP中的虛擬連接
Channel channel = conn.createChannel();
//創建佇列,宣告并創建一個佇列,如果佇列已存在,則使用這個佇列
//channel.queueDeclare的五個引數
//第一個引數:佇列名稱ID
//第二個引數:是否持久化,false對應不持久化資料,MQ停掉資料就會丟失
//第三個引數:是否佇列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用,其他消費者不讓訪問
//第四個:是否自動洗掉,false代表連接停掉后不自動洗掉掉這個佇列
//其他額外的引數, null
channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null);
String message = "要發送的message";
//channel.basicPublish的四個引數
//exchange 交換機,暫時用不到,在后面進行發布訂閱時才會用到
//佇列名稱
//額外的設定屬性
//最后一個引數是要傳遞的訊息位元組陣列
channel.basicPublish("", RabbitConstant.QUEUE_TEST, null,message.getBytes());
channel.close();
conn.close();
System.out.println("===發送成功===");
}
}
消費者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) throws Exception{
//獲取TCP長連接
Connection conn = RabbitUtils.getConnection();
//創建通信“通道”,相當于TCP中的虛擬連接
Channel channel = conn.createChannel();
//創建佇列,宣告并創建一個佇列,如果佇列已存在,則使用這個佇列
//第一個引數:佇列名稱ID
//第二個引數:是否持久化,false對應不持久化資料,MQ停掉資料就會丟失
//第三個引數:是否佇列私有化,false則代表所有消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用,其他消費者不讓訪問
//第四個:是否自動洗掉,false代表連接停掉后不自動洗掉掉這個佇列
//其他額外的引數, null
channel.queueDeclare(RabbitConstant.QUEUE_TEST,false, false, false, null);
//從MQ服務器中獲取資料
//創建一個訊息消費者
//第一個引數:佇列名
//第二個引數代表是否自動確認收到訊息,false代表手動編程來確認訊息,這是MQ的推薦做法
//第三個引數要傳入DefaultConsumer的實作類
channel.basicConsume(RabbitConstant.QUEUE_TEST, false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
//重寫建構式,Channel通道物件需要從外層傳入,在handleDelivery中要用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消費者接收到的訊息:"+message);
System.out.println("訊息的TagId:"+envelope.getDeliveryTag());
//false只確認簽收當前的訊息,設定為true的時候則代表簽收該消費者所有未簽收的訊息
channel.basicAck(envelope.getDeliveryTag(), false);
}
我先啟動消費者后啟動生產者,這樣只要生產者一生產訊息,消費者就可以立馬消費,



Work queues 作業佇列模式:
為了區分好理解,我每個模式都去創建一個虛擬機,這里我先去rabbitMq管控頁面創建一個虛擬機
修改工具類的虛擬機

為了模擬某些業務,這里使用自定義物體類發送訊息,所以我新建了一個自定義物體類
/**
* 自定義的物體類:發送內容
*/
public class SenderContent {
private String name;
private String content;
public SenderContent(String name, String content) {
this.name = name;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
生產者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 生成者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
for(int i = 1 ; i <= 100 ; i++) {
SenderContent senderContent = new SenderContent("姓名:" + i, "內容:" + i);
String jsonSMS = new Gson().toJson(senderContent);
channel.basicPublish("" , RabbitConstant.QUEUE_SENDER_CONTENT , null , jsonSMS.getBytes());
}
System.out.println("發送資料成功");
channel.close();
connection.close();
}
}
消費者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者1
*/
public class ConsumerOne {
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
//如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
//basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個訊息后(確認后),在從佇列中獲取一個新的
channel.basicQos(1);//處理完一個取一個
channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("ConsumerOne-發送成功:" + jsonSMS);
try {
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
//確認簽收
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消費者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
//如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
//basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個訊息后(確認后),在從佇列中獲取一個新的
channel.basicQos(1);//處理完一個取一個
channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("ConsumerTwo-發送成功:" + jsonSMS);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//確認簽收
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消費者三:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者3
*/
public class ConsumerThree {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_SENDER_CONTENT, false, false, false, null);
//如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
//basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個訊息后(確認后),在從佇列中獲取一個新的
channel.basicQos(1);//處理完一個取一個
channel.basicConsume(RabbitConstant.QUEUE_SENDER_CONTENT , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String jsonSMS = new String(body);
System.out.println("ConsumerThree-發送成功:" + jsonSMS);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
//確認簽收
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
這里對每個消費者設定不同的休眠時間演示每個消費者處理業務的時間不同,查看訊息消費的情況





可以看出消費者一消費的最多,消費者三消費的最少,因為代碼中設定了這個
channel.basicQos(1);//處理完一個取一個
消費者處理完一個訊息后(確認后),在從佇列中獲取一個新的,
Pub/Sub 訂閱模式 :
為了區分好理解,我每個模式都去創建一個虛擬機,這里我先去rabbitMq管控頁面創建一個虛擬機
創建一個交換機:這里用廣播模式作為交換機的型別用來演示

修改工具類的虛擬機

生產者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Scanner;
/**
* 發布者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = RabbitUtils.getConnection();
//鍵盤輸入
String input = new Scanner(System.in).next();
Channel channel = connection.createChannel();
//第一個引數交換機名字 其他引數和之前的一樣
channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT,"" , null , input.getBytes());
channel.close();
connection.close();
}
}
消費者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者1
*/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
//獲取TCP長連接
Connection connection = RabbitUtils.getConnection();
//獲取虛擬連接
final Channel channel = connection.createChannel();
//宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null);
//queueBind用于將佇列與交換機系結
//引數1:佇列名 引數2:互動機名 引數三:路由key(暫時用不到)
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者一收到資訊:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消費者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
//獲取TCP長連接
Connection connection = RabbitUtils.getConnection();
//獲取虛擬連接
final Channel channel = connection.createChannel();
//宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null);
//queueBind用于將佇列與交換機系結
//引數1:佇列名 引數2:互動機名 引數三:路由key(暫時用不到)
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT, "");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者二收到資訊:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
演示效果:




Routing 路由模式:
為了區分好理解,我每個模式都去創建一個虛擬機,這里我先去rabbitMq管控頁面創建一個虛擬機
修改工具類的虛擬機

創建交換機:這里的交換機type型別一定要改成routing模式,如果還是廣播模式的fanout的話,跟上面發布和訂閱模式出現的效果會是一樣的,
錯誤實體:
正確的實體:
生產者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 發布者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put("routing.one.a.20201127", "中國湖南長沙20201127私密資料");
area.put("routing.one.d.20201128", "中國河北石家莊20201128私密資料");
area.put("routing.two.b.20201127", "中國湖北武漢20201127私密資料");
area.put("routing.two.e.20201128", "中國湖北武漢20201128私密資料");
area.put("routing.three.c.20201127", "中國湖南株洲20201128私密資料");
area.put("routing.three.f.20201128", "中國河南鄭州20201128私密資料");
area.put("us.one.a.20201127", "美國加州洛杉磯20201127私密資料");
area.put("us.two.b.20201128", "美國加州洛杉磯20201128私密資料");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//第一個引數交換機名字 第二個引數作為 訊息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_ROUTING,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
消費者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者1
*/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null);
//queueBind用于將佇列與交換機系結
//引數1:佇列名 引數2:互動機名 引數三:路由key
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "routing.one.a.20201127");
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "us.one.a.20201127");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1收到資訊:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消費者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null);
//queueBind用于將佇列與交換機系結
//引數1:佇列名 引數2:互動機名 引數三:路由key
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "routing.one.d.20201128");
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_ROUTING, "routing.two.e.20201128");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2收到資訊:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
效果:


路由模式需要消費者指定路由key
Topics 通配符模式:
為了區分好理解,我每個模式都去創建一個虛擬機,這里我先去rabbitMq管控頁面創建一個虛擬機
修改工具類的虛擬機

創建互動機,型別為topic
生產者:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 發布者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put("routing.one.a.20201127", "中國湖南長沙20201127私密資料");
area.put("routing.one.d.20201128", "中國河北石家莊20201128私密資料");
area.put("routing.two.b.20201127", "中國湖北武漢20201127私密資料");
area.put("routing.two.e.20201128", "中國湖北武漢20201128私密資料");
area.put("routing.three.c.20201127", "中國湖南株洲20201128私密資料");
area.put("routing.three.f.20201128", "中國河南鄭州20201128私密資料");
area.put("us.one.a.20201127", "美國加州洛杉磯20201127私密資料");
area.put("us.two.b.20201128", "美國加州洛杉磯20201128私密資料");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//第一個引數交換機名字 第二個引數作為 訊息的routing key
channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_TOPIC,me.getKey() , null , me.getValue().getBytes());
}
channel.close();
connection.close();
}
}
消費者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者1
*/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null);
//queueBind用于將佇列與交換機系結
//引數1:佇列名 引數2:互動機名 引數三:路由key
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC, "*.*.*.20201127");
// channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC, "us.two.b.20201128");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1收到資訊:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消費者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
//獲取TCP長連接
Connection connection = RabbitUtils.getConnection();
//獲取虛擬連接
final Channel channel = connection.createChannel();
//宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null);
//指定佇列與交換機以及routing key之間的關系
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_TOPIC, "us.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2收到資訊:" + new String(body));
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
效果:



說明:如果想切換模式進行測驗,只需要修改工具類中的虛擬機即可,前面的命名都是一樣的,就是為了在這個時候體現出每個虛擬機都是隔離的,所以那么key是一樣的也沒關系,
三、訊息確認機制:confirm狀態和return狀態
一、理論
confirm狀態:表示生產者將訊息投遞到Broker時產生的狀態,會出現二種情況:
- ack:表示已經被Broker簽收
- nack:表示表示已經被Broker拒收,原因可能有佇列滿了,限流,IO例外…
return狀態:表示生產者將訊息投遞到Broker,被Broker簽收,但是沒有對應的佇列進行投遞,將訊息回退給生產者的狀態,
說明:這二種狀態都只和生產者有關,于消費者沒關系,
二、代碼
沿用之前的topic虛擬機,就不在創建新的虛擬機了
創建一個交換機:
生產者:
package com.liao.rabbitmq.confirm;
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* 發布者
*/
public class Producer {
public static void main(String[] args) throws Exception {
Map area = new LinkedHashMap<String, String>();
area.put("routing.one.a.20211001", "中國長沙20211001私密資料");
area.put("routing.two.b.20211001", "中國武漢20211001私密資料");
area.put("routing.three.c.20211001", "中國株洲20211001私密資料");
area.put("routing.one.d.20211002", "中國石家莊20211002私密資料");
area.put("routing.two.e.20211002", "中國武漢20211002私密資料");
area.put("routing.three.f.20211002", "中國鄭州20211002私密資料");
area.put("routing.error.f.aaa", "未成功投遞的私密資料");
area.put("us.one.a.20211001", "美國洛杉磯20211001私密資料");
area.put("us.two.b.20211002", "美國洛杉磯20211002私密資料");
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//開啟confirm監聽模式
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long l, boolean b) throws IOException {
//第二個引數代表接收的資料是否為批量接收,一般我們用不到,
System.out.println("訊息已被Broker接收,Tag:" + l );
}
public void handleNack(long l, boolean b) throws IOException {
System.out.println("訊息已被Broker拒收,Tag:" + l);
}
});
channel.addReturnListener(new ReturnCallback() {
public void handle(Return r) {
System.err.println("===========================");
System.err.println("Return編碼:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());
System.err.println("交換機:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );
System.err.println("Return主題:" + new String(r.getBody()));
System.err.println("===========================");
}
});
Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();
while (itr.hasNext()) {
Map.Entry<String, String> me = itr.next();
//Routing key 第二個引數相當于資料篩選的條件
//第三個引數為:mandatory true代表如果訊息無法正常投遞則return回生產者,如果false,則直接將訊息放棄,
channel.basicPublish(RabbitConstant.EXCHANGE_CONTENT_TOPIC_CONFIRM,me.getKey() ,true, null , me.getValue().getBytes());
}
//如果關閉則無法進行監聽,因此此處不需要關閉
/*channel.close();
connection.close();*/
}
}
消費者一:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者1
*/
public class ConsumerOne {
public static void main(String[] args) throws IOException {
Connection connection = RabbitUtils.getConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(RabbitConstant.QUEUE_ONE, false, false, false, null);
//queueBind用于將佇列與交換機系結
//引數1:佇列名 引數2:互動機名 引數三:路由key
channel.queueBind(RabbitConstant.QUEUE_ONE, RabbitConstant.EXCHANGE_CONTENT_TOPIC_CONFIRM, "*.*.*.20211001");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_ONE , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者1收到資訊:" + new String(body));
//channel.basicNack的三個引數
//第一個引數:long deliveryTag:唯一標識 ID,
//第二個引數:boolean multiple:是否批處理,當該引數為 true 時,則可以一次性確認 delivery_tag 小于等于傳入值的所有訊息,
//第三個引數:boolean requeue:如果 requeue 引數設定為 true,則 RabbitMQ 會重新將這條訊息存入佇列,以便發送給下一個訂閱的消費者; 如果 requeue 引數設定為 false,則 RabbitMQ 立即會還把訊息從佇列中移除,而不會把它發送給新的消費者,
// channel.basicNack(envelope.getDeliveryTag() , false,false);
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
消費者二:
import com.liao.rabbitmq.utils.RabbitConstant;
import com.liao.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* 消費者2
*/
public class ConsumerTwo {
public static void main(String[] args) throws IOException {
//獲取TCP長連接
Connection connection = RabbitUtils.getConnection();
//獲取虛擬連接
final Channel channel = connection.createChannel();
//宣告佇列資訊
channel.queueDeclare(RabbitConstant.QUEUE_TWO, false, false, false, null);
//指定佇列與交換機以及routing key之間的關系
channel.queueBind(RabbitConstant.QUEUE_TWO, RabbitConstant.EXCHANGE_CONTENT_TOPIC_CONFIRM, "us.#");
channel.basicQos(1);
channel.basicConsume(RabbitConstant.QUEUE_TWO , false , new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費者2收到資訊:" + new String(body));
// channel.basicNack(envelope.getDeliveryTag() , false,false);
channel.basicAck(envelope.getDeliveryTag() , false);
}
});
}
}
演示效果:


可以看到列印return都是key中沒有20211001后綴或者沒有us前綴的資料
二、Spring整合RabbitMQ(簡單模式,廣播模式,路由模式,通配符模式,訊息可靠性投遞,防止訊息丟失,TTL,死信佇列,延遲佇列,訊息積壓,訊息冪等性)
本文通過實戰代碼,Spring整合RabbitMQ,專案分二個模塊,consumer和produle,
一、專案代碼
1.生產者
1.專案架構圖:

代碼如下(示例):
2.pom.xml依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sky</groupId>
<artifactId>spring-rabbitmq-produle</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.spring-rabbitmq-producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--定義管理交換機、佇列-->
<rabbit:admin connection-factory="connectionFactory"/>
<!--定義持久化佇列,不存在則自動創建;不系結到交換機則系結到默認交換機
默認交換機型別為direct,名字為:"",路由鍵為佇列的名稱
-->
<!--
id:bean的名稱
name:queue的名稱
auto-declare:自動創建
auto-delete:自動洗掉, 最后一個消費者和該佇列斷開連接后,自動洗掉佇列
durable:是否持久化
-->
<rabbit:queue id="spring_queue" name="spring_queue" auto-declare="true"/>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~廣播;所有佇列都能收到訊息~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動創建-->
<rabbit:queue id="spring_fanout_queue_1" name="spring_fanout_queue_1" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動創建-->
<rabbit:queue id="spring_fanout_queue_2" name="spring_fanout_queue_2" auto-declare="true"/>
<!--定義廣播型別交換機;并系結上述兩個佇列-->
<rabbit:fanout-exchange id="spring_fanout_exchange" name="spring_fanout_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding queue="spring_fanout_queue_1" />
<rabbit:binding queue="spring_fanout_queue_2"/>
</rabbit:bindings>
</rabbit:fanout-exchange>
<!-- 定義佇列-->
<rabbit:queue id="spring_direct_queue" name="spring_direct_queue" auto-declare="true"/>
<!--
定義 Routing 路由模式 互動機
-->
<rabbit:direct-exchange name="spring_direct_exchange" >
<rabbit:bindings>
<!--direct 型別的交換機系結佇列 key :路由key queue:佇列名稱-->
<rabbit:binding queue="spring_direct_queue" key="direct"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- ~~~~~~~~~~~~~~~~~~~~~~~~~~~~通配符;*匹配一個單詞,#匹配多個單詞 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -->
<!--定義廣播交換機中的持久化佇列,不存在則自動創建-->
<rabbit:queue id="spring_topic_queue_one" name="spring_topic_queue_one" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動創建-->
<rabbit:queue id="spring_topic_queue_two" name="spring_topic_queue_two" auto-declare="true"/>
<!--定義廣播交換機中的持久化佇列,不存在則自動創建-->
<rabbit:queue id="spring_topic_queue_three" name="spring_topic_queue_three" auto-declare="true"/>
<!--
宣告 topic 型別的交換機
-->
<rabbit:topic-exchange id="spring_topic_exchange" name="spring_topic_exchange" auto-declare="true">
<rabbit:bindings>
<rabbit:binding pattern="one.*" queue="spring_topic_queue_one"/>
<rabbit:binding pattern="two.#" queue="spring_topic_queue_two"/>
<rabbit:binding pattern="three.#" queue="spring_topic_queue_three"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--定義rabbitTemplate物件操作可以在代碼中方便發送訊息-->
<rabbit:template id="rabbitTemplate" connection-factory="connectionFactory"/>
</beans>
4.rabbitmq.properties:
rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring
說明:這里免費提供rabbitmq連接方式給大家使用學習
5.ProducerTest:
package com.sky.springrabbitmqprodule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring-rabbitmq-producer.xml")
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 簡單模式發訊息
*/
@Test
public void testHelloWorld(){
rabbitTemplate.convertAndSend("spring_queue","hello world spring....");
}
/**
* 廣播模式發訊息
*/
@Test
public void testFanout(){
rabbitTemplate.convertAndSend("spring_fanout_exchange","","spring fanout....");
}
/**
* 路由模式發訊息
*/
@Test
public void testDirect(){
rabbitTemplate.convertAndSend("spring_direct_exchange","direct","spring Direct....");
}
/**
* 通配符模式發訊息
*/
@Test
public void testTopics(){
rabbitTemplate.convertAndSend("spring_topic_exchange","one.onekey","spring topic one....");
rabbitTemplate.convertAndSend("spring_topic_exchange","two.twokey.topic","spring topic two....");
rabbitTemplate.convertAndSend("spring_topic_exchange","three.threekey.topic","spring topic three....");
}
}
2.消費者
1.專案架構圖

代碼如下(示例):
2.pom.xml依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sky</groupId>
<artifactId>spring-rabbitmq-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.8.RELEASE</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.1.7.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
https://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!--加載組態檔-->
<context:property-placeholder location="classpath:rabbitmq.properties"/>
<!-- 定義rabbitmq connectionFactory -->
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
port="${rabbitmq.port}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
virtual-host="${rabbitmq.virtual-host}"/>
<!--簡單模式-->
<!-- <bean id="springQueueListener" class="com.sky.springrabbitmqconsumer.listener.SpringQueueListener"/>-->
<!--廣播模式-->
<!-- <bean id="fanoutListener1" class="com.sky.springrabbitmqconsumer.listener.FanoutListener"/>-->
<!-- <bean id="fanoutListener2" class="com.sky.springrabbitmqconsumer.listener.FanoutListener2"/>-->
<!--路由模式-->
<!-- <bean id="springDirectQueue" class="com.sky.springrabbitmqconsumer.listener.SpringDirectQueue"/>-->
<!--通配符模式-->
<bean id="topicListenerOne" class="com.sky.springrabbitmqconsumer.listener.TopicListenerOne"/>
<bean id="topicListenerTwo" class="com.sky.springrabbitmqconsumer.listener.TopicListenerTwo"/>
<bean id="topicListenerThree" class="com.sky.springrabbitmqconsumer.listener.TopicListenerThree"/>
<rabbit:listener-container connection-factory="connectionFactory" auto-declare="true">
<!--簡單模式-->
<!-- <rabbit:listener ref="springQueueListener" queue-names="spring_queue"/>-->
<!--廣播模式-->
<!-- <rabbit:listener ref="fanoutListener1" queue-names="spring_fanout_queue_1"/>-->
<!-- <rabbit:listener ref="fanoutListener2" queue-names="spring_fanout_queue_2"/>-->
<!--路由模式-->
<!-- <rabbit:listener ref="springDirectQueue" queue-names="spring_direct_queue"/>-->
<!--通配符模式-->
<rabbit:listener ref="topicListenerOne" queue-names="spring_topic_queue_one"/>
<rabbit:listener ref="topicListenerTwo" queue-names="spring_topic_queue_two"/>
<rabbit:listener ref="topicListenerThree" queue-names="spring_topic_queue_three"/>
</rabbit:listener-container>
</beans>
4.rabbitmq.properties
rabbitmq.host=110.42.239.246
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtual-host=spring
說明:配置和生產者的一致
5.ConsumerTest
package com.sky.springrabbitmqconsumer.test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class ConsumerTest {
public static void main(String[] args) {
//初始化IOC容器
ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:spring-rabbitmq-consumer.xml");
}
}
6.FanoutListener
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
7.FanoutListener2
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class FanoutListener2 implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
8.SpringDirectQueue
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringDirectQueue implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
9.SpringQueueListener
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class SpringQueueListener implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
10.TopicListenerOne
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerOne implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
11.TopicListenerTwo
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerTwo implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
12.TopicListenerThree
package com.sky.springrabbitmqconsumer.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
public class TopicListenerThree implements MessageListener {
@Override
public void onMessage(Message message) {
//列印訊息
System.out.println(new String(message.getBody()));
}
}
上面就是這個專案的所有代碼了,下面就是Demo演示內容,
二、專案演示
演示簡單模式:
消費者取消注釋:
消費者啟動服務:
生產者發送訊息:
消費者查看訊息:

演示廣播模式:
消費者取消注釋:

消費者啟動服務:

生產者發送訊息:

消費者查看訊息:

演示路由模式:
消費者取消注釋:

消費者啟動服務:

生產者發送訊息:

消費者查看訊息:

演示通配符模式:
消費者取消注釋:

消費者啟動服務:

生產者發送訊息:

消費者查看訊息:

三、訊息可靠性投遞
訊息可靠性實作需要保證以下幾點:
-
持久化
exchange要持久化
queue要持久化
message要持久化 -
生產方確認Confirm
-
消費方確認Ack
-
Broker高可用
1.rabbitmq 整個訊息投遞的路徑
producer—>rabbitmq broker—>exchange—>queue—>consumer
- 訊息從producer 到 exchange 則會回傳一個 confirmCallback ,
- 訊息從exchange–>queue 投遞失敗則會回傳一個 returnCallback ,
2.實作訊息可靠性投遞的步驟
- 生產者設定ConnectionFactory的publisher-confirms=“true” 開啟 確認模式,
- 生產者設定ConnectionFactory的publisher-returns=“true” 開啟 退回模式,
- 生產者使用rabbitTemplate.setConfirmCallback設定回呼函式,當訊息發送到exchange后回呼confirm方法,在方法中判斷ack,如果為true,則發送成功,如果為false,則發送失敗,需要處理,
- 生產者使用rabbitTemplate.setReturnCallback設定退回函式,當訊息從exchange路由到queue失敗后,如果設定了rabbitTemplate.setMandatory(true)引數,則會將訊息退回給producer,并執行回呼函式returnedMessage,
- 消費者在rabbit:listener-container標簽中設定acknowledge屬性,設定ack方式 none:自動確認,manual:手動確認(none自動確認模式很危險,當生產者發送多條訊息,消費者接收到一條資訊時,會自動認為當前發送的訊息已經簽收了,這個時候消費者進行業務處理時出現了例外情況,也會認為訊息已經正常簽收處理了,而佇列里面顯示都被消費掉了,所以真實開發都會改為手動簽收,可以防止訊息丟失)
- 消費者如果在消費端沒有出現例外,則呼叫channel.basicAck(deliveryTag,false);方法確認簽收訊息
- 消費者如果出現例外,則在catch中呼叫 basicNack或 basicReject,拒絕訊息,讓MQ重新發送訊息,
3.具體實作可靠訊息投遞的代碼
說明:基于上述Spring整合RabbitMQ的代碼進行改動
生產者
第一處改動:設定確認模式和退回模式
代碼:
publisher-confirms="true"
publisher-returns="true"
第二處改動:宣告佇列和互動機的bean
代碼:
<!--訊息可靠性投遞(生產端)-->
<rabbit:queue id="test_queue_confirm" name="test_queue_confirm"></rabbit:queue>
<rabbit:direct-exchange name="test_exchange_confirm">
<rabbit:bindings>
<rabbit:binding queue="test_queue_confirm" key="confirm"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
第三處改動:撰寫Confirm測驗方法
//測驗Confirm 模式
@Test
public void testConfirm() {
//定義回呼
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 相關配置資訊
* @param ack exchange交換機 是否成功收到了訊息,true 成功,false代表失敗
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm方法被執行了....");
//ack 為 true表示 訊息已經到達交換機
if (ack) {
//接收成功
System.out.println("接收成功訊息" + cause);
} else {
//如果沒有投遞到交換機中去就會接收失敗,比如:將交換機名稱寫錯
System.out.println("接收失敗訊息" + cause);
//做一些處理,讓訊息再次發送,
}
}
});
//進行訊息發送
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message Confirm...");
//進行睡眠操作
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
第四處改動:撰寫Return測驗方法
//測驗 return模式
@Test
public void testReturn() {
//設定交換機處理失敗訊息的模式為true的時候,訊息達到不了佇列時,會將訊息重新回傳給生產者
rabbitTemplate.setMandatory(true);
//定義回呼
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 訊息物件
* @param replyCode 錯誤碼
* @param replyText 錯誤資訊
* @param exchange 交換機
* @param routingKey 路由鍵
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("return 執行了....");
System.out.println("message:"+message);
System.out.println("replyCode:"+replyCode);
System.out.println("replyText:"+replyText);
System.out.println("exchange:"+exchange);
System.out.println("routingKey:"+routingKey);
//處理業務
}
});
//進行訊息發送
rabbitTemplate.convertAndSend("test_exchange_confirm","confirm","message return...");
//進行睡眠操作
try {
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
}
消費者
第一處改動:
監聽器:AckListener
package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class AckListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//1、獲取訊息的id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//2、獲取訊息
System.out.println("message:"+new String(message.getBody()));
//3、進行業務處理
System.out.println("=====進行業務處理====");
//模擬出現例外
int i = 5/0;
//4、進行訊息簽收
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
//拒絕簽收
/*
* 第三個引數:requeue:重回佇列,如果設定為true,則訊息重新回到queue,broker會重新發送該訊息給消費端
*/
System.out.println("=====業務處理例外,訊息重新回到queue,broker會重新發送該訊息給消費端====");
channel.basicNack(deliveryTag, true, true);
}
}
}
第二處改動:
原來是通過宣告一個個的bean物件,現在改為了掃描某個包下面的類
<context:component-scan base-package="com.sky.springrabbitmqconsumer.listener" />
第三處改動:
在rabbit:listener-container標簽中設定acknowledge屬性改為手動確認,(限流設定:prefetch屬性改為每次抓取2條訊息,并且監聽自定義的ackListener)
4.具體實作可靠訊息投遞的演示
正常發訊息Demo演示
啟動生產者Confirm模式:
啟動消費者:
啟動生產者Return模式:
消費者的控制臺就會不停的列印:

例外發訊息Demo演示
生產者Confirm模式:
生產者Return模式:

四、訊息在消費端限流
1.限流示例圖

2.實作步驟
- 在rabbit:listener-container中配置 prefetch屬性設定消費端一次拉取多少訊息
- 消費端的確認模式一定為手動確認:acknowledge=“manual”
3.具體實作消費端限流代碼
消費者
第一處修改:監聽器:QosListener
package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class QosListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//獲取到的訊息
System.out.println(new String(message.getBody()));
Thread.sleep(3000);
//處理業務邏輯
//進行訊息的簽收,第二個引數:true表示之前沒簽收的都給他簽收掉
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
第二處修改:

<!--定義監聽器容器
acknowledge="manual":手動簽收
自動確認:acknowledge="none"
手動確認:acknowledge="manual"
根據例外情況確認:acknowledge="auto",(這種方式使用麻煩,不作講解)
prefetch="1":每次抓取多少條訊息,只有訊息確認簽收了,才會拉取下一條,否則不會拉取訊息
-->
<rabbit:listener-container connection-factory="connectionFactory"
auto-declare="true"
acknowledge="manual"
prefetch="2">
<rabbit:listener ref="qosListener" queue-names="test_queue_confirm"></rabbit:listener>
生成者
批量發送訊息測驗方法
//批量發送訊息,讓消費者每次拉去指定的數量
@Test
public void testQos(){
for (int i = 0; i < 10; i++) {
// 發送訊息
rabbitTemplate.convertAndSend("test_exchange_confirm", "confirm", "message confirm....");
}
}
4.具體實作消費端限流Demo演示
啟動消費者

啟動生產者

查看消費者控制臺日志
說明:每隔3秒列印一條訊息
例外情況,消費未進行簽收
重啟消費者和生產者發訊息,這個時候會看到,原本發送的十條訊息,實際只有二條訊息列印在消費者的控制臺上面,因為prefetch屬性配置了2,所以一次性拉取了二條,
<hr style=" border:solid; width:100px; height:1px;" color=#000000 size=1">
五、TTL
1.業務場景
比如我們在下訂單的時候,如果超過30分鐘未支付,就取消這個訂單,把當前商品的庫存加回去,
2.定義
TTL 全稱 Time To Live(存活時間/過期時間),當訊息到達存活時間后,還沒有被消費,會被自動清除,RabbitMQ可以對訊息設定過期時間,也可以對整個佇列(Queue)設定過期時間,
3.實作步驟
- 設定佇列過期時間使用引數:x-message-ttl,單位:ms(毫秒),會對整個佇列訊息統一過期,
- 設定訊息過期時間使用引數:expiration,單位:ms(毫秒),當該訊息在佇列頭部時(消費時),會單獨判斷這一訊息是否過期,
- 如果兩者都進行了設定,以時間短的為準,
4.通過RabbitMQ管理控制臺頁面實作Demo
1.創建訊息

2.創建交換機

3.將交換機和訊息系結

4.發送訊息
超過5秒沒有消費者消費,就自動失效了,
5.通過代碼實作TTL
添加ttl佇列
<!--ttl-->
<rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
<!--設定queue的引數-->
<rabbit:queue-arguments>
<!--x-message-ttl指佇列的過期時間-->
<entry key="x-message-ttl" value="100000" value-type="java.lang.Integer"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:topic-exchange name="test_exchange_ttl" >
<rabbit:bindings>
<rabbit:binding pattern="ttl.#" queue="test_queue_ttl"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
發送訊息測驗方法
//ttl測驗
@Test
public void testTtl(){
for (int i = 0; i < 10; i++) {
// 發送訊息
rabbitTemplate.convertAndSend("test_exchange_confirm", "ttl.test", "message confirm....");
}
}
啟動測驗方法

等待10秒

六、死信佇列
1.定義
死信佇列,英文縮寫:DLX ,Dead Letter Exchange(死信交換機),當訊息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX,
說明:死信交換機和死信佇列和普通的沒有區別,當訊息成為死信后,如果該佇列系結了死信交換機,則訊息會被死信交換機重新路由到死信佇列,
2.訊息成為死信的三種情況
- 佇列訊息長度到達限制;
- 消費者拒接消費訊息,basicNack/basicReject,并且不把訊息重新放入原目標佇列,requeue=false;
- 原佇列存在訊息過期設定,訊息到達超時時間未被消費;
3.佇列系結死信交換機
給佇列設定引數: x-dead-letter-exchange 和 x-dead-letter-routing-key
4.代碼實作
在spring-rabbitmq-producer.xml中添加佇列和交換機
<!--
死信佇列:
1. 宣告正常的佇列(test_queue_dlx)和交換機(test_exchange_dlx)
2. 宣告死信佇列(queue_dlx)和死信交換機(exchange_dlx)
3. 正常佇列系結死信交換機
設定兩個引數:
* x-dead-letter-exchange:死信交換機名稱
* x-dead-letter-routing-key:發送給死信交換機的routingkey
-->
<!--
1. 宣告正常的佇列(test_queue_dlx)和交換機(test_exchange_dlx)
-->
<rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
<!--3. 正常佇列系結死信交換機-->
<rabbit:queue-arguments>
<!--3.1 x-dead-letter-exchange:死信交換機名稱-->
<entry key="x-dead-letter-exchange" value="exchange_dlx" />
<!--3.2 x-dead-letter-routing-key:發送給死信交換機的routingkey-->
<entry key="x-dead-letter-routing-key" value="dlx.test" />
<!--4.1 設定佇列的過期時間 ttl-->
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
<!--4.2 設定佇列的長度限制 max-length -->
<entry key="x-max-length" value="10" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 正常的交換機系結正常的佇列-->
<rabbit:topic-exchange name="test_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--
2. 宣告死信佇列(queue_dlx)和死信交換機(exchange_dlx)
-->
<rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
啟動生產者:
RabbitMQ管控頁面查看

訊息拒收同理
七、延遲佇列
1.定義
延遲佇列,即訊息進入佇列后不會立即被消費,只有到達指定時間后,才會被消費,在RabbitMQ中并未提供延遲佇列功能,但是可以使用:TTL+死信佇列 組合實作延遲佇列的效果,
2.場景

-
下單后,30分鐘未支付,取消訂單,回滾庫存,
-
新用戶注冊成功7天后,發送短信問候,
3.具體實作
1.生產者
在spring-rabbitmq-producer.xml添加以下代碼
<!--
延遲佇列:
1. 定義正常交換機(order_exchange)和佇列(order_queue)
2. 定義死信交換機(order_exchange_dlx)和佇列(order_queue_dlx)
3. 系結,設定正常佇列過期時間為30分鐘
-->
<!-- 1. 定義正常交換機(order_exchange)和佇列(order_queue)-->
<rabbit:queue id="order_queue" name="order_queue">
<!--3. 系結,設定正常佇列過期時間為30分鐘-->
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
<entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
<entry key="x-message-ttl" value="10000" value-type="java.lang.Integer" />
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 訂單業務的交換機和佇列-->
<rabbit:topic-exchange name="order_exchange">
<rabbit:bindings>
<rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
<!--2. 定義死信交換機(order_exchange_dlx)和佇列(order_queue_dlx)-->
<rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
<rabbit:topic-exchange name="order_exchange_dlx">
<rabbit:bindings>
<rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
</rabbit:bindings>
</rabbit:topic-exchange>
發訊息測驗
/*
* 測驗延時訊息
* */
@Test
public void testDelay() throws InterruptedException {
//1.發送訂單訊息, 將來是在訂單系統中,下單成功后,發送訊息
rabbitTemplate.convertAndSend("order_exchange","order.msg","訂單資訊:id=1,time=2021年10月...");
//2.列印倒計時10秒
for (int i = 10; i > 0 ; i--) {
System.out.println(i+"...");
Thread.sleep(1000);
}
}
2.消費者
spring-rabbitmq-consumer.xml配置
<!--延遲佇列效果實作: 一定要監聽的是 死信佇列!!!-->
<rabbit:listener ref="orderListener" queue-names="order_queue_dlx"></rabbit:listener>
添加監聽器OrderListener
package com.sky.springrabbitmqconsumer.listener;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
@Component
public class OrderListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收轉換訊息
System.out.println(new String(message.getBody()));
//2. 處理業務邏輯
System.out.println("處理業務邏輯...");
System.out.println("根據訂單id查詢其狀態...");
System.out.println("判斷狀態是否為支付成功");
System.out.println("取消訂單,回滾庫存....");
//3. 手動簽收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//e.printStackTrace();
System.out.println("出現例外,拒絕接受");
//4.拒絕簽收,不重回佇列 requeue=false
channel.basicNack(deliveryTag,true,false);
}
}
}
4.Demo演示
啟動生產者
啟動消費者
說明:過了十秒之后才發送訊息
八、訊息積壓
1.場景
- 消費者宕機訊息積壓
- 消費者消費能力不足
- 發送這發送流量太大
2.解決方案
上線更多的消費者,進行正常消費,上線專門的佇列消費訪問,先將訊息批量取出來,記錄到資料庫中,再慢慢處理,
九、訊息冪等性
1.定義
冪等性指一次和多次請求某一個資源,對于資源本身應該具有同樣的結果,也就是說,其任意多次執行對資源本身所產生的影響均與一次執行的影響相同,在MQ中指,消費多條相同的訊息,得到與消費該訊息一次相同的結果,
2.解決方案
訊息冪等性保障–樂觀鎖機制

除此之外,我還提供了專案地址提供給大家clone,地址鏈接:[https://gitee.com/java_wxid/liao](https://gitee.com/java_wxid/liao),專案中的spring-rabbitmq-produle改為了spring-rabbitmq-producer,原意是產品生產訊息給消費者消費,怕大家把produle搞錯,改為了生產者,更加通俗易懂,
三、Springboot集成RabbitMQ(直連模式,作業佇列模式,發布訂閱模式,路由模式,通配符模式
提示:Springboot集成Rabbitmq實戰案例,通過介面呼叫的方式演示,
一、集成步驟
一、生產者:
-
創建生產者SpringBoot工程
-
引入pom依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> -
撰寫rabbitmq配置訊息
-
定義交換機,佇列以及系結關系的配置類
-
注入RabbitTemplate,呼叫方法,完成訊息發送
二、消費者:
-
創建生產者SpringBoot工程
-
引入pom依賴
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> -
撰寫rabbitmq配置訊息
-
定義監聽類,使用@RabbitListener注解完成佇列監聽,
二、實作步驟
1.專案架構圖

2.創建專案
代碼如下(示例):
1.pom依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.sky</groupId>
<artifactId>springboot-rabbitmq-module</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-rabbitmq-module</name>
<description>springboot-rabbitmq-module</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>springboot_rabbitmq</finalName>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2.application.properties配置
server.port=8080
#spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.addresses=110.42.239.246
spring.rabbitmq.virtual-host=springboot
#spring.rabbitmq.addresses=110.42.239.246:5672,110.42.239.247:5672,110.42.239.248:5672
說明:這里免費提供rabbitmq連接方式給大家使用學習
3.config配置
HelloWorldConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* HelloWorld rabbitmq課上講解的第一個作業模式
* 直連模式只需要宣告佇列,所有訊息都通過佇列轉發,
* 無需設定交換機
*/
@Configuration
public class HelloWorldConfig {
@Bean
public Queue setQueue() {
return new Queue("helloWorldqueue");
}
}
FanoutConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Fanout模式需要宣告exchange,并系結queue,由exchange負責轉發到queue上,
* 廣播模式 交換機型別設定為:fanout
*/
@Configuration
public class FanoutConfig {
//宣告佇列
@Bean
public Queue fanoutQ1() {
return new Queue("fanout.q1");
}
@Bean
public Queue fanoutQ2() {
return new Queue("fanout.q2");
}
//宣告exchange
@Bean
public FanoutExchange setFanoutExchange() {
return new FanoutExchange("fanoutExchange");
}
//宣告Binding,exchange與queue的系結關系
@Bean
public Binding bindQ1() {
return BindingBuilder.bind(fanoutQ1()).to(setFanoutExchange());
}
@Bean
public Binding bindQ2() {
return BindingBuilder.bind(fanoutQ2()).to(setFanoutExchange());
}
}
WorkConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class WorkConfig {
//宣告佇列
@Bean
public Queue workQ1() {
return new Queue("work_sb_mq_q");
}
}
DirectConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*
路由模式|Routing模式 交換機型別:direct
*/
@Configuration
public class DirectConfig {
//宣告佇列
@Bean
public Queue directQ1() {
return new Queue("direct_sb_mq_q1");
}
@Bean
public Queue directQ2() {
return new Queue("direct_sb_mq_q2");
}
//宣告exchange
@Bean
public DirectExchange setDirectExchange() {
return new DirectExchange("directExchange");
}
//宣告binding,需要宣告一個routingKey
@Bean
public Binding bindDirectBind1() {
return BindingBuilder.bind(directQ1()).to(setDirectExchange()).with("directBind.one");
}
@Bean
public Binding bindDirectBind2() {
return BindingBuilder.bind(directQ2()).to(setDirectExchange()).with("directBind.two");
}
}
TopicConfig
package com.sky.springbootrabbitmqmodule.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/*
Topics模式 交換機型別 topic
* */
@Configuration
public class TopicConfig {
//宣告佇列
@Bean
public Queue topicQ1() {
return new Queue("topic_sb_mq_q1");
}
@Bean
public Queue topicQ2() {
return new Queue("topic_sb_mq_q2");
}
//宣告exchange
@Bean
public TopicExchange setTopicExchange() {
return new TopicExchange("topicExchange");
}
//宣告binding,需要宣告一個roytingKey
@Bean
public Binding bindTopicHebei1() {
return BindingBuilder.bind(topicQ1()).to(setTopicExchange()).with("directBind.*");
}
@Bean
public Binding bindTopicHebei2() {
return BindingBuilder.bind(topicQ2()).to(setTopicExchange()).with("#.two");
}
}
4.消費端component
package com.sky.springbootrabbitmqmodule.component;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class ConcumerReceiver {
//直連模式的多個消費者,會分到其中一個消費者進行消費,類似task模式
//通過注入RabbitContainerFactory物件,來設定一些屬性,相當于task里的channel.basicQos
@RabbitListener(queues="helloWorldqueue")
public void helloWorldReceive(String message) {
System.out.println("helloWorld模式 received message : " +message);
}
//作業佇列模式
@RabbitListener(queues="work_sb_mq_q")
public void wordQueueReceiveq1(String message) {
System.out.println("作業佇列模式1 received message : " +message);
}
@RabbitListener(queues="work_sb_mq_q")
public void wordQueueReceiveq2(String message) {
System.out.println("作業佇列模式2 received message : " +message);
}
//pub/sub模式進行訊息監聽
@RabbitListener(queues="fanout.q1")
public void fanoutReceiveq1(String message) {
System.out.println("發布訂閱模式1received message : " +message);
}
@RabbitListener(queues="fanout.q2")
public void fanoutReceiveq2(String message) {
System.out.println("發布訂閱模式2 received message : " +message);
}
//Routing路由模式
@RabbitListener(queues="direct_sb_mq_q1")
public void routingReceiveq1(String message) {
System.out.println("Routing路由模式routingReceiveqOne received message : " +message);
}
@RabbitListener(queues="direct_sb_mq_q2")
public void routingReceiveq2(String message) {
System.out.println("Routing路由模式routingReceiveqTwo received message : " +message);
}
//topic 模式
//注意這個模式會有優先匹配原則,例如發送routingKey=hunan.IT,那匹配到hunan.*(hunan.IT,hunan.eco),之后就不會再去匹配*.ITd
@RabbitListener(queues="topic_sb_mq_q1")
public void topicReceiveq1(String message) {
System.out.println("Topic模式 topic_sb_mq_q1 received message : " +message);
}
@RabbitListener(queues="topic_sb_mq_q2")
public void topicReceiveq2(String message) {
System.out.println("Topic模式 topic_sb_mq_q2 received message : " +message);
}
}
5.生產者controller
package com.sky.springbootrabbitmqmodule.controller;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.io.UnsupportedEncodingException;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
//helloWorld 直連模式
@GetMapping(value="/helloWorldSend")
public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {
//設定部分請求引數
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//發訊息
rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
//作業佇列模式
@GetMapping(value="/workqueueSend")
public Object workqueueSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//制造多個訊息進行發送操作
for (int i = 0; i <10 ; i++) {
rabbitTemplate.send("work_sb_mq_q", new Message(message.getBytes("UTF-8"),messageProperties));
}
return "message sended : "+message;
}
// pub/sub 發布訂閱模式 交換機型別 fanout
@GetMapping(value="/fanoutSend")
public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里發送訊息,分發到exchange下的所有queue
rabbitTemplate.send("fanoutExchange", "", new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
//routing路由作業模式 交換機型別 direct
@GetMapping(value="/directSend")
public Object routingSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
if(null == routingKey) {
routingKey="directBind.one";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里發送訊息,分發到exchange下的所有queue
rabbitTemplate.send("directExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : routingKey >"+routingKey+";message > "+message;
}
//topic 作業模式 交換機型別 topic
@GetMapping(value="/topicSend")
public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
if(null == routingKey) {
routingKey="directBind.one";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里發送訊息,分發到exchange下的所有queue
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : routingKey >"+routingKey+";message > "+message;
}
}
上面就是這個專案的所有代碼了,下面就是Demo演示內容,
三、演示步驟
1.啟動專案

2.呼叫介面演示
1.直連模式
1.介面呼叫

2.控制臺列印

2.作業佇列模式
1.介面呼叫

2.控制臺列印

3.發布訂閱模式(交換機型別:fanout)
1.介面呼叫

2.控制臺列印

4.路由作業模式(交換機型別:direct)
1.介面呼叫

2.控制臺列印

5.通配符模式(交換機型別:topic)
1.介面呼叫

2.控制臺列印
除此之外,我還提供了專案地址提供給大家clone,地址鏈接:https://gitee.com/java_wxid/liao
四、RabbitMQ集群搭建
一、準備作業
首先搭建好多臺獨立的RabbitMQ,這里可以使用寶塔圖形化頁面安裝也可自行安裝 假設已經單獨搭建好二臺服務器了,這個時候就需要對服務器之間資料互通,
二、集群搭建
1.集群搭建步驟
設定服務器別名
- 服務器1:hostnamectl set‐hostname m1
- 服務器2:hostnamectl set‐hostname m2
在m1服務器中統一 erlang.cookie 檔案中 cookie 值 將m1中的 .erlang.cookie 同步到 m2中
scp /var/lib/rabbitmq/.erlang.cookie m2:/var/lib/rabbitmq/.erlang.cookie
說明:上面的命令中m2使用ip也行
Rabbitmq 集群添加節點:重啟m2機器中rabbitmq 的服務在m2執行
#停止用戶請求
rabbitmqctl stop_app
#將m2合并到集群中
rabbitmqctl join_cluster ‐‐ram rabbit@m2
#開啟用戶請求
rabbitmqctl start_app
#開啟管理頁面
rabbitmq‐plugins enable rabbitmq_management
#重啟服務
systemctl restart rabbitmq‐server.service
查看集群資訊
rabbitmqctl cluster_status
2.集群搭建負載均衡-HAProxy搭建
1.執行安裝
#1、安裝 yum install haproxy
#2、配置haproxy.cfg檔案 具體參照 如下配置HAProxy vim /etc/haproxy/haproxy.cfg,進入檔案找到maxconn 3000把后面的內容都洗掉,添加集群監聽,開啟haproxy監控服務,代碼如下:
#對MQ集群進行監聽
listen rabbitmq_cluster
bind 0.0.0.0:5672
option tcplog
mode tcp
option clitcpka
timeout connect 1s
timeout client 10s
timeout server 10s
balance roundrobin
server node1 節點1 ip地址:5672 check inter 5s rise 2 fall 3
server node2 節點2 ip地址:5672 check inter 5s rise 2 fall 3
#開啟haproxy監控服務
listen http_front
bind 0.0.0.0:1080
stats refresh 30s
stats uri /haproxy_stats
stats auth admin:admin
#3、啟動haproxy systemctl start haproxy
#4、查看haproxy行程狀態 systemctl status haproxy.service
#狀態如下說明已經啟動成功 Active: active (running)
#訪問如下地址對mq節點進行監控 http://服務器IP:1080/haproxy_stats
#代碼中訪問mq集群地址,則變為訪問haproxy地址:5672
2.haproxy.cfg配置詳解
listen rabbitmg cluster
bind 0.0.0.0:5672#通過5672對M1, M2進行映射
option tcplog #記錄tcp連接的狀態和時間
mode tcp#四層協議代理,即對TCP協議轉發
option clitcpka #開啟TCP的Keep Alive. (長連接模式)
timeout connect 1s #haproxy與mq建立連接的超時時間
timeout client 10s#客戶端與haproxy最大空閑時間,
timeout server 10s #服務器與haproxy最大空閑時間
balance roundrobin #采用輪詢轉發訊息
#每5秒發送一次心跳包,如連續兩次有回應則代表狀態良好,
#如連續三次沒有回應,則視為服務故障,該節點將被剔除,
server node1 ip1:5672 check inter 5s rise 2 fall 3
server node2 ip2:5672 check inter 5s rise 2 fall 3
listen http front
#監聽埠
bind 0.0.0.0:1080
#統計頁面自動重繪時間stats refresh 30s
#統計頁面url
stats uri /haproxy?stats
#指定HAproxy訪問用戶名和密碼設定
stats auth admin:admin
這個時候連接就可以通過haproxy代理連接啦,當然啦,HAProxy也有自己的管理頁面,就是你安裝服務器的ip加配置的1080埠,可以直接訪問啦,當然HA也可以配置多臺的,

總結
以上就是今天要講的內容,全文共計八萬八千多字,詳細的介紹了RabbitMQ的基本概念,六種作業模式,訊息可靠性投遞,死信佇列,延遲佇列等,通過圖文代碼的方式直觀的表述出來給大家演示效果,除此之外,還包含了對Spring框架集成RabbitMQ以及SpringBoot框架集成RabbitMQ,還希望各位讀者大大能夠在評論區積極參與討論,給文章提出一些寶貴的意見或者建議,合理的內容,我會采納更新博文,重新分享給大家,
給讀者大大的話
我本身是一個很普通的程式員,放在人堆里,除了與生俱來的盛世美顏、所剩不多的發量,就剩下180的大高個了,就是我這樣的一個人,默默堅持寫博文也有好多年了,有句老話說的好,牛逼之前都是傻逼式的堅持,希望自己可以通過大量的作品,時間的積累,個人魅力、運氣和時機,可以打造屬于自己的技術影響力,同時也希望自己可以成為一個懂技術,懂業務,懂管理的綜合型人才,作為專案架構路線的總設計師,掌控全域的團隊大腦,技術團隊中的絕對核心是我未來幾年不斷前進的目標,
提示:以下都是資源分享,求個一鍵三連,
博客封面
首先我要說聲抱歉,作為一個學習的平臺,封面引人注目是營銷策略,大家不用太過在意哈,專注博客內容本身即可,當然有同學惦記著我博客的封面,這里也分享出來給大家,
點擊:博客封面
提取碼:2021
面試資料
福利大放送,我就求個一鍵三連,拜托了,這對我真的很重要,
點擊:面試資料
提取碼:2021
200套PPT模板
福利大放送,我就求個一鍵三連,拜托了,這對我真的很重要,
點擊:200套PPT模板
提取碼:2021
提問的智慧
福利大放送,我就求個一鍵三連,拜托了,這對我真的很重要,
點擊:提問的智慧
提取碼:2021
Java開發學習路線
| 名稱 | 鏈接 |
|---|---|
| JavaSE | 點擊: JavaSE |
| MySQL專欄 | 點擊: MySQL專欄 |
| JDBC專欄 | 點擊: JDBC專欄 |
| MyBatis專欄 | 點擊: MyBatis專欄 |
| Web專欄 | 點擊: Web專欄 |
| Spring專欄 | 點擊: Spring專欄 |
| SpringMVC專欄 | 點擊: SpringMVC專欄 |
| SpringBoot專欄 | 點擊: SpringBoot專欄 |
| SpringCould專欄 | 點擊: SpringCould專欄 |
| Redis專欄 | 點擊: Redis專欄 |
| Linux專欄 | 點擊: Linux專欄 |
| Maven3專欄 | 點擊: Maven3專欄 |
| Spring Security5專欄 | 點擊: Spring Security5專欄 |
| 更多專欄 | 更多專欄,請到 java_wxid主頁 查看 |
P5學習路線圖
P6學習路線圖
P7學習路線圖
P8學習路線圖

圖片參考地址:https://www.tulingxueyuan.cn/
以上四張圖詳細介紹了作為Java開發作業者所需要具備的知識技能,同學們學廢了嘛,有想法系統學習的同學可以私聊我,記得點波關注喲,
點擊關注博主:java_wxid
點擊關注社區:幕后大佬
一鍵三連
感謝大家的支持,用心寫博文分享給大家,你的支持(點贊、收藏、關注)是對我創作的最大幫助,
微信公眾號:南北踏塵
主頁地址:java_wxid
社區地址:幕后大佬
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/375797.html
標籤:其他
上一篇:hadoop環境搭建(四)
下一篇:hadoop環境搭建(一)補充
