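Using coroutines in a Python crawler can greatly speed up scraping of a target site, so the concept is worth revisiting and applying to crawler examples.
Defining a coroutine
With the previous two articles as groundwork, defining a coroutine is now straightforward: putting the async keyword in front of a function definition makes it a coroutine function, and calling that function returns a coroutine object. You can verify the type directly with isinstance.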
from collections.abc import Coroutine

async def func():
    print("I am a coroutine function")

if __name__ == '__main__':
    # Create the coroutine object. This does not run the function body, so nothing is printed
    coroutine = func()
    # Type check
    print(isinstance(coroutine, Coroutine))
The code prints the following:
True
sys:1: RuntimeWarning: coroutine 'func' was never awaited
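The type check confirms that calling a function declared with async returns a coroutine object. The warning can be ignored for now; it appears because the coroutine was never scheduled on an event loop and awaited.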
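To make the distinction between a coroutine function and a coroutine object concrete, and to show that the warning disappears once the coroutine is actually run, here is a minimal sketch. The names greet, coro and result are illustrative and do not come from the original article.

```python
import asyncio
import inspect


async def greet():
    # The body runs only when the coroutine object is awaited or run.
    return "hello from the coroutine"


# greet itself is a coroutine function.
print(inspect.iscoroutinefunction(greet))

# Calling it builds a coroutine object without executing the body.
coro = greet()
print(inspect.iscoroutine(coro))

# asyncio.run() drives the coroutine to completion on an event loop,
# so no "never awaited" warning is emitted.
result = asyncio.run(coro)
print(result)
```

Running the coroutine through asyncio.run() is the simplest way to execute it; the sections that follow show the lower-level event loop API as well.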
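Using coroutines
This article again uses a crawler example to study coroutines. The target site is http://banan.huiben.61read.com/, the 中少繪本 picture-book website run by 中國少年兒童新聞出版總社. It hosts a large number of children's picture-book videos with no ads, and the videos are all in MP4 format, which makes them easy to download.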
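import asyncio
import requests

# Coroutine function
async def get_html():
    res = requests.get("http://banan.huiben.61read.com/Video/List/1d4a3be3-0a72-4260-979b-743d9db8ad85")
    if res is not None:
        return res.status_code
    else:
        return None

# Create the coroutine object
coroutine = get_html()
# Create the event loop object
# (asyncio.get_event_loop() is deprecated when no loop is running, so a new loop is created explicitly)
loop = asyncio.new_event_loop()
# Wrap the coroutine in a task
task = loop.create_task(coroutine)
# task = asyncio.ensure_future(coroutine, loop=loop)  # this also turns the coroutine into a task
# Put the task into the event loop and run it
loop.run_until_complete(task)
# Print the result
print("Result:", task.result())
loop.close()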
The code above can also be restructured: since Python 3.7, asyncio.run() can be used to run the top-level entry-point function.
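import asyncio
import requests

# Coroutine function
async def get_html():
    res = requests.get("http://banan.huiben.61read.com/Video/List/1d4a3be3-0a72-4260-979b-743d9db8ad85")
    if res is not None:
        print(res.status_code)
    else:
        return None

async def main():
    await get_html()

# asyncio.run() creates the event loop, runs main() and closes the loop.
# No separate coroutine object is created here: one that is never awaited
# would only trigger the "never awaited" RuntimeWarning.
asyncio.run(main())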
Next, following the code above, download two MP4 videos.
# http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4
# http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4
import asyncio
import time
import requests

async def requests_get(url):
    headers = {
        "Referer": "http://banan.huiben.61read.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36"
    }
    try:
        res = requests.get(url, headers=headers)
        return res
    except Exception as e:
        print(e)
        return None

async def get_video(url):
    res = await requests_get(url)
    if res is not None:
        # The ./mp4 directory must already exist
        with open(f'./mp4/{time.time()}.mp4', "wb") as f:
            f.write(res.content)

async def main():
    start_time = time.perf_counter()
    # http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4
    # http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4
    await get_video("http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4")
    await get_video("http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4")
    print("Run time:", time.perf_counter() - start_time)

if __name__ == '__main__':
    asyncio.run(main())
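In the author's test, downloading the two videos with the code above took 44 s (the time varies with the machine and network speed).
Using asyncio.create_task() to run multiple coroutines concurrently
Continue modifying the code to reduce the run time.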
import asyncio
import time
import requests

async def requests_get(url):
    headers = {
        "Referer": "http://banan.huiben.61read.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36"
    }
    try:
        res = requests.get(url, headers=headers)
        return res
    except Exception as e:
        print(e)
        return None

async def get_video(url):
    res = await requests_get(url)
    if res is not None:
        # The ./mp4 directory must already exist
        with open(f'./mp4/{time.time()}.mp4', "wb") as f:
            f.write(res.content)

async def main():
    start_time = time.perf_counter()
    # http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4
    # http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4
    # create_task() wraps each coroutine in a Task and schedules it right away
    task1 = asyncio.create_task(
        get_video("http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4"))
    task2 = asyncio.create_task(
        get_video("http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4"))
    await task1
    await task2
    print("Run time:", time.perf_counter() - start_time)

if __name__ == '__main__':
    asyncio.run(main())
This run took 27 s in the author's test, down from 44 s for the previous version.
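One caveat when reading these timings: requests.get() is a blocking call, so a coroutine that calls it directly holds the event loop until the download finishes, and tasks created with asyncio.create_task() cannot overlap during that time. The sketch below is not the author's code. It uses time.sleep() as a stand-in for the blocking request and asyncio.to_thread() (available from Python 3.9) as one possible way to move the call off the event loop. The names blocking_download, blocking_version, threaded_version and timed are made up for illustration.

```python
import asyncio
import time


def blocking_download(seconds):
    # Stand-in for requests.get(): blocks the calling thread.
    time.sleep(seconds)
    return seconds


async def blocking_version():
    # Calls the blocking function directly inside the coroutine,
    # so the event loop cannot switch to another task meanwhile.
    return blocking_download(0.2)


async def threaded_version():
    # Hands the blocking call to a worker thread (Python 3.9+),
    # which leaves the event loop free to run other tasks.
    return await asyncio.to_thread(blocking_download, 0.2)


async def timed(factory):
    # Start two tasks and measure how long both take to finish.
    start = time.perf_counter()
    tasks = [asyncio.create_task(factory()) for _ in range(2)]
    for task in tasks:
        await task
    return time.perf_counter() - start


sequential_time = asyncio.run(timed(blocking_version))
concurrent_time = asyncio.run(timed(threaded_version))
print(f"blocking inside coroutines: {sequential_time:.2f}s")
print(f"offloaded with to_thread:   {concurrent_time:.2f}s")
```

With the blocking version the two tasks run one after the other, so the total is roughly the sum of both waits; with the threaded version the waits overlap. An asynchronous HTTP client would be another option, but that goes beyond what the article covers.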
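Before analysing the code above, we first need the concept of an awaitable object.
Awaitable objects
An object that can be used in an await expression is an awaitable. There are three main kinds of awaitable: coroutines, Tasks, and Futures.
With coroutines in Python, be careful to distinguish a coroutine function from a coroutine object: the latter is what the former returns when it is called.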
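As a small illustration of the three kinds of awaitable, the sketch below awaits a coroutine object, a Task, and a low-level Future in turn. The function compute and the variable names are illustrative only.

```python
import asyncio


async def compute():
    return 1


async def main():
    loop = asyncio.get_running_loop()

    # 1. A coroutine object, awaited directly.
    from_coroutine = await compute()

    # 2. A Task wrapping a coroutine; it is scheduled as soon as it is created.
    task = asyncio.create_task(compute())
    from_task = await task

    # 3. A low-level Future whose result is set by some other piece of code,
    #    here a callback scheduled on the event loop.
    future = loop.create_future()
    loop.call_soon(future.set_result, 1)
    from_future = await future

    return from_coroutine, from_task, from_future


results = asyncio.run(main())
print(results)
```

In application code, Futures are rarely created by hand; they mostly appear as return values of library functions.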
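Creating tasks
asyncio.create_task(coro, *, name=None) wraps a coroutine in a Task object and schedules it for execution. The first argument is a coroutine object and the second is the task name. The function was added in Python 3.7 (the name parameter followed in Python 3.8); on earlier versions, use asyncio.ensure_future() instead.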
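A short sketch of create_task() with the optional name argument follows; it assumes Python 3.8 or later for name and Task.get_name(). The coroutine fetch and the task name are hypothetical.

```python
import asyncio


async def fetch(label):
    # Placeholder for real work such as a download.
    await asyncio.sleep(0.01)
    return label


async def main():
    # The task is scheduled as soon as it is created.
    task = asyncio.create_task(fetch("page-1"), name="download-page-1")
    name = task.get_name()
    # Awaiting the task only waits for its result.
    value = await task
    return name, value


task_name, task_value = asyncio.run(main())
print(task_name, task_value)
```

Naming tasks is optional, but it makes log output easier to read when many downloads run at once.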
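Running tasks concurrently
The function signature is:
asyncio.gather(*aws, return_exceptions=False) -> awaitable
(Older documentation also lists a loop parameter; it was removed in Python 3.10.)
gather() runs the awaitables in the aws sequence concurrently. If any awaitable in aws is a coroutine, it is automatically scheduled as a Task.
The return_exceptions parameter works as follows:
return_exceptions is False (the default): the first exception raised is immediately propagated to the task that awaits gather(); the other awaitables in aws are not cancelled and keep running.
return_exceptions is True: exceptions are treated the same as successful results and are aggregated in the result list.
If gather() itself is cancelled, all submitted awaitables that have not yet completed are cancelled as well.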
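The behaviour of return_exceptions=True can be sketched as follows. The coroutines ok and fail are made up for the example, and asyncio.sleep() stands in for real I/O.

```python
import asyncio


async def ok(value):
    await asyncio.sleep(0.01)
    return value


async def fail():
    await asyncio.sleep(0.01)
    raise ValueError("download failed")


async def main():
    # Results come back in the same order as the awaitables were passed in.
    return await asyncio.gather(ok(1), fail(), ok(3), return_exceptions=True)


results = asyncio.run(main())
for item in results:
    print(repr(item))
```

With return_exceptions left at False, the same call would raise ValueError at the await instead of returning a list.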
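Simple waiting
The function signature is:
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) -> coroutine
(The loop parameter shown in older documentation was removed in Python 3.10.)
wait() runs the awaitables given in aws concurrently and suspends the calling coroutine (it does not block the thread) until the condition specified by return_when is met.
Passing coroutine objects to wait() directly has been deprecated since Python 3.8 and is forbidden from Python 3.11, so wrap coroutines with asyncio.create_task() before passing them in.
The function returns two sets of Tasks/Futures, usually written as (done, pending).
return_when specifies when the function should return. It must be one of the following constants:
FIRST_COMPLETED: the function returns when any awaitable finishes or is cancelled.
FIRST_EXCEPTION: the function returns when any awaitable finishes by raising an exception; if none raises, it is equivalent to ALL_COMPLETED.
ALL_COMPLETED: the function returns when all awaitables have finished or been cancelled.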
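The (done, pending) return value and the FIRST_COMPLETED condition can be sketched like this. The coroutine work and its delays are arbitrary values chosen for the example.

```python
import asyncio


async def work(delay, label):
    await asyncio.sleep(delay)
    return label


async def main():
    fast = asyncio.create_task(work(0.05, "fast"))
    slow = asyncio.create_task(work(0.5, "slow"))

    # Return as soon as the first task has finished.
    done, pending = await asyncio.wait(
        {fast, slow}, return_when=asyncio.FIRST_COMPLETED)

    finished = [task.result() for task in done]
    # wait() leaves unfinished tasks running; cancel them explicitly if unwanted.
    for task in pending:
        task.cancel()
    return finished, len(pending)


finished, pending_count = asyncio.run(main())
print(finished, pending_count)
```

Because wait() returns sets, the order of tasks in done is not defined; use gather() when results must keep their input order.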
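A function similar to wait() is wait_for(), whose signature is:
asyncio.wait_for(aw, timeout) -> coroutine
(The loop parameter shown in older documentation was removed in Python 3.10.)
It waits for the awaitable aw to complete and times out after timeout seconds.
A coroutine can be passed to this function. If a timeout occurs, the task is cancelled and asyncio.TimeoutError is raised.
The difference between wait() and wait_for() is that wait() does not cancel the awaitables when a timeout occurs.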
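The timeout behaviour of wait_for() can be sketched as follows. The coroutine slow and the timeout value are illustrative.

```python
import asyncio


async def slow():
    await asyncio.sleep(1)
    return "finished"


async def main():
    task = asyncio.create_task(slow())
    try:
        await asyncio.wait_for(task, timeout=0.05)
    except asyncio.TimeoutError:
        # By the time the exception is raised, wait_for() has cancelled the task.
        return "timed out", task.cancelled()
    return "finished", task.cancelled()


outcome = asyncio.run(main())
print(outcome)
```

For a crawler, wrapping each download in wait_for() is one way to keep a single stalled request from holding up the whole batch.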
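Binding callback functions
Asynchronous I/O works by suspending the program at the point of an I/O operation and resuming it once the I/O has finished.
Crawler code often depends on the value an I/O operation returns, and this is where callbacks come in.
Getting the result in a synchronous style
Assign the result of the await expression to a variable to obtain the return value directly.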
import asyncio
import time
import requests

async def requests_get(url):
    headers = {
        "Referer": "http://banan.huiben.61read.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36"
    }
    try:
        res = requests.get(url, headers=headers)
        return res
    except Exception as e:
        print(e)
        return None

async def get_video(url):
    res = await requests_get(url)
    if res is not None:
        # The ./mp4 directory must already exist
        with open(f'./mp4/{time.time()}.mp4', "wb") as f:
            f.write(res.content)
        return (url, "success")
    else:
        return None

async def main():
    start_time = time.perf_counter()
    # http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4
    # http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4
    task1 = asyncio.create_task(
        get_video("http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4"))
    task2 = asyncio.create_task(
        get_video("http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4"))
    # Synchronous style: the value of each await is the task's return value
    ret1 = await task1
    ret2 = await task2
    print(ret1, ret2)
    print("Run time:", time.perf_counter() - start_time)

if __name__ == '__main__':
    asyncio.run(main())
Using asyncio's callback support
The method used here is add_done_callback(), which registers a callback that runs when the Task object completes. Its counterpart, remove_done_callback(), removes a callback.
import asyncio
import time
import requests

async def requests_get(url):
    headers = {
        "Referer": "http://banan.huiben.61read.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36"
    }
    try:
        res = requests.get(url, headers=headers)
        return res
    except Exception as e:
        print(e)
        return None

async def get_video(url):
    res = await requests_get(url)
    if res is not None:
        # The ./mp4 directory must already exist
        with open(f'./mp4/{time.time()}.mp4', "wb") as f:
            f.write(res.content)
        return (url, "success")
    else:
        return None

async def main():
    start_time = time.perf_counter()
    # http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4
    # http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4
    task1 = asyncio.create_task(
        get_video("http://static.61read.com/flipbooks/huiben/huazhuangwuhui/web/1.mp4"))
    task1.add_done_callback(callback)
    task2 = asyncio.create_task(
        get_video("http://static.61read.com/flipbooks/huiben/jingubanghexiaofengche/web/1.mp4"))
    task2.add_done_callback(callback)
    # Wait for both tasks; each callback runs once its task has completed
    await task1
    await task2
    print("Run time:", time.perf_counter() - start_time)

def callback(future):
    # The finished Task is passed in as the only argument
    print('Callback, the result is:', future.result())

if __name__ == '__main__':
    asyncio.run(main())
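This lesson's crawler example
Because this lesson's crawler involves many MP4 videos, the complete code is available for download on codechina. The main approach is outlined below.
Step 1: get the addresses of all list pages
All of the data sits on a single page, so it is easy to obtain: just parse that page.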

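Step 2: get the video download addresses
While looking for the video addresses, it turned out that the thumbnail address and the video player address follow a pattern:
# Thumbnail address
http://static.61read.com/flipbooks/huiben/chudiandetouyuzei/cover.jpg
# Video address
http://static.61read.com/flipbooks/huiben/chudiandetouyuzei/web/1.mp4
That is, remove cover.jpg and put web/1.mp4 in its place. This greatly reduces the number of page levels that have to be crawled to reach a video.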
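The substitution rule can be written as a small helper. This is a sketch only: the function name cover_to_video is hypothetical, and the cover.png variant is included because the download code in this article also handles it.

```python
def cover_to_video(cover_url):
    """Derive the MP4 address from a thumbnail address."""
    for cover_name in ("cover.jpg", "cover.png"):
        if cover_url.endswith(cover_name):
            # Drop the thumbnail file name and append the video path.
            return cover_url[: -len(cover_name)] + "web/1.mp4"
    # Unknown pattern: signal that no video address could be derived.
    return None


video_url = cover_to_video(
    "http://static.61read.com/flipbooks/huiben/chudiandetouyuzei/cover.jpg")
print(video_url)
```

Returning None for an unexpected address makes it easy to skip entries that do not follow the pattern instead of requesting a wrong URL.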

Step 3: write the code to download the videos
import asyncio
import time
import requests
from bs4 import BeautifulSoup
import lxml  # not used directly; BeautifulSoup needs it for the "lxml" parser

BASE_URL = "http://banan.huiben.61read.com"

async def requests_get(url):
    headers = {
        "Referer": "http://banan.huiben.61read.com/",
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/94.0.4606.54 Safari/537.36"
    }
    try:
        res = requests.get(url, headers=headers)
        return res
    except Exception as e:
        print(e)
        return None

async def get_video(name, url):
    res = await requests_get(url)
    if res is not None:
        # The ./mp4 directory must already exist
        with open(f'./mp4/{name}.mp4', "wb") as f:
            f.write(res.content)
        return (name, url, "success")
    else:
        return None

async def get_list_url():
    """Get the list-page addresses."""
    res = await requests_get("http://banan.huiben.61read.com/")
    if res is None:
        return []
    soup = BeautifulSoup(res.text, "lxml")
    all_a = []
    # attrs must be a dict that maps the attribute name to its value
    for ul in soup.find_all(attrs={'class': 'inline'}):
        all_a.extend(BASE_URL + _['href'] for _ in ul.find_all('a'))
    return all_a

async def get_mp4_url(url):
    """Get the MP4 addresses."""
    res = await requests_get(url)
    if res is None:
        return []
    soup = BeautifulSoup(res.text, "lxml")
    mp4s = []
    for div_tag in soup.find_all(attrs={'class': 'item_list'}):
        # Get the thumbnail address
        src = div_tag.a.img['src']
        # Turn the thumbnail address into the MP4 video address
        src = src.replace('cover.jpg', 'web/1.mp4').replace('cover.png', 'web/1.mp4')
        name = div_tag.div.a.text.strip()
        mp4s.append((src, name))
    return mp4s

async def main():
    # Task that fetches the list-page addresses
    task_list_url = asyncio.create_task(get_list_url())
    all_a = await task_list_url
    if not all_a:
        print("No list pages found")
        return
    # Build the task list
    tasks = [asyncio.ensure_future(get_mp4_url(url)) for url in all_a]
    # Add callbacks
    # ret = map(lambda x: x.add_done_callback(callback), tasks)
    # Run asynchronously
    dones, pendings = await asyncio.wait(tasks)
    all_mp4 = []
    for task in dones:
        all_mp4.extend(task.result())
    # All MP4 addresses have been collected
    total = len(all_mp4)
    print("Found", total, "videos in total")
    print("_" * 100)
    print("Preparing to download videos")
    # Download 10 at a time
    total_page = total // 10 if total % 10 == 0 else total // 10 + 1
    # print(total_page)
    for page in range(0, total_page):
        print("Downloading page {} of videos".format(page + 1))
        start_page = page * 10
        end_page = (page + 1) * 10
        print("Addresses to download")
        print(all_mp4[start_page:end_page])
        mp4_download_tasks = [asyncio.ensure_future(get_video(name, url)) for url, name in all_mp4[start_page:end_page]]
        mp4_dones, mp4_pendings = await asyncio.wait(mp4_download_tasks)
        for task in mp4_dones:
            print(task.result())

if __name__ == '__main__':
    asyncio.run(main())
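The page arithmetic in main() (the start and end index of each group of 10) can also be expressed as a small chunking helper. This is an optional sketch rather than part of the original code; the name chunks and the sample data are hypothetical.

```python
def chunks(items, size):
    """Yield consecutive slices of at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Sample data standing in for the (url, name) pairs collected by the crawler.
sample = [(f"url-{i}", f"name-{i}") for i in range(25)]
batch_sizes = [len(batch) for batch in chunks(sample, 10)]
print(batch_sizes)
```

Looping with `for batch in chunks(all_mp4, 10)` would then replace the manual computation of the page count and slice bounds.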

Please credit the source when reposting. Link to this article: https://www.uj5u.com/qita/325507.html
