
雷猴啊,兄弟們!今天來展示一下如何用Python快速實作一個執行緒池,
一、序言
當有多個 IO 密集型的任務要被處理時,我們自然而然會想到多執行緒,但如果任務非常多,我們不可能每一個任務都啟動一個執行緒去處理,這個時候最好的辦法就是實作一個執行緒池,至于池子里面的執行緒數量可以根據業務場景進行設定,
比如我們實作一個有 10 個執行緒的執行緒池,這樣可以并發地處理 10 個任務,每個執行緒將任務執行完之后,便去執行下一個任務,通過使用執行緒池,可以避免因執行緒創建過多而導致資源耗盡,而且任務在執行時的生命周期也可以很好地把控,
而執行緒池的實作方式也很簡單,但這里我們不打算手動實作,因為 Python 提供了一個標準庫 concurrent.futures,已經內置了對執行緒池的支持,所以本篇文章,我們就來詳細介紹一下該模塊的用法,
二、正文
1、Future 物件
當我們往執行緒池里面提交一個函式時,會分配一個執行緒去執行,同時立即回傳一個 Future 物件,通過 Future 物件可以監控函式的執行狀態,有沒有出現例外,以及有沒有執行完畢等等,如果函式執行完畢,內部便會呼叫 future.set_result 將回傳值設定到 future 里面,然后外界便可呼叫 future.result 拿到回傳值,
除此之外 future 還可以系結回呼,一旦函式執行完畢,就會以 future 為引數,自動觸發回呼,所以 future 被稱為未來物件,可以把它理解為函式的一個容器,當我們往執行緒池提交一個函式時,會立即創建相應的 future 然后回傳,函式的執行狀態什么的,都通過 future 來查看,當然也可以給它系結一個回呼,在函式執行完畢時自動觸發,
那么下面我們就來看一下 future 的用法,文字的話理解起來可能有點枯燥,
將函式提交到執行緒池里面運行時,會立即回傳一個物件
這個物件就叫做 Future 物件,里面包含了函式的執行狀態等等
當然我們也可以手動創建一個Future物件,
from concurrent.futures import Future # 創建 Future 物件 future future = Future() # 給 future 系結回呼 # Python學習交流群: 279199867 def callback(f: Future): print("當set_result的時候會執行回呼,result:", f.result()) future.add_done_callback(callback) # 通過 add_done_callback 方法即可給 future 系結回呼 # 呼叫的時候會自動將 future 作為引數 # 如果需要多個引數,那么就使用偏函式 # 回呼函式什么時候執行呢? # 顯然是當 future 執行 set_result 的時候 # 如果 future 是向執行緒池提交函式時回傳的 # 那么當函式執行完畢時會自動執行 future.set_result(xx) # 并將自身的回傳設定進去 # 而這里的 future 是我們手動創建的,因此需要手動執行 future.set_result("嘿嘿")
當set_result的時候會執行回呼,result: 嘿嘿
需要注意的是:只能執行一次 set_result,但是可以多次呼叫 result 獲取結果,
from concurrent.futures import Future future = Future() future.set_result("哼哼") print(future.result()) # 哼哼 print(future.result()) # 哼哼 print(future.result()) # 哼哼
執行 future.result() 之前一定要先 set_result,否則會一直處于阻塞狀態,當然 result 方法還可以接收一個 timeout 引數,表示超時時間,如果在指定時間內沒有獲取到值就會拋出例外,
2、提交函式自動創建 Future 物件
我們上面是手動創建的 Future 物件,但作業中很少會手動創建,我們將函式提交到執行緒池里面運行的時候,會自動創建 Future 物件并回傳,這個 Future 物件里面就包含了函式的執行狀態,比如此時是處于暫停、運行中還是完成等等,并且函式在執行完畢之后,還會呼叫 future.set_result 將自身的回傳值設定進去,
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" # 創建一個執行緒池 # 里面還可以指定 max_workers 引數,表示最多創建多少個執行緒 # 如果不指定,那么每提交一個函式,都會為其創建一個執行緒 executor = ThreadPoolExecutor() # 通過 submit 即可將函式提交到執行緒池,一旦提交,就會立刻運行 # 因為開啟了一個新的執行緒,主執行緒會繼續往下執行 # 至于 submit 的引數,按照函式名,對應引數提交即可 # 切記不可寫成task("古明地覺", 3),這樣就變成呼叫了 future = executor.submit(task, "螢屏前的你", 3) # 由于函式里面出現了 time.sleep,并且指定的 n 是 3 # 所以函式內部會休眠 3 秒,顯然此時處于運行狀態 print(future) """ <Future at 0x7fbf701726d0 state=running> """ # 我們說 future 相當于一個容器,包含了內部函式的執行狀態 # 函式是否正在運行中 print(future.running()) """ True """ # 函式是否執行完畢 print(future.done()) """ False """ # 主程式也 sleep 3 秒 time.sleep(3) # 顯然此時函式已經執行完畢了 # 并且列印結果還告訴我們回傳值型別是 str print(future) """ <Future at 0x7fbf701726d0 state=finished returned str> """ print(future.running()) """ False """ print(future.done()) """ True """ # 函式執行完畢時,會將回傳值設定在 future 里 # 也就是說一旦執行了 future.set_result # 那么就表示函式執行完畢了,然后外界可以呼叫 result 拿到回傳值 print(future.result()) """ 螢屏前的你 睡了 3 秒 """
這里再強調一下 future.result(),這一步是會阻塞的,舉個例子:
# 提交函式 future = executor.submit(task, "螢屏前的你", 3) start = time.perf_counter() future.result() end = time.perf_counter() print(end - start) # 3.00331525
可以看到,future.result() 這一步花了將近 3s,其實也不難理解,future.result() 是干嘛的?就是為了獲取函式的回傳值,可函式都還沒有執行完畢,它又從哪里獲取呢?所以只能先等待函式執行完畢,將回傳值通過 set_result 設定到 future 里面之后,外界才能呼叫 future.result() 獲取到值,
如果不想一直等待的話,那么在獲取值的時候可以傳入一個超時時間,
from concurrent.futures import ( ThreadPoolExecutor, TimeoutError ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() future = executor.submit(task, "螢屏前的你", 3) try: # 1 秒之內獲取不到值,拋出 TimeoutError res = future.result(1) except TimeoutError: pass # 再 sleep 2 秒,顯然函式執行完畢了 time.sleep(2) # 獲取回傳值 print(future.result()) """ 螢屏前的你 睡了 3 秒 """
當然啦,這么做其實還不夠智能,因為我們不知道函式什么時候執行完畢,所以最好的辦法還是系結一個回呼,當函式執行完畢時,自動觸發回呼,
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" def callback(f): print(f.result()) executor = ThreadPoolExecutor() future = executor.submit(task, "螢屏前的你", 3) # 系結回呼,3 秒之后自動呼叫 future.add_done_callback(callback) """ 螢屏前的你 睡了 3 秒 """
需要注意的是,在呼叫 submit 方法之后,提交到執行緒池的函式就已經開始執行了,而不管函式有沒有執行完畢,我們都可以給對應的 future 系結回呼,
如果函式完成之前添加回呼,那么會在函式完成后觸發回呼,如果函式完成之后添加回呼,由于函式已經完成,代表此時的 future 已經有值了,或者說已經 set_result 了,那么會立即觸發回呼,
3、future.set_result 到底干了什么事情
當函式執行完畢之后,會執行 set_result,那么這個方法到底干了什么事情呢?

我們看到 future 有兩個被保護的屬性,分別是 _result 和 _state,顯然 _result 用于保存函式的回傳值,而 future.result() 本質上也是回傳 _result 屬性的值,而 _state 屬性則用于表示函式的執行狀態,初始為 PENDING,執行中為 RUNING,執行完畢時被設定為 FINISHED,
呼叫 future.result() 的時候,會判斷 _state 的屬性,如果還在執行中就一直等待,當 _state 為 FINISHED 的時候,就回傳 _result 屬性的值,
4、提交多個函式
我們上面每次只提交了一個函式,但其實可以提交任意多個,我們來看一下:
from concurrent.futures import ThreadPoolExecutor import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "螢屏前的你", 3), executor.submit(task, "螢屏前的你", 4), executor.submit(task, "螢屏前的你", 1)] # 此時都處于running print(futures) """ [<Future at 0x1b5ff622550 state=running>, <Future at 0x1b5ff63ca60 state=running>, <Future at 0x1b5ff63cdf0 state=running>] """ time.sleep(3) # 主程式 sleep 3s 后 # futures[0]和futures[2]處于 finished # futures[1]仍處于 running print(futures) """ [<Future at 0x1b5ff622550 state=running>, <Future at 0x1b5ff63ca60 state=running>, <Future at 0x1b5ff63cdf0 state=finished returned str>] """
如果是多個函式,要如何拿到回傳值呢?很簡單,遍歷 futures 即可,
executor = ThreadPoolExecutor() futures = [executor.submit(task, "螢屏前的你", 5), executor.submit(task, "螢屏前的你", 2), executor.submit(task, "螢屏前的你", 4), executor.submit(task, "螢屏前的你", 3), executor.submit(task, "螢屏前的你", 6)] for future in futures: print(future.result()) """ 螢屏前的你 睡了 5 秒 螢屏前的你 睡了 2 秒 螢屏前的你 睡了 4 秒 螢屏前的你 睡了 3 秒 螢屏前的你 睡了 6 秒 """
這里面有一些值得說一說的地方,首先 futures 里面有 5 個 future,記做 future1, future2, future3, future4, future5,
當使用 for 回圈遍歷的時候,實際上會依次遍歷這 5 個 future,所以回傳值的順序就是我們添加的函式的順序,由于 future1 對應的函式休眠了 5s,那么必須等到 5s 后,future1 里面才會有值,
但這五個函式是并發執行的,future2, future3, future4 由于只休眠了 2s, 4s, 3s,所以肯定會先執行完畢,然后執行 set_result,將回傳值設定到對應的 future 里,
但 Python 的 for 回圈不可能在第一次迭代還沒有結束,就去執行第二次迭代,因為 futures 里面的幾個 future 的順序已經一開始就被定好了,只有當第一個 future.result() 執行完成之后,才會執行第二個 future.result(),以及第三個、第四個,
因此即便后面的函式已經執行完畢,但由于 for 回圈的順序,也只能等著,直到前面的 future.result() 執行完畢,所以當第一個 future.result() 結束時,后面三個 future.result() 會立刻輸出,因為它們內部的函式已經執行結束了,
而最后一個 future,由于內部函式 sleep 了 6 秒,因此要再等待 1 秒,才會列印 future.result(),
5、使用 map 來提交多個函式
使用 submit 提交函式會回傳一個 future,并且還可以給 future 系結一個回呼,但如果不關心回呼的話,那么還可以使用 map 進行提交,
executor = ThreadPoolExecutor() # map 內部也是使用了 submit results = executor.map(task, ["螢屏前的你"] * 3, [3, 1, 2]) # 并且回傳的是迭代器 print(results) """ <generator object ... at 0x0000022D78EFA970> """ # 此時遍歷得到的是不再是 future # 而是 future.result() for result in results: print(result) """ 螢屏前的你 睡了 3 秒 螢屏前的你 睡了 1 秒 螢屏前的你 睡了 2 秒 """
可以看到,當使用for回圈的時候,map 執行的邏輯和 submit 是一樣的,唯一的區別是,此時不需要再呼叫 result 了,因為回傳的就是函式的回傳值,
或者我們直接呼叫 list 也行,
executor = ThreadPoolExecutor() results = executor.map(task, ["螢屏前的你"] * 3, [3, 1, 2]) print(list(results)) """ ['螢屏前的你 睡了 3 秒', '螢屏前的你 睡了 1 秒', '螢屏前的你 睡了 2 秒'] """
results 是一個生成器,呼叫 list 的時候會將里面的值全部產出,由于 map 內部還是使用的 submit,然后通過 future.result() 拿到回傳值,而耗時最長的函式需要 3 秒,因此這一步會阻塞 3 秒,3 秒過后,會列印所有函式的回傳值,
6、按照順序等待執行
上面在獲取回傳值的時候,是按照函式的提交順序獲取的,如果我希望哪個函式先執行完畢,就先獲取哪個函式的回傳值,該怎么做呢?
from concurrent.futures import ( ThreadPoolExecutor, as_completed ) import time def task(name, n): time.sleep(n) return f"{name} 睡了 {n} 秒" executor = ThreadPoolExecutor() futures = [executor.submit(task, "螢屏前的你", 5), executor.submit(task, "螢屏前的你", 2), executor.submit(task, "螢屏前的你", 1), executor.submit(task, "螢屏前的你", 3), executor.submit(task, "螢屏前的你", 4)] for future in as_completed(futures): print(future.result()) """ 螢屏前的你 睡了 1 秒 螢屏前的你 睡了 2 秒 螢屏前的你 睡了 3 秒 螢屏前的你 睡了 4 秒 螢屏前的你 睡了 5 秒 """
此時誰先完成,誰先回傳,
7、取消一個函式的執行
我們通過 submit 可以將函式提交到執行緒池中執行,但如果我們想取消該怎么辦呢?
executor = ThreadPoolExecutor() future1 = executor.submit(task, "螢屏前的你", 1) future2 = executor.submit(task, "螢屏前的你", 2) future3 = executor.submit(task, "螢屏前的你", 3) # 取消函式的執行 # 會將 future 的 _state 屬性設定為 CANCELLED future3.cancel() # 查看是否被取消 print(future3.cancelled()) # False
問題來了,呼叫 cancelled 方法的時候,回傳的是False,這是為什么?很簡單,因為函式已經被提交到執行緒池里面了,函式已經運行了,而只有在還沒有運行時,取消才會成功,
可這不矛盾了嗎?函式一旦提交就會運行,只有不運行才會取消成功,這怎么辦?還記得執行緒池的一個叫做 max_workers 的引數嗎?用來控制執行緒池內的執行緒數量,我們可以將最大的執行緒數設定為2,那么當第三個函式進去的時候,就不會執行了,而是處于暫停狀態,
executor = ThreadPoolExecutor(max_workers=2) future1 = executor.submit(task, "螢屏前的你", 1) future2 = executor.submit(task, "螢屏前的你", 2) future3 = executor.submit(task, "螢屏前的你", 3) # 如果池子里可以創建空閑執行緒 # 那么函式一旦提交就會運行,狀態為 RUNNING print(future1._state) # RUNNING print(future2._state) # RUNNING # 但 future3 內部的函式還沒有運行 # 因為池子里無法創建新的空閑執行緒了,所以狀態為 PENDING print(future3._state) # PENDING # 取消函式的執行,前提是函式沒有運行 # 會將 future 的 _state 屬性設定為 CANCELLED future3.cancel() # 查看是否被取消 print(future3.cancelled()) # True print(future3._state) # CANCELLED
在啟動執行緒池的時候,肯定是需要設定容量的,不然處理幾千個函式要開啟幾千個執行緒嗎,另外當函式被取消了,就不可以再呼叫 future.result() 了,否則的話會拋出 CancelledError,
8、函式執行時出現例外
我們前面的邏輯都是函式正常執行的前提下,但天有不測風云,如果函式執行時出現例外了該怎么辦?
from concurrent.futures import ThreadPoolExecutor def task1(): 1 / 0 def task2(): pass executor = ThreadPoolExecutor(max_workers=2) future1 = executor.submit(task1) future2 = executor.submit(task2) print(future1) print(future2) """ <Future at 0x7fe3e00f9e50 state=finished raised ZeroDivisionError> <Future at 0x7fe3e00f9eb0 state=finished returned NoneType> """ # 結果顯示 task1 函式執行出現例外了 # 那么這個例外要怎么獲取呢? print(future1.exception()) print(future1.exception().__class__) """ division by zero <class 'ZeroDivisionError'> """ # 如果執行沒有出現例外,那么 exception 方法回傳 None print(future2.exception()) # None # 注意:如果函式執行出現例外了 # 那么呼叫 result 方法會將例外拋出來 future1.result() """ Traceback (most recent call last): File "...", line 4, in task1 1 / 0 ZeroDivisionError: division by zero """
出現例外時,呼叫 future.set_exception 將例外設定到 future 里面,而 future 有一個 _exception 屬性,專門保存設定的例外,當呼叫 future.exception() 時,也會直接回傳 _exception 屬性的值,
9、等待所有函式執行完畢
假設我們往執行緒池提交了很多個函式,如果希望提交的函式都執行完畢之后,主程式才能往下執行,該怎么辦呢?其實方案有很多:
第一種:
from concurrent.futures import ThreadPoolExecutor import time def task(n): time.sleep(n) return f"sleep {n}" executor = ThreadPoolExecutor() future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # 這里是不會阻塞的 print("start") # 遍歷所有的 future,并呼叫其 result 方法 # 這樣就會等到所有的函式都執行完畢之后才會往下走 for future in [future1, future2, future3]: print(future.result()) print("end") """ start sleep 5 sleep 2 sleep 4 end """
第二種:
from concurrent.futures import ( ThreadPoolExecutor, wait ) import time def task(n): time.sleep(n) return f"sleep {n}" executor = ThreadPoolExecutor() future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # return_when 有三個可選引數 # FIRST_COMPLETED:當任意一個任務完成或者取消 # FIRST_EXCEPTION:當任意一個任務出現例外 # 如果都沒出現例外等同于ALL_COMPLETED # ALL_COMPLETED:所有任務都完成,默認是這個值 fs = wait([future1, future2, future3], return_when="ALL_COMPLETED") # 此時回傳的fs是DoneAndNotDoneFutures型別的namedtuple # 里面有兩個值,一個是done,一個是not_done print(fs.done) """ {<Future at 0x1df1400 state=finished returned str>, <Future at 0x2f08e48 state=finished returned str>, <Future at 0x9f7bf60 state=finished returned str>} """ print(fs.not_done) """ set() """ for f in fs.done: print(f.result()) """ start sleep 5 sleep 2 sleep 4 end """
第三種:
# 使用背景關系管理 with ThreadPoolExecutor() as executor: future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # 所有函式執行完畢(with陳述句結束)后才會往下執行
第四種:
executor = ThreadPoolExecutor() future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) # 所有函式執行結束后,才會往下執行 executor.shutdown()
三、小結
如果我們需要啟動多執行緒來執行函式的話,那么不妨使用執行緒池,每呼叫一個函式就從池子里面取出一個執行緒,函式執行完畢就將執行緒放回到池子里以便其它函式執行,如果池子里面空了,或者說無法創建新的空閑執行緒,那么接下來的函式就只能處于等待狀態了,
最后,concurrent.futures 不僅可以用于實作執行緒池,還可以用于實作行程池,兩者的 API 是一樣的:
from concurrent.futures import ProcessPoolExecutor import time def task(n): time.sleep(n) return f"sleep {n}" executor = ProcessPoolExecutor() # Windows 上需要加上這一行 if __name__ == '__main__': future1 = executor.submit(task, 5) future2 = executor.submit(task, 2) future3 = executor.submit(task, 4) executor.shutdown() print(future1.result()) print(future2.result()) print(future3.result()) """ sleep 5 sleep 2 sleep 4 """
執行緒池和行程池的 API 是一致的,但作業中很少會創建行程池,
兄弟們今天的分享就到這,債見!
文章不過癮?試試看視頻吧!
Python爬蟲入門到實戰全集100集教程:代碼總是學完就忘記?100個爬蟲實戰專案!讓你沉迷學習丨學以致用丨下一個Python大神就是你! Python tkinter 合集:全網最全python tkinter教程!包含所有知識點!輕松做出好看的tk程式!轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/500749.html
標籤:其他
