主頁 > .NET開發 > C#佇列學習筆記:RabbitMQ安裝及使用

C#佇列學習筆記:RabbitMQ安裝及使用

2020-09-16 11:40:48 .NET開發

    一、環境搭建

    1.1、由于RabbitMQ是使用Erlang語言開發的,因此要安裝Erlang運行時環境,下載地址:Erlang官網下載  CSDN分享下載

    1.2、去RabbitMQ官網下載RabbitMQ Server服務端程式,選擇合適的平臺版本下載并安裝,

    RabbitMQ安裝時,會自動在Windows服務中創建RabbitMQ服務,并自動啟動,

    1.3、開始->所有程式->RabbitMQ Server->RabbitMQ Command Prompt (sbin dir):

    運行RabbitMQ Command Prompt與cmd下cd C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin的效果是一樣的,

    1.3.1、sbin目錄下的rabbitmqctl.bat,是用來查看和控制服務端狀態的,運行rabbitmqctl status檢查RabbitMQ狀態:

    1.3.3、RabbitMQ Server上面也有用戶概念,使用rabbitmqctl list_users命令,可以看到目前的用戶:

    可以看到,現在只有一個名為gues角色為administratort的用戶,這個是RabbitMQ默認為我們創建的,它有RabbitMQ的所有權限,一般情況下,我們需要新建一個自己的用戶,并設定密碼及授予權限,同時設定為管理員,操作方法如下:

rabbitmqctl add_user hello world
rabbitmqctl set_permissions hello ".*" ".*" ".*"
rabbitmqctl set_user_tags hello administrator

    上面的第一命令添加了一個名為hello的用戶并設定了密碼world;第二條命令為用戶hello分別授予對所有訊息佇列的配置、讀和寫的權限;第三條命令將用戶hello設定為管理員,

    現在我們可以將默認的guest用戶刪掉,使用下面的命令即可:

rabbitmqctl delete_user guest

    如果要修改密碼,可以使用下面的命令:

rabbitmqctl change_password {username} {newpassowrd}

   二、管理界面

    RabbitMQ還有一個管理界面,是以插件形式提供的,通過該界面可以查看RabbitMQ Server當前的狀態,啟用命令如下: 

rabbitmq-plugins enable rabbitmq_management

    現在,在瀏覽器中輸入 http://server-name:15672/ 即可,

    注:server-name為計算機名或IP地址,如果是本地的,直接用localhost即可,登錄界面,使用我們之前創建的hello用戶登錄,

    三、開始使用

    在.NET中使用RabbitMQ需要下載RabbitMQ客戶端程式集,下載解壓后在bin下找到RabbitMQ.Client.dll,并添加參考到專案中,

    3.1、Hello World

    為了展示RabbitMQ的基本使用,我們發送一個HelloWorld訊息,然后接收并處理,

rabbitmq hello world

    3.1.1、創建一個名為Send的客戶端控制臺程式,用來將訊息發送到RabbitMQ訊息佇列中,代碼如下:

    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.構建byte訊息資料包
                    string message = args.Length > 0 ? args[0] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message); //訊息是以二進制陣列的形式傳輸的
                    //6.發送資料包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs

    3.1.2、創建一個名為Receive的服務端控制臺程式,用來接收RabbitMQ訊息佇列中的訊息,代碼如下:

    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //6.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine($"Received {message}");
                    };
                    //7.啟動消費者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    3.1.3、先運行訊息接收端,再運行訊息發送端,結果如下:

    從上面的代碼中可以看出,發送端和接收端的代碼前4步都是一樣的,主要的區別在于發送端呼叫channel.BasicPublish方法發送訊息,而接收端需要實體化一個EventingBasicConsumer實體來進行訊息處理,另外一點需要注意的是:訊息接收端和發送端的佇列名稱(queue)必須保持一致,這里指定的佇列名稱為hello,

    3.2、作業佇列

    作業佇列(work queues,又稱Task Queues)的主要思想是:為了避免立即執行一些實時性要求不高但是比較耗資源或時間的操作(如寫日志),把任務當作訊息發送到佇列中,由一個運行在后臺的作業者(worker)行程取出并處理,當有多個作業者(workers)運行時,任務會在它們之間共享,

    現在發送一些字串來模擬耗時的任務,在字串中加上點號(.)來表示任務的復雜程度,一個點號將會耗時1秒鐘,比如"Hello World..."就會耗時3秒鐘,

    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.構建byte訊息資料包
                    string message = args.Length > 0 ? string.Join(" ", args) : "Hello World...";
                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2;//設定訊息是否持久化 1:非持久化 2:持久化
                    var body = Encoding.UTF8.GetBytes(message);//訊息是以二進制陣列的形式傳輸的
                    //6.發送資料包
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs
    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //6.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                    };
                    //7.啟動消費者
                    channel.BasicConsume(queue: "hello", noAck: true, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    3.3輪詢分發

    使用作業佇列的一個好處就是它能夠并行的處理佇列,如果堆積了很多任務,我們只需要添加更多的作業者(workers)就可以了,擴展很簡單,

    現在,我們先啟動兩個接收端,等待接受訊息,然后啟動一個發送端開始發送訊息(cmd->send.exe所在的目錄),

    上面發了10條資訊,兩個接收端各收到5條資訊,

    默認情況下,RabbitMQ會將每個訊息按照順序依次分發給下一個消費者,所以每個消費者接收到的訊息個數大致是平均的, 這種訊息分發的方式稱之為輪詢(round-robin),

    3.4、訊息回應

    當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否運行到一半就掛掉,在當前的代碼中,當RabbitMQ將訊息發送給消費者之后,馬上就會將該訊息從佇列中移除,此時,如果把處理這個訊息的作業者(worker)停掉,正在處理的這條訊息就會丟失,同時,所有發送到這個作業者的還沒有處理的訊息都會丟失,

    我們不想丟失任何任務訊息,如果一個作業者掛掉了,我們希望該訊息能夠重新發送給其它的作業者,

    為了防止訊息丟失,RabbitMQ提供了訊息回應(acknowledgments)機制,消費者會通過一個ack(回應),告訴RabbitMQ已經收到并處理了某條訊息,然后RabbitMQ才會釋放并洗掉這條訊息,如果消費者掛掉了,沒有發送回應,RabbitMQ就會認為訊息沒有被完全處理,然后重新發送給其它消費者,這樣,即使作業者偶爾的掛掉,也不會丟失訊息,

    訊息是沒有超時這個概念的,當作業者與它斷開連的時候,RabbitMQ會重新發送訊息,這樣在處理一個耗時非常長的訊息任務的時候就不會出問題了,

    訊息回應默認是開啟的,在之前的例子中使用了no_Ack=true標識把它關閉,是時候移除這個標識了,當作業者完成了任務,就會發送一個回應,

    下面修改Receive.cs,主要改動的是:將 autoAck:true修改為autoAck:fasle,以及在訊息處理完畢后手動呼叫BasicAck方法進行手動訊息確認,

    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列
                    channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    //5.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //6.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //7.發送訊息確認信號(手動訊息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //8.啟動消費者(noAck: false 啟用訊息回應)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    一個很常見的錯誤就是忘掉了BasicAck這個方法,這個錯誤很常見,但是后果很嚴重,當客戶端退出時,待處理的訊息就會被重新分發,但是RabitMQ會消耗越來越多的記憶體,因為這些沒有被應答的訊息不能夠被釋放,除錯這種case,可以使用rabbitmqctl列印messages_unacknowledged欄位,

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.3\sbin>rabbitmqctl list_queues name messages_ready messages_unacknowledged
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
hello   1       0

    3.5、訊息持久化

    訊息確認確保了即使消費端例外,訊息也不會丟失能夠被重新分發處理,但是如果RabbitMQ服務端例外,訊息依然會丟失,除非我們指定durable:true,否則當RabbitMQ退出或崩潰時,訊息將依然會丟失,通過指定durable:true(佇列),并指定Persistent=true(訊息),來告知RabbitMQ將訊息持久化,一句話概括:需要保證佇列和訊息都是持久化的,

    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列(指定durable:true,告知rabbitmq對訊息進行持久化,)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.構建byte訊息資料包
                    string message = args.Length > 0 ? args[0] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);//訊息是以二進制陣列的形式傳輸的
                    //6.發送資料包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs
    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列(指定durable:true,告知rabbitmq對訊息進行持久化,)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //6.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //7.發送訊息確認信號(手動訊息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //8.啟動消費者(noAck: false 啟用訊息回應)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    將訊息標記為持久性不能完全保證訊息不會丟失,雖然它告訴RabbitMQ將訊息保存到磁盤,但是當RabbitMQ接受訊息并且還沒有保存時??,仍然有一個很短的時間視窗,RabbitMQ可能只是將訊息保存到了快取中,并沒有將其寫入到磁盤上,持久化不是一定能夠保證的,但是對于一個簡單任務佇列來說已經足夠,

    如果需要確保訊息佇列的持久化,可以使用publisher confirms,

    3.6、公平分發

    你可能會注意到,訊息的分發可能并沒有如我們想要的那樣公平分配,比如,對于兩個作業者,當奇數個訊息的任務比較重但是偶數個訊息任務比較輕時,奇數個作業者始終處于忙碌狀態,而偶數個作業者始終處于空閑狀態,但是RabbitMQ并不知道這些,它仍然會平均依次地分發訊息,

    為了改變這一狀態,我們可以使用basicQos方法,設定perfetchCount=1 ,這樣就告訴RabbitMQ 不要在同一時間給一個作業者發送多于1個的訊息,換句話說,在一個作業者還在處理訊息并且沒有回應訊息之前,不要給它分發新的訊息,而是將這條新的訊息發送給下一個不那么忙碌的作業者,

//Receive.cs
//4.宣告佇列(指定durable:true,告知rabbitmq對訊息進行持久化,)
//channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
channel.QueueDeclare("hello", true, false, false, null);
//將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//設定prefetchCount : 1來告知RabbitMQ,在未收到消費端的訊息確認時,不再分發訊息,
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
channel.BasicQos(0, 1, false);

    3.7完整實體

    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列(指定durable:true,告知rabbitmq對訊息進行持久化,)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.構建byte訊息資料包
                    string message = args.Length > 0 ? args[0] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);//訊息是以二進制陣列的形式傳輸的
                    //6.發送資料包(指定basicProperties)
                    channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);
                    Console.WriteLine($"Send {message}");
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs
    class Program
    {
        static void Main(string[] args)
        {
            #region Hello World
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列(指定durable:true,告知rabbitmq對訊息進行持久化,)
                    channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //設定prefetchCount : 1來告知RabbitMQ,在未收到消費端的訊息確認時,不再分發訊息,
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //5.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //6.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //7.發送訊息確認信號(手動訊息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //8.啟動消費者(noAck: false 啟用訊息回應)
                    channel.BasicConsume(queue: "hello", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    四、Exchange

    上面的示例,生產者和消費者直接是通過相同佇列名稱進行匹配銜接的,消費者訂閱某個佇列,生產者創建訊息發布到佇列中,佇列再將訊息轉發到訂閱的消費者,這樣就會有一個局限性,即消費者一次只能發送訊息到某一個佇列,

    那消費者如何才能發送訊息到多個訊息佇列呢?

    RabbitMQ提供了Exchange,它類似于路由器的功能,對訊息進行路由,將訊息發送到多個佇列上,Exchange一方面從生產者接收訊息,另一方面將訊息推送到佇列,但是Exchange是如何知道將訊息附加到哪個佇列或者直接忽略的呢?這些其實是由Exchange Type來定義的,關于Exchange的圖文介紹,請看上一篇《C#佇列學習筆記:RabbitMQ基礎知識》,此處僅提供示例代碼,

    4.1、fanout

    class Program
    {
        static void Main(string[] args)
        {
            #region fanout exchange type
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用fanout exchange type,指定exchange名稱,
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.構建byte訊息資料包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//訊息是以二進制陣列的形式傳輸的
                        //6.發送資料包(指定exchange;fanout型別無需指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs
    class Program
    {
        static void Main(string[] args)
        {
            #region fanout exchange type
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用fanout exchange type,指定exchange名稱,
                    channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
                    //5.宣告佇列(隨機生成佇列名稱)
                    var queueName = channel.QueueDeclare().QueueName;
                    //系結佇列到指定fanout型別exchange,fanout型別無需指定routingKey,
                    channel.QueueBind(queue: queueName, exchange: "fanoutEC", routingKey: "");
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //設定prefetchCount : 1來告知RabbitMQ,在未收到消費端的訊息確認時,不再分發訊息,
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //7.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.發送訊息確認信號(手動訊息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.啟動消費者(noAck: false 啟用訊息回應)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    4.2、direct

    class Program
    {
        static void Main(string[] args)
        {
            #region direct exchange type
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用direct exchange type,指定exchange名稱,
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct");
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.構建byte訊息資料包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//訊息是以二進制陣列的形式傳輸的
                        //6.發送資料包(指定exchange;direct型別必須指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs
    class Program
    {
        static void Main(string[] args)
        {
            #region direct exchange type
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用direct exchange type,指定exchange名稱,
                    channel.ExchangeDeclare(exchange: "directEC", type: "direct");
                    //5.宣告佇列(隨機生成佇列名稱)
                    var queueName = channel.QueueDeclare().QueueName;
                    //系結佇列到指定direct型別exchange,direct型別必須指定routingKey,
                    channel.QueueBind(queue: queueName, exchange: "directEC", routingKey: "green");
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //設定prefetchCount : 1來告知RabbitMQ,在未收到消費端的訊息確認時,不再分發訊息,
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //7.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.發送訊息確認信號(手動訊息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.啟動消費者(noAck: false 啟用訊息回應)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    4.3、topic

    class Program
    {
        static void Main(string[] args)
        {
            #region topic exchange type
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用topic exchange type,指定exchange名稱,
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //5.構建byte訊息資料包
                    for (int i = 1; i <= 10; i++)
                    {
                        string message = args.Length > 0 ? args[0] : "Hello World" + new string('.', i);
                        var body = Encoding.UTF8.GetBytes(message);//訊息是以二進制陣列的形式傳輸的
                        //6.發送資料包(指定exchange;topic型別必須指定routingKey;指定basicProperties)
                        channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: properties, body: body);
                        Console.WriteLine($"Send {message}");
                    }
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs
    class Program
    {
        static void Main(string[] args)
        {
            #region topic exchange type
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.使用topic exchange type,指定exchange名稱,
                    channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
                    //5.宣告佇列(隨機生成佇列名稱)
                    var queueName = channel.QueueDeclare().QueueName;
                    //系結佇列到指定topic型別exchange,topic型別必須指定routingKey,
                    channel.QueueBind(queue: queueName, exchange: "topicEC", routingKey: "#.*.fast");
                    //將訊息標記為持久性 - 將IBasicProperties.SetPersistent設定為true
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //設定prefetchCount : 1來告知RabbitMQ,在未收到消費端的訊息確認時,不再分發訊息,
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    //6.構造消費者實體
                    var consumer = new EventingBasicConsumer(channel);
                    //7.系結訊息接收后的事件委托
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int dots = message.Split('.').Length - 1;
                        Thread.Sleep(dots * 1000);
                        Console.WriteLine($"Received {message}");
                        //8.發送訊息確認信號(手動訊息確認)
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    //9.啟動消費者(noAck: false 啟用訊息回應)
                    channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
                    Console.Read();
                }
            }
            #endregion
        }
    }
Receive.cs

    五、RPC

    RPC--Remote Procedure Call,遠程程序呼叫,RabbitMQ是如何進行遠程呼叫的呢?示意圖如下:

    第一步:主要是進行遠程呼叫的客戶端需要指定接收遠程回呼的佇列,并宣告消費者監聽此佇列,

    第二步:遠程呼叫的服務端除了要宣告消費端接收遠程呼叫請求外,還要將結果發送到客戶端用來監聽結果的佇列中去,

    class Program
    {
        static void Main(string[] args)
        {
            #region rpc
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.建立信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告唯一guid用來標識此次發送的遠程呼叫請求
                    var correlationId = Guid.NewGuid().ToString();
                    //5.宣告需要監聽的回呼佇列
                    var replyQueue = channel.QueueDeclare().QueueName;
                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;//將訊息標記為持久性
                    properties.ReplyTo = replyQueue;//指定回呼佇列
                    properties.CorrelationId = correlationId;//指定訊息唯一標識
                    //6.構建byte訊息資料包
                    string number = args.Length > 0 ? args[0] : "30";
                    var body = Encoding.UTF8.GetBytes(number);
                    //7.發送資料包
                    channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
                    Console.WriteLine($"Request fib({number})");
                    //8.創建消費者用于處理訊息回呼(遠程呼叫回傳結果)
                    var callbackConsumer = new EventingBasicConsumer(channel);
                    channel.BasicConsume(queue: replyQueue, noAck: false, consumer: callbackConsumer);
                    callbackConsumer.Received += (model, ea) =>
                    {
                        //僅當訊息回呼的ID與發送的ID一致時,說明遠程呼叫結果正確回傳,
                        if (ea.BasicProperties.CorrelationId == correlationId)
                        {
                            var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
                            Console.WriteLine($"{responseMsg}");
                        }
                    };
                    Console.Read();
                }
            }
            #endregion
        }
    }
Send.cs
    class Program
    {
        static void Main(string[] args)
        {
            #region rpc
            //1.實體化連接工廠
            var factory = new ConnectionFactory
            {
                HostName = "localhost",
                UserName = "hello",
                Password = "world"
            };
            //2.建立連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建信道
                using (var channel = connection.CreateModel())
                {
                    //4.宣告佇列接收遠程呼叫請求
                    channel.QueueDeclare(queue: "rpc_queue", durable: true, exclusive: false, autoDelete: false, arguments: null);
                    var consumer = new EventingBasicConsumer(channel);
                    Console.WriteLine("Waiting for message.");
                    //5.請求處理邏輯
                    consumer.Received += (model, ea) =>
                    {
                        var message = Encoding.UTF8.GetString(ea.Body);
                        int n = int.Parse(message);
                        Console.WriteLine($"Receive request of Fib({n})");
                        int result = Fib(n);
                        //6.從請求的引數中獲取請求的唯一標識,在訊息回傳時同樣系結,
                        var properties = ea.BasicProperties;
                        var replyProerties = channel.CreateBasicProperties();
                        replyProerties.CorrelationId = properties.CorrelationId;
                        //7.將遠程呼叫結果發送到客戶端監聽的佇列上
                        channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
                            basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
                        //8.手動發回訊息確認
                        channel.BasicAck(ea.DeliveryTag, false);
                        Console.WriteLine($"Return result: Fib({n})= {result}");
                    };
                    channel.BasicConsume(queue: "rpc_queue", noAck: false, consumer: consumer);
                    Console.Read();
                }
            }

            int Fib (int n)
            {
                if (n <= 2)
                    return 1;
                else
                    return Fib(n - 1) + Fib(n - 2);
            }
            #endregion
        }
    }
Receive.cs

    六、總結

    本文介紹了RabbitMQ訊息代理在Windows上的安裝以及在.NET中的使用,訊息佇列在構建分布式系統、提高系統的可擴展性及回應性方面,有著很重要的作用,

 

    參考自:

    https://www.cnblogs.com/yangecnu/p/Introduce-RabbitMQ.html#!comments

    https://www.cnblogs.com/sheng-jie/p/7192690.html

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/56708.html

標籤:C#

上一篇:C# 添加、修改、洗掉PPT中的超鏈接

下一篇:字串轉DateTime

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • WebAPI簡介

    Web體系結構: 有三個核心:資源(resource),URL(統一資源識別符號)和表示 他們的關系是這樣的:一個資源由一個URL進行標識,HTTP客戶端使用URL定位資源,表示是從資源回傳資料,媒體型別是資源回傳的資料格式。 接下來我們說下HTTP. HTTP協議的系統是一種無狀態的方式,使用請求/ ......

    uj5u.com 2020-09-09 22:07:47 more
  • asp.net core 3.1 入口:Program.cs中的Main函式

    本文分析Program.cs 中Main()函式中代碼的運行順序分析asp.net core程式的啟動,重點不是剖析原始碼,而是理清程式開始時執行的順序。到呼叫了哪些實體,哪些法方。asp.net core 3.1 的程式入口在專案Program.cs檔案里,如下。ususing System; us ......

    uj5u.com 2020-09-09 22:07:49 more
  • asp.net網站作為websocket服務端的應用該如何寫

    最近被websocket的一個問題困擾了很久,有一個需求是在web網站中搭建websocket服務。客戶端通過網頁與服務器建立連接,然后服務器根據ip給客戶端網頁發送資訊。 其實,這個需求并不難,只是剛開始對websocket的內容不太了解。上網搜索了一下,有通過asp.net core 實作的、有 ......

    uj5u.com 2020-09-09 22:08:02 more
  • ASP.NET 開源匯入匯出庫Magicodes.IE Docker中使用

    Magicodes.IE在Docker中使用 更新歷史 2019.02.13 【Nuget】版本更新到2.0.2 【匯入】修復單列匯入的Bug,單元測驗“OneColumnImporter_Test”。問題見(https://github.com/dotnetcore/Magicodes.IE/is ......

    uj5u.com 2020-09-09 22:08:05 more
  • 在webform中使用ajax

    如果你用過Asp.net webform, 說明你也算是.NET 開發的老兵了。WEBform應該是2011 2013左右,當時還用visual studio 2005、 visual studio 2008。后來基本都用的是MVC。 如果是新開發的專案,估計沒人會用webform技術。但是有些舊版 ......

    uj5u.com 2020-09-09 22:08:50 more
  • iis添加asp.net網站,訪問提示:由于擴展配置問題而無法提供您請求的

    今天在iis服務器配置asp.net網站,遇到一個問題,記錄一下: 問題:由于擴展配置問題而無法提供您請求的頁面。如果該頁面是腳本,請添加處理程式。如果應下載檔案,請添加 MIME 映射。 WindowServer2012服務器,添加角色安裝完.netframework和iis之后,運行aspx頁面 ......

    uj5u.com 2020-09-09 22:10:00 more
  • WebAPI-處理架構

    帶著問題去思考,大家好! 問題1:HTTP請求和回傳相應的HTTP回應資訊之間發生了什么? 1:首先是最底層,托管層,位于WebAPI和底層HTTP堆疊之間 2:其次是 訊息處理程式管道層,這里比如日志和快取。OWIN的參考是將訊息處理程式管道的一些功能下移到堆疊下端的OWIN中間件了。 3:控制器處理 ......

    uj5u.com 2020-09-09 22:11:13 more
  • 微信門戶開發框架-使用指導說明書

    微信門戶應用管理系統,采用基于 MVC + Bootstrap + Ajax + Enterprise Library的技術路線,界面層采用Boostrap + Metronic組合的前端框架,資料訪問層支持Oracle、SQLServer、MySQL、PostgreSQL等資料庫。框架以MVC5,... ......

    uj5u.com 2020-09-09 22:15:18 more
  • WebAPI-HTTP編程模型

    帶著問題去思考,大家好!它是什么?它包含什么?它能干什么? 訊息 HTTP編程模型的核心就是訊息抽象,表示為:HttPRequestMessage,HttpResponseMessage.用于客戶端和服務端之間交換請求和回應訊息。 HttpMethod類包含了一組靜態屬性: private stat ......

    uj5u.com 2020-09-09 22:15:23 more
  • 部署WebApi隨筆

    一、跨域 NuGet參考Microsoft.AspNet.WebApi.Cors WebApiConfig.cs中配置: // Web API 配置和服務 config.EnableCors(new EnableCorsAttribute("*", "*", "*")); 二、清除默認回傳XML格式 ......

    uj5u.com 2020-09-09 22:15:48 more
最新发布
  • C#多執行緒學習(二) 如何操縱一個執行緒

    <a href="https://www.cnblogs.com/x-zhi/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2943582/20220801082530.png" alt="" /></...

    uj5u.com 2023-04-19 09:17:20 more
  • C#多執行緒學習(二) 如何操縱一個執行緒

    C#多執行緒學習(二) 如何操縱一個執行緒 執行緒學習第一篇:C#多執行緒學習(一) 多執行緒的相關概念 下面我們就動手來創建一個執行緒,使用Thread類創建執行緒時,只需提供執行緒入口即可。(執行緒入口使程式知道該讓這個執行緒干什么事) 在C#中,執行緒入口是通過ThreadStart代理(delegate)來提供的 ......

    uj5u.com 2023-04-19 09:16:49 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    <a href="https://www.cnblogs.com/huangxincheng/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/214741/20200614104537.png" alt="" /&g...

    uj5u.com 2023-04-18 08:39:04 more
  • 記一次 .NET某醫療器械清洗系統 卡死分析

    一:背景 1. 講故事 前段時間協助訓練營里的一位朋友分析了一個程式卡死的問題,回過頭來看這個案例比較經典,這篇稍微整理一下供后來者少踩坑吧。 二:WinDbg 分析 1. 為什么會卡死 因為是表單程式,理所當然就是看主執行緒此時正在做什么? 可以用 ~0s ; k 看一下便知。 0:000> k # ......

    uj5u.com 2023-04-18 08:33:10 more
  • SignalR, No Connection with that ID,IIS

    <a href="https://www.cnblogs.com/smartstar/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/u36196.jpg" alt="" /></a>...

    uj5u.com 2023-03-30 17:21:52 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:15:33 more
  • 一次對pool的誤用導致的.net頻繁gc的診斷分析

    <a href="https://www.cnblogs.com/dotnet-diagnostic/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/3115652/20230225090434.png" alt=""...

    uj5u.com 2023-03-28 10:13:31 more
  • C#遍歷指定檔案夾中所有檔案的3種方法

    <a href="https://www.cnblogs.com/xbhp/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/957602/20230310105611.png" alt="" /></a&...

    uj5u.com 2023-03-27 14:46:55 more
  • C#/VB.NET:如何將PDF轉為PDF/A

    <a href="https://www.cnblogs.com/Carina-baby/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/2859233/20220427162558.png" alt="" />...

    uj5u.com 2023-03-27 14:46:35 more
  • 武裝你的WEBAPI-OData聚合查詢

    <a href="https://www.cnblogs.com/podolski/" target="_blank"><img width="48" height="48" class="pfs" src="https://pic.cnblogs.com/face/616093/20140323000327.png" alt="" /><...

    uj5u.com 2023-03-27 14:46:16 more