python 中協程概念是從 3.4 版本增加的,但 3.4 版本采用是生成器實作,為了將協程和生成器的使用場景進行區分,使語意更加明確,在 python 3.5 中增加了 async 和 await 關鍵字,用于定義原生協程,
asyncio 異步 I/O 庫
python 中的 asyncio 庫提供了管理事件、協程、任務和執行緒的方法,以及撰寫并發代碼的原語,即 async 和 await,
該模塊的主要內容:
事件回圈:event_loop,管理所有的事件,是一個無限回圈方法,在回圈程序中追蹤事件發生的順序將它們放在佇列中,空閑時則呼叫相應的事件處理者來處理這些事件;協程:coroutine,子程式的泛化概念,協程可以在執行期間暫停,等待外部的處理(I/O 操作)完成之后,再從暫停的地方繼續運行,函式定義式使用async關鍵字,這樣這個函式就不會立即執行,而是回傳一個協程物件;Future和Task:Future物件表示尚未完成的計算,Task是Future的子類,包含了任務的各個狀態,作用是在運行某個任務的同時可以并發的運行多個任務,
異步函式的定義
異步函式本質上依舊是函式,只是在執行程序中會將執行權交給其它協程,與普通函式定義的區別是在 def 關鍵字前增加 async,
# 異步函式
import asyncio
# 異步函式
async def func(x):
print("異步函式")
return x ** 2
ret = func(2)
print(ret)
運行代碼輸入如下內容:
sys:1: RuntimeWarning: coroutine 'func' was never awaited
<coroutine object func at 0x0000000002C8C248>
函式回傳一個協程物件,如果想要函式得到執行,需要將其放到事件回圈 event_loop 中,
事件回圈 event_loop
event_loop 是 asyncio 模塊的核心,它將異步函式注冊到事件回圈上,
程序實作方式為:由 loop 在適當的時候呼叫協程,這里使用的方式名為 asyncio.get_event_loop(),然后由 run_until_complete(協程物件) 將協程注冊到事件回圈中,并啟動事件回圈,
import asyncio
# 異步函式
async def func(x):
print("異步函式")
return x ** 2
# 協程物件,該物件不能直接運行
coroutine1 = func(2)
# 事件回圈物件
loop = asyncio.get_event_loop()
# 將協程物件加入到事件回圈中,并執行
ret = loop.run_until_complete(coroutine1)
print(ret)
首先在 python 3.7 之前的版本中使用異步函式是安裝上述流程:
- 先通過
asyncio.get_event_loop()獲取事件回圈loop物件; - 然后通過不同的策略呼叫
loop.run_until_complete()或者loop.run_forever()執行異步函式,
在 python 3.7 之后的版本,直接使用 asyncio.run() 即可,該函式總是會創建一個新的事件回圈并在結束時進行關閉,
最新的官方檔案 都采用的是 run 方法,
官方案例
import asyncio
async def main():
print('hello')
await asyncio.sleep(1)
print('world')
asyncio.run(main())
接下來在查看一個完整的案例,并且結合 await 關鍵字,
import asyncio
import time
# 異步函式1
async def task1(x):
print("任務1")
await asyncio.sleep(2)
print("恢復任務1")
return x
# 異步函式2
async def task2(x):
print("任務2")
await asyncio.sleep(1)
print("恢復任務2")
return x
async def main():
start_time = time.perf_counter()
ret_1 = await task1(1)
ret_2 = await task2(2)
print("任務1 回傳的值是", ret_1)
print("任務2 回傳的值是", ret_2)
print("運行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
# 創建一個事件回圈
loop = asyncio.get_event_loop()
# 將協程物件加入到事件回圈中,并執行
loop.run_until_complete(main())
代碼輸出如下所示:
任務1
恢復任務1
任務2
恢復任務2
任務1 回傳的值是 1
任務2 回傳的值是 2
運行時間 2.99929154
上述代碼創建了 3 個協程,其中 task1 和 task2 都放在了協程函式 main 中,I/O 操作通過 asyncio.sleep(1) 進行模擬,整個函式運行時間為 2.9999 秒,接近 3 秒,依舊是串行進行,如果希望修改為并發執行,將代碼按照下述進行修改,
import asyncio
import time
# 異步函式1
async def task1(x):
print("任務1")
await asyncio.sleep(2)
print("恢復任務1")
return x
# 異步函式2
async def task2(x):
print("任務2")
await asyncio.sleep(1)
print("恢復任務2")
return x
async def main():
start_time = time.perf_counter()
ret_1,ret_2 = await asyncio.gather(task1(1),task2(2))
print("任務1 回傳的值是", ret_1)
print("任務2 回傳的值是", ret_2)
print("運行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
上述代碼最大的變化是將 task1 和 task2 放到了 asyncio.gather() 中運行,此時代碼輸出時間明顯變短,
任務1
任務2
恢復任務2 # 任務2 由于等待時間短,先回傳,
恢復任務1
任務1 回傳的值是 1
任務2 回傳的值是 2
運行時間 2.0005669480000003
asyncio.gather() 可以更換為 asyncio.wait() ,修改代碼如下所示:
import asyncio
import time
# 異步函式1
async def task1(x):
print("任務1")
await asyncio.sleep(2)
print("恢復任務1")
return x
# 異步函式2
async def task2(x):
print("任務2")
await asyncio.sleep(1)
print("恢復任務2")
return x
async def main():
start_time = time.perf_counter()
done, pending = await asyncio.wait([task1(1), task2(2)])
print(done)
print(pending)
print("運行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
asyncio.wait() 回傳一個元組,其中包含一個已經完成的任務集合,一個未完成任務的集合,
gather 和 wait 的區別
gather:需要所有任務都執行結束,如果任意一個協程函式崩潰了,都會拋例外,不會回傳結果;wait:可以定義函式回傳的時機,可以設定為FIRST_COMPLETED(第一個結束的),FIRST_EXCEPTION(第一個出現例外的),ALL_COMPLETED(全部執行完,默認的),
done,pending = await asyncio.wait([task1(1),task2(2)],return_when=asyncio.tasks.FIRST_EXCEPTION)
創建 task
由于協程物件不能直接運行,在注冊到事件回圈時,是 run_until_complete 方法將其包裝成一個 task 物件,該物件是對 coroutine 物件的進一步封裝,它比 coroutine 物件多了運行狀態,例如 pending,running,finished,可以利用這些狀態獲取協程物件的執行情況,
下面顯示的將 coroutine 物件封裝成 task 物件,在上述代碼基礎上進行修改,
import asyncio
import time
# 異步函式1
async def task1(x):
print("任務1")
await asyncio.sleep(2)
print("恢復任務1")
return x
# 異步函式2
async def task2(x):
print("任務2")
await asyncio.sleep(1)
print("恢復任務2")
return x
async def main():
start_time = time.perf_counter()
# 封裝 task 物件
coroutine1 = task1(1)
task_1 = loop.create_task(coroutine1)
coroutine2 = task2(2)
task_2 = loop.create_task(coroutine2)
ret_1, ret_2 = await asyncio.gather(task_1, task_2)
print("任務1 回傳的值是", ret_1)
print("任務2 回傳的值是", ret_2)
print("運行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
由于 task 物件是 future 物件的子類物件,所以上述代碼也可以按照下述內容修改:
# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)
下面將 task 物件的各個狀態進行列印輸出,
import asyncio
import time
# 異步函式1
async def task1(x):
print("任務1")
await asyncio.sleep(2)
print("恢復任務1")
return x
# 異步函式2
async def task2(x):
print("任務2")
await asyncio.sleep(1)
print("恢復任務2")
return x
async def main():
start_time = time.perf_counter()
# 封裝 task 物件
coroutine1 = task1(1)
task_1 = loop.create_task(coroutine1)
coroutine2 = task2(2)
# task_2 = loop.create_task(coroutine2)
task_2 = asyncio.ensure_future(coroutine2)
# 進入 pending 狀態
print(task_1)
print(task_2)
# 獲取任務的完成狀態
print(task_1.done(), task_2.done())
# 執行任務
await task_1
await task_2
# 再次獲取完成狀態
print(task_1.done(), task_2.done())
# 獲取回傳結果
print(task_1.result())
print(task_2.result())
print("運行時間", time.perf_counter() - start_time)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
await task_1 表示的是執行該協程,執行結束之后,task.done() 回傳 True,task.result() 獲取回傳值,
回呼回傳值
當協程執行完畢,需要獲取其回傳值,剛才已經演示了一種辦法,使用 task.result() 方法獲取,但是該方法僅當協程運行完畢時,才能獲取結果,如果協程沒有運行完畢,result() 方法會回傳 asyncio.InvalidStateError(無效狀態錯誤),
一般編碼都采用第二種方案,通過 add_done_callback() 方法系結回呼,
import asyncio
import requests
async def request_html():
url = 'https://www.csdn.net'
res = requests.get(url)
return res.status_code
def callback(task):
print('回呼:', task.result())
loop = asyncio.get_event_loop()
coroutine = request_html()
task = loop.create_task(coroutine)
# 系結回呼
task.add_done_callback(callback)
print(task)
print("*"*100)
loop.run_until_complete(task)
print(task)
上述代碼當 coroutine 執行完畢時,會呼叫 callback 函式,
如果回呼函式需要多個引數,請使用 functools 模塊中的偏函式(partial)方法
回圈事件關閉
建議每次編碼結束之后,都呼叫回圈事件物件 close() 方法,徹底清理 loop 物件,
本節課爬蟲專案
本節課要采集的站點由于全部都是 coser 圖片,所以地址在代碼中查看即可,
完整代碼如下所示:
import threading
import asyncio
import time
import requests
import lxml
from bs4 import BeautifulSoup
async def get(url):
return requests.get(url)
async def get_html(url):
print("準備抓取:", url)
res = await get(url)
return res.text
async def save_img(img_url):
# thumbMid_5ae3e05fd3945 將小圖替換為大圖
img_url = img_url.replace('thumb','thumbMid')
img_url = "http://mycoser.com/" + img_url
print("圖片下載中:", img_url)
res = await get(img_url)
if res is not None:
with open(f'./imgs/{time.time()}.jpg', 'wb') as f:
f.write(res.content)
return img_url,"ok"
async def main(url_list):
# 創建 5 個任務
tasks = [asyncio.ensure_future(get_html(url_list[_])) for _ in range(len(url_list))]
dones, pending = await asyncio.wait(tasks)
for task in dones:
html = task.result()
soup = BeautifulSoup(html, 'lxml')
divimg_tags = soup.find_all(attrs={'class': 'workimage'})
for div in divimg_tags:
ret = await save_img(div.a.img["data-original"])
print(ret)
if __name__ == '__main__':
urls = [f"http://mycoser.com/picture/lists/p/{page}" for page in range(1, 17)]
totle_page = len(urls) // 5 if len(urls) % 5 == 0 else len(urls) // 5 + 1
# 對 urls 串列進行切片,方便采集
for page in range(0, totle_page):
start_page = 0 if page == 0 else page * 5
end_page = (page + 1) * 5
# 回圈事件物件
loop = asyncio.get_event_loop()
loop.run_until_complete(main(urls[start_page:end_page]))
代碼說明
上述代碼中第一個要注意的是 await 關鍵字后面只能跟如下內容:
- 原生的協程物件;
- 一個包含
await方法的物件回傳的一個迭代器,
所以上述代碼 get_html 函式中嵌套了一個協程 get,主函式 main 里面為了運算方便,直接對 urls 進行了切片,然后通過回圈進行運行,
當然上述代碼的最后兩行,可以直接修改為:
# 回圈事件物件
# loop = asyncio.get_event_loop()
#
# loop.run_until_complete(main(urls[start_page:end_page]))
asyncio.run(main(urls[start_page:end_page]))
輕松獲取一堆高清圖片,

寫在后面
協程掌握了,python 爬蟲之路就開啟了,
今天是持續寫作的第 242 / 365 天,
期待 關注,點贊、評論、收藏,
更多精彩
《爬蟲 100 例,專欄銷售中,買完就能學會系列專欄》

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/323455.html
標籤:python
