RabbitMQ 說明
本章,我們主要從RabbitMQ簡介、RabbitMQ安裝、RabbitMQ常用命令、RabbitMQ架構模式、RabbitMQ使用、Quick.RabbitMQPlus的使用和RabbitMQ總結這幾個方面對
RabbitMQ進行介紹!
1、??RabbitMQ 簡介
RabbitMQ 是使用Erlang語言開發的開源訊息佇列系統,基于 AMQP 協議來實作,
AMQP 的主要特征是面向訊息、佇列、路由(包括點對點和發布/訂閱)、可靠性和安全,AMQP 協議更多用在企業系統內,對資料一致性、穩定性和可靠性要求很高的場景,對性能和吞吐量的要求還在其次,
MQ 全稱為 Message Queue,訊息佇列(MQ)是一種應用程式對應用程式的通信方法,應用程式通過讀寫出入佇列的訊息(針對應用程式的資料)來通信,而無需專用連接來鏈接它們,
訊息傳遞指的是程式之間通過在訊息中發送資料進行通信,而不是通過直接呼叫彼此來通信,直接呼叫通常是用于諸如遠程程序呼叫的技術,排隊指的是應用程式通過佇列來通信,佇列的使用除去了接收和發送應用程式同時執行的要求,
2、??RabbitMQ 安裝
以下我們主要介紹 RabbitMQ 在 Windows 環境中的安裝程序,
2.1、?? 下載 OTP
由于 RabbitMQ 使用 Erlang 技術開發,所以需要先安裝 Erlang 運行環境后,才能安裝訊息佇列服務,
我們到https://www.erlang.org/downloads下載相應版本的安裝包,如這里我們下載https://github.com/erlang/otp/releases/download/OTP-25.0.4/otp_win64_25.0.4.exe這個版本,如下圖所示:

2.2、?? 下載 RabbitMQ
我們到https://www.rabbitmq.com/download.html下載相應版本的安裝包,如這里我們下載https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.10.7/rabbitmq-server-3.10.7.exe這個版本,如下圖所示:

2.3、?? 安裝 Erlang 和 RabbitMQ
我們在 D 盤分別創建目錄D:\Net_Program\Net_RabbitMQErlang和D:\Net_Program\Net_RabbitMQ(實際環境中根據自己的需求新建目錄即可),用于安裝 Erlang 和 RabbitMQ,
雙擊下載下來的 otp_win64_25.0.4.exe 和 rabbitmq-server-3.10.7.exe 進行安裝,安裝程序中將安裝目錄選擇剛才創建的目錄即可,其他按照默認設定即可,
2.4、?? 激活 Rabbit MQ's Management Plugin
激活步驟如下所示:
-
以管理員身份運行 CMD
-
然后切換到 RabbitMQ 的安裝目錄 sbin 下,D:\Net_Program\Net_RabbitMQ\rabbitmq_server-3.10.7\sbin
-
然后輸入如下命令并執行
rabbitmq-plugins.bat enable rabbitmq_management如出現如下所示的提示資訊,原因是安裝了 Erlang 沒有重啟電腦導致的環境變數沒有生效,重啟電腦即可:

-
然后輸入如下命令重啟 RabbitMQ 服務
net stop rabbitmq && net start rabbitmq -
最后即可訪問 RabbitMQ 的管理控制臺了,訪問地址(默認賬戶和密碼為 guest):http://localhost:15672
激活程序如下圖所示:



2.5、?? 遠程設定
如果希望 RabbitMQ 允許遠程連接,比如在 Windows Service2012 服務器上安裝了 RabbitMQ,其他客戶端想連接此服務器的 RabbitMQ,則需要設定防火墻開放埠,
具體設定步驟(以 Windows Service2012 為例):
- 打開防火墻 → 入站規則 → 新建規則 → 選擇“埠”,下一步 → 選擇 TCP,并在特定本地埠中填入 15672,5671-5672,下一步 → 選擇“允許連接”,下一步 → 下一步 → 輸入名稱或描述 → 完成,
2.6、?? Docker 中安裝 RabbitMQ
如果你電腦上沒安裝 Docker,請先安裝 Docker,可參考:Docker 的安裝
RabbitMQ 在 Docker 中的鏡像地址:https://hub.docker.com/_/rabbitmq

2.6.1、拉取鏡像容器并安裝
-
拉取 RabbitMQ 鏡像
以管理員身份運行 CMD,執行如下命令拉取 RabbitMQ 鏡像:
docker pull rabbitmq

拉取完成后,我們可以打開 Docker Desktop 客戶端查看就多了一個名稱為 rabbitmq 的鏡像了,如下圖所示:

-
新建目錄
在
D:\Net_Program\Net_Docker\RabbitMQ下分別新建Data和Log檔案夾,用于存放 RabbitMQ 資料和日志:D:\Net_Program\Net_Docker\RabbitMQ\Data
D:\Net_Program\Net_Docker\RabbitMQ\Log
-
創建并啟動容器
以管理員身份運行 CMD,執行如下命令創建并啟動容器:
docker run -d --volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq --volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq
:::tip 引數說明:
完整執行命令如下:
docker run -d --volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq --volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=/ -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest rabbitmq-
-d:表示在后臺運行容器;
-
--volume D:/Net_Program/Net_Docker/RabbitMQ/Data:/var/lib/rabbitmq:映射 RabbitMQ 資料存盤目錄;
-
--volume D:/Net_Program/Net_Docker/RabbitMQ/Log:/var/log/rabbitmq:映射 RabbitMQ 日志存盤目錄;
-
--name rabbitmq:設定容器名稱;
-
--hostname:指定主機名(RabbitMQ 的一個重要注意事項是它根據所謂的 節點名稱 存盤資料,默認為主機名);
-
-p:將容器的埠 5672(應用訪問埠)和 15672 (控制臺 Web 埠號)映射到主機中;
-
-e:指定環境變數:
-
RABBITMQ_DEFAULT_VHOST:默認虛擬機名;
-
RABBITMQ_DEFAULT_USER:默認的用戶名;
-
RABBITMQ_DEFAULT_PASS:默認的用戶密碼;
-
:::
-
-
啟動 Docker 的時候自動啟動 RabbitMQ
以管理員身份運行 CMD,執行如下命令:
docker update rabbitmq --restart=always
2.6.2、安裝 Rabbit MQ's Management Plugin
-
方式 1:
以管理員身份運行 CMD,執行如下命令先進入 RabbitMQ 容器:
docker exec -it rabbitmq /bin/bash再執行如下命令:
rabbitmq-plugins enable rabbitmq_management
這時候在瀏覽器中打開http://localhost:15672即可查看 RabbitMQ 的 Web 管理端了,

-
方式 2:
以管理員身份運行 CMD,執行如下命令即可:
docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management -
注意事項:
當我們安裝好插件后,打開http://localhost:15672Web 管理界面,使用 guest 賬戶登錄進去后,點擊Channels標簽,會出現如下圖所示的警告提醒:

-
以管理員身份運行 CMD,執行如下命令先進入 RabbitMQ 容器:
docker exec -it rabbitmq /bin/bash -
再執行如下命令切換到以下路徑:
cd /etc/rabbitmq/conf.d/ -
再執行如下命令修改
management_agent.disable_metrics_collector=false:echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf -
再執行如下命令退出容器:
exit -
再執行如下命令重啟容器:
docker restart rabbitmq
執行命令程序:

界面效果:

-
3、??RabbitMQ 常用命令
3.1、?? 用戶管理
-
增加用戶:
rabbitmqctl add_user user1 pwd1 -
洗掉用戶:
rabbitmqctl delete_user user1 -
修改密碼:
rabbitmqctl change_password user1 123456 -
查看用戶串列:
rabbitmqctl list_users
3.2、?? 用戶角色
-
設定用戶角色:
rabbitmqctl set_user_tags user1 Taguser1 為用戶名稱
Tag 為角色名稱,如:administrator、monitoring、policymaker、management、impersonator
-
設定多個角色:
rabbitmqctl set_user_tags user1 Tag1 Tag2
執行命令如下圖所示:
![ml]
![ml]
![ml]
4、??RabbitMQ 架構模式
4.1、??RabbitMQ 架構模式

架構模式說明:
-
首先建立生產者與 MQ 之間的連接;
-
然后生產者將訊息發送到 MQ 的交換機中;
-
交換機將訊息分別存盤到與之系結的佇列中;
-
建立消費者與 MQ 之間的連接;
-
消費者指定消費哪個佇列的訊息;
-
最后佇列將訊息推送給對應的消費者,
4.2、??RabbitMQ 中幾個核心概念
-
Message(訊息):訊息是不透明的,是由一系列的可選屬性組成,如:路由鍵(RoutingKey)、相對其他訊息的優先權(Priority)、指出該訊息是否需要永久存盤(DeliveryMode)等;
-
Producer(生產者):生產者是向交換機發布訊息的客戶端應用程式;
-
Exchange(交換機):用來接受訊息并將訊息路由(存盤)給服務器中的佇列,交換機有四種型別,即決定訊息發布到那個佇列,具體有以下的型別:
-
Fanout:發布訂閱(廣播模式),每個發送到 Fanout 型別的交換器訊息,交換器會將訊息發送到它系結的所有佇列中,它轉發訊息是最快的,也是目前使用最多的型別,
-
Direct:路由模式,路由模式下,在發布訊息時指定不同的 RouteKey,交換機會根據不同的 RouteKey 分發訊息到不同的佇列中,簡單點說其實就是在 Fanout 基礎上多增加了一個 RoutingKey 條件,
-
Topic:通配符模式(主題),通配符模式和路由模式其實差不多,不同之處在于通配符模式中的路由可以宣告為模糊查詢,符號
#匹配一個或多個詞,符號*匹配一個詞,RabbitMQ 中通配符的通配符是用.來分割字串的,比如a.*只能匹配到 a.b、a.c,而a.#可以匹配到 a.a.c、a.a.b, -
Headers:
根據訊息內容中的 Headers 屬性匹配(性能差,不實用,使用少),該模式基本不使用,
-
-
Queue(佇列):訊息的
存放容器,一個訊息可以放在一個或者多個佇列中; -
Binding(系結):如果想要將訊息存放到具體的佇列中,就需要先將佇列和交換機進行系結,交換機跟佇列的系結可以是多對多的關系;
-
Connection(連接):如一個 Tcp 連接;
-
Channel(通道):多路復用連接中的一條獨立的雙向資料流通道,通道是建立在真實的 TCP 連接內的虛擬通道,AMQP 命令都是通過通道發出去的,不管是發布訊息、訂閱佇列還是接收訊息,都是通過通道完成的,因為對于作業系統來說創建和銷毀一個 TCP 連接都是很昂貴的開銷,所以使用通道以實作復用一條 TCP 連接;
-
Consumer(消費者):接收和消費訊息的客戶端應用程式;
-
Virtual Host(虛擬主機):即小型的 RabbitMQ 服務器,它表示一批交換器,訊息佇列和相關物件,連接時必須指定,默認是:/(以路徑區分);
-
Broker:訊息佇列服務器物體,
4.3、??RabbitMQ 幾種模式
4.3.1、簡單佇列模式
在該模式下,不用顯示宣告交換機,只需宣告一個佇列即可,
生產者指定佇列名稱發送訊息給 MQ,然后會有一個默認的交換機將訊息轉發給這個佇列,
消費者只需要監聽這個佇列,一有訊息就會得到通知做出回應,
如下圖所示:

4.3.2、作業佇列模式(Work Queues)
和簡單佇列模式基本一樣,不過有一點不同,該模式有多個消費者在監聽佇列,
RabbitMQ 會以輪詢的方式將訊息發給多個消費者確保一條訊息只會被一個消費者消費,即:在該模式下一條訊息只會被其中一個消費者消費,
4.3.3、Exchange - 發布訂閱模式(Fanout)
和上面 2 種模式默認提供交換機不同的是,該模式需要顯示宣告交換機,然后可以創建多個佇列和這個交換機進行系結,
生產者發訊息給 MQ 時需要指定交換機,然后交換機將訊息轉發給與自己系結的所有佇列.
消費者監聽指定的佇列獲得訊息,每個佇列可以有多個消費者監聽,同樣也是以輪詢的機制發給消費者,
如下圖所示:

該模式是目前使用最多的模式,
4.3.4、Exchange - 路由模式(Direct)
和發布訂閱模式不同的是,佇列系結交換機時需要指定一個 RoutingKey,
那么生產者發送訊息時不僅需要指定交換機還需要指定 RoutingKey,
這樣的話交換機就會把訊息轉發給跟自己系結并且 RoutingKey 相匹配的佇列,
如下圖所示:

PS:當生產者發送了一個訊息且發送的 RoutingKey 為 success 時,交換機會根據該 RoutingKey 匹配并轉發訊息到 Queue1 和 Queue2,兩個佇列都滿足了路由規則;當 RoutingKey 為 error 時,僅 Queue2 滿足,則將訊息轉發給 Queue2,
4.3.5、Exchange - 通配符模式(Topic)
和路由模式唯一的不同就是可以設定帶有通配符進行模糊匹配的 RoutingKey,
設定的 RoutingKey(不論是 BindingKey 還是 RoutingKey)都需要為帶.的字串,比如 a.b、c.d.e、fff.gggg.hhhh 等,最多為 255 個位元組.
在交換機和佇列系結時,給定的 RoutingKey 可以依照如下來設定:
-
:匹配 0~N 個單詞;
-
*:匹配 1 個單詞,
比如兩個 RoutingKey 分別為 index.和#.crt,當生產者發送訊息時給定的 RoutingKey 為 index.a、index.b 或 index.c 等都滿足 index.的規則,a.crt、aa.crt 或是 b.crt 等都滿足#.crt 的規則,
如下圖所示:

5、??RabbitMQ 使用
針對 RabbitMQ 的使用,這里我們主要介紹在.Net Core(.Net6)中的簡單使用,其他平臺或語言類似,僅作參考,
5.1、?? 安裝 RabbitMQ.Client 包
我們將使用RabbitMQ.Client這個包來實作,當然也可以使用其他包,如:EasyNetQ,
使用如下命令安裝 RabbitMQ.Client 包即可:
Install-Package RabbitMQ.Client -Version 6.4.0
5.2、?? 生產者實作
首先我們定義 RabbitMQHelper.cs 幫助類,該類需要實作泛型 T 的定義(T 為發送和接收的訊息物體),
然后在該類中定義訊息通道、訊息連接、交換機名稱、佇列名稱集合和路由規則變數,
定義建構式 RabbitMQHelper,并實作連接工廠的定義、訊息連接的初始化、訊息通道的初始化以及交換機型別(此處我們以發布訂閱模式 Fanout 的實作為例)的定義,
注意:以下實作代碼僅僅為了展示使用說明,沒有進一步進行封裝,
具體幫助類和呼叫示例代碼如下所示:
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
namespace Quick.RabbitMQPlus
{
public class RabbitMQHelper<T> where T : class
{
/// <summary>
/// 訊息通道
/// </summary>
readonly IModel _channel;
/// <summary>
/// 訊息連接
/// </summary>
readonly IConnection _connection;
/// <summary>
/// 交換機名稱
/// </summary>
readonly string _exchangeName = "TestExchangeName";
/// <summary>
/// 佇列名稱集合
/// </summary>
readonly List<string> _queueNames = new() { "Queue1", "Queue2" };
/// <summary>
/// 路由規則
/// </summary>
string _routeKey = "TestRouteName";
/// <summary>
/// 建構式
/// </summary>
public RabbitMQHelper()
{
//創建連接工廠
var connectionFactory = new ConnectionFactory
{
HostName = "192.168.1.1",
UserName = "admin",
Password = "123456",
Port = 5672
};
//創建連接
_connection = connectionFactory.CreateConnection();
//創建通道
_channel = _connection.CreateModel();
/*
* 定義一個Fanout型別交換機:
* 引數1:交換機名稱
* 引數2:交換機型別
* 引數3:是否開啟訊息持久化
* 引數4:是否設定如果這個佇列沒有其他消費者消費,佇列自動洗掉
* 引數5:指定佇列攜帶的資訊
*/
_channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, false, false, null);
}
/// <summary>
/// 發送訊息
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public async Task<(bool, string)> Send(T data)
{
return await Task.Run(() =>
{
if (_channel == null)
{
return (false, "RabbitMQ初始化錯誤,請檢查連接配置!");
}
var ret = true;
var errMsg = string.Empty;
try
{
if (string.IsNullOrWhiteSpace(_routeKey))
{
_routeKey = _exchangeName;
}
//將多個佇列系結到交換機上
foreach (var item in _queueNames)
{
/*
* 定義佇列:
* 引數1:佇列名稱
* 引數2:是否持久化,true為持久化,佇列會存盤到磁盤,服務器重啟時可以保證不丟失資訊
* 引數3:是否排他,true為排他,如果一個佇列宣告為排他佇列,該佇列僅對首次宣告它的連接可見,并在連接斷開時自動洗掉
* 引數4:是否自動洗掉,true為自動洗掉,自動洗掉的前提是:致少有一個消費者連接到這個佇列,之后所有與這個佇列連接的消費者都斷開時,才會自動洗掉
* 引數5:指定佇列攜帶的資訊
*/
_channel.QueueDeclare(item, true, false, false, null);
//將佇列系結到交換機
_channel.QueueBind(item, _exchangeName, _routeKey, null);
}
//將物體序列化為字串,該方法(ObjectToJson)需自己實作
var msg = ObjectToJson(data);
//將字串轉換為byte[]
var msgBody = Encoding.UTF8.GetBytes(msg);
/*
* 發布訊息:
* 引數1:交換機名稱,如果傳"",將使用RabbitMQ默認的交換機名稱
* 引數2:指定路由的規則,使用具體的佇列名稱,交換機為""時,訊息直接發送到佇列中
* 引數3:指定傳遞的訊息攜帶的properties
* 引數4:指定傳遞的訊息,byte[]型別
*/
_channel.BasicPublish(_exchangeName, _routeKey, null, msgBody);
}
catch (Exception e)
{
ret = false;
errMsg = e.Message;
}
return (ret, errMsg);
});
}
/// <summary>
/// 關閉通道和連接
/// </summary>
public void Close()
{
_channel.Close();
_connection.Close();
}
}
}
//訊息物體
var msgModel = new TestRabbitMQModel
{
UserId = rand.Next(1, 9999),
UserName = "Quber",
UserAge = rand.Next(20, 80),
CreateTime = DateTime.Now
};
//定義發送物件
var sendInstance = new RabbitMQHelper<TestRabbitMQModel>();
//發送訊息
var sendRet = await sendInstance.Send(msgModel);
if (sendRet.Item1)
{
//發送成功
//……
//訊息發送完成后,關閉通道(根據實際情況自行決定要不要呼叫關閉方法)
sendInstance.Close();
}
else
{
//發送失敗
var errMsg = $"失敗原因:{sendRet.Item2}";
}5.3、?? 消費者實作
消費者的實作和生產者的實作基本一樣,直接上代碼,如下所示:
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text;
namespace Quick.RabbitMQPlus
{
public class RabbitMQHelper<T> where T : class
{
/// <summary>
/// 訊息通道
/// </summary>
readonly IModel _channel;
/// <summary>
/// 訊息連接
/// </summary>
readonly IConnection _connection;
/// <summary>
/// 交換機名稱
/// </summary>
readonly string _exchangeName = "TestExchangeName";
/// <summary>
/// 佇列名稱集合
/// </summary>
readonly List<string> _queueNames = new() { "Queue1", "Queue2" };
/// <summary>
/// 路由規則
/// </summary>
string _routeKey = "TestRouteName";
/// <summary>
/// 建構式
/// </summary>
public RabbitMQHelper()
{
//創建連接工廠
var connectionFactory = new ConnectionFactory
{
HostName = "192.168.1.1",
UserName = "admin",
Password = "123456",
Port = 5672
};
//創建連接
_connection = connectionFactory.CreateConnection();
//創建通道
_channel = _connection.CreateModel();
/*
* 定義一個Fanout型別交換機:
* 引數1:交換機名稱
* 引數2:交換機型別
* 引數3:是否開啟訊息持久化
* 引數4:是否設定如果這個佇列沒有其他消費者消費,佇列自動洗掉
* 引數5:指定佇列攜帶的資訊
*/
_channel.ExchangeDeclare(_exchangeName, ExchangeType.Fanout, false, false, null);
}
/// <summary>
/// 接收訊息
/// </summary>
/// <param name="queueName">佇列名稱</param>
/// <param name="received">回呼方法</param>
/// <param name="prefetchCount">設定RabbitMQ一次最多推送多少條訊息給消費者,默認為10</param>
public async Task<(bool, string)> Receive(string queueName, Func<T?, string, Task<bool>> received, ushort prefetchCount = 10)
{
return await Task.Run(() =>
{
if (_channel == null)
{
return (false, "RabbitMQ初始化錯誤,請檢查連接配置!");
}
var ret = true;
var errMsg = string.Empty;
try
{
/*
* 設定限流機制
* 引數1:訊息本身的大小,如果設定為0,那么表示對訊息本身的大小不限制
* 引數2:設定RabbitMQ一次最多推送多少條訊息給消費者
* 引數3:是否將上面的設定應用于整個通道,false表示只應用于當前消費者
*/
_channel.BasicQos(0, prefetchCount, false);
//創建消費者物件
var consumer = new EventingBasicConsumer(_channel);
//接收到訊息事件
consumer.Received += async (_, ea) =>
{
//獲取訊息以及反序列化為物體(JsonToObject方法需自己實作)
var msg = Encoding.UTF8.GetString(ea.Body.ToArray());
var data = https://www.cnblogs.com/qubernet/p/JsonToObject(msg);
var retRec = true;
try
{
//接收消費事件,如果回傳true則代表處理成功,以便告知RabbitMQ該訊息已消費并處理成功
retRec = await received(data, msg);
}
catch (Exception e)
{
retRec = false;
}
//業務處理成功的時候
if (retRec)
{
//告知RabbitMQ該訊息成功處理,可以從佇列中洗掉該訊息了
_channel.BasicAck(ea.DeliveryTag, false);
}
else
{
/*
* 告知RabbitMQ該訊息處理失敗,重新加入佇列,以便后續可再次消費該訊息
* 引數1:
* 引數2:是否將該訊息重新加入佇列,true為重新加入佇列
*
* 需要注意的是:
* 假設await received(data, msg);一直對某些訊息都處理失敗(即retRec=false),
* 那么這些資料(這一批次的所有資料)會重新進入佇列,并在下次重新消費,
* 如果業務方法received不做處理的話,有可能會造成一直回圈消費該批次的訊息
*/
_channel.BasicReject(ea.DeliveryTag, true);
}
};
//啟動消費者,設定為手動應答訊息
/*
* 啟動消費者:
* 引數1:指定要消費哪個佇列的名稱
* 引數2:指定是否自動告訴RabbitMQ該訊息已收到
* 引數3:指定訊息的回呼
*/
_channel.BasicConsume(queueName, false, consumer);
}
catch (Exception e)
{
ret = false;
errMsg = e.Message;
}
return (ret, errMsg);
});
}
///
/// 關閉通道和連接
///
public void Close()
{
_channel.Close();
_connection.Close();
}
}
}
//定義接收物件
var recInstance = new RabbitMQHelper<TestRabbitMQModel1>();
//接收佇列Queue1的訊息
var retRec = await recInstance.Receive("Queue1", async (data, msg) =>
{
await Task.Delay(5000);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"\r\n佇列1訊息:{msg}");
//回傳true代表業務邏輯處理成功,會告知MQ這條訊息已經接收成功,會從MQ佇列中洗掉
//回傳false代表業務邏輯處理失敗,會告知MQ這條訊息沒有處理成功,則MQ會繼續推送這條訊息
return true;
}, 1);
//接收訊息失敗
if (!retRec.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列1接收失敗:{retRec1.Item2}");
}
//關閉通道(根據實際情況自行決定要不要呼叫關閉方法)
recInstance.Close();6、??Quick.RabbitMQPlus.Furion 的使用
為了更好更簡單的在.Net Core 中使用 RabbitMQ,特此基于RabbitMQ.Client封裝了Quick.RabbitMQPlus.Furion和Quick.RabbitMQPlus組件,
-
Quick.RabbitMQPlus.Furion:依賴于.Net6+、Furion
-
Quick.RabbitMQPlus:依賴于.Net6+
Quick.RabbitMQPlus.Furion 包地址為:https://www.nuget.org/packages/Quick.RabbitMQPlus.Furion
關于 Quick.RabbitMQPlus.Furion 的詳細使用說明,如下所示:
6.1、?? 更新日志
-
1.0.9
-
在路由模式消費的時候,去掉了呼叫實體化方法Instance()必須傳入引數
true的引數; -
提供了兩種使用方式,第一種就是v1.0.8之前的通過實體化的方式進行初始化使用,第二種就是v1.0.9之后增加了可以通過依賴注入的方式進行初始化使用(推薦使用依賴注入的方式);
-
新增加了
AddRabbitMQPlus和GetInstance方法,這兩個方法分別有 2 個多載,具體說明參見Quick.RabbitMQPlus方法; -
增加了 IQuickRabbitMQPlus 介面,如果使用依賴注入的方式,就可以實作該介面并使用其中的各個方法,
-
-
1.0.8
-
去掉了通過物體特性去配置各個屬性,目的在于簡化各個配置,都通過組態檔進行設定;
-
在接收資料的時候,可以通過物體特性QuickRabbitMQPlusReceive去設定接收的訊息佇列(如[QuickRabbitMQPlusReceive("TestRabbitMQName1")]),如果不要物體特性,那么就會通過配置中的QueueNames屬性去控制;
-
對接收資料的方法進行了多載;
-
配置中增加了Default屬性,用于設定默認連接配置(如果配置中有多個連接配置,并且沒有一個配置中設定 Default 為 true,那么默認會使用第一個配置);
-
配置中增加了PrefetchCount屬性,用于全域設定 RabbitMQ 一次最多推送多少條訊息給消費者,默認為 10;
-
配置中去掉了
RouteKey配置,如果使用的是路由模式,請使用RouteKeys屬性進行配置,前提是RouteKeys和QueueNames集合的數量需要保持一致; -
實體化組件物件的時候,去掉了泛型的定義,如
var sendInstance = QuickRabbitMQPlusInstance.Instance();; -
發送和接收的方法,增加了泛型的定義,這樣做的目的是不受限于在組件實體化的時候指定只能是某個泛型,將泛型的定義設定到方法上更靈活,如同一個組件實體化物件可發送不同的泛型物體資料,
-
-
1.0.7
-
去掉了必須要設定物體特性的控制(如果沒有物體特性,那么就需要在組態檔中將相關屬性配置齊全),默認使用的是配置中的第一個配置;
-
在路由模式下,可指定將訊息發送到對應的佇列中,需要配置
QueueNames和RouteKeys的集合數量保持一一對應的關系; -
Send 方法增加了第二個引數,路由 Key 名稱;
-
實體化物件的方法 Instance 增加了引數
Instance(bool isReceive = false),當前實體化物件是否為接收訊息,
-
-
1.0.6
-
新增加了可動態切換連接的方法
ChangeConn; -
去掉了
Furion的依賴; -
去掉了
Newtonsoft.Json的依賴; -
同時將原來的
Quick.RabbitMQPlus分為了Quick.RabbitMQPlus和Quick.RabbitMQPlus.Furion這兩個版本,
-
6.2、?? Quick.RabbitMQPlus.Furion 使用說明
該組件是基于RabbitMQ.Client和Furion組件進行封裝使用的,目的在于結合.Net Core 更快、更簡單和更靈活的使用 RabbitMQ!!!
功能說明:
-
支持
發布訂閱模式、路由模式,通配符模式和Headers屬性模式; -
可根據組態檔讀取 RabbitMQ 連接的各個配置(如:RabbitMQ 服務地址、賬號、密碼和交換機名稱等);
-
支持配置多個 RabbitMQ 的連接配置;
-
支持動態切換 RabbitMQ 的連接配置;
-
可根據物體定義的特性發布和訂閱訊息(已廢棄),目前只針對接收訊息定義了物體特性,并且只能指定接收訊息的佇列(v1.0.8 調整); -
支持配置將多個佇列系結到交換機;
-
一個消費端支持可以同時消費多個多列的訊息等;
-
支持使用同一個物體,將不同的訊息發送到不同的佇列中(使用路由模式,同時在發送的時候將路由 Key 傳入);
-
支持全域設定接收訊息一次性接收多少條的配置(v1.0.8 新增);
-
支持兩種使用方式,第一種就是v1.0.8之前的通過實體化的方式進行初始化使用,第二種就是v1.0.9之后增加了可以通過依賴注入的方式進行初始化使用(推薦使用依賴注入的方式),
6.3、?? 安裝
安裝命令如下所示:
Install-Package Quick.RabbitMQPlus.Furion
該組件的命名空間為:Quick.RabbitMQPlus,包括Quick.RabbitMQPlus原生組件,


6.4、?? 生產端
6.4.1、配置appsettings.json
在appsettings.json組態檔中創建節點QuickRabbitMQPlus>PrefetchCount和QuickRabbitMQPlusConfigs,PrefetchCount 為設定 RabbitMQ 一次最多推送多少條訊息給消費者(默認為 10),QuickRabbitMQPlusConfigs 為陣列型別(即可配置多個 RabbitMQ 服務地址),具體配置如下所示:
{
"QuickRabbitMQPlus": {
"PrefetchCount": 1,
"QuickRabbitMQPlusConfigs": [
{
"Default": false,
"ConnId": 1,
"UserName": "quber",
"Password": "0807_quberONE",
"HostName": "127.0.0.1",
"Port": 5672,
"ExchangeType": "direct",
"ExchangeName": "TestExchangeName",
"QueueNames": ["TestRabbitMQName1", "TestRabbitMQName2"],
"RouteKeys": ["TestRouteKey1", "TestRouteKey2"] //ExchangeType=direct才起作用,并且和QueueNames一一對應
//"ExchangeDurable": true,
//"QueueDurable": true,
//"MessageDurable": true
},
//fanout模式
{
"Default": true,
"ConnId": 2,
"UserName": "quber",
"Password": "0807_quberONE",
"HostName": "127.0.0.1",
"Port": 5672,
"ExchangeType": "fanout",
"ExchangeName": "TestExchangeName",
"QueueNames": ["TestRabbitMQName1", "TestRabbitMQName2"],
"RouteKeys": ["TestRouteKey1", "TestRouteKey2"] //ExchangeType=direct才起作用,并且和QueueNames一一對應
//"ExchangeDurable": true,
//"QueueDurable": true,
//"MessageDurable": true
}
]
}
}
配置說明(消費端通用):
| 屬性名稱 | 屬性說明 | 是否必填 | 備注 |
|---|---|---|---|
| PrefetchCount | 全域設定 RabbitMQ 一次最多推送多少條訊息給消費者,默認為 10 | 消費端才使用的屬性 | |
| Default | 是否為默認連接 | 默認為 false | |
| ConnId | 連接 Id(請確保該 Id 的唯一性) | √ | 如果要動態切換連接配置,請確保該 Id 有值并且唯一 |
| UserName | RabbitMQ 連接賬戶 | √ | |
| Password | RabbitMQ 連接密碼 | √ | |
| HostName | RabbitMQ 連接 IP | √ | |
| Port | RabbitMQ 連接埠 | 不填就是默認埠5672 | |
| ExchangeType | 交換機型別(fanout:發布訂閱模式、direct:路由模式、topic:通配符模式、headers:屬性匹配模式) | √ | |
| ExchangeName | 交換機名稱 | √ | |
| QueueNames | 佇列名稱集合(與交換機 ExchangeName 進行系結) | √ | 此處為集合,目的是在發布訊息時將訊息存盤到該佇列集合中去 |
| RouteKeys | 路由名稱集合(或通配符名稱集合) | ExchangeType=direct才起作用,并且和 QueueNames 是一一對應的關系,這樣配置目的是可以實作將訊息 1 發送到佇列 1,將訊息 2 發送到佇列 2 | |
| ExchangeDurable | 交換機是否持久化,默認為 true | 如果采用默認的設定,組態檔可以不要該屬性 | |
| QueueDurable | 佇列是否持久化,默認為 true | 如果采用默認的設定,組態檔可以不要該屬性 | |
| MessageDurable | 訊息是否持久化,默認為 true | 如果采用默認的設定,組態檔可以不要該屬性 |
6.4.2、配置 Program.cs
由于我們使用的是Furion,因此,我們可在程式啟動檔案中配置如下代碼(具體可參考Furion 入門指南),目的是注冊RabbitMQ 服務和配置選項 QuickRabbitMQPlusOptions,
-
依賴注入方式-WinForm 中使用:
[STAThread] static void Main() { ApplicationConfiguration.Initialize(); //初始化Furion Serve.Run(GenericRunOptions.DefaultSilence); }public void ConfigureServices(IServiceCollection services) { //注冊FrmMain表單類 services.AddScoped<FrmMain>(); //注入IQuickRabbitMQPlus的方式 //通過AddRabbitMQPlus添加依賴注入 services.AddRabbitMQPlus(); ////使用建構式獲取實體的方式: ////通過AddRabbitMQPlus添加依賴注入,并注冊TestConsumerClassForDI類 //services.AddRabbitMQPlus<TestConsumerClassForDI>() //DI容器生成serviceProvider var serviceProvider = services.BuildServiceProvider(); //通過serviceProvider獲取MainForm的注冊實體 var frmMain = serviceProvider.GetRequiredService<FrmMain>(); //var frmMain = (FrmMain)serviceProvider.GetService(typeof(FrmMain)); Application.Run(frmMain); }說明:上述的關鍵點就在于呼叫.AddRabbitMQPlus()或者.AddRabbitMQPlus<T>()方法對服務進行注冊,
-
實體化方式-WinForm 中使用:
[STAThread] static void Main() { //初始化Furion Serve.Run(GenericRunOptions.DefaultSilence); //或者 //Serve.Run(RunOptions.DefaultSilence.ConfigureBuilder(builder => //{ //注冊RabbitMQ連接配置物件 //builder.Services.AddConfigurableOptions<QuickRabbitMQPlusOptions>(); //}).Configure(app => //{ //})); ApplicationConfiguration.Initialize(); Application.Run(new FrmMain()); }public void ConfigureServices(IServiceCollection services) { //注冊RabbitMQ連接配置物件 services.AddConfigurableOptions<QuickRabbitMQPlusOptions>(); } -
Quick.RabbitMQPlus 組件,依賴注入方式-WinForm 中使用:
Program.cs 的 Main 方法:
ApplicationConfiguration.Initialize(); using IHost host = Host.CreateDefaultBuilder() .ConfigureServices((_, services) => { //注冊FrmMain表單類 services.AddScoped<FrmMain>(); //注入IQuickRabbitMQPlus的方式 //通過AddRabbitMQPlus添加依賴注入 services.AddRabbitMQPlus(); ////使用建構式獲取實體的方式: ////通過AddRabbitMQPlus添加依賴注入,并注冊TestConsumerClassForDI類 //services.AddRabbitMQPlus<TestConsumerClassForDI>() //DI容器生成serviceProvider var serviceProvider = services.BuildServiceProvider(); //通過serviceProvider獲取MainForm的注冊實體 var frmMain = serviceProvider.GetRequiredService<FrmMain>(); //var frmMain = (FrmMain)serviceProvider.GetService(typeof(FrmMain)); Application.Run(frmMain); } ) .Build(); host.RunAsync(); -
Quick.RabbitMQPlus 組件,實體化方式-WinForm 中使用:
Program.cs 的 Main 方法:
[STAThread] static void Main() { ApplicationConfiguration.Initialize(); Application.Run(new FrmMain()); }
其他庫的使用方式也基本類似,就不一一介紹了,
6.4.3、定義發送訊息物體
如下所示我們可以定義一個訊息物體:
namespace Quick.RabbitMQPlus.Publisher
{
public class TestRabbitMQModel
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}
6.4.4、發送訊息 Demo
定義發送物件:
public partial class FrmMain : Form
{
private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;
public FrmMain(IQuickRabbitMQPlus quickRabbitMqPlus)
{
InitializeComponent();
//定義發送物件
_quickRabbitMqPlus = quickRabbitMqPlus;
}
}//定義發送物件
var sendInstance = QuickRabbitMQPlusInstance.Instance();
發送單條訊息:
//發送10條資料
for (int i = 0; i < 10; i++)
{
var msgModel = new TestRabbitMQModel
{
UserId = rand.Next(1, 9999),
UserName = "Quick" + (i + 1),
UserAge = rand.Next(20, 80),
CreateTime = DateTime.Now
};
var sendRet = await _quickRabbitMqPlus.Send(msgModel);
if (sendRet.Item1)
{
//發送成功
}
else
{
//發送失敗
var errMsg = $"失敗原因:{sendRet.Item2}";
}
//間隔2秒發送一次
await Task.Delay(2000);
}
//訊息發送完成后,關閉通道
_quickRabbitMqPlus.Close();//當i % 2為0時,發送給路由TestRouteKey1對應的佇列TestRabbitMQName1,否則發送給路由TestRouteKey2對應的佇列TestRabbitMQName2
//此處就實作了在路由模式下,將不同的訊息發送給不同的佇列
//需要注意的時候,此方式需要將交換機型別配置為direct路由模式,同時需要設定配置的QueueNames和RouteKeys屬性(這兩屬性的集合數量需要保持一致,一一對應的關系)
var sendRet = await _quickRabbitMqPlus.Send(msgModel, i % 2 == 0 ? "TestRouteKey1" : "TestRouteKey2");
發送多條訊息:
var sendList = new List<TestRabbitMQModel>{
new TestRabbitMQModel(),
new TestRabbitMQModel()
};
var sendRet = await _quickRabbitMqPlus.Send(sendList);
切換連接:
//切換到connId=2的配置
_quickRabbitMqPlus.ChangeConn(2);
var sendRetConn2 = await _quickRabbitMqPlus.Send(msgModel);
//切換到connId=3的配置
_quickRabbitMqPlus.ChangeConn(3);
var sendRetConn3 = await _quickRabbitMqPlus.Send(msgModel);6.5、?? 消費端
6.5.1、配置appsettings.json與物體特性QuickRabbitMQPlusReceive
-
配置說明:
具體配置請參見生產端(和生產端完全一致),
需要注意的是,消費端中,增加了PrefetchCount配置,目的用于全域設定 RabbitMQ 一次最多推送多少條訊息給消費者,默認為 10,
需要注意的是,如果消費端中的 QueueNames 屬性設定了多個佇列,就代表該消費端同時接收多個佇列的訊息
-
物體特性配置說明(消費端使用):
屬性名稱 屬性說明 是否必填 備注 queueName 佇列名稱(多個佇列名稱請使用英文逗號,分隔) 如果同時設定了物體特性的佇列名稱和配置中的 QueueNames屬性,那么會優先采用物體的佇列名稱如下所示:
namespace Quick.RabbitMQPlus.Publisher { [QuickRabbitMQPlusReceive("TestRabbitMQName1")] //[QuickRabbitMQPlusReceive("TestRabbitMQName1,TestRabbitMQName2")] public class TestRabbitMQModel { public int UserId { get; set; } public string UserName { get; set; } public int UserAge { get; set; } public DateTime CreateTime { get; set; } } }
6.5.2、配置 Program.cs
由于我們使用的是Furion,因此,我們可在程式啟動檔案中配置如下代碼(具體可參考Furion 入門指南),目的是注冊RabbitMQ 服務和配置選項 QuickRabbitMQPlusOptions,
-
依賴注入方式-Worker Service 中使用:
//初始化Furion Serve.Run(GenericRunOptions.Default);public void ConfigureServices(IServiceCollection services) { //通過AddRabbitMQPlus添加依賴注入 services.AddRabbitMQPlus(); }說明:上述的關鍵點就在于呼叫.AddRabbitMQPlus()或者.AddRabbitMQPlus<T>()方法對服務進行注冊,
-
依賴注入方式-控制臺中使用:
//初始化Furion Serve.Run(GenericRunOptions.DefaultSilence);public void ConfigureServices(IServiceCollection services) { ////通過AddRabbitMQPlus添加依賴注入 //services.AddRabbitMQPlus(); //使用建構式獲取實體的方式: //通過AddRabbitMQPlus添加依賴注入,并注冊TestConsumerClassForDI類 services.AddRabbitMQPlus<TestConsumerClassForDI>(); } -
Quick.RabbitMQPlus 組件,依賴注入方式-Worker Service 中使用:
Program.cs 的 Main 方法:
IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices(services => { services.AddHostedService<Worker>(); //通過AddRabbitMQPlus添加依賴注入 services.AddRabbitMQPlus(); }) .Build(); await host.RunAsync(); -
Quick.RabbitMQPlus 組件,依賴注入方式-控制臺中使用:
Program.cs 的 Main 方法:
using IHost host = Host.CreateDefaultBuilder(args) .ConfigureServices((_, services) => //注入IQuickRabbitMQPlus的方式 //通過AddRabbitMQPlus添加依賴注入 services.AddRabbitMQPlus() ////使用建構式獲取實體的方式: ////通過AddRabbitMQPlus添加依賴注入,并注冊TestConsumerClassForDI類 //services.AddRabbitMQPlus<TestConsumerClassForDI>() ) .Build();
6.5.3、定義接收訊息物體
如下所示我們可以定義 3 個訊息物體(第一個用于接收佇列TestRabbitMQName1的訊息,第二個用于接收佇列TestRabbitMQName2的訊息,第三個用于接收佇列TestRabbitMQName1和TestRabbitMQName2):
namespace Quick.RabbitMQPlus.Consumer
{
[QuickRabbitMQPlusReceive("TestRabbitMQName1")]
public class TestRabbitMQModel1
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}namespace Quick.RabbitMQPlus.Consumer
{
[QuickRabbitMQPlusReceive("TestRabbitMQName2")]
public class TestRabbitMQModel2
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}namespace Quick.RabbitMQPlus.Consumer
{
[QuickRabbitMQPlusReceive("TestRabbitMQName1,TestRabbitMQName2")]
public class TestRabbitMQModel3
{
public int UserId { get; set; }
public string UserName { get; set; }
public int UserAge { get; set; }
public DateTime CreateTime { get; set; }
}
}6.5.4、接收訊息 Demo
定義接收物件(依賴注入方式):
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;
public Worker(ILogger<Worker> logger, IQuickRabbitMQPlus quickRabbitMqPlus)
{
_logger = logger;
_quickRabbitMqPlus = quickRabbitMqPlus;
}
}//獲取IQuickRabbitMQPlus的實體(App是Furion中的靜態類)
var _quickRabbitMqPlus = App.GetService<IQuickRabbitMQPlus>();
//獲取IQuickRabbitMQPlus的實體(其中的host為IHost物件,GetInstance方法為封裝的擴展方法)
//var _quickRabbitMqPlus = host.Services.GetInstance<IQuickRabbitMQPlus>();
//獲取IQuickRabbitMQPlus的實體(其中的host為IHost物件,GetInstance方法為封裝的擴展方法)
var _quickRabbitMqPlus = host.GetInstance<IQuickRabbitMQPlus>();
定義接收物件(實體化方式):
//定義接收物件
var recInstance = QuickRabbitMQPlusInstance.Instance();
定義兩個消費端,一個消費端消費一個佇列,具體的接收訊息代碼如下所示(接收單條訊息):
namespace Quick.RabbitMQPlus.ConsumerServiceFurion
{
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IQuickRabbitMQPlus _quickRabbitMqPlus;
public Worker(ILogger<Worker> logger, IQuickRabbitMQPlus quickRabbitMqPlus)
{
_logger = logger;
_quickRabbitMqPlus = quickRabbitMqPlus;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
//接收佇列1的訊息
var retRec1 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel1>(async (data, msg) =>
{
await Task.Delay(1000);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"\r\n佇列1訊息:{msg}");
//回傳true代表業務邏輯處理成功,會告知MQ這條訊息已經接收成功,會從MQ佇列中洗掉
//回傳false代表業務邏輯處理失敗,會告知MQ這條訊息沒有處理成功,則MQ會繼續推送這條訊息
return true;
}, async (errMsg, msg) =>
{
await Task.Delay(3000);
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列1接收錯誤:{errMsg}\r\n原始資料:{msg}");
}, 1);
if (!retRec1.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列1接收失敗:{retRec1.Item2}");
}
//接收佇列2的訊息
var retRec2 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel2>(async (data, msg) =>
{
await Task.Delay(2500);
Console.ForegroundColor = ConsoleColor.Magenta;
Console.WriteLine($"\r\n佇列2訊息:{msg}");
//回傳true代表業務邏輯處理成功,會告知MQ這條訊息已經接收成功,會從MQ佇列中洗掉
//回傳false代表業務邏輯處理失敗,會告知MQ這條訊息沒有處理成功,則MQ會繼續推送這條訊息
return true;
}, async (errMsg, msg) =>
{
await Task.Delay(3000);
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列1接收錯誤:{errMsg}\r\n原始資料:{msg}");
}, 10);
if (!retRec2.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列2接收失敗:{retRec2.Item2}");
}
}
}
}
}//接收佇列1的訊息
var retRec1 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel1>(async (data, msg) =>
{
await Task.Delay(1000);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"\r\n佇列1訊息:{msg}");
//回傳true代表業務邏輯處理成功,會告知MQ這條訊息已經接收成功,會從MQ佇列中洗掉
//回傳false代表業務邏輯處理失敗,會告知MQ這條訊息沒有處理成功,則MQ會繼續推送這條訊息
return true;
});
if (!retRec1.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列1接收失敗:{retRec1.Item2}");
}
//接收佇列2的訊息
var retRec2 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel2>(async (data, msg) =>
{
await Task.Delay(2500);
Console.ForegroundColor = ConsoleColor.Magenta;
Console.WriteLine($"\r\n佇列2訊息:{msg}");
//回傳true代表業務邏輯處理成功,會告知MQ這條訊息已經接收成功,會從MQ佇列中洗掉
//回傳false代表業務邏輯處理失敗,會告知MQ這條訊息沒有處理成功,則MQ會繼續推送這條訊息
return true;
});
if (!retRec2.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列2接收失敗:{retRec2.Item2}");
}效果如下所示:

定義一個消費端,同時消費兩個佇列,具體的接收訊息代碼如下所示(接收單條訊息):
//接收佇列1的訊息
var retRec = await _quickRabbitMqPlus.Receive<TestRabbitMQModel3>(async (data, msg) =>
{
await Task.Delay(1000);
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"\r\n佇列1、2訊息:{msg}");
//回傳true代表業務邏輯處理成功,會告知MQ這條訊息已經接收成功,會從MQ佇列中洗掉
//回傳false代表業務邏輯處理失敗,會告知MQ這條訊息沒有處理成功,則MQ會繼續推送這條訊息
return true;
});
if (!retRec.Item1)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"\r\n佇列1、2接收失敗:{retRec.Item2}");
}效果如下所示:

如果需要接收多條訊息,請使用Receives方法:
//接收佇列1的訊息
var retRec = await _quickRabbitMqPlus.Receives<TestRabbitMQModel3>(async (dataList, msg) =>
{
//此處的dataList為List<TestRabbitMQModel3>
return true;
});6.6、?? Quick.RabbitMQPlus.Furion 方法
-
首先宣告 Quick.RabbitMQPlus 的實體化物件,有兩種方式可以得到 Quick.RabbitMQPlus 的實體化物件,一種是通過依賴注入在建構式中得到,另一種是直接通過實體化物件的方式,具體可參照上述檔案中的相關示例,
-
依賴注入方法:
方法名稱 方法說明 方法引數 備注 AddRabbitMQPlus 添加依賴注入服務 多載 1:()
多載 2:<T>()該方法為IServiceCollection的擴展方法,目的是實作 IQuickRabbitMQPlus介面的注冊,
多載 1 代表注冊的是IQuickRabbitMQPlus服務;
多載 2 傳入了泛型 T,代表的是注冊了IQuickRabbitMQPlus服務的同時,也注冊了 T 這個服務(T這個泛型類中,在建構式中實作了IQuickRabbitMQPlus介面服務,該方法可能在控制臺程式使用的情況較多),GetInstance 獲取某介面服務的實體 多載 1:()
多載 2:()該方法為IServiceProvider或IHost的擴展方法,目的是獲取某介面或類的實體,
多載 1 是基于IServiceProvider的擴展;
多載 2 是基于IHost的擴展, -
其次就可以使用使用該實體化物件中的發送和接收方法了,具體說明如下所示:
方法名稱 方法說明 方法引數 備注 Send 發送訊息方法,支持單條訊息和多條訊息的發送 (data,routeKey) 方法的第一個引數 data 可以為 T 或 List<T>
方法的第二個引數為路由名稱(當交換機型別為路由模式的時候,該引數起作用,如可以實作使用同一個物體將不同的訊息發送到不同的佇列中)Receive 接收訊息(單條訊息),該方法有 3 個多載 多載 1:(received)
多載 2:(received, receivedError)
多載 3:(received, receivedError, prefetchCount)方法的第一個引數為回呼函式,該回呼函式包含 2 個回傳資料(第一個為T,第二個為T 對應的字串),并且該回呼函式需要回傳 bool 型別(以便告訴 RabbitMQ 服務該訊息是否處理成功);
方法的第二個引數為消費出錯的回呼函式,該回呼函式包括 2 個回傳資料(第一個為錯誤提示資訊,第二個為T 對應的字串)
方法的第三個引數為設定 RabbitMQ 一次最多推送多少條訊息給消費者,默認為 10Receives 接收訊息(多條訊息),該方法有 3 個多載 多載 1:(received)
多載 2:(received, receivedError)
多載 3:(received, receivedError, prefetchCount)方法的第一個引數為回呼函式,該回呼函式包含 2 個回傳資料(第一個為List<T>,第二個為List<T>對應的字串),并且該回呼函式需要回傳 bool 型別(以便告訴 RabbitMQ 服務該訊息是否處理成功);
方法的第二個引數為消費出錯的回呼函式,該回呼函式包括 2 個回傳資料(第一個為錯誤提示資訊,第二個為List<T>對應的字串)
方法的第三個引數為設定 RabbitMQ 一次最多推送多少條訊息給消費者,默認為 10ChangeConn 切換連接(切換組態檔中某個連接配置) (connId) 方法引數 connId 為 int 型別,即與配置中的 ConnId 保持一致 Close 關閉連接 () 注意,如果呼叫了該方法,又想重新使用實體化物件 mqInstance 發送或接收訊息,需要重新實體化該物件, 如下所示為接收訊息的使用方式:
//接收佇列1的訊息 var retRec1 = await _quickRabbitMqPlus.Receive<TestRabbitMQModel1>(async (data, msg) => { await Task.Delay(1000); Console.ForegroundColor = ConsoleColor.Green; Console.WriteLine($"\r\n佇列1訊息:{msg}"); //回傳true代表業務邏輯處理成功,會告知MQ這條訊息已經接收成功,會從MQ佇列中洗掉 //回傳false代表業務邏輯處理失敗,會告知MQ這條訊息沒有處理成功,則MQ會繼續推送這條訊息 return true; }, async (errMsg, msg) => { await Task.Delay(3000); Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine($"\r\n佇列1接收錯誤:{errMsg}\r\n原始資料:{msg}"); }, 1); if (!retRec1.Item1) { Console.ForegroundColor = ConsoleColor.Red; Console.WriteLine($"\r\n佇列1接收失敗:{retRec1.Item2}"); }
7、??Quick.RabbitMQPlus 使用說明
Quick.RabbitMQPlus組件的使用方式和Quick.RabbitMQPlus.Furion組件完全一致(包括配置、物體特性和方法等),唯一不同的就是Quick.RabbitMQPlus.Furion需要在啟動程式中通過依賴注入注冊服務(services.AddRabbitMQPlus()、services.AddRabbitMQPlus<T>())或注冊 RabbitMQ 連接配置物件(builder.Services.AddConfigurableOptions<QuickRabbitMQPlusOptions>();),
8、??RabbitMQ 總結
經過以上對 RabbitMQ 的介紹和運用,簡單總結幾點關于 RabbitMQ 的重點注意事項:
-
需要了解掌握 RabbitMQ 的幾個核心概念:生產者、交換機、佇列、系結、連接、通道和消費者;
-
需要了解掌握 RabbitMQ 的幾種模式:簡單佇列模式、作業佇列模式、發布訂閱模式(使用最多)、路由模式和通配符模式;
-
在消費資料完成后,需要回應給 MQ 服務器,因此要理解自動回應和手動回應的原理和區別(一般都使用手動回應,為了確保訊息不丟失并且可再次消費);
-
如果想要訊息持久化,需要根據實際情況對
交換機、佇列和訊息設定其持久化配置; -
……
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/519333.html
標籤:.NET Core
