
文章目錄
- 探究低層建筑:asyncio
- 同步/異步
- 了解一下協程
- 相對于執行緒,協程的優勢
- 同步代碼轉異步代碼
- 通過asyncio講解協程
- 所以,代碼到底怎么寫?!!!
- 協程可以做哪些事?
- 定義協程函式:
- 驗證某函式是否協程函式:
- await是什么情況:
- 運行協程:
- 回呼
- 多協程
- 關倍訓圈
探究低層建筑:asyncio
Python由于全域鎖(GIL)的存在,一直無法發揮多核的優勢,其性能一直飽受詬病,
不過,在IO密集型的網路編程各種,異步處理比同步處理能夠提升非常之高的速度,
而相對于其他語言,Python還有一個很明顯的優勢,那就是它的庫很多啊!!!
Python3版本引入了async/await特性,其特點是:當執行程序中遇到IO請求的時候,可以將CPU資源出讓,運行其他的任務;待IO完成之后,繼續執行之前的任務,協程切換與執行緒切換比較類似,但協程切換更輕,不需要作業系統參與(沒有堆疊切換操作,也沒有用戶態與內核態切換),
同步/異步
在介紹協程之前,我還是再說一下同步和異步的概念,如果對這兩個概念都混淆不清的話,下面的更不用說了,
==同步:串行,異步:并行,==不要被字面意思所迷惑,
同步是指完成事務的邏輯,先執行第一個事務,如果阻塞了,會一直等待,直到這個事務完成,再執行第二個事務,順序執行,,,
異步是和同步相對的,異步是指在處理呼叫這個事務的之后,不會等待這個事務的處理結果,直接處理第二個事務去了,通過狀態、通知、回呼來通知呼叫者處理結果,
我再簡單的介紹一下協程:
了解一下協程
協程,英文Coroutines,是一種比執行緒更加輕量級的存在,正如一個行程可以擁有多個執行緒一樣,一個執行緒也可以擁有多個協程,

子程式,或者稱為函式,在所有語言中都是層級呼叫,比如A呼叫B,B在執行程序中又呼叫了C,C執行完畢回傳,B執行完畢回傳,最后是A執行完畢,
所以子程式呼叫是通過堆疊實作的,一個執行緒就是執行一個子程式,
子程式呼叫總是一個入口,一次回傳,呼叫順序是明確的,而協程的呼叫和子程式不同,
協程看上去也是子程式,但執行程序中,在子程式內部可中斷,然后轉而執行別的子程式,在適當的時候再回傳來接著執行,
注意,在一個子程式中中斷,去執行其他子程式,不是函式呼叫,有點類似CPU的中斷,比如子程式A、B:
def A():
print '1'
print '2'
print '3'
def B():
print 'x'
print 'y'
print 'z'
假設由協程執行,在執行A的程序中,可以隨時中斷,去執行B,B也可能在執行程序中中斷再去執行A,結果可能是:
1 x 2 y 3 z
但是在A中是沒有呼叫B的,所以協程的呼叫比函式呼叫理解起來要難一些,
相對于執行緒,協程的優勢
最大的優勢就是協程極高的執行效率,因為子程式切換不是執行緒切換,而是由程式自身控制,因此,沒有執行緒切換的開銷,和多執行緒比,執行緒數量越多,協程的性能優勢就越明顯,
第二大優勢就是不需要多執行緒的鎖機制,因為只有一個執行緒,也不存在同時寫變數沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多,
因為協程是一個執行緒執行,那怎么利用多核CPU呢?最簡單的方法是多行程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能,
同步代碼轉異步代碼
以下為一段同步代碼:
import time
def hello():
time.sleep(1)
def run():
for i in range(5):
hello()
print('Hello World:%s' % time.time()) # 任何偉大的代碼都是從Hello World 開始的!
run()
以下是一段異步代碼:
import time
import asyncio
# 定義異步函式
async def hello():
asyncio.sleep(1)
print('Hello World:%s' % time.time())
def run():
for i in range(5):
loop.run_until_complete(hello())
loop = asyncio.get_event_loop()
run()
通過asyncio講解協程
- 通過async def來定義一個協程函式,通過await來執行一個協程物件,協程物件、協程函式的概念如下所示:
async def func_1(): # 1. 定義了一個協程函式
pass
async def func_2(): # 2. 注意要在函式內部呼叫協程函式,自身也必須定義為協程
# 3. func_1()呼叫產生了一個協程物件,通過await來執行這個協程,如果不加await,
# 直接以func_1()方式呼叫,則func_1中代碼并不會執行,
await func_1()
async def 用來定義異步函式,其內部有異步操作,每個執行緒有一個事件回圈,主執行緒呼叫asyncio.get_event_loop()時會創建事件回圈,你需要把異步的任務丟給這個回圈的run_until_complete()方法,事件回圈會安排協同程式的執行,
- 一般情況下,無法在一個非協程函式中阻塞地呼叫另一個協程,但你可以通過asyncio.ensure_future()來異步執行這個協程:
import asyncio
async def fun_1(): # 1. 定義了一個協程函式
pass
def bar():
asyncio.ensure_future(fun_1()) # 這里fun_1()將會在某個時間執行,具體執行順序未知
# 這里是阻塞執行fun_1(),但這種呼叫,只能在event loop進入回圈之前呼叫(loop.run_forever()),
# 否則會拋例外
asyncio.get_event_loop().run_until_complete(fun_1)
print("fun_1() is executed!")
在一些框架中,會將某些函式定義為協程(即通過async修飾),這些函式都是在某個地方通過create_task,或者ensure_future來進行調度的,
-
協程鎖:協程之間也可能會有資源共享沖突,要防止資源共享沖突產生的資料一致性問題,需要使用asyncio.Lock,asyncio.Lock也遵從背景關系管理協議,
-
協程睡眠:協程函式在執行中會占用本執行緒的全部CPU時間,除非遇到IO切換出去,因此,如果你在函式中使用sleep(),在多執行緒中,一個執行緒進入sleep狀態,作業系統會切換到其它執行緒執行,整個程式仍然是可回應的(除了該執行緒,它必須等待睡眠狀態結束);而對協程來說,同一loop中的其它協程都不會得到執行,因為這個sleep會占用本執行緒的全部執行時間,直到協程執行完畢,
上面的問題引出一個推論,也就是如果一個協程確實需要睡眠(比如某種定時任務),必須使用asyncio.sleep()
- 如果我們要通過asyncio來遠程呼叫一個服務,應該如何封裝呢?假設你使用的底層通訊的API是發送和接收分離的(一般比較靠近底層的API都是這樣設計的),那么你會面臨這樣的問題:當你通過異步請求(比如send)發出API request后,服務器的回應可能是通過on_message這樣的API來接收的,如何讓程式在呼叫send之后,就能得到(形式上)回傳結果,然后根據回傳結果繼續執行呢?
from typing import Dict
# 全域事件注冊表,鍵為外發請求的track_id,該track_id需要服務器在回應請求時傳回,
# 值為另一個dict,儲存著對應的asyncio.Event和網路請求的回傳結果,這里也可以使用list,
# 在強調性能的場合下,使用List[event: asyncio.Event, result: object]更好,
_events: Dict[str, Dict] = {}
# 定義阻塞呼叫的協程
async def sync_call(request):
event = asyncio.Event()
track_id = str(uuid.uuid4())
_events[track_id] = {
"events": event,
"result": None
}
# 發送網路請求,以下僅為示例,具體網路請求要根據業務具體場景來替換,這一步一般是立即回傳,
# 服務器并沒有來得及準備好response
await aiohttp.request(...)
# L1: 阻塞地等待事件結果,當框架(或者你的網路例程)收到服務器回傳結果時,根據track_id
# 找到對應的event,觸發之
await event.wait()
# 獲取結果,并做清理
response = _events[track_id].get("result")
_events.pop(track_id)
return response
# 在框架(或者你的網路例程)的訊息接收處,比如on_message函式體中:
async def on_message(response):
# 如果服務器不傳回track_id,則整個機制無法生效
track_id = response.get("track_id")
waited = _events.get(track_id)
if waited:
waited["result"] = response
waited["event"].set() # !這里喚醒在L1處等待執行的
不能再深挖了,畢竟大家都是第一次接觸這個模塊兒,
必須要再深挖,這里面包含了太多的后端設計思想,是一個很重要的模塊兒,
但是不是在這篇里面深挖,過幾天會再出一篇關于asyncio的底層原理的博客,歡迎大家關注,
所以,代碼到底怎么寫?!!!
我相信,看了這么久,還是沒有幾個人知道這玩意兒到底要怎么寫代碼,
說實話,換我看了這么多我也不知道啊,
沒事兒啊,重在理解嘛,是吧,

協程可以做哪些事?
* 等待一個 future 結束
* 等待另一個協程(產生一個結果,或引發一個例外)
* 產生一個結果給正在等它的協程
* 引發一個例外給正在等它的協程
定義協程函式:
async def do_some_work(x): pass
驗證某函式是否協程函式:
print(asyncio.iscoroutinefunction(do_some_work)) # True
await是什么情況:
async def do_some_work(x):
print("Waiting " + str(x))
await asyncio.sleep(x)
asyncio.sleep 也是一個協程,所以 await asyncio.sleep(x) 就是等待另一個協程,
看一下檔案解釋:
sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds)
看不懂沒關系,我現在也不懂,誒,就是玩兒,
運行協程:
呼叫協程函式,協程并不會開始運行,只是回傳一個協程物件,還會引發一條警告,
要讓這個協程物件運行的話,有兩種方式:
* 在另一個已經運行的協程中用 `await` 等待它
* 通過 `ensure_future` 函式計劃它的執行
下面先拿到當前執行緒預設的 loop ,然后把協程物件交給 loop.run_until_complete,協程物件隨后會在 loop 里得到運行,
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(5))
# 上面這行代碼屬于簡寫,完整寫法是這樣的:
# loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
# run_until_complete 的引數是一個 future,它在內部會通過 ensure_future 函式把協程物件包裝成了 future,
接下來就比較抽象了,需要一定的基礎了,
回呼
假如協程是一個 IO 的讀操作,我們希望知道它什么時候結束運行,以便下一步資料的處理,這一需求可以通過往 future 添加回呼來實作,
def done_callback(futu):
print('Done')
futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)
loop.run_until_complete(futu)
多協程
為了把多個協程交給 loop,需要借助 asyncio.gather 函式,
方法一:
loop.run_until_complete(asyncio.gather(do_some_work(3), do_some_work(5)))
方法二:
先把協程存在串列里
coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))
這兩個協程是并發運行的,所以等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程為準,
關倍訓圈
loop 只要不關閉,就還可以再運行,但是如果關閉了,就不能再運行了,
建議呼叫 loop.close,以徹底清理 loop 物件防止誤用,
這一篇就先到這里啦,至于asyncio再往底層走,這周會更新的啦,能看到這里的小伙伴不容易,需要多大的毅力啊,
不準備收藏一下嗎?一次看這么多,怕是很難一次性消化掉吧,


轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/278870.html
標籤:python
上一篇:python檔案名批量重命名腳本
