引言
同步:不同程式單元為了完成某個任務,在執行程序中需靠某種通信方式以協調一致,稱這些程式單元是同步執行的,
例如購物系統中更新商品庫存,需要用“行鎖”作為通信信號,讓不同的更新請求強制排隊順序執行,那更新庫存的操作是同步的,
簡言之,同步意味著有序,
阻塞:程式未得到所需計算資源時被掛起的狀態,
程式在等待某個操作完成期間,自身無法繼續干別的事情,則稱該程式在該操作上是阻塞的,
常見的阻塞形式有:網路I/O阻塞、磁盤I/O阻塞、用戶輸入阻塞等,
阻塞狀態下的性能提升
引入多行程:
在一個程式內,依次執行10次太耗時,那開10個一樣的程式同時執行不就行了,于是我們想到了多行程編程,為什么會先想到多行程呢?發展脈絡如此,在更早的作業系統(Linux 2.4)及其以前,行程是 OS 調度任務的物體,是面向行程設計的OS.
改善效果立竿見影,但仍然有問題,總體耗時并沒有縮減到原來的十分之一,而是九分之一左右,還有一些時間耗到哪里去了?行程切換開銷,
行程切換開銷不止像“CPU的時間觀”所列的“背景關系切換”那么低,CPU從一個行程切換到另一個行程,需要把舊行程運行時的暫存器狀態、記憶體狀態全部保存好,再將另一個行程之前保存的資料恢復,對CPU來講,幾個小時就干等著,當行程數量大于CPU核心數量時,行程切換是必然需要的,
除了切換開銷,多行程還有另外的缺點,一般的服務器在能夠穩定運行的前提下,可以同時處理的行程數在數十個到數百個規模,如果行程數量規模更大,系統運行將不穩定,而且可用記憶體資源往往也會不足,
多行程解決方案在面臨每天需要成百上千萬次下載任務的爬蟲系統,或者需要同時搞定數萬并發的電商系統來說,并不適合,
除了切換開銷大,以及可支持的任務規模小之外,多行程還有其他缺點,如狀態共享等問題,后文會有提及,此處不再細究,
多執行緒(改進(多行程帶來多問題))
由于執行緒的資料結構比行程更輕量級,同一個行程可以容納多個執行緒,從行程到執行緒的優化由此展開,后來的OS也把調度單位由行程轉為執行緒,行程只作為執行緒的容器,用于管理行程所需的資源,而且OS級別的執行緒是可以被分配到不同的CPU核心同時運行的,
結果符合預期,比多行程耗時要少些,從運行時間上看,多執行緒似乎已經解決了切換開銷大的問題,而且可支持的任務數量規模,也變成了數百個到數千個,
但是,多執行緒仍有問題,特別是Python里的多執行緒,首先,Python中的多執行緒因為GIL的存在,它們并不能利用CPU多核優勢,一個Python行程中,只允許有一個執行緒處于運行狀態,那為什么結果還是如預期,耗時縮減到了十分之一?
因為在做阻塞的系統呼叫時,例如sock.connect(),sock.recv()時,當前執行緒會釋放GIL,讓別的執行緒有執行機會,但是單個執行緒內,在阻塞呼叫上還是阻塞的,
小提示:Python中 time.sleep 是阻塞的,都知道使用它要謹慎,但在多執行緒編程中,time.sleep 并不會阻塞其他執行緒,
除了GIL之外,所有的多執行緒還有通病,它們是被OS調度,調度策略是搶占式的,以保證同等優先級的執行緒都有均等的執行機會,那帶來的問題是:并不知道下一時刻是哪個執行緒被運行,也不知道它正要執行的代碼是什么,所以就可能存在競態條件,
例如爬蟲作業執行緒從任務佇列拿待抓取URL的時候,如果多個爬蟲執行緒同時來取,那這個任務到底該給誰?那就需要用到“鎖”或“同步佇列”來保證下載任務不會被重復執行,
而且執行緒支持的多任務規模,在數百到數千的數量規模,在大規模的高頻網路互動系統中,仍然有些吃力,當然,多執行緒最主要的問題還是競態條件,
非阻塞(阻塞狀態下的性能提升)
原非阻方案
def nonblocking_way():
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect(('example.com', 80))
except BlockingIOError:
# 非阻塞連接程序中也會拋出例外
pass
request = 'GET / HTTP/1.0\r\nHost: example.com\r\n\r\n'
data = https://www.cnblogs.com/changting/p/request.encode('ascii')
# 不知道socket何時就緒,所以不斷嘗試發送
while True:
try:
sock.send(data)
# 直到send不拋例外,則發送完成
break
except OSError:
pass
response = b''
while True:
try:
chunk = sock.recv(4096)
while chunk:
response += chunk
chunk = sock.recv(4096)
break
except OSError:
pass
return response
首先注意到兩點,就感覺被騙了,一是耗時與同步阻塞相當,二是代碼更復雜,要非阻塞何用?且慢,
上第9行代碼sock.setblocking(False)告訴OS,讓socket上阻塞呼叫都改為非阻塞的方式,之前我們說到,非阻塞就是在做一件事的時候,不阻礙呼叫它的程式做別的事情,上述代碼在執行完 sock.connect() 和 sock.recv() 后的確不再阻塞,可以繼續往下執行請求準備的代碼或者是執行下一次讀取,
代碼變得更復雜也是上述原因所致,第11行要放在try陳述句內,是因為socket在發送非阻塞連接請求程序中,系統底層也會拋出例外,connect()被呼叫之后,立即可以往下執行第15和16行的代碼,
需要while回圈不斷嘗試 send(),是因為connect()已經非阻塞,在send()之時并不知道 socket 的連接是否就緒,只有不斷嘗試,嘗試成功為止,即發送資料成功了,recv()呼叫也是同理,
雖然 connect() 和 recv() 不再阻塞主程式,空出來的時間段CPU沒有空閑著,但并沒有利用好這空閑去做其他有意義的事情,而是在回圈嘗試讀寫 socket (不停判斷非阻塞呼叫的狀態是否就緒),還得處理來自底層的可忽略的例外,也不能同時處理多個 socket ,
非阻塞改進
- epoll
判斷非阻塞呼叫是否就緒如果 OS 能做,是不是應用程式就可以不用自己去等待和判斷了,就可以利用這個空閑去做其他事情以提高效率,
所以OS將I/O狀態的變化都封裝成了事件,如可讀事件、可寫事件,并且提供了專門的系統模塊讓應用程式可以接收事件通知,這個模塊就是select,讓應用程式可以通過select注冊檔案描述符和回呼函式,當檔案描述符的狀態發生變化時,select 就呼叫事先注冊的回呼函式,
select因其演算法效率比較低,后來改進成了poll,再后來又有進一步改進,BSD內核改進成了kqueue模塊,而Linux內核改進成了epoll模塊,這四個模塊的作用都相同,暴露給程式員使用的API也幾乎一致,區別在于kqueue 和 epoll 在處理大量檔案描述符時效率更高,
鑒于 Linux 服務器的普遍性,以及為了追求更高效率,所以我們常常聽聞被探討的模塊都是 epoll
2 . 回呼((Callback))
把I/O事件的等待和監聽任務交給了 OS,那 OS 在知道I/O狀態發生改變后(例如socket連接已建立成功可發送資料),它又怎么知道接下來該干嘛呢?只能回呼,
需要我們將發送資料與讀取資料封裝成獨立的函式,讓epoll代替應用程式監聽socket狀態時,得告訴epoll:“如果socket狀態變為可以往里寫資料(連接建立成功了),請呼叫HTTP請求發送函式,如果socket 變為可以讀資料了(客戶端已收到回應),請呼叫回應處理函式,”
首先,不斷嘗試send() 和 recv() 的兩個回圈被消滅掉了,
其次,匯入了selectors模塊,并創建了一個DefaultSelector 實體,Python標準庫提供的selectors模塊是對底層select/poll/epoll/kqueue的封裝,DefaultSelector類會根據 OS 環境自動選擇最佳的模塊,那在 Linux 2.5.44 及更新的版本上都是epoll了,
然后,在第25行和第31行分別注冊了socket可寫事件(EVENT_WRITE)和可讀事件(EVENT_READ)發生后應該采取的回呼函式,
雖然代碼結構清晰了,阻塞操作也交給OS去等待和通知了,但是,我們要抓取10個不同頁面,就得創建10個Crawler實體,就有20個事件將要發生,那如何從selector里獲取當前正發生的事件,并且得到對應的回呼函式去執行呢?
- 事件回圈(Event Loop)
為了解決上述問題,那我們只得采用老辦法,寫一個回圈,去訪問selector模塊,等待它告訴我們當前是哪個事件發生了,應該對應哪個回呼,這個等待事件通知的回圈,稱之為事件回圈,
重要的是第49行代碼,selector.select() 是一個阻塞呼叫,因為如果事件不發生,那應用程式就沒事件可處理,所以就干脆阻塞在這里等待事件發生,那可以推斷,如果只下載一篇網頁,一定要connect()之后才能send()繼而recv(),那它的效率和阻塞的方式是一樣的,因為不在connect()/recv()上阻塞,也得在select()上阻塞,
'
python3 asyncio封裝上述非阻塞的改進方法,用時4年打造異步標準庫
協程引入
yield
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[CONSUMER] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
demo決議:
注意到consumer函式是一個generator,把一個consumer傳入produce后:
首先呼叫c.send(None)啟動生成器;
然后,一旦生產了東西,通過c.send(n)切換到consumer執行;
consumer通過yield拿到訊息,處理,又通過yield把結果傳回;
produce拿到consumer處理的結果,繼續生產下一條訊息;
produce決定不生產了,通過c.close()關閉consumer,整個程序結束
在 Python 中呼叫協程物件1的 send() 方法時,第一次呼叫必須使用引數 None, 這使得協程的使用變得十分麻煩
解決此問題:
借助 Python 自身的特性來避免這一問題,比如,創建一個裝飾器
def routine(func):
def start(*args, **kwargs):
cr = func(*args, **kwargs)
cr.send(None)
return cr
return start
@routine
def product():
pass
yield from
yield from 是Python 3.3 新引入的語法(PEP 380),它主要解決的就是在生成器里玩生成器不方便的問題,它有兩大主要功能,
第一個功能是:讓嵌套生成器不必通過回圈迭代yield,而是直接yield from,以下兩種在生成器里玩子生成器的方式是等價的,
def gen_one():
subgen = range(10) yield from subgendef gen_two():
subgen = range(10) for item in subgen: yield item
第二個功能就是在子生成器和原生成器的呼叫者之間打開雙向通道,兩者可以直接通信,
def gen():
yield from subgen()def subgen():
while True:
x = yield
yield x+1def main():
g = gen()
next(g) # 驅動生成器g開始執行到第一個 yield
retval = g.send(1) # 看似向生成器 gen() 發送資料
print(retval) # 回傳2
g.throw(StopIteration) # 看似向gen()拋入例外
用yield from改進基于生成器的協程,代碼抽象程度更高,使業務邏輯相關的代碼更精簡,由于其雙向通道功能可以讓協程之間隨心所欲傳遞資料,使Python異步編程的協程解決方案大大向前邁進了一步,
于是Python語言開發者們充分利用yield from,使 Guido 主導的Python異步編程框架Tulip迅速脫胎換骨,并迫不及待得讓它在 Python 3.4 中換了個名字asyncio以“實習生”角色出現在標準庫中,
asyncio
asyncio是Python 3.4 試驗性引入的異步I/O框架(PEP 3156),提供了基于協程做異步I/O撰寫單執行緒并發代碼的基礎設施,其核心組件有事件回圈(Event Loop)、協程(Coroutine)、任務(Task)、未來物件(Future)以及其他一些擴充和輔助性質的模塊,
在引入asyncio的時候,還提供了一個裝飾器@asyncio.coroutine用于裝飾使用了yield from的函式,以標記其為協程,但并不強制使用這個裝飾器,
- 協程的隱式創建方式
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
@asyncio.coroutine把一個generator標記為coroutine型別,然后,我們就把這個coroutine扔到EventLoop中執行,
hello()會首先列印出Hello world!,然后,yield from語法可以讓我們方便地呼叫另一個generator,由于asyncio.sleep()也是一個coroutine,所以執行緒不會等待asyncio.sleep(),而是直接中斷并執行下一個訊息回圈,當asyncio.sleep()回傳時,執行緒就可以從yield from拿到回傳值(此處是None),然后接著執行下一行陳述句,
把asyncio.sleep(1)看成是一個耗時1秒的IO操作,在此期間,主執行緒并未等待,而是去執行EventLoop中其他可以執行的coroutine了,因此可以實作并發執行,
- 協程顯示的創建方式(便于理解隱式創建)
創建任務task(2種)
import asyncio
import time
now = lambda : time.time()
async def do_some_work(x):
print('Waiting: ', x)
start = now()
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
# task = asyncio.ensure_future(coroutine) # 方式一
task = loop.create_task(coroutine) # 方式二
print(task)
loop.run_until_complete(task)
print(task)
print('TIME: ', now() - start)
創建task后,task在加入事件回圈之前是pending狀態,加入loop后運行中是running狀態,loop呼叫完是Done,運行完是finished狀態,雖說本質上協程函式和task指的東西都一樣,但是task有了協程函式的狀態,
其中loop.run_until_complete()接受一個future引數,futurn具體指代一個協程函式,而task是future的子類,所以我們不宣告一個task直接傳入協程函式也能執行,
- 協程系結系結回呼函式
通過task的task.add_done_callback(callback)方法系結回呼函式,回呼函式接收一個future物件引數如task,在內部通過future.result()獲得協程函式的回傳值,
import asyncio
async def test(x):
return x+3
def callback(y):
print(y.result())
coroutine = test(5)
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
task
<Task pending coro=<test() running at <ipython-input-4-61142fef17d8>:1>>
task.add_done_callback(callback)
loop.run_until_complete(task)
- 協程耗時任務掛起
多任務宣告了協程函式,也同時在loop中注冊了,他的執行也是順序執行的,因為在異步函式中沒有宣告那些操作是耗時操作,所以會順序執行,await的作用就是告訴控制器這個步驟是耗時的,async可以定義協程物件,使用await可以針對耗時的操作進行掛起
import asyncio
import time
async def test(1):
time.sleep(1)
print(time.time())
tasks = [asyncio.ensure_future(test()) for _ in range(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1547187398.7611663
1547187399.7611988
1547187400.7632194
上面執行并不是異步執行,而是順序執行,但是改成下面形式那就是異步執行:
import asyncio
import time
async def test(t):
await asyncio.sleep(1)
print(time.time())
tasks = [asyncio.ensure_future(test()) for _ in range(3)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
1547187398.7611663
1547187399.7611988
1547187400.7632194
async、await(將耗時函式掛起)
用asyncio提供的@asyncio.coroutine可以把一個generator標記為coroutine型別,然后在coroutine內部用yield from呼叫另一個coroutine實作異步操作,
為了簡化并更好地標識異步IO,從Python 3.5開始引入了新的語法async和await,可以讓coroutine的代碼更簡潔易讀,
請注意,async和await是針對coroutine的新語法,要使用新的語法,只需要做兩步簡單的替換:
把@asyncio.coroutine替換為async;
把yield from替換為await,
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
aiohttp
asyncio可以實作單執行緒并發IO操作,如果僅用在客戶端,發揮的威力不大,如果把asyncio用在服務器端,例如Web服務器,由于HTTP連接就是IO操作,因此可以用單執行緒+coroutine實作多用戶的高并發支持,
asyncio實作了TCP、UDP、SSL等協議,aiohttp則是基于asyncio實作的HTTP框架,
import asyncio
from aiohttp import web
async def index(request):
await asyncio.sleep(0.5)
return web.Response(body=b'<h1>Index</h1>')
async def hello(request):
await asyncio.sleep(0.5)
text = '<h1>hello, %s!</h1>' % request.match_info['name']
return web.Response(body=text.encode('utf-8'))
async def init(loop):
app = web.Application(loop=loop)
app.router.add_route('GET', '/', index)
app.router.add_route('GET', '/hello/{name}', hello)
srv = await loop.create_server(app.make_handler(), '127.0.0.1', 8000)
print('Server started at http://127.0.0.1:8000...')
return srv
loop = asyncio.get_event_loop() # 創建一個事件回圈(池)
loop.run_until_complete(init(loop)) # 將協程物件包裝并注冊協程物件
loop.run_forever()
多行程配合使用 + 協程
方法1:
asyncio、aiohttp需要配合aiomultiprocess
方法2:
gevent.pool import Pool
multiprocessing import Process
核心代碼
def main():
file_list = ["7001", "7002", "7003"]
p_lst = [] # 執行緒串列
for i in file_list:
# self.run(i)
p = Process(target=read_file, args=(i,)) # 子行程呼叫函式
p.start() # 啟動子行程
p_lst.append(p) # 將所有行程寫入串列中
def read_file(self, number):
"""
讀取檔案
:param number: 檔案標記
:return:
"""
file_name = os.path.join(self.BASE_DIR, "data", "%s.txt" % number)
# print(file_name)
self.write_log(number, "開始讀取檔案 {}".format(file_name),"green")
with open(file_name, encoding='utf-8') as f:
# 使用協程池,執行任務,語法: pool.map(func,iterator)
# partial使用偏函式傳遞引數
# 注意:has_null第一個引數,必須是迭代器遍歷的值
pool.map(partial(self.has_null, number=number), f)
多協程
使用loop.run_until_complete(syncio.wait(tasks)) 也可以使用 loop.run_until_complete(asyncio.gather(*tasks)) ,前者傳入task串列,會對task進行解包操作,
多協程嵌套
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(response)
print(time.time())
import time
async def request():
url = "http://www.baidu.com"
resulit = await get(url)
tasks = [asyncio.ensure_future(request()) for _ in range(10000)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
- 回傳方式(3種)
async def get(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
print(response)
print(time.time())
async def request():
url = "http://www.baidu.com"
tasks = [asyncio.ensure_future(url) for _ in range(1000)]
方式一:
dones, pendings = await asyncio.wait(tasks) # 回傳future物件,不回傳直接結果
for task in dones:
print('Task ret: ', task.result())
方式二:
results = await asyncio.gather(*tasks) # 直接回傳結果
方式三:
for task in asyncio.as_completed(tasks):
result = await task
print('Task ret: {}'.format(result)) # 迭代方式回傳結果
tasks = asyncio.ensure_future(request())
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
停止協程任務
實作結束task有兩種方式:關閉單個task、關閉loop,涉及主要函式:
asyncio.Task.all_tasks()獲取事件回圈任務串列
KeyboardInterrupt捕獲停止例外(Ctrl+C)
loop.stop()停止任務回圈
task.cancel()取消單個任務
loop.run_forever()
loop.close()關閉事件回圈,不然會重啟
gevent(第三方包實作協程方式)
python程式實作的一種單執行緒下的多任務執行調度器,簡單來說在一個執行緒里,先后執行AB兩個任務,但是當A遇到耗時操作(網路等待、檔案讀寫等),這個時候gevent會讓A繼續執行,但是同時也會開始執行B任務,如果B在遇到耗時操作同時A又執行完了耗時操作,gevent又繼續執行A,
import gevent
def test(time):
print(1)
gevent.sleep(time)
print(2)
def test2(time):
print(3)
gevent.sleep(time)
print(4)
if __name__ == '__main__':
gevent.joinall([
gevent.spawn(test, 2),
gevent.spawn(test2, 3)
])
借鑒文章:
https://mp.weixin.qq.com/s/GgamzHPyZuSg45LoJKsofA
https://rgb-24bit.github.io/blog/2019/python-coroutine-event-loop.html
https://zhuanlan.zhihu.com/p/54657754
https://cloud.tencent.com/developer/article/1590280
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/127911.html
標籤:其他
上一篇:第2章 順序表及其順序存盤
下一篇:斐波那契數列
