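I have Python 3.9 code that uses Pool and ThreadPool from multiprocessing.pool. The intent is to have a Pool of 2 processes, each independently spawning a ThreadPool of 3 threads. In other words, I expect 2*3 = 6 threads to run in parallel.
However, the output of the minimal working example (MWE) below shows only 3 distinct thread IDs.
My question: why does it behave this way, and how can I reasonably fix it?
Also, if such an N_POOL * N_THREADPOOL strategy looks like a bad idea, suggestions are welcome. The actual task is I/O-bound (network downloads followed by lightweight preprocessing). I am relatively new to parallelism.
MWE code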
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
from threading import get_ident
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3

# Just for CPU numbering
CPU_QUEUE = Queue(N_CPU)
for i in range(1, 1 + N_CPU):
    CPU_QUEUE.put(i)


def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_ident()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Get initial pool status
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")
    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':
    # given data
    ls_data = list(range(N))
    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")
    # process in parallel
    with Pool(N_CPU) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass
    print("Program Ended!")
Output
Only 3 distinct thread IDs appear, rather than the expected 6.
$ python so.py
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 2
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 8
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 6
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 0
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 7
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 1
Threadpool id: 140065165276928 CPU_NO: 1 / 2, data: 3
Threadpool id: 140065182062336 CPU_NO: 2 / 2, data: 10
Threadpool id: 140065182062336 CPU_NO: 1 / 2, data: 4
Threadpool id: 140065165276928 CPU_NO: 2 / 2, data: 9
Threadpool id: 140065173669632 CPU_NO: 1 / 2, data: 5
Threadpool id: 140065173669632 CPU_NO: 2 / 2, data: 11
Program Ended!
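Edit: the code was run under Debian 11.
A uj5u.com user replied:
You did not specify the platform you are running on, but I have to assume it is one that uses fork to create new processes (such as Linux). Otherwise I do not believe your code would work correctly, because under spawn each process in the pool would create its own copy of the global CPU_QUEUE, so each would get the first item from its own queue and believe it is CPU id 1.
I therefore made two changes to the code:
1. Made the code more portable across platforms by using a pool initializer to set the global variable CPU_QUEUE in each pool process to a single shared queue instance.
2. Introduced a call to time.sleep at the start of process_pool to give each process in the pool a chance to handle one of the submitted tasks. Without it, one process in the pool could in theory handle all the submitted tasks; the sleep only makes that less likely.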
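The pool-initializer change can be looked at in isolation. What follows is only a minimal sketch, not the code from this answer: the names WORKER_QUEUE, init_worker, take_number and assign_worker_numbers are hypothetical and chosen purely for illustration. The queue is created once in the parent and handed to every worker through initargs, so all workers draw from the same queue instead of each building a private copy.

```python
import multiprocessing as mp
from time import sleep

# Hypothetical global; each worker process sets it in init_worker.
WORKER_QUEUE = None


def init_worker(shared_queue):
    """Pool initializer: runs once in every worker process."""
    global WORKER_QUEUE
    WORKER_QUEUE = shared_queue


def take_number(_):
    """Task: pull one number from the queue shared by all workers."""
    sleep(0.2)  # give the other workers a chance to pick up a task
    return WORKER_QUEUE.get()


def assign_worker_numbers(n_workers, start_method):
    """Hand out the numbers 1..n_workers through one shared queue."""
    ctx = mp.get_context(start_method)
    queue = ctx.Queue(n_workers)
    for number in range(1, 1 + n_workers):
        queue.put(number)
    # The single queue instance travels to each worker via initargs.
    with ctx.Pool(n_workers, initializer=init_worker, initargs=(queue,)) as pool:
        return sorted(pool.map(take_number, range(n_workers)))
```

Because every worker reads from the same queue, each number is handed out exactly once, whichever worker ends up running a given task. Under spawn the call has to sit behind an `if __name__ == '__main__':` guard, as in the full source.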
When I run the code under Linux, I see essentially what you see. However, when I run it under Windows, which the changes above now make possible, I see:
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [6, 7, 8, 9, 10, 11]
Pool CPU_NO: 2, data: [0, 1, 2, 3, 4, 5]
Threadpool id: 16924 CPU_NO: 1 / 2, data: 8
Threadpool id: 15260 CPU_NO: 1 / 2, data: 6
Threadpool id: 19800 CPU_NO: 2 / 2, data: 1
Threadpool id: 7580 CPU_NO: 2 / 2, data: 2
Threadpool id: 20368 CPU_NO: 1 / 2, data: 7
Threadpool id: 18736 CPU_NO: 2 / 2, data: 0
Threadpool id: 19800 CPU_NO: 2 / 2, data: 3
Threadpool id: 16924 CPU_NO: 1 / 2, data: 9
Threadpool id: 7580 CPU_NO: 2 / 2, data: 4
Threadpool id: 15260 CPU_NO: 1 / 2, data: 10
Threadpool id: 18736 CPU_NO: 2 / 2, data: 5
Threadpool id: 20368 CPU_NO: 1 / 2, data: 11
Program Ended!
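This is what you expected to see. I can only conclude that under Linux threading.get_ident returns a value that is unique only within a process. A likely reason is that in CPython on Linux the value is the thread's pthread handle, an address in the process's own address space, so forked processes with identical memory layouts can report identical values. If you instead use threading.get_native_id() (available since Python 3.8), which I have incorporated into the source below, you do appear to get 6 unique values (as hoped):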
data rearranged for pool: [[0, 1, 2, 3, 4, 5], [6, 7, 8, 9, 10, 11]]
Pool CPU_NO: 1, data: [0, 1, 2, 3, 4, 5]
Pool CPU_NO: 2, data: [6, 7, 8, 9, 10, 11]
Threadpool id: 81 CPU_NO: 2 / 2, data: 7
Threadpool id: 83 CPU_NO: 2 / 2, data: 8
Threadpool id: 78 CPU_NO: 1 / 2, data: 0
Threadpool id: 79 CPU_NO: 2 / 2, data: 6
Threadpool id: 80 CPU_NO: 1 / 2, data: 1
Threadpool id: 82 CPU_NO: 1 / 2, data: 2
Threadpool id: 78 CPU_NO: 1 / 2, data: 3
Threadpool id: 83 CPU_NO: 2 / 2, data: 10
Threadpool id: 81 CPU_NO: 2 / 2, data: 9
Threadpool id: 79 CPU_NO: 2 / 2, data: 11
Threadpool id: 80 CPU_NO: 1 / 2, data: 4
Threadpool id: 82 CPU_NO: 1 / 2, data: 5
Program Ended!
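To compare the two identifiers side by side, here is a minimal sketch that assumes a platform where the fork start method is available (such as Linux). The helpers report_ids and collect_ids are hypothetical names used only for this illustration. Each forked child starts one thread, records both threading.get_ident() and threading.get_native_id() inside it, and sends the pair back to the parent over a pipe.

```python
import multiprocessing as mp
import threading


def report_ids(conn):
    """Runs in a child process: start one thread and send back its ids."""
    ids = {}

    def record():
        ids["ident"] = threading.get_ident()          # Python-level identifier
        ids["native_id"] = threading.get_native_id()  # id assigned by the OS kernel

    worker = threading.Thread(target=record)
    worker.start()
    worker.join()
    conn.send(ids)
    conn.close()


def collect_ids(n_processes=2):
    """Fork n_processes children and gather the ids each one reports."""
    ctx = mp.get_context("fork")  # assumption: POSIX platform with fork
    children = []
    for _ in range(n_processes):
        recv_end, send_end = ctx.Pipe(duplex=False)
        proc = ctx.Process(target=report_ids, args=(send_end,))
        proc.start()
        children.append((proc, recv_end))
    results = []
    for proc, recv_end in children:
        results.append(recv_end.recv())
        proc.join()
    return results


if __name__ == "__main__":
    for ids in collect_ids():
        print(ids)
```

On Linux one would typically expect the ident values of the two children to coincide while the native_id values differ, which matches the outputs above, though neither outcome is guaranteed by the documentation.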
Modified source
from multiprocessing.pool import Pool, ThreadPool
from multiprocessing import Queue
# from threading import get_ident
from threading import get_native_id
import random
from time import sleep
from functools import partial

# Params
N = 12
N_CPU = 2
N_THREAD = 3


def init_pool_processes(the_queue):
    """Pool initializer: store the shared queue in each process's global."""
    global CPU_QUEUE
    CPU_QUEUE = the_queue


def split_list_to_pools(ls_data, n_pools):
    """Split data into pools as lists of approx. equal lengths."""
    n_each = int((len(ls_data) - 1) / n_pools) + 1
    return [ls_data[n_each * i:n_each * (i + 1)] for i in range(n_pools)]


def process_threadpool(data, CPU_NO=-1):
    """Process incoming data one-by-one"""
    sleep(3 + random.random())
    print(f"Threadpool id: {get_native_id()} CPU_NO: {CPU_NO} / {N_CPU}, data: {data}")


def process_pool(ls_data):
    """Process a list of data."""
    # Give the other pool processes a chance to pick up a task
    sleep(.2)
    # Get initial pool status
    CPU_NO = CPU_QUEUE.get()
    print(f"Pool CPU_NO: {CPU_NO}, data: {ls_data}")
    with ThreadPool(N_THREAD) as threadpool:
        for _ in threadpool.imap_unordered(partial(process_threadpool, CPU_NO=CPU_NO), ls_data):
            pass


if __name__ == '__main__':
    # Just for CPU numbering
    CPU_QUEUE = Queue(N_CPU)
    for i in range(1, 1 + N_CPU):
        CPU_QUEUE.put(i)
    # given data
    ls_data = list(range(N))
    # split data to pools
    ls_ls_data = split_list_to_pools(ls_data, N_CPU)
    print(f"data rearranged for pool: {ls_ls_data}")
    # process in parallel
    with Pool(N_CPU, initializer=init_pool_processes, initargs=(CPU_QUEUE,)) as pool:
        for _ in pool.imap_unordered(process_pool, ls_ls_data):
            pass
    print("Program Ended!")
