上一篇隨筆記錄到RabbitMQ的安裝,安裝完成,我們就開始使用吧,
RabbitMQ簡介
AMQP,即Advanced Message Queuing Protocol,高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,訊息中間件主要用于組件之間的解耦,訊息的發送者無需知道訊息使用者的存在,反之亦然,
AMQP的主要特征是面向訊息、佇列、路由(包括點對點和發布/訂閱)、可靠性、安全,
RabbitMQ是一個開源的AMQP實作,服務器端用Erlang語言撰寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,用于在分布式系統中存盤轉發訊息,在易用性、擴展性、高可用性等方面表現不俗,
RabbitMQ提供了可靠的訊息機制、跟蹤機制和靈活的訊息路由,支持訊息集群和分布式部署,適用于排隊演算法、秒殺活動、訊息分發、異步處理、資料同步、處理耗時任務、CQRS等應用場景,
DotNet Core使用RabbitMQ
通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/
定義生產者:
//創建連接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //宣告一個佇列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine("\nRabbitMQ連接成功,請輸入訊息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發布訊息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();
定義消費者:
//創建連接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到訊息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到訊息: {message}"); //確認該訊息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消費者已啟動"); Console.ReadKey(); channel.Dispose(); connection.Close();
演示如下:

啟動了一個生產者,兩個消費者,可以看見兩個消費者都能接收到訊息,訊息投遞到哪個消費者是由RabbitMQ決定的,
RabbitMQ消費失敗的處理
RabbitMQ采用訊息應答機制,即消費者收到一個訊息之后,需要發送一個應答,然后RabbitMQ才會將這個訊息從佇列中洗掉,如果消費者在消費程序中出現例外,斷開連接切沒有發送應答,那么RabbitMQ會將這個訊息重新投遞,
我們來修改一下消費者的代碼:
//接收到訊息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到訊息: {message}"); Console.WriteLine($"收到該訊息[{ea.DeliveryTag}] 延遲10s發送回執"); Thread.Sleep(10000); //確認該訊息已被消費 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"已發送回執[{ea.DeliveryTag}]"); };
演示如下:

從圖中可以看出,設定了訊息應答延遲10s,如果在這10s中,該消費者斷開了連接,那么訊息會被RabbitMQ重新投遞,
使用RabbitMQ的Exchange
前面的例子,我們可以看到生產者將訊息投遞到Queue中,實際上這種方式在RabbitMQ中永遠都不會發生的,實際的情況是,生產者將訊息發送到Exchange(交換器),下圖中的X,由Exchange(交換器)將訊息路由到一個或多個Queue中(或者丟棄),

AMQP協議中的核心思想就是生產者和消費者隔離,生產者從不直接將訊息發送給佇列,生產者通常不知道是否一個訊息會被發送到佇列中,只是將訊息發送到一個交換機,先由Exchange來接收,然后Exchange按照特定的策略轉發到Queue進行存盤,同理,消費者也是如此,Exchange 就類似于一個交換機,轉發各個訊息分發到相應的佇列中,
Exchange Types(交換器型別)
RabbitMQ常用的Exchange Type有Fanout、Direct、Topic、Headers這四種
1、Fanout:
這種型別的Exchange路由規則非常簡單,它會把所有發送到該Exchange的訊息路由到所有與它系結的Queue中,這時Routing key不起作用

Fanout Exchange 不需要處理RouteKey ,只需要簡單的將佇列系結到exchange 上,這樣發送到exchange的訊息都會被轉發到與該交換機系結的所有佇列上,類似子網廣播,每臺子網內的主機都獲得了一份復制的訊息,
所以,Fanout Exchange 轉發訊息是最快的,
為了演示效果,定義了兩個佇列,分別為hello1,hello2,每個佇列都擁有一個消費者,
static void Main(string[] args) { string exchangeName = "TestFanoutChange"; string queueName1 = "hello1"; string queueName2 = "hello2"; string routeKey = ""; //創建連接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //定義一個Direct型別交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); //定義佇列1 channel.QueueDeclare(queueName1, false, false, false, null); //定義佇列2 channel.QueueDeclare(queueName2, false, false, false, null); //將佇列系結到交換機 channel.QueueBind(queueName1, exchangeName, routeKey, null); channel.QueueBind(queueName2, exchangeName, routeKey, null); //生成兩個佇列的消費者 ConsumerGenerator(queueName1); ConsumerGenerator(queueName2); Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入訊息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發布訊息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); }
/// <summary> /// 根據佇列名稱生成消費者 /// </summary> /// <param name="queueName"></param> static void ConsumerGenerator(string queueName) { //創建連接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到訊息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"Queue:{queueName}收到訊息: {message}"); //確認該訊息已被消費 channel.BasicAck(ea.DeliveryTag, false); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume(queueName, false, consumer); Console.WriteLine($"Queue:{queueName},消費者已啟動"); }
運行效果如下:

2、Direct
這種型別的Exchange路由規則也很簡單,它會把訊息路由到哪些binding key與routingkey完全匹配的Queue中,

Direct模式,可以使用rabbitMQ自帶的Exchange:default Exchange ,所以不需要將Exchange進行任何系結(binding)操作 ,訊息傳遞時,RouteKey必須完全匹配,才會被佇列接收,否則該訊息會被拋棄,
static void Main(string[] args) { string exchangeName = "TestChange"; string queueName = "hello"; string routeKey = "helloRouteKey"; //創建連接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //定義一個Direct型別交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定義一個佇列 channel.QueueDeclare(queueName, false, false, false, null); //將佇列系結到交換機 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ連接成功,Exchange:{exchangeName},Queue:{queueName},Route:{routeKey},\n\n請輸入訊息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發布訊息 channel.BasicPublish(exchangeName, routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close();
運行效果如下:

3、Topic
這種型別的Exchange的路由規則支持 binding key 和 routing key 的模糊匹配,會把訊息路由到滿足條件的Queue, binding key 中可以存在兩種特殊字符 *與 #,用于做模糊匹配,其中 * 用于匹配一個單詞,# 用于匹配0個或多個單詞,單詞以符號“.”為分隔符,
以上圖中的配置為例,routingKey=”quick.orange.rabbit”的訊息會同時路由到Q1與Q2,routingKey=”lazy.orange.fox”的訊息會路由到Q1與Q2,routingKey=”lazy.brown.fox”的訊息會路由到Q2,routingKey=”lazy.pink.rabbit”的訊息會路由到Q2(只會投遞給Q2一次,雖然這個routingKey與Q2的兩個bindingKey都匹配);routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit”的訊息將會被丟棄,因為它們沒有匹配任何bindingKey,
所以,Topic Exchange使用非常靈活,static void Main(string[] args) { string exchangeName = "TestTopicChange"; string queueName = "hello"; string routeKey = "TestRouteKey.*"; //創建連接工廠 ConnectionFactory factory = new ConnectionFactory { UserName = "guest",//用戶名 Password = "guest",//密碼 HostName = "127.0.0.1"//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //定義一個Direct型別交換機 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null); //定義佇列1 channel.QueueDeclare(queueName, false, false, false, null); //將佇列系結到交換機 channel.QueueBind(queueName, exchangeName, routeKey, null); Console.WriteLine($"\nRabbitMQ連接成功,\n\n請輸入訊息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發布訊息 channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); }
運行效果如下:

4、Headers
這種型別的Exchange不依賴于 routing key 與 binding key 的匹配規則來路由訊息,而是根據發送的訊息內容中的 headers 屬性進行匹配,
參考:
官網:https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
https://www.cnblogs.com/stulzq/p/7551819.html
https://www.jianshu.com/p/e55e971aebd8
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/98057.html
標籤:.NET Core
上一篇:(23)ASP.NET Core2.2 EF關系資料庫建模
下一篇:使用WebApi和Asp.Net Core Identity 認證 Blazor WebAssembly(Blazor客戶端應用)
