1.RabbitMQ簡介
因為RabbitMQ是基于開源的AMQP協議來實作的,所以在了解MQ時候,首先我們來了解下AMQP協議,AMQP,即Advanced Message Queuing Protocol,一個提供統一訊息服務的應用層標準高級訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中間件設計,基于此協議的客戶端與訊息中間件可傳遞訊息,并不受客戶端或者中間件不同產品、不同的開發語言等條件的限制,也就是說訊息生產者無需知道消費者如何處理訊息結果,反之亦然,解耦了組件跟組件依賴,RabbitMQ服務器端用Erlang語言撰寫,同時也支持多種客戶端來開發跨語言訊息傳遞,例如:Java,.NET,PHP,Python,JavaScript,Ruby,Go等,RabbitMQ還支持多種訊息傳遞協議、訊息排隊、傳遞確認、佇列的靈活路由、多種交換型別,還支持分布式集群以實作高可用性和吞吐量,適用于排隊演算法、秒殺活動、訊息分發、異步處理、資料同步、處理耗時任務、CQRS等應用場景,還可以通過HTTP-API命令列工具和用于管理和監視RabbitMQ的UI,
2.RabbitMQ在CentOS 7安裝
因為我對Linux運維知識面比較薄弱,所以在Linux上部署RabbitMQ這塊暫時不想耗太多時間在這上面去(后續有時間再深入了解),這里我完全是跟著園區Net大神曉晨大佬這篇文章(https://www.cnblogs.com/stulzq/p/7551819.html)去部署的,網上也有很多RabbitMQ在Linux部署文章參考,大家也可以自行度娘,
//安裝服務端erlang語言 rpm -Uvh https://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm //安裝socat yum install socat //安裝服務端RabbitMQ rpm -Uvh https://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/rabbitmq-server-3.6.12-1.el7.noarch.rpm
3.RabbitMQ服務端常用命令
//啟用Web管理平臺 rabbitmq-plugins enable rabbitmq_management //開啟服務 systemctl start rabbitmq-server.service //停止服務 systemctl stop rabbitmq-server.service //查看服務狀態 systemctl status rabbitmq-server.service //查看RabbitMQ狀態 rabbitmqctl status //添加用戶賦予管理員權限 rabbitmqctl add_user tom 12345 rabbitmqctl set_user_tags tom administrator //查看用戶串列 rabbitmqctl list_users //洗掉用戶 rabbitmqctl delete_user username //修改用戶密碼 rabbitmqctl oldPassword Username newPassword
4.訪問RabbitMQ Web管理平臺
當啟用RabbitMQ Web管理平臺,我們根據部署CentOS 7系統的IP在瀏覽器上打開http://IP:15672,如果新增了用戶,一定要設定新增用戶的VirtualHost的權限,不然客戶端呼叫RabbitMQ時候會報錯!具體處理方法如下截圖:
//未設定權限時

點解設定權限即可,
如果訪問顯示404,則是防火墻把通訊給過濾掉了,請執行命令把防火墻關閉掉再打開,以下我列出所有CentOS 7關于防火墻命令:
//查看防火狀態 systemctl status firewalld //暫時關閉防火墻 systemctl stop firewalld //永久關閉防火墻 systemctl disable firewalld //重啟防火墻 systemctl enable firewalld //永久關閉后重啟 chkconfig iptables on
關閉防火墻之后,在瀏覽器上就會看到下面管理平臺界面:
5.NET Core使用RabbitMQ
通過nuget安裝:https://www.nuget.org/packages/RabbitMQ.Client/
5.1定義生產者
class Program { static void Main(string[] args) { string queueName = "DirectExchangeQueueName"; string routeKey = "DirectExchangeQueueName"; //創建連接工廠 var factory = new ConnectionFactory { UserName = "dengwu",//用戶名 Password = "123456",//密碼 HostName = "192.168.112.133",//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //宣告一個佇列 channel.QueueDeclare(queueName, false, false, false, null); Console.WriteLine("\nRabbitMQ連接成功,請輸入訊息,輸入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //發布訊息 channel.BasicPublish("", routeKey, null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); } }
5.2定義消費者
class Program { static void Main(string[] args) { string queueName = "DirectExchangeQueueName"; //創建連接工廠 var factory = new ConnectionFactory { UserName = "dengwu",//用戶名 Password = "123456",//密碼 HostName = "192.168.112.133",//rabbitmq ip }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //事件基本消費者 var consumer = new EventingBasicConsumer(channel); //接收到訊息事件 consumer.Received += (ch, ea) => { var boby = ea.Body; var message = Encoding.UTF8.GetString(boby.ToArray()); Console.WriteLine($"收到訊息: {message}"); //確認該訊息已被消費 channel.BasicAck(ea.DeliveryTag, false); //Console.WriteLine($"收到該訊息[{ea.DeliveryTag}] 延遲10s發送回執"); //Thread.Sleep(10000); //Console.WriteLine($"已發送回執[{ea.DeliveryTag}]"); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume(queueName, false, consumer); Console.WriteLine("消費者已啟動"); Console.ReadKey(); channel.Dispose(); connection.Close(); } }
運行:
通過啟動一個生產者,一個消費者,我們可以看到,生產者通過RabbitMQ決定投遞訊息給對應消費者,
5.3RabbitMQ消費失敗的處理
RabbitMQ采用訊息應答機制,即消費者收到一個訊息之后,需要發送一個應答,然后RabbitMQ才會將這個訊息從佇列中洗掉,如果消費者在消費程序中出現例外,斷開連接沒有發送應答,那么RabbitMQ會將這個訊息重新投遞,下面我們將消費者接收到訊息事件代碼修改如下:
//接收到訊息事件 consumer.Received += (ch, ea) => { var boby = ea.Body; var message = Encoding.UTF8.GetString(boby.ToArray()); Console.WriteLine($"收到該訊息[{ea.DeliveryTag}] 延遲10s發送回執"); Thread.Sleep(10000); Console.WriteLine($"已發送回執[{ea.DeliveryTag}]"); };
先在生產者里面預先傳遞三個訊息:

如果我們設定了訊息應答延遲10s,如果在這10s中,該消費者斷開了連接,那么訊息會被RabbitMQ重新投遞的,具體大家可以自行測驗,
6.總結
該章節主要簡單介紹RabbitMQ概念在Linux上簡單部署,接下來章節,我會陸續介紹AMQP Messaging中的基本概念跟Exchange(交換機),
參考文獻:
RabbitMQ官網
.NET Core 使用RabbitMQ
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/4880.html
標籤:.NET Core
