我在事件回圈中運行以下代碼,我正在使用以下代碼下載大量檔案asyncio并限制使用下載的檔案數量asyncio.queue:
download_tasks = asyncio.Queue()
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(asyncio.create_task(download_file(file=file))
async def worker():
while not download_tasks.empty():
return await download_tasks.get_nowait()
worker_limit = 10
# each call to download_file() returns a pandas dataframe
df_list = await asyncio.gather(*[worker() for _ in range(worker_limit)], return_exceptions=True)
df = pd.concat(df_list)
這段代碼似乎運行良好,但我最初將 for 回圈定義為:
for file in files:
# download_file() is an async function that downloads a file from Microsoft blob storage
# that is basically await blob.download_blob()
download_tasks.put_nowait(download_file(file=file)
使用此代碼,結果相同,但我收到以下警告:
RuntimeWarning: coroutine 'download_file' was never awaited
查看 asyncio 示例,有時我看到create_task()在創建要在 Gather中運行的協程串列或佇列時使用,有時我不使用。為什么在我的情況下需要它,使用它的最佳實踐是什么?
編輯:正如@user2357112supportsMonica 不禮貌地指出的那樣,其中的return宣告worker()并沒有真正意義。這段代碼的重點是限制并發,因為我可能必須一次下載數千個,并希望使用佇列將其限制為 10 個。所以我的實際問題是,如何使用 Gather 使用此佇列實作回傳所有結果?
uj5u.com熱心網友回復:
正如“user2357112 支持莫妮卡”所指出的那樣,最初的問題可能來自作業人員有一個,return所以每個作業人員將下載一個檔案然后退出,這意味著前 10 個之后的任何協程都將被忽略并且永遠不會等待(你可能會看到,如果你登錄download_tasks處理完成后的資訊)。
的create_tasks失敗,由于它會立即安排在同一時間下載(擊敗試圖限速/工人池),然后不正確的工人代碼只是第10個專案后,忽略任何東西。
無論如何,協程(例如,裸async函式)和任務之間的區別在于任務是獨立調度的。也就是說,一旦你創建了一個任務,它就會獨立地生活await,如果你不想要它的結果,你就不必這樣做。這類似于 Javascript 的異步函式。
但是,協程在被等待之前不會做任何事情,只有在明確輪詢它們時它們才會前進,并且只能通過等待它們來完成(直接或間接,例如gather或wait將等待/輪詢它們包裝的物件)。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/311001.html
上一篇:計算超出限制的連續行
下一篇:如何正確地將日期時間轉換為字串?
