之前待了7年的公司倒閉,終于找了一份真正的程式員作業,總算體驗了996的感覺,現在專案接近尾聲了,總算有點時間下寫博客了,找作業時看到中高級工程師都要求熟練\精通掌握RabbitMQ跟CAP,而做為中級開發工程師的我意識到,不得不學,這幾天找了時間學習了下,
以下我的理解說法不知規不規范,只是用我最通俗的理解寫出來
RabbitMQ是一種底層佇列的實作(Kafka也是一種佇列),CAP提供了一種通用佇列發布、訂閱使用方法,
可以理解為SqlServer、MySql跟EF的關系吧(EF比作為CAP),你不通過EF,也可以用SqlClient相關類來使用SqlServer,但EF提供一種通用的代碼使用方法,同樣的代碼可以同時用于SqlServer、MySql,代碼使用者不用關心我底層是用了SqlServer還是MySql,
RabbitMQ簡單介紹
我先說下RabbitMQ原生的使用方法,然后再說下怎么跟CAP結合
說到佇列,先理解下以下幾個概念
- 生產者:也可以說是發布者,主要是發布訊息,發送給交換器;
- 消費者:也可以說是訂閱者,從佇列中訂閱訊息進行處理并回傳應答;
- 交換器:可能連接多個佇列,將生產者發布的訊息發送到佇列中;
- 佇列:存放生產者產生的訊息,供消費者進行訂閱處理;
訊息從發布到訂閱的流程步驟:
生產者發布訊息給交換器(傳遞一個key值),交換器在它系結的佇列中根據key值及交換器模式找到匹配的佇列發送訊息,訂閱了此佇列的消費者就可以獲取訊息進行處理,并回傳應答;
交換器模式
- direct 訊息發送到RouteKey完全匹配的佇列中
- fanout 訊息轉發到交換器系結的所有佇列中
- topic 訊息發送到RouteKey模糊匹配的佇列中
- header 會用headers屬性來進行匹配,性能最差(實際使用中很少)
topic匹配規則:佇列的key為TestRouteKey.#,可以匹配到 TestRouteKey.A.B,佇列的key為TestRouteKey.*,可以匹配到TestRouteKey.A
以下是需要注意理解的點
- 生產者也可以直接發布訊息到佇列中;
- 如果交換器沒有系結任何佇列,那發布的訊息將直接丟棄;
- 一個訊息只能被一個消費者獲取,要實作訊息同時被多個消費者獲取,要使用交換器系結多個佇列;
- 消費者獲取訊息后,如果處理程序中失敗了沒有回傳應答,那訊息會在佇列中重新發送;
以下演示,可以創建兩個控制臺程式,然后在Main里面寫相關代碼進行測驗
生產者發布訊息代碼
安裝包 RabbitMQ.Client
//連接工廠 var factory = new ConnectionFactory(){ UserName="", Password="", HostName="", Port=0 }; //創建連接 var connection = factory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); //宣告交換器,模式為direct channel.ExchangeDeclare("exchangeName","direct"); //宣告佇列 channel.QueueDeclare("queueName",durable:true); //將交換器跟佇列進行系結 channel.QueueBind("queueName","exchangeName","routeKey",null); //發布訊息 channel.BasicPublish("exchangeName","routeKey",null,Encoding.UTF8.GetBytes("hello world")); channel.Close(); connection.Close();
消費者訂閱訊息
var factory = new ConnectionFactory { UserName = "", Password = "", HostName = "", }; //創建個連接 var connection = factory.CreateConnection(); //創建個通道 var channel = connection.CreateModel(); var consumer = new EventingBasicConsumer(channel); //定義事件消費者,及消費接收事件(回傳應答) consumer.Received += (o, e) => { var message = Encoding.UTF8.GetString(e.Body.ToArray()); Console.WriteLine($"收到訊息:{message}"); channel.BasicAck(e.DeliveryTag, false); }; //啟動消費者,第二個引數是代表是否自動應答,false就得手動呼叫BasicAck方法 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消費者已啟動"); Console.ReadKey(); channel.Close(); connection.Close();
CAP簡單介紹
上面簡單的介紹完RabbitMQ使用方法,下面再來簡單說下CAP是干什么的
CAP可用于微服務分布式事務解決方案,就是可以搭建不同站點,使用CAP,連接同一個RabbitMQ,部署在不同的服務器上,實作分布式部署,
那要實作CAP,需要一個資料庫來記錄事件,需要一個佇列來存放事件訊息,
CAP更詳情的檔案可查看它的官網,重點有中文的 http://cap.dotnetcore.xyz/
創建一個WebApi初始專案來演示一下,
安裝包 DotNetCore.CAP
安裝包DotNetCore.CAP.SqlServer,這是提供Sqlserver來記錄事件的包
安裝包 DotNetCore.CAP.RabbitMQ,這是提供RabbitMQ來存放事件訊息的包
安裝包 DotNetCore.CAP.Dashboard,這是提供一個Web管理后臺可查看發布、訂閱訊息情況
在Startup.cs的ConfigureServices方法中注入
services.AddCap(o=>{ o.UseSqlServer(""); o.UseRabbitMQ(mq => { mq.HostName = "";//RabbitMQ服務器地址 mq.Port=5672; mq.UserName = "admin"; mq.Password = "admin"; }); o.UseDashboard(); //添加監控儀表盤,通過http://localhost/cap訪問 o.FailedRetryInterval = 30;//失敗后的重拾間隔,默認60秒 o.FailedRetryCount = 10;//失敗后的重試次數,默認50次;在FailedRetryInterval默認60秒的情況下,即默認重試50*60秒(50分鐘)之后放棄失敗重試 o.SucceedMessageExpiredAfter = 60 * 60; //設定成功資訊的洗掉時間默認24*3600秒 });
然后在Controllers目錄下創建一個測驗控制器
[ApiController] [Route("[controller]/[action]")] public class TestController : ControllerBase { private readonly ICapPublisher _capPublisher; public TestController(ICapPublisher capPublisher) { _capPublisher = capPublisher; } [HttpPost] public void Test1() { //發布訊息,訊息被訂閱處理后,會回呼到Test.Callback _capPublisher.Publish<string>("Test.Event", "Hello,World","Test.Callback"); } [NonAction] [CapSubscribe("Test.Event")] //訂閱Test.Event事件 public string Test2(string message) { //進行訂閱訊息處理 Console.WriteLine(message); return "OK"; } [NonAction] [CapSubscribe("Test.Callback")] public void TestCallback(string result) { //發布訊息完成后的回呼 Console.WriteLine(result); } }
好了,上面就簡單的介紹了RabbitMQ跟CAP的使用方法,本來還在想這些東西適用于哪些場景,然后今天專案上線后出現問題了,里面涉及到兩個系統的呼叫,一個系統A因為介面被頻繁地呼叫超時,導致另一個系統B一直顯示出錯,我就發現這個場景就很適合用這個CAP了,
系統A的崩潰不應影響到系統B,而系統A崩潰時也可以自動進行重試,當系統B發布訊息后,也不用等待系統A,顯示處理中,等系統A處理成功后再通知系統B,B再顯示成功就可以了,
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/242637.html
標籤:C#
