前言
上一篇博客上已經實作了使用EventBus對具體事件行為的分發處理,某種程度上也算是基于事件驅動思想編程了,但是如上篇博客結尾處一樣,我們原始碼的執行效率依然達不到心里預期,在下單流程里我們明顯可以將部分行為進行異步處理,提升下單操作的執行效率,
Redis基礎命令
Redis有兩種方式可支持我們實作MQ功能,1、使用串列(List)相關命令特性;2、使用publish、subscribe命令特性;
這里我是采取串列相關命令實作,
使用串列(List)相關命令的特性實作
- 壓入資料(發布訊息)
使用串列(List)的LPUSHRPUSH命令可以從串列左邊和右邊壓入資料;
LPUSH
將一個或多個值插入到串列頭部(此處可以將串列想象成一個從左到右的鏈表資料結構,LPUSH就是將指定的值插入最左側!)
如下命令,將多個元素壓入list1的頭部(最左側)
LPUSH list1 測驗1 測驗2
執行結果如下:

上面是寫入多個元素,我們也可以寫入單個元素
LPUSH list1 測驗3

需要留意,每次執行完LPUSH后,Redis會回傳當前串列的長度,
RPUSH
在指定串列的尾部(相當于一個鏈表的最右側)添加單個或多個元素
如下命令,還是在list1上添加多個元素,并查看執行后的list1元素資訊
RPUSH list 測驗4 測驗5

同理,RPUSH也可直接寫入單個元素,和LPUSH一樣,
- 拉取資料(消費資料)
這里的拉取資料不單單是讀取List內的元素,而是將元素從串列中取出來
BLPOP
移出并獲取串列的第一個元素(從左至右), 如果串列沒有元素會阻塞當前執行緒,直到等待超時或發現可彈出元素為止,
如下命令,從list1這個串列獲取從左至右第一個元素,在100秒內如果獲取則結束阻塞,否則阻塞到100秒之后,
BLPOP list1 100
執行結果如下:

需要留意的是BLPOP命令如果拉取到資料則會回傳兩行資料,1行為串列的key名稱,1行為獲取到的元素值,如果直到阻塞結束都沒有獲取到元素值則直接回傳命令執行超時,如下圖:

BRPOP
移出并獲取串列的最后一個元素(從左至右), 如果串列沒有元素會阻塞當前執行緒直到等待超時或發現可彈出元素為止,該命令與BLPOP除了獲取的元素位置不同,其他特性全部一致,
LPOP
移出并獲取串列的第一個元素(從左至右),如獲取到元素則回傳元素資訊,沒有元素則立即回傳null,
如下命令:
LPOP list1

RPOP
移出并獲取串列的最后一個元素(從左至右),如獲取到元素則回傳元素資訊,沒有元素則立即回傳null,該命令與LPOP除了獲取的元素位置不同其他特性全部一致;
RPOPLPUSH
移除串列的最后一個元素(最右側的元素),并將該元素添加到另一個串列并回傳,該命令如獲取到元素則回傳元素資訊,否則回傳錯誤資訊,
可以通過RPOPLPUSH這個命令的特性對MQ內一致性要求較高的業務進行處理,在從串列獲取元素成功后將該元素添加到一個備份串列,在業務處理完畢后再從備份串列將該元素洗掉,
執行下面命令測驗下:
RPOPLPUSH list1 listback

BRPOPLPUSH
從串列中彈出一個值,將彈出的元素插入到另外一個串列中并回傳它; 如果串列沒有元素會阻塞串列直到等待超時或發現可彈出元素為止,
該命令其實就是在BRPOP的基礎上將LPUSH的功能加上了,依舊也保留了指定超時時間內未獲取到元素則阻塞執行緒,
執行下面命令測驗下:
BRPOPLPUSH list1 listback 10
執行結果如下:

完善代碼
基于上面Redis的相關命令,我們再完善下上篇博客的代碼,這里我們需要新增一個控制臺啟動項,將它作為消費服務,原來的控制臺即訂單保存的控制臺作為訊息發布的服務,
下單代碼更改為下面的樣子:
/// <summary>
/// 異步方式觸發訂單相關事件
/// </summary>
public static void AsynEventHandle()
{
Guid userId = Guid.NewGuid();
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
var order = new OrderModel()
{
CreateTime = DateTime.Now,
Id = Guid.NewGuid(),
Money = (decimal)300.00,
Number = 1,
ProductName = "鮮花一束",
UserId = userId
};
Console.WriteLine($"模擬存盤訂單【采取Redis做訊息佇列的異步方式】");
Thread.Sleep(1000);
FullRedis fullRedis = new FullRedis("127.0.0.1:6379", "", 1);
//這里嘗試過使用redis 的訂閱發布模式,在執行發布命令時候發現值但凡出現空格或者"符號則會例外...
fullRedis.LPUSH("orders", new OrderModel[] { order });
stopwatch.Stop();
Console.WriteLine($"下單總耗時:{stopwatch.ElapsedMilliseconds}毫秒");
Console.ReadLine();
}
可以看到,我們已經將事件總線相關代碼給移除了,上面代碼除了向Redis的佇列(List)里寫入元素外就只是對訂單進行了持久化動作,所以看代碼就知道執行效率的提升了,
接下來,看消費服務的代碼,
static void Main(string[] args)
{
XTrace.UseConsole();
Console.WriteLine("進入Redis訊息訂閱者模式訂單訊息推送訂閱者客戶端!");
EventBus eventBus = new EventBus();
eventBus.EventRegister(typeof(OrderCreateEventNotifyHandle), typeof(OrderCreateEventData));
eventBus.EventRegister(typeof(OrderCreateEventStockLockHandle), typeof(OrderCreateEventData));
FullRedis fullRedis = new FullRedis("127.0.0.1:6379", "", 1);
fullRedis.Log = XTrace.Log;
fullRedis.Timeout = 30000;
OrderModel order = null;
while (order == null)
{
order = fullRedis.BLPOP<OrderModel>("orders", 20);
if (order != null)
{
Console.WriteLine($"得到訂單資訊:{JsonConvert.SerializeObject(order)}");
//執行相關事件
eventBus.Trigger(new OrderCreateEventData()
{
Order = order,
});
//再次設定為null方便回圈讀取
order = null;
}
}
Console.ReadLine();
}
消費服務首先從Redis里通過BLPOP從orders串列中獲取元素,再觸發事件總線,執行訂單保存相關業務處理,
最終看下執行效率如何?
訊息發布的執行效率(訂單保存)

訊息消費

可以看到目前訊息發布的執行效率下單總耗時間為1170毫秒,我們再改為同步的測驗下結果:

可以看到,同步執行的結果是3035毫秒,
小結
兩種方式相差了將近2000毫秒~ 而且后續如果再繼續擴展訂單存盤相關處理的話同步執行的回應時間會更加拉長,而采取Redis MQ的方式配合事件總線我們可以將整個業務拆分為獨立的應用,采取分布式的方式提高回應效率,同時事件總線的加入方便我們后續業務的擴展,
訊息發布端將訂單資訊寫入到串列后如果訊息消費者在拉取到資料后業務執行程序中代碼出現例外導致無法滿足業務的完整性如何處理
答:可以使用上述Redis命令中的RPOPLPUSH或BRPOPLPUSH在拉取元素后寫入到一個備份的串列中,在我們的邏輯代碼執行完畢后在將備份串列中的該元素值移除,
上述代碼已發布到Github,有需要的自行下載,
地址為:https://github.com/QQ897878763/OrderRedisSample
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/71849.html
標籤:其他
上一篇:abp(net core)+easyui+efcore實作倉儲管理系統——入庫管理之三存盤程序(三十九)
下一篇:控制臺Main函式傳參
