文章目錄
- 1. 前言
- 2. 執行緒
- 2.1 執行緒的最大意義在于并行
- 2.2 使用執行緒處理IO密集型任務
- 2.3 使用執行緒處理計算密集型任務
- 2.4 執行緒池
- 3. 行程
- 3.1 使用行程處理計算密集型任務
- 3.2 行程間通信示例
- 3.3 行程池
- 4. 協程
- 4.1 協程和執行緒的區別
- 4.2 協程演進史
- 4.3 協程應用示例
1. 前言
前些日子寫過幾篇關于執行緒和行程的文章,概要介紹了Python內置的執行緒模塊(threading)和行程模塊(multiprocessing)的使用方法,側重點是執行緒間同步和行程間同步,隨后,陸續收到了不少讀者的私信,咨詢行程、執行緒和協程的使用方法,行程、執行緒和協程分別適用于何種應用場景,以及混合使用行程、執行緒和協程的技巧,歸納起來,核心的問題大致有以下幾個:
- 使用執行緒是為了并行還是加速?
- 為什么我使用多執行緒之后,處理速度并沒有預期的快,甚至更慢了?
- 我應該選擇多行程處理還是多執行緒處理?
- 協程和執行緒有什么不同?
- 什么情況下使用協程?
在行程、執行緒和協程的使用上,初學者之所以感到困惑,最主要的原因是對任務的理解不到位,任務是由一個行程、或者執行緒、或者協程獨立完成的、相對獨立的一系列作業組合,通常,我們會把任務寫成一個函式,任務有3種型別:
- 計算密集型任務:任務包含大量計算,CPU占用率高
- IO密集型任務:任務包含頻繁的、持續的網路IO和磁盤IO
- 混合型任務:既有計算也有IO
也有觀點認為還有一種資料密集型任務,但我認為資料密集型任務一般出現在分布式系統或異構系統上,必定伴隨著計算密集和IO密集,因此,任然可以歸類到混合型任務,
下面,我們就以幾個實體來講解演示行程、執行緒和協程的適用場景、使用方法,以及如何優化我們的代碼,
2. 執行緒
2.1 執行緒的最大意義在于并行
通常,代碼是單執行緒順序執行的,這個執行緒就是主執行緒,僅有主執行緒的話,在同一時刻就只能做一件事情;如果有多件事情要做,那也只能做完一件再去做另一件,這有點類似于過去的說書藝人,情節人物復雜時,只能“花開兩朵,各表一枝”,下面這個題目,就是一個需要同時做兩件事情的例子,
請寫一段代碼,提示用戶從鍵盤輸入任意字符,然后等待用戶輸入,如果用戶在10秒鐘完成輸入(按回車鍵),則顯示輸入內容并結束程式;否則,不再等待用戶輸入,而是直接提示超時并結束程式,
我們知道,input()函式用于從鍵盤接收輸入,time.sleep()函式可以令程式停止運行指定的時長,不過,在等待鍵盤輸入的時候,sleep()函式就無法計時,而在休眠的時候,input()函式就無法接收鍵盤輸入,不借助于執行緒,我們無法同時做這兩件事情,如果使用執行緒技術的話,我們可以在主執行緒中接收鍵盤輸入,在子執行緒中啟動sleep()函式,一旦休眠結束,子執行緒就殺掉主執行緒,結束程式,
import os, time
import threading
def monitor(pid):
time.sleep(10)
print('\n超時退出!')
os._exit(0)
m = threading.Thread(target=monitor, args=(os.getpid(), ))
m.setDaemon(True)
m.start()
s = input('請輸入>>>')
print('接收到鍵盤輸入:%s'%s)
print('程式正常結束,')
2.2 使用執行緒處理IO密集型任務
假如從100個網站抓取資料,使用單執行緒的話,就需要逐一請求這100個站點并處理應答結果,所花費時間就是每個站點花費時間的總和,如果使用多個執行緒來實作的話,結果會怎樣呢?
import time
import requests
import threading
urls = ['https://www.baidu.com', 'https://cn.bing.com']
def get_html(n):
for i in range(n):
url = urls[i%2]
resp = requests.get(url)
#print(resp.ok, url)
t0 = time.time()
get_html(100) # 請求100次
t1 = time.time()
print('1個執行緒請求100次,耗時%0.3f秒鐘'%(t1-t0))
for n_thread in (2,5,10,20,50):
t0 = time.time()
ths = list()
for i in range(n_thread):
ths.append(threading.Thread(target=get_html, args=(100//n_thread,)))
ths[-1].setDaemon(True)
ths[-1].start()
for i in range(n_thread):
ths[i].join()
t1 = time.time()
print('%d個執行緒請求100次,耗時%0.3f秒鐘'%(n_thread, t1-t0))
上面的代碼用百度和必應兩個網站來模擬100個站點,運行結果如下所示,單執行緒處理大約需要30秒鐘,分別使用2、5、10個執行緒來處理的話,所耗時間與執行緒數量基本上保持反比關系,當執行緒數量繼續增加20個時,速度不再有顯著提升,若將執行緒數量增至50個,時間消耗反倒略有增加,
1個執行緒請求100次,耗時30.089秒鐘
2個執行緒請求100次,耗時15.087秒鐘
5個執行緒請求100次,耗時7.803秒鐘
10個執行緒請求100次,耗時4.112秒鐘
20個執行緒請求100次,耗時3.160秒鐘
50個執行緒請求100次,耗時3.564秒鐘
這個結果表明,對于IO密集型(本例僅測驗網路IO,沒有磁盤IO)的任務,適量的執行緒可以在一定程度上提高處理速度,隨著執行緒數量的增加,速度的提升不再明顯,
2.3 使用執行緒處理計算密集型任務
對于曝光不足或明暗變化劇烈的照片可以通過演算法來修正,下圖左是一張落日圖,因為太陽光線較強導致暗區細節無法辨識,通過低端增強演算法可以還原為下圖右的樣子,

低端增強演算法(也有人叫做伽馬矯正)其實很簡單:對于
[
0
,
255
]
[0,255]
[0,255]區間內的灰度值
v
0
v_0
v0?,指定矯正系數
γ
\gamma
γ,使用下面的公式,即可達到矯正后得灰度值
v
1
v_1
v1?,其中
γ
\gamma
γ一般選擇2或者3,上圖右就是
γ
\gamma
γ為3的效果,
v
1
=
255
×
(
v
0
255
)
1
γ
v_1 = 255\times(\frac{v_0}{255})^{\frac{1}{\gamma}}
v1?=255×(255v0??)γ1?
下面的代碼,對于一張解析度為4088x2752的照片實施低端增強演算法,這是一項計算密集型的任務,代碼中分別使用了廣播和矢量計算、單執行緒逐像素計算、多執行緒逐像素計算等三種方法,以驗證多執行緒對于計算密集型任務是否有提速效果,
import time
import cv2
import numpy as np
import threading
def gamma_adjust_np(im, gamma, out_file):
"""伽馬增強函式:使用廣播和矢量化計算"""
out = (np.power(im.astype(np.float32)/255, 1/gamma)*255).astype(np.uint8)
cv2.imwrite(out_file, out)
def gamma_adjust_py(im, gamma, out_file):
"""伽馬增強函式:使用回圈逐像素計算"""
rows, cols = im.shape
out = im.astype(np.float32)
for i in range(rows):
for j in range(cols):
out[i,j] = pow(out[i,j]/255, 1/3)*255
cv2.imwrite(out_file, out.astype(np.uint8))
im = cv2.imread('river.jpg', cv2.IMREAD_GRAYSCALE)
rows, cols = im.shape
print('照片解析度為%dx%d'%(cols, rows))
t0 = time.time()
gamma_adjust_np(im, 3, 'river_3.jpg')
t1 = time.time()
print('借助NumPy廣播特性,耗時%0.3f秒鐘'%(t1-t0))
t0 = time.time()
im_3 = gamma_adjust_py(im, 3, 'river_3_cycle.jpg')
t1 = time.time()
print('單執行緒逐像素處理,耗時%0.3f秒鐘'%(t1-t0))
t0 = time.time()
th_1 = threading.Thread(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg'))
th_1.setDaemon(True)
th_1.start()
th_2 = threading.Thread(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg'))
th_2.setDaemon(True)
th_2.start()
th_1.join()
th_2.join()
t1 = time.time()
print('啟用兩個執行緒逐像素處理,耗時%0.3f秒鐘'%(t1-t0))
運行結果如下:
照片解析度為4088x2752
借助NumPy廣播特性,耗時0.381秒鐘
單執行緒逐像素處理,耗時34.228秒鐘
啟用兩個執行緒逐像素處理,耗時36.087秒鐘
結果顯示,對一張千萬級像素的照片做低端增強,借助于NumPy的廣播和矢量化計算,耗時0.38秒鐘;單執行緒逐像素處理的話,耗時相當于NumPy的100倍;啟用多執行緒的話,速度不僅沒有加快,反倒是比單執行緒更慢,這說明,對于計算密集型的任務來說,多執行緒并不能提高處理速度,相反,因為要創建和管理執行緒,處理速度會更慢一些,
2.4 執行緒池
盡管多執行緒可以并行處理多個任務,但開啟執行緒不僅花費時間,也需要占用系統資源,因此,執行緒數量不是越多越快,而是要保持在合理的水平上,執行緒池可以讓我們用固定數量的執行緒完成比執行緒數量多得多的任務,下面的代碼演示了使用 Python 的標準模塊創建執行緒池,計算多個數值的平方,
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
return x*x
>>> with ThreadPoolExecutor(max_workers=4) as pool: # 4個執行緒的執行緒池
result = pool.map(pow2, range(10)) # 使用4個執行緒分別計算0~9的平方
>>> list(result) # result是一個生成器,轉成串列才可以直觀地看到計算結果
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
如果每個執行緒的任務各不相同,使用不同的執行緒函式,任務結束后的結果處理也不一樣,同樣可以使用這個執行緒池,下面的代碼對多個數值中的奇數做平方運算,偶數做立方運算,執行緒任務結束后,列印各自的計算結果,
>>> from concurrent.futures import ThreadPoolExecutor
>>> def pow2(x):
return x*x
>>> def pow3(x):
return x*x*x
>>> def save_result(task): # 保存執行緒計算結果
global result
result.append(task.result())
>>> result = list()
>>> with ThreadPoolExecutor(max_workers=3) as pool:
for i in range(10):
if i%2: # 奇數做平方運算
task = pool.submit(pow2, i)
else: # 偶數做立方運算
task = pool.submit(pow3, i)
task.add_done_callback(save_result) # 為每個執行緒指定結束后的回呼函式
>>> result
[0, 1, 8, 9, 64, 25, 216, 49, 512, 81]
3. 行程
3.1 使用行程處理計算密集型任務
和執行緒相比,行程的最大優勢是可以充分例用計算資源——這一點不難理解,因為不同的行程可以運行的不同CPU的不同的核上,假如一臺計算機的CPU共有16核,則可以啟動16個或更多個行程來并行處理任務,對于上面的例子,我們改用行程來處理,效果會怎樣呢?
import time
import cv2
import numpy as np
import multiprocessing as mp
def gamma_adjust_py(im, gamma, out_file):
"""伽馬增強函式:使用回圈逐像素計算"""
rows, cols = im.shape
out = im.astype(np.float32)
for i in range(rows):
for j in range(cols):
out[i,j] = pow(out[i,j]/255, 1/3)*255
cv2.imwrite(out_file, out.astype(np.uint8))
if __name__ == '__main__':
mp.freeze_support()
im_fn = 'river.jpg'
im = cv2.imread(im_fn, cv2.IMREAD_GRAYSCALE)
rows, cols = im.shape
print('照片解析度為%dx%d'%(cols, rows))
t0 = time.time()
pro_1 = mp.Process(target=gamma_adjust_py, args=(im[:rows//2], 3, 'river_3_1.jpg'))
pro_1.daemon = True
pro_1.start()
pro_2 = mp.Process(target=gamma_adjust_py, args=(im[rows//2:], 3, 'river_3_2.jpg'))
pro_2.daemon = True
pro_2.start()
pro_1.join()
pro_2.join()
t1 = time.time()
print('啟用兩個行程程逐像素處理,耗時%0.3f秒鐘'%(t1-t0))
運行結果如下:
照片解析度為4088x2752
啟用兩個行程程逐像素處理,耗時17.786秒鐘
使用單個執行緒或兩個執行緒的時候,耗時大約30+秒,改用兩個行程后,耗時17.786秒,差不多快了一倍,如果使用4個行程(前提是運行代碼的計算機至少有4個CPU核)的話,速度還能提高一倍,有興趣的朋友可以試一下,這個測驗表明,對于計算密集型的任務,使用多行程并行處理是有效的提速手段,通常,行程數量選擇CPU核數的整倍數,
3.2 行程間通信示例
多行程并行彌補了多執行緒技術的不足,我們可以在每一顆 CPU 上,或多核 CPU 的每一個核上啟動一個行程,如果有必要,還可以在每個行程內再創建適量的執行緒,最大限度地使用計算資源來解決問題,不過,行程技術也有很大的局限性,因為行程不在同一塊記憶體區域內,所以和執行緒相比,行程間的資源共享、通信、同步等都要麻煩得多,受到的限制也更多,
我們知道,執行緒間通信可以使用佇列、互斥鎖、信號量、事件和條件等多種同步方式,同樣的,這些手段也可以應用在行程間,此外,multiprocessing 模塊還提供了管道和共享記憶體等行程間通信的手段,下面僅演示一個行程間使用佇列通信,更多的通信方式請參考由人民郵電出版社出版的拙著《Python高手修煉之道》,
這段代碼演示了典型的生產者—消費者模式,行程 A 負責隨機地往地上“撒錢”(寫佇列),行程 B 負責從地上“撿錢”(讀佇列),
import os, time, random
import multiprocessing as mp
def sub_process_A(q):
"""A行程函式:生成資料"""
while True:
time.sleep(5*random.random()) # 在0-5秒之間隨機延時
q.put(random.randint(10,100)) # 隨機生成[10,100]之間的整數
def sub_process_B(q):
"""B行程函式:使用資料"""
words = ['哈哈,', '天哪!', 'My God!', '咦,天上掉餡餅了?']
while True:
print('%s撿到了%d塊錢!'%(words[random.randint(0,3)], q.get()))
if __name__ == '__main__':
print('主行程(%s)開始,按回車鍵結束本程式'%os.getpid())
q = mp.Queue(10)
p_a = mp.Process(target=sub_process_A, args=(q,))
p_a.daemon = True
p_a.start()
p_b = mp.Process(target=sub_process_B, args=(q,))
p_b.daemon = True
p_b.start()
input()
3.3 行程池
使用多行程并行處理任務時,處理效率和行程數量并不總是成正比,當行程數量超過一定限度后,完成任務所需時間反而會延長,行程池提供了一個保持合理行程數量的方案,但合理行程數量需要根據硬體狀況及運行狀況來確定,通常設定為 CPU 的核數,
multiprocessing.Pool(n) 可創建 n 個行程的行程池供用戶呼叫,如果行程池任務不滿,則新的行程請求會被立即執行;如果行程池任務已滿,則新的請求將等待至有可用行程時才被執行,向行程池提交任務有以下兩種方式,
- apply_async(func[, args[, kwds[, callback]]]) :非阻塞式提交,即使行程池已滿,也會接
受新的任務,不會阻塞主行程,新任務將處于等待狀態, - apply(func[, args[, kwds]]) :阻塞式提交,若行程池已滿,則主行程阻塞,直至有空閑
行程可以使用,
下面的代碼演示了行程池的典型用法,讀者可自行嘗試阻塞式提交和非阻塞式提交兩種方法的差異,
import time
import multiprocessing as mp
def power(x, a=2):
"""行程函式:冪函式"""
time.sleep(1)
print('%d的%d次方等于%d'%(x, a, pow(x, a)))
def demo():
mpp = mp.Pool(processes=4)
for item in [2,3,4,5,6,7,8,9]:
mpp.apply_async(power, (item, )) # 非阻塞提交新任務
#mpp.apply(power, (item, )) # 阻塞提交新任務
mpp.close() # 關閉行程池,意味著不再接受新的任務
print('主行程走到這里,正在等待子行程結束')
mpp.join() # 等待所有子行程結束
print('程式結束')
if __name__ == '__main__':
demo()
4. 協程
4.1 協程和執行緒的區別
如前文所述,執行緒常用于多任務并行,對于可以切分的IO密集型任務,將切分的每一小塊任務分配給一個執行緒,可以顯著提高處理速度,而協程,無論有多少個,都被限定在一個執行緒內執行,因此,協程又被稱為微執行緒,
從宏觀上看,執行緒任務和協程任務都是并行的,從微觀上看,執行緒任務是分時切片輪流執行的,這種切換是系統自動完成的,無需程式員干預;而協程則是根據任務特點,在任務阻塞時將控制權交給其他協程,這個權力交接的時機和位置,由程式員指定,由此可以看出,參與協程管理的每一個任務,必須存在阻塞的可能,且阻塞條件會被其它任務破壞,從而得以在阻塞解除后繼續執行,
盡管協程難以駕馭,但是由于是在一個執行緒內運行,免除了執行緒或行程的切換開銷,因而協程的運行效率高,在特定場合下仍然被廣泛使用,
4.2 協程演進史
Py2時代,Python并不支持協程,僅可通過yield實作部分的協程功能,另外,還可以通過gevent等第三方庫實作協程,gevent最好玩的,莫過于monkey_patch(猴子補丁),曾經有一段時間,我特別喜歡使用它,
從Py3.4開始,Python內置asyncio標準庫,正式原生支持協程,asyncio的異步操作,需要在協程中通過yield from完成,的函式稱為協程函式則需要使用@asyncio.coroutine裝飾器,
不理解生成器的同學,很難駕馭yield這個反人類思維的東西,為了更貼近人類思維,Py3.5引入了新的語法async和await,可以讓協程的代碼稍微易懂一點點,如果此前沒有接觸過協程,我建議你只學習async和await的用法就足夠了,不需要去了解早期的yield和后來的yield from,本質上,async就是@asyncio.coroutine,await就是yield from,換個馬甲,看起來就順眼多了,
4.3 協程應用示例
作為基礎知識,在介紹協程應用示例前,先來介紹一下佇列,在行程、執行緒、協程模塊中,都有佇列(Queue)物件,佇列作為行程、執行緒、協程間最常用的通信方式,有一個不容忽視的特性:阻塞式讀和寫,當佇列為空時,讀會被阻塞,直到讀出資料;當佇列滿時,寫會被阻塞,直到佇列空出位置后寫入成功,因為佇列具有阻塞式讀寫的特點,正好可以在協程中利用阻塞切換其他協程任務,
我們來構思一個這樣的應用:某個富豪(rich)手拿一沓鈔票,隨機取出幾張,撒在地上(如果地上已經有鈔票的話,就等有人撿走了在撒);另有名為A、B、C的三個幸運兒(lucky),緊盯著撒錢的富豪,只要富豪把錢撒到地上,他們立刻就去撿起來,
如果用協程實作上述功能的話,我們可以用長度為1的協程佇列來存放富豪每一次拋灑的錢,一旦佇列中有錢(佇列滿),富豪就不能繼續拋灑了,拋灑被阻塞,協程控制權轉移,三個幸運兒中的某一個獲得控制權,就去讀佇列(撿錢),如果佇列中沒有錢(佇列空),撿錢被阻塞,協程控制權轉移,依靠佇列的阻塞和解除阻塞,一個富豪和三個幸運兒可以順利地分配完富豪手中的鈔票,為了讓這個程序可以慢到適合觀察,可以在富豪拋錢之前,再增加一個隨機延時,當然,這個延時不能使用time模塊的sleep()函式,而是使用協程模塊asyncio的sleep()函式,下面是完整的撒錢-撿錢代碼,
import asyncio, random
async def rich(q, total):
"""任性的富豪,隨機撒錢"""
while total > 0:
money = random.randint(10,100)
total -= money
await q.put(money) # 隨機生成[10,100]之間的整數
print('富豪瀟灑地拋了%d塊錢'%money)
await asyncio.sleep(3*random.random()) # 在0-3秒之間隨機延時
async def lucky(q, name):
"""隨時可以撿到錢的幸運兒"""
while True:
money = await q.get()
q.task_done()
print('%s撿到了%d塊錢!'%(name, money))
async def run():
q = asyncio.Queue(1)
producers = [asyncio.create_task(rich(q, 300))]
consumers = [asyncio.create_task(lucky(q, name)) for name in 'ABC']
await asyncio.gather(*producers,)
await q.join()
for c in consumers:
c.cancel()
if __name__ == '__main__':
asyncio.run(run())
運行結果如下:
富豪瀟灑地拋了42塊錢
A撿到了42塊錢!
富豪瀟灑地拋了97塊錢
A撿到了97塊錢!
富豪瀟灑地拋了100塊錢
B撿到了100塊錢!
富豪瀟灑地拋了35塊錢
C撿到了35塊錢!
富豪瀟灑地拋了17塊錢
A撿到了17塊錢!
富豪拋完了手中的錢,轉身離去
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/233095.html
標籤:AI
下一篇:ssh密鑰驗證
