鋼鐵知識庫,一個學習python爬蟲、資料分析的知識庫,人生苦短,快用python,
之前我們使用requests庫爬取某個站點的時候,每發出一個請求,程式必須等待網站回傳回應才能接著運行,而在整個爬蟲程序中,整個爬蟲程式是一直在等待的,實際上沒有做任何事情,
像這種占用磁盤/記憶體IO、網路IO的任務,大部分時間是CPU在等待的操作,就叫IO密集型任務,對于這種情況有沒有優化方案呢,當然有,那就是使用aiohttp庫實作異步爬蟲,
aiohttp是什么
我們在使用requests請求時,只能等一個請求先出去再回來,才會發送下一個請求,明顯效率不高阿,這時候如果換成異步請求的方式,就不會有這個等待,一個請求發出去,不管這個請求什么時間回應,程式通過await掛起協程物件后直接進行下一個請求,
解決方法就是通過 aiohttp + asyncio,什么是aiohttp?一個基于 asyncio 的異步 HTTP 網路模塊,可用于實作異步爬蟲,速度明顯快于 requests 的同步爬蟲,
requests和aiohttp區別
區別就是一個同步一個是異步,話不多說直接上代碼看效果,
安裝aiohttp
pip install aiohttp
- requests同步示例:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# author: 鋼鐵知識庫
import time
import requests
# 同步請求
def main():
start = time.time()
for i in range(5):
res = requests.get('http://httpbin.org/delay/2')
print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status_code}')
print(f'requests同步耗時:{time.time() - start}')
if __name__ == '__main__':
main()
'''
當前時間:2022-09-05 15:44:51.991685, status_code = 200
當前時間:2022-09-05 15:44:54.528918, status_code = 200
當前時間:2022-09-05 15:44:57.057373, status_code = 200
當前時間:2022-09-05 15:44:59.643119, status_code = 200
當前時間:2022-09-05 15:45:02.167362, status_code = 200
requests同步耗時:12.785893440246582
'''
可以看到5次請求總共用12.7秒,再來看同樣的請求異步多少時間,
- aiohttp異步示例:
#!/usr/bin/env python
# file: day6-9同步和異步.py
# author: 鋼鐵知識庫
import asyncio
import time
import aiohttp
async def async_http():
# 宣告一個支持異步的背景關系管理器
async with aiohttp.ClientSession() as session:
res = await session.get('http://httpbin.org/delay/2')
print(f'當前時間:{datetime.datetime.now()}, status_code = {res.status}')
tasks = [async_http() for _ in range(5)]
start = time.time()
# Python 3.7 及以后,不需要顯式宣告事件回圈,可以使用 asyncio.run()來代替最后的啟動操作
asyncio.run(asyncio.wait(tasks))
print(f'aiohttp異步耗時:{time.time() - start}')
'''
當前時間:2022-09-05 15:42:32.363966, status_code = 200
當前時間:2022-09-05 15:42:32.366957, status_code = 200
當前時間:2022-09-05 15:42:32.374973, status_code = 200
當前時間:2022-09-05 15:42:32.384909, status_code = 200
當前時間:2022-09-05 15:42:32.390318, status_code = 200
aiohttp異步耗時:2.5826876163482666
'''
兩次對比可以看到執行程序,時間一個是順序執行,一個是同時執行,這就是同步和異步的區別,
aiohttp使用介紹
接下來我們會詳細介紹aiohttp庫的用法和爬取實戰,aiohttp 是一個支持異步請求的庫,它和 asyncio 配合使用,可以使我們非常方便地實作異步請求操作,asyncio模塊,其內部實作了對TCP、UDP、SSL協議的異步操作,但是對于HTTP請求,就需要aiohttp實作了,
aiohttp分為兩部分,一部分是Client,一部分是Server,下面來說說aiohttp客戶端部分的用法,
基本實體
先寫一個簡單的案例
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 鋼鐵知識庫
import asyncio
import aiohttp
async def get_api(session, url):
# 宣告一個支持異步的背景關系管理器
async with session.get(url) as response:
return await response.text(), response.status
async def main():
async with aiohttp.ClientSession() as session:
html, status = await get_api(session, 'http://httpbin.org/delay/2')
print(f'html: {html[:50]}')
print(f'status : {status}')
if __name__ == '__main__':
# Python 3.7 及以后,不需要顯式宣告事件回圈,可以使用 asyncio.run(main())來代替最后的啟動操作
asyncio.get_event_loop().run_until_complete(main())
'''
html: {
"args": {},
"data": "",
"files": {},
status : 200
Process finished with exit code 0
'''
aiohttp請求的方法和之前有明顯區別,主要包括如下幾點:
- 除了匯入aiohttp庫,還必須引入asyncio庫,因為要實作異步,需要啟動協程,
- 異步的方法定義不同,前面都要統一加async來修飾,
- with as用于宣告背景關系管理器,幫我們自動分配和釋放資源,加上async代碼支持異步,
- 對于回傳協程物件的操作,前面需要加await來修飾,response.text()回傳的是協程物件,
- 最后運行啟用回圈事件
注意:Python3.7及以后的版本中,可以使用asyncio.run(main())代替最后的啟動操作,
URL引數設定
對于URL引數的設定,我們可以借助params設定,傳入一個字典即可,實體如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 鋼鐵知識庫
import aiohttp
import asyncio
async def main():
params = {'name': '鋼鐵知識庫', 'age': 23}
async with aiohttp.ClientSession() as session:
async with session.get('https://www.httpbin.org/get', params=params) as res:
print(await res.json())
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
'''
{'args': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-63162e34-1acf7bde7a6d801368494c72'}, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/get?name=鋼鐵知識庫&age=23'}
'''
可以看到實際請求的URL后面帶了后綴,這就是params的內容,
請求型別
除了get請求,aiohttp還支持其它請求型別,如POST、PUT、DELETE等,和requests使用方式類似,
session.post('http://httpbin.org/post', data=https://www.cnblogs.com/jiba/archive/2022/09/09/b'data')
session.put('http://httpbin.org/put', data=https://www.cnblogs.com/jiba/archive/2022/09/09/b'data')
session.delete('http://httpbin.org/delete')
session.head('http://httpbin.org/get')
session.options('http://httpbin.org/get')
session.patch('http://httpbin.org/patch', data=https://www.cnblogs.com/jiba/archive/2022/09/09/b'data')
要使用這些方法,只需要把對應的方法和引數替換一下,用法和get類似就不再舉例,
回應的幾個方法
對于回應來說,我們可以用如下方法分別獲取其中的回應情況,狀態碼、回應頭、回應體、回應體二進制內容、回應體JSON結果,實體如下:
#!/usr/bin/env python
# @Author : 鋼鐵知識庫
import aiohttp
import asyncio
async def main():
data = https://www.cnblogs.com/jiba/archive/2022/09/09/{'name': '鋼鐵知識庫', 'age': 23}
async with aiohttp.ClientSession() as session:
async with session.post('https://www.httpbin.org/post', data=https://www.cnblogs.com/jiba/archive/2022/09/09/data) as response:
print('status:', response.status) # 狀態碼
print('headers:', response.headers) # 回應頭
print('body:', await response.text()) # 回應體
print('bytes:', await response.read()) # 回應體二進制內容
print('json:', await response.json()) # 回應體json資料
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
'''
status: 200
headers: <CIMultiDictProxy('Date': 'Tue, 06 Sep 2022 00:18:36 GMT', 'Content-Type': 'application/json', 'Content-Length': '534', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
body: {
"args": {},
"data": "",
"files": {},
"form": {
"age": "23",
"name": "\u94a2\u94c1\u77e5\u8bc6\u5e93"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Content-Length": "57",
"Content-Type": "application/x-www-form-urlencoded",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.8 aiohttp/3.8.1",
"X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"
},
"json": null,
"origin": "122.55.11.188",
"url": "https://www.httpbin.org/post"
}
bytes: b'{\n "args": {}, \n "data": "", \n "files": {}, \n "form": {\n "age": "23", \n "name": "\\u94a2\\u94c1\\u77e5\\u8bc6\\u5e93"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Content-Length": "57", \n "Content-Type": "application/x-www-form-urlencoded", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.8 aiohttp/3.8.1", \n "X-Amzn-Trace-Id": "Root=1-631691dc-6aa1b2b85045a1a0481d06e1"\n }, \n "json": null, \n "origin": "122.5.132.196", \n "url": "https://www.httpbin.org/post"\n}\n'
json: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '23', 'name': '鋼鐵知識庫'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Content-Length': '57', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.8 aiohttp/3.8.1', 'X-Amzn-Trace-Id': 'Root=1-631691dc-6aa1b2b85045a1a0481d06e1'}, 'json': None, 'origin': '122.55.11.188', 'url': 'https://www.httpbin.org/post'}
'''
可以看到有些欄位前面需要加await,因為其回傳的是一個協程物件(如async修飾的方法),那么前面就要加await,
超時設定
我們可以借助ClientTimeout物件設定超時,例如要設定1秒的超時時間,可以這么實作:
#!/usr/bin/env python
# @Author : 鋼鐵知識庫
import aiohttp
import asyncio
async def main():
# 設定 1 秒的超時
timeout = aiohttp.ClientTimeout(total=1)
data = https://www.cnblogs.com/jiba/archive/2022/09/09/{'name': '鋼鐵知識庫', 'age': 23}
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get('https://www.httpbin.org/delay/2', data=https://www.cnblogs.com/jiba/archive/2022/09/09/data) as response:
print('status:', response.status) # 狀態碼
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
'''
Traceback (most recent call last):
####中間省略####
raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
'''
這里設定了超時1秒請求延時2秒,發現拋出例外asyncio.TimeoutError,如果正常則回應200,
并發限制
aiohttp可以支持非常高的并發量,但面對高并發網站可能會承受不住,隨時有掛掉的危險,這時需要對并發進行一些控制,現在我們借助asyncio 的Semaphore來控制并發量,實體如下:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 鋼鐵知識庫
import asyncio
from datetime import datetime
import aiohttp
# 宣告最大并發量
semaphore = asyncio.Semaphore(2)
async def get_api():
async with semaphore:
print(f'scrapting...{datetime.now()}')
async with session.get('https://www.baidu.com') as response:
await asyncio.sleep(2)
# print(f'當前時間:{datetime.now()}, {response.status}')
async def main():
global session
session = aiohttp.ClientSession()
tasks = [asyncio.ensure_future(get_api()) for _ in range(1000)]
await asyncio.gather(*tasks)
await session.close()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
'''
scrapting...2022-09-07 08:11:14.190000
scrapting...2022-09-07 08:11:14.292000
scrapting...2022-09-07 08:11:16.482000
scrapting...2022-09-07 08:11:16.504000
scrapting...2022-09-07 08:11:18.520000
scrapting...2022-09-07 08:11:18.521000
'''
在main方法里,我們宣告了1000個task,如果沒有通過Semaphore進行并發限制,那這1000放到gather方法后會被同時執行,并發量相當大,有了信號量的控制之后,同時運行的task數量就會被控制,這樣就能給aiohttp限制速度了,
aiohttp異步爬取實戰
接下來我們通過異步方式練手一個小說爬蟲,需求如下:
需求頁面:https://dushu.baidu.com/pc/detail?gid=4308080950
目錄介面:https://dushu.baidu.com/api/pc/getCatalog?data=https://www.cnblogs.com/jiba/archive/2022/09/09/{"book_id":"4308080950"}
詳情介面:https://dushu.baidu.com/api/pc/getChapterContent?data=https://www.cnblogs.com/jiba/archive/2022/09/09/{"book_id":"4295122774","cid":"4295122774|116332"}
關鍵引數:book_id:小說ID、cid:章節id
采集要求:使用協程方式寫入,資料存放進mongo
需求分析:點開需求頁面,通過F12抓包可以發現兩個介面,一個目錄介面,一個詳情介面,
首先第一步先請求目錄介面拿到cid章節id,然后將cid傳遞給詳情介面拿到小說資料,最后存入mongo即可,
話不多說,直接上代碼:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author : 鋼鐵知識庫
# 不合適就是不合適,真正合適的,你不會有半點猶豫,
import asyncio
import json,re
import logging
import aiohttp
import requests
from utils.conn_db import ConnDb
# 日志格式
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
# 章節目錄api
b_id = '4308080950'
url = 'https://dushu.baidu.com/api/pc/getCatalog?data=https://www.cnblogs.com/jiba/archive/2022/09/09/{"book_id":"'+b_id+'"}'
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/104.0.0.0 Safari/537.36"
}
# 并發宣告
semaphore = asyncio.Semaphore(5)
async def download(title,b_id, cid):
data = https://www.cnblogs.com/jiba/archive/2022/09/09/{"book_id": b_id,
"cid": f'{b_id}|{cid}',
}
data = https://www.cnblogs.com/jiba/archive/2022/09/09/json.dumps(data)
detail_url ='https://dushu.baidu.com/api/pc/getChapterContent?data=https://www.cnblogs.com/jiba/archive/2022/09/09/{}'.format(data)
async with semaphore:
async with aiohttp.ClientSession(headers=headers) as session:
async with session.get(detail_url) as response:
res = await response.json()
content = {
'title': title,
'content': res['data']['novel']['content']
}
# print(title)
await save_data(content)
async def save_data(data):
if data:
client = ConnDb().conn_motor_mongo()
db = client.baidu_novel
collection = db.novel
logging.info('saving data %s', data)
await collection.update_one(
{'title': data.get('title')},
{'$set': data},
upsert=True
)
async def main():
res = requests.get(url, headers=headers)
tasks = []
for re in res.json()['data']['novel']['items']: # 拿到某小說目錄cid
title = re['title']
cid = re['cid']
tasks.append(download(title, b_id, cid)) # 將請求放到串列里,再通過gather執行并發
await asyncio.gather(*tasks)
if __name__ == '__main__':
asyncio.run(main())
至此,我們就使用aiohttp完成了對小說章節的爬取,
要實作異步處理,得先要有掛起操作,當一個任務需要等待 IO 結果的時候,可以掛起當前任務,轉而去執行其他任務,這樣才能充分利用好資源,要實作異步,需要了解 await 的用法,使用 await 可以將耗時等待的操作掛起,讓出控制權,當協程執行的時候遇到 await,時間回圈就會將本協程掛起,轉而去執行別的協程,直到其他的協程掛起或執行完畢,
await 后面的物件必須是如下格式之一:
- A native coroutine object returned from a native coroutine function,一個原生 coroutine 物件,
- A generator-based coroutine object returned from a function decorated with types.coroutine,一個由 types.coroutine 修飾的生成器,這個生成器可以回傳 coroutine 物件,
- An object with an await method returning an iterator,一個包含 await 方法的物件回傳的一個迭代器,
---- 20220909 鋼鐵知識庫
總結
以上就是借助協程async和異步aiohttp兩個主要模塊完成異步爬蟲的內容,
aiohttp 以異步方式爬取網站的耗時遠小于 requests 同步方式,以上列舉的例子希望對你有幫助,
注意,執行緒和協程是兩個概念,后面找機會我們再聊聊行程和執行緒、執行緒和協程的關系,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/506008.html
標籤:其他
上一篇:我的設計模式之旅 ② 單例模式
