我在微服務中有一個類,如下所示:
import asyncio
import threading
class A:
def __init__(self):
self.state = []
self._flush_thread = self._start_flush()
self.tasks = set()
def _start_flush(self):
threading.Thread(target=self._submit_flush).start()
def _submit_flush(self):
self._thread_loop = asyncio.new_event_loop()
self._thread_loop.run_until_complete(self.flush_state()) #
async def regular_func(self):
# This function is called on an event loop that is managed by asyncio.run()
# process self.state, fire and forget next func
task = asyncio.create_task(B.process_inputs(self.state)) # Should call process_inputs in the main thread event loop
self.tasks.add(task)
task.add_done_callback(self.tasks.discard)
pass
async def flush_state(self):
# flush out self.state at regular intervals, to next func
while True:
# flush state
asyncio.run_coroutine_threadsafe(B.process_inputs(self.state), self._thread_loop) # Calls process_inputs in the new thread event loop
await asyncio.sleep(10)
pass
class B:
@staticmethod
async def process_inputs(self, inputs):
# process
在這兩個執行緒上,我有兩個單獨的事件回圈,以避免主事件回圈中的任何其他異步函式阻止其他異步函式運行。
我看到提交給給定事件回圈時asyncio.run_coroutine_threadsafe這是執行緒安全的。在不同的事件回圈之間呼叫仍然是執行緒安全的嗎?asyncio.run_coroutine_threadsafe(B.process_inputs())
編輯:
process_inputs將狀態上傳到物件存盤并使用我們傳入的狀態呼叫外部 API。
uj5u.com熱心網友回復:
這里的答案是,這asyncio.run_coroutine_threadsafe并不能保護我們免受跨不同事件回圈的任何執行緒安全問題的影響。我們需要實作鎖來保護任何被修改的共享狀態。感謝@Paul Cornelius 的回復。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/529415.html
上一篇:如何在Python3中在有限的時間內(例如5分鐘)執行一個函式?
下一篇:在腳本中實作多執行緒/并行處理
