py并發編程:GIL鎖、行程、執行緒、協程
- 1、行程、執行緒概念引入
- 1.1 行程的由來
- 1.2 執行緒的由來
- 1.2.1 創建行程
- 1.2.2 撤消行程
- 1.2.3 行程切換
- 1.3 ==執行緒與行程的關系與區別==
- 1.3.1 行程和執行緒的關系
- 1.3.2 行程和執行緒的區別
- 2、Python下的多執行緒實作
- 2.1 多執行緒的基本語法
- 2.1.1 創建多執行緒的兩種方式:
- 2.2 執行緒實體物件方法
- 2.2.1 **join方法和setDaemon方法**
- 2.2.2 **其他方法**
- 2.2.3 ==GIL(全域解釋器鎖)==
- 2.2.4 同步鎖【解決多執行緒不安全】
- 2.2.5 死鎖【Lock】與遞回鎖【RLock 解決死鎖】
- 2.2.6 信號量
- 2.2.6 佇列與生產者消費者模型
- 3、python下多行程的語法實作
- 3.1 多行程基本語法
- 3.2 行程池
- 3.3 行程通信
- 3.3.1 **行程佇列**
- 3.3.2 **管道通信**
- 3.3.3 **manager**
- 4、協程(coroutine)
- 4.1 行程、執行緒、協程的對比
- 4.2 yield實作的協程
- 4.3 greenlet實作的協程
- 4.4 gevent實作的協程
- 4.5 monkey補丁【猴子補丁】
1、行程、執行緒概念引入
1.1 行程的由來
- 行程就是一個程式在一個資料集上的一次動態執行程序,是系統進行資源分配和調度的基本單位,是作業系統結構的基礎,
- 行程一般由程式、資料集、行程控制塊(PCB)三部分組成,
- 我們撰寫的程式用來描述行程要完成哪些功能以及如何完成;
- 資料集則是程式在執行程序中所需要使用的資源;
- 行程控制塊用來記錄行程的外部特征,描述行程的執行變化程序,系統可以利用它來控制和管理行程,它是系統感知行程存在的唯一標志,
這里需要注意的是程式和行程的區別,一個程式是一個可執行的檔案,而一個行程則是一個執行中的程式實體,
舉一例說明行程:
想象一位有一手好廚藝的計算機科學家正在為他的女兒烘制生日蛋糕,他有做生日蛋糕的食譜,廚房里有所需的原料:面粉、雞蛋、糖、香草汁等,
在這個比喻中,做蛋糕的食譜就是程式(即用適當形式描述的演算法)計算機科學家就是處理器(cpu),而做蛋糕的各種原料就是輸入資料,行程就是廚師閱讀食譜、取來各種原料以及烘制蛋糕等一系列動作的總和,
現在假設計算機科學家的兒子哭著跑了進來,說他的頭被一只蜜蜂蟄了,計算機科學家就記錄下他照著食譜做到哪兒了(保存行程的當前狀態),然后拿出一本急救手冊,按照其中的指示處理蟄傷,
這里,我們看到處理機從一個行程(做蛋糕)切換到另一個高優先級的行程(實施醫療救治),每個行程擁有各自的程式(食譜和急救手冊),
當蜜蜂蟄傷處理完之后,這位計算機科學家又回來做蛋糕,從他離開時的那一步繼續做下去,
1.2 執行緒的由來
既能支持并發,又能降低開銷
假設,一個文本程式,需要接受鍵盤輸入,將內容顯示在螢屏上,還需要保存資訊到硬碟中,
若只有一個行程,勢必造成同一時間只能干一樣事的尷尬(當保存時,就不能通過鍵盤輸入內容),
若有多個行程,每個行程負責一個任務,行程A負責接收鍵盤輸入的任務,行程B負責將內容顯示在螢屏上的任務,行程C負責保存內容到硬碟中的任務,
這里行程A,B,C間的協作涉及到了行程通信問題,而且有共同都需要擁有的東西——-文本內容,不停的切換造成性能上極大的損失,
若有一種機制,可以使任務A,B,C共享資源,這樣背景關系切換所需要保存和恢復的內容就少了,同時又可以減少通信所帶來的性能損耗,那就好了,是的,這種機制就是執行緒,
如果說,在作業系統中引入行程的目的,是為了使多個程式能并發執行,以提高資源利用率和系統吞吐量,那么,在作業系統中再引入執行緒,則是為了減少程式在并發執行時所付出的時空開銷,使OS具有更好的并發性,
為了說明這一點,我們首先來回顧行程的兩個基本屬性:
- ① 行程是一個可擁有資源的獨立單位;
- ② 行程同時又是一個可獨立調度和分派的基本單位,
正是由于行程有這兩個基本屬性,才使之成為一個能獨立運行的基本單位,從而也就構成了行程并發執行的基礎,然而,為使程式能并發執行,系統還必須進行以下的一系列操作,
1.2.1 創建行程
系統在創建一個行程時,必須為它分配其所必需的、除處理機以外的所有資源,如記憶體空間、I/O設備,以及建立相應的PCB,
1.2.2 撤消行程
系統在撤消行程時,又必須先對其所占有的資源執行回收操作,然后再撤消PCB,
1.2.3 行程切換
對行程進行切換時,由于要保留當前行程的CPU環境和設定新選中行程的CPU環境,因而須花費不少的處理機時間,
換言之,由于行程是一個資源的擁有者,因而在創建、撤消和切換中,系統必須為之付出較大的時空開銷,正因如此,在系統中所設定的行程,其數目不宜過多,行程切換的頻率也不宜過高,這也就限制了并發程度的進一步提高,
如何能使多個程式更好地并發執行同時又盡量減少系統的開銷呢?若能將行程的上述兩個屬性分開,由作業系統分開處理,亦
即對于作為調度和分派的基本單位,不同時作為擁有資源的單位,以做到“輕裝上陣”;而對于擁有資源的基本單位,又不對之進行頻繁的切換,正是在這種思想的指導下,形成了執行緒的概念,

1.3 執行緒與行程的關系與區別
1.3.1 行程和執行緒的關系
- (1)一個執行緒只能屬于一個行程,而一個行程可以有多個執行緒,但至少有一個執行緒
- (2)資源分配給行程,同一行程的所有執行緒共享該行程的所有資源
- (3)執行緒是最小的執行單元,處理機分給執行緒,即真正在處理機上運行的是執行緒
- (4)執行緒在執行程序中,需要協作同步,不同行程的執行緒間要利用訊息通信的辦法實作同步,
執行緒是指行程內的一個執行單元,也是行程內的可調度物體.
1.3.2 行程和執行緒的區別
-
(1) 調度
在傳統的作業系統中,作為擁有資源的基本單位和獨立調度、分派的基本單位都是行程,而在引入執行緒的作業系統中,則把執行緒作為調度和分派的基本單位,而行程作為資源擁有的基本單位,把傳統行程的兩個屬性分開,使執行緒基本上不擁有資源,這樣執行緒便能輕裝前進,從而可顯著地提高系統的并發程度,在同一行程中,執行緒的切換不會引起行程的切換,但從一個行程中的執行緒切換到另一個行程中的執行緒時,將會引起行程的切換,
-
(2) 并發性
在引入執行緒的作業系統中,不僅行程之間可以并發執行,而且在一個行程中的多個執行緒之間亦可并發執行,使得作業系統具有更好的并發性,從而能更加有效地提高系統資源的利用率和系統的吞吐量,例如,在一個未引入執行緒的單CPU作業系統中,若僅設定一個檔案服務行程,當該行程由于某種原因而被阻塞時,便沒有其它的檔案服務行程來提供服務,在引入執行緒的作業系統中,則可以在一個檔案服務行程中設定多個服務執行緒,當第一個執行緒等待時,檔案服務行程中的第二個執行緒可以繼續運行,以提供檔案服務;當第二個執行緒阻塞時,則可由第三個繼續執行,提供服務,顯然,這樣的方法可以顯著地提高檔案服務的質量和系統的吞吐量,
-
(3) 擁有資源
不論是傳統的作業系統,還是引入了執行緒的作業系統,行程都可以擁有資源,是系統中擁有資源的一個基本單位,一般而言,執行緒自己不擁有系統資源(也有一點必不可少的資源),但它可以訪問其隸屬行程的資源,即一個行程的代碼段、資料段及所擁有的系統資源,如已打開的檔案、I/O設備等,可以供該行程中的所有執行緒所共享,
-
(4) 系統開銷
在創建或撤消行程時,系統都要為之創建和回收行程控制塊,分配或回收資源,如記憶體空間和I/O設備等,作業系統所付出的開銷明顯大于執行緒創建或撤消時的開銷,類似地,在行程切換時,涉及到當前行程CPU環境的保存及新被調度運行行程的CPU環境的設定,而執行緒的切換則僅需保存和設定少量暫存器內容,不涉及存盤器管理方面的操作,所以就切換代價而言,行程也是遠高于執行緒的,此外,由于一個行程中的多個執行緒具有相同的地址空間,在同步和通信的實作方面執行緒也比行程容易,在一些作業系統中,執行緒的切換、同步和通信都無須作業系統內核的干預,
2、Python下的多執行緒實作
2.1 多執行緒的基本語法
2.1.1 創建多執行緒的兩種方式:
- 方式1:Thread類直接創建
import time
import threading
def add(x, y): # 定義某個執行緒要運行的函式
print("%s + %s = %s" % (x, y, x + y))
time.sleep(3)
def mul(x, y):
print("%s * %s = %s" % (x, y, x * y))
time.sleep(5)
# 實體化執行緒物件
print(time.ctime())
t1 = threading.Thread(target=add, args=(1, 2))
t2 = threading.Thread(target=mul, args=(1, 4))
# 啟動執行緒
t1.start()
t2.start()
# 等待子執行緒結束
t1.join()
# t2.join()
print("ending...", time.ctime())
- 方式2:繼承Thread式創建
import time
import threading
class MyThread(threading.Thread):
def __init__(self,x,y):
# 注意 呼叫父類
super().__init__()
self.x = x
self.y = y
# 注意這里 為啥是 run
def run(self):
print("%s + %s = %s" % (self.x, self.y, self.x + self.y))
time.sleep(3)
t1=MyThread(56,44)
t2=MyThread(78,12)
t1.start()
t2.start()
print("ending")
2.2 執行緒實體物件方法
2.2.1 join方法和setDaemon方法
- join():在子執行緒完成運行之前,這個子執行緒的父執行緒將一直被阻塞,
- setDaemon(True):守候執行緒,必須設定在start之前,當非守候執行緒結束時,守候執行緒自動結束
A boolean value indicating whether this thread is a daemon thread (True) or not (False). This must be set before start() is called, otherwise RuntimeError is raised. Its initial value is inherited from the creating thread; the main thread is not a daemon thread and therefore all threads created in the main thread default to daemon = False.
The entire Python program exits when no alive non-daemon threads are left.
當daemon被設定為True時,如果主執行緒退出,那么子執行緒也將跟著退出,反之,子執行緒將繼續運行,直到正常退出,
在python腳本中,py主執行緒可以啟動其他子執行緒,當所有執行緒都運行結束時,行程結束,如果有一個執行緒沒有退出,py行程就不會退出,所以,必須保證所有執行緒都能及時結束,但是有一種執行緒的目的就是無限回圈,例如,一個定時觸發任務的執行緒,如果這個執行緒不結束,py行程就無法結束,問題是,由誰負責結束這個執行緒?然而這類執行緒經常沒有負責人來負責結束它們,但是,當其他執行緒結束時,py行程又必須要結束,怎么辦?答案是使用守護執行緒(Daemon Thread),
- 應用:比如監控行程,行程存活時上報監控資料,行程掛則守護同掛
import threading
from time import ctime,sleep
def listen_music(name):
print ("Begin listening to {name}. {time}".format(name=name,time=ctime()))
sleep(3)
print("end listening {time}".format(time=ctime()))
def write_blog(title):
print ("Begin recording the {title}. {time}".format(title=title,time=ctime()))
sleep(5)
print('end recording {time}'.format(time=ctime()))
if __name__ == '__main__':
# 執行緒串列
threads = []
t1 = threading.Thread(target=listen_music, args=('FILL ME',))
t2 = threading.Thread(target=write_blog, args=('python之路',))
threads.append(t1)
threads.append(t2)
# t2.setDaemon(True) # 將t2設定為守護執行緒
t1.setDaemon(True) # 將t1設定為守護執行緒
for t in threads:
#t.setDaemon(True) #注意:一定在start之前設定
t.start()
# t.join()
# for t in threads:
# t.join()
print ("all over %s" %ctime())
2.2.2 其他方法
Thread實體物件的方法
# isAlive(): 回傳執行緒是否活動的,
# getName(): 回傳執行緒名,
# setName(): 設定執行緒名,
threading模塊提供的一些方法:
# threading.currentThread(): 回傳當前的執行緒變數,
# threading.enumerate():
# 回傳一個包含正在運行的執行緒的list,正在運行指執行緒啟動后、結束前,不包括啟動前和終止后的執行緒,
# threading.activeCount(): 回傳正在運行的執行緒數量,與len(threading.enumerate())有相同的結果,
2.2.3 GIL(全域解釋器鎖)
GIL的影響:同一時刻同一行程只有一個執行緒可以被CPU執行!
解決解釋器級別的執行緒搶占資料安全的問題,但 用戶級別 的多執行緒還是不安全的!
'''
定義:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
native threads from executing Python bytecodes at once. This lock is necessary mainly
because CPython’s memory management is not thread-safe. (However, since the GIL
exists, other features have grown to depend on the guarantees that it enforces.)
'''
GIL 是最流程的 CPython 解釋器(平常稱為 Python)中的一個技術術語,中文譯為全域解釋器鎖,其本質上類似作業系統的 Mutex,GIL 的功能是:在 CPython 解釋器中執行的每一個 Python 執行緒,都會先鎖住自己,以阻止別的執行緒執行,
當然,CPython 不可能容忍一個執行緒一直獨占解釋器,它會輪流執行 Python 執行緒,這樣一來,用戶看到的就是“偽”并行,即 Python 執行緒在交替執行,來模擬真正并行的執行緒,為什么龜叔設計出 GIL 呢?其實,這和 CPython 的底層記憶體管理有關,CPython 使用參考計數來管理內容,所有 Python 腳本中創建的實體,都會配備一個參考計數,來記錄有多少個指標來指向它,當實體的參考計數的值為 0 時,會自動釋放其所占的記憶體,
>>> import sys
>>> a = []
>>> b = a
>>> sys.getrefcount(a)
3
可以看到,a 的參考計數值為 3,因為有 a、b 和作為引數傳遞的 getrefcount 都參考了一個空串列,
假設有兩個 Python 執行緒同時參考 a,那么雙方就都會嘗試操作該資料,很有可能造成參考計數的條件競爭,導致參考計數只增加 1(實際應增加 2),這造成的后果是,當第一個執行緒結束時,會把參考計數減少 1,此時可能已經達到釋放記憶體的條件(參考計數為 0),當第 2 個執行緒再次視圖訪問 a 時,就無法找到有效的記憶體了,所以,CPython 引進 GIL,可以最大程度上規避類似記憶體管理這樣復雜的競爭風險問題,
GIL作業示意圖:

上面這張圖,就是 GIL 在 Python 程式的作業示例,其中,Thread 1、2、3 輪流執行,每一個執行緒在開始執行時,都會鎖住 GIL,以阻止別的執行緒執行;同樣的,每一個執行緒執行完一段后,會釋放 GIL,以允許別的執行緒開始利用資源,為什么 Python 執行緒會去主動釋放 GIL 呢?畢竟,如果僅僅要求 Python 執行緒在開始執行時鎖住 GIL,且永遠不去釋放 GIL,那別的執行緒就都沒有運行的機會,其實,CPython 中還有另一個機制,叫做間隔式檢查(check_interval),意思是 CPython 解釋器會去輪詢檢查執行緒 GIL 的鎖住情況,每隔一段時間,Python 解釋器就會強制當前執行緒去釋放 GIL,這樣別的執行緒才能有執行的機會,注意,不同版本的 Python,其間隔式檢查的實作方式并不一樣,早期的 Python 是 100 個刻度(大致對應了 1000 個位元組碼);而 Python 3 以后,間隔時間大致為 15 毫秒,當然,我們不必細究具體多久會強制釋放 GIL,讀者只需要明白,CPython 解釋器會在一個“合理”的時間范圍內釋放 GIL 就可以了,
那么是不是python的多執行緒就完全沒用了呢? 當然不是!
在這里我們進行分類討論:
1、CPU密集型代碼(各種回圈處理、計數等等),在這種情況下,由于計算作業多,ticks計數很快就會達到閾值,然后觸發GIL的釋放與再競爭(多個執行緒來回切換當然是需要消耗資源的),所以python下的多執行緒對CPU密集型代碼并不友好,
2、IO密集型代碼(檔案處理、網路爬蟲等),多執行緒能夠有效提升效率(單執行緒下有IO操作會進行IO等待,造成不必要的時間浪費,而開啟多執行緒能在執行緒A等待時,自動切換到執行緒B,可以不浪費CPU的資源,從而能提升程式執行效率),所以python的多執行緒對IO密集型代碼比較友好,
而在python3.x中,GIL不使用ticks計數,改為使用計時器(執行時間達到閾值后,當前執行緒釋放GIL),這樣對CPU密集型程式更加友好,但依然沒有解決GIL導致的同一時間只能執行一個執行緒的問題,所以效率依然不盡如人意,
請注意:多核多執行緒比單核多執行緒更差,原因是單核下多執行緒,每次釋放GIL,喚醒的那個執行緒都能獲取到GIL鎖,所以能夠無縫執行,但多核下,CPU0釋放GIL后,其他CPU上的執行緒都會進行競爭,但GIL可能會馬上又被CPU0拿到,導致其他幾個CPU上被喚醒后的執行緒會醒著等待到切換時間后又進入待調度狀態,這樣會造成執行緒顛簸(thrashing),導致效率更低 ,
回到最開始的問題:經常我們會聽到老手說:“python下想要充分利用多核CPU,就用多行程”,原因是什么呢?
原因是:每個行程有各自獨立的GIL,互不干擾,這樣就可以真正意義上的并行執行,所以在python中,多行程的執行效率優于多執行緒(僅僅針對多核CPU而言),
所以在這里說結論:多核下,想做并行提升效率,比較通用的方法是使用多行程,能夠有效提高執行效率
GIL的影響:同一時刻同一行程只有一個執行緒可以被CPU執行!
2.2.4 同步鎖【解決多執行緒不安全】
解決用戶級別的多執行緒資料不安全問題!!!將資料不安全的地方加一把鎖,資料處理加鎖的地方進行串行處理,處理完成后釋放,
Python GIL不能絕對保證執行緒安全,有了 GIL,并不意味著 Python 程式員就不用去考慮執行緒安全了,因為即便 GIL 僅允許一個 Python 執行緒執行,但別忘了 Python 還有 check interval 這樣的搶占機制,
import time
import threading
def addNum():
global num #在每個執行緒中都獲取這個全域變數
#num-=1
temp=num
time.sleep(0.1) # 模擬耗時0.1s 已經執行緒已經進行搶占資源,資料則不安全!在0.1s中足以遍歷100個執行緒進行賦值,快到將所有的num都賦值成100
num =temp-1 # 對此公共變數進行-1操作
num = 100 #設定一個共享變數
thread_list = []
for i in range(100):
t = threading.Thread(target=addNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有執行緒執行完畢
t.join()
print('Result: ', num)
鎖通常被用來實作對共享資源的同步訪問,為每一個共享資源創建一個Lock物件,當你需要訪問該資源時,呼叫acquire方法來獲取鎖物件(如果其它執行緒已經獲得了該鎖,則當前執行緒需等待其被釋放),待資源訪問完后,再呼叫release方法釋放鎖:
import threading
R=threading.Lock()
R.acquire()
'''
對公共資料的操作
'''
R.release()
import time
import threading
lock = threading.Lock()
def jianNum():
global num #在每個執行緒中都獲取這個全域變數
# num-=1
# num = num -1
# 鎖住執行緒
lock.acquire()
temp = num
time.sleep(0.001)
num = temp -1
# 放開鎖
lock.release()
num = 100 #設定一個共享變數
thread_list = []
for i in range(100):
t = threading.Thread(target=jianNum)
t.start()
thread_list.append(t)
for t in thread_list: #等待所有執行緒執行完畢
t.join()
print('Result: ', num)
2.2.5 死鎖【Lock】與遞回鎖【RLock 解決死鎖】
所謂死鎖: 是指兩個或兩個以上的行程或執行緒在執行程序中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去,此時稱系統處于死鎖狀態或系統產生了死鎖,這些永遠在互相等待的行程稱為死鎖行程,
import threading
import time
mutexA = threading.Lock()
mutexB = threading.Lock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.func1()
self.func2()
def func1(self):
mutexA.acquire() # 如果鎖被占用,則阻塞在這里,等待鎖的釋放
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexB.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
mutexB.release()
mutexA.release()
def func2(self):
mutexB.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
time.sleep(0.2)
mutexA.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutexA.release()
mutexB.release()
if __name__ == "__main__":
print("start---------------------------%s"%time.time())
for i in range(0, 10):
my_thread = MyThread()
my_thread.start()
在Python中為了支持在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock,這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require,直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源,上面的例子如果使用RLock代替Lock,則不會發生死鎖:
mutex = threading.RLock()
import threading
import time
# mutexA = threading.Lock()
# mutexB = threading.Lock()
mutex = threading.RLock()
class MyThread(threading.Thread):
def __init__(self):
threading.Thread.__init__(self)
def run(self):
self.func1()
self.func2()
def func1(self):
mutex.acquire() # 如果鎖被占用,則阻塞在這里,等待鎖的釋放
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutex.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
mutex.release()
mutex.release()
def func2(self):
mutex.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResB",time.time()))
mutex.acquire()
print ("I am %s , get res: %s---%s" %(self.name, "ResA",time.time()))
mutex.release()
mutex.release()
if __name__ == "__main__":
print("start---------------------------%s"%time.time())
for i in range(0, 10):
my_thread = MyThread()
my_thread.start()
2.2.6 信號量
什么是信號量?
- 互斥鎖同時只允許一個執行緒更改資料,而信號量Semaphore是同時允許一定數量的執行緒更改資料 ,
import threading
import time
semaphore = threading.Semaphore(3)
def func():
if semaphore.acquire():
print (threading.currentThread().getName() + ' get semaphore')
time.sleep(2)
semaphore.release()
for i in range(20):
t1 = threading.Thread(target=func)
t1.start()
2.2.6 佇列與生產者消費者模型
Python的Queue模塊中提供了同步的、執行緒安全的佇列類,包括FIFO(先入先出)佇列Queue,LIFO(后入先出)佇列LifoQueue,和優先級佇列PriorityQueue,這些佇列都實作了鎖原語,能夠在多執行緒中直接使用,可以使用佇列來實作執行緒間的同步,
常用方法:
Queue.qsize() 回傳佇列的大小
Queue.empty() 如果佇列為空,回傳True,反之False
Queue.full() 如果佇列滿了,回傳True,反之False,Queue.full 與 maxsize 大小對應
Queue.get([block[, timeout]])獲取佇列,timeout等待時間
Queue.get_nowait() 相當于Queue.get(False),非阻塞方法
Queue.put(item) 寫入佇列,timeout等待時間
Queue.task_done() 在完成一項作業之后,Queue.task_done()函式向任務已經完成的佇列發送一個信號,每個get()呼叫得到一個任務,接下來task_done()呼叫告訴佇列該任務已經處理完畢,
Queue.join() 實際上意味著等到佇列為空,再執行別的操作
tase_done()的作用:只有消費者把佇列所有的資料處理完畢,queue.join()才會停止阻塞
生產者消費者模式并不是GOF提出的眾多模式之一,但它依然是開發同學編程程序中最常用的一種模式

生產者模塊兒負責產生資料,放入緩沖區,這些資料由另一個消費者模塊兒來從緩沖區取出并進行消費者相應的處理,該模式的優點在于:
- 解耦:緩沖區的存在可以讓生產者和消費者降低互相之間的依賴性,一個模塊兒代碼變化,不會直接影響另一個模塊兒
- 并發:由于緩沖區,生產者和消費者不是直接呼叫,而是兩個獨立的并發主體,生產者產生資料之后把它放入緩沖區,就繼續生產資料,不依賴消費者的處理速度
在執行緒世界里,生產者就是生產資料的執行緒,消費者就是消費資料的執行緒,在多執行緒開發當中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產資料,同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者,為了解決這個問題于是引入了生產者和消費者模式,
生產者消費者模式是通過一個容器來解決生產者和消費者的強耦合問題,生產者和消費者彼此之間不直接通訊,而通過阻塞佇列來進行通訊,所以生產者生產完資料之后不用等待消費者處理,直接扔給阻塞佇列,消費者不找生產者要資料,而是直接從阻塞佇列里取,阻塞佇列就相當于一個緩沖區,平衡了生產者和消費者的處理能力,
這就像我們郵寄信一樣!
import time
from queue import Queue
from threading import Thread
q = Queue()
def produce():
for i in range(10):
q.put(i)
print('生產:',i)
print('生產任務完畢!')
q.join()
print(produce.__name__,'函式結束!')
def consumer():
for i in range(10):
print('消費:', q.get())
q.task_done()
# if i == 4:
# print('休息1s...')
# time.sleep(1)#sleep作用:查看生產者是否阻塞
print(consumer.__name__,'函式結束!')
pro = Thread(target=produce)
con = Thread(target=consumer)
pro.start()
con.start()
con.join()
print('消費者任務完成')
pro.join()
print('生產者任務完成')
3、python下多行程的語法實作
3.1 多行程基本語法
from multiprocessing import Process
import os
import time
def info(name):
print("name:",name)
print('parent process:', os.getppid())
print('process id:', os.getpid())
print("------------------")
time.sleep(1)
if __name__ == '__main__':
info('main process')
p1 = Process(target=info, args=('yuan',))
p2 = Process(target=info, args=('alex',))
p1.start()
p2.start()
p1.join()
p2.join()
print("ending")
python解釋器是一份本地化的程式,本質上是可執行的檔案,是靜態的概念,程式運行起來成為行程,是動態的概念,python程式是跑在解釋器上的,嚴格來講,是跑在解釋器實體上的,一個解釋器實體其實就是解釋器跑起來的行程,二者合起來稱之為一個Python行程,各個解釋器實體之間是相互隔離的
3.2 行程池
行程池:定義了一個池子,在里面放上固定數量的行程,有需求來了,就拿這個池中的一個行程來處理任務,等到處理完畢,行程并不關閉,而是將行程再放回行程池中繼續等待任務,如果有許多任務需要執行,池中的行程數量不夠,任務就要等待之前的行程執行任務完畢歸來,拿到空閑行程才能繼續執行,
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
def run_case(*text):
print('入參:{0},當前行程號:{1},父行程號:{2}'.format(text, os.getpid(),os.getppid()))
time.sleep(1)
return 123
if __name__ == '__main__':
pool_num = ProcessPoolExecutor(4)
print('主行程:{0}'.format(os.getpid()))
print('子行程開始咯')
for i in range(20):
ret = pool_num.submit(run_case, i)
# print(ret.result())
pool_num.shutdown()
print('子行程結束了')
print('主行程結束了')
總結:行程池的優勢在于不會立即銷毀行程,不會重新啟動新的行程,效率更高,
3.3 行程通信
3.3.1 行程佇列
from multiprocessing import Process, Queue
import time
def f(q,n):
time.sleep(2)
q.put(n*n+1)
if __name__ == '__main__':
q = Queue()
for i in range(3):
p = Process(target=f, args=(q,i))
p.start()
print(q.get())
print(q.get())
print(q.get())
3.3.2 管道通信
from multiprocessing import Process, Pipe
def f(conn):
conn.send([12, {"name": "yuan"}, 'hello'])
response = conn.recv()
print("子行程接收:", response)
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=f, args=(child_conn,))
p.start()
data = parent_conn.recv()
print("主行程接收:",data) # prints "[42, None, 'hello']"
parent_conn.send("兒子你好!")
p.join()
3.3.3 manager
Queue和pipe只是實作了資料互動,并沒實作資料共享,即一個行程去更改另一個行程的資料,
from multiprocessing import Process, Manager
def f(d, l,n):
d[n] = n
d["name"] ="alvin"
l.append(n)
#print("l",l)
if __name__ == '__main__':
with Manager() as manager:
d = manager.dict()
l = manager.list(range(5))
p_list = []
for i in range(5):
p = Process(target=f, args=(d,l,i))
p.start()
p_list.append(p)
for res in p_list:
res.join()
print(d)
print(l)
4、協程(coroutine)
協程(coroutine)可以理解為是執行緒的優化,又稱之為輕量級行程,它是一種比執行緒更節省資源、效率更高的系統調度機制,協程具有這樣的特點,即在同時開啟的多個任務中,一次只執行一個,只有當前任務遭遇阻塞,才會切換到下一個任務繼續執行,這種機制可以實作多任務的同步,又能夠成功地避免執行緒中使用鎖的復雜性,簡化了開發,早先的協程是使用生成器關鍵字 yield 來實作的,代碼特別復雜難懂,自從Python3.5之后,確定了協程的語法,使得創建協程的方式得到改善,Python 中,能夠實作協程的模塊有多個,如 asyncio、tornado 或 gevent,
協程不是被作業系統內核所管理的,而是完全由程式所控制,也就是在用戶態執行,這樣帶來的好處是性能大幅度的提升,因為不會像執行緒切換那樣消耗資源,
協程不是行程也不是執行緒,而是一個特殊的函式,這個函式可以在某個地方掛起,并且可以重新在掛起處外繼續運行,所以說,協程與行程、執行緒相比并不是一個維度的概念,
一個行程可以包含多個執行緒,一個執行緒也可以包含多個協程,簡單來說,一個執行緒內可以由多個這樣的特殊函式在運行,但是有一點必須明確的是,一個執行緒的多個協程的運行是串行的,如果是多核CPU,多個行程或一個行程內的多個執行緒是可以并行運行的,但是一個執行緒內協程卻絕對是串行的,無論CPU有多少個核,畢竟協程雖然是一個特殊的函式,但仍然是一個函式,一個執行緒內可以運行多個函式,但這些函式都是串行運行的,當一個協程運行時,其它協程必須掛起,
4.1 行程、執行緒、協程的對比
- 協程既不是行程也不是執行緒,協程僅僅是一個特殊的函式,協程它與行程和執行緒不是一個維度的,
- 一個行程可以包含多個執行緒,一個執行緒可以包含多個協程,
- 一個執行緒內的多個協程雖然可以切換,但是多個協程是串行執行的,只能在一個執行緒內運行,沒法利用CPU多核能力,
- 協程與行程一樣,切換是存在背景關系切換問題的,
背景關系切換
- 行程的切換者是作業系統,切換時機是根據作業系統自己的切換策略,用戶是無感知的,行程的切換內容包括頁全域目錄、內核堆疊、硬體背景關系,切換內容保存在記憶體中,行程切換程序是由“用戶態到內核態到用戶態”的方式,切換效率低,
- 執行緒的切換者是作業系統,切換時機是根據作業系統自己的切換策略,用戶無感知,執行緒的切換內容包括內核堆疊和硬體背景關系,執行緒切換內容保存在內核堆疊中,執行緒切換程序是由“用戶態到內核態到用戶態”, 切換效率中等,
- 協程的切換者是用戶(編程者或應用程式),切換時機是用戶自己的程式所決定的,協程的切換內容是硬體背景關系,切換記憶體保存在用戶自己的變數(用戶堆疊或堆)中,協程的切換程序只有用戶態,即沒有陷入內核態,因此切換效率高,
4.2 yield實作的協程
import time
"""
傳統的生產者-消費者模型是一個執行緒寫訊息,一個執行緒取訊息,通過鎖機制控制佇列和等待,但一不小心就可能死鎖,
如果改用協程,生產者生產訊息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高,
"""
# 注意到consumer函式是一個generator(生成器):
# 任何包含yield關鍵字的函式都會自動成為生成器(generator)物件
def consumer():
r = ''
while True:
# 3、consumer通過yield拿到訊息,處理,又通過yield把結果傳回;
# yield指令具有return關鍵字的作用,然后函式的堆疊會自動凍結(freeze)在這一行,
# 當函式呼叫者的下一次利用next()或generator.send()或for-in來再次呼叫該函式時,
# 就會從yield代碼的下一行開始,繼續執行,再回傳下一次迭代結果,通過這種方式,迭代器可以實作無限序列和惰性求值,
n = yield r
if not n:
return
print('[CONSUMER] ←← Consuming %s...' % n)
time.sleep(1)
r = '200 OK'
def produce(c):
# 1、首先呼叫c.next()啟動生成器
next(c)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] →→ Producing %s...' % n)
# 2、然后,一旦生產了東西,通過c.send(n)切換到consumer執行;
cr = c.send(n)
# 4、produce拿到consumer處理的結果,繼續生產下一條訊息;
print('[PRODUCER] Consumer return: %s' % cr)
# 5、produce決定不生產了,通過c.close()關閉consumer,整個程序結束,
c.close()
if __name__ == '__main__':
# 6、整個流程無鎖,由一個執行緒執行,produce和consumer協作完成任務,所以稱為“協程”,而非執行緒的搶占式多任務,
c = consumer()
produce(c)
4.3 greenlet實作的協程
from greenlet import greenlet
import time
def task_1():
print("task_1...")
while True:
print("--This is task 1!--")
g2.switch() # 切換到g2中運行
time.sleep(0.5)
def task_2():
print("task_2...")
while True:
print("--This is task 2!--")
g1.switch() # 切換到g1中運行
time.sleep(0.5)
if __name__ == "__main__":
g1 = greenlet(task_1) # 定義greenlet物件
g2 = greenlet(task_2)
g1.switch() # 切換到g1中運行
greenlet已經實作了協程,但是這個需要人工切換,很麻煩,python中還有一個比greenlet更強大的并且能夠自動切換任務的模塊gevent,其原理是當一個greenlet遇到IO(比如網路、檔案操作等)操作時,比如訪問網路,就自動切換到其他的greenlet,等到IO操作完成,再在適當的時候切換回來繼續執行,由于IO操作非常耗時,經常使程式處于等待狀態,有了gevent為我們自動切換協程 ,就保證總有greenlet在運行,而不是等待IO,
4.4 gevent實作的協程
import gevent
def task_1(num):
for i in range(num):
print(gevent.getcurrent(), i)
gevent.sleep(1) # 模擬一個耗時操作,注意不能使用time模塊的sleep
if __name__ == "__main__":
g1 = gevent.spawn(task_1, 5) # 創建協程
g2 = gevent.spawn(task_1, 5)
g3 = gevent.spawn(task_1, 5)
g1.join() # 等待協程運行完畢
g2.join()
g3.join()
上述結果,在不添加gevent.sleep(1)時,是3個greenlet依次運行,而不是交替運行的,在添加gevent.sleep(1)后,程式運行到這后,交出控制權,執行下一個協程,等待這個耗時操作完成后再重新回到上一個協程,運行結果時交替運行,
4.5 monkey補丁【猴子補丁】
monkey補丁 不必強制使用gevent里面的sleep、sorcket等等了
from gevent import monkey
monkey.patch_all()
import gevent
from urllib import request
import time
def f(url):
print('GET: %s' % url)
resp = request.urlopen(url)
data = resp.read()
print('%d bytes received from %s.' % (len(data), url))
start=time.time()
gevent.joinall([
gevent.spawn(f, 'https://itk.org/'),
gevent.spawn(f, 'https://zhihu.com/'),
])
# f('https://itk.org/')
# f('https://zhihu.com/')
print(time.time()-start)
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/278982.html
標籤:python
上一篇:【python零基礎爬蟲入門】,爬取百度圖片,小孩子也能學會
下一篇:Python入門程式
