由于某種原因,消費者和生產者任務中的代碼似乎從未被執行過。我哪里錯了?
using System.Threading.Channels;
namespace TEST.CHANNELS
{
public class Program
{
public static async Task Main(string[] args)
{
var channel = Channel.CreateUnbounded<int>();
var cancel = new CancellationToken();
await Consumer(channel, cancel);
await Producer(channel, cancel);
Console.ReadKey();
}
private static async Task Producer(Channel<int, int> ch, CancellationToken cancellationToken)
{
for (int i = 0; i < 59; i )
{
await Task.Delay(1000, cancellationToken);
await ch.Writer.WriteAsync(i, cancellationToken);
}
}
private static async Task Consumer(Channel<int, int> ch, CancellationToken cancellationToken)
{
await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
}
}
uj5u.com熱心網友回復:
如果您是新手,我建議您閱讀教程:學習使用 Visual Studio 除錯 C# 代碼。您應該知道如何設定斷點以逐步查看您的代碼運行。
然而,現在由于這個涉及 async/Task,它可能看起來很混亂,但是當你介入時Consumer,你會看到它停在await foreach (var item in ch.Reader.ReadAllAsync(cancellationToken))一行。
原因是消費者正在等待生產者從未放入的東西。原因是您首先await停止了您的代碼,因此第二行永遠不會被執行。
await Consumer(channel, cancel);
await Producer(channel, cancel);
這應該可以解決問題:
var consumerTask = Consumer(channel, cancel);
var producerTask = Producer(channel, cancel);
await Task.WhenAll(consumerTask, producerTask);
上面的代碼說的是,
運行消費者任務,不要等待它,而是在
consumerTask.運行 Producer Task,不要等待,而是在
producerTask.等待兩個
consumerTask并producerTask完成使用Task.WhenAll。
請注意,消費者似乎仍然存在邏輯問題,因為它永遠不會退出,因此您ReadKey()可能不會受到打擊(您的應用程式會卡WhenAll在線路上)。如果你打算修復它,如果它是一個錯誤,我認為它對你來說更容易“練習”。
uj5u.com熱心網友回復:
您的代碼試圖在生成任何訊息之前使用通道中的所有訊息。雖然您可以存盤生產者/消費者任務而不是等待它們,但最好使用特定于通道的習語和模式。
與其將 Channel 用作某種容器,不如僅將 Readers 公開和共享給消費者創建和擁有的通道。這就是在 Go 中使用 Channels 的方式。
這就是為什么你也只能使用 ChannelReader 和 ChannelWriter 的原因:
- ChannelReader 是
ch ->Go 中的一個,是從通道讀取的唯一方法 - ChannelWriter 是
ch <-Go 中唯一的寫法。
使用自有頻道
如果您需要處理資料不同步,在一個任務做內部生產者/消費者的方法。這使得它很多容易控制的渠道,當完成或取消的處理知道。它還允許您非常輕松地從通道構建管道。
在您的情況下,生產者可能是:
public ChannelReader<int> Producer(CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(()=>{
for (int i = 0; i < 59; i )
{
await Task.Delay(1000, cancellationToken);
await writer.WriteAsync(i, cancellationToken);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
消費者,如果是懶惰的,可以是:
static async Task ConsumeNumbers(this ChannelReader<int> reader, CancellationToken cancellationToken)
{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
Console.WriteLine(item);
}
}
使其成為擴展方法兩者都可以與:
await Producer(cancel)
.ConsumeNumbers(cancel);
在更一般的情況下,管道塊從通道讀取并回傳通道:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Task.Run(async ()=>{
await foreach (var item in reader.ReadAllAsync(cancellationToken))
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
}
},cancellationToken)
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
這將允許創建一系列步驟,例如:
await Producer(cancel)
.RaiseTo(0.3,cancel)
.RaiseTo(3,cancel)
.ConsumeNumbers(cancel);
并行處理
每個塊也可以使用多個任務,以加快處理速度。在 .NET 6 中,這可以通過以下方式輕松完成Parallel.ForEachAsync:
public ChannelReader<int> RaiseTo(this ChannelReader<int> reader, double pow,CancellationToken cancellationToken)
{
var channel=Channel.CreateUnbounded<int>();
var writer=channel.Writer;
_ = Parallel.ForEachAsync(
reader.ReadAllAsync(cancellationToken),
cancellationToken,
async item=>
{
var newItem=Math.Pow(item,pow);
await writer.WriteAsync(newItem);
})
.ContinueWith(t=>writer.TryComplete(t.Exception));
return channel;
}
注意順序
通道保留專案和讀取請求的順序。這意味著單任務步驟將始終按順序消費和產生訊息。雖然沒有這樣的保證Parallel.ForEachAsync。如果順序很重要,您必須添加代碼以確保按順序發出訊息,或者嘗試通過另一個步驟重新排序它們。
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/408715.html
標籤:
