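I'm only doing simple I/O tasks and want to improve my program's performance by using 1000 threads. (This matters because I want to run a large number of tasks at the same time, and MultiProcessingPool doesn't do that job: obviously, if I only have 8 cores, I can only run 8 tasks.) Starting the threads takes too long: the CLI seems to freeze, and the tasks only begin after 2-3 minutes. So I'd like to spread them across the cores with multiprocessing to use more of my machine's power.
My current code looks like this (the real runTask method is far more complex than a print, and the data in the profileTasks list is more than just a string):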
from concurrent.futures import ThreadPoolExecutor, as_completed

class ThreadingxMultiprocessing():
    def __init__(self) -> None:
        profileTasks = ["TEST1", "TEST2", "TEST3", "TEST4", "TEST5", "TEST6",
                        "TEST7", "TEST8", "TEST9", "TEST10", "TEST11", "TEST12",
                        "TEST13", "TEST14", "TEST15", "TEST16", "TEST17", "TEST18",
                        "TEST19", "TEST20", "TEST21", "TEST22", "TEST23", "TEST24",
                        "... and some more to get to 1k profiles",]
        self.threads = 1000
        while True:
            with ThreadPoolExecutor(max_workers=self.threads) as executor:
                for index, profile in enumerate(profileTasks):
                    executor.submit(
                        self.runTask, index, profile
                    )
            break

    def runTask(self, index, profile):
        print(index, profile)

ThreadingxMultiprocessing()
I was thinking of something like this: divide the threads by the number of CPU cores you have, then distribute them evenly across the cores:
from concurrent.futures import ThreadPoolExecutor, as_completed
import multiprocessing
import math

number_of_cpucores = multiprocessing.cpu_count()

class ThreadingxMultiprocessing():
    def __init__(self) -> None:
        profileTasks = ["TEST1", "TEST2", "TEST3", "TEST4", "TEST5", "TEST6",
                        "TEST7", "TEST8", "TEST9", "TEST10", "TEST11", "TEST12",
                        "TEST13", "TEST14", "TEST15", "TEST16", "TEST17", "TEST18",
                        "TEST19", "TEST20", "TEST21", "TEST22", "TEST23", "TEST24",
                        "... and some more to get to 1k profiles"]
        self.threads = 1000
        # round up to get an integer number of threads per process
        threads_in_each_process = math.ceil(float(self.threads) / float(number_of_cpucores))
        # -> and then start the thread pools, e.g. with 125 threads each if you have 8 cores
        multiprocessing.Process()

    def runTask(self, index, profile):
        print(index, profile)

ThreadingxMultiprocessing()
But I really don't know how to set it up. Maybe one of you has an idea?
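Reply from a uj5u.com user:
You can of course write a function that each process runs, which creates a thread pool and submits work to it; you just need a custom function to split the work into equal parts.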
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def runTask(index, profile):
    print(index, profile)

def process_function(work_and_workers):
    # Runs inside each worker process: start a thread pool and feed it this chunk.
    work_list, workers = work_and_workers
    work_to_do = []
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for element in work_list:
            work_to_do.append(pool.submit(runTask, *element))
        # Wait for every task; result() re-raises any exception from the thread.
        for element in work_to_do:
            element.result()

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

class ThreadingxMultiprocessing():
    def __init__(self) -> None:
        profileTasks = [f"Test{x}" for x in range(1000)]
        self.cores = 8
        self.threads_per_worker = 125
        self.chunk_size = 125
        work_to_do = []
        with ProcessPoolExecutor(max_workers=self.cores) as executor:
            for index, profile in enumerate(profileTasks):
                work_to_do.append((index, profile))
            # One chunk of 125 (index, profile) pairs per process.
            executor.map(process_function,
                         ((x, self.threads_per_worker) for x in chunks(work_to_do, self.chunk_size)))

if __name__ == "__main__":
    ThreadingxMultiprocessing()
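This is the worst code anyone could write to blindly parallelize work. Even though the work items are exactly equal in size, depending on the work you will get worse performance than a simple process pool with 16 workers.
The two biggest problems here are balancing the work across processes and sending the results back to the main process. Queues would help with the latter, but balancing the work is too much effort on most systems, because this "equally sized work" is not going to be split equally across your physical cores.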
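One way to soften both problems, sketched here under assumptions rather than taken from the answer: hand out many small chunks instead of one big chunk per process, so an idle process simply picks up the next chunk, and let each chunk return its results through `executor.map` instead of a hand-written queue. The names `run_task`, `run_chunk`, and `run_all`, and the values 4 processes, 25 items per chunk, and 16 threads, are all hypothetical.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def run_task(index, profile):
    # Placeholder for the real I/O task (assumption: it returns a value).
    return index, profile.lower()

def run_chunk(chunk, threads_per_chunk=16):
    # Runs inside a worker process: a small thread pool handles one chunk
    # and the list of results is returned to the main process.
    workers = max(1, min(threads_per_chunk, len(chunk)))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda item: run_task(*item), chunk))

def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

def run_all(work, processes=4, chunk_size=25):
    # Many small chunks: whichever process finishes first takes the next one.
    results = []
    with ProcessPoolExecutor(max_workers=processes) as executor:
        for chunk_result in executor.map(run_chunk, chunks(work, chunk_size)):
            results.extend(chunk_result)
    return results

if __name__ == "__main__":
    work = [(i, f"Test{i}") for i in range(1000)]
    results = run_all(work)
    print(len(results), results[0], results[-1])
```

Smaller chunks balance better but cost more inter-process traffic, so the chunk size is something to measure rather than guess.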
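Even if the work is balanced and the results are returned to the main process, running 1000 threads on an 8-core machine will be slow because of the constant context switching. I/O usually can't handle 1000 concurrent hits either, and will probably crash or slow down. So while this "parallelizes" the work, it is like having 1000 people bake one small cake for one child... it's not going to be pretty.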
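As a rough illustration of the lower-thread-count alternative, here is a minimal sketch with a single, much smaller thread pool. It assumes the task blocks on I/O (simulated with `time.sleep`); `run_task`, `run_with_pool`, and the figure of 32 workers are hypothetical and should be tuned by measurement.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_task(index, profile):
    time.sleep(0.01)  # placeholder for blocking I/O
    return index, f"{profile} done"

def run_with_pool(profiles, max_workers=32):
    # All tasks are queued up front; only max_workers of them run at once.
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_task, i, p) for i, p in enumerate(profiles)]
        for future in as_completed(futures):
            index, value = future.result()  # re-raises any task exception
            results[index] = value
    return results

if __name__ == "__main__":
    profiles = [f"Test{x}" for x in range(1000)]
    results = run_with_pool(profiles)
    print(len(results))
```

All 1000 tasks are still submitted immediately; the pool simply caps how many are in flight, which avoids creating 1000 threads.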
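You should probably look into other parallelization mechanisms such as AsyncIO, or reduce the thread count, because throwing "parallel" at a problem will make your code slower, not faster.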
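For the AsyncIO route, a minimal sketch might look like the following. It assumes the real task can be written with non-blocking I/O (simulated with `asyncio.sleep`); `run_task`, `run_all`, and the limit of 100 concurrent tasks are hypothetical choices, not part of the original question.

```python
import asyncio

async def run_task(index, profile, limiter):
    # The semaphore caps how many tasks are inside the I/O section at once.
    async with limiter:
        await asyncio.sleep(0.01)  # placeholder for real non-blocking I/O
        return index, profile.upper()

async def run_all(profiles, max_concurrency=100):
    limiter = asyncio.Semaphore(max_concurrency)
    tasks = [run_task(i, p, limiter) for i, p in enumerate(profiles)]
    # gather() returns the results in the same order as the input tasks.
    return await asyncio.gather(*tasks)

if __name__ == "__main__":
    profiles = [f"Test{x}" for x in range(1000)]
    results = asyncio.run(run_all(profiles))
    print(len(results), results[:3])
```

Everything runs on one thread, so there is no thread start-up cost or context switching between 1000 threads, and the semaphore keeps the I/O target from receiving all 1000 requests at once.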
