目錄
- 同步與異步
- 阻塞與非阻塞
- 綜合使用
- 創建行程的多種方式
- 前言
- windows系統創建行程的問題(重要)
- multiprocessing模塊之Process
- 展現異步
- 創建行程的方式(一):使用Process()創建行程物件
- 基本使用
- 給子行程運行的函式傳參
- 創建行程的方式(二):重寫Process類的run方法
- 還是如何傳參
- join方法
- 行程間的資料隔離
- IPC機制 訊息佇列
- multiprocessing模塊之Queue
- get() put()
- full() empty()
- get_nowait()
- 訊息佇列實作子行程訊息傳遞
- multiprocessing模塊之Queue
- 消費者模型
- 行程物件多種方法
- 如何查看行程號
- multiprocessing模塊之Process其他方法
- 守護行程
- 僵尸行程
- 孤兒行程
- 多行程資料錯亂問題
- 練習
同步與異步
用來表達任務的提交方式
同步:
提交完任務之后原地等待任務的回傳結果 期間不做任何事
異步:
提交完任務之后不愿地等待任務的回傳結果 直接去做其他事 有結果自動通知
阻塞與非阻塞
用來表達任務的執行狀態
阻塞
程式處于阻塞態
非阻塞
程式處于就緒態、運行態
綜合使用
- 同步阻塞
提交任務之后 cpu走了 行程不執行了 - 同步非阻塞
在原地做一些事情 - 異步阻塞
cpu沒來做不了事情 - 異步非阻塞(效率最高)
提交完任務之后 行程還繼續 cpu也還在
異步非阻塞框架(寫游戲、回應速度、效率高)
創建行程的多種方式
前言
# 1.創建行程的步驟
如何創建行程?用滑鼠雙擊一個桌面圖示,就創建了一個應用程式的行程,
而我們可以用python代碼創建行程,這其中至少有這樣的程序:
1.硬碟中存放python代碼
2.讀取代碼到記憶體
3.cpu執行代碼
4.作業系統創建新行程
# 2.不同作業系統的差異(重要)
因為行程是由作業系統創建的,所以根據不同作業系統也會有差異:
"""
在不同的作業系統中創建行程底層原理不一樣
windows
以匯入模塊的形式創建行程(需要使用if __name__ == 'main')
linux/mac
以拷貝代碼的形式創建行程 (復制的時候不包含創建行程的代碼)
"""
# 3.父行程和子行程
比如一個瀏覽器,可能會有很多分頁,那相對的來說瀏覽器就是主行程,一個頁面就是一個子行程,
一個py檔案,里面寫了創建行程的代碼,那這個py檔案運行之后,首先他自己是一個行程(父行程),被他創建出來的行程是他的子行程,
父行程和子行程是相對的概念,比如剛剛所說的py檔案,又是pycharm的子行程,
windows系統創建行程的問題(重要)
在windows系統下使用Python模塊multiprocessing模塊創建行程:
from multiprocessing import Process
import time
def task():
print('子行程開始執行')
time.sleep(3)
print('子行程執行結束')
p = Process(target=task) # 使用Process模塊創建一個子行程 在子行程運行task函式
p.start() # 運行子行程
print('主行程運行結束')
# 執行后會產生如下報錯資訊:
'''
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Process finished with exit code 0
'''
為什么會產生這樣的報錯呢?
windows以匯入模塊的形式創建行程,

所以產生報錯的就是如下兩行代碼:

解決:
from multiprocessing import Process
import time
def task():
print('子行程開始執行')
time.sleep(3)
print('子行程執行結束')
if __name__ == '__main__':
p = Process(target=task)
p.start()
'''
將這兩行代碼放入if __name__ == 'main':
使主行程作為模塊匯入的時候,不再執行這兩行代碼,就不會出現問題了,
'''
multiprocessing模塊之Process
展現異步
展現異步之前,先舉一個同步的例子:
import time
def task():
print('子行程開始執行')
time.sleep(3)
print('子行程執行結束')
if __name__ == '__main__':
task() # 程式會跳回去執行task函式,task沒執行完就不會執行下面的print()
print('主行程運行結束')
'''輸出結果:
子行程開始執行
子行程執行結束
主行程運行結束
'''
展現異步:
def task():
print('子行程開始執行')
time.sleep(3)
print('子行程執行結束')
if __name__ == '__main__':
p = Process(target=task)
p.start()
print('主行程運行結束') # 這行代碼執行最快 主行程真的結束了嗎? =,=
'''輸出結果:
主行程運行結束
子行程開始執行
子行程執行結束
'''
為什么是這樣的輸出結果?
p.start()執行完后,相當于向作業系統發送了一個資訊,要創建一個子行程,作業系統要處理資訊是需要一定時間的,主行程只是發了一個資訊就完了,代碼繼續往下走,自然會執行print,等作業系統反應過來了,子行程才會開始執行,而這個速度是慢于主行程代碼執行速度的,也就是說: 作業系統開辟記憶體空間的時候 主行程不停著,直接運行下一行,
請解釋輸出結果:

請解釋輸出結果:

創建行程的方式(一):使用Process()創建行程物件
基本使用
import time
from multiprocessing import Process
def task():
print('子行程開始執行')
time.sleep(3)
print('子行程執行結束')
if __name__ == '__main__':
p = Process(target=task)
print(p, type(p)) # <Process(Process-1, initial)> <class 'multiprocessing.context.Process'>
p.start()
# 用Process創建一個子行程物件 然后呼叫子行程物件的start()方法運行子行程,
查看Process原始碼發現他繼承BaseProcess類:

BaseProcess類:看紅色框就好,

給子行程運行的函式傳參
import time
from multiprocessing import Process
def task1(a,b):
time.sleep(3)
print(a,b)
def task2(a,b):
time.sleep(3)
print(a,b)
if __name__ == '__main__':
p1 = Process(target=task1, args=('cloud','alice')) # 位置引數
p2 = Process(target=task2, kwargs={'a':'cloud','b':'alice'}) # 關鍵字引數
p1.start()
p2.start()
創建行程的方式(二):重寫Process類的run方法
import time
from multiprocessing import Process
class MyProcess(Process):
def run(self):
print('run is running')
time.sleep(3)
print('run is over')
if __name__ == '__main__':
obj = MyProcess() # 產生行程物件
obj.start() # 此時就會開一個子行程 子行程會去運行類中的run方法
print('主')
為什么可以這么做呢?我們知道Process繼承BaseProcess類,于是我們去看BaseProcess類的run方法:

可以得知這個run方法就是留給我們重寫的,
還是如何傳參
需要重寫BaseProcess類的雙下init:

from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self, name, age): # 重寫__init__方法
super().__init__() # 這里init不傳參是因為 父類init里面的形參全是默認引數
self.name = name # 這里的self是行程物件
self.age = age # 給新產生的行程物件新增兩個物件獨有的屬性name、age
# 備忘:__new__產生新物件
def run(self):
print('run is running', self.name, self.age) # 使用物件中的屬性
time.sleep(3)
print('run is over', self.name, self.age)
if __name__ == '__main__':
obj = MyProcess('cloud', 18)
obj.start()
print('主')
join方法
join方法的作用是:讓主行程代碼等待子行程代碼結束之后,再執行,
主行程會卡在join方法處,主行程join方法下面的代碼,不會執行,直到子行程運行結束,
# 1.基本使用
import time
from multiprocessing import Process
def task():
print('子行程開始執行')
time.sleep(3)
print('子行程執行結束')
if __name__ == '__main__':
p = Process(target=task)
p.start()
p.join()
print('主')
'''輸出結果:
子行程開始執行
子行程執行結束
主
'''
使用join方法會讓主程式等待子行程結束,不加join時,應該是'主'最先被輸出,
# 2.非阻塞例子
from multiprocessing import Process
import time
def task(name, n):
print('%s is running' % name)
time.sleep(n)
print('%s is over' % name)
if __name__ == '__main__':
p1 = Process(target=task, args=('jason1', 1)) # 子行程p1 執行時間為1s
p2 = Process(target=task, args=('jason2', 2)) # 子行程p2 執行時間為2s
p3 = Process(target=task, args=('jason3', 3)) # 子行程p3 執行時間為3s
start_time = time.time()
p1.start()
p2.start()
p3.start()
p1.join()
p2.join()
p3.join()
print(time.time() - start_time) # 請問運行時間是?
'''
jason1 is running
jason2 is running
jason3 is running
jason1 is over
jason2 is over
jason3 is over
3.2216908931732178
'''
# 3.阻塞例子
from multiprocessing import Process
import time
def task(name, n):
print('%s is running' % name)
time.sleep(n)
print('%s is over' % name)
if __name__ == '__main__':
p1 = Process(target=task, args=('jason1', 1)) # 子行程p1 執行時間為1s
p2 = Process(target=task, args=('jason2', 2)) # 子行程p2 執行時間為2s
p3 = Process(target=task, args=('jason3', 3)) # 子行程p3 執行時間為3s
start_time = time.time()
p1.start()
p1.join()
p2.start()
p2.join()
p3.start()
p3.join()
print(time.time() - start_time) # 請問運行時間是?
'''
jason1 is running
jason1 is over
jason2 is running
jason2 is over
jason3 is running
jason3 is over
6.449279308319092
'''
# 4.誰的速度快?
import time
from multiprocessing import Process
def task(name):
start_time = time.time()
print(f'{name}子行程開始執行')
time.sleep(3)
print(f'{name}子行程執行結束')
print(f'{name}行程耗時:{time.time()-start_time}')
if __name__ == '__main__':
p1 = Process(target=task,args = ('p1',))
p2 = Process(target=task,args = ('p2',))
p1.start()
p2.start()
p1.join()
print('主') # 這段print代碼有時可以跑的比第二個子行程還快!
# 5.無法阻擋子行程
import time
from multiprocessing import Process
def task(name):
start_time = time.time()
print(f'{name}子行程開始執行')
time.sleep(3)
print(f'{name}子行程執行結束')
print(f'{name}行程耗時:{time.time()-start_time}')
def task2(name):
start_time = time.time()
print(f'{name}子行程開始執行')
time.sleep(1)
print(f'{name}子行程執行結束')
print(f'{name}行程耗時:{time.time()-start_time}')
if __name__ == '__main__':
p1 = Process(target=task,args = ('p1',))
p2 = Process(target=task2,args = ('p2',))
p1.start()
p2.start()
p1.join() # join只能卡住主行程的代碼運行 不能阻擋子行程
print('主')
'''
p1子行程開始執行
p2子行程開始執行
p2子行程執行結束
p2行程耗時:1.0048680305480957
p1子行程執行結束
p1行程耗時:3.0130884647369385
主
'''
行程間的資料隔離
同一臺計算機上的多個行程資料是嚴格意義上的物理隔離(默認情況下)
from multiprocessing import Process
import time
money = 1000
def task():
global money
money = 666
print('子行程的task函式查看money', money)
if __name__ == '__main__':
p1 = Process(target=task)
p1.start() # 創建子行程
time.sleep(3) # 主行程代碼等待3秒
print(money) # 主行程代碼列印money
IPC機制 訊息佇列
IPC的概念:實作行程間通信
訊息佇列:訊息佇列可以理解成一個公共存資料的地方 所有行程都可以存 也可以取,
multiprocessing模塊之Queue
from multiprocessing import Queue
# 1.產生訊息佇列q
q = Queue(3) # 括號內可以指定存盤資料的個數
不給Queue傳值的情況下,會自動使用最大值:(SEM_VALUE_MAX)

此時佇列可以容納值的數量為:

get() put()
使用get從佇列中獲取值 使用put往佇列添加值 符合先進先出
from multiprocessing import Queue
q = Queue(3)
q.put(111)
q.put(222)
q.put(333)
print(q.get()) # 111
print(q.get()) # 222
print(q.get()) # 333
當佇列已滿時,你使用put添加值
或者佇列已空,你使用get獲取值
都會導致當前行程阻塞,等待有別的行程往佇列里添加/獲取值
from multiprocessing import Queue
q = Queue(3)
q.put(111)
q.put(222)
q.put(333)
q.put(444)
print('阻塞') # 無法輸出這行代碼
full() empty()
full 用于判斷佇列是否為滿,
但有些需要注意:
1.判斷的是當前行程的佇列
2.判斷的是執行full()這行代碼這個瞬時時間下,佇列的狀態是否為滿
empty 與 full 相反 判斷的是佇列是否為空
from multiprocessing import Queue
q = Queue(3)
q.put(111)
print(q.full()) # 判斷佇列是否已滿 False
q.put(222)
q.put(333)
print(q.full()) # 判斷佇列是否已滿 True
print(q.get())
print(q.get())
print(q.empty()) # 判斷佇列是否為空 False
print(q.get())
print(q.empty()) # 判斷佇列是否為空 True
注意:
full() empty() 在多行程中都不能使用!!!
可能當前行程你執行完q.empty 之后 馬上又另外的行程塞一個資料進來 q.empty只能判斷當前行程一個瞬時時間管道是否空,
get_nowait()
這個方法如他的名字,如果獲取不到佇列的值,就馬上拋出例外,
from multiprocessing import Queue
q = Queue(3)
q.put(111)
print(q.get())
print(q.get_nowait()) # queue.Empty
訊息佇列實作子行程訊息傳遞
from multiprocessing import Process, Queue
def product(q):
q.put('子行程p添加的資料')
def consumer(q):
print('子行程獲取佇列中的資料', q.get())
if __name__ == '__main__':
q = Queue()
# 主行程往佇列中添加資料
# q.put('我是主行程添加的資料')
p1 = Process(target=consumer, args=(q,))
p2 = Process(target=product, args=(q,))
p1.start()
p2.start()
print('主')
'''
consumer行程在訊息佇列沒有資料的時候 這個行程會等待product行程往Queue放東西
'''
消費者模型
"""回想爬蟲"""
生產者
負責產生資料的'人'
消費者
負責處理資料的'人'
該模型除了有生產者和消費者之外還必須有訊息佇列
(只要是能夠提供資料保存服務和提取服務的理論上都可以)
生產者消費者模型可以實作程式結耦合,兩個程式(生產者、消費者)基于訊息佇列都可以獨立運行,
用包子鋪舉例:
老板提前做好包子放在蒸籠(訊息佇列),
沒有客人的時候可以一直做包子,客人也不需要等待,隨時可以來拿包子,(程式結耦合)
行程物件多種方法
如何查看行程號
# 1.current_process查看
from multiprocessing import Process, current_process
def task():
print('子行程')
print(current_process()) # 查看行程
print(current_process().pid) # 查看行程號
if __name__ == '__main__':
p = Process(target=task)
p.start()
print('主行程:')
print(current_process())
print(current_process().pid)
'''
主行程:
<_MainProcess(MainProcess, started)>
504
子行程
<Process(Process-1, started)>
23260
'''
# 2.os模塊
import os
print(os.getpid()) # 獲取當前行程的行程號 # 17672
print(os.getppid()) # 獲取當前行程的父行程的行程號 # 28600 # 我用的是pycharm應該獲取的是pycharm的行程號
去cmd里輸入tasklist查看,果然如此:

multiprocessing模塊之Process其他方法
1.終止行程
p1.terminate()
ps:計算機作業系統都有對應的命令可以直接殺死行程
windows:taskkill /F /PID 行程號
2.判斷行程是否存活
p1.is_alive()
3.start()
4.join()
# 例子
import time
from multiprocessing import Process
def task():
print('子行程開始執行')
time.sleep(3)
print('子行程執行結束')
if __name__ == '__main__':
p = Process(target=task)
p.start()
p.terminate()
print(p.is_alive()) # True # 不是已經終止行程了嗎?為什么還是True
# 1.前腳剛開 后腳就關了 這時候行程都起不來 如果加中間sleep可能就子行程運行完了
# 2.執行terminate相當于讓作業系統關掉剛剛創建的子行程,而這是需要時間的,可能執行is_alive的速度比作業系統關行程的速度快,所以結果是True
守護行程
守護行程會隨著守護的行程結束而立刻結束
使用場景:
一鍵關閉所有子行程
eg: 吳勇是張紅的守護行程 一旦張紅嗝屁了 吳勇立刻嗝屁
from multiprocessing import Process
import time
def task(name):
print('德邦總管:%s' % name)
time.sleep(3)
print('德邦總管:%s' % name)
if __name__ == '__main__':
p1 = Process(target=task, args=('大張紅',))
p1.daemon = True
p1.start()
time.sleep(1)
print('恕瑞瑪皇帝:小吳勇嗝屁了')
僵尸行程
僵尸行程
行程執行完畢后并不會立刻銷毀所有的資料 會有一些資訊短暫保留下來
比如行程號、行程執行時間、行程消耗功率等給父行程查看
ps:所有的行程都會變成僵尸行程
孤兒行程
孤兒行程
子行程正常運行 父行程意外死亡 作業系統針對孤兒行程會派遣福利院管理
多行程資料錯亂問題
火車票搶票時經常出現這種問題:
本來有3張票 點進去之后一張票都沒有
是因為:你看到的這三張票基于 進入app的時間
你需要重繪 才能獲取當前時間的資訊
模擬搶票軟體
from multiprocessing import Process
import time
import json
import random
# 查票
def search(name):
with open(r'data.json', 'r', encoding='utf8') as f:
data = https://www.cnblogs.com/passion2021/archive/2022/11/18/json.load(f)
print('%s在查票 當前余票為:%s' % (name, data.get('ticket_num')))
# 買票
def buy(name):
# 再次確認票
with open(r'data.json', 'r', encoding='utf8') as f:
data = https://www.cnblogs.com/passion2021/archive/2022/11/18/json.load(f)
# 模擬網路延遲
time.sleep(random.randint(1, 3))
# 判斷是否有票 有就買
if data.get('ticket_num') > 0:
data['ticket_num'] -= 1
with open(r'data.json', 'w', encoding='utf8') as f:
json.dump(data, f)
print('%s買票成功' % name)
else:
print('%s很倒霉 沒有搶到票' % name)
def run(name):
search(name)
buy(name)
if __name__ == '__main__':
tick_dict = {'ticket_num': 1} # 創建車票字典 里面就只有一張票
with open('data.json', 'w', encoding='utf8') as f:
json.dump(tick_dict, f)
for i in range(10):
p = Process(target=run, args=('用戶%s'%i, ))
p.start()
"""
多行程操作資料很可能會造成資料錯亂>>>:互斥鎖
互斥鎖
將并發變成串行 犧牲了效率但是保障了資料的安全
也就是讓行程排隊
"""
輸出結果:

練習
1.將TCP服務端使用多行程實作并發效果
聊天全部采用自動發送 不要用input手動輸
2.整理今日內容及博客
3.查詢IT行業可能出現的鎖名稱及概念
4.整理理論內容 嘗試撰寫cs架構的軟體 實作資料的上傳與下載
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/536093.html
標籤:其他
上一篇:JSP的頁面結構學習筆記
下一篇:day18-web工程路徑
