系統重構解耦的程序涉及不同領域服務分拆,或同一服務下實時回應部分和非回應部分分拆,分解后的各部分通過異步訊息的流轉傳遞,完成整體的業務邏輯,但是頻繁的在業務層面直接呼叫不同訊息佇列的SDK,個人感覺不夠簡潔,最近開源一個中間件OSS.Dataflow,希望能幫到看到的同學,
OSS.Dataflow主要實作異步訊息傳遞的程序抽象,在業務層面提供訊息發布訂閱的統一抽象介面,在業務邏輯分支之間,以簡單的呼叫完成訊息的傳遞,和具體的訊息存盤觸發實作無關,同時,在底層的存盤和觸發層面提取介面,能夠在系統的全域適配具體的訊息基礎設施,(在這些介面之上,還實作了事件處理器,通過訊息的重復投放,實作事件執行的容錯補充機制,這個后邊文章再介紹,源代碼單元測驗有示例,)
一. 訊息業務側使用
OSS.Dataflow 的代碼可以通過Gitee和GitHub獲取,使用時可以通過Nuget直接安裝,也可以通過命令列:Install-Package OSS.DataFlow
組件的使用非常簡單,只需要關注:
- 訊息發布者介面,由組件注冊時回傳,供業務方法呼叫傳入訊息體,
- 訊息訂閱(消費)者介面實作或委托方法,在組件注冊時傳入,
具體示例:
- 訊息的發布訂閱獨立呼叫示例
// 全域初始化,注入訂閱者實作 const string msgPSKey = "Publisher-Subscriber-MsgKey"; DataFlowFactory.RegisterSubscriber<MsgData>(msgPSKey, async (data) => { // 當前通過注入消費的委托方法,也可通過介面實作 // DoSomething(data); return true; }); // 獲取發布者介面 private static readonly IDataPublisher publisher = DataFlowFactory.CreatePublisher(); // 業務方法中發布訊息 await publisher.Publish(msgPSKey,new MsgData() {name = "test"});
2. 訊息的流式呼叫示例
// 直接注冊消費實作并獲取訊息發布介面 private static readonly IDataPublisher _delegateFlowpusher = DataFlowFactory.RegisterFlow<MsgData>("delegate_flow",async (data) => { // 當前通過注入消費的委托方法,也可通過介面實作 // DoSomething(data); return true; }); // 業務方法中發布訊息 await _delegateFlowpusher.Publish("normal_flow",new MsgData() {name = "test"});
如上,只需要獲取發布者,并注入消費實作,即可完成整個訊息的異步消費處理,同一個訊息key可以注冊多個消費實作,當有訊息進入消費時,會并發處理,
二. 訊息底層存盤適配擴展
前邊介紹了業務介面的使用,和具體訊息佇列或資料庫等隔離,這是對接業務層面的使用,因為業務場景不同,不同的專案對訊息的回應速度和處理機制又各有需求,所以 OSS.DataFlow 同樣提供了對接訊息產品的擴展介面,方便使用者適配已有訊息基礎設施,
1. 訊息存盤適配介面
對于事件訊息處理,需要關注兩件事情:接收存盤 和 消費觸發,在類別庫中提供了 DataFlowManager 訊息流管理類,用戶可以通過實作IDataPublisherProvider介面,完成具體的存盤實作,
同時在不同的訊息產品觸發消費時(比如資料庫定時任務或者RabbitMQ消費), 呼叫通知方法(NotifySubscriber ),來觸發通過類別庫注冊的具體的業務訂閱處理,
// 訊息流核心部件管理者 public static class DataFlowManager { /// <summary> /// 自定義 資料流發布(存盤)實作的 提供者 /// </summary> public static IDataPublisherProvider PublisherProvider { get; set; } /// <summary> /// 通過自定義訊息觸發機制通知訂閱者 /// 呼叫時請做例外攔截,防止臟資料導致 msgData 型別錯誤 /// </summary> /// <param name="msgDataKey"></param> /// <param name="msgData">訊息內容,自定義觸發時,請注意和注冊訂閱者的消費資料型別轉換安全</param> /// <returns></returns> public static Task<bool> NotifySubscriber(string msgDataKey, object msgData) { .... } }
關于 IDataPublisherProvider
public interface IDataPublisherProvider { /// <summary> /// 資料發布者 /// </summary> /// <param name="option"></param> /// <returns> 回傳訊息發布介面實作 </returns> IDataPublisher CreatePublisher(DataPublisherOption option); } /// <summary> /// 資料的發布者 /// </summary> public interface IDataPublisher { /// <summary> /// 推進資料(存盤具體訊息佇列或者資料庫實作) /// </summary> /// <param name="dataKey"></param> /// <param name="data"></param> /// <returns>是否推入成功</returns> Task<bool> Publish<TData>(string dataKey,TData data); }
可以看到 IDataPublisher 介面負責具體的存盤實作,可以根據 DataPublisherOption 的 source_name 業務屬性實作對不同業務需求回傳不同的具體實作,
2. 默認實作介紹
借助.Net 自身的記憶體訊息佇列,在類別庫中提供了默認的內部訊息存盤轉發實作(記憶體級別),使用者可以自行實作擴展相關介面并進行全域配置,
內置的.Net Core訊息佇列, 設定了默認1個佇列,最大并發為32執行緒, 如果需要可以通過設定DataPublisherOption的source_name,類別庫將會為每個source_name 創建獨立的記憶體佇列,
如果你已經看到這里,并且感徑訓行的話可以在下方點個贊,或者也可以關注我的公總號(見二維碼)

轉載請註明出處,本文鏈接:https://www.uj5u.com/net/355992.html
標籤:.NET技术
上一篇:10分鐘簡單學習net core集成jwt權限認證,快速接入專案落地使用
下一篇:CentOS8.0安裝Nacos
