假設我被提供給一個事件生產者API,它包括Start()、Pause()和Resume()方法,以及一個ItemAvailable事件。生產者本身是外部代碼,我無法控制其執行緒。在呼叫Pause()之后,仍有一些專案可能會通過(生產者實際上是遠程的,所以專案可能已經在網路上飛行)。
還假設我正在撰寫消費者代碼,其中消費可能比生產更慢。
關鍵要求是
- 消費者事件處理程式必須不阻塞生產者執行緒,并且
- 所有事件必須被處理。
- 所有事件都必須被處理(不能丟棄任何資料)。
我在消費者中引入了一個緩沖器,以平滑一些突發事件。但在突發性擴大的情況下,我希望呼叫 Producer.Pause(),然后在適當的時間呼叫 Resume(),以避免消費者端記憶體耗盡。
我有一個解決方案,利用Interlocked來增加和減少一個計數器,將其與一個閾值進行比較,以決定是否是時候Pause或Resume。
問題。就效率(和優雅)而言,是否有比Interlocked計數器(以下代碼中的int current)更好的解決方案?
更新后的MVP(不再從限制器上反彈):
namespace Experiments
{
internal class Program {
{
//簡單的外部生產者API用于演示目的。
private class Producer
{
public void Pause Pause(inti) { _blocker. Reset(); Console.WriteLine($"暫停在{i}"); }
title">Resume(inti) { _blocker. Set(); Console.WriteLine($"resumed at {i}"/span>); }
public async Task Start()
{
await Task.Run
(
() =>
{
for (int i = 0; i < 10000; i )
{
_blocker.Wait()。
ItemAvailable?.Invoke(this, i) 。
}
}
);
}
public event EventHandler<int> ItemAvailable;
private ManualResetEventSlim _blocker = new(true)。
}
private static async 任務 Main(string[] args)
{
var p = new Producer()。
var buffer = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true }) 。
int threshold = 1000;
int resumeAt = 10;
int current = 0;
int paused = 0;
p.ItemAvailable = (_, i) =>
{
if (Interlocked.Increment(ref current) >= threshold
&& Interlocked.CompareExchange(ref paused, 0, 1)==0)
) p.Pause(i);
buffer.Writer.TryWrite(i)。
};
var processor = Task.Run
(
async ( ) =>
{
await foreach (int i in buffer.Reader.ReadAllAsync()
{
Console.WriteLine($" processing {i}")。
await Task.Delay(10) 。
if
(
Interlocked.Decrement(ref current) < resumeAt
&& Interlocked.CompareExchange(ref paused, 1, 0) ==1.
) p.Resume(i)。
}
}
);
p.Start()。
await處理器。
}
}
uj5u.com熱心網友回復:
如果你的目標是優雅,你可以考慮在一個自定義的Channel<T>內烘烤壓力感知功能。下面是一個PressureAwareUnboundedChannel<T>類,它源于Channel<T>。它提供了基類的所有功能,此外,當通道處于壓力之下時,以及當壓力被釋放時,它將發出通知。這些通知是通過一個IProgress<bool>實體推送的,當壓力超過一個特定的高閾值時,它會發出一個true值,而當壓力下降到一個特定的低閾值時,則發出一個false值。
public sealed class PressureAwareUnboundedChannel< T> : Channel<T>
{
private readonly Channel<T> _channel。
private readonly int _highPressureThreshold;
private readonly int _lowPressureThreshold;
private readonly IProgress<bool> _pressureProgress。
private int _pressureState = 0; // 0: no pressure, 1: under pressurepublic PressureAwareUnboundedChannel(int lowPressureThreshold,
int highPressureThreshold, IProgress<bool> pressureProgress)。
{
if (lowPressureThreshold < 0)
throw new ArgumentOutOfRangeException(nameof(lowPressureThreshold))。
if (highPressureThreshold < lowPressureThreshold)
throw new ArgumentOutOfRangeException(nameof(highPressureThreshold))。
if (pressureProgress == null)
throw new ArgumentNullException(nameof(pressureProgress))。
_highPressureThreshold = highPressureThreshold;
_lowPressureThreshold = lowPressureThreshold。
_pressureProgress = pressureProgress。
_channel = Channel.CreateBounded<T>(Int32.MaxValue)。
this.Writer = new ChannelWriter(this)。
this.Reader = new ChannelReader(this)。
}
private class ChannelWriter : ChannelWriter<T>
{
private readonly PressureAwareUnboundedChannel<T> _parent;
public ChannelWriter(PressureAwareUnboundedChannel<T> parent)
=> _parent = parent。
public override bool TryComplete(Exception error = null)
=> _parent._channel.Writer.TryComplete(error)。
public override bool TryWrite(T item)
{
bool success = _parent._channel.Writer.TryWrite(item)。
if (success) _parent.SignalWriteOrRead();
return success;
}
public override ValueTask< bool> WaitToWriteAsync()
CancellationToken cancellationToken = default)?
=> _parent._channel.Writer.WaitToWriteAsync(cancellationToken)。
}
private class ChannelReader : ChannelReader< T>
{
private readonly PressureAwareUnboundedChannel<T> _parent;
public ChannelReader(PressureAwareUnboundedChannel<T> parent)
=> _parent = parent。
public override Task Completion => _parent._channel.Reader.Completion。
public override bool CanCount => _parent._channel.Reader.CanCount。
public override int Count => _parent._channel.Reader.Count。
public override bool TryRead(span class="hljs-keyword">out T item)
{
bool success = _parent._channel.Reader.TryRead(out item)。
if (success) _parent.SignalWriteOrRead();
return success;
}
public override ValueTask< bool> WaitToReadAsync()
CancellationToken cancellationToken = default)。
=> _parent._channel.Reader.WaitToReadAsync(cancellationToken)。
}
private void SignalWriteOrRead()
{
var currentCount = _channel.Reader.Count。
bool underPressure;
if (currentCount > _highPressureThreshold)
underPressure = true。
else if (currentCount <= _lowPressureThreshold)
underPressure = false;
else[/span
return;
int newState = underPressure ? 1 : 0;
int oldState = underPressure ? 0 : 1;
if (Interlocked.CompareExchange(
ref _pressureState, newState, oldState) != oldState) return;
_pressureProgress.Report(underPressure)。
}
封裝的Channel<T>實際上是一個有界通道,其容量等于最大的Int32值,因為只有有界通道才實作Reader.Count屬性。
使用實體:
var progress = new Progress<bool> (underPressure =>
{
if (underPressure) Producer.Pause(); else Producer.Resume()。
});
var channel = new PressureAwareUnboundedChannel<Item> (500, 1000, progress) 。
在這個例子中,當通道記憶體儲的專案超過1000個時,Producer將被暫停,當專案數量下降到500個或更少時,它將被恢復。
Progress<bool>動作是在Progress<bool>創建時被捕獲的背景關系上呼叫。因此,如果你在一個GUI應用程式的UI執行緒上創建它,該動作將在UI執行緒上被呼叫,否則它將在ThreadPool上被呼叫。在后一種情況下,將沒有保護措施防止Action<bool>的重疊呼叫。如果Producer類不是執行緒安全的,你將不得不在處理程式中添加同步功能。例子:
var progress = new Progress<bool> (underPressure =>
{
lock (Producer) if (underPressure) Producer.Pause(); else Producer.Resume()。
});
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/333687.html
標籤:
上一篇:為什么我們應該在<algorithm>頭的函式之前使用std命名空間而不應該在<cmath>頭的函式之前使用它?
下一篇:改變VSC中的php顏色標簽
