這是將不同大小的多個影像陣列保存在回圈中以及同時使用執行緒/行程的定時示例:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter
import numpy as np
from cv2 import cv2
def save_img(idx, image, dst):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
if __name__ == '__main__':
l1 = np.random.randint(0, 255, (100, 50, 50, 1))
l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
temp_dir = tempfile.mkdtemp()
workers = 4
t1 = perf_counter()
for ll in l1, l2, l3:
t = perf_counter()
for i, img in enumerate(ll):
save_img(i, img, temp_dir)
print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
for executor in ThreadPoolExecutor, ProcessPoolExecutor:
with executor(workers) as ex:
futures = [
ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
]
for f in as_completed(futures):
f.result()
print(
f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
)
我在 i5 mbp 上得到這些持續時間:
Time for 100: 0.09495482999999982 seconds
Time for 100 (ThreadPoolExecutor): 0.14151873999999998 seconds
Time for 100 (ProcessPoolExecutor): 1.5136184309999998 seconds
Time for 1000: 0.36972280300000016 seconds
Time for 1000 (ThreadPoolExecutor): 0.619205703 seconds
Time for 1000 (ProcessPoolExecutor): 2.016624468 seconds
Time for 10000: 4.232915643999999 seconds
Time for 10000 (ThreadPoolExecutor): 7.251599262 seconds
Time for 10000 (ProcessPoolExecutor): 13.963426469999998 seconds
難道執行緒/行程不需要更少的時間來實作同樣的事情嗎?在這種情況下為什么不呢?
uj5u.com熱心網友回復:
代碼中的計時t是錯誤的,因為在測驗池之前沒有重置計時器。然而,時間的相對順序是正確的。一個可能的定時器復位代碼是:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter
import numpy as np
from cv2 import cv2
def save_img(idx, image, dst):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
if __name__ == '__main__':
l1 = np.random.randint(0, 255, (100, 50, 50, 1))
l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
temp_dir = tempfile.mkdtemp()
workers = 4
for ll in l1, l2, l3:
t = perf_counter()
for i, img in enumerate(ll):
save_img(i, img, temp_dir)
print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
for executor in ThreadPoolExecutor, ProcessPoolExecutor:
t = perf_counter()
with executor(workers) as ex:
futures = [
ex.submit(save_img, i, img, temp_dir) for (i, img) in enumerate(ll)
]
for f in as_completed(futures):
f.result()
print(
f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
)
多執行緒速度更快,特別是對于 I/O 系結行程。在這種情況下,壓縮影像是 CPU 密集型的,因此根據 OpenCV 和 python 包裝器的實作,多執行緒可能會慢得多。在很多情況下,罪魁禍首是 CPython 的 GIL,但我不確定是否是這種情況(我不知道 GIL 是否在imwrite呼叫程序中被釋放)。在我的設定(i7 8th gen)中,執行緒與 100 張影像的回圈一樣快,而 1000 和 10000 張影像的速度幾乎沒有快。如果ThreadPoolExecutor重用執行緒,則將新任務分配給現有執行緒會產生開銷。如果它不重用執行緒,則啟動新執行緒會產生開銷。
多處理繞過了 GIL 問題,但還有一些其他問題。首先,在行程之間傳遞資料需要一些時間,而在影像的情況下,它可能非常昂貴。其次,在 windows 的情況下,生成一個新行程需要很多時間。查看開銷(行程和執行緒)的一個簡單測驗是將save_image函式更改為一個什么都不做但仍需要酸洗等的函式:
def save_img(idx, image, dst):
if idx != idx:
print("impossible!")
并通過一個類似的沒有引數的方式來查看產生行程的開銷等。
我的設定中的計時顯示僅生成 10000 個行程需要 2.3 秒,而酸洗需要 0.6 秒,這遠遠超過處理所需的時間。
提高吞吐量并將開銷保持在最低限度的一種方法是中斷塊上的作業,并將每個塊提交給作業人員:
import tempfile
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from pathlib import Path
from time import perf_counter
import numpy as np
from cv2 import cv2
def save_img(idx, image, dst):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
def multi_save_img(idx_start, images, dst):
for idx, image in zip(range(idx_start, idx_start len(images)), images):
cv2.imwrite((Path(dst) / f'{idx}.jpg').as_posix(), image)
if __name__ == '__main__':
l1 = np.random.randint(0, 255, (100, 50, 50, 1))
l2 = np.random.randint(0, 255, (1000, 50, 50, 1))
l3 = np.random.randint(0, 255, (10000, 50, 50, 1))
temp_dir = tempfile.mkdtemp()
workers = 4
for ll in l1, l2, l3:
t = perf_counter()
for i, img in enumerate(ll):
save_img(i, img, temp_dir)
print(f'Time for {len(ll)}: {perf_counter() - t} seconds')
chunk_size = len(ll)//workers
ends = [chunk_size * (_ 1) for _ in range(workers)]
ends[-1] = len(ll) % workers
starts = [chunk_size * _ for _ in range(workers)]
for executor in ThreadPoolExecutor, ProcessPoolExecutor:
t = perf_counter()
with executor(workers) as ex:
futures = [
ex.submit(multi_save_img, start, ll[start:end], temp_dir) for (start, end) in zip(starts, ends)
]
for f in as_completed(futures):
f.result()
print(
f'Time for {len(ll)} ({executor.__name__}): {perf_counter() - t} seconds'
)
對于多處理和多執行緒方法,這應該比簡單的 for 有顯著的提升。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/392614.html
上一篇:python中物件方法的多執行緒
