我對 Python queue.Queue 非常熟悉。當您想要在消費者和生產者執行緒之間建立可靠的流時,這絕對是您想要的。但是,有時您的生產者比消費者快,并且被迫丟棄資料(例如,對于實時視頻幀捕獲。我們通常只想緩沖最后一幀或兩幀)。
Python 是否提供類似于queue.Queue?的異步緩沖區類?如何使用queue.Queue.
例如,我可以:
buf = queue.Queue(maxsize=3)
def produce(msg):
if buf.full():
buf.get(block=False) # Make space
buf.put(msg, block=False)
def consume():
msg = buf.get(block=True)
work(msg)
盡管我不是特別喜歡生產不是鎖定的佇列原子操作。例如,消費可能在 full 和 get 之間開始,對于多生產者場景,它(可能)會被破壞。
有開箱即用的解決方案嗎?
uj5u.com熱心網友回復:
沒有為此內置任何東西,但它看起來足夠簡單,可以構建您自己的緩沖區類,該類包裝 aQueue并在其自己的鎖之間提供互斥,.put()并.get()在Condition添加專案時使用變數喚醒潛在的消費者。像這樣:
import threading
class SBuf:
def __init__(self, maxsize):
import queue
self.q = queue.Queue()
self.maxsize = maxsize
self.nonempty = threading.Condition()
def get(self):
with self.nonempty:
while not self.q.qsize():
self.nonempty.wait()
assert self.q.qsize()
return self.q.get()
def put(self, v):
with self.nonempty:
while self.q.qsize() >= self.maxsize:
self.q.get()
self.q.put(v)
assert 0 < self.q.qsize() <= self.maxsize
self.nonempty.notify_all()
順便說一句,我建議不要嘗試用原始鎖構建這種邏輯。當然可以做到,但是Condition變數經過精心設計,可以將您從意外競爭條件的世界中拯救出來。Condition變數有一個學習曲線,但一個非常值得攀登:它們通常使事情變得簡單而不是大腦破壞。實際上,Python 的threading模塊在內部使用它們來實作各種事情。
uj5u.com熱心網友回復:
佇列已經是多處理和多執行緒安全的,因為您不能同時從佇列中寫入和讀取。但是,您是正確的,沒有什么可以阻止佇列在full()和get命令之間進行修改。
因此,您可以使用鎖,這是您可以控制多行之間的執行緒訪問的方式。該鎖只能被獲取一次,所以如果它當前被鎖定,所有其他執行緒將等待直到它被釋放才能繼續。
import threading
lock = threading.Lock()
def produce(msg):
lock.acquire()
if buf.full():
buf.get(block=False) # Make space
buf.put(msg, block=False)
lock.release()
def consume():
msg = None
while !msg:
lock.acquire()
try:
msg = buf.get(block=False)
except queue.Empty:
# buffer is empty, wait and try again
sleep(0.01)
lock.release()
work(msg)
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/383612.html
上一篇:將時間序列列更改為日期
下一篇:使用多個執行緒時不同步的列印訊息
