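I am looking for a way to efficiently fetch a chunk of values from disk and then run a computation on that chunk. My idea is a for loop that first runs the disk-fetch task and then runs the computation on the fetched data. I want my program to fetch the next batch of data while the computation is running, so that I don't have to wait for another fetch every time a computation finishes. I expect the computation to take longer than fetching the data from disk, and it probably cannot truly be done in parallel, because a single computation task already pins CPU usage near 100%.
I have provided some code below in Python using trio (asyncio could be used to the same effect) to illustrate my best attempt at doing this with async programming: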
import trio
import numpy as np
from datetime import datetime as dt
import time

testiters = 10
dim = 6000


def generateMat(arrlen):
    # Simulate a disk fetch: generate 30 matrices and keep the last one
    for _ in range(30):
        retval = np.random.rand(arrlen, arrlen)
    # print("matrix generated")
    return retval


def computeOpertion(matrix):
    return np.linalg.inv(matrix)


def runSync():
    for _ in range(testiters):
        mat = generateMat(dim)
        result = computeOpertion(mat)
    return result


async def matGenerator_Async(count):
    for _ in range(count):
        yield generateMat(dim)


async def computeOpertion_Async(matrix):
    return computeOpertion(matrix)


async def runAsync():
    async with trio.open_nursery() as nursery:
        async for value in matGenerator_Async(testiters):
            nursery.start_soon(computeOpertion_Async, value)
            # await computeOpertion_Async(value)


print("Sync:")
start = dt.now()
runSync()
print(dt.now() - start)

print("Async:")
start = dt.now()
trio.run(runAsync)
print(dt.now() - start)
This code simulates getting data from disk by generating 30 random matrices, which uses a small amount of CPU. It then performs matrix inversion on the generated matrix, which uses 100% CPU (with an OpenBLAS/MKL configuration in numpy). I compare the two approaches by timing the synchronous and asynchronous runs.
From what I can tell, both jobs take exactly the same amount of time to finish, meaning the async operation did not speed up the execution. Observing the behavior of each computation, the sequential operation runs the fetch and computation in order and the async operation runs all the fetches first, then all the computations afterwards.
Is there a way to fetch and compute asynchronously? Perhaps with futures or something like gather()? Asyncio has these functions, and trio has them in a separate package, trio_future. I am also open to solutions via other methods (threads and multiprocessing).
I believe there is likely a solution with multiprocessing that runs the disk reading in a separate process. However, inter-process communication and blocking then become a hassle, as I would need some sort of semaphore to limit how many blocks can be generated at a time due to memory constraints, and multiprocessing tends to be quite heavy and slow.
EDIT
Thank you VPfB for your answer. I am not able to call sleep(0) inside the operation, but even if I could, I think it would necessarily block the computation in favor of performing disk operations. I think this may be a hard limitation of Python threading and asyncio: only one thread can execute at a time. Running two tasks simultaneously is impossible if either one needs the CPU for anything other than waiting on an external resource to respond.
Perhaps there is a way with an executor backed by a multiprocessing pool. I have added the following code:
import asyncio
import concurrent.futures


async def asynciorunAsync():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        async for value in matGenerator_Async(testiters):
            result = await loop.run_in_executor(pool, computeOpertion, value)


print("Async with PoolExecutor:")
start = dt.now()
asyncio.run(asynciorunAsync())
print(dt.now() - start)
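When I time this, however, it still takes the same time as the synchronous example. I think I will have to go with a more involved solution, as async and await seem to be too crude a tool to do this kind of task switching properly.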
Answer:
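I don't use trio; my answer is based on asyncio.
In this situation, I think the only way to improve asyncio performance is to break the computation into smaller pieces and insert await sleep(0) between them. This will allow the data-fetching task to run.
Asyncio uses cooperative scheduling. A synchronous CPU-bound routine does not cooperate; it blocks everything else while it runs.
sleep() always suspends the current task, allowing other tasks to run. Setting the delay to 0 provides an optimized path to allow other tasks to run. This can be used by long-running functions to avoid blocking the event loop for the full duration of the function call.
(Quoted from: asyncio.sleep)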
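To make the sleep(0) idea concrete, here is a minimal sketch, assuming the computation can be split into independent chunks (which may not hold for something like a matrix inversion). The names compute_in_chunks, fetch_batches and the log list are hypothetical and only serve the illustration. Note that this only gives the other task a turn between chunks; the two tasks still never run at the same instant.

```python
import asyncio


async def compute_in_chunks(data, chunk_size, log):
    """CPU-bound work split into chunks, yielding to the event loop between them."""
    total = 0
    for start in range(0, len(data), chunk_size):
        total += sum(x * x for x in data[start:start + chunk_size])
        log.append("compute")
        await asyncio.sleep(0)  # suspend this task so other tasks get a turn
    return total


async def fetch_batches(count, log):
    """Hypothetical stand-in for the data-fetching task."""
    for _ in range(count):
        log.append("fetch")
        await asyncio.sleep(0)


async def main():
    log = []
    data = list(range(1000))
    total, _ = await asyncio.gather(
        compute_in_chunks(data, 250, log),
        fetch_batches(4, log),
    )
    return total, log


total, log = asyncio.run(main())
```

Without the await asyncio.sleep(0) line in compute_in_chunks, every "compute" entry would be logged before the first "fetch", which matches the behavior described in the question.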
If that is not possible, try running the computation in an executor. This adds some multithreading capability to otherwise pure async code.
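One possible reading of the executor suggestion is a small prefetch pipeline: submit the fetch of the next block before awaiting the computation on the current one. The sketch below is an illustration under assumptions, not a tested solution for the original workload. fetch, compute and pipeline are hypothetical stand-ins, and time.sleep is used to imitate blocking work that releases the GIL. As far as I know, NumPy releases the GIL inside most BLAS/LAPACK calls, so threads may be enough there; if not, ProcessPoolExecutor could be swapped in at the cost of pickling each block.

```python
import asyncio
import concurrent.futures
import time


def fetch(i):
    """Hypothetical stand-in for a blocking disk read."""
    time.sleep(0.05)
    return list(range(i, i + 5))


def compute(block):
    """Hypothetical stand-in for the heavy computation."""
    time.sleep(0.05)
    return sum(block)


async def pipeline(count):
    loop = asyncio.get_running_loop()
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
        # Start the first fetch before entering the loop
        next_block = loop.run_in_executor(pool, fetch, 0)
        for i in range(count):
            block = await next_block
            if i + 1 < count:
                # Submit the next fetch now, so it runs during the computation
                next_block = loop.run_in_executor(pool, fetch, i + 1)
            results.append(await loop.run_in_executor(pool, compute, block))
    return results


results = asyncio.run(pipeline(5))
```

Only one block is fetched ahead at any time, so at most two blocks are held in memory, which may address the memory concern in the question without an explicit semaphore. The difference from the ProcessPoolExecutor attempt in the question is that the next fetch is submitted before the computation is awaited, instead of being run synchronously inside the async generator.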
