主頁 > 後端開發 > Python中的多行程、多執行緒和協程

Python中的多行程、多執行緒和協程

2021-04-22 06:07:27 後端開發

本文中的內容來自我的筆記,撰寫程序中參考了胡俊峰老師《Python程式設計與資料科學導論》課程的內容,

目錄
  • 并發處理:多行程和多執行緒
    • 前置
    • 多行程和多執行緒的比較
    • 多行程的機制和代碼實作
      • 基本用法
      • 行程復制
      • 行程池
        • 行程池的基本用法
        • 行程池中的行程復制
        • 在行程池中利用子行程的回傳值
      • 行程間通訊
        • Pipe
        • Queue
    • 多執行緒
      • 多執行緒的變數機制
  • 并發處理:協程
    • 用簡單的生成器實作協程
    • 用回呼函式(callback)將普通函式變為協程
    • 用async/await實作協程
      • 基礎使用
      • wait_for()
      • 實作生產者-消費者協程

并發處理:多行程和多執行緒

前置

概念:

  • 并發:一段時間內同時推進多個任務,但不一定要在一個時刻同時進行多個任務,
  • 并行:一段時間內同時推進多個任務,且在一個時刻要同時進行多個任務,
  • 并行是并發的子集;單核CPU交替執行多個任務是并發但不是并行;多核CPU同時執行多個任務既是并發也是并行,

何時需要并發?

  1. 需要同時處理多個任務
  2. 經常需要等待資源
  3. 多個子程序互相協作

電腦執行任務的機制:

  • 作業系統內核 負責任務(i.e. 行程/執行緒)的掛起與喚醒,和資源的分配(比如一個程式能訪問哪些記憶體地址)
  • 行程是資源分配的最小單元,不同行程之間不共享資源(比如可訪問的記憶體區域);行程可再分為執行緒,執行緒之間共享大部分資源,
    • 正是因為 是否共享資源 上的區別,執行緒間的切換(即掛起和喚醒)比行程間的切換更快,
    • 執行緒是調度執行的最小單元,這意味著作業系統內核會負責將多個執行緒并發執行,

多行程和多執行緒的比較

多行程:

  • 將任務拆分為多個行程來進行
    • 由內核決定是并行還是僅僅并發,
  • 行程間不共享記憶體
    • 優點:一個行程掛了不影響別的
    • 缺點:切換行程耗時大、行程間通信不便

多執行緒:

  • 將任務拆分為一個行程內的多個執行緒來進行
    • 由內核決定是并行還是僅僅并發,
    • 在CPython解釋器中有全域解釋器鎖,導致多執行緒只能并發而不能并行(多行程可以并行),
  • 行程間共享記憶體
    • 優點:切換耗時低、通信方便
    • 缺點:在并行時對全域變數要使用鎖機制
      • 鎖機制:一個執行緒使用一個全域變數時,先等待其(被其他執行緒)解鎖,再將其上鎖,再使用,用后再解鎖,
        • 如果不使用鎖的話:100個a+=1的執行緒執行完成后(初始a=0),a可能<100
      • 資料科學中可以為了提高效率而不使用鎖機制,但同時要容忍由此帶來的差錯,

多行程的機制和代碼實作

以下介紹的函式中,幾乎每一個有阻塞可能的,都會有一個可選的timeout引數,這件事將不再重提,

基本用法

from multiprocessing import Process
import os
import time

def task(duration, base_time):
    pid = os.getpid()
    print(f'son process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
    time.sleep(duration)
    print(f'son process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    p1 = Process(target=task, args=(1,base_time)) # a process that executes task(1,base_time); currently not running
    p2 = Process(target=task, args=(2,base_time)) # a process that executes task(2,base_time); currently not running
    p1.start()
    p2.start()
    print(p1.is_alive(), p2.is_alive()) # whether they are running
    print('main process can proceed while son processes are running')
    p1.join() # wait until p1 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
    p2.join() # wait until p2 finishes executing (the main process will pause on this command in the meantime) and kill it after it finishes
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')
main process id 3316 starts at 0.000001s
True True
main process can proceed while son processes are running
son process id 15640 starts at 0.002056s with parameter 1
son process id 10716 starts at 0.003030s with parameter 2
son process id 15640 ends at 1.002352s
son process id 10716 ends at 2.017861s
main process id 3316 ends at 2.114324s

如果沒有p1.join()p2.join(),主行程會在很短的時間內完成執行,而此時子行程仍在運行,輸出結果如下:

main process id 11564 starts at 0.000001s
True True
main process can proceed while son processes are running
main process id 11564 ends at 0.011759s
son process id 13500 starts at 0.004392s with parameter 1
son process id 11624 starts at 0.003182s with parameter 2
son process id 13500 ends at 1.009420s
son process id 11624 ends at 2.021817s

為何0.004秒的事件在0.003秒的事件之前被輸出?

  1. 因為print陳述句的耗時未被計算在內
  2. 因為perf_counter()在Windows下有bug,它給出的時間在不同行程之間不完全同步

需要注意,一個子行程結束運行后仍然處于存活狀態;只有被join()之后才會正式死亡(即被從系統中除名),

關于if __name__ == '__main__'::

  • 在Python中,可以通過import來獲取其他檔案中的代碼;在檔案B的開頭(其他位置同理)import檔案A,相當于把A在B的開頭復制一份,
  • 如果在復制A的內容時,我們希望A中的一部分代碼在執行時被忽略(比如說測驗陳述句),就可以給A中的這些代碼加上if __name__ == '__main__':
    • 對于從別處import來的代碼,系統變數__name__在這段代碼中會等于來源檔案的名字(或模塊名,這你不用在意);對于存在于本檔案中的代碼,__name__會等于__main__
  • 由于某些原因,在Windows下,如果一個檔案的代碼中使用了多行程,則這個檔案中會隱式地import自己(一次或多次);將所有零級縮進的代碼放在if __name__ == '__main__':中,可以避免產生重復執行的問題(注意到如果不這樣做的話,import來的副本中還會再次import自身,導致無限遞回import并報錯),
    • 暫時可以認為,采取這一措施后就可完全消除“隱式import自身”所產生的效應,

行程復制

from multiprocessing import Process
import os

pid = os.getpid()

def task():
    global pid
    print(pid)
    pid = 1
    print(pid)

if __name__ == '__main__':
    p = Process(target=task)
    p.start()
    p.join()
    print(pid)

在Windows下的輸出:

4836
1
2944

在Linux下的輸出:

511
1
511

前兩個數都是由子行程輸出,第三個數由父行程輸出,

  • 注意到pid在子行程中被賦為1后,在父行程中并不是1,這說明,子行程的target函式中對運行環境的修改,不影響父行程的運行環境,事實上,反之也是成立的(父不影響子),也就是說,一旦子行程的運行環境完成創建之后,父行程的運行環境與子行程的運行環境之間就完全獨立,
    • 由于這個獨立性,子行程的運算結果也無法直接反饋給父行程,稍后會介紹兩種解決方式:1. 行程間通信 2. 利用 行程池apply方法的回傳值,
  • 注意到一三行的輸出在Windows下不同,而在Linux下相同,這說明,子行程中全域變數pid的取值,在Linux下是直接復制父行程中pid的取值而得到的,在Windows下是通過重新運行pid = os.getpid()而得到的,更一般地,有以下這兩個事實:
    • 在Windows中,Process(target)創建出的子行程是一張白紙(即運行環境空空如也);當呼叫start()的時候,它會先通過import陳述句來將父行程的整個代碼檔案完整執行一遍(從而創建出一個新的運行環境),然后再開始運行target函式,所以,if __name__ == '__main__':包起來的代碼,就只會被父行程執行;而未被包起來的零級縮進代碼,則也會被每個子行程(在自己的運行環境里)各自執行一遍,
      • 這就是之前提到的“隱式import自身”的機制,
    • 在Linux中,Process(target)創建出的子行程,會全盤復制父行程的運行環境,而不會自己重新創建,復制出來的子行程運行環境,與父行程的運行環境完全獨立,

Linux下的行程復制方式稱為fork,Windows下的行程復制方式稱為spawn,關于這些,詳見 https://stackoverflow.com/questions/64095876/multiprocessing-fork-vs-spawn ,

from multiprocessing import Process
import os

def task():
    pass

if __name__ == '__main__':
    p = Process(target=task)
    print('son process created')
    p.start()
    print('son process starts')
    p.join()
    print('son process ends')

print('gu?')

在Windows下的輸出

son process created
son process starts
gu?
son process ends
gu?

由此可見,Windows下子行程(在初始化時)在執行父行程的代碼檔案時,父行程中son_process.start()以后的內容(比如print('gu?'))也會被執行,

行程池

如果我們有很多的任務要同時進行,為每個任務各開一個行程既低效(創建和銷毀行程開銷大、無法全部并行、內核難以調度)又可能不被內核允許,

解決方案:使用行程池,池中放著幾個行程(一般不會比CPU的核數多很多),有新任務時找一個空閑行程分配給它,沒有空閑行程則等待,缺點是沒有空閑行程時需要等待,因此不能算是完全的并發,

行程池的基本用法
from multiprocessing import Pool
import os, time

def task(duration, base_time, task_name):
    pid = os.getpid()
    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}')
    time.sleep(duration)
    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    pool = Pool(3) # a pool containing 3 subprocesses
    print('start assigning tasks')
    for i in range(4):
        pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1))) # assign task to some process in pool and start running
                                                                        # if all son processes are busy, wait until one is free and then start
    pool.close() # no longer accepting new tasks, but already applied ones (including those that are waiting) keeps running.
    print('all tasks assigned; wait for son processes to finish')
    pool.join() # wait until all tasks are done, and then the pool is dead. `join()` can be called only if `close()` has already been called
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

輸出:(Win和Linux下輸出相似)

main process id 5236 starts at 0.000002s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 8724 starts working on TaskNo.1 at 0.030557s with parameter 1
son process id 14584 starts working on TaskNo.2 at 0.037581s with parameter 1
son process id 10028 starts working on TaskNo.3 at 0.041210s with parameter 1
son process id 14584 ends working on TaskNo.2 at 1.042662s
son process id 8724 ends working on TaskNo.1 at 1.040211s
son process id 14584 starts working on TaskNo.4 at 1.044109s with parameter 1
son process id 10028 ends working on TaskNo.3 at 1.054017s
son process id 14584 ends working on TaskNo.4 at 2.055515s
all tasks finished at 2.214534s
main process id 5236 ends at 2.214884s

當使用apply_async(“異步呼叫”)添加任務時,主行程在子行程執行任務期間會繼續運行;如果用apply(“同步呼叫”)添加任務,則主行程會暫停(“阻塞”)直到該任務完成,一般使用apply_async而不是apply

行程池中的行程復制
from multiprocessing import Pool
import os, time

all_tasks_on_this_son_process = []

def task(duration, base_time, task_name):
    global all_tasks_on_this_son_process
    pid = os.getpid()
    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
    time.sleep(duration)
    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
    all_tasks_on_this_son_process += [task_name]

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    pool = Pool(3)
    print('start assigning tasks')
    for i in range(4):
        pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
    pool.close()
    print('all tasks assigned; wait for son processes to finish')
    pool.join()
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

print('gu?')

Windows下輸出:

main process id 6116 starts at 0.000001s
start assigning tasks
all tasks assigned; wait for son processes to finish
gu?
gu?
gu?
son process id 16028 starts working on TaskNo.1 at 0.037577s with parameter 1, this process already executed []
son process id 11696 starts working on TaskNo.2 at 0.041393s with parameter 1, this process already executed []
son process id 5400 starts working on TaskNo.3 at 0.038409s with parameter 1, this process already executed []
son process id 11696 ends working on TaskNo.2 at 1.041521s
son process id 16028 ends working on TaskNo.1 at 1.038722s
son process id 11696 starts working on TaskNo.4 at 1.042543s with parameter 1, this process already executed ['TaskNo.2']
son process id 5400 ends working on TaskNo.3 at 1.052573s
son process id 11696 ends working on TaskNo.4 at 2.053483s
all tasks finished at 2.167447s
main process id 6116 ends at 2.167904s
gu?

在Windows下,池中的每個執行緒會在(且僅在)它分配到的的第一個任務將要開始執行時,運行一遍父行程的代碼以構建運行環境,一個行程在前一個任務中對運行環境的改變,原樣體現在下一個任務的運行環境里,(即接受新任務的時候會直接繼續使用上一個任務遺留下的運行環境)

Linux下輸出:

main process id 691 starts at 0.000001s
all tasks assigned; wait for son processes to finish
son process id 692 starts working on TaskNo.1 at 0.104757s with parameter 1, this process already executed []
son process id 693 starts working on TaskNo.2 at 0.104879s with parameter 1, this process already executed []
son process id 694 starts working on TaskNo.3 at 0.105440s with parameter 1, this process already executed []
son process id 692 ends working on TaskNo.1 at 1.106427s
son process id 693 ends working on TaskNo.2 at 1.106426s
son process id 694 ends working on TaskNo.3 at 1.107157s
son process id 692 starts working on TaskNo.4 at 1.107560s with parameter 1, this process already executed ['TaskNo.1']
son process id 692 ends working on TaskNo.4 at 2.110033s
all tasks finished at 2.117158s
main process id 691 ends at 2.117452s
gu?

在Linux下,池中的每個執行緒會在(且僅在)它的第一個任務將要開始執行時,從父行程將運行環境完整復制一遍,一個行程在前一個任務中對運行環境的改變,原樣體現在下一個任務的運行環境里,(即接受新任務的時候會直接繼續使用上一個任務遺留下的運行環境)

from multiprocessing import Pool
import os, time

all_tasks_on_this_son_process = []

def init(init_name):
    global all_tasks_on_this_son_process
    all_tasks_on_this_son_process += [init_name]

def task(duration, base_time, task_name):
    global all_tasks_on_this_son_process
    pid = os.getpid()
    print(f'son process id {pid} starts working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s with parameter {duration}, this process already executed',all_tasks_on_this_son_process)
    time.sleep(duration)
    print(f'son process id {pid} ends working on {task_name} at {"%.6f" % (time.perf_counter()-base_time)}s')
    all_tasks_on_this_son_process += [task_name]

if __name__ == '__main__':
    pid = os.getpid()
    base_time = time.perf_counter()
    print(f'main process id {pid} starts at {"%.6f" % (time.perf_counter()-base_time)}s')
    pool = Pool(3, initializer=init, initargs=('init',)) # look here
    print('start assigning tasks')
    for i in range(4):
        pool.apply_async(task, args=(1, base_time, "TaskNo."+str(i+1)))
    pool.close()
    print('all tasks assigned; wait for son processes to finish')
    pool.join()
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    print(f'main process id {pid} ends at {"%.6f" % (time.perf_counter()-base_time)}s')

輸出(Win下與Linux下相似):

main process id 18416 starts at 0.000004s
start assigning tasks
all tasks assigned; wait for son processes to finish
son process id 10052 starts working on TaskNo.1 at 0.053483s with parameter 1, this process already executed ['init']
son process id 17548 starts working on TaskNo.2 at 0.040412s with parameter 1, this process already executed ['init']
son process id 10124 starts working on TaskNo.3 at 0.049992s with parameter 1, this process already executed ['init']
son process id 10124 ends working on TaskNo.3 at 1.054387s
son process id 17548 ends working on TaskNo.2 at 1.044956s
son process id 10052 ends working on TaskNo.1 at 1.062396s
son process id 10124 starts working on TaskNo.4 at 1.055888s with parameter 1, this process already executed ['init', 'TaskNo.3']      
son process id 10124 ends working on TaskNo.4 at 2.060094s
all tasks finished at 2.443017s
main process id 18416 ends at 2.444705s
在行程池中利用子行程的回傳值
from multiprocessing import Pool
import time

def task(duration, base_time, task_name):
    time.sleep(duration)
    return f'{task_name} finished at {"%.6f" % (time.perf_counter()-base_time)}s'

if __name__ == '__main__':
    base_time = time.perf_counter()
    pool = Pool(2)
    return_values = []
    return_values.append(pool.apply(task, args=(1,base_time,'TaskNo.1_sync')))
    print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
    return_values.append(pool.apply_async(task, args=(2,base_time,'TaskNo.2_async')))
    print('at time {}, r_v is {}'.format(time.perf_counter() - base_time, return_values))
    pool.close()
    pool.join()
    print(f'all tasks finished at {"%.6f" % (time.perf_counter()-base_time)}s')
    assert return_values[1].ready() == True
    return_values[1] = return_values[1].get() # from ApplyResult to true return value
    print('results:', return_values)
at time 1.2109459, r_v is ['TaskNo.1_sync finished at 1.027223s']
at time 1.2124976, r_v is ['TaskNo.1_sync finished at 1.027223s', <multiprocessing.pool.ApplyResult object at 0x0000016D24D79AC0>]     
all tasks finished at 3.258190s
results: ['TaskNo.1_sync finished at 1.027223s', 'TaskNo.2_async finished at 3.041053s']

這里在pool.join()之后呼叫result.get(),所以可以立刻得到 子行程所執行的函式的回傳值;如果在對應的子行程尚未return時就呼叫result.get(),則主行程會阻塞直到子行程回傳,然后獲取子行程所執行的函式的回傳值,result.ready()回傳一個bool,表示對應的子行程是否已經return

此外,result.wait()會阻塞直到子行程回傳,但不會獲取回傳值,

一個ApplyResult實體可以多次呼叫get(),即可以多次獲取回傳值,

詳見 https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.AsyncResult ,

行程間通訊

可以認為,任何一個被跨行程傳送的物件,在傳送程序中都會被深拷貝,

Pipe
from multiprocessing import Process, Pipe
import time

def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
    print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    conn.send(content)
    print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
    
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
    print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    content = conn.recv()
    print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
    return content

def task(conn, pipe_name, process_name, base_time):
    receive_from_pipe(conn, pipe_name, process_name, base_time)
    time.sleep(1)
    send_through_pipe(conn, pipe_name, process_name, 142857, base_time)

if __name__ == '__main__':
    base_time = time.perf_counter()
    conn_A, conn_B = Pipe() # two endpoints of the pipe
    p1 = Process(target=task, args=(conn_B,'pipe','son',base_time))
    p1.start()
    
    time.sleep(1)
    send_through_pipe(conn_A, 'pipe', 'main', ['hello','hello','hi'], base_time) # any object can be sent
    receive_from_pipe(conn_A, 'pipe', 'main', base_time)
    p1.join()
son tries to receive content from pipe at 0.036439
main tries to send ['hello', 'hello', 'hi'] through pipe at 1.035570
main successfully finishes sending at 1.037174
main tries to receive content from pipe at 1.037318
son successfully receives ['hello', 'hello', 'hi'] at 1.037794
son tries to send 142857 through pipe at 2.039058
son successfully finishes sending at 2.040158
main successfully receives 142857 at 2.040441

另外,還可以用conn.poll()(回傳Bool型別)來獲知conn中是否有對面發來的未讀資訊,

from multiprocessing import Process, Pipe
import time

def send_through_pipe(conn, pipe_name, sender_name, content, base_time):
    print(sender_name, 'tries to send', content, 'through', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    conn.send(content)
    print(sender_name, 'successfully finishes sending at', '%.6f'%(time.perf_counter()-base_time))
    
def receive_from_pipe(conn, pipe_name, receiver_name, base_time):
    print(receiver_name, 'tries to receive content from', pipe_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    content = conn.recv()
    print(receiver_name, 'successfully receives', content, 'at', '%.6f'%(time.perf_counter()-base_time))
    return content

def task1(conn, pipe_name, process_name, base_time):
    receive_from_pipe(conn, pipe_name, process_name, base_time)
    time.sleep(1)
    send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)

def task2(conn, pipe_name, process_name, base_time):
    time.sleep(1)
    send_through_pipe(conn, pipe_name, process_name, 'greetings from ' + process_name, base_time)
    time.sleep(2)
    receive_from_pipe(conn, pipe_name, process_name, base_time)

if __name__ == '__main__':
    base_time = time.perf_counter()
    conn_A, conn_B = Pipe()
    p1 = Process(target=task1, args=(conn_A,'pipe','son1',base_time))
    p2 = Process(target=task2, args=(conn_B,'pipe','son2',base_time))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
son1 tries to receive content from pipe at 0.033372
son2 tries to send greetings from son2 through pipe at 1.058998
son2 successfully finishes sending at 1.060660
son1 successfully receives greetings from son2 at 1.061171
son1 tries to send greetings from son1 through pipe at 2.062389
son1 successfully finishes sending at 2.063290
son2 tries to receive content from pipe at 3.061378
son2 successfully receives greetings from son1 at 3.061843

由此可見:

  • Pipe可以暫存資料,而且其暫存的資料符合FIFO規則,
    • 但是,Pipe用于暫存資料的區域大小比較有限(具體大小隨OS而定),如果這個區域滿了,send()就會被阻塞,直到對面用recv()騰出位置為止,
  • Pipe的兩個端點可以分配給任意兩個行程,
    • 不建議把同一個端點分配給多個行程,這可能會帶來風險;如果確實需要的話,請使用Queue
Queue

本質上是一個能夠跨行程運行的佇列,

Queue的操作的時間開銷約為Pipe中對應操作的兩倍,

from multiprocessing import Process, Queue
import time

def put_into_queue(q, queue_name, putter_name, content, base_time):
    print(putter_name, 'tries to put', content, 'into', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    q.put(content)
    print(putter_name, 'successfully finishes putting at', '%.6f'%(time.perf_counter()-base_time))
    
def get_from_queue(q, queue_name, getter_name, base_time):
    print(getter_name, 'tries to receive content from', queue_name, 'at', '%.6f'%(time.perf_counter()-base_time))
    content = q.get()
    print(getter_name, 'successfully gets', content, 'at', '%.6f'%(time.perf_counter()-base_time))
    return content

def task1(q, delay, queue_name, process_name, base_time):
    time.sleep(delay)
    put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)
    time.sleep(5)
    get_from_queue(q, queue_name, process_name, base_time)

def task2(q, delay, queue_name, process_name, base_time):
    time.sleep(delay)
    get_from_queue(q, queue_name, process_name, base_time)
    time.sleep(5)
    put_into_queue(q, queue_name, process_name, 'christmas card from ' + process_name, base_time)

if __name__ == '__main__':
    base_time = time.perf_counter()
    q = Queue()
    put_and_get_1 = Process(target=task1, args=(q,0,'queue','putAndGet_No.1',base_time))
    get_and_put_1 = Process(target=task2, args=(q,1,'queue','getAndPut_No.1',base_time))
    get_and_put_2 = Process(target=task2, args=(q,2,'queue','getAndPut_No.2',base_time))
    put_and_get_1.start()
    get_and_put_1.start()
    get_and_put_2.start()
    put_and_get_1.join()
    get_and_put_1.join()
    get_and_put_2.join()
putAndGet_No.1 tries to put christmas card from putAndGet_No.1 into queue at 0.077883
putAndGet_No.1 successfully finishes putting at 0.079291
getAndPut_No.1 tries to receive content from queue at 1.104196
getAndPut_No.1 successfully gets christmas card from putAndGet_No.1 at 1.105489
getAndPut_No.2 tries to receive content from queue at 2.126434
putAndGet_No.1 tries to receive content from queue at 5.081044
getAndPut_No.1 tries to put christmas card from getAndPut_No.1 into queue at 6.106381
getAndPut_No.1 successfully finishes putting at 6.107820
getAndPut_No.2 successfully gets christmas card from getAndPut_No.1 at 6.108565
getAndPut_No.2 tries to put christmas card from getAndPut_No.2 into queue at 11.109579
getAndPut_No.2 successfully finishes putting at 11.112493
putAndGet_No.1 successfully gets christmas card from getAndPut_No.2 at 11.113546

另外,如果Queue的大小實在過大以至于達到了某個上限,則put()操作也會被阻塞,不過應該很難把大小弄到那么大,

多執行緒

基本語法和多行程很相似,但機制上有重要的不同,由于全域解釋器鎖的存在,Python多執行緒并不實用,這里僅作簡單介紹,

從下圖中可以看到,多執行緒的基本代碼和多行程完全一致,下圖中的代碼在CPython解釋器中會運行大約3s,

另外,多執行緒中其實不需要這個if __name__ == '__main__':的判斷,

多執行緒的變數機制

import threading

lock_n = threading.Lock()
n = 0

def inc_n(m):
    global n
    lock_n.acquire(blocking=True)
    n += m
    lock_n.release()

threads = [threading.Thread(target=inc_n, args=(i,)) for i in range(1,11)]
[t.start() for t in threads]
[t.join() for t in threads]

print(n)
55
  • 由上可見,不同的執行緒之間共享運行環境(比如上面的變數n),
  • lock.acquire(blocking=True) 會一直阻塞直到鎖空出來為止;一旦空出來就會把它鎖上,

并發處理:協程

不同的程序在同一個執行緒內交替執行,每個協程在運行時獨占資源,一段運行結束后自阻塞,等待著被外部(如main函式)控制它的代碼喚醒,

相比多執行緒的優點:輕量級(在解釋器層面實作,不需要內核來做切換)、數量不限,

和多執行緒一樣,不同協程之間共用運行環境,

用簡單的生成器實作協程

def sum(init):
    s = init
    while True:
        delta = yield s # output s, and then input delta
        s += delta
        
g = sum(0)
print(next(g)) # run entil receiving the first output
print(g.send(1)) # send the first input, and then get the second output
print(g.send(2)) # send the second input, and then get the third output
0
1
3

上例中只是演示了生成器的自阻塞,以及生成器與其呼叫者之間的互動,

更進一步,還可以定義多個生成器執行不同的程序,并在main函式里進行對它們的調度(比如實作一個任務佇列),從而實作協程,

用回呼函式(callback)將普通函式變為協程

def calc(n,callback):
    r = 0
    for i in range(n):
        r += i
        callback()

def pause():
    print('pause')
    yield # just pause, do not return anything

g = calc(10,pause)

async/await實作協程

相比生成器實作的優點:可以在等待IO/等待網路通信等情況下時阻塞當前協程執行其他協程(而且不會中斷等待IO/通信)以節省時間(而只用生成器則無法做到);使用更靈活、方便,

  • 多執行緒其實也有前一個優點,所以CPython下的多執行緒也并不是毫無用處,但它的用處是協程用處的子集,
  • 一個注意事項:若想通過協程加速IO,必須使用python中專門的異步IO庫才行,

基礎使用

import time

start = time.perf_counter()

def sayhi(delay):
    time.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

def main():
    sayhi(1)
    sayhi(2)

main()
hi! at 1.0040732999914326
hi! at 3.015253899997333
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    sayhi1 = asyncio.create_task(sayhi(1))
    sayhi2 = asyncio.create_task(sayhi(2))
    await sayhi1
    await sayhi2

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0037910000100965
hi! at 2.0026504999987083

上面的程式中:

  • async 宣告當前函式是一個協程,這一宣告使得函式體內可以使用create_taskawait,也使得該函式本身可以被create_taskawait
    • 一旦一個函式 f 被宣告為協程,f()做的事就不再是運行 f,而只是創建一個協程物件并回傳之(這個物件并不會自動被運行),需要使用 asyncio 中的相關工具來運行這個物件,

  • run(main()) 表示開始執行協程 main() ,要求 main() 必須是“主協程”,即它是整個程式中所有協程的入口(類似主函式),一個程式中 run(main()) 只應被呼叫一次,且在 main() 之外不應有任何協程被呼叫,
    • run(main()) 是阻塞的,協程的并發特性只有在main()內部才會顯現,從外部來看這就是一個普普通通的黑箱呼叫,
    • run()的作用是啟動 運行協程所需的環境(并在main()完成后將其關閉),但在IPython中,一開始運行就已經自動幫你啟動好了,所以可以直接用await(而且也不必把所有協程都放在一個主協程中,而可以散布在程式各處),

  • create_task(sayhi(1)) 表示為協程sayhi(1)在某個“任務池”中創建一個任務,并且開始執行該任務,回傳值是這個任務的handle,或者說“遙控器”,

    • 任務池中的任務會并發地執行,任務在何時可以中斷并切換到別的任務,這一點由await指定,
  • await sayhi1 有兩重含義:

      1. 阻塞當前協程(該陳述句所在的協程,這里是main())的執行,直到任務sayhi1完成,(類似Process.join()
      1. 告訴解釋器,現在當前協程(該陳述句所在的協程)開始阻塞,你可以切換協程了,
      • 如果這里await的不是sayhi1而是,比如說,一個接受http請求的操作,那么在解釋器切換協程后不會影響對這個請求的等待,這就是asyncio的強大之處,
        • 這一點在await asyncio.sleep(delay)就有體現,asyncio.sleep()就具有“切換協程不影響等待”的特性,
  • 關于await的幾件事:

    • await的可以不是已創建的任務而是一個協程物件(比如await sayhi(1)),此時不會將其加入任務池,而會直接開始執行(當然,也可能剛開始執行就被切換到別的協程,因為用了await),并一直阻塞直到完成,這會導致sayhi(1)無法作為一個任務、與其他任務平等地參與并發,但是它仍然可以隨著父協程(這里是main())的中斷和恢復而間接地參與并發,
    • 能夠被await的不只有協程物件和任務handle,還有任何awaitable object,即任何實作了__await__方法(從而告訴了解釋器如何在自己剛開始執行時就阻塞并切換協程,且不影響內部可能在進行的等待和其他操作)的物件,
    • await 的物件只可能在剛開始執行時立刻阻塞并切換協程,執行程序中其他可以阻塞的位置,是由這個物件內部使用的其他await陳述句指定的,而不是呼叫這個物件的那條await陳述句,
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    await sayhi(1)
    await sayhi(2)

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0072715999995125
hi! at 3.0168006000021705

wait_for()

await AA為任意awaitable)改成await asyncio.wait_for(A,timeout),就可以給await操作加上timeout秒的時限,一旦await了這么多秒還沒有結束,就會中斷A的執行并拋出asyncio.TimeoutError

不用關心wait_for具體做了什么,你只需要記住await asyncio.wait_for(A,timeout)這個句子就行,可以認為這個句子和await A在(除了timeout以外的)其他方面上沒有區別,下面是例子,

import time
import asyncio


async def eternity():
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
timeout!
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    sayhi1 = asyncio.create_task(sayhi(1))
    sayhi2 = asyncio.create_task(sayhi(2))
    await asyncio.wait_for(sayhi1,1.05)
    await asyncio.wait_for(sayhi2,1.05)

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
hi! at 1.0181081000046106
hi! at 2.0045300999918254
import time
import asyncio

start = time.perf_counter()

async def sayhi(delay):
    await asyncio.sleep(delay)
    print(f'hi! at {time.perf_counter() - start}')

async def main():
    sayhi1 = asyncio.create_task(sayhi(1))
    sayhi2 = asyncio.create_task(sayhi(2))
    await asyncio.wait_for(sayhi1,0.95)
    await asyncio.wait_for(sayhi2,1.05)

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
---------------------------------------------------------------------------

TimeoutError                              Traceback (most recent call last)

<ipython-input-89-7f639d54114e> in <module>
     15 
     16 # asyncio.run(main()) # use outside IPython
---> 17 await main() # use inside IPython


<ipython-input-89-7f639d54114e> in main()
     11     sayhi1 = asyncio.create_task(sayhi(1))
     12     sayhi2 = asyncio.create_task(sayhi(2))
---> 13     await asyncio.wait_for(sayhi1,0.95)
     14     await asyncio.wait_for(sayhi2,1.05)
     15 


~\anaconda3\lib\asyncio\tasks.py in wait_for(fut, timeout, loop)
    488             # See https://bugs.python.org/issue32751
    489             await _cancel_and_wait(fut, loop=loop)
--> 490             raise exceptions.TimeoutError()
    491     finally:
    492         timeout_handle.cancel()


TimeoutError: 


hi! at 2.0194762000028277

另外,注意到即使協程sayhi1拋出了例外,父協程main()仍然能夠繼續執行sayhi2,可見不同協程間是有一定的獨立性的,

實作生產者-消費者協程

為此需要使用 asyncio.Queue ,它相比普通的佇列的區別是,其put/get操作會在無法執行時阻塞(這一點和multiprocessing.Queue很像),而且這些操作都是協程(注意到,這使得你呼叫它們時只會回傳協程物件而不會實際執行),可以await

import time
import asyncio

start = time.perf_counter()

async def producer(q):
    for i in range(5):
        await asyncio.sleep(1) # producing takes 1 sec
        await q.put(i) # will wait if q is full
        print(f'put {i} at {time.perf_counter() - start}')
    
    await q.join() # will wait until all objects produced are **taken out** and **consumed**.

async def consumer(q):
    for i in range(5):
        item = await q.get() # will wait if q is empty. BTW we see that "await XXX" is an expression not a command.
        print(f'get {item} at {time.perf_counter() - start}')
        await asyncio.sleep(1) # consuming takes 1 sec
        q.task_done() # tells the queue that [the object just taken out] has been consumed. just taking out is not enough!
        print(f'consumed {item} at {time.perf_counter() - start}')
    
async def main():
    q = asyncio.Queue()
    P = asyncio.create_task(producer(q))
    C = asyncio.create_task(consumer(q))
    await P
    await C
    print(f'done at {time.perf_counter() - start}')

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
put 0 at 1.0108397000003606
get 0 at 1.0112231999955839
put 1 at 2.017216499996721
consumed 0 at 2.0176210000063293
get 1 at 2.0177472999930615
put 2 at 3.0279211000015493
consumed 1 at 3.0283254999958444
get 2 at 3.028457599997637
put 3 at 4.039952199993422
consumed 2 at 4.041183299996192
get 3 at 4.041302300000098
put 4 at 5.0465819999953965
consumed 3 at 5.04690839999239
get 4 at 5.047016099997563
consumed 4 at 6.047789799995371
done at 6.048323099996196
import time
import asyncio

start = time.perf_counter()

async def sleep_and_put(q):
    await asyncio.sleep(1)
    await q.put(1)

async def main():
    q = asyncio.Queue()
    C = asyncio.create_task(q.get())
    P = asyncio.create_task(sleep_and_put(q))
    await C
    await P
    print(f'finished at {time.perf_counter() - start}')

# asyncio.run(main()) # use outside IPython
await main() # use inside IPython
finished at 1.01112650000141

由上例可見,Queue.get()(其實Queue.put()等其他方法也一樣)是一個協程,因此也可以給它創建任務以進行并發,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/278737.html

標籤:Python

上一篇:1.1Python起源

下一篇:Python系列爬蟲之批量下載網易云課堂視頻

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more