我有一個場景,我正在從資料庫中讀取一些資料。此資料以 的形式回傳IAsyncEnumerable<MyData>。讀取資料后,我想將其發送給消費者。這個消費者是異步的。現在我的代碼看起來像這樣:
// C#
IAsyncEnumerable<MyData> enumerable = this.dataSource.Read(query);
await foreach (var data in enumerable)
{
await this.consumer.Write(data);
}
我的問題是,當我列舉資料庫時,我鎖定了資料。我不想持有這個鎖的時間超過我需要的時間。
如果消費者消費資料的速度比生產者生產資料的速度慢,有沒有什么方法可以讓我急切地從資料源中讀取資料,而無需呼叫ToListor ToListAsync。我想避免一次將所有資料讀入記憶體,如果現在生產者比消費者慢,這將導致相反的問題。如果資料庫上的鎖不是盡可能短也沒關系,我想要在記憶體中一次有多少資料和我們保持列舉運行多長時間之間進行可配置的權衡。
我的想法是有某種方法可以使用佇列或類似通道的資料結構來充當生產者和消費者之間的緩沖區。
在 Golang 我會做這樣的事情:
// go
queue := make(chan MyData, BUFFER_SIZE)
go dataSource.Read(query, queue)
// Read sends data on the channel, closes it when done
for data := range queue {
consumer.Write(data)
}
有沒有辦法在 C# 中獲得類似的行為?
uj5u.com熱心網友回復:
這是Rafael的答案中ConsumeBuffered擴展方法的更強大的實作。這個使用 a作為緩沖區,而不是 a 。優點是列舉源和緩沖的兩個序列不會分別阻塞一個執行緒。已經注意完成源序列的列舉,以防緩沖序列的列舉被下游消費者過早地放棄。Channel<T>BlockingCollection<T>
public static async IAsyncEnumerable<T> ConsumeBuffered<T>(
this IAsyncEnumerable<T> source, int capacity,
[EnumeratorCancellation] CancellationToken cancellationToken = default)
{
ArgumentNullException.ThrowIfNull(source);
Channel<T> channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
{
SingleWriter = true,
SingleReader = true,
});
using CancellationTokenSource completionCts = new();
Task producer = Task.Run(async () =>
{
Exception exception = null;
try
{
await foreach (T item in source.WithCancellation(completionCts.Token)
.ConfigureAwait(false))
{
await channel.Writer.WriteAsync(item, completionCts.Token)
.ConfigureAwait(false);
}
}
catch (Exception ex) { exception = ex; }
channel.Writer.Complete(exception);
});
try
{
await foreach (var item in channel.Reader.ReadAllAsync(cancellationToken)
.ConfigureAwait(false))
{
yield return item;
cancellationToken.ThrowIfCancellationRequested();
}
}
finally // Happens when the caller disposes the output enumerator
{
completionCts.Cancel();
await producer.ConfigureAwait(false);
}
}
設定有界通道的SingleWriterandSingleReader選項有點學術,可以省略。目前(.NET 6)在System.Threading.Channels庫中只有一個有界Channel<T>實作,無論提供什么選項。此實作基于與.NET同步的(與 a 類似的內部 .NET 型別) 。Deque<T>Queue<T>lock
通道在try/finally塊內列舉,因為 C# 迭代器將finally塊作為自動生成/的Dispose/DisposeAsync方法的一部分執行。IEnumerator<T>IAsyncEnumerator<T>
上述ConsumeBuffered方法未經測驗。理論上它應該是沒有錯誤的,但實際上它可能不是。
注意:如果外部CancellationToken被取消,取消將作為 傳播OperationCanceledException,并且所有緩沖的專案都將丟失。在具有多個生產者和消費者的生產者-消費者場景中,這可能是一個問題。建議CancellationToken僅用于破壞整個處理管道的目的,而不是用于其中的一部分。
uj5u.com熱心網友回復:
感謝@Evk 將我指向BlockingCollection<T>,這是我想出的解決方案。IAsyncEnumerable即使消費者跟不上,它也能讓我熱切地生產。也有可能想出一個類似的解決方案System.Threading.Channels來模仿 Go 的例子。
public static async IAsyncEnumerable<T> ConsumeBuffered<T>(this IAsyncEnumerable<T> enumerable, int? maxBuffer = null)
where T: class
{
using (BlockingCollection<T> queue = maxBuffer == null ? new BlockingCollection<T>() : new BlockingCollection<T>(maxBuffer.Value))
{
Task producer = Task.Run(
async () =>
{
await foreach (T item in enumerable.ConfigureAwait(false))
{
queue.Add(item);
}
queue.CompleteAdding();
});
while (true)
{
T next;
try
{
next = queue.Take();
}
catch (InvalidOperationException _)
{
// thrown when we try to Take after last item
break;
}
yield return next;
}
// this might not be needed, task must be done
// if we exited the loop
await producer.ConfigureAwait(false);
}
}
可能需要對邊緣情況進行一些拋光和測驗,但似乎在 UT 中有效
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/521694.html
上一篇:選擇內的Linq變數
