最近在開發一個輕量級ASP.NET MVC開發框架,需要加入日志記錄,郵件發送,短信發送等功能,為了保持模塊的獨立性,所以需要通過訊息通信的方式進行處理,為了保持框架在部署,使用,二次開發程序中的簡易便捷性,所以沒有選擇傳統的MQ,而是基于Redis的訂閱發布實作一個系統內部訊息組件,話不多說,上碼!
資料結構定義
訊息物體包含幾個部分,訂閱通道名稱,資訊頭,資訊體,資訊差異化額外資訊字典,資訊頭主要包含訊息標識,訊息日期,資訊體包含資訊內容,資訊物體型別等
public class Message
{
public string MessageChannel { set; get; }
public MessageHead @MessageHead { set; get; }
public MessageBody @MessageBody { set; get; }
[JsonExtensionData]
public Dictionary<string,Object> @MessageExtra { set; get; }
public Message()
{
}
public void AddExtra(string Name, string Value)
{
if (@MessageExtra == null)
{
@MessageExtra = new Dictionary<string, object>();
}
@MessageExtra.Add(Name, Value);
}
public Object GetExtra(string Name)
{
return @MessageExtra[Name];
}
}
public class MessageHead
{
public string MessageID { set; get; }
public DateTime MessageDate { set; get; }
public MessageHead()
{
MessageID = CommonUtil.CreateCommonGuid();
MessageDate = DateTime.Now;
}
}
public class MessageBody
{
public string MessageJsonContent { set; get; }
public Type MessageMapperType { set; get; }
}
注:因為訊息訂閱發布傳遞程序中,我是通過Json序列化傳輸的,使用程序中可能需要一些額外的鍵值對資訊,這里在物件中定義的是Dictinary物件,但是Dictinary本身是不支持序列化的,所以需要加上注解JsonExtensionData
訂閱通道宣告
我們需要達到的效果是,在系統啟動時,所有訊息通道可以根據系統中的應用自動訂閱,這里就需要一個注解來標識我們的訂閱通道接收訊息的實作類
[AttributeUsage(AttributeTargets.Class)]
public class MessageChanelAttribute : Attribute
{
private string _ChannleName;
public string ChannelName
{
get
{
return this._ChannleName;
}
set
{
this._ChannleName = value;
}
}
}
訊息的個性化策略處理
Redis的三方庫我這里使用的是StackExchange.Redis.dll,在訊息訂閱時,需要為Channel指定接收到訊息時的處理委托,我們在自動訂閱的程序中肯定也要收集好各類訊息處理類并與Channel一一對應,這時候我們就需要一個基類FastDefaultMessageHandler,我們的具體的訊息處理類繼承自FastDefaultMessageHandler,重寫處理方法即可
[Component]
[MessageChanelAttribute(ChannelName = "DefaultMessage")]
public class FastDefaultMessageHandler : IFastMessageHandle
{
[AutoWired]
public DBUtil @DBUtil;
public void HandleMessage(RedisChannel ChannelName, RedisValue Message)
{
FastExecutor.Message.Design.Message Entity = JsonConvert.DeserializeObject<FastExecutor.Message.Design.Message>(Message);
try
{
if (!CheckMessageIsConsume(Entity))
{
this.CustomHandle(Entity);
}
}
catch (Exception e)
{
StringBuilder ExceptionLog = new StringBuilder();
ExceptionLog.AppendFormat("例外Message所屬Channel:{0}", Entity.MessageChannel + Environment.NewLine);
ExceptionLog.AppendFormat("例外Message插入時間:{0}", Entity.MessageHead.MessageDate.ToString() + Environment.NewLine);
ExceptionLog.AppendFormat("例外Message內容:{0}", Message + Environment.NewLine);
ExceptionLog.AppendFormat("例外資訊:{0}", e.Message + Environment.NewLine);
LogUtil.WriteLog("Logs/MessageErrorLog", "log_", ExceptionLog.ToString() + Environment.NewLine);
ExceptionLog.AppendFormat("========================================================================================================================================================================" + Environment.NewLine);
MessageACK.MoveMessageToExceptionChannel(Entity.MessageChannel, Entity);
}
finally
{
MessageACK.ConfirmMessageFinish(Entity.MessageChannel, Entity.MessageHead.MessageID);
}
}
public virtual void CustomHandle(FastExecutor.Message.Design.Message @Message)
{
}
public virtual bool CheckMessageIsConsume(FastExecutor.Message.Design.Message @Message)
{
return false;
}
}
其中的HandleMessage方法就是我們在訂閱Channel時對應的委托,會呼叫類中的CustomHandle的虛方法,子類繼承重寫該方法就會基于多型進行策略呼叫,CheckMessageIsConsume方法是用于確認訊息是否重復消費的,也可以被重寫,下面看一個訪問日志類的實體,使用MessageChanelAttribute標注宣告該實作類需要訂閱發布的Channel名稱為Visit,CustomHandle方法中實作了插入資料庫操作,CheckMessageIsConsume方法判斷該條日志資料是否已消費(已經存在于資料庫)
[MessageChanelAttribute(ChannelName = "Visit")]
public class VisitLog : FastDefaultMessageHandler
{
public override void CustomHandle(Message.Design.Message Message)
{
Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
@DBUtil.Insert(LogEntity);
base.CustomHandle(Message);
}
public override bool CheckMessageIsConsume(Message.Design.Message Message)
{
Frame_VisitLog LogEntity = JsonConvert.DeserializeObject<Frame_VisitLog>(Message.MessageBody.MessageJsonContent);
DBRow Row = new DBRow("Frame_VisitLog", "RowGuid", LogEntity.RowGuid);
if (Row.IsExist())
{
return true;
}
else
{
return false;
}
}
}
訊息自動訂閱
我們希望系統在啟動時就尋找出定義好Channel和實作類,自動實作訂閱,這里就需要用到IOC容器,啟動系統時將所有的訊息處理類放入容器中,在自動訂閱時全部取出來,根據訊息處理類中宣告的Channel名稱進行自動訂閱
public void Init()
{
List<Type> HandlerTypeList = InjectUtil.Container.GetRegistType(typeof(IFastMessageHandle));
foreach (Type HandlerType in HandlerTypeList)
{
MessageChanelAttribute Channel = Attribute.GetCustomAttribute(HandlerType, typeof(MessageChanelAttribute)) as MessageChanelAttribute;
RedisUtil.Subscribe(Channel.ChannelName, ((FastDefaultMessageHandler)InjectUtil.Container.Resolve(HandlerType)).HandleMessage);
}
}
注:
1.這里的IOC容器是我自己實作的,地址:https://gitee.com/grassprogramming/FastIOC,大家可以用AutoFac代替
2.RedisUtil是對StackExchange.Redis.dll封裝的處理類,地址:https://gitee.com/grassprogramming/FastUtil
訊息發送
訊息只需要呼叫Redis的發布方法即可,將Channel名稱與定義好的資料物體類傳入,序列化為Json
public void SendMessage<T>(string ChannleName, T CustomMessageEntity, Dictionary<string, string> ExtraData = https://www.cnblogs.com/yanpeng19940119/p/null)
{
FastExecutor.Message.Design.Message MessageEntity = new Design.Message();
MessageEntity.MessageChannel = ChannleName;
MessageHead Head = new MessageHead();
MessageBody Body = new MessageBody();
Body.MessageMapperType = typeof(T);
Body.MessageJsonContent = JsonConvert.SerializeObject(CustomMessageEntity);
MessageEntity.MessageHead = Head;
MessageEntity.MessageBody = Body;
if (ExtraData != null)
{
foreach (var item in ExtraData)
{
MessageEntity.AddExtra(item.Key, item.Value);
}
}
RedisUtil.Publish(ChannleName, MessageEntity);
MessageACK.CopyMessageToACKList(ChannleName, MessageEntity);
}
訊息確認與存盤
Redis作訂閱發布模式作為訊息組件的問題有兩方面
問題:訊息消費完沒有確認機制
解決方案
基于Redis的Hash存盤方式建立一個訊息存盤欄位,在發送訊息時拷貝到訊息Hash字典中,消費完畢后再洗掉,對應SendMessage中的MessageACK.CopyMessageToACKList方法和FastDefaultMessageHandler中的MessageACK.ConfirmMessageFinish方法,本質就是對Hash字典的增加與洗掉功能
問題:訊息處理端掛了再次重啟訊息會丟失
解決方案
確認機制已經保證了訊息即使沒有被消費完但是處理端宕機訊息也不會丟失,需要注意的是,訊息沒有丟失僅僅是Hash字典中有存盤,但是訊息通道中不存在了,所以我們在系統每次啟動時掃描這個Hash字典,重新發布訊息到Channel,這樣可能導致重復消費,所以需要靠FastDefaultMessageHandler中的CheckMessageIsConsume方法判斷,同時訊息處理者本身處理例外我們也需要記錄下來,比如發短信供應商介面有問題,訊息處理例外會進入Redis的ChannelException通道,我們可以根據需求實作一個可視化界面決定是否通過手動恢復
最后
Message組件相關代碼地址:https://gitee.com/grassprogramming/FastExecutor/tree/master/code/FastExecutor/FastExecutor.Message
存在不足問題:如果訊息是單純記錄日志問題,沒辦法確認訊息是否消費了
如果大家有什么好的建議,可留言一起交流學習,共同進步
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/123957.html
標籤:C#
下一篇:C# 時間戳的相關操作
