Python的多行程因為可以充分利用CPU多核的特點,所以通常用于計算密集型的場景或者需要大量資料操作的場景,而對于多執行緒,在某些語言中因為可以充分利用CPU,所以可能多執行緒的場景使用得多一點,但是在Python中,多執行緒只能在CPU的單核中運行,不能充分利用CPU多核的特點,所以Python多執行緒通常用于IO密集型的場景或者少量資料的并發操作場景,總而言之,Python的多執行緒只是并發執行,而不是真正的并行執行,而且只能在CPU單核上進行,所以如果需要進行大量的資料操作或者比較耗時的并行操作,那么就可以考慮使用多行程了,
本文只是根據官方檔案簡單記了一下multiprocessing模塊中行程的基本操作,包括創建行程、行程啟動方式、行程間通信、行程間同步、行程池,如果需要其他更多操作,可以參考此模塊的官方中文檔案
創建行程
實體化Process類創建一個行程物件,然后呼叫它的start方法即可生成一個新的行程(子行程),Process行程物件的使用其實和多執行緒模塊threading中的Thread執行緒物件非常相似,可以參考著來使用,
"""
簡單示例:創建一個子行程
"""
import os
from multiprocessing import Process
def func(s):
# 輸出傳入的引數,當前子行程的行程ID,當前行程的父行程ID
print(s, os.getpid(), os.getppid())
# 注意:此處的if __name__ == '__main__'陳述句不能少
if __name__ == '__main__':
# 列印當前行程的行程ID
print(os.getpid())
print('main process start...')
# 創建行程物件
p = Process(target=func, args=('hello', ))
# 生成一個行程,并開始運行新的行程
p.start()
# 等待子行程運行完畢
p.join()
print('main process end!')
列印輸出
13888
main process start...
hello 12484 13888
main process end!
Process類
Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None):group不用特別指定,使用默認就行;target表示需要呼叫的物件;name表示新行程的名稱;args和kwargs表示傳給target物件的元組引數和字典引數;daemon是一個關鍵字引數,使用時必須指定引數名,表示是否為守護行程,如果不指定則默認繼承自呼叫者行程,
注:需要注意的是如果重寫了Process的__init__方法,那么在做任何操作之前需要先呼叫Process.__init__()方法,
常用的方法和屬性:
run:表示行程活動的方法,即此方法的運行是在新開啟的行程中,如果在子類中重寫了此方法,應該在此方法中呼叫target物件,start():用于啟動行程活動(注意此方法是在呼叫者行程中,而不是在新的行程中),并用于保證run方法在一個新的行程中被呼叫,join([timeout]):如果timeout引數沒有指定(默認),則會阻塞當前行程直到呼叫join方法的行程(子行程)運行結束,如果指定了timeout引數,則會阻塞指定的秒數,注意,join方法不能在start方法之前呼叫,但join方法可以呼叫多次,如果想要知道行程的狀態(包括是否結束),可以查看行程物件的exitcode值來進行判斷,name:行程名稱,沒什么實際意義,只是用來表示行程,多個行程可能有相同的名稱,如果沒有特別指定,則默認命名格式為Process-N1:N2:N3...,is_alive():此行程是否存活,damemon:表示行程是否為守護行程,這個標識必須在start()方法呼叫之前進行設定,如果不設定,默認繼承創建者行程,當一個行程終止時,會嘗試終止它的所有守護子行程,需要注意的是,守護行程是不允許創建子行程的,pid:行程ID,exitcode:行程退出狀態,當行程還未結束時,值為None,如果行程結束了,會用一個負值-N表示結束信號,authkey:行程的身份驗證密鑰(位元組字串),當multiprocessing被初始化時,主行程會使用os.urandom()分配一個隨機的字串,當創建Process子行程時,子行程會繼承其父行程的身份密鑰,當然,你也可以修改子行程的身份密鑰,sentinel:系統物件的數字句柄,當行程結束時將變為“ready”,如果想要使用multiprocessing.connection.wait()一次等待多個事件,那可以使用這個值,否則呼叫join()方法會更簡單,terminate():終止行程,在Unix上使用的是SIGTERM信號,在Windows上使用的是TerminateProcess(),注意,行程的后代行程不會被終止(會變成“孤兒”行程),另外,如果被終止的行程在使用Pipe或Queue時,它們有可能會被損害,并無法被其他行程使用;如果被終止的行程已獲得鎖或信號量等,則有可能導致其他行程死鎖,所以請謹慎使用此方法,kill():也是終止行程,但是在Unix上使用的是SIGKILL信號,close():關閉Process物件,并釋放與之關聯的所有資源,如果底層行程仍在運行,則會引發ValueError,而且,一旦close()方法成功回傳,Process物件的大多數方法和屬性也可能會引發ValueError,
行程啟動方式
multiprocessing模塊中行程的啟動方式有三種spawn、fork和forkserver,在不同的系統平臺上它們的使用和默認設定也會有所不同:
- spawn:由父行程啟動一個新的Python解釋器
Process子行程,子行程只會繼承run()方法中所必需的資源,而父行程中那些非必需的檔案描述符和句柄是不會被繼承的,而且,相對于使用fork和forkserver來啟動行程,spawn方法啟動是非常慢的,spawn啟動方式可以在Unix和Windows上使用,且Windows上默認使用此方法啟動, - fork:父行程使用
os.fork()來產生一個新的Python解釋器分叉(fork)子行程,子行程在開始時與父行程是相同的,即子行程會繼承父行程擁有的所有資源,這種方式的問題在于當父行程中存在多執行緒時,啟動的新的子行程的安全性需要自己留意,fort啟動方式只能在Unix中使用,且也是Unix中默認的啟動方式, - forkserver:程式會先使用forkserver啟動一個服務器行程,然后當需要運行一個新的行程時,父行程會先連接到服務器并請求其分叉(fork)一個新的行程, 相比于fork啟動方式,由于forkserver啟動的服務器行程是單執行緒的行程,所以由它通過
os.fork()啟動的行程是安全的(此服務器行程沒有多執行緒的情況),forkserver啟動方式可以在Unix平臺使用,并支持通過Unix管道傳遞檔案描述符,
設定統一的啟動方式:可以在程式運行開始時,即if __name__ == "__main__"中使用multiprocessing.set_start_method(method)函式來設定啟動方式,設定時傳入對應啟動方式的字串即可("spawn"/"fork"/"forkserver"),但是需要注意兩點,一是需要在if __name__ == "__main__"子句中指定,二是只能指定一次,指定之后就不能在其他地方再次指定,
設定特定的啟動方式:可以使用multiprocessing.get_context(method)函式來設定背景關系中的啟動方式,需要注意的是在此背景關系中創建的物件可能與其他背景關系中的物件不兼容,比如,使用fork方式的背景關系中的鎖不能傳遞給spawn或forkserver中使用,另外,如果你不想采用默認的方式或者全域統一的方式,就可以考慮使用get_context(method)方法來指定自己的啟動方式,
注:在Unix上,spawn和forkserver啟動方法不能和“凍結的”可執行內容一同使用(例如,PyInstaller和cx_Freeze包產生的二進制檔案),但是fork啟動方法可以,
行程間通信
使用多行程時,一般使用訊息機制(Pipe()管道和Queue()佇列)實作行程間的通信,而且應該盡可能地避免同步操作,例如鎖,(如果這兩種方式不能滿足你的要求,可以參考下官方檔案中關于multiprocessing.connection的描述,它提供了如監聽器物件Listener和客戶端物件Client等通信方式,感興趣的話也可以去看下)
Pipe類
Pipe([duplex]):回傳一對連接物件(conn1,conn2),它們代表了管道的兩端,引數duplex默認True,表示雙向的(雙工通信),表示管道每一端都可以進行發送和接收資料;如果設定False,則表示單向的(單工通信),此時conn1只能接受資料,conn2只能發送資料,
"""
簡單示例:使用管道Pipe進行行程間通信
"""
from multiprocessing import Process, Pipe
def func(conn):
print('send a list object ot other side...')
# 從管道物件的一端發送資料物件
conn.send(['33', 44, None])
conn.close()
if __name__ == '__main__':
# 默認創建一個雙工管道物件,回傳的兩個物件代表管道的兩端,
# 雙工表示兩端的物件都可以發送和接收資料,但是需要注意,
# 需要避免多個行程或執行緒從一端同時讀或寫資料
parent_conn, child_conn = Pipe()
p = Process(target=func, args=(child_conn, ))
p.start()
# 從管道的另一端接收資料物件
print(parent_conn.recv())
p.join()
Connection類
multiprocessing.connection.Connection:Connection物件允許收發可以序列化的物件或字串,Connection物件通常使用Pipe來創建,
常用的方法:
send(obj):將一個物件發送到連接的另一端,另一端可以使用recv()方法來讀取,注意,發送的物件必須是可以序列化的,物件如果過大可能會引發ValueError例外,recv():回傳一個對端使用send()方法發送的物件,該方法會一直阻塞直到接收到物件為止,如果對端關閉了連接或者沒有東西可以接收時,將會拋出EOFError例外,fileno():回傳由連接物件使用的描述符或句柄,close():關閉連接,當連接物件被垃圾回收時,這個方法會被自動呼叫,poll([timeout]):回傳連接物件中是否有可以讀取的資料,如果未指定引數timeout(默認),此方法會立刻回傳結果,如果指定了timeout,則會阻塞對應timeout秒數,如果timeout為None,則會一直阻塞,不會發生超時,send_bytes(buffer[, offset[, size]]):從一個bytes-like object(位元組類物件)中取出位元組陣列作為一條完整訊息發送,offset引數表示偏移量或者buffer中資料的位置,size表示從offset開始讀取多少資料,如果buffer過大,可能會引發ValueError例外,recv_bytes([maxlength]):以字串的形式回傳一條對端發送過來的位元組資料,此方法會一直阻塞直到接收到訊息,如果對端關閉了連接或者沒有資料可以接收時,將會拋出EOFError例外,如果接收的資料長度大于了maxlength指定的長度,那么也會拋出EOFError例外,并且此時此連接物件不再可讀,recv_bytes_into(buffer[, offset]):將一條完整的位元組資料讀入buffer,并回傳資料的位元組數,此方法會一直阻塞直到接收到資料,如果對端關倍訓者沒有資料可以讀取,則會拋出EOFError例外,buffer必須是一個可寫入的位元組類物件,如果指定了offset引數,將會從offset指定的位置開始寫入buffer,如果buffer過小,也會引發BufferTooShort例外,
Queue類
Queue佇列采用的是FIFO(先進先出)的通信方式,(另外還有SimpleQueue和JoinableQueue,感興趣的可以參考下官方檔案)
當一個物件被放入佇列中時,這個物件首先會被一個后臺執行緒式列化,然后會將序列化的資料通過一個底層管道傳遞到佇列中,從佇列中將資料取出來時也會進行反序列化的操作,
注意一點,在一個空佇列中放入物件后,它的empty()方法會在一個極小的延遲后才會回傳False,
注:如果一個子行程將一些物件放入佇列中,那么這個行程在所有緩沖區的物件被重繪進管道之前,是不會終止的,所以,通常在終止這類行程時,應該保證佇列中的資料都已被使用了,(見示例中的注釋)
"""
簡單示例:使用佇列Queue進行行程間通信
"""
from multiprocessing import Process, Queue
def func(q):
print('put a list object to queue...')
# 向Queue物件中添加一個物件
q.put(['33', 44, None])
# q.put('X' * 1000000)
if __name__ == '__main__':
# 創建一個佇列
q = Queue()
p = Process(target=func, args=(q, ))
p.start()
# 從Queue物件中獲取一個物件
print(q.get())
# 這里需要注意,當向佇列中放入的資料較大時,比如將['33', 44, None]替換為'X' * 1000000時,
# 就會在join()處卡死,為了避免這種情況,
# 通常的做法是先使用get()將資料取出來,再使用join()方法
p.join()
Queue([maxsize]):回傳一個使用Pipe管道和少量鎖和信號量實作的共享佇列實體,當一個行程將一個物件放入佇列時,一個寫入執行緒將會啟動并將物件從緩沖區寫入管道中,
注:multiprocessing.Queue實作了標準庫queue.Queue中除了task_done()和join()的所有方法,
常用的方法和屬性:
qsize():回傳佇列的大致長度,但這個數字在多行程或多執行緒的環境中通常是不可靠的,注意,在Unix平臺上,例如Mac OS X,這個方法可能會拋出NotImplementedError,因為該平臺沒有實作sem_getvalue(),empty():佇列為空則回傳True,否則回傳False,在多行程或多執行緒的環境中,此方法是不可靠的,full():佇列滿則回傳True,否則回傳False,在多行程或多執行緒的環境中,此方法是不可靠的,put(obj[, block[, timeout]]):將物件obj放入佇列,如果引數block為True(默認)且timeout為None(默認),則會阻塞當前行程,直到有空的緩沖槽,如果設定了timeout,則會阻塞指定的timeout秒數,如果阻塞timeout指定秒數后還是沒有可用的緩沖槽,則會拋出queue.Full例外,如果block為False,此時會忽略timeout引數,并且當前有空的緩沖槽可用時才能放入物件,否則會拋出queue.Full例外,put_nowait(obj):相當于put(obj, False),get([block[, timeout]]):從佇列中獲取一個物件,如果引數block為True(默認)且timeout為None(默認),則會阻塞當前行程,直到獲取到物件,如果設定了timeout,則會阻塞指定的timeout秒數,如果阻塞timeout指定秒數后還是沒有獲取到物件,則會拋出queue.Empty例外,如果block設定為False,此時會忽略timeout引數,并且當前有物件可以獲取時才能獲取,否則會拋出queue.Empty例外,get_nowait():相當于get(False),close():表示當前行程將不會再往佇列中放入物件了,一旦緩沖區的所有資料被寫入管道后,對應的后臺執行緒就會退出,而且這個方法在佇列被gc回收時會自動呼叫,join_thread():等待后臺執行緒,這個方法僅在呼叫了close()方法之后可以被呼叫,并且會阻塞當前行程,當變為非阻塞狀態之后,佇列的后臺執行緒會退出,以此確保緩沖區中的所有資料都被寫入管道中,默認情況下,如果一個行程不是此佇列的創建者行程,當它退出時,默認會嘗試等待這個佇列的后臺執行緒,當然這個行程也可以使用cancel_join_thread()方法使join_thread()方法什么都不做直接跳過,cancel_join_thread():用于防止join_thread()方法阻塞當前行程,即防止行程退出時自動等待佇列后執行緒的情況,使用這個方法有可能會導致佇列中的資料丟失,因此大多情況下這個方法并不需要用到,當然,如果你只是想要行程馬上退出,也不在意資料的丟失,那么可以使用這個方法,
注:multiprocessing使用了queue.Empty和queue.Full例外去表示超時,需要從內置的queue模塊中匯入它們,而不是從multiprocessing中匯入,
行程間同步
通常來說同步原語在多行程環境中并不像在多執行緒環境中那么必要,但是也可以參考下,注意,也可以使用Manager()物件創建同步原語,
multiprocessing.Barrier(parties[, action[, timeout]]):類似threading.Barrier的柵欄物件,
multiprocessing.Semaphore([value]):信號量物件,類似于threading.Semaphore,
multiprocessing.BoundedSemaphore([value]):類似threading.BoundedSemaphore的有界信號量物件,
multiprocessing.Condition([lock]):是threading.Condition的別名,引數lock應該是multiprocessing中的Lock或者RLock物件,
multiprocessing.Event:類似threading.Event的事件物件,
multiprocessing.Lock:原始鎖,除非特別說明,否則用法與threading.Lock是一致的,
acquire(block=True, timeout=None):獲取鎖,需要注意一下引數block和timeout與threading.Lock中的名稱和用法的區別,如果block設定為True(默認值),此方法會阻塞行程直到獲取鎖;如果block引數設定為False,行程將不會阻塞,且會忽略timeout引數;如果設定了timeout引數且為正數,則會阻塞指定秒數,如果設定為負數,則等效于值為0的情況,如果timeout為None(默認值),則會一直阻塞,需要注意timeout設定為負數和0時,其作用和threading.Lock是不一致的,此方法的回傳值,在獲取到鎖并將鎖的狀態設定為“鎖住”時回傳True,超時或者沒有獲取到鎖時回傳False,release():釋放鎖,注意,任何行程或執行緒都可釋放這種鎖,并不是只有獲取鎖的行程或執行緒才可以釋放鎖,當試圖釋放一個未“鎖住”的鎖時會引發ValueError例外,其他用法與threading.Lock是一致的,
multiprocessing.RLock:遞回鎖,類似于threading.RLock,只能由獲取鎖的行程或執行緒來進行釋放,并且可以獲取多次,注意,釋放次數必須要與獲取次數一致,acquire(block=True, timeout=None):release():當block設定為True時(默認值),會阻塞行程直到獲取鎖,如果當前行程已經獲取到了鎖(遞回鎖可以多次獲取),那么不會阻塞,并且鎖內的遞回等級加1,并回傳True,如果block設定為False,則不會阻塞,此時如果沒有獲取到鎖,則鎖內的遞回等級不會變,并回傳False,timeout的使用與multiprocessing.Lock.acquire是一樣的,但是注意,此引數與threading.RLock中的使用是有區別的,- ``:釋放鎖,使鎖內的遞回等級減1,如果釋放后鎖的遞回等級降低為0,則會重置鎖的狀態為“釋放”狀態,表名此時鎖沒有被任何行程或執行緒持有;如果釋放后鎖的遞回等級不為0,則鎖定狀態還是“未釋放”的狀態,當前行程或執行緒仍然是鎖的持有者,如果鎖已經處于“釋放”狀態,或者是非鎖的持有者呼叫了此方法,則會拋出
AssertionError例外,注意這個例外與threading.RLock.release()中拋出的例外是不同的,
"""
簡單示例:使用鎖保證行程間的同步操作
"""
from multiprocessing import Process, Lock
def func(lc, num):
# 使用鎖保證以下代碼同一時間只有一個行程在執行
lc.acquire()
print('process num: ', num)
lc.release()
if __name__ == '__main__':
lock = Lock()
for i in range(5):
Process(target=func, args=(lock, i)).start()
列印輸出
process num: 0
process num: 1
process num: 3
process num: 2
process num: 4
行程間共享資料
在多行程的并發編程中應當盡量避免使用共享狀態,但是如果必須要使用的話,multiprocessing模塊提供了兩種方式來使用:共享記憶體和服務行程管理器(Manager()管理器物件會開啟一個服務行程,允許不同機器上的行程通過網路共享資料,本文就不寫了,感興趣的可以去官方檔案了解下(有對應的中文檔案)),
共享記憶體
可以在共享記憶體中創建可被子行程繼承的共享ctypes物件,特點是快捷方便,
multiprocessing.Value(typecode_or_type, *args, lock=True):回傳一個在共享記憶體上創建的ctypes物件,可以通過它的value屬性來訪問它的值,
typecode_or_type:指定回傳的物件型別,可以是一個ctypes型別,也可以是array模塊中每個型別對應的單字符的字串,args:這個引數會傳給這個類的建構式,lock:默認為True,則會新建一個遞回鎖用于對這個值的同步訪問操作;如果lock指定為一個Lock或RLock,則會使用這個鎖來控制這個變數的同步操作;如果lock為False,那么這個值將沒有鎖進行保護,也就是說這個變數不是行程安全的,- 注:如果想要進行
+=這種操作時,因為這種操作并不是原子性的,它是分開的讀和寫操作,所以可以考慮使用如下方式進行這種操作:with my_value_obj.get_lock(): my_value_obj.value += 1,
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True):回傳一個在共享記憶體中創建的ctypes型別的陣列, typecode_or_type:指定陣列中元素的型別,可以是一個ctypes型別,也可以是array模塊中每個型別對應的單字符的字串,size_or_initializer:如果是一個整數,則用于指定陣列的長度,否則,應該傳入一個序列用于初始化這個陣列物件,這個序列的長度就是這個陣列物件的長度,lock:默認為True,則會新建一個鎖用于對這個值的同步訪問操作;如果lock指定為一個Lock或RLock,則會使用這個鎖來控制這個變數的同步操作;如果lock為False,那么這個值將沒有鎖進行保護,也就是說這個變數不是行程安全的,- 注:
ctypes.c_char型別的陣列具有 value和raw屬性,可以用來保存和提取字串,
"""
簡單示例:使用共享記憶體的方式,共享值Value物件和資料Array物件
"""
from multiprocessing import Process, Value, Array
def func(n, a):
n.value = https://www.cnblogs.com/guyuyun/archive/2020/10/04/3.333
for i in range(len(a)):
a[i] = -a[i]
if __name__ =='__main__':
num = Value('d', 0.0) # 第一個引數d表示資料型別“double”雙精度浮點型別
arr = Array('i', range(6)) # 第一個引數i表示資料型別“integer”整型
p = Process(target=func, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])
列印輸出
3.333
[0, -1, -2, -3, -4, -5]
行程池
創建一個Pool行程池物件,并執行提交給它的任務,行程池物件允許其中的行程以不同的方式運行,但是需要注意,Pool物件的方法只能是創建它的行程才能呼叫,
Pool類
multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]]):創建一個行程池物件,支持帶有超時和回呼的異步結果,以及一個并行的map實作,
processes:指定行程池中的作業行程數量,如果為None,則使用os.cpu_count()的回傳值,initializer和initargs:如果initializer不為None,則每個作業行程將會在啟動時呼叫initializer(*initargs),maxtasksperchild:指定一個作業行程在退出或者被一個新的行程替代之前能完成的任務數量,以便保證資源的釋放,默認為None,表示作業行程的壽命與行程池是相同的,context:指定啟動的作業行程的背景關系,通常一個行程池是通過multiprocessing.Pool()或者背景關系物件的Pool()來創建的,而這兩種創建行程池的方式都是可以的,- 注:使用行程池物件時,應該正確終結該物件,應該將行程池物件當做背景關系管理器來使用(
with陳述句),或者手動呼叫close()和terminate()方法,而依賴于垃圾回收器來銷毀行程池物件是不正確的做法,
行程池物件的常用方法: apply(func[, args[, kwds]]):在行程池中開啟一個新的行程并執行func函式,另外兩個引數則是函式的引數,在這個函式執行完之前,當前行程會一直阻塞,apply_async(func[, args[, kwds[, callback[, error_callback]]]]):這是apply方法的一個變體,會回傳一個AsyncResult結果物件,如果指定了callback引數,則會在func函式執行成功后將回傳結果當做引數傳入callback指定的可呼叫物件,執行失敗則會呼叫error_callback指定的可呼叫物件,map(func, iterable[, chunksize]):內置map()函式的一個并行版本,會一直阻塞當前行程直到運行完可迭代物件中的所有元素,并回傳結果,此方法會將可迭代物件分割為許多塊,chunksize引數用于指定每個塊的大小,并行的行程,每個行程會對應一個塊,每次會運行塊中的一個元素,注意,對于比較大的迭代物件,可能會很耗時,此時可以考慮使用imap()或者imap_unordered(),并且使用時指定chunksize引數可能會得到更好的效率,map_async(func, iterable[, chunksize[, callback[, error_callback]]]):map方法的一個變體,可以回傳一個處理后的AsyncResult結果物件,其他引數的使用與appply_async方法是一致的,imap(func, iterable[, chunksize]):map()方法的延遲執行版本,對于較大的迭代,chunksize設定一個較大的值會比默認值1會有更高的執行效率,同樣,對于比較消耗記憶體的迭代,建議使用這個方法,而不是使用map()方法,如果chunksize為1,則imap()方法回傳的迭代器的next()方法擁有一個可選的引數timeout,如果在指定的timeout時間內未得到執行結果,next(timeout)就會拋出multiprocessing.TimeoutError例外,imap_unordered(func, iterable[, chunksize]):和imap()類似,只不過回傳的結果是無序的,當然只有一個行程的時候,回傳的結果就是有序的,starmap(func, iterable[, chunksize]):和map()類似,不過iterable中的每個元素都會被再次解包作為func的引數傳入進去,如[(1, 2), (3, 4)]會轉化為類似[func(1, 2), func(3, 4)],starmap_async(func, iterable[, chunksize[, callback[, error_callback]]]):和starmap類似,會回傳一個結果物件,close():會阻止后續任務提交到行程池,當所有任務都執行完成后,作業行程就會退出,terminate():不用等待未完成的任務,立即停止作業行程,當行程池被垃圾回收時,此方法會被立即呼叫,join():等待作業行程結束,注意,呼叫此方法前必須先呼叫close()方法或terminate()方法,
AsyncResult類
multiprocessing.pool.AsyncResult:apply_async()和map_async()這兩個方法回傳的結果物件對應的類,
常用的方法:
get([timeout]):用于獲取執行結果,如果timeout引數不是None,并且在指定時間內沒有得到執行結果,則會拋出multiprocessing.TimeoutError例外,wait([timeout]):阻塞當前行程,直到回傳結果,或者timeout超時,ready():判斷執行是否完成,successful():判斷呼叫是否已經完成,并且未引發例外,如果未執行完成,則會引發ValueError例外,
"""
這是官方檔案上給出的示例,我就直接貼在這兒了
"""
from multiprocessing import Pool
import time
def f(x):
return x * x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
編程指導
這是官方檔案中對于multiprocessing模塊給出的一些編程建議,我放在這里了,可以參考下,
對于所有啟動方法
- 避免共享狀態:應該避免在行程間傳遞大量資料,傳遞的資料應該越少越好,最好使用佇列或者管道進行行程間的通信,而不是使用底層的同步原語,
- 可序列化:保證代理的方法的引數是可序列化的,
- 代理的執行緒安全:不要在多執行緒之間同時使用一個代理物件,除非你用鎖保護它,但是在多行程之間使用相同的代理物件是不會有問題的,
- 使用join避免僵尸行程:在Unix上,如果一個行程執行完成但是沒有被
join,就會變成僵尸行程,一般來說,僵尸行程不會很多,因為每次啟動新行程或者active_children()被呼叫時,所有已執行完成且沒有被join的行程都會被自動被join,而且對一個執行完成的行程呼叫Process.is_alive也會join這個行程,盡管如此,對自己啟動的行程顯式呼叫join依然是最佳的實踐, - 繼承優于序列化、反序列化:當使用spawn或者forkserver的啟動方式時,
multiprocessing模塊中的許多型別都必須是可序列化的,這樣子行程才能使用它們,但是,通常我們都應該避免使用管道和佇列來發送共享物件到另一個行程,而是應該優先采用讓子行程通過繼承的方式從父行程中訪問這些共享物件, - 避免手動殺死行程:通過
Process.terminate終止一個行程很容易導致這個行程正在使用的資源(如鎖、信號量、管道和佇列)損壞或者變得不可用,導致其他需要使用這些資源的行程無法使用,所以,最好是那些從來不使用這些共享資源的行程才呼叫Process.terminate, - 使用佇列的行程的join:如果一個行程使用了佇列,并往佇列中放入資料,那么這個行程會一直阻塞,直到所有的快取項都被
feeder執行緒傳遞給底層管道,這意味著,在這個行程使用join方法之前,需要保證放入佇列的全部資料都已經被其他的執行緒或行程消費完,否則不能保證這個佇列的行程可以正常終止(注意,非守護行程都會自動join),如下是一個死鎖的示例:解決辦法是交換最后兩行或者洗掉p.join()這一行,
from multiprocessing import Process, Queue
def f(q):
q.put('X' * 1000000)
if __name__ == '__main__':
queue = Queue()
p = Process(target=f, args=(queue,))
p.start()
p.join() # this deadlocks
obj = queue.get()
- 顯式傳遞資源給子行程:在Unix上,使用fork方式啟動的子行程可以使用父行程中全域創建的共享資源,但是還是建議顯式的傳遞資源給子行程,這樣保證了子行程結束后,這個資源也不會被回收,如果直接使用,有可能會導致子行程結束時這個資源被釋放掉,如下,示例1為錯誤示范,應該為示例2的方式,
"""示例1"""
from multiprocessing import Process, Lock
def f():
... do something using "lock" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f).start()
"""示例2"""
from multiprocessing import Process, Lock
def f(l):
... do something using "l" ...
if __name__ == '__main__':
lock = Lock()
for i in range(10):
Process(target=f, args=(lock,)).start()
spawn和forkserver啟動方式
spawn和forkserver的以下一些特點,相對于另外一種fork啟動方式,會有一些區別和限制,
- 更依賴序列化:
Process.__init__()的所有引數都必須是可序列化的,同樣的,Process的子類實體在呼叫start方法時也必須保證是可以被序列化的, - 全域變數:如果子行程在代碼中嘗試訪問一個全域變數時,需要小心,它此時的值可能與父行程中執行
Process.start方法時的值不一樣了,當然,如果它是模塊級別的常量時,是沒問題的, - 安全匯入主模塊:需要確保主模塊可以被新啟動的Python解釋器(比如啟動了一個子行程)安全匯入而不會引發其他問題,見示例1和示例2,
示例1:以下代碼會引發RuntimeError,
from multiprocessing import Process
def foo():
print('hello')
p = Process(target=foo)
p.start()
示例2:對于以上代碼,應該使用if __name__ == '__main__'來保護程式入口點,
from multiprocessing import Process, freeze_support, set_start_method
def foo():
print('hello')
# 這個入口點可以允許子行程安全匯入此模塊并使用此模塊中的foo函式
if __name__ == '__main__':
freeze_support() # 如果正常運行程式而不是需要打包“凍結”,則可以忽略此句,
set_start_method('spawn')
p = Process(target=foo)
p.start()
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/155885.html
標籤:其他
上一篇:win7 配置AMP環境(apache2.4.39 + php7.1.28)
下一篇:學習第45天
