除了標題不好,背景關系:
本質上,我有一個事件佇列和事件處理器,它偵聽佇列并在沒有結果的情況下產生長時間運行的任務(只要比事件之間的間隔稍長,因此在接收另一個事件之前等待每個任務會導致處理器落后于實時處理每個事件的事件)。
(如果更多細節有幫助 - 事件佇列是 Azure 的 EventHub,我使用 Azure 訂閱它EventProcessorClient。)
作為測驗而不是等待任務,我嘗試觸發并忘記任務,并使用最少的例外日志記錄:
private Task OnEvent(ProcessEventArgs arg) {
try
{
_ = Task.Factory.StartNew(async () => await MyLongTask(arg.Data.Body))
.ContinueWith(t =>
{
switch(t.Status)
{
case TaskStatus.Created:
case TaskStatus.WaitingForActivation:
case TaskStatus.WaitingToRun:
case TaskStatus.Running:
case TaskStatus.WaitingForCildrenToComplete:
case TaskStatus.RanToCompletion:
break;
case TaskStatus.Canceled:
case TaskStatus.Faulted:
_logger.LogError(t.Exception, "Processing task failed");
break;
default:
break;
}
}, arg.CancellationToken);
}
catch(Exception e)
{
_logger.LogError(e, "Failed to process event");
}
return Task.CompletedTask;
}
這適用于處理實時事件,并行任務的數量保持在 2-5。問題是如果處理器出現故障,它需要趕上實時事件,而不是跳過它們并且有超過 20 個左右的并行任務,這會很快使主機資源匱乏(cpu/記憶體繁重的任務) )。
到目前為止,我唯一的想法是讓 WaitHandle/Semaphore 在任務計數器上觸發。任務工廠內的任務將首先等待 WaitHandle/Semaphore,然后遞增計數器,然后才開始實際的處理任務。所有ContinueWith(..)switch case 都會遞減它。
但是 atm 我不知道這有多安全,因為我對此沒有太多經驗。
uj5u.com熱心網友回復:
最終Task.Factory在我收到事件而不是觸發任務時呼叫的基礎上實作了這個基本實用程式
public class ThrottledParallelTaskFactory
{
private readonly Semaphore _semaphore;
private readonly TimeSpan _timeout;
private readonly ILogger _logger;
public ThrottledParallelTaskFactory(int limit, TimeSpan? timeout = null, ILogger logger = null)
{
_semaphore = new Semaphore(limit, limit, nameof(ThrottledParallelTaskFactory));
_timeout = timeout ?? TimeSpan.FromSeconds(15);
_logger = logger;
}
public Task StartNew(Func<Task> func)
{
_semaphore.WaitOne(_timeout);
return Task.Factory.StartNew(func)
.ContinueWith(t =>
{
_semaphore.Release();
if(t.Status == TaskStatus.Canceled || t.Status == TaskStatus.Faulted)
{
_logger?.LogError(t.Exception, "Parallel task failed");
}
});
}
}
所以我OnEvent現在看起來像這樣
private readonly ThrottledParallelTaskFactory _throttledParallelTaskFactory = ...;
private Task OnEvent(ProcessEventArgs arg) {
try
{
_ = _throttledParallelTaskFactory.StartNew(async () => await MyLongTask(...);
}
catch(Exception e)
{
_logger.LogError(e, "Failed to process event");
}
return Task.CompletedTask;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/399946.html
下一篇:對呼叫兩次的片段使用視圖模型
