我正在嘗試使用多處理來加快處理大量檔案的速度,而不是一一閱讀。在此之前我做了一個測驗來學習。下面是我的代碼:
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(16)
for j in range(1, 5):
results = p.apply_async(print_cube, args = (j, ))
x.append(results.get()[0])
y.append(results.get()[1])
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
結果是:
Method1
time : 0.1549079418182373
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 0.000000
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
方法1使用多處理,消耗更多CPU,但比方法2花費更多時間。
即使回圈次數j達到 5000 或更多,方法 2 也比方法 1 作業得更好。誰能告訴我我的代碼有什么問題?
uj5u.com熱心網友回復:
使用多處理會產生額外的開銷,例如 (1) 創建行程,(2) 將引數傳遞給在不同行程中運行的作業函式,以及 (3) 將結果傳遞回主行程。因此,worker 函式必須足夠占用 CPU 資源,以便通過并行運行它所獲得的收益抵消了我剛才提到的額外開銷。您的作業函式print_cube不符合該標準,因為它沒有足夠的 CPU 密集型。
但是你甚至沒有并行運行你的作業函式。
您正在通過呼叫方法在回圈中提交任務,該方法multiprocessing.pool.Pool.apply_async回傳一個實體,multiprocessing.pool.AsyncResult但在您apply_async再次呼叫提交下一個任務之前,您正在呼叫方法get,AsyncResult因此阻塞直到第一個任務完成并在提交第二個任務之前回傳其結果任務!!!您必須提交所有任務apply_async并保存回傳的AsyncResult實體,然后才能呼叫get這些實體。只有這樣,您才能實作并行性。即使這樣,您的作業函式 ,print_cube也使用太少的 CPU 來克服多處理使用比串行處理更高性能的額外開銷。
在下面的代碼中,我 (1) 更正了多處理代碼以執行并行性并創建一個大小為 5 的池(沒有理由創建一個具有比您將提交的任務數量或 CPU 數量更多的行程的池您為純 CPU 系結任務而擁有的處理器;這只是您無緣無故創建的額外開銷)和(2)修改print_cube為非常 CPU 密集型以展示多處理如何具有優勢(盡管是以人為的方式):
from multiprocessing.pool import Pool
from time import sleep, time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def main1():
start = time()
x = []
y = []
p = Pool(5)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
印刷:
Method1
time : 1.109999656677246
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 2.827015
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
重要的提示
除非您有固態驅動器,否則您可能會發現嘗試并行讀取多個檔案可能會適得其反,因為頭部來回移動。這也可能是更適合多執行緒的作業。
uj5u.com熱心網友回復:
@Booboo 首先,非常感謝您詳細而出色的解釋。它對我更好地理解 python 的多處理工具有很大幫助,你的代碼也是一個很好的例子。并且下次嘗試應用多處理時,我想我會首先考慮任務是否滿足您所說的多處理的特性。對于我進行了一些實驗的遲到的回復,我深表歉意。
其次,我在我的計算機上運行了您提供的代碼,它顯示的結果與您的相似,其中方法 1 確實比方法 2 花費的時間更少,但 CPU 消耗更高。
Method1
time : 1.0751237869262695
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 3.642306
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
第三,關于你寫的筆記,資料檔案存盤在固態硬碟中,我測驗了在Method1(with multiprocessing),Method2(nothing)中處理大約50 * 100 MB csv檔案的時間和CPU消耗,和 Method3(多執行緒),分別。Method2 確實消耗了高百分比的 CPU,50%,但沒有像 Method1 那樣達到最大值。結果如下:
time : 12.527468204498291
time : 59.400668144226074
time : 35.45922660827637
第四,下面是模擬 CPU 密集型計算的示例:
import threading
from multiprocessing.pool import Pool
from queue import Queue
from time import time
def print_cube(num):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
return aa1, aa2
def print_cube_queue(num, q):
# emulate a CPU-intensive calculation:
for _ in range(10_000_000_0):
aa1 = num * num
aa2 = num * num * num
q.put((aa1, aa2))
def main1():
start = time()
x = []
y = []
p = Pool(8)
# Submit all the tasks and save the AsyncResult instances:
results = [p.apply_async(print_cube, args = (j, )) for j in range(1, 5)]
# Now wait for the return values:
for result in results:
# Unpack the tuple:
x_value, y_value = result.get()
x.append(x_value)
y.append(y_value)
end = time()
return end - start, x, y
def main2():
start = time()
x = []
y = []
for j in range(1, 5):
results = print_cube(j)
x.append(results[0])
y.append(results[1])
end = time()
return end - start, x, y
def main3():
start = time()
q = Queue()
x = []
y = []
threads = []
for j in range(1, 5):
t = threading.Thread(target=print_cube_queue, args = (j, q))
t.start()
threads.append(t)
for thread in threads:
thread.join()
results = []
for thread in threads:
x_value, y_value = q.get()
x.append(x_value)
y.append(y_value) #q.get()按順序從q中拿出一個值
end = time()
return end - start, x, y
if __name__ == "__main__":
print("Method1{0}time : {1}{2}x : {3}{4}y : {5}".format('\n' ,main1()[0], '\n', main1()[1], '\n', main1()[2]))
print("Method2{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main2()[0], '\n', main2()[1], '\n', main2()[2]))
print("Method3{0}time : {1:.6f}{2}x : {3}{4}y : {5}".format('\n' ,main3()[0], '\n', main3()[1], '\n', main3()[2]))
結果是:
Method1
time : 9.838010549545288
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method2
time : 35.850124
x : [1, 4, 9, 16]
y : [1, 8, 27, 64]
Method3
time : 37.191602
x : [4, 16, 9, 1]
y : [8, 1, 64, 27]
我做了一些搜索,不知道是因為 GIL 還是其他原因。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/447866.html
