主頁 > .NET開發 > RabbitMQ從零到集群高可用(.NetCore5.0) - RabbitMQ簡介和六種作業模式詳解

RabbitMQ從零到集群高可用(.NetCore5.0) - RabbitMQ簡介和六種作業模式詳解

2021-08-31 15:24:00 .NET開發

 系列文章:

RabbitMQ從零到集群高可用(.NetCore5.0) - RabbitMQ簡介和六種作業模式詳解

RabbitMQ從零到集群高可用(.NetCore5.0) - 死信佇列,延時佇列

一、RabbitMQ簡介

是一個開源的訊息代理和佇列服務器,用來通過普通協議在完全不同的應用之間共享資料,RabbitMQ是使用Erlang(高并發語言)語言來撰寫的,并且RabbitMQ是基于AMQP協議的,

1.1 AMQP協議

Advanced Message Queuing Protocol(高級訊息佇列協議)

1.2 AMQP專業術語:(多路復用->在同一個執行緒中開啟多個通道進行操作)

  • Server:又稱broker,接受客戶端的鏈接,實作AMQP物體服務
  • Connection:連接,應用程式與broker的網路連接
  • Channel:網路信道,幾乎所有的操作都在channel中進行,Channel是進行訊息讀寫的通道,客戶端可以建立多個channel,每個channel代表一個會話任務,
  • Message:訊息,服務器與應用程式之間傳送的資料,由Properties和Body組成.Properties可以對訊息進行修飾,必須訊息的優先級、延遲等高級特性;Body則是訊息體內容,
  • virtualhost: 虛擬地址,用于進行邏輯隔離,最上層的訊息路由,一個virtual host里面可以有若干個Exchange和Queue,同一個Virtual Host 里面不能有相同名稱的Exchange 或 Queue,
  • Exchange:交換機,接收訊息,根據路由鍵轉單訊息到系結佇列
  • Binding: Exchange和Queue之間的虛擬鏈接,binding中可以包換routing key
  • Routing key: 一個路由規則,虛擬機可用它來確定如何路由一個特定訊息,(如負載均衡)

1.3 RabbitMQ整體架構

 

 

ClientA(生產者)發送訊息到Exchange1(交換機),同時帶上RouteKey(路由Key),Exchange1找到系結交換機為它和系結傳入的RouteKey的佇列,把訊息轉發到對應的佇列,消費者Client1,Client2,Client3只需要指定對應的佇列名即可以消費佇列資料,

交換機和佇列多對多關系,實際開發中一般是一個交換機對多個佇列,防止設計復雜化,

 

二、安裝RabbitMQ

安裝方式不影響下面的使用,這里用Docker安裝

#15672埠為web管理端的埠,5672為RabbitMQ服務的埠
docker run -d  --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management

輸入:ip:5672訪問驗證,

建一個名為develop的Virtual host(虛擬主機)使用,專案中一般是一個專案建一個Virtual host用,能夠隔離佇列,

切換Virtual host

三、RabbitMQ六種佇列模式在.NetCore中使用

(1)簡單佇列

最簡單的作業佇列,其中一個訊息生產者,一個訊息消費者,一個佇列,也稱為點對點模式

 

 

 

描述:一個生產者 P 發送訊息到佇列 Q,一個消費者 C 接收

建一個RabbitMQHelper.cs類

/// <summary>
    /// RabbitMQ幫助類
    /// </summary>
    public class RabbitMQHelper
    {
        private static ConnectionFactory factory;
        private static object lockObj = new object();
        /// <summary>
        /// 獲取單個RabbitMQ連接
        /// </summary>
        /// <returns></returns>
        public static IConnection GetConnection()
        {
            if (factory == null)
            {
                lock (lockObj)
                {
                    if (factory == null)
                    {
                         factory = new ConnectionFactory
                        {
                            HostName = "172.16.2.84",//ip
                            Port = 5672,//
                            UserName = "admin",//賬號
                            Password = "123456",//密碼
                            VirtualHost = "develop" //虛擬主機
                        };
                    }
                }
            }
            return factory.CreateConnection();
        }
    }

生產者代碼:

新建發送類Send.cs

 

 public static void SimpleSendMsg()
        {
            string queueName = "simple_order";//佇列名
            //創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創建信道
                using (var channel = connection.CreateModel())
                {//創建佇列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    for (var i = 0; i < 10; i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i + 1}";
                        var body = Encoding.UTF8.GetBytes(message);//發送訊息
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
                        Console.WriteLine($"發送訊息到佇列:{queueName},內容:{message}");
                    }
                }
            }
        }

創建佇列引數決議:

durable:是否持久化,

exclusive:排他佇列,只有創建它的連接(connection)能連,創建它的連接關閉,會自動洗掉佇列,

autoDelete:被消費后,消費者數量都斷開時自動洗掉佇列,

arguments:創建佇列的引數,

發送訊息引數決議:

exchange:交換機,為什么能傳空呢,因為RabbitMQ內置有一個默認的交換機,如果傳空時,就會用默認交換機,

routingKey:路由名稱,這里用佇列名稱做路由key,

mandatory:true告訴服務器至少將訊息route到一個佇列種,否則就將訊息return給發送者;false:沒有找到路由則訊息丟棄,

執行效果:

 

 

佇列產生10條訊息,

 

 消費者代碼:

新建Recevie.cs類

 public static void SimpleConsumer()
        {
            string queueName = "simple_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創建信道
                var channel = connection.CreateModel();
                {
                    //創建佇列
                     channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    int i = 0;
                    consumer.Received += (model, ea) =>
                    {
                        //消費者業務處理
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},佇列{queueName}消費訊息長度:{message.Length}");
                        i++;
                    };
                    channel.BasicConsume(queueName, true, consumer);
                }
            }
        }

消費者只需要知道佇列名就可以消費了,不需要Exchange和routingKey,

注:消費者這里有一個創建佇列,它本身不需要,是預防消費端程式先執行,沒有佇列會報錯,

執行效果:

 

 

 

 

 

訊息已經被消費完,

(2)作業佇列模式

一個訊息生產者,一個交換器,一個訊息佇列,多個消費者,同樣也稱為點對點模式

 

生產者P發送訊息到佇列,多個消費者C消費佇列的資料,

作業佇列也稱為公平性佇列模式,回圈分發,RabbitMQ 將按順序將每條訊息發送給下一個消費者,每個消費者將獲得相同數量的訊息,

生產者:

Send.cs代碼:

     /// <summary>
        /// 作業佇列模式
        /// </summary>
        public static void WorkerSendMsg()
        {
            string queueName = "worker_order";//佇列名
            //創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創建信道
                using (var channel = connection.CreateModel())
                {
                    //創建佇列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //訊息持久化
                    for ( var i=0;i<10;i++)
                    {
                        string message = $"Hello RabbitMQ MessageHello,{i+1}";
                        var body = Encoding.UTF8.GetBytes(message);
                        //發送訊息到rabbitmq
                        channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: properties, body);
                        Console.WriteLine($"發送訊息到佇列:{queueName},內容:{message}");
                    }
                }
            }
        }

 

引數durable:true,需要持久化,實際專案中肯定需要持久化的,不然重啟RabbitMQ資料就會丟失了,

執行效果:

 

 

 

 寫入10條資料,有持久化標識D,

 

 

 消費端:

Recevie代碼:

 public static void WorkerConsumer()
        {
            string queueName = "worker_order";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創建信道
                var channel = connection.CreateModel();
                {
                    //創建佇列
                    channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    //prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多于 N 個訊息,也確保了消費速度和性能
                    channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);
                    int i = 1;
                    int index = new Random().Next(10);
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},消費者:{index},佇列{queueName}消費訊息長度:{message.Length}");
                         channel.BasicAck(ea.DeliveryTag, false); //訊息ack確認,告訴mq這條佇列處理完,可以從mq洗掉了
               Thread.Sleep(1000);
i++; }; channel.BasicConsume(queueName,autoAck:false, consumer); } } }

BasicQos引數決議:

prefetchSize:每條訊息大小,一般設為0,表示不限制,

prefetchCount:1,作用限流,告訴RabbitMQ不要同時給一個消費者推送多于N個訊息,消費者會把N條訊息快取到本地一條條消費,如果不設,RabbitMQ會進可能快的把訊息推到客戶端,導致客戶端記憶體升高,設定合理可以不用頻繁從RabbitMQ 獲取能提升消費速度和性能,設的太多的話則會增大本地記憶體,需要根據機器性能合理設定,官方建議設為30,

global:是否為全域設定,

這些限流設定針對消費者autoAck:false時才有效,如果是自動Ack的,限流不生效,

 

執行兩個消費者,效果:

 

可以看到消費者號的標識,8,2,8,2是平均的,一個消費者5個,RabbitMQ上也能看到有2個消費者,Unacked數是2,因為每個客戶端的限流數是1,

 

作業佇列模式也是很常用的佇列模式,

(3)發布訂閱模式 

Pulish/Subscribe,無選擇接收訊息,一個訊息生產者,一個交換機(交換機型別為fanout),多個訊息佇列,多個消費者,稱為發布/訂閱模式

在應用中,只需要簡單的將佇列系結到交換機上,一個發送到交換機的訊息都會被轉發到與該交換機系結的所有佇列上,很像子網廣播,每臺子網內的主機都獲得了一份復制的訊息,

 

 

 

生產者P只需把訊息發送到交換機X,系結這個交換機的佇列都會獲得一份一樣的資料,

 

應用場景:適合于用同一份資料源做不同的業務,

生產者代碼:

     /// <summary>
        /// 發布訂閱, 扇形佇列
        /// </summary>
        public static void SendMessageFanout()
        {
            //創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創建信道
                using (var channel = connection.CreateModel())
                {
                    string exchangeName = "fanout_exchange";
                    //創建交換機,fanout型別
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
                    string queueName1 = "fanout_queue1";
                    string queueName2 = "fanout_queue2";
                    string queueName3 = "fanout_queue3";
                    //創建佇列
                    channel.QueueDeclare(queueName1, false, false, false);
                    channel.QueueDeclare(queueName2, false, false, false);
                    channel.QueueDeclare(queueName3, false, false, false);

                    //把創建的佇列系結交換機,routingKey不用給值,給了也沒意義的
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //訊息持久化
                    //向交換機寫10條訊息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "", null, body);
                        Console.WriteLine($"發送Fanout訊息:{message}");
                    }
                }
            }
        }

 

執行代碼:

 

 

 

 向交換機發送10條訊息,則系結這個交換機的3個佇列都會有10條訊息,

消費端的代碼和作業佇列的一樣,只需知道佇列名即可消費,宣告時要和生產者的宣告一樣,

(4)路由模式(推薦使用)

在發布/訂閱模式的基礎上,有選擇的接收訊息,也就是通過 routing 路由進行匹配條件是否滿足接收訊息,

 

 

 

 上圖是一個結合日志消費級別的配圖,在路由模式它會把訊息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct模式,

 生產者P發送資料是要指定交換機(X)和routing發送訊息 ,指定的routingKey=error,則佇列Q1和佇列Q2都會有一份資料,如果指定routingKey=into,或=warning,交換機(X)只會把訊息發到Q2佇列,

 生產者代碼:

 /// <summary>
        /// 路由模式,點到點直連佇列
        /// </summary>
        public static void SendMessageDirect()
        {
            //創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創建信道
                using (var channel = connection.CreateModel())
                {
                    //宣告交換機物件,fanout型別
                    string exchangeName = "direct_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
                    //創建佇列
                    string queueName1 = "direct_errorlog";
                    string queueName2 = "direct_alllog";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把創建的佇列系結交換機,direct_errorlog佇列只系結routingKey:error
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");
                    //direct_alllog佇列系結routingKey:error,info
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");
             var properties = channel.CreateBasicProperties();
             properties.Persistent = true; //訊息持久化
                    //向交換機寫10條錯誤日志和10條Info日志
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} error Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "error", properties, body);
                        Console.WriteLine($"發送Direct訊息error:{message}");

                        string message2 = $"RabbitMQ Direct {i + 1} info Message";
                        var body2 = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "info", properties, body2);
                        Console.WriteLine($"info:{message2}");

                    }
                }
            }
        }

 

這里創建一個direct型別的交換機,兩個路由key,一個error,一個info,兩個佇列,一個佇列只系結error,一個佇列系結error和info,向error和info各發10條訊息,

執行代碼:

 

 

 查看RabbitMQ管理界面,direct_errorlog佇列10條,而direct_alllog有20條,因為direct_alllog佇列兩個routingKey的訊息都進去了,

 

 

 

 

 

 點進去看下兩個佇列系結的交換機和routingKey

 

 

 

 

 

 

 

 

 消費者代碼:

消費者和作業佇列一樣,只需根據佇列名消費即可,這里只消費direct_errorlog佇列作示例

 public static void DirectConsumer()
        {
            string queueName = "direct_errorlog";
            var connection = RabbitMQHelper.GetConnection();
            {
                //創建信道
                var channel = connection.CreateModel();
                {
                    //創建佇列
                    channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    ///prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多于 N 個訊息,也確保了消費速度和性能
                    ///global:是否設為全域的
                    ///prefetchSize:單條訊息大小,通常設0,表示不做限制
                    //是autoAck=false才會有效
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
                    int i = 1;
                    consumer.Received += (model, ea) =>
                    {
                        //處理業務
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Console.WriteLine($"{i},佇列{queueName}消費訊息長度:{message.Length}");
                        channel.BasicAck(ea.DeliveryTag, false); //訊息ack確認,告訴mq這條佇列處理完,可以從mq洗掉了
                        i++;
                    };
                    channel.BasicConsume(queueName, autoAck: false, consumer);
                }
            }
        }

 

普通場景中推薦使用路由模式,因為路由模式有交換機,有路由key,能夠更好的拓展各種應用場景,

(5)主題模式

topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似于SQL中 = 和 like 的關系,

 

P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示佇列,

 

 topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由一個英文句點號“.”分隔的字串(我們將被句點號“.”分隔開的每一段獨立的字串稱為一個單詞),比如 "lazy.orange.a",topics routingKey 中可以存在兩種特殊字符"*"與“#”,用于做模糊匹配,其中“*”用于匹配一個單詞,“#”用于匹配多個單詞(可以是零個),

以上圖為例:

如果發送訊息的routingKey設定為:

aaa.orange.rabbit,那么訊息會路由到Q1與Q2,

routingKey=aaa.orange.bb的訊息會路由到Q1,

routingKey=lazy.aa.bb.cc的訊息會路由到Q2;

routingKey=lazy.aa.rabbit的訊息會路由到 Q2(只會投遞給Q2一次,雖然這個routingKey 與 Q2 的兩個 bindingKey 都匹配);

沒匹配routingKey的訊息將會被丟棄,

生產者代碼:

 

 public static void SendMessageTopic()
        {
            //創建連接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //創建信道
                using (var channel = connection.CreateModel())
                {
                    //宣告交換機物件,fanout型別
                    string exchangeName = "topic_exchange";
                    channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
                    //佇列名
                    string queueName1 = "topic_queue1";
                    string queueName2 = "topic_queue2";
                    //路由名
                    string routingKey1 = "*.orange.*";
                    string routingKey2 = "*.*.rabbit";
                    string routingKey3 = "lazy.#";
                    channel.QueueDeclare(queueName1, true, false, false);
                    channel.QueueDeclare(queueName2, true, false, false);

                    //把創建的佇列系結交換機,routingKey指定routingKey
                    channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);
                    channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);
                    //向交換機寫10條訊息
                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);
                        channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);
                        Console.WriteLine($"發送Topic訊息:{message}");
                    }
                }
            }
        }

這里演示了 routingKey為aaa.orange.rabbit,和lazy.aa.rabbit的情況,第一個匹配到Q1和Q2,第二個匹配到Q2,所以應該Q1是10條,Q2有20條,

執行后看rabbitMQ界面:

 

(6)RPC模式

與上面其他5種所不同之處,該模式是擁有請求/回復的,也就是有回應的,上面5種都沒有,

RPC是指遠程程序呼叫,也就是說兩臺服務器A,B,一個應用部署在A服務器上,想要呼叫B服務器上應用提供的處理業務,處理完后然后在A服務器繼續執行下去,把異步的訊息以同步的方式執行,

 

 客戶端(C)宣告一個排他佇列自己訂閱,然后發送訊息到RPC佇列同時也把這個排他佇列名也在訊息里傳進去,服務端監聽RPC佇列,處理完業務后把處理結果發送到這個排他佇列,然后客戶端收到結果,繼續處理自己的邏輯,

RPC的處理流程:

  • 當客戶端啟動時,創建一個匿名的回呼佇列,
  • 客戶端為RPC請求設定2個屬性:replyTo:設定回呼佇列名字;correlationId:標記request,
  • 請求被發送到rpc_queue佇列中,
  • RPC服務器端監聽rpc_queue佇列中的請求,當請求到來時,服務器端會處理并且把帶有結果的訊息發送給客戶端,接收的佇列就是replyTo設定的回呼佇列,
  • 客戶端監聽回呼佇列,當有訊息時,檢查correlationId屬性,如果與request中匹配,那就是結果了,

服務端代碼:

 public class RPCServer
    {
        public static void RpcHandle()
        {

            var connection = RabbitMQHelper.GetConnection();
            {
                var channel = connection.CreateModel();
                {
                    string queueName = "rpc_queue";
                    channel.QueueDeclare(queue: queueName, durable: false,
                      exclusive: false, autoDelete: false, arguments: null);
                    channel.BasicQos(0, 1, false);
                    var consumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(queue: queueName,
                      autoAck: false, consumer: consumer);
                    Console.WriteLine("【服務端】等待RPC請求...");

                    consumer.Received += (model, ea) =>
                    {
                        string response = null;

                        var body = ea.Body.ToArray();
                        var props = ea.BasicProperties;
                        var replyProps = channel.CreateBasicProperties();
                        replyProps.CorrelationId = props.CorrelationId;

                        try
                        {
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine($"【服務端】接收到資料:{ message},開始處理");
                            response = $"訊息:{message},處理完成";
                        }
                        catch (Exception e)
                        {
                            Console.WriteLine("錯誤:" + e.Message);
                            response = "";
                        }
                        finally
                        {
                            var responseBytes = Encoding.UTF8.GetBytes(response);
                            channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
                              basicProperties: replyProps, body: responseBytes);
                            channel.BasicAck(deliveryTag: ea.DeliveryTag,
                              multiple: false);
                        }
                    };
                }
            }
        }
       
    }

 

客戶端:

 public class RPCClient
    {
        private readonly IConnection connection;
        private readonly IModel channel;
        private readonly string replyQueueName;
        private readonly EventingBasicConsumer consumer;
        private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
        private readonly IBasicProperties props;

        public RPCClient()
        {
            connection = RabbitMQHelper.GetConnection();

            channel = connection.CreateModel();
            replyQueueName = channel.QueueDeclare().QueueName;
            consumer = new EventingBasicConsumer(channel);

            props = channel.CreateBasicProperties();
            var correlationId = Guid.NewGuid().ToString();
            props.CorrelationId = correlationId; //給訊息id
            props.ReplyTo = replyQueueName;//回呼的佇列名,Client關閉后會自動洗掉

            consumer.Received += (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var response = Encoding.UTF8.GetString(body);
                //監聽的訊息Id和定義的訊息Id相同代表這條訊息服務端處理完成
                if (ea.BasicProperties.CorrelationId == correlationId)
                {
                    respQueue.Add(response);
                }
            };

            channel.BasicConsume(
                consumer: consumer,
                queue: replyQueueName,
                autoAck: true);
        }

        public string Call(string message)
        {
            var messageBytes = Encoding.UTF8.GetBytes(message);
            //發送訊息
            channel.BasicPublish(
                exchange: "",
                routingKey: "rpc_queue",
                basicProperties: props,
                body: messageBytes);
            //等待回復
            return respQueue.Take();
        }

        public void Close()
        {
            connection.Close();
        }
    }

 

執行代碼:

 static void Main(string[] args)
        {
            Console.WriteLine("Hello World!");
            //啟動服務端,正常邏輯是在另一個程式
            RPCServer.RpcHandle();
            //實體化客戶端
            var rpcClient = new RPCClient();
            string message = $"訊息id:{new Random().Next(1, 1000)}";
            Console.WriteLine($"【客服端】RPC請求中,{message}");
            //向服務端發送訊息,等待回復
            var response = rpcClient.Call(message);
            Console.WriteLine("【客服端】收到回復回應:{0}", response);
            rpcClient.Close();
            Console.ReadKey();
        }

 

測驗效果:

 

 z執行完,客服端close后,可以接著自己的下一步業務處理,

 

 

總結:

以上便是RabbitMQ的6中模式在.net core中實際使用,其中(1)簡單佇列,(2)作業佇列,(4)路由模式,(6)RPC模式的交換機型別都是direct,(3)發布訂閱的交換機是fanout,(5)topics的交換機是topic,正常場景用的是direct,默認交換機也是direct型別的,推薦用(4)路由模式因為指定交換機名比起默認的交換機會容易擴展場景,其他的交換機看業務場景所需使用

下面位置可以看到交換機型別,amq.開頭那幾個是內置的,避免交換機過多可以直接使用,

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/295786.html

標籤:.NET Core

上一篇:iNeuOS工業互聯平臺,聚合和變化率計算、設備IO和通訊狀態、組態快捷鍵、創建檔案夾、選擇應用圖示等,發布:v3.6版本

下一篇:物件池在 .NET (Core)中的應用[3]: 擴展篇

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • WebAPI簡介

    Web體系結構: 有三個核心:資源(resource),URL(統一資源識別符號)和表示 他們的關系是這樣的:一個資源由一個URL進行標識,HTTP客戶端使用URL定位資源,表示是從資源回傳資料,媒體型別是資源回傳的資料格式。 接下來我們說下HTTP. HTTP協議的系統是一種無狀態的方式,使用請求/ ......

    uj5u.com 2020-09-09 22:07:47 more
  • asp.net core 3.1 入口:Program.cs中的Main函式

    本文分析Program.cs 中Main()函式中代碼的運行順序分析asp.net core程式的啟動,重點不是剖析原始碼,而是理清程式開始時執行的順序。到呼叫了哪些實體,哪些法方。asp.net core 3.1 的程式入口在專案Program.cs檔案里,如下。ususing System; us ......

    uj5u.com 2020-09-09 22:07:49 more
  • asp.net網站作為websocket服務端的應用該如何寫

    最近被websocket的一個問題困擾了很久,有一個需求是在web網站中搭建websocket服務。客戶端通過網頁與服務器建立連接,然后服務器根據ip給客戶端網頁發送資訊。 其實,這個需求并不難,只是剛開始對websocket的內容不太了解。上網搜索了一下,有通過asp.net core 實作的、有 ......

    uj5u.com 2020-09-09 22:08:02 more
  • ASP.NET 開源匯入匯出庫Magicodes.IE Docker中使用

    Magicodes.IE在Docker中使用 更新歷史 2019.02.13 【Nuget】版本更新到2.0.2 【匯入】修復單列匯入的Bug,單元測驗“OneColumnImporter_Test”。問題見(https://github.com/dotnetcore/Magicodes.IE/is ......

    uj5u.com 2020-09-09 22:08:05 more
  • 在webform中使用ajax

    如果你用過Asp.net webform, 說明你也算是.NET 開發的老兵了。WEBform應該是2011 2013左右,當時還用visual studio 2005、 visual studio 2008。后來基本都用的是MVC。 如果是新開發的專案,估計沒人會用webform技術。但是有些舊版 ......

    uj5u.com 2020-09-09 22:08:50 more
  • iis添加asp.net網站,訪問提示:由于擴展配置問題而無法提供您請求的

    今天在iis服務器配置asp.net網站,遇到一個問題,記錄一下: 問題:由于擴展配置問題而無法提供您請求的頁面。如果該頁面是腳本,請添加處理程式。如果應下載檔案,請添加 MIME 映射。 WindowServer2012服務器,添加角色安裝完.netframework和iis之后,運行aspx頁面 ......

    uj5u.com 2020-09-09 22:10:00 more
  • WebAPI-處理架構

    帶著問題去思考,大家好! 問題1:HTTP請求和回傳相應的HTTP回應資訊之間發生了什么? 1:首先是最底層,托管層,位于WebAPI和底層HTTP堆疊之間 2:其次是 訊息處理程式管道層,這里比如日志和快取。OWIN的參考是將訊息處理程式管道的一些功能下移到堆疊下端的OWIN中間件了。 3:控制器處理 ......

    uj5u.com 2020-09-09 22:11:13 more
  • 微信門戶開發框架-使用指導說明書

    微信門戶應用管理系統,采用基于 MVC + Bootstrap + Ajax + Enterprise Library的技術路線,界面層采用Boostrap + Metronic組合的前端框架,資料訪問層支持Oracle、SQLServer、MySQL、PostgreSQL等資料庫。框架以MVC5,... ......

    uj5u.com 2020-09-09 22:15:18 more
  • WebAPI-HTTP編程模型

    帶著問題去思考,大家好!它是什么?它包含什么?它能干什么? 訊息 HTTP編程模型的核心就是訊息抽象,表示為:HttPRequestMessage,HttpResponseMessage.用于客戶端和服務端之間交換請求和回應訊息。 HttpMethod類包含了一組靜態屬性: private stat ......

    uj5u.com 2020-09-09 22:15:23 more
  • 部署WebApi隨筆

    一、跨域 NuGet參考Microsoft.AspNet.WebApi.Cors WebApiConfig.cs中配置: // Web API 配置和服務 config.EnableCors(new EnableCorsAttribute("*", "*", "*")); 二、清除默認回傳XML格式 ......

    uj5u.com 2020-09-09 22:15:48 more
最新发布
  • C#多執行緒學習(二) 如何操縱一個執行緒

    <a href="https://www.cnblogs.com/x-zhi/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2943582/20220801082530.png" alt="" /></...

    uj5u.com 2023-04-19 09:17:20 more
  • C#多執行緒學習(二) 如何操縱一個執行緒

    C#多執行緒學習(二) 如何操縱一個執行緒 執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念 下面我們就動手來創建一個執行緒,使用Thread類創建執行緒時,只需提供執行緒入口即可。(執行緒入口使程式知道該讓這個執行緒干什么事) 在C#中,執行緒入口是通過ThreadStart代理(delegate)來提供的 ......

    uj5u.com 2023-04-19 09:16:49 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    <a href="https://www.cnblogs.com/huangxincheng/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/214741/20200614104537.png" alt="" /&g...

    uj5u.com 2023-04-18 08:39:04 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    一:背景 1. 講故事 前段時間協助訓練營里的一位朋友分析了一個程式卡死的問題,回過頭來看這個案例比較經典,這篇稍微整理一下供后來者少踩坑吧。 二:WinDbg 分析 1. 為什么會卡死 因為是表單程式,理所當然就是看主執行緒此時正在做什么? 可以用 ~0s ; k 看一下便知。 0:000> k # ......

    uj5u.com 2023-04-18 08:33:10 more
  • SignalR, No Connection with that ID,IIS

    <a href="https://www.cnblogs.com/smartstar/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/u36196.jpg" alt="" /></a>...

    uj5u.com 2023-03-30 17:21:52 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:15:33 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:13:31 more
  • C#遍歷指定檔案夾中所有檔案的3種方法

    <a href="https://www.cnblogs.com/xbhp/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/957602/20230310105611.png" alt="" /></a&...

    uj5u.com 2023-03-27 14:46:55 more
  • C#/VB.NET:如何將PDF轉為PDF/A

    <a href="https://www.cnblogs.com/Carina-baby/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2859233/20220427162558.png" alt="" />...

    uj5u.com 2023-03-27 14:46:35 more
  • 武裝你的WEBAPI-OData聚合查詢

    <a href="https://www.cnblogs.com/podolski/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/616093/20140323000327.png" alt="" /><...

    uj5u.com 2023-03-27 14:46:16 more