我有一個通過 redis pub/sub 傳來的東西,我需要將它分發到多個 websocket 連接,所以基本上每當訊息來自 redis 時,它都需要通過所有 websockets 連接分發。
我想要多個消費者。他們每個人都應該得到所有的訊息。
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(1)
{
FullMode = BoundedChannelFullMode.Wait,
SingleReader = false
});
var cts = new CancellationTokenSource();
var producer = Task.Run(async () =>
{
int i = 0;
while (!cts.IsCancellationRequested)
{
channel.Writer.TryWrite(i );
await Task.Delay(TimeSpan.FromMilliseconds(250));
}
});
var readerOneTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader one: {i}");
}
});
var readerTwoTask = Task.Run(async () =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Reader two: {i}");
}
});
cts.CancelAfter(TimeSpan.FromSeconds(5));
Console.ReadLine();
uj5u.com熱心網友回復:
單個Channel<T>不能向多個消費者廣播訊息。每次從通道讀取訊息時,都會消費該訊息,并且沒有其他消費者會得到它。如果您想向所有消費者廣播所有訊息,則必須為Channel<T>每個消費者創建一個專用訊息。
您可能會發現這個問題很有趣:IAsyncEnumerable 或 IAsyncEnumerator 的工廠。它顯示了為序列實作源/控制器的各種方法IAsyncEnumerable<T>,包括通道和 Rx 主題。
更新:下面是一個演示如何使用多個通道,以便將所有訊息傳播給所有消費者。
List<Channel<int>> channels = new();
async Task CreateConsumer(Func<Channel<int>, Task> body)
{
var channel = Channel.CreateUnbounded<int>();
lock (channels) channels.Add(channel);
try
{
await Task.Run(() => body(channel)).ConfigureAwait(false);
}
finally
{
lock (channels) channels.Remove(channel);
}
}
Task consumer1 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer one: {i}");
}
});
Task consumer2 = CreateConsumer(async channel =>
{
await foreach (var i in channel.Reader.ReadAllAsync())
{
Console.WriteLine($"Consumer two: {i}");
}
});
using CancellationTokenSource cts = new(TimeSpan.FromMilliseconds(3000));
Task producer = Task.Run(async () =>
{
int i = 0;
while (true)
{
i ;
lock (channels) channels.ForEach(channel => channel.Writer.TryWrite(i));
try { await Task.Delay(TimeSpan.FromMilliseconds(250), cts.Token); }
catch (OperationCanceledException) { break; }
}
});
producer.Wait();
lock (channels) channels.ForEach(channel => channel.Writer.Complete());
Task.WaitAll(consumer1, consumer2);
在 Fiddle 上試試。
這CreateConsumer是一個異步方法,負責創建通道并將其添加到串列中。它還負責在消費者完成時從串列中洗掉頻道。這很重要,否則如果消費者失敗,生產者將繼續在死通道中推送訊息,從而導致記憶體泄漏。
每個消費者可能不同的消費者“主體”作為異步 lambda 傳遞給CreateConsumer方法。
重要的是在啟動生產者之前啟動所有消費者并創建他們的通道。這就是為什么該CreateConsumer方法不包含在Task.Run. 這樣,CreateConsumer直到第一個內部的代碼在await呼叫CreateConsumer.
使用通道對串列的每次訪問都受保護lock,因為多個執行緒可能會同時嘗試讀取/修改串列。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/485562.html
標籤:C# 。网 system.threading.channels
