我lst在 python 中有一個串列。我想對f這個串列的每一項呼叫一個函式。該函式f呼叫第三方函式g。我還想測量g串列中每個專案的每個函式呼叫所花費的時間lst。我想加快行程,所以我使用多處理池來并行化執行。目前,我有以下代碼,但它不起作用。我從這篇文章中了解到map_async只能呼叫一元函式。我還想利用在其中創建多個行程的優勢,map_async所以我不想切換到apply_async. 有人可以建議我在這里實作目標的更好選擇是什么?
我當前的解決方案不起作用:
import multiprocessing as mp
time_metrics = {}
def f(idx):
global time_metrics
a = time.now()
g(idx)
b = time.now()
time_metrics[idx] = b-a
lst = [1, 2, 3, 4, 5, 6]
pool = mp.Pool(7)
pool.map_async(f, lst)
pool.close()
pool.join()
print(time_metrics)
uj5u.com熱心網友回復:
多處理不共享記憶體空間,它使用行程“分叉”將當前行程狀態(或僅其中一部分,取決于所使用的分叉/衍生型別和作業系統)克隆到 RAM 中的新位置并分配給一個新的行程 ID,然后獨立運行。如果你想使用共享記憶體區域,任務會變得更加復雜,我發現在我的一些舊專案中共享記憶體比使用佇列將資料傳遞回父行程并存盤到字典中要慢。
對于此任務,雖然在我看來您不需要執行任何操作,但您可以只回傳時間值,然后在池完成執行后(以同步模式,而不是異步模式,以便行程池阻塞,直到所有流程都完成了任務)您可以迭代并收集結果。
所以這可能是最簡單的解決方案:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
# placeholder function for whatever you have as g()
for i in range(5000*a):
pass
def f(idx):
# once spawned, a process calling this function cannot edit objects in the memory of the parent process,
# unless using the special shared memory objects in the mp class.
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# blocks until result is available
results = pool.map(f, lst)
for row in results:
time_metrics[row[0]] = row[1]
print(time_metrics)
如果您有興趣,可以將其重構為使用來自多處理庫的共享記憶體字典或 mp.Queue 的實體將結果傳遞回父行程進行收集,但就我而言,這個問題不是必需的看。
您是否真的需要使用池的異步版本,或者這種方法是否足夠?
如果您真的想使用 map_async,則此代碼段有效:
from datetime import datetime
import multiprocessing as mp
time_metrics = {}
def g(a):
for i in range(5000*a):
pass
def f(idx):
a = datetime.utcnow()
g(idx)
b = datetime.utcnow()
return (idx, b - a)
def append_res(result: tuple):
for row in result:
time_metrics[row[0]] = row[1]
if __name__ == "__main__":
lst = [1, 2, 3, 4, 5, 6]
# don't assign 1 process for each job, use only number of cores your machine has, as rarely any benefit of using more, especially for benchmarking.
with mp.Pool() as pool:
# doesn't block until result is available.
# callback is applied to list of results when all the tasks are complete
results = pool.map_async(f, lst, callback = append_res)
# wait for result to become available, otherwise parent process will exit the context manager and processes will not complete
results.wait()
print(time_metrics)
與 .map() 相比,我不是 100% 確定 .map_async() 的行為,.map() 將按順序將該函式應用于可迭代物件,并且在分配給前一個任務之前不會啟動新任務程序結束。只要機器上的每個 CPU 內核處理的 Python 行程數量不比內核多很多,這使得基準測驗很有用,因為這只會增加開銷和負載,并且會給您不準確的基準測驗。使用 map_async,通常使用異步函式,單個結果可用的順序可能不是它們分配的順序,這對我來說意味著所有任務都同時分配給行程池,這可能會產生競爭任務之間的 CPU 資源,并可能產生不準確的基準,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/398420.html
標籤:蟒蛇-3.x 异步 多处理 python-多处理
下一篇:如何等待遞回異步函式的所有呼叫
