問題是我需要如下創建一個異步方法/庫(這樣它就不會阻塞異步事件回圈):
- 從正在定義的異步方法創建未來(假設它是methodA)
- 將未來放入某個佇列/串列/字典中,一項服務將在可用時將結果填充到未來(結果需要很長時間才能可用)
- 在方法A中等待未來
我正在等待創建的未來的問題是永遠阻塞,如下面的簡化示例。
import asyncio
from asyncio import Future
from queue import Queue
from threading import Thread
futures_queue: Queue[Future] = Queue()
def fill_result_service():
counter = 0
while True:
fut = futures_queue.get()
print(f"Processing fut={id(fut)}")
fut.set_result(f"OK: {counter}")
counter = 1
filler_thread = Thread(target=fill_result_service)
filler_thread.start()
async def main_not_ok():
fut: Future[str] = asyncio.get_running_loop().create_future()
print(f"Putting fut={id(fut)} into queue")
futures_queue.put(fut)
result = await fut
assert result.startswith("OK")
print("main_not_ok() completed")
async def main_ok():
fut: Future[str] = asyncio.get_running_loop().create_future()
tmp_thread = Thread(target=lambda: fut.set_result("OK: Local thread"))
tmp_thread.start()
result = await fut
assert result.startswith("OK")
print("main_ok() completed")
if __name__ == "__main__":
print("Running main_ok: ")
asyncio.run(main_ok()) # work as expected
print("\n\n\nRunning main_not_ok: ")
asyncio.run(main_not_ok()) #blocking forever
我一直在努力除錯它半天,無法弄清楚。請幫我。
uj5u.com熱心網友回復:
Queue您可以通過以下方式做到這一點:
import asyncio
import time
from random import randint
from threading import Thread
def set_future_result(future: asyncio.Future, event: asyncio.Event, loop: asyncio.AbstractEventLoop):
"""Thread target function. It gives some result to futures."""
async def _event_status_change(_event: asyncio.Event):
"""Wrap event in coroutine to run it threadsafe"""
_event.set()
time.sleep(3)
res = randint(1, 10)
if res > 8:
future.set_exception(Exception(f"Result: {res} " "Error !!! " * 2))
else:
future.set_result(res)
asyncio.run_coroutine_threadsafe(_event_status_change(event), loop)
async def asyncio_loop_killer(task: asyncio.Task):
"""Just task to finish our app in several seconds."""
n = 20
while n:
await asyncio.sleep(1)
n -= 1
if task.done():
break
else:
task.cancel()
async def tasks_producer():
"""Main function of our app. It produces futures for children threads."""
loop = asyncio.get_event_loop()
while True:
future, event = asyncio.Future(), asyncio.Event()
event.clear()
worker = Thread(target=set_future_result, args=(future, event, loop,), daemon=True)
worker.start()
await event.wait()
if res := future.exception():
print(f"Error: {res}")
break
print(f"Result: {future.result()}")
async def async_main():
"""Wrapper around all async activity."""
producer_task = asyncio.create_task(tasks_producer())
await asyncio_loop_killer(producer_task)
if __name__ == '__main__':
asyncio.run(async_main())
add_done_callback還要檢查Future
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/517050.html
上一篇:當5組執行緒中的任何一個停止時如何啟動下一個執行緒?
下一篇:按一定順序執行執行緒
