本文中的內容來自我的筆記,撰寫程序中參考了胡俊峰老師《Python程式設計與資料科學導論》課程的內容,
目錄- 并發處理:多行程和多執行緒
- 前置
- 多行程和多執行緒的比較
- 多行程的機制和代碼實作
- 基本用法
- 行程復制
- 行程池
- 行程池的基本用法
- 行程池中的行程復制
- 在行程池中利用子行程的回傳值
- 行程間通訊
- Pipe
- Queue
- 多執行緒
- 多執行緒的變數機制
- 并發處理:協程
- 用簡單的生成器實作協程
- 用回呼函式(callback)將普通函式變為協程
- 用async/await實作協程
- 基礎使用
- wait_for()
- 實作生產者-消費者協程
并發處理:多行程和多執行緒
前置
概念:
- 并發:一段時間內同時推進多個任務,但不一定要在一個時刻同時進行多個任務,
- 并行:一段時間內同時推進多個任務,且在一個時刻要同時進行多個任務,
- 并行是并發的子集;單核CPU交替執行多個任務是并發但不是并行;多核CPU同時執行多個任務既是并發也是并行,
何時需要并發?
- 需要同時處理多個任務
- 經常需要等待資源
- 多個子程序互相協作
電腦執行任務的機制:
- 作業系統內核 負責任務(i.e. 行程/執行緒)的掛起與喚醒,和資源的分配(比如一個程式能訪問哪些記憶體地址)
- 行程是資源分配的最小單元,不同行程之間不共享資源(比如可訪問的記憶體區域);行程可再分為執行緒,執行緒之間共享大部分資源,
- 正是因為 是否共享資源 上的區別,執行緒間的切換(即掛起和喚醒)比行程間的切換更快,
- 執行緒是調度執行的最小單元,這意味著作業系統內核會負責將多個執行緒并發執行,
多行程和多執行緒的比較
多行程:
- 將任務拆分為多個行程來進行
- 由內核決定是并行還是僅僅并發,
- 行程間不共享記憶體
- 優點:一個行程掛了不影響別的
- 缺點:切換行程耗時大、行程間通信不便
多執行緒:
- 將任務拆分為一個行程內的多個執行緒來進行
- 由內核決定是并行還是僅僅并發,
- 在CPython解釋器中有全域解釋器鎖,導致多執行緒只能并發而不能并行(多行程可以并行),
- 行程間共享記憶體
- 優點:切換耗時低、通信方便
- 缺點:在并行時對全域變數要使用鎖機制
- 鎖機制:一個執行緒使用一個全域變數時,先等待其(被其他執行緒)解鎖,再將其上鎖,再使用,用后再解鎖,
- 如果不使用鎖的話:100個
a+=1的執行緒執行完成后(初始a=0),a可能<100,
- 如果不使用鎖的話:100個
- 資料科學中可以為了提高效率而不使用鎖機制,但同時要容忍由此帶來的差錯,
- 鎖機制:一個執行緒使用一個全域變數時,先等待其(被其他執行緒)解鎖,再將其上鎖,再使用,用后再解鎖,
多行程的機制和代碼實作
以下介紹的函式中,幾乎每一個有阻塞可能的,都會有一個可選的timeout引數,這件事將不再重提,
基本用法
from multiprocessing import Process
import os
import time
def task(duration, base_time):
pid = os.getpid()
print(f'son process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
time.sleep(duration)
print(f'son process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
p1 = Process(target=task, args=(1,base_time)) # a process that executes task(1,base_time); currently not running
p2 = Process(target=task, args=(2,base_time)) # a process that executes task(2,base_time); currently not running
p1.start()
p2.start()
print(p1.is_alive(), p2.is_alive()) # whether they are running
print('main process can proceed while son processes are running')
p1.join() # wait until p1 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
p2.join() # wait until p2 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
main process id 3316 starts at 0.000001s
True True
main process can proceed while son processes are running
son process id 15640 starts at 0.002056s with parameter 1
son process id 10716 starts at 0.003030s with parameter 2
son process id 15640 ends at 1.002352s
son process id 10716 ends at 2.017861s
main process id 3316 ends at 2.114324s
如果沒有p1.join()和p2.join(),主行程會在很短的時間內完成執行,而此時子行程仍在運行,輸出結果如下:
main process id 11564 starts at 0.000001s
True True
main process can proceed while son processes are running
main process id 11564 ends at 0.011759s
son process id 13500 starts at 0.004392s with parameter 1
son process id 11624 starts at 0.003182s with parameter 2
son process id 13500 ends at 1.009420s
son process id 11624 ends at 2.021817s
為何0.004秒的事件在0.003秒的事件之前被輸出?
- 因為print陳述句的耗時未被計算在內
- 因為perf_counter()在Windows下有bug,它給出的時間在不同行程之間不完全同步,
需要注意,一個子行程結束運行后仍然處于存活狀態;只有被join()之后才會正式死亡(即被從系統中除名),
關于if __name__ == '__main__'::
- 在Python中,可以通過import來獲取其他檔案中的代碼;在檔案B的開頭(其他位置同理)import檔案A,相當于把A在B的開頭復制一份,
- 如果在復制A的內容時,我們希望A中的一部分代碼在執行時被忽略(比如說測驗陳述句),就可以給A中的這些代碼加上
if __name__ == '__main__':- 對于從別處import來的代碼,系統變數
__name__在這段代碼中會等于來源檔案的名字(或模塊名,這你不用在意);對于存在于本檔案中的代碼,__name__會等于__main__,
- 對于從別處import來的代碼,系統變數
- 由于某些原因,在Windows下,如果一個檔案的代碼中使用了多行程,則這個檔案中會隱式地import自己(一次或多次);將所有零級縮進的代碼放在
if __name__ == '__main__':中,可以避免產生重復執行的問題(注意到如果不這樣做的話,import來的副本中還會再次import自身,導致無限遞回import并報錯),- 暫時可以認為,采取這一措施后就可完全消除“隱式import自身”所產生的效應,
行程復制
from multiprocessing import Process
import os
pid = os.getpid()
def task():
global pid
print(pid)
pid = 1
print(pid)
if __name__ == '__main__':
p = Process(target=task)
p.start()
p.join()
print(pid)
在Windows下的輸出:
4836
1
2944
在Linux下的輸出:
511
1
511
前兩個數都是由子行程輸出,第三個數由父行程輸出,
- 注意到
pid在子行程中被賦為1后,在父行程中并不是1,這說明,子行程的target函式中對運行環境的修改,不影響父行程的運行環境,事實上,反之也是成立的(父不影響子),也就是說,一旦子行程的運行環境完成創建之后,父行程的運行環境與子行程的運行環境之間就完全獨立,- 由于這個獨立性,子行程的運算結果也無法直接反饋給父行程,稍后會介紹兩種解決方式:1. 行程間通信 2. 利用 行程池
apply方法的回傳值,
- 由于這個獨立性,子行程的運算結果也無法直接反饋給父行程,稍后會介紹兩種解決方式:1. 行程間通信 2. 利用 行程池
- 注意到一三行的輸出在Windows下不同,而在Linux下相同,這說明,子行程中全域變數
pid的取值,在Linux下是直接復制父行程中pid的取值而得到的,在Windows下是通過重新運行pid = os.getpid()而得到的,更一般地,有以下這兩個事實:- 在Windows中,
Process(target)創建出的子行程是一張白紙(即運行環境空空如也);當呼叫start()的時候,它會先通過import陳述句來將父行程的整個代碼檔案完整執行一遍(從而創建出一個新的運行環境),然后再開始運行target函式,所以,if __name__ == '__main__':包起來的代碼,就只會被父行程執行;而未被包起來的零級縮進代碼,則也會被每個子行程(在自己的運行環境里)各自執行一遍,- 這就是之前提到的“隱式import自身”的機制,
- 在Linux中,
Process(target)創建出的子行程,會全盤復制父行程的運行環境,而不會自己重新創建,復制出來的子行程運行環境,與父行程的運行環境完全獨立,
- 在Windows中,
Linux下的行程復制方式稱為fork,Windows下的行程復制方式稱為spawn,關于這些,詳見 https://stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn ,
from multiprocessing import Process
import os
def task():
pass
if __name__ == '__main__':
p = Process(target=task)
print('son process created')
p.start()
print('son process starts')
p.join()
print('son process ends')
print('gu?')
在Windows下的輸出
son process created
son process starts
gu?
son process ends
gu?
由此可見,Windows下子行程(在初始化時)在執行父行程的代碼檔案時,父行程中son_process.start()以后的內容(比如print('gu?'))也會被執行,
行程池
如果我們有很多的任務要同時進行,為每個任務各開一個行程既低效(創建和銷毀行程開銷大、無法全部并行、內核難以調度)又可能不被內核允許,
解決方案:使用行程池,池中放著幾個行程(一般不會比CPU的核數多很多),有新任務時找一個空閑行程分配給它,沒有空閑行程則等待,缺點是沒有空閑行程時需要等待,因此不能算是完全的并發,
行程池的基本用法
from multiprocessing import Pool
import os, time
def task(duration, base_time, task_name):
pid = os.getpid()
print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
time.sleep(duration)
print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
pool = Pool(3) # a pool containing 3 subprocesses
print('start assigning tasks')
for i in range(4):
pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1))) # assign task to some process in pool and start running
# if all son processes are busy, wait until one is free and then start
pool.close() # no longer accepting new tasks, but already applied ones (including those that are waiting) keeps running.
print('all tasks assigned; wait for son processes to finish')
pool.join() # wait until all tasks are done, and then the pool is dead. `join()` can be called only if `close()` has already been called
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
輸出:(Win和Linux下輸出相似)
main process id 5236 starts at 0.000002s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 8724 starts working on TaskNo.1 at 0.030557s with parameter 1
son process id 14584 starts working on TaskNo.2 at 0.037581s with parameter 1
son process id 10028 starts working on TaskNo.3 at 0.041210s with parameter 1
son process id 14584 ends working on TaskNo.2 at 1.042662s
son process id 8724 ends working on TaskNo.1 at 1.040211s
son process id 14584 starts working on TaskNo.4 at 1.044109s with parameter 1
son process id 10028 ends working on TaskNo.3 at 1.054017s
son process id 14584 ends working on TaskNo.4 at 2.055515s
all tasks finished at 2.214534s
main process id 5236 ends at 2.214884s
當使用apply_async(“異步呼叫”)添加任務時,主行程在子行程執行任務期間會繼續運行;如果用apply(“同步呼叫”)添加任務,則主行程會暫停(“阻塞”)直到該任務完成,一般使用apply_async而不是apply,
行程池中的行程復制
from multiprocessing import Pool
import os, time
all_tasks_on_this_son_process = []
def task(duration, base_time, task_name):
global all_tasks_on_this_son_process
pid = os.getpid()
print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
time.sleep(duration)
print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
all_tasks_on_this_son_process += [task_name]
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
pool = Pool(3)
print('start assigning tasks')
for i in range(4):
pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
pool.close()
print('all tasks assigned; wait for son processes to finish')
pool.join()
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
print('gu?')
Windows下輸出:
main process id 6116 starts at 0.000001s
start assigning tasks
all tasks assigned; wait for son processes to finish
gu?
gu?
gu?
son process id 16028 starts working on TaskNo.1 at 0.037577s with parameter 1, this process already executed []
son process id 11696 starts working on TaskNo.2 at 0.041393s with parameter 1, this process already executed []
son process id 5400 starts working on TaskNo.3 at 0.038409s with parameter 1, this process already executed []
son process id 11696 ends working on TaskNo.2 at 1.041521s
son process id 16028 ends working on TaskNo.1 at 1.038722s
son process id 11696 starts working on TaskNo.4 at 1.042543s with parameter 1, this process already executed ['TaskNo.2']
son process id 5400 ends working on TaskNo.3 at 1.052573s
son process id 11696 ends working on TaskNo.4 at 2.053483s
all tasks finished at 2.167447s
main process id 6116 ends at 2.167904s
gu?
在Windows下,池中的每個執行緒會在(且僅在)它分配到的的第一個任務將要開始執行時,運行一遍父行程的代碼以構建運行環境,一個行程在前一個任務中對運行環境的改變,會原樣體現在下一個任務的運行環境里,(即接受新任務的時候會直接繼續使用上一個任務遺留下的運行環境)
Linux下輸出:
main process id 691 starts at 0.000001s
all tasks assigned; wait for son processes to finish
son process id 692 starts working on TaskNo.1 at 0.104757s with parameter 1, this process already executed []
son process id 693 starts working on TaskNo.2 at 0.104879s with parameter 1, this process already executed []
son process id 694 starts working on TaskNo.3 at 0.105440s with parameter 1, this process already executed []
son process id 692 ends working on TaskNo.1 at 1.106427s
son process id 693 ends working on TaskNo.2 at 1.106426s
son process id 694 ends working on TaskNo.3 at 1.107157s
son process id 692 starts working on TaskNo.4 at 1.107560s with parameter 1, this process already executed ['TaskNo.1']
son process id 692 ends working on TaskNo.4 at 2.110033s
all tasks finished at 2.117158s
main process id 691 ends at 2.117452s
gu?
在Linux下,池中的每個執行緒會在(且僅在)它的第一個任務將要開始執行時,從父行程將運行環境完整復制一遍,一個行程在前一個任務中對運行環境的改變,會原樣體現在下一個任務的運行環境里,(即接受新任務的時候會直接繼續使用上一個任務遺留下的運行環境)
from multiprocessing import Pool
import os, time
all_tasks_on_this_son_process = []
def init(init_name):
global all_tasks_on_this_son_process
all_tasks_on_this_son_process += [init_name]
def task(duration, base_time, task_name):
global all_tasks_on_this_son_process
pid = os.getpid()
print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
time.sleep(duration)
print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
all_tasks_on_this_son_process += [task_name]
if __name__ == '__main__':
pid = os.getpid()
base_time = time.perf_counter()
print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
pool = Pool(3, initializer=init, initargs=('init',)) # look here
print('start assigning tasks')
for i in range(4):
pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
pool.close()
print('all tasks assigned; wait for son processes to finish')
pool.join()
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
輸出(Win下與Linux下相似):
main process id 18416 starts at 0.000004s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 10052 starts working on TaskNo.1 at 0.053483s with parameter 1, this process already executed ['init']
son process id 17548 starts working on TaskNo.2 at 0.040412s with parameter 1, this process already executed ['init']
son process id 10124 starts working on TaskNo.3 at 0.049992s with parameter 1, this process already executed ['init']
son process id 10124 ends working on TaskNo.3 at 1.054387s
son process id 17548 ends working on TaskNo.2 at 1.044956s
son process id 10052 ends working on TaskNo.1 at 1.062396s
son process id 10124 starts working on TaskNo.4 at 1.055888s with parameter 1, this process already executed ['init', 'TaskNo.3']
son process id 10124 ends working on TaskNo.4 at 2.060094s
all tasks finished at 2.443017s
main process id 18416 ends at 2.444705s
在行程池中利用子行程的回傳值
from multiprocessing import Pool
import time
def task(duration, base_time, task_name):
time.sleep(duration)
return f'{task_name} finished at {"%.6f" % (time.perf_counter()-base_time)}s'
if __name__ == '__main__':
base_time = time.perf_counter()
pool = Pool(2)
return_values = []
return_values.append(pool.apply(task, args=(1,base_time,'TaskNo.1_sync')))
print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
return_values.append(pool.apply_async(task, args=(2,base_time,'TaskNo.2_async')))
print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
pool.close()
pool.join()
print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
assert return_values[1].ready() == True
return_values[1] = return_values[1].get() # from ApplyResult to true return value
print('results:', return_values)
at time 1.2109459, r_v is ['TaskNo.1_sync finished at 1.027223s']
at time 1.2124976, r_v is ['TaskNo.1_sync finished at 1.027223s', <multiprocessing.pool.ApplyResult object at 0x0000016D24D79AC0>]
all tasks finished at 3.258190s
results: ['TaskNo.1_sync finished at 1.027223s', 'TaskNo.2_async finished at 3.041053s']
這里在pool.join()之后呼叫result.get(),所以可以立刻得到 子行程所執行的函式的回傳值;如果在對應的子行程尚未return時就呼叫result.get(),則主行程會阻塞直到子行程回傳,然后獲取子行程所執行的函式的回傳值,result.ready()回傳一個bool,表示對應的子行程是否已經return,
此外,result.wait()會阻塞直到子行程回傳,但不會獲取回傳值,
一個ApplyResult實體可以多次呼叫get(),即可以多次獲取回傳值,
詳見 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult ,
行程間通訊
可以認為,任何一個被跨行程傳送的物件,在傳送程序中都會被深拷貝,
Pipe
from multiprocessing import Process, Pipe
import time
def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
conn.send(content)
print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
content = conn.recv()
print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
return content
def task(conn, pipe_name, process_name, base_time):
receive_from_pipe(conn, pipe_name, process_name, base_time)
time.sleep(1)
send_through_pipe(conn, pipe_name, process_name, 142857, base_time)
if __name__ == '__main__':
base_time = time.perf_counter()
conn_A, conn_B = Pipe() # two endpoints of the pipe
p1 = Process(target=task, args=(conn_B,'pipe','son',base_time))
p1.start()
time.sleep(1)
send_through_pipe(conn_A, 'pipe', 'main', ['hello','hello','hi'], base_time) # any object can be sent
receive_from_pipe(conn_A, 'pipe', 'main', base_time)
p1.join()
son tries to receive content from pipe at 0.036439
main tries to send ['hello', 'hello', 'hi'] through pipe at 1.035570
main successfully finishes sending at 1.037174
main tries to receive content from pipe at 1.037318
son successfully receives ['hello', 'hello', 'hi'] at 1.037794
son tries to send 142857 through pipe at 2.039058
son successfully finishes sending at 2.040158
main successfully receives 142857 at 2.040441
另外,還可以用conn.poll()(回傳Bool型別)來獲知conn中是否有對面發來的未讀資訊,
from multiprocessing import Process, Pipe
import time
def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
conn.send(content)
print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
content = conn.recv()
print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
return content
def task1(conn, pipe_name, process_name, base_time):
receive_from_pipe(conn, pipe_name, process_name, base_time)
time.sleep(1)
send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)
def task2(conn, pipe_name, process_name, base_time):
time.sleep(1)
send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)
time.sleep(2)
receive_from_pipe(conn, pipe_name, process_name, base_time)
if __name__ == '__main__':
base_time = time.perf_counter()
conn_A, conn_B = Pipe()
p1 = Process(target=task1, args=(conn_A,'pipe','son1',base_time))
p2 = Process(target=task2, args=(conn_B,'pipe','son2',base_time))
p1.start()
p2.start()
p1.join()
p2.join()
son1 tries to receive content from pipe at 0.033372
son2 tries to send greetings from son2 through pipe at 1.058998
son2 successfully finishes sending at 1.060660
son1 successfully receives greetings from son2 at 1.061171
son1 tries to send greetings from son1 through pipe at 2.062389
son1 successfully finishes sending at 2.063290
son2 tries to receive content from pipe at 3.061378
son2 successfully receives greetings from son1 at 3.061843
由此可見:
Pipe可以暫存資料,而且其暫存的資料符合FIFO規則,- 但是,
Pipe用于暫存資料的區域大小比較有限(具體大小隨OS而定),如果這個區域滿了,send()就會被阻塞,直到對面用recv()騰出位置為止,
- 但是,
Pipe的兩個端點可以分配給任意兩個行程,- 不建議把同一個端點分配給多個行程,這可能會帶來風險;如果確實需要的話,請使用
Queue,
- 不建議把同一個端點分配給多個行程,這可能會帶來風險;如果確實需要的話,請使用
Queue
本質上是一個能夠跨行程運行的佇列,
Queue的操作的時間開銷約為Pipe中對應操作的兩倍,
from multiprocessing import Process, Queue
import time
def put_into_queue(q, queue_name, putter_name, content, base_time):
print(putter_name, 'tries to put', content, 'into', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
q.put(content)
print(putter_name, 'successfully finishes putting at', '%.6f'%(time.perf_counter()-base_time))
def get_from_queue(q, queue_name, getter_name, base_time):
print(getter_name, 'tries to receive content from', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
content = q.get()
print(getter_name, 'successfully gets', content, 'at', '%.6f'%(time.perf_counter()-base_time))
return content
def task1(q, delay, queue_name, process_name, base_time):
time.sleep(delay)
put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)
time.sleep(5)
get_from_queue(q, queue_name, process_name, base_time)
def task2(q, delay, queue_name, process_name, base_time):
time.sleep(delay)
get_from_queue(q, queue_name, process_name, base_time)
time.sleep(5)
put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)
if __name__ == '__main__':
base_time = time.perf_counter()
q = Queue()
put_and_get_1 = Process(target=task1, args=(q,0,'queue','putAndGet_No.1',base_time))
get_and_put_1 = Process(target=task2, args=(q,1,'queue','getAndPut_No.1',base_time))
get_and_put_2 = Process(target=task2, args=(q,2,'queue','getAndPut_No.2',base_time))
put_and_get_1.start()
get_and_put_1.start()
get_and_put_2.start()
put_and_get_1.join()
get_and_put_1.join()
get_and_put_2.join()
putAndGet_No.1 tries to put christmas card from putAndGet_No.1 into queue at 0.077883
putAndGet_No.1 successfully finishes putting at 0.079291
getAndPut_No.1 tries to receive content from queue at 1.104196
getAndPut_No.1 successfully gets christmas card from putAndGet_No.1 at 1.105489
getAndPut_No.2 tries to receive content from queue at 2.126434
putAndGet_No.1 tries to receive content from queue at 5.081044
getAndPut_No.1 tries to put christmas card from getAndPut_No.1 into queue at 6.106381
getAndPut_No.1 successfully finishes putting at 6.107820
getAndPut_No.2 successfully gets christmas card from getAndPut_No.1 at 6.108565
getAndPut_No.2 tries to put christmas card from getAndPut_No.2 into queue at 11.109579
getAndPut_No.2 successfully finishes putting at 11.112493
putAndGet_No.1 successfully gets christmas card from getAndPut_No.2 at 11.113546
另外,如果Queue的大小實在過大以至于達到了某個上限,則put()操作也會被阻塞,不過應該很難把大小弄到那么大,
多執行緒
基本語法和多行程很相似,但機制上有重要的不同,由于全域解釋器鎖的存在,Python多執行緒并不實用,這里僅作簡單介紹,
從下圖中可以看到,多執行緒的基本代碼和多行程完全一致,下圖中的代碼在CPython解釋器中會運行大約3s,
另外,多執行緒中其實不需要這個if __name__ == '__main__':的判斷,

多執行緒的變數機制
import threading
lock_n = threading.Lock()
n = 0
def inc_n(m):
global n
lock_n.acquire(blocking=True)
n += m
lock_n.release()
threads = [threading.Thread(target=inc_n, args=(i,)) for i in range(1,11)]
[t.start() for t in threads]
[t.join() for t in threads]
print(n)
55
- 由上可見,不同的執行緒之間共享運行環境(比如上面的變數
n), lock.acquire(blocking=True)會一直阻塞直到鎖空出來為止;一旦空出來就會把它鎖上,
并發處理:協程
不同的程序在同一個執行緒內交替執行,每個協程在運行時獨占資源,一段運行結束后自阻塞,等待著被外部(如main函式)控制它的代碼喚醒,
相比多執行緒的優點:輕量級(在解釋器層面實作,不需要內核來做切換)、數量不限,
和多執行緒一樣,不同協程之間共用運行環境,
用簡單的生成器實作協程
def sum(init):
s = init
while True:
delta = yield s # output s, and then input delta
s += delta
g = sum(0)
print(next(g)) # run entil receiving the first output
print(g.send(1)) # send the first input, and then get the second output
print(g.send(2)) # send the second input, and then get the third output
0
1
3
上例中只是演示了生成器的自阻塞,以及生成器與其呼叫者之間的互動,
更進一步,還可以定義多個生成器執行不同的程序,并在main函式里進行對它們的調度(比如實作一個任務佇列),從而實作協程,
用回呼函式(callback)將普通函式變為協程
def calc(n,callback):
r = 0
for i in range(n):
r += i
callback()
def pause():
print('pause')
yield # just pause, do not return anything
g = calc(10,pause)
用async/await實作協程
相比生成器實作的優點:可以在等待IO/等待網路通信等情況下時阻塞當前協程執行其他協程(而且不會中斷等待IO/通信)以節省時間(而只用生成器則無法做到);使用更靈活、方便,
- 多執行緒其實也有前一個優點,所以CPython下的多執行緒也并不是毫無用處,但它的用處是協程用處的子集,
- 一個注意事項:若想通過協程加速IO,必須使用python中專門的異步IO庫才行,
基礎使用
import time
start = time.perf_counter()
def sayhi(delay):
time.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
def main():
sayhi(1)
sayhi(2)
main()
hi! at 1.0040732999914326
hi! at 3.015253899997333
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
sayhi1 = asyncio.create_task(sayhi(1))
sayhi2 = asyncio.create_task(sayhi(2))
await sayhi1
await sayhi2
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0037910000100965
hi! at 2.0026504999987083
上面的程式中:
async宣告當前函式是一個協程,這一宣告使得函式體內可以使用create_task和await,也使得該函式本身可以被create_task和await,- 一旦一個函式
f被宣告為協程,f()做的事就不再是運行f,而只是創建一個協程物件并回傳之(這個物件并不會自動被運行),需要使用asyncio中的相關工具來運行這個物件,
- 一旦一個函式
run(main())表示開始執行協程main(),要求main()必須是“主協程”,即它是整個程式中所有協程的入口(類似主函式),一個程式中run(main())只應被呼叫一次,且在main()之外不應有任何協程被呼叫,run(main())是阻塞的,協程的并發特性只有在main()內部才會顯現,從外部來看這就是一個普普通通的黑箱呼叫,run()的作用是啟動 運行協程所需的環境(并在main()完成后將其關閉),但在IPython中,一開始運行就已經自動幫你啟動好了,所以可以直接用await(而且也不必把所有協程都放在一個主協程中,而可以散布在程式各處),
-
create_task(sayhi(1))表示為協程sayhi(1)在某個“任務池”中創建一個任務,并且開始執行該任務,回傳值是這個任務的handle,或者說“遙控器”,- 任務池中的任務會并發地執行,任務在何時可以中斷并切換到別的任務,這一點由
await指定,
- 任務池中的任務會并發地執行,任務在何時可以中斷并切換到別的任務,這一點由
-
await sayhi1有兩重含義:-
- 阻塞當前協程(該陳述句所在的協程,這里是
main())的執行,直到任務sayhi1完成,(類似Process.join())
- 阻塞當前協程(該陳述句所在的協程,這里是
-
- 告訴解釋器,現在當前協程(該陳述句所在的協程)開始阻塞,你可以切換協程了,
- 如果這里
await的不是sayhi1而是,比如說,一個接受http請求的操作,那么在解釋器切換協程后不會影響對這個請求的等待,這就是asyncio的強大之處,- 這一點在
await asyncio.sleep(delay)就有體現,asyncio.sleep()就具有“切換協程不影響等待”的特性,
- 這一點在
-
-
關于
await的幾件事:await的可以不是已創建的任務而是一個協程物件(比如await sayhi(1)),此時不會將其加入任務池,而會直接開始執行(當然,也可能剛開始執行就被切換到別的協程,因為用了await),并一直阻塞直到完成,這會導致sayhi(1)無法作為一個任務、與其他任務平等地參與并發,但是它仍然可以隨著父協程(這里是main())的中斷和恢復而間接地參與并發,- 能夠被
await的不只有協程物件和任務handle,還有任何awaitable object,即任何實作了__await__方法(從而告訴了解釋器如何在自己剛開始執行時就阻塞并切換協程,且不影響內部可能在進行的等待和其他操作)的物件, await的物件只可能在剛開始執行時立刻阻塞并切換協程,執行程序中其他可以阻塞的位置,是由這個物件內部使用的其他await陳述句指定的,而不是呼叫這個物件的那條await陳述句,
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
await sayhi(1)
await sayhi(2)
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0072715999995125
hi! at 3.0168006000021705
wait_for()
把await A(A為任意awaitable)改成await asyncio.wait_for(A,timeout),就可以給await操作加上timeout秒的時限,一旦await了這么多秒還沒有結束,就會中斷A的執行并拋出asyncio.TimeoutError,
不用關心wait_for具體做了什么,你只需要記住await asyncio.wait_for(A,timeout)這個句子就行,可以認為這個句子和await A在(除了timeout以外的)其他方面上沒有區別,下面是例子,
import time
import asyncio
async def eternity():
await asyncio.sleep(3600)
print('yay!')
async def main():
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
timeout!
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
sayhi1 = asyncio.create_task(sayhi(1))
sayhi2 = asyncio.create_task(sayhi(2))
await asyncio.wait_for(sayhi1,1.05)
await asyncio.wait_for(sayhi2,1.05)
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0181081000046106
hi! at 2.0045300999918254
import time
import asyncio
start = time.perf_counter()
async def sayhi(delay):
await asyncio.sleep(delay)
print(f'hi! at {time.perf_counter() - start}')
async def main():
sayhi1 = asyncio.create_task(sayhi(1))
sayhi2 = asyncio.create_task(sayhi(2))
await asyncio.wait_for(sayhi1,0.95)
await asyncio.wait_for(sayhi2,1.05)
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
---------------------------------------------------------------------------
TimeoutError Traceback (most recent call last)
<ipython-input-89-7f639d54114e> in <module>
15
16 # asyncio.run(main()) # use outside IPython
---> 17 await main() # use inside IPython
<ipython-input-89-7f639d54114e> in main()
11 sayhi1 = asyncio.create_task(sayhi(1))
12 sayhi2 = asyncio.create_task(sayhi(2))
---> 13 await asyncio.wait_for(sayhi1,0.95)
14 await asyncio.wait_for(sayhi2,1.05)
15
~\anaconda3\lib\asyncio\tasks.py in wait_for(fut, timeout, loop)
488 # See https://bugs.python.org/issue32751
489 await _cancel_and_wait(fut, loop=loop)
--> 490 raise exceptions.TimeoutError()
491 finally:
492 timeout_handle.cancel()
TimeoutError:
hi! at 2.0194762000028277
另外,注意到即使協程sayhi1拋出了例外,父協程main()仍然能夠繼續執行sayhi2,可見不同協程間是有一定的獨立性的,
實作生產者-消費者協程
為此需要使用 asyncio.Queue ,它相比普通的佇列的區別是,其put/get操作會在無法執行時阻塞(這一點和multiprocessing.Queue很像),而且這些操作都是協程(注意到,這使得你呼叫它們時只會回傳協程物件而不會實際執行),可以await,
import time
import asyncio
start = time.perf_counter()
async def producer(q):
for i in range(5):
await asyncio.sleep(1) # producing takes 1 sec
await q.put(i) # will wait if q is full
print(f'put {i} at {time.perf_counter() - start}')
await q.join() # will wait until all objects produced are **taken out** and **consumed**.
async def consumer(q):
for i in range(5):
item = await q.get() # will wait if q is empty. BTW we see that "await XXX" is an expression not a command.
print(f'get {item} at {time.perf_counter() - start}')
await asyncio.sleep(1) # consuming takes 1 sec
q.task_done() # tells the queue that [the object just taken out] has been consumed. just taking out is not enough!
print(f'consumed {item} at {time.perf_counter() - start}')
async def main():
q = asyncio.Queue()
P = asyncio.create_task(producer(q))
C = asyncio.create_task(consumer(q))
await P
await C
print(f'done at {time.perf_counter() - start}')
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
put 0 at 1.0108397000003606
get 0 at 1.0112231999955839
put 1 at 2.017216499996721
consumed 0 at 2.0176210000063293
get 1 at 2.0177472999930615
put 2 at 3.0279211000015493
consumed 1 at 3.0283254999958444
get 2 at 3.028457599997637
put 3 at 4.039952199993422
consumed 2 at 4.041183299996192
get 3 at 4.041302300000098
put 4 at 5.0465819999953965
consumed 3 at 5.04690839999239
get 4 at 5.047016099997563
consumed 4 at 6.047789799995371
done at 6.048323099996196
import time
import asyncio
start = time.perf_counter()
async def sleep_and_put(q):
await asyncio.sleep(1)
await q.put(1)
async def main():
q = asyncio.Queue()
C = asyncio.create_task(q.get())
P = asyncio.create_task(sleep_and_put(q))
await C
await P
print(f'finished at {time.perf_counter() - start}')
# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
finished at 1.01112650000141
由上例可見,Queue.get()(其實Queue.put()等其他方法也一樣)是一個協程,因此也可以給它創建任務以進行并發,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/278737.html
標籤:Python
上一篇:1.1Python起源
