我在 StackOverflow 上閱讀了許多關于使用 Task 來包裝基于回呼的 API 的文章和問題,我正試圖在與 Solace PubSub 訊息代理進行通信時使用這種技術。
我最初的觀察是,這種技術似乎是在轉移并發性的責任。例如,Solace代理庫有一個Send()方法,它可能會阻塞,然后我們在網路通信完成后得到一個回呼以指示 "真正的 "成功或失敗。因此,這個Send()方法可以被快速呼叫,而供應商庫在內部限制了并發性。
當你在周圍放置一個任務時,你似乎要么將操作序列化(foreach message await SendWrapperAsync(message)),要么通過決定啟動多少個任務(例如,使用 TPL 資料流)來自己接管并發性的責任。
在任何情況下,我決定用一個擔保人來包裝Send呼叫,該擔保人將永遠重試,直到回呼顯示成功為止,并對并發性負責。這是一個 "有保障 "的訊息傳遞系統。失敗不是一種選擇。這需要保證人能夠應用背壓,但這其實不在這個問題的范圍內。我在下面的示例代碼中對它有一些評論。
它確實意味著我的熱路徑,即包裹了發送 回呼的路徑,由于重試邏輯而變得 "特別熱"。因此,這里有大量的 TaskCompletionSource 創建。
供應商自己的檔案建議盡可能地重用他們的Message物件,而不是為每個Send重新創建它們。我已經決定使用Channel作為一個環形緩沖器。但這讓我想知道--除了TaskCompletionSource方法之外,是否還有其他的替代方法--也許有一些其他的物件也可以被快取在環形緩沖區中并被重用,從而達到相同的結果?
我意識到這可能是對微觀優化的過度熱心嘗試,而且說實話,我正在探索C#的幾個方面,這些方面高于我的工資等級(我是一個SQL人,真的),所以我可能錯過了一些明顯的東西。如果答案是 "你實際上不需要這種優化",那就不能讓我安心了。如果答案是 "這確實是唯一明智的方法",我的好奇心就會得到滿足。
這里是一個功能齊全的控制臺應用程式,它模擬了MockBroker物件中的Solace庫的行為,以及我對它進行封裝的嘗試。我的熱點路徑是Guarantor類中的SendOneAsync方法。這段代碼對SO來說可能有點太長了,但這是我能創建的最小的演示,它捕捉到了所有的重要元素。
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
內部 class Message { public bool sent。public int payload; public object correlator; }
//模擬第三方庫行為。
internal class MockBroker
{
public bool TrySend(Message m, Action< Message> callback)
{
if (r.NextDouble() < 0.5) return false; //模擬立即失敗的機會/"會阻塞 "的反應。
Task.Run(() => { Thread.Sleep(100); m.send = r.NextDouble() < 0.5; callback(m); }); //模擬網路呼叫。
return true。
}
private Random r = new();
}
//將MockBroker變成一個具有異步并發限制的 "保證 "發送器。
internal class Guarantorpublic Guarantor(int maxConcurrency)
{
_broker = new MockBroker()。
//避免SendOneAsync中的訊息分配。
_ringBuffer = Channel.CreateBounded<Message>(maxConcurrency)。
for (int i = 0; i < maxConcurrency; i ) _ringBuffer.Writer.TryWrite(new Message())。
}
//真正的代碼推送到T.T.T.DataFlow塊中,具有有界容量和并行性。
//執行選項都等于這里的maxConcurrency,提供并發性和背壓。
public async Task Post(int payload) => await SendOneAsync(payload);
private async Task SendOneAsync(int payload)。
{
Message msg = await _ringBuffer.Reader.ReadAsync()。
msg.payload = payload;
//發送必須最終成功。
while (true)
{
// ***能否避免這種分配?***
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously)。
msg.correlator = tcs;
//真實代碼中的類方法,在此行內以使邏輯更明顯。
Action<Message> callback = (msg) => (msg.correlator as TaskCompletionSource<bool>).SetResult(msg.send)。
if (_broker.TrySend(msg, callback) && await tcs.Task) break;
else
{
//簡單的演示重試邏輯。
Console.WriteLine($"retrying {msg.payload}")。
await Task.Delay(500) 。
}
}
//真正的代碼在這里引發一個事件以表示成功交付。
await _ringBuffer.Writer.WriteAsync(msg)。
Console.WriteLine(payload)。
}
private Channel<Message> _ringBuffer;
private MockBroker _broker;
}
internal class Programprivate static async 任務 Main(string[] args)
{
//最多 10 個并發的發送。
保證人g = new(10)。
//hacky simulation,因為在這個演示中,沒有任何東西產生連續事件,。
//沒有DataFlowBlock提供并發性(它將被通道所限制),。
//沒有人在訊息成功發送時進行通知。
List<Task> sends = new(100)。
for (int i = 0; i < 100; i ) sends.Add(g.Post(i))。
await Task.WhenAll(sends)。
}
uj5u.com熱心網友回復:
是的,你可以避免分配TaskCompletionSource實體,通過使用輕量級的ValueTasks代替Tasks。起初,你需要一個可重用的物件來實作IValueTaskSource<T>介面,而Message似乎是完美的候選。為了實作這個介面,你可以使用ManualResetValueTaskSourceCore<T>結構。這是一個可變的結構,所以它不應該被宣告為只讀。你只需要將介面方法委托給這個名字很長的結構的相應方法即可:
using System.Threading.Tasks.Sources;
internal class Message : IValueTaskSource<bool>
{
public bool sent; public int payload; public object correlator;
private ManualResetValueTaskSourceCore<bool> _source; // Mutable struct, not readonly。
public void Reset() => _source.Reset()。
public short Version => _source.Version;
public void SetResult(bool result) => _source. SetResult(result)。
ValueTaskSourceStatus IValueTaskSource<bool>.GetStatus(short token)
=> _source.GetStatus(token);
void IValueTaskSource<bool>.OnCompleted(Action<object> continuation,
object state, short token, ValueTaskSourceOnCompletedFlags flags)
=> _source.OnCompleted(continuation, state, token, flags)。
bool IValueTaskSource<bool>.GetResult(short token) => _source.GetResult(token)。
}
三個成員GetStatus、OnCompleted和GetResult是實作該介面的必要條件。其他三個成員(Reset、Version和SetResult)將被用于創建和控制ValueTask<bool>s。
現在讓我們把MockBroker類的TrySend方法包裝成一個異步方法TrySendAsync,它回傳一個ValueTask<bool>
static class MockBrokerExtensions
{
public staticValueTask<bool> TrySendAsync(this MockBroker source, message)。
{
message.Reset()。
bool result = source.TrySend(message, m => m.SetResult(m.sed))。
if (!result) message.SetResult(false)。
return new ValueTask<bool>(message, message.Version)。
}
message.Reset();重置了IValueTaskSource<bool>,并宣告之前的異步操作已經完成。一個IValueTaskSource<T>一次只支持一個異步操作,產生的ValueTask<T>只能被等待一次,而且在下一個Reset()之后就不能再被等待了。這就是你為避免分配物件所必須付出的代價:你必須遵循更嚴格的規則。如果你試圖彎曲規則(有意或無意),ManualResetValueTaskSourceCore<T>將開始到處拋出InvalidOperationExceptions。
現在讓我們使用TrySendAsync擴展方法:
while (true)
{
if (await _broker.TrySendAsync(msg)) break;
//簡單演示重試邏輯
Console.WriteLine($"retrying {msg.payload}")。
await Task.Delay(500) 。
你可以在Console中列印整個操作前后的GC.GetTotalAllocatedBytes(true),以看到區別。請確保在Release模式下運行應用程式,以看到真實的畫面。你可能會發現差別并不明顯,因為一個TaskCompletionSource實體的大小與Task.Delay所分配的位元組,以及為在Console中寫東西所產生的所有string相比,是相當小的。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/318036.html
標籤:
