我正在撰寫一個簡單的生產者/消費者應用程式,但我注意到一個非常奇怪的行為..這是代碼:
private Thread _timelineThread = null;
private BufferBlock<RtpPacket> _queue = null;
private AutoResetEvent _stopping = new AutoResetEvent(false);
static void Main(string[] args)
{
// Start consumer thread
Consume();
// Produce
var t = new Thread(() =>
{
while (true)
{
var packet = RtpPacket.GetNext();
_queue.Post(packet);
Thread.Sleep(70);
}
}
t.Join();
}
static void Consume()
{
_timelineThread = new Thread(async () =>
{
while (_stopping.WaitOne(0) == false)
{
// Start consuming...
while (await _queue.OutputAvailableAsync())
{
var packet = await _queue.ReceiveAsync();
// Some processing...
}
}
});
_timelineThread.Start();
}
這是一個無限回圈(直到我路由 _stopping 信號)。但是,當 _timelineThread 遇到第一個 await _queue.OutputAvailableAsync() 時,執行緒將狀態更改為“已停止”。有什么問題我沒有考慮?
如果我將 Consume() 函式更改為:
static void Consume()
{
_timelineThread = new Thread(() =>
{
while (_stopping.WaitOne(0) == false)
{
// Start consuming...
while (_queue.OutputAvailableAsync().GetAwaiter().GetResult())
{
var packet = _queue.ReceiveAsync().GetAwaiter().GetResult();
// Some processing...
}
}
});
_timelineThread.Start();
}
執行緒運行沒有任何問題..但代碼幾乎與前一個相同..
編輯:一小時后,這個“hack”似乎也不起作用..執行緒正在“運行”,但我沒有從佇列中收到任何資料..
uj5u.com熱心網友回復:
該Thread構造不明白async的代表。你可以在這里閱讀:
- 可以在 ThreadStart 方法中使用“異步”嗎?
- 異步執行緒體回圈,它只是作業,但如何?
我的建議是使用 synchronousBlockingCollection<RtpPacket>而不是BufferBlock<RtpPacket>,并通過列舉GetConsumingEnumerable方法來使用它:
var _queue = new BlockingCollection<RtpPacket>();
var producer = new Thread(() =>
{
while (true)
{
var packet = RtpPacket.GetNext();
if (packet == null) { _queue.CompleteAdding(); break; }
_queue.Add(packet);
Thread.Sleep(70);
}
});
var consumer = new Thread(() =>
{
foreach (var packet in _queue.GetConsumingEnumerable())
{
// Some processing...
}
});
producer.Start();
consumer.Start();
producer.Join();
consumer.Join();
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/359327.html
