一、背景知識
爬蟲的本質就是一個socket客戶端與服務端的通信程序,如果我們有多個url待爬取,只用一個執行緒且采用串行的方式執行,那只能等待爬取一個結束后才能繼續下一個,效率會非常低,
需要強調的是:對于單執行緒下串行N個任務,并不完全等同于低效,如果這N個任務都是純計算的任務,那么該執行緒對cpu的利用率仍然會很高,之所以單執行緒下串行多個爬蟲任務低效,是因為爬蟲任務是明顯的IO密集型程式,
二、同步、異步、回呼機制
1 同步呼叫:即提交一個任務后就在原地等待任務結束,等到拿到任務的結果后再繼續下一行代碼,效率低下
同步呼叫
import requests
def parse_page(res):
print('決議 %s' %(len(res)))
def get_page(url):
print('下載 %s' %url)
response=requests.get(url)
if response.status_code == 200:
return response.text
urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
for url in urls:
res=get_page(url) #呼叫一個任務,就在原地等待任務結束拿到結果后才繼續往后執行
parse_page(res)
2 一個簡單的解決方案:多執行緒或多行程
多執行緒或多行程
# 在服務器端使用多執行緒(多行程),多執行緒(多行程)目的是讓每個連接都擁有獨立的執行緒(或行程),這樣任何一個連接的阻塞都不會影響其他的連接
# 開啟多行程或都執行緒的方式,我們是無法無限制地開啟多行程或多執行緒的:在遇到要同時回應成百上千路的連接請求,則無論多執行緒還是多行程都會嚴重占據系統資源,降低系統對外界回應效率,而且執行緒與行程本身也更容易進入假死狀態,
# IO密集型程式應該用多執行緒
import requests
from threading import Thread,current_thread
def parse_page(res):
print('%s 決議 %s' %(current_thread().getName(),len(res)))
def get_page(url,callback=parse_page):
print('%s 下載 %s' %(current_thread().getName(),url))
response=requests.get(url)
if response.status_code == 200:
callback(response.text)
if __name__ == '__main__':
urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
for url in urls:
t=Thread(target=get_page,args=(url,))
t.start()
3 改進方案: 執行緒池或行程池+異步呼叫:提交一個任務后并不會等待任務結束,而是繼續下一行代碼
行程池或執行緒池:異步呼叫+回呼機制
# 很多程式員可能會考慮使用“執行緒池”或“連接池”,“執行緒池”旨在減少創建和銷毀執行緒的頻率,其維持一定合理數量的執行緒,并讓空閑的執行緒重新承擔新的執行任務,“連接池”維持連接的快取池,盡量重用已有的連接、減少創建和關閉連接的頻率,這兩種技術都可以很好的降低系統開銷,都被廣泛應用很多大型系統,如websphere、tomcat和各種資料庫等
#IO密集型程式應該用多執行緒,所以此時我們使用執行緒池
import requests
from threading import current_thread
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
def parse_page(res):
res=res.result()
print('%s 決議 %s' %(current_thread().getName(),len(res)))
def get_page(url):
print('%s 下載 %s' %(current_thread().getName(),url))
response=requests.get(url)
if response.status_code == 200:
return response.text
if __name__ == '__main__':
urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
pool=ThreadPoolExecutor(50)
# pool=ProcessPoolExecutor(50)
for url in urls:
pool.submit(get_page,url).add_done_callback(parse_page)
pool.shutdown(wait=True)
改進后方案其實也存在著問題:
“執行緒池”和“連接池”技術也只是在一定程度上緩解了頻繁呼叫IO介面帶來的資源占用,而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的回應并不比沒有池的時候效果好多少,所以使用“池”必須考慮其面臨的回應規模,并根據回應規模調整“池”的大小
三、高性能
對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“執行緒池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題,總之,多執行緒模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多執行緒模型也會遇到瓶頸,可以用非阻塞介面來嘗試解決這個問題,
上述無論哪種解決方案其實沒有解決一個性能相關的問題:IO阻塞,無論是多行程還是多執行緒,在遇到IO阻塞時都會被作業系統強行剝奪走CPU的執行權限,程式的執行效率因此就降低了下來,
解決這一問題的關鍵在于,我們自己從應用程式級別檢測IO阻塞然后切換到我們自己程式的其他任務執行,這樣把我們程式的IO降到最低,我們的程式處于就緒態就會增多,以此來迷惑作業系統,作業系統便以為我們的程式是IO比較少的程式,從而會盡可能多的分配CPU給我們,這樣也就達到了提升程式執行效率的目的
1 在python3.3之后新增了asyncio模塊,可以幫我們檢測IO(只能是網路IO),實作應用程式級別的切換
基本使用
import asyncio
@asyncio.coroutine
def task(task_id,senconds):
print('%s is start' %task_id)
yield from asyncio.sleep(senconds) #只能檢測網路IO,檢測到IO后切換到其他任務執行
print('%s is end' %task_id)
tasks=[task(task_id="任務1",senconds=3),task("任務2",2),task(task_id="任務3",senconds=1)]
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
2 但asyncio模塊只能發tcp級別的請求,不能發http協議,因此,在我們需要發送http請求的時候,需要我們自定義http報頭
asyncio+自定義http協議報頭
import asyncio
import requests
import uuid
user_agent='Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.221 Safari/537.36 SE 2.X MetaSr 1.0'
def parse_page(host,res):
print('%s 決議結果 %s' %(host,len(res)))
with open('%s.html' %(uuid.uuid1()),'wb') as f:
f.write(res)
@asyncio.coroutine
def get_page(host,port=80,url='/',callback=parse_page,ssl=False):
print('下載 http://%s:%s%s' %(host,port,url))
#步驟一(IO阻塞):發起tcp鏈接,是阻塞操作,因此需要yield from
if ssl:
port=443
recv,send=yield from asyncio.open_connection(host=host,port=443,ssl=ssl)
# 步驟二:封裝http協議的報頭,因為asyncio模塊只能封裝并發送tcp包,因此這一步需要我們自己封裝http協議的包
request_headers="""GET %s HTTP/1.0\r\nHost: %s\r\nUser-agent: %s\r\n\r\n""" %(url,host,user_agent)
# requset_headers="""POST %s HTTP/1.0\r\nHost: %s\r\n\r\nname=egon&password=123""" % (url, host,)
request_headers=request_headers.encode('utf-8')
# 步驟三(IO阻塞):發送http請求包
send.write(request_headers)
yield from send.drain()
# 步驟四(IO阻塞):接收回應頭
while True:
line=yield from recv.readline()
if line == b'\r\n':
break
print('%s Response headers:%s' %(host,line))
# 步驟五(IO阻塞):接收回應體
text=yield from recv.read()
# 步驟六:執行回呼函式
callback(host,text)
# 步驟七:關閉套接字
send.close() #沒有recv.close()方法,因為是四次揮手斷鏈接,雙向鏈接的兩端,一端發完資料后執行send.close()另外一端就被動地斷開
if __name__ == '__main__':
tasks=[
get_page('www.baidu.com',url='/s?wd=美女',ssl=True),
get_page('www.cnblogs.com',url='/',ssl=True),
]
loop=asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
3 自定義http報頭多少有點麻煩,于是有了aiohttp模塊,專門幫我們封裝http報頭,然后我們還需要用asyncio檢測IO實作切換
asyncio+aiohttp
import aiohttp
import asyncio
@asyncio.coroutine
def get_page(url):
print('GET:%s' %url)
response=yield from aiohttp.request('GET',url)
data=https://www.cnblogs.com/cqzlei/archive/2022/09/25/yield from response.read()
print(url,data)
response.close()
return 1
tasks=[
get_page('https://www.python.org/doc'),
get_page('https://www.cnblogs.com/linhaifeng'),
get_page('https://www.openstack.org')
]
loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
print('=====>',results) #[1, 1, 1]
4 此外,還可以將requests.get函式傳給asyncio,就能夠被檢測了
asyncio+requests模塊
import requests
import asyncio
@asyncio.coroutine
def get_page(func,*args):
print('GET:%s' %args[0])
loog=asyncio.get_event_loop()
furture=loop.run_in_executor(None,func,*args)
response=yield from furture
print(response.url,len(response.text))
return 1
tasks=[
get_page(requests.get,'https://www.python.org/doc'),
get_page(requests.get,'https://www.cnblogs.com/linhaifeng'),
get_page(requests.get,'https://www.openstack.org')
]
loop=asyncio.get_event_loop()
results=loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
print('=====>',results) #[1, 1, 1]
5 還有之前在協程時介紹的gevent模塊
gevent+requests
from gevent import monkey;monkey.patch_all()
import gevent
import requests
def get_page(url):
print('GET:%s' %url)
response=requests.get(url)
print(url,len(response.text))
return 1
# g1=gevent.spawn(get_page,'https://www.python.org/doc')
# g2=gevent.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
# g3=gevent.spawn(get_page,'https://www.openstack.org')
# gevent.joinall([g1,g2,g3,])
# print(g1.value,g2.value,g3.value) #拿到回傳值
#協程池
from gevent.pool import Pool
pool=Pool(2)
g1=pool.spawn(get_page,'https://www.python.org/doc')
g2=pool.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
g3=pool.spawn(get_page,'https://www.openstack.org')
gevent.joinall([g1,g2,g3,])
print(g1.value,g2.value,g3.value) #拿到回傳值
6 封裝了gevent+requests模塊的grequests模塊
grequests
#pip3 install grequests
import grequests
request_list=[
grequests.get('https://wwww.xxxx.org/doc1'),
grequests.get('https://www.cnblogs.com/linhaifeng'),
grequests.get('https://www.openstack.org')
]
##### 執行并獲取回應串列 #####
# response_list = grequests.map(request_list)
# print(response_list)
##### 執行并獲取回應串列(處理例外) #####
def exception_handler(request, exception):
# print(request,exception)
print("%s Request failed" %request.url)
response_list = grequests.map(request_list, exception_handler=exception_handler)
print(response_list)
7 twisted:是一個網路框架,其中一個功能是發送異步請求,檢測IO并自動切換
twisted基本用法
# 問題一:error: Microsoft Visual C++ 14.0 is required. Get it with "Microsoft Visual C++ Build Tools": http://landinghub.visualstudio.com/visual-cpp-build-tools
https://www.lfd.uci.edu/~gohlke/pythonlibs/#twisted
pip3 install C:\Users\Administrator\Downloads\Twisted-17.9.0-cp36-cp36m-win_amd64.whl
pip3 install twisted
# 問題二:ModuleNotFoundError: No module named 'win32api'
https://sourceforge.net/projects/pywin32/files/pywin32/
# 問題三:openssl
pip3 install pyopenssl
# twisted基本用法
from twisted.web.client import getPage,defer
from twisted.internet import reactor
def all_done(arg):
# print(arg)
reactor.stop()
def callback(res):
print(res)
return 1
defer_list=[]
urls=[
'http://www.baidu.com',
'http://www.bing.com',
'https://www.python.org',
]
for url in urls:
obj=getPage(url.encode('utf=-8'),)
obj.addCallback(callback)
defer_list.append(obj)
defer.DeferredList(defer_list).addBoth(all_done)
reactor.run()
# twisted的getPage的詳細用法
from twisted.internet import reactor
from twisted.web.client import getPage
import urllib.parse
def one_done(arg):
print(arg)
reactor.stop()
post_data = https://www.cnblogs.com/cqzlei/archive/2022/09/25/urllib.parse.urlencode({'check_data': 'adf'})
post_data = https://www.cnblogs.com/cqzlei/archive/2022/09/25/bytes(post_data, encoding='utf8')
headers = {b'Content-Type': b'application/x-www-form-urlencoded'}
response = getPage(bytes('http://dig.chouti.com/login', encoding='utf8'),
method=bytes('POST', encoding='utf8'),
postdata=https://www.cnblogs.com/cqzlei/archive/2022/09/25/post_data,
cookies={},
headers=headers)
response.addBoth(one_done)
reactor.run()
8 tornado
tornado用法
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop
def handle_response(response):
"""
處理回傳值內容(需要維護計數器,來停止IO回圈),呼叫 ioloop.IOLoop.current().stop()
:param response:
:return:
"""
if response.error:
print("Error:", response.error)
else:
print(response.body)
def func():
url_list = [
'http://www.baidu.com',
'http://www.bing.com',
]
for url in url_list:
print(url)
http_client = AsyncHTTPClient()
http_client.fetch(HTTPRequest(url), handle_response)
ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
# 發現上例在所有任務都完畢后也不能正常結束,為了解決該問題,讓我們來加上計數器
from tornado.httpclient import AsyncHTTPClient
from tornado.httpclient import HTTPRequest
from tornado import ioloop
count=0
def handle_response(response):
"""
處理回傳值內容(需要維護計數器,來停止IO回圈),呼叫 ioloop.IOLoop.current().stop()
:param response:
:return:
"""
if response.error:
print("Error:", response.error)
else:
print(len(response.body))
global count
count-=1 #完成一次回呼,計數減1
if count == 0:
ioloop.IOLoop.current().stop()
def func():
url_list = [
'http://www.baidu.com',
'http://www.bing.com',
]
global count
for url in url_list:
print(url)
http_client = AsyncHTTPClient()
http_client.fetch(HTTPRequest(url), handle_response)
count+=1 #計數加1
ioloop.IOLoop.current().add_callback(func)
ioloop.IOLoop.current().start()
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/509491.html
標籤:其他
