尊重原創著作權: https://www.gewuweb.com/hot/7715.html
Python多執行緒、多行程最全整理
尊重原創著作權: https://www.gewuweb.com/sitemap.html
在學習Python的程序中,有接觸到多執行緒編程相關的知識點,先前一直都沒有徹底的搞明白,今天準備花一些時間,把里面的細節盡可能的梳理清楚,
執行緒與行程的區別
行程(process)和執行緒(thread)是作業系統的基本概念,但是它們比較抽象,不容易掌握,關于多行程和多執行緒,教科書上最經典的一句話是“
行程是資源分配的最小單位,執行緒是CPU調度的最小單位
”,執行緒是程式中一個單一的順序控制流程,行程內一個相對獨立的、可調度的執行單元,是系統獨立調度和分派CPU的基本單位指運行中的程式的調度單位,在單個程式中同時運行多個執行緒完成不同的作業,稱為多執行緒,
行程和執行緒區別
行程是資源分配的基本單位,所有與該行程有關的資源,都被記錄在行程控制塊PCB中,以表示該行程擁有這些資源或正在使用它們,另外,行程也是搶占處理機的調度單位,它擁有一個完整的虛擬地址空間,當行程發生調度時,不同的行程擁有不同的虛擬地址空間,而同一行程內的不同執行緒共享同一地址空間,
與行程相對應,執行緒與資源分配無關,它屬于某一個行程,并與行程內的其他執行緒一起共享行程的資源,執行緒只由相關堆疊(系統堆疊或用戶堆疊)暫存器和執行緒控制表TCB組成,暫存器可被用來存盤執行緒內的區域變數,但不能存盤其他執行緒的相關變數,
通常在一個行程中可以包含若干個執行緒,它們可以利用行程所擁有的資源,在引入執行緒的作業系統中,通常都是把行程作為分配資源的基本單位,而把執行緒作為獨立運行和獨立調度的基本單位,
由于執行緒比行程更小,基本上不擁有系統資源,故對它的調度所付出的開銷就會小得多,能更高效的提高系統內多個程式間并發執行的程度,從而顯著提高系統資源的利用率和吞吐量,
因而近年來推出的通用作業系統都引入了執行緒,以便進一步提高系統的并發性,并把它視為現代作業系統的一個重要指標,
執行緒與行程的區別可以歸納為以下4點:
-
地址空間和其它資源(如打開檔案):行程間相互獨立,同一行程的各執行緒間共享,某行程內的執行緒在其它行程不可見,
-
通信:行程間通信IPC,執行緒間可以直接讀寫行程資料段(如全域變數)來進行通信——需要行程同步和互斥手段的輔助,以保證資料的一致性,
-
調度和切換:執行緒背景關系切換比行程背景關系切換要快得多,
-
在多執行緒OS中,行程不是一個可執行的物體,
多行程和多執行緒的比較
| 對比維度 | 多行程 | 多執行緒 | 總結 |
|---|---|---|---|
| 資料共享、同步 | 資料共享復雜,同步簡單 | 資料共享簡單,同步復雜 | 各有優劣 |
| 記憶體、CPU | 占用記憶體多,切換復雜,CPU利用率低 | 占用記憶體少,切換簡單,CPU利用率高 | 執行緒占優 |
| 創建、銷毀、切換 | 復雜,速度慢 | 簡單,速度快 | 執行緒占優 |
| 編程、除錯 | 編程簡單,除錯簡單 | 編程復雜,除錯復雜 | 行程占優 |
| 可靠性 | 行程間不會互相影響 | 一個執行緒掛掉將導致整個行程掛掉 | 行程占優 |
| 分布式 | 適用于多核、多機,擴展到多臺機器簡單 | 適合于多核 | 行程占優 |
總結,行程和執行緒還可以類比為火車和車廂:
-
執行緒在行程下行進(單純的車廂無法運行)
-
一個行程可以包含多個執行緒(一輛火車可以有多個車廂)
-
不同行程間資料很難共享(一輛火車上的乘客很難換到另外一輛火車,比如站點換乘)
-
同一行程下不同執行緒間資料很易共享(A車廂換到B車廂很容易)
-
行程要比執行緒消耗更多的計算機資源(采用多列火車相比多個車廂更耗資源)
-
行程間不會相互影響,一個執行緒掛掉將導致整個行程掛掉(一列火車不會影響到另外一列火車,但是如果一列火車上中間的一節車廂著火了,將影響到該趟火車的所有車廂)
-
行程可以拓展到多機,行程最多適合多核(不同火車可以開在多個軌道上,同一火車的車廂不能在行進的不同的軌道上)
-
行程使用的記憶體地址可以上鎖,即一個執行緒使用某些共享記憶體時,其他執行緒必須等它結束,才能使用這一塊記憶體,(比如火車上的洗手間)-”互斥鎖(mutex)”
-
行程使用的記憶體地址可以限定使用量(比如火車上的餐廳,最多只允許多少人進入,如果滿了需要在門口等,等有人出來了才能進去)-“信號量(semaphore)”
Python全域解釋器鎖GIL
全域解釋器鎖(英語:Global Interpreter
Lock,縮寫GIL),并不是Python的特性,它是在實作Python決議器(CPython)時所引入的一個概念,由于CPython是大部分環境下默認的Python執行環境,所以在很多人的概念里CPython就是Python,也就想當然的把GIL歸結為Python語言的缺陷,那么CPython實作中的GIL又是什么呢?來看看官方的解釋:
The mechanism used by the CPython interpreter to assure that only one
thread executes Python bytecode at a time. This simplifies the CPython
implementation by making the object model (including critical built-in types
such as dict) implicitly safe against concurrent access. Locking the entire
interpreter makes it easier for the interpreter to be multi-threaded, at the
expense of much of the parallelism afforded by multi-processor machines.
Python代碼的執行由Python 虛擬機(也叫解釋器主回圈,CPython版本)來控制,Python
在設計之初就考慮到要在解釋器的主回圈中,同時只有一個執行緒在執行,即在任意時刻,只有一個執行緒在解釋器中運行,對Python
虛擬機的訪問由全域解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個執行緒在運行,
GIL 有什么好處?簡單來說,它在單執行緒的情況更快,并且在和 C 庫結合時更方便,而且不用考慮執行緒安全問題,這也是早期 Python
最常見的應用場景和優勢,另外,GIL的設計簡化了CPython的實作,使得物件模型,包括關鍵的內建型別如字典,都是隱含可以并發訪問的,鎖住全域解釋器使得比較容易的實作對多執行緒的支持,但也損失了多處理器主機的并行計算能力,
在多執行緒環境中,Python 虛擬機按以下方式執行:
-
設定GIL
-
切換到一個執行緒去運行
-
運行直至指定數量的位元組碼指令,或者執行緒主動讓出控制(可以呼叫sleep(0))
-
把執行緒設定為睡眠狀態
-
解鎖GIL
-
再次重復以上所有步驟
Python3.2前,GIL的釋放邏輯是當前執行緒遇見IO操作或者ticks計數達到100(ticks可以看作是python自身的一個計數器,專門做用于GIL,每次釋放后歸零,這個計數可以通過
sys.setcheckinterval
來調整),進行釋放,因為計算密集型執行緒在釋放GIL之后又會立即去申請GIL,并且通常在其它執行緒還沒有調度完之前它就已經重新獲取到了GIL,就會導致一旦計算密集型執行緒獲得了GIL,那么它在很長一段時間內都將占據GIL,甚至一直到該執行緒執行結束,
Python
3.2開始使用新的GIL,新的GIL實作中用一個固定的超時時間來指示當前的執行緒放棄全域鎖,在當前執行緒保持這個鎖,且其他執行緒請求這個鎖時,當前執行緒就會在5毫秒后被強制釋放該鎖,該改進在單核的情況下,對于單個執行緒長期占用GIL的情況有所好轉,
在單核CPU上,數百次的間隔檢查才會導致一次執行緒切換,在多核CPU上,存在嚴重的執行緒顛簸(thrashing),而每次釋放GIL鎖,執行緒進行鎖競爭、切換執行緒,會消耗資源,單核下多執行緒,每次釋放GIL,喚醒的那個執行緒都能獲取到GIL鎖,所以能夠無縫執行,但多核下,CPU0釋放GIL后,其他CPU上的執行緒都會進行競爭,但GIL可能會馬上又被CPU0拿到,導致其他幾個CPU上被喚醒后的執行緒會醒著等待到切換時間后又進入待調度狀態,這樣會造成執行緒顛簸(thrashing),導致效率更低,
另外,從上面的實作機制可以推匯出,Python的多執行緒對IO密集型代碼要比CPU密集型代碼更加友好,
針對GIL的應對措施:
-
使用更高版本Python(對GIL機制進行了優化)
-
使用多行程替換多執行緒(多行程之間沒有GIL,但是行程本身的資源消耗較多)
-
指定cpu運行執行緒(使用affinity模塊)
-
使用Jython、IronPython等無GIL解釋器
-
全IO密集型任務時才使用多執行緒
-
使用協程(高效的單執行緒模式,也稱微執行緒;通常與多行程配合使用)
-
將關鍵組件用C/C++撰寫為Python擴展,通過ctypes使Python程式直接呼叫C語言編譯的元件的匯出函式,(with nogil調出GIL限制)
Python的多行程包multiprocessing
Python的threading包主要運用多執行緒的開發,但由于GIL的存在,Python中的多執行緒其實并不是真正的多執行緒,如果想要充分地使用多核CPU的資源,大部分情況需要使用多行程,在Python
2.6版本的時候引入了multiprocessing包,它完整的復制了一套threading所提供的介面方便遷移,唯一的不同就是它使用了多行程而不是多執行緒,每個行程有自己的獨立的GIL,因此也不會出現行程之間的GIL爭搶,
借助這個multiprocessing,你可以輕松完成從單行程到并發執行的轉換,multiprocessing支持子行程、通信和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件,
Multiprocessing產生的背景
除了應對Python的GIL以外,產生multiprocessing的另外一個原因時Windows作業系統與Linux/Unix系統的不一致,
Unix/Linux作業系統提供了一個fork系統呼叫,它非常特殊,普通的函式,呼叫一次,回傳一次,但是fork呼叫一次,回傳兩次,因為作業系統自動把當前行程(父行程)復制了一份(子行程),然后,分別在父行程和子行程內回傳,子行程永遠回傳0,而父行程回傳子行程的ID,這樣做的理由是,一個父行程可以fork出很多子行程,所以,父行程要記下每個子行程的ID,而子行程只需要呼叫getpid就可以拿到父行程的ID,
Python的os模塊封裝了常見的系統呼叫,其中就包括fork,可以在Python程式中輕松創建子行程:
import os
print('Process (%s) start...' % os.getpid)
\# Only works on Unix/Linux/Mac:
pid = os.fork
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid, os.getppid))
else:
print('I (%s) just created a child process (%s).' % (os.getpid, pid))
上述代碼在Linux、Unix和Mac上的執行結果為:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
有了fork呼叫,一個行程在接到新任務時就可以復制出一個子行程來處理新任務,常見的Apache服務器就是由父行程監聽埠,每當有新的http請求時,就fork出子行程來處理新的http請求,
由于Windows沒有fork呼叫,上面的代碼在Windows上無法運行,由于Python是跨平臺的,自然也應該提供一個跨平臺的多行程支持,multiprocessing模塊就是跨平臺版本的多行程模塊,multiprocessing模塊封裝了fork呼叫,使我們不需要關注fork的細節,由于Windows沒有fork呼叫,因此,multiprocessing需要“模擬”出fork的效果,
multiprocessing常用組件及功能
創建管理行程模塊:
-
Process(用于創建行程)
-
Pool(用于創建管理行程池)
-
Queue(用于行程通信,資源共享)
-
Value,Array(用于行程通信,資源共享)
-
Pipe(用于管道通信)
-
Manager(用于資源共享)
同步子行程模塊:
-
Condition(條件變數)
-
Event(事件)
-
Lock(互斥鎖)
-
RLock(可重入的互斥鎖(同一個行程可以多次獲得它,同時不會造成阻塞)
-
Semaphore(信號量)
接下來就一起來學習下每個組件及功能的具體使用方法,
Process(用于創建行程)
multiprocessing模塊提供了一個Process類來代表一個行程物件,
在multiprocessing中,每一個行程都用一個Process類來表示,
構造方法:Process([group [, target [, name [, args [, kwargs]]]]])
-
group:分組,實際上不使用,值始終為None
-
target:表示呼叫物件,即子行程要執行的任務,你可以傳入方法名
-
name:為子行程設定名稱
-
args:要傳給target函式的位置引數,以元組方式進行傳入,
-
kwargs:要傳給target函式的字典引數,以字典方式進行傳入,
實體方法:
-
start:啟動行程,并呼叫該子行程中的p.run
-
run:行程啟動時運行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實作該方法
-
terminate:強制終止行程p,不會進行任何清理操作,如果p創建了子行程,該子行程就成了僵尸行程,使用該方法需要特別小心這種情況,如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
-
is_alive:回傳行程是否在運行,如果p仍然運行,回傳True
-
join([timeout]):行程同步,主行程等待子行程完成后再執行后面的代碼,執行緒等待p終止(強調:是主執行緒處于等的狀態,而p是處于運行的狀態),timeout是可選的超時時間(超過這個時間,父執行緒不再等待子執行緒,繼續往下執行),需要強調的是,p.join只能join住start開啟的行程,而不能join住run開啟的行程
屬性介紹:
-
daemon:默認值為False,如果設為True,代表p為后臺運行的守護行程;當p的父行程終止時,p也隨之終止,并且設定為True后,p不能創建自己的新行程;必須在p.start之前設定
-
name:行程的名稱
-
pid:行程的pid
-
exitcode:行程在運行時為None、如果為–N,表示被信號N結束(了解即可)
-
authkey:行程的身份驗證鍵,默認是由os.urandom隨機生成的32字符的字串,這個鍵的用途是為涉及網路連接的底層行程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
使用示例:(注意:在windows中Process()必須放到if name == ‘ main ’:下)
from multiprocessing import Process
import os
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid))
if __name__=='__main__':
print('Parent process %s.' % os.getpid)
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start
p.join
print('Child process end.')
Pool(用于創建管理行程池)
Pool類用于需要執行的目標很多,而手動限制行程數量又太繁瑣時,如果目標少且不用控制行程數量則可以用Process類,Pool可以提供指定數量的行程,供用戶呼叫,當有新的請求提交到Pool中時,如果池還沒有滿,那么就會創建一個新的行程用來執行該請求;但如果池中的行程數已經達到規定最大值,那么該請求就會等待,直到池中有行程結束,就重用行程池中的行程,
構造方法:Pool([processes[, initializer[, initargs[, maxtasksperchild[,
context]]]]])
-
processes :要創建的行程數,如果省略,將默認使用cpu_count回傳的數量,
-
initializer:每個作業行程啟動時要執行的可呼叫物件,默認為None,如果initializer是None,那么每一個作業行程在開始的時候會呼叫initializer(*initargs),
-
initargs:是要傳給initializer的引陣列,
-
maxtasksperchild:作業行程退出之前可以完成的任務數,完成后用一個新的作業行程來替代原行程,來讓閑置的資源被釋放,maxtasksperchild默認是None,意味著只要Pool存在作業行程就會一直存活,
-
context: 用在制定作業行程啟動時的背景關系,一般使用Pool 或者一個context物件的Pool方法來創建一個池,兩種方法都適當的設定了context,
實體方法:
-
apply(func[, args[, kwargs]]):在一個池作業行程中執行func(args,*kwargs),然后回傳結果,需要強調的是:此操作并不會在所有池作業行程中并執行func函式,如果要通過不同引數并發地執行func函式,必須從不同執行緒呼叫p.apply函式或者使用p.apply_async,它是阻塞的,apply很少使用
-
apply_async(func[, arg[, kwds={}[, callback=None]]]):在一個池作業行程中執行func(args,*kwargs),然后回傳結果,此方法的結果是AsyncResult類的實體,callback是可呼叫物件,接收輸入引數,當func的結果變為可用時,將理解傳遞給callback,callback禁止執行任何阻塞操作,否則將接收其他異步操作中的結果,它是非阻塞,
-
map(func, iterable[, chunksize=None]):Pool類中的map方法,與內置的map函式用法行為基本一致,它會使行程阻塞直到回傳結果,注意,雖然第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒后,程式才會運行子行程,
-
map_async(func, iterable[, chunksize=None]):map_async與map的關系同apply與apply_async
-
imap:imap 與 map的區別是,map是當所有的行程都已經執行完了,并將結果回傳了,imap則是立即回傳一個iterable可迭代物件,
-
imap_unordered:不保證回傳的結果順序與行程添加的順序一致,
-
close:關閉行程池,防止進一步操作,如果所有操作持續掛起,它們將在作業行程終止前完成,
-
join:等待所有作業行程退出,此方法只能在close或teminate之后呼叫,讓其不再接受新的Process,
-
terminate:結束作業行程,不再處理未處理的任務,
方法apply_async和map_async的回傳值是AsyncResul的實體obj,實體具有以下方法:
-
get:回傳結果,如果有必要則等待結果到達,timeout是可選的,如果在指定時間內還沒有到達,將引發例外,如果遠程操作中引發了例外,它將在呼叫此方法時再次被引發,
-
ready:如果呼叫完成,回傳True
-
successful:如果呼叫完成且沒有引發例外,回傳True,如果在結果就緒之前呼叫此方法,引發例外
-
wait([timeout]):等待結果變為可用,
-
terminate:立即終止所有作業行程,同時不執行任何清理或結束任何掛起作業,如果p被垃圾回收,將自動呼叫此函式
使用示例:
\# -*- coding:utf-8 -*-
\# Pool+map
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
lists = range(100)
pool = Pool(8)
pool.map(test, lists)
pool.close
pool.join
\# -*- coding:utf-8 -*-
\# 異步行程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
'''
For回圈中執行步驟:
(1)回圈遍歷,將100個子行程添加到行程池(相對父行程會阻塞)
(2)每次執行8個子行程,等一個子行程執行完后,立馬啟動新的子行程,(相對父行程不阻塞)
apply_async為異步行程池寫法,異步指的是啟動子行程的程序,與父行程本身的執行(print)是異步的,而For回圈中往行程池添加子行程的程序,與父行程本身的執行卻是同步的,
'''
pool.apply_async(test, args=(i,)) # 維持執行的行程總數為8,當一個行程執行完后啟動一個新行程.
print("test")
pool.close
pool.join
\# -*- coding:utf-8 -*-
\# 異步行程池(非阻塞)
from multiprocessing import Pool
def test(i):
print(i)
if __name__ == "__main__":
pool = Pool(8)
for i in range(100):
'''
實際測驗發現,for回圈內部執行步驟:
(1)遍歷100個可迭代物件,往行程池放一個子行程
(2)執行這個子行程,等子行程執行完畢,再往行程池放一個子行程,再執行,(同時只執行一個子行程)
for回圈執行完畢,再執行print函式,
'''
pool.apply(test, args=(i,)) # 維持執行的行程總數為8,當一個行程執行完后啟動一個新行程.
print("test")
pool.close
pool.join
Queue(用于行程通信,資源共享)
在使用多行程的程序中,最好不要使用共享資源,普通的全域變數是不能被子行程所共享的,只有通過Multiprocessing組件構造的資料結構可以被共享,
Queue
是用來創建行程間資源共享的佇列的類,使用Queue可以達到多行程間資料傳遞的功能(缺點:只適用Process類,不能在Pool行程池中使用),
構造方法:Queue([maxsize])
- maxsize是佇列中允許最大項數,省略則無大小限制,
實體方法:
-
put:用以插入資料到佇列,put方法還有兩個可選引數:blocked和timeout,如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩余的空間,如果超時,會拋出Queue.Full例外,如果blocked為False,但該Queue已滿,會立即拋出Queue.Full例外,
-
get:可以從佇列讀取并且洗掉一個元素,get方法有兩個可選引數:blocked和timeout,如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty例外,如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即回傳該值,否則,如果佇列為空,則立即拋出Queue.Empty例外,若不希望在empty的時候拋出例外,令blocked為True或者引數全部置空即可,
-
get_nowait:同q.get(False)
-
put_nowait:同q.put(False)
-
empty:呼叫此方法時q為空則回傳True,該結果不可靠,比如在回傳True的程序中,如果佇列中又加入了專案,
-
full:呼叫此方法時q已滿則回傳True,該結果不可靠,比如在回傳True的程序中,如果佇列中的專案被取走,
-
qsize:回傳佇列中目前專案的正確數量,結果也不可靠,理由同q.empty和q.full一樣
使用示例:
from multiprocessing import Process, Queue
import os, time, random
def write(q):
print('Process to write: %s' % os.getpid)
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random)
def read(q):
print('Process to read: %s' % os.getpid)
while True:
value = https://www.cnblogs.com/lihanlin/p/q.get(True)
print('Get %s from queue.' % value)
if __name__ == "__main__":
q = Queue
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start
pr.start
pw.join # 等待pw結束
pr.terminate # pr行程里是死回圈,無法等待其結束,只能強行終止
JoinableQueue
就像是一個Queue物件,但佇列允許專案的使用者通知生成者專案已經被成功處理,通知行程是使用共享的信號和條件變數來實作的,
構造方法:JoinableQueue([maxsize])
- maxsize:佇列中允許最大項數,省略則無大小限制,
實體方法
JoinableQueue的實體p除了與Queue物件相同的方法之外還具有:
-
task_done:使用者使用此方法發出信號,表示q.get的回傳專案已經被處理,如果呼叫此方法的次數大于從佇列中洗掉專案的數量,將引發ValueError例外
-
join:生產者呼叫此方法進行阻塞,直到佇列中所有的專案均被處理,阻塞將持續到佇列中的每個專案均呼叫q.task_done方法為止
使用示例:
\# -*- coding:utf-8 -*-
from multiprocessing import Process, JoinableQueue
import time, random
def consumer(q):
while True:
res = q.get
print('消費者拿到了 %s' % res)
q.task_done
def producer(seq, q):
for item in seq:
time.sleep(random.randrange(1,2))
q.put(item)
print('生產者做好了 %s' % item)
q.join
if __name__ == "__main__":
q = JoinableQueue
seq = ('產品%s' % i for i in range(5))
p = Process(target=consumer, args=(q,))
p.daemon = True # 設定為守護行程,在主執行緒停止時p也停止,但是不用擔心,producer內呼叫q.join保證了consumer已經處理完佇列中的所有元素
p.start
producer(seq, q)
print('主執行緒')
Value,Array(用于行程通信,資源共享)
multiprocessing
中Value和Array的實作原理都是在共享記憶體中創建ctypes物件來達到共享資料的目的,兩者實作方法大同小異,只是選用不同的ctypes資料型別而已,
Value
構造方法:Value((typecode_or_type, args[, lock])
-
typecode_or_type:定義ctypes物件的型別,可以傳Type code或 C Type,具體對照表見下文,
-
args:傳遞給typecode_or_type建構式的引數
-
lock:默認為True,創建一個互斥鎖來限制對Value物件的訪問,如果傳入一個鎖,如Lock或RLock的實體,將用于同步,如果傳入False,Value的實體就不會被鎖保護,它將不是行程安全的,
typecode_or_type支持的型別:
| Type code | C Type | Python Type | Minimum size in bytes |
| --------- | ------------------ | ----------------- | --------------------- |
| `'b'` | signed char | int | 1 |
| `'B'` | unsigned char | int | 1 |
| `'u'` | Py_UNICODE | Unicode character | 2 |
| `'h'` | signed short | int | 2 |
| `'H'` | unsigned short | int | 2 |
| `'i'` | signed int | int | 2 |
| `'I'` | unsigned int | int | 2 |
| `'l'` | signed long | int | 4 |
| `'L'` | unsigned long | int | 4 |
| `'q'` | signed long long | int | 8 |
| `'Q'` | unsigned long long | int | 8 |
| `'f'` | float | float | 4 |
| `'d'` | double | float | 8 |
參考地址:https://docs.python.org/3/library/array.html
Array
構造方法:Array(typecode_or_type, size_or_initializer, **kwds[, lock])
-
typecode_or_type:同上
-
size_or_initializer:如果它是一個整數,那么它確定陣列的長度,并且陣列將被初始化為零,否則,size_or_initializer是用于初始化陣列的序列,其長度決定陣列的長度,
-
kwds:傳遞給typecode_or_type建構式的引數
-
lock:同上
使用示例:
import multiprocessing
def f(n, a):
n.value = https://www.cnblogs.com/lihanlin/p/3.14
a[0] = 5
if __name__ =='__main__':
num = multiprocessing.Value('d', 0.0)
arr = multiprocessing.Array('i', range(10))
p = multiprocessing.Process(target=f, args=(num, arr))
p.start
p.join
print(num.value)
print(arr[:])
注意:Value和Array只適用于Process類,
Pipe(用于管道通信)
多行程還有一種資料傳遞方式叫管道原理和
Queue相同,Pipe可以在行程之間創建一條管道,并回傳元組(conn1,conn2),其中conn1,conn2表示管道兩端的連接物件,強調一點:必須在產生Process物件之前產生管道,
構造方法:Pipe([duplex])
- dumplex:默認管道是全雙工的,如果將duplex射成False,conn1只能用于接收,conn2只能用于發送,
實體方法:
-
send(obj):通過連接發送物件,obj是與序列化兼容的任意物件
-
recv:接收conn2.send(obj)發送的物件,如果沒有訊息可接收,recv方法會一直阻塞,如果連接的另外一端已經關閉,那么recv方法會拋出EOFError,
-
close:關閉連接,如果conn1被垃圾回收,將自動呼叫此方法
-
fileno:回傳連接使用的整數檔案描述符
-
poll([timeout]):如果連接上的資料可用,回傳True,timeout指定等待的最長時限,如果省略此引數,方法將立即回傳結果,如果將timeout射成None,操作將無限期地等待資料到達,
-
recv_bytes([maxlength]):接收c.send_bytes方法發送的一條完整的位元組訊息,maxlength指定要接收的最大位元組數,如果進入的訊息,超過了這個最大值,將引發IOError例外,并且在連接上無法進行進一步讀取,如果連接的另外一端已經關閉,再也不存在任何資料,將引發EOFError例外,
-
send_bytes(buffer [, offset [, size]]):通過連接發送位元組資料緩沖區,buffer是支持緩沖區介面的任意物件,offset是緩沖區中的位元組偏移量,而size是要發送位元組數,結果資料以單條訊息的形式發出,然后呼叫c.recv_bytes函式進行接收
-
recv_bytes_into(buffer [, offset]):接收一條完整的位元組訊息,并把它保存在buffer物件中,該物件支持可寫入的緩沖區介面(即bytearray物件或類似的物件),offset指定緩沖區中放置訊息處的位元組位移,回傳值是收到的位元組數,如果訊息長度大于可用的緩沖區空間,將引發BufferTooShort例外,
使用示例:
from multiprocessing import Process, Pipe
import time
\# 子行程執行方法
def f(Subconn):
time.sleep(1)
Subconn.send("吃了嗎")
print("來自父親的問候:", Subconn.recv)
Subconn.close
if __name__ == "__main__":
parent_conn, child_conn = Pipe # 創建管道兩端
p = Process(target=f, args=(child_conn,)) # 創建子行程
p.start
print("來自兒子的問候:", parent_conn.recv)
parent_conn.send("嗯")
Manager(用于資源共享)
Manager回傳的manager物件控制了一個server行程,此行程包含的python物件可以被其他的行程通過proxies來訪問,從而達到多行程間資料通信且安全,Manager模塊常與Pool模塊一起使用,
Manager支持的型別有list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Queue,Value和Array,
管理器是獨立運行的子行程,其中存在真實的物件,并以服務器的形式運行,其他行程通過使用代理訪問共享物件,這些代理作為客戶端運行,Manager是BaseManager的子類,回傳一個啟動的SyncManager實體,可用于創建共享物件并回傳訪問這些共享物件的代理,
BaseManager ,創建管理器服務器的基類
構造方法:BaseManager([address[, authkey]])
-
address:(hostname,port),指定服務器的網址地址,默認為簡單分配一個空閑的埠
-
authkey:連接到服務器的客戶端的身份驗證,默認為current_process.authkey的值
實體方法:
-
start([initializer[, initargs]]):啟動一個單獨的子行程,并在該子行程中啟動管理器服務器
-
get_server:獲取服務器物件
-
connect:連接管理器物件
-
shutdown:關閉管理器物件,只能在呼叫了start方法之后呼叫
實體屬性:
- address:只讀屬性,管理器服務器正在使用的地址
SyncManager , 以下型別均不是行程安全的,需要加鎖..
實體方法:
-
Array(self,*args,**kwds)
-
BoundedSemaphore(self,*args,**kwds)
-
Condition(self,*args,**kwds)
-
Event(self,*args,**kwds)
-
JoinableQueue(self,*args,**kwds)
-
Lock(self,*args,**kwds)
-
Namespace(self,*args,**kwds)
-
Pool(self,*args,**kwds)
-
Queue(self,*args,**kwds)
-
RLock(self,*args,**kwds)
-
Semaphore(self,*args,**kwds)
-
Value(self,*args,**kwds)
-
dict(self,*args,**kwds)
-
list(self,*args,**kwds)
使用示例:
import multiprocessing
def f(x, arr, l, d, n):
x.value = https://www.cnblogs.com/lihanlin/p/3.14
arr[0] = 5
l.append('Hello')
d[1] = 2
n.a = 10
if __name__ == '__main__':
server = multiprocessing.Manager
x = server.Value('d', 0.0)
arr = server.Array('i', range(10))
l = server.list
d = server.dict
n = server.Namespace
proc = multiprocessing.Process(target=f, args=(x, arr, l, d, n))
proc.start
proc.join
print(x.value)
print(arr)
print(l)
print(d)
print(n)
同步子行程模塊
Lock(互斥鎖)
Lock鎖的作用是當多個行程需要訪問共享資源的時候,避免訪問的沖突,加鎖保證了多個行程修改同一塊資料時,同一時間只能有一個修改,即串行的修改,犧牲了速度但保證了資料安全,Lock包含兩種狀態——鎖定和非鎖定,以及兩個基本的方法,
構造方法:Lock
實體方法:
-
acquire([timeout]): 使執行緒進入同步阻塞狀態,嘗試獲得鎖定,
-
release: 釋放鎖,使用前執行緒必須已獲得鎖定,否則將拋出例外,
使用示例:
from multiprocessing import Process, Lock
def l(lock, num):
lock.acquire
print("Hello Num: %s" % (num))
lock.release
if __name__ == '__main__':
lock = Lock # 這個一定要定義為全域
for num in range(20):
Process(target=l, args=(lock, num)).start
RLock(可重入的互斥鎖(同一個行程可以多次獲得它,同時不會造成阻塞)
RLock(可重入鎖)是一個可以被同一個執行緒請求多次的同步指令,RLock使用了“擁有的執行緒”和“遞回等級”的概念,處于鎖定狀態時,RLock被某個執行緒擁有,擁有RLock的執行緒可以再次呼叫acquire,釋放鎖時需要呼叫release相同次數,可以認為RLock包含一個鎖定池和一個初始值為0的計數器,每次成功呼叫
acquire/release,計數器將+1/-1,為0時鎖處于未鎖定狀態,
構造方法:RLock
實體方法:
-
acquire([timeout]):同Lock
-
release: 同Lock
Semaphore(信號量)
信號量是一個更高級的鎖機制,信號量內部有一個計數器而不像鎖物件內部有鎖標識,而且只有當占用信號量的執行緒數超過信號量時執行緒才阻塞,這允許了多個執行緒可以同時訪問相同的代碼區,比如廁所有3個坑,那最多只允許3個人上廁所,后面的人只能等里面有人出來了才能再進去,如果指定信號量為3,那么來一個人獲得一把鎖,計數加1,當計數等于3時,后面的人均需要等待,一旦釋放,就有人可以獲得一把鎖,
構造方法:Semaphore([value])
- value:設定信號量,默認值為1
實體方法:
-
acquire([timeout]):同Lock
-
release: 同Lock
使用示例:
from multiprocessing import Process, Semaphore
import time, random
def go_wc(sem, user):
sem.acquire
print('%s 占到一個茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l =
for i in range(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start
p_l.append(p)
for i in p_l:
i.join
Condition(條件變數)
可以把Condition理解為一把高級的鎖,它提供了比Lock,
RLock更高級的功能,允許我們能夠控制復雜的執行緒同步問題,Condition在內部維護一個鎖物件(默認是RLock),可以在創建Condigtion物件的時候把瑣物件作為引數傳入,Condition也提供了acquire,
release方法,其含義與鎖的acquire,
release方法一致,其實它只是簡單的呼叫內部鎖物件的對應的方法而已,Condition還提供了其他的一些方法,
構造方法:Condition([lock/rlock])
- 可以傳遞一個Lock/RLock實體給構造方法,否則它將自己生成一個RLock實體,
實體方法:
-
acquire([timeout]):首先進行acquire,然后判斷一些條件,如果條件不滿足則wait
-
release:釋放 Lock
-
wait([timeout]): 呼叫這個方法將使執行緒進入Condition的等待池等待通知,并釋放鎖,使用前執行緒必須已獲得鎖定,否則將拋出例外,處于wait狀態的執行緒接到通知后會重新判斷條件,
-
notify: 呼叫這個方法將從等待池挑選一個執行緒并通知,收到通知的執行緒將自動呼叫acquire嘗試獲得鎖定(進入鎖定池);其他執行緒仍然在等待池中,呼叫這個方法不會釋放鎖定,使用前執行緒必須已獲得鎖定,否則將拋出例外,
-
notifyAll: 呼叫這個方法將通知等待池中所有的執行緒,這些執行緒都將進入鎖定池嘗試獲得鎖定,呼叫這個方法不會釋放鎖定,使用前執行緒必須已獲得鎖定,否則將拋出例外,
使用示例:
import multiprocessing
import time
def stage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process.name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all
def stage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process.name
print('Starting', name)
with cond:
cond.wait
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start
time.sleep(1)
s1.start
s1.join
for c in s2_clients:
c.join
Event(事件)
Event內部包含了一個標志位,初始的時候為false,可以使用set來將其設定為true;或者使用clear將其從新設定為false;可以使用is_set來檢查標志位的狀態;另一個最重要的函式就是wait(timeout=None),用來阻塞當前執行緒,直到event的內部標志位被設定為true或者timeout超時,如果內部標志位為true則wait函式理解回傳,
使用示例:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print('wait_for_event: starting')
e.wait
print('wait_for_event: e.is_set->', e.is_set)
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set->', e.is_set)
if __name__ == '__main__':
e = multiprocessing.Event
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start
print('main: waiting before calling Event.set')
time.sleep(3)
e.set
print('main: event is set')
其他內容
multiprocessing.dummy 模塊與 multiprocessing 模塊的區別:dummy 模塊是多執行緒,而 multiprocessing
是多行程, api
都是通用的,所有可以很方便將代碼在多執行緒和多行程之間切換,multiprocessing.dummy通常在IO場景可以嘗試使用,比如使用如下方式引入執行緒池,
from multiprocessing.dummy import Pool as ThreadPool
multiprocessing.dummy與早期的threading,不同的點好像是在多多核CPU下,只系結了一個核心(具體未考證),
參考檔案:
-
https://docs.python.org/3/library/multiprocessing.html
-
https://www.rddoc.com/doc/Python/3.6.0/zh/library/multiprocessing/
Python并發之concurrent.futures
Python標準庫為我們提供了threading和multiprocessing模塊撰寫相應的多執行緒/多行程代碼,從Python3.2開始,標準庫為我們提供了concurrent.futures模塊,它提供了ThreadPoolExecutor和ProcessPoolExecutor兩個類,實作了對threading和multiprocessing的更高級的抽象,對撰寫執行緒池/行程池提供了直接的支持,concurrent.futures基礎模塊是executor和future,
Executor
Executor是一個抽象類,它不能被直接使用,它為具體的異步執行定義了一些基本的方法,ThreadPoolExecutor和ProcessPoolExecutor繼承了Executor,分別被用來創建執行緒池和行程池的代碼,
ThreadPoolExecutor物件
ThreadPoolExecutor類是Executor子類,使用執行緒池執行異步呼叫,
class concurrent.futures.ThreadPoolExecutor(max_workers)
使用max_workers數目的執行緒池執行異步呼叫,
ProcessPoolExecutor物件
ThreadPoolExecutor類是Executor子類,使用行程池執行異步呼叫,
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
使用max_workers數目的行程池執行異步呼叫,如果max_workers為None則使用機器的處理器數目(如4核機器max_worker配置為None時,則使用4個行程進行異步并發),
submit方法
Executor中定義了submit方法,這個方法的作用是提交一個可執行的回呼task,并回傳一個future實體,future物件代表的就是給定的呼叫,
Executor.submit(fn, *args, **kwargs)
-
fn:需要異步執行的函式
-
*args, **kwargs:fn引數
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime, num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result)
map方法
除了submit,Exectuor還為我們提供了map方法,這個方法回傳一個map(func,
*iterables)迭代器,迭代器中的回呼執行回傳的結果有序的,
Executor.map(func, *iterables, timeout=None)
-
func:需要異步執行的函式
-
*iterables:可迭代物件,如串列等,每一次func執行,都會從iterables中取引數,
-
timeout:設定每次異步操作的超時時間,timeout的值可以是int或float,如果操作超時,會回傳raisesTimeoutError;如果不指定timeout引數,則不設定超時間,
使用示例:
from concurrent import futures
def test(num):
import time
return time.ctime, num
data = https://www.cnblogs.com/lihanlin/p/[1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
shutdown方法
釋放系統資源,在Executor.submit或 Executor.map等異步操作后呼叫,使用with陳述句可以避免顯式呼叫此方法,
Executor.shutdown(wait=True)
Future
Future可以理解為一個在未來完成的操作,這是異步編程的基礎,通常情況下,我們執行io操作,訪問url時(如下)在等待結果回傳之前會產生阻塞,cpu不能做其他事情,而Future的引入幫助我們在等待的這段時間可以完成其他的操作,
Future類封裝了可呼叫的異步執行,Future 實體通過 Executor.submit方法創建,
-
cancel:試圖取消呼叫,如果呼叫當前正在執行,并且不能被取消,那么該方法將回傳False,否則呼叫將被取消,方法將回傳True,
-
cancelled:如果成功取消呼叫,回傳True,
-
running:如果呼叫當前正在執行并且不能被取消,回傳True,
-
done:如果呼叫成功地取消或結束了,回傳True,
-
result(timeout=None):回傳呼叫回傳的值,如果呼叫還沒有完成,那么這個方法將等待超時秒,如果呼叫在超時秒內沒有完成,那么就會有一個Futures.TimeoutError將報出,timeout可以是一個整形或者浮點型數值,如果timeout不指定或者為None,等待時間無限,如果futures在完成之前被取消了,那么 CancelledError 將會報出,
-
exception(timeout=None):回傳呼叫拋出的例外,如果呼叫還未完成,該方法會等待timeout指定的時長,如果該時長后呼叫還未完成,就會報出超時錯誤futures.TimeoutError,timeout可以是一個整形或者浮點型數值,如果timeout不指定或者為None,等待時間無限,如果futures在完成之前被取消了,那么 CancelledError 將會報出,如果呼叫完成并且無例外報出,回傳None.
-
add_done_callback(fn):將可呼叫fn捆綁到future上,當Future被取消或者結束運行,fn作為future的唯一引數將會被呼叫,如果future已經運行完成或者取消,fn將會被立即呼叫,
-
wait(fs, timeout=None, return_when=ALL_COMPLETED)
-
等待fs提供的 Future 實體(possibly created by different Executor instances) 運行結束,回傳一個命名的2元集合,分表代表已完成的和未完成的
-
return_when 表明什么時候函式應該回傳,它的值必須是一下值之一:
-
FIRST_COMPLETED :函式在任何future結束或者取消的時候回傳,
-
FIRST_EXCEPTION :函式在任何future因為例外結束的時候回傳,如果沒有future報錯,效果等于
-
ALL_COMPLETED :函式在所有future結束后才會回傳,
-
-
-
as_completed(fs, timeout=None):引數是一個 Future 實體串列,回傳值是一個迭代器,在運行結束后產出 Future實體 ,
使用示例:
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
def return_after_5_secs(num):
sleep(randint(1, 5))
return "Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures =
for x in range(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result)
print(2)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/466958.html
標籤:Python
