一、前言
訊息佇列(Message Queue)是分布式系統必不可少的中間件,大部分訊息佇列產品(如RocketMQ/RabbitMQ/Kafka等)要求團隊有比較強的技術實力,不適用于中小團隊,并且對.NET技術的支持力度不夠,而Redis實作的輕量級訊息佇列很簡單,僅有Redis常規操作,幾乎不需要開發團隊掌握額外的知識!
寫這篇檔案的目的,是因為在最近開發程序中,需要用到多端訂閱的功能,之前設計的時候用的是rockemq,最近又重新整理了一遍專案架構,把orm替換成了二次封裝的shinysqlsugar,redis也替換成了shiny.redis,正好看到newlife.redis已經實作了多端消費的redis佇列,所以試著把rockemq改成redis佇列,但是在使用程序中,發現官方的檔案還是比較難懂的,有些地方沒寫明白,還好方法有注釋,連蒙帶猜也是實作了,在跟作者的溝通中也是一邊測一邊改,終于實作了滿足日常需求的功能,覺得還是有必要把這部分寫成檔案以供后面新人學習使用,
二、Newlife.Redis
NewLife.Redis 是一個Redis客戶端組件,以高性能處理大資料實時計算為目標,
特性
- 在ZTO大資料實時計算廣泛應用,200多個Redis實體穩定作業一年多,每天處理近1億包裹資料,日均呼叫量80億次
- 低延遲,Get/Set操作平均耗時200~600us(含往返網路通信)
- 大吞吐,自帶連接池,最大支持1000并發
- 高性能,支持二進制序列化
說到Newlife.Redis不得不推薦我基于 NewLife.Redis 二次封裝的組件庫Shiny.Redis,支持.net core3,.net5,.net6,該組件在原來的基礎上封裝成了單例模式,只需一句話即可完成組件注冊,通過建構式直接注入就行,具體用法可以查看教程:.net core Redis客戶端Shiny.Redis包庫的使用
最佳實踐
RedisQueue在中通大資料分析中,用于緩沖等待寫入Oracle/MySql的資料,多執行緒計算后寫入佇列,然后由專門執行緒定時拉取一批(500行),執行批量Insert/Update操作,該系統佇列,每天10億條訊息,Redis記憶體分配8G,實際使用小于100M,除非消費端故障導致產生積壓,
遞易智能科技全部使用可信佇列 RedisReliableQueue,約300多個佇列,按系統分布在各自的Redis實體,公有云2G記憶體主從版,積壓訊息小于10萬時,佇列專用的Redis實體記憶體占用小于100M,幾乎不占記憶體空間,
公司業務每天帶來100萬多訂單,由此衍生的訊息數約1000萬條,從未丟失訊息!
三、什么是訊息佇列
訊息佇列就是訊息在傳輸程序中保存訊息的容器,其核心功用是削峰和解耦!

早高峰,快遞公司的貨車前來各驛站卸貨,多名站點作業人員使用PDA掃描到站,大量資訊進入系統(1000tps),而通知快遞公司的介面只有400tps的處理能力,
通過增加MQ來保存訊息,讓超過系統處理能力的訊息滯留下來,等早高峰過后,系統即可完成處理,此為削峰!

在快遞柜業務流程中,快遞員投柜后需要經歷扣減系統費、短信通知用戶和推送通知快遞公司三個業務動作,傳統做法需要依次執行這些業務東西,如果其中某一步例外(例如用戶手機未開機或者快遞公司介面故障),將會延遲甚至中斷整個投柜流程,嚴重影響用戶體驗,
如果介面層收到投柜資料后,寫入訊息到MQ,后續三個子系統各自消費處理,將可以完美解決該問題,并且子系統故障不影響上游系統!此為解耦!
四、使用Redis實作訊息佇列
Redis的LIST結構,具備左進右出的功能,再使用BRPOP的阻塞彈出,即可完成一個最基本的訊息佇列 RedisQueue<T>,BRPOP確保每個訊息都被消費,且僅消費一次,
GetQueue取得佇列后,Add方法發布訊息,
TakeOne拉取消費一條訊息,指定10秒阻塞,10秒內有訊息立馬回傳,否則等到10秒超時后回傳空,
4.1 新建解決方案
新建一個空的解決方案,每一種不同型別的佇列的demo代碼放置在不同的專案中,這樣代碼就會變得很清晰,不容易亂了,

再新建一個類別庫用來存盤各個型別的佇列共用的一些東西,這樣不必每個專案都參考一次

添加Newlife.Redis包到類別庫,我這里用的是我自己封裝的Shiny.Redis

修改Class1.cs為RedisConfig.cs,用來定義整個demo中需要用到的常量,

新建類RedsiMessageModel.cs,用來存放佇列訊息物體
查看代碼
namespace RedisQueueDemo.Core
{
/// <summary>
/// 訊息物體
/// </summary>
public class RedisMessageModel
{
/// <summary>
/// ID
/// </summary>
public string Id { get; set; } = Guid.NewGuid().ToString();
/// <summary>
/// 創建時間
/// </summary>
public DateTime Time { get; set; } = DateTime.Now;
/// <summary>
/// 訊息內容
/// </summary>
public string Data { get; set; } = $"時間:{DateTime.Now}";
}
}
4.2 新建普通佇列專案
右鍵解決方案 ,新建一個Worker Service專案,為什么新建Worker Service專案呢,因為我覺得這樣比較好演示,實際生成環境中我們也是用Worker Service專案跑的,

參考Core類別庫

直接直接注冊redis

Woker.cs中直接建構式注入IRedisCacheManager

ExecuteAsync測驗下有沒有注冊成功

啟動專案,發現redis注入沒有問題

4.3 實作Redis訊息佇列
這里我們模擬1個生產者,2個消費者,生產者生產訊息,兩個消費者去搶訊息,所以我們需要至少3個行程,利用Worker Service可以輕松實作,
新建兩個定時任務Consumer1和Consumer2

在RedisConfig中定義我們佇列的Key

在生產者Worker.cs中我們實作沒過一秒鐘向佇列中插入一條資料

在Consumer1中向佇列去拿資料

在Consumer2中向佇列去拿資料

不要忘了在系統中注冊Consumer1和Consumer2

運行專案,可以發現一個簡單地訊息佇列就實作了,Consume1和Consume2處于搶訊息的狀態,只有一條資料會被消費掉,

使用Redis可視化工具也能看到佇列訊息

五、需要確認的佇列
如果通知快遞公司的物流推送子系統處理訊息時出錯,訊息丟失怎么辦?顯然不可能讓上游再發一次!
這里我們需要支持消費確認的可信佇列 RedisReliableQueue<T>,消費之后,除非程式主動確認消費,否則Redis不許洗掉訊息,
RedisReliableQueue采用Redis的LIST結構,LPUSH發布訊息,再使用BRPOPLPUSH的阻塞彈出,同時備份訊息到掛起串列,消費成功后確認時,再從掛起串列中洗掉,如果消費處理失敗,訊息將滯留在掛起串列,一定時間后自動轉移回去主佇列,重新分配消費,BRPOPLPUSH確保每個訊息都被消費,且僅消費一次,
GetReliableQueue獲取佇列實體后,Add發布訊息,TakeOneAsync異步消費一條訊息,并指定10秒阻塞超時,處理完成后再通過Acknowledge確認,
5.1 新建專案
新建Worker Service專案RedisQueueDemo.Reliable

參考Core專案,并注冊RedisCacheManager

RedisConfig中定義我們的Redis佇列Key

5.2 實作可信訊息佇列
模擬一個生產者,一個消費者,生產者生產訊息,消費者去消費,
Worker.cs我們啟動之后發送三條訊息到佇列

消費者Consumer.cs里我們這樣寫,別忘了在Program里AddHostedService
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queue = _redisCacheManager.GetReliableQueue<RedisMessageModel>(RedisConfig.ReliableKey);
queue.RetryInterval = 5;//重新處理確認佇列中死信的間隔,默認60s
while (!stoppingToken.IsCancellationRequested)
{
List<string> acknowledges = new List<string>();//已消費訊息串列
//一次拿十條,如果拿一條就用queue.TakeOneAsync(-1);-1是超時時間,默認0秒永遠阻塞;負數表示直接回傳,不阻塞,
var data = https://www.cnblogs.com/huguodong/p/queue.Take(10).ToList();
if (data.Count > 0)
{
Console.WriteLine($"消費者拿到了:{data.Count}條訊息");
int i = 0;
data.ForEach(msg =>
{
Console.WriteLine($"消費者收到訊息,訊息ID:{msg.Id},內容:{msg.Data}");
if (i < 2)//3條訊息,設定一條消費失敗
{
acknowledges.Add(msg.ToJson());//添加到已消費訊息串列,這里需要轉成Json字串,如果是用直接queue.TakeOneAsync取的直接queue.Acknowledge(mqMsg);
Console.WriteLine("消費成功");
}
else
{
Console.WriteLine($"消費訊息失敗:訊息ID:{msg.Id},時間:{DateTime.Now}");
}
i++;
});
queue.Acknowledge(acknowledges.ToArray());//告訴佇列已經消費了的資料
}
else
{
Console.WriteLine("消費者從佇列中沒有拿到資料:" + DateTime.Now);
await Task.Delay(1000, stoppingToken);
}
}
}
運行專案,可以看到,消費了3條,有一條消費失敗了,過了五秒鐘之后又重新消費了,這樣,可信佇列就實作了,保證了訊息的不丟失,

六、延遲佇列
某一天,小馬哥說,快遞員投柜一定時間時候,如果用戶沒有來取件,那么系統需要收取超期取件費,需要一個延遲佇列,
于是想到了Redis的ZSET,我們再來一個 RedisDelayQueue<T>,Add生產訊息時多了一個引數,指定若干秒后可以消費到該訊息,消費用法跟可信佇列一樣,
那么延遲佇列有什么用呢?我們生活中其實平時接觸到很多可以使用延遲佇列來解決的例子:
- 訂單超時30分鐘未付款將自動關閉
- 會議系統中,會議開始前10分鐘,發送會議提醒
- 夏天晚上時,我們經常會給空調設定指定時長的時間,到時空調自動關閉
- 再比如微波爐、烤箱、等等
可以發現延遲佇列想要實作的功能其實就是一個定時任務調度的一種,
6.1 新建專案
新建Worker Service專案RedisQueueDemo.Delay

參考Core專案,并注冊RedisCacheManager

RedisConfig中定義我們的Redis佇列Key

6.2 實作延遲佇列
模擬一個生產者,一個消費者,生產者生產訊息,消費者去消費,
Worker.cs我們啟動之后發送三條訊息到延遲佇列

消費者Consumer.cs里我們這樣寫,別忘了在Program里AddHostedService
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queue = _redisCacheManager.GetDelayQueue<RedisMessageModel>(RedisConfig.DelayKey);
while (!stoppingToken.IsCancellationRequested)
{
List<RedisMessageModel> acknowledges = new List<RedisMessageModel>();//已消費訊息串列
//一次拿十條,如果拿一條就用queue.TakeOneAsync(-1);-1是超時時間,默認0秒永遠阻塞;負數表示直接回傳,不阻塞,
var data = https://www.cnblogs.com/huguodong/p/queue.Take(10).ToList();
if (data.Count > 0)
{
Console.WriteLine($"消費者拿到了:{data.Count}條訊息");
data.ForEach(msg =>
{
Console.WriteLine($"消費者收到訊息,訊息ID:{msg.Id},內容:{msg.Data},時間:{DateTime.Now}");
acknowledges.Add(msg);//添加到已消費訊息串列,這里需要轉成Json字串,如果是用直接queue.TakeOneAsync取的直接queue.Acknowledge(mqMsg);
Console.WriteLine("消費成功");
});
queue.Acknowledge(acknowledges.ToArray());//告訴佇列已經消費了的資料
}
else
{
Console.WriteLine("消費者從佇列中沒有拿到資料:" + DateTime.Now);
await Task.Delay(1000, stoppingToken);
}
}
}
運行專案可以看到,消費3條訊息,每個訊息消費間隔五秒,

七、可重復消費佇列
又一天,資料中臺的小伙伴想要消費訂單佇列,但是不能夠啊,LIST結構做的佇列,每個訊息只能被消費一次,如果資料中臺的系統消費掉了,其它業務系統就會失去訊息,
那么我們就需要一個可以重復消費的佇列,保值一條訊息能被多個系統消費,Redis5.0開始新增的STREAM結構,Newlife.Redis再次封裝RedisStream,可以實作不同的消費組消費同一個佇列,
并且一個消費組還可以產生多個消費者,多個消費者之間是共享訂閱,類似于普通的對了,同一個組中的訊息哪個消費者搶到了就是誰的,
7.1 新建專案
新建Worker Service專案RedisQueueDemo.Stream

參考Core專案,并注冊RedisCacheManager

RedisConfig中定義我們的Redis佇列Key

7.2 實作可重復消費佇列
模擬一個生產者,兩個消費組,每個消費組有兩個消費者,生產者生產訊息,消費者去消費,
Worker.cs我們啟動之后發送4條訊息佇列

新建兩個消費組,每個消費組兩個消費者

消費組1消費者1:Group1Consumer1
查看代碼
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group1Consumer1 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group1Consumer1(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消費組1";
var consumerName = "消費者1";
//這里封裝了一下,新的消費組將不會消費創建消費組之前的訊息
//默認新的消費組將會從頭開始消費佇列,可以使用FromLastOffset屬性來設定從當前最新一條訊息開始消費
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1條,如果只拿一條就用queue.TakeOneAsync(5);5是超時時間,默認10秒,
var data = https://www.cnblogs.com/huguodong/p/await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//訊息串列
Console.WriteLine($"{groupName}-{consumerName}拿到了:{data.Count}條訊息");
var msgIds = messages.Select(it => it.Id).ToArray();//訊息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//獲取物體
Console.WriteLine($"{groupName}-{consumerName}收到訊息,訊息ID:{msg.Id},內容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告訴佇列已經消費了的資料
}
else
{
//Console.WriteLine("消費者從佇列中沒有拿到資料:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
消費組1消費者2:Group1Consumer2
查看代碼
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group1Consumer2 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group1Consumer2(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消費組1";
var consumerName = "消費者2";
//這里封裝了一下,新的消費組將不會消費創建消費組之前的訊息
//默認新的消費組將會從頭開始消費佇列,可以使用FromLastOffset屬性來設定從當前最新一條訊息開始消費
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1條,如果只拿一條就用queue.TakeOneAsync(5);5是超時時間,默認10秒,
var data = https://www.cnblogs.com/huguodong/p/await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//訊息串列
Console.WriteLine($"{groupName}-{consumerName}拿到了:{data.Count}條訊息");
var msgIds = messages.Select(it => it.Id).ToArray();//訊息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//獲取物體
Console.WriteLine($"{groupName}-{consumerName}收到訊息,訊息ID:{msg.Id},內容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告訴佇列已經消費了的資料
}
else
{
//Console.WriteLine("消費者從佇列中沒有拿到資料:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
消費組2消費者1:Group2Consumer1
查看代碼
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group2Consumer1 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group2Consumer1(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消費組2";
var consumerName = "消費者1";
//這里封裝了一下,新的消費組將不會消費創建消費組之前的訊息
//默認新的消費組將會從頭開始消費佇列,可以使用FromLastOffset屬性來設定從當前最新一條訊息開始消費
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1條,如果只拿一條就用queue.TakeOneAsync(5);5是超時時間,默認10秒,
var data = https://www.cnblogs.com/huguodong/p/await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//訊息串列
Console.WriteLine($"{groupName}-{consumerName}拿到了:{data.Count}條訊息");
var msgIds = messages.Select(it => it.Id).ToArray();//訊息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//獲取物體
Console.WriteLine($"{groupName}-{consumerName}收到訊息,訊息ID:{msg.Id},內容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告訴佇列已經消費了的資料
}
else
{
//Console.WriteLine("消費者從佇列中沒有拿到資料:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
消費組2消費者2:Group2Consumer2
查看代碼
using RedisQueueDemo.Core;
using Shiny.Redis;
namespace RedisQueueDemo.Stream
{
public class Group2Consumer2 : BackgroundService
{
private readonly IRedisCacheManager _redisCacheManager;
public Group2Consumer2(IRedisCacheManager redisCacheManager)
{
this._redisCacheManager = redisCacheManager;
}
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
var groupName = "消費組2";
var consumerName = "消費者2";
//這里封裝了一下,新的消費組將不會消費創建消費組之前的訊息
//默認新的消費組將會從頭開始消費佇列,可以使用FromLastOffset屬性來設定從當前最新一條訊息開始消費
var queue = _redisCacheManager.GetAutoSteamQueue<RedisMessageModel>(RedisConfig.Stream, groupName, consumerName);
//queue.FromLastOffset = true;
while (!stoppingToken.IsCancellationRequested)
{
//一次拿1條,如果只拿一條就用queue.TakeOneAsync(5);5是超時時間,默認10秒,
var data = https://www.cnblogs.com/huguodong/p/await queue.TakeMessagesAsync(1, 5);
if (data!= null)
{
var messages = data.ToList();//訊息串列
Console.WriteLine($"{groupName}-{consumerName}拿到了:{data.Count}條訊息");
var msgIds = messages.Select(it => it.Id).ToArray();//訊息ID
messages.ForEach(it =>
{
var msg = it.GetBody<RedisMessageModel>();//獲取物體
Console.WriteLine($"{groupName}-{consumerName}收到訊息,訊息ID:{msg.Id},內容:{msg.Data}");
});
queue.Acknowledge(msgIds);//告訴佇列已經消費了的資料
}
else
{
//Console.WriteLine("消費者從佇列中沒有拿到資料:" + DateTime.Now);
//await Task.Delay(1000, stoppingToken);
}
}
}
}
}
注冊到系統中

運行專案,可以看到,總共發送了四條訊息,消費組1和2都收到了四條訊息,消費組1-消費者1收到了3條消費組1-消費者2收到了一條,消費組2-消費者1收到了3條消費組2-消費者2收到了一條,

7.3 可信佇列
RedisStream也支持自動回滾消費失敗的資料,我們這里把消費組1-消費者1設定不消費成功

運行專案,可以看到消費失敗之后自動重試了,

八、原始碼地址
Gitee:https://gitee.com/huguodong520/RedisQueueDemo.git
Github:https://github.com/huguodong/RedisQueueDemo.git
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/499564.html
標籤:.NET Core
