并發編程 - 行程
1、多道技術(基于單核背景下產生)
單道:指的是一條道路走到黑 --> 串行
串行:a,b需要使用cpu,a先試用,b等待a使用完成后,b才能使用cpu
多道:一條道路分配走 --> 并發
并發:a,b需要使用cpu,a先使用,b等待a,直到a進入“IO或執行時間過長”,a會(切換+保存狀態),然后b可以使用cpu,待b執行遇到“IO或執行時間過長”,再將cpu執行權限交給a,直到兩個程式結束

特點:
空間上的復用:(*******)
多個程式使用一個cpu
時間上的復用:(*******)
1.當執行程式遇到IO時,作業系統會將cpu的執行權限剝奪
優點:
cpu的執行效率提高
2.當執行程式遇到執行時間過長時,作業系統會將cpu的執行權限剝奪
缺點:
程式的執行效率低
2、行程
(1)什么是行程
行程是一個資源單位
(2)行程與程式
程式:一堆可執行的代碼檔案
行程:執行代碼的程序,稱之為行程
(3)行程調度(了解)
① 先來先服務調度演算法
比如程式 a,b,若a先來,則先讓a先服務,待a服務完畢后,b再服務
缺點:
執行效率低
② 短作業優先調度
執行時間越短,則先調度
缺點:
導致執行時間長的程式,需要等待所有時間短的程式執行完畢后,才能執行
現代作業系統的行程調度演算法: 時間片輪轉法 + 多級反饋佇列(知道)
③ 時間片輪轉法
比如同時有10個程式需要執行,作業系統會給你10秒,然后時間片輪轉法會將10秒分成10等份,
④ 多級反饋佇列
1級別佇列:優先級最高,先執行此佇列中的程式
2級別佇列:優先級以此類推
3級別佇列:...
3、串行、并發與并行
串行:類似于單道,相當于同步提交任務
并發:在單核(1個cpu)情況下,當執行a,b兩個程式時,a先執行,當a遇到IO時,b開始爭搶cpu的執行權限,再讓b執行,類似于多道,相當于異步提交任務,(看起來像同時運行)
并行:在多核(多個cpu)情況下,多個任務同時執行,(他們是真正意義上的同時運行)

4、同步與異步
同步與異步指的是 "提交任務的方式"
同步(串行):
兩個a,b程式都要提交并執行,假如a先提交執行,b必須等a執行完成后,b才能提交任務并執行
異步(并發):
兩個a,b程式都要提交并執行,假如a先提交執行,b無需等a執行完成,就可以直接提交任務并執行
5、阻塞與非阻塞
阻塞:
凡是遇到IO操作都會阻塞,進入阻塞態
IO操作:
input()
output()
time.sleep()
檔案的讀寫
資料的傳輸
非阻塞(不等待):
只要不遇到IO阻塞,其他都是非阻塞(比如計算從1到100萬的和)
IO密集型:很多IO操作
計算密集型:很多計算操作
6、行程的三種狀態
就緒態:
所有任務提交完成后,都會進入就緒態, (同步和異步)
運行態:
通過行程呼叫一個任務開始執行,該任務會進入運行態, (非阻塞)
程式運行時間過長,會將程式回傳給就緒態,等待下次調度
阻塞態:
凡是遇到IO操作的任務都會進入阻塞太,待IO操作結束,則阻塞態結束,進入就緒態,等待下次調度
7、行程的兩種創建方式(代碼)
方式1:直接呼叫Process類
例1:
import time from multiprocessing import Process def task(): # 任務 print("start...") time.sleep(3) print("end...") if __name__ == '__main__': # target=任務(函式地址) ---> 創建一個子行程 p_obj = Process(target=task) # 告訴作業系統去創建一個子行程 p_obj.start() # 告訴主行程,等待子行程結束后,再執行主行程 p_obj.join() print("正在執行當前主行程")
執行結果:
start...
end...
正在執行當前主行程
例2:
import time from multiprocessing import Process def task(name): # 任務 print(f"start...{name}的子行程") time.sleep(3) print(f"end...{name}的子行程") if __name__ == '__main__': # target=任務(函式地址) ---> 創建一個子行程 # 異步提交3個任務 (3個子行程) p_obj1 = Process(target=task, args=("apple", )) p_obj2 = Process(target=task, args=("orange", )) p_obj3 = Process(target=task, args=("banana", )) # 告訴作業系統去創建一個子行程 p_obj1.start() p_obj2.start() p_obj3.start() # 告訴主行程,等待子行程結束后,再執行主行程 p_obj1.join() p_obj2.join() p_obj3.join() print("正在執行當前主行程")
執行結果:
# apple、orange、banana三個行程同時提交 start...apple的子行程 start...orange的子行程 start...banana的子行程 end...apple的子行程 end...orange的子行程 end...banana的子行程 正在執行當前主行程
方式二:繼承Process類
import time from multiprocessing import Process class MyProcess(Process): def run(self): print(f"start...{self.name}的子行程") time.sleep(3) print(f"end...{self.name}的子行程") if __name__ == '__main__': # 定義一個空串列,將所有的子行程放進去并for回圈出來告訴主行程先執行子行程 list1 = [] # 假設10個行程 for i in range(10): obj = MyProcess() # 告訴作業系統去創建一個子行程 obj.start() list1.append(obj) for obj in list1: # 告訴主行程,等待子行程結束后,再執行主行程 obj.join() print("正在執行當前主行程")
執行結果:
# 10個行程同時提交 start...MyProcess-1的子行程 start...MyProcess-2的子行程 start...MyProcess-3的子行程 start...MyProcess-4的子行程 start...MyProcess-5的子行程 start...MyProcess-6的子行程 start...MyProcess-7的子行程 start...MyProcess-8的子行程 start...MyProcess-9的子行程 start...MyProcess-10的子行程 end...MyProcess-1的子行程 end...MyProcess-2的子行程 end...MyProcess-3的子行程 end...MyProcess-4的子行程 end...MyProcess-5的子行程 end...MyProcess-6的子行程 end...MyProcess-7的子行程 end...MyProcess-8的子行程 end...MyProcess-9的子行程 end...MyProcess-10的子行程 正在執行當前主行程
8、問答題
(1)在單核情況下是否可以實作并行?
不可以,并行只能在多核情況下實作
(2)阻塞與同步是一樣的嗎?非阻塞與異步是一樣的嗎?
不一樣
同步與異步:提交任務的方式
阻塞與非阻塞:行程的狀態
補充:
異步非阻塞:異步非阻塞可將cpu的利用率最大化! 用在通過并發對程式行程操作
9、子行程回收資源的兩種方式
① join()方法讓主行程等待子行程結束,并且回收子行程資源,主行程再結束并回收資源
② 主行程 "正常結束" ,子行程與主行程一并被回收資源
10、僵尸行程和孤兒行程(了解)
僵尸行程(有壞處,手動解決):
在子行程結束后,主行程沒有正常結束,子行程PID不會被回收
缺點:
> 作業系統中的PID號是有限的,如有行程PID號無法正常回收,則會占用PID號
> 資源浪費
> 若PID號滿了,則無法創建新的行程
孤兒行程(無壞處):
在子行程沒有結束時,主行程沒有 "正常結束" ,子行程PID不會被回收
作業系統優化機制(孤兒院):
當主行程意外終止,作業系統會檢測是否有正在運行的子行程,會把它們放入孤兒院中,讓作業系統幫你自動回收
# 查看行程pid的兩種方法 import time from multiprocessing import Process # 第一種:在子行程匯總呼叫,可以拿到子行程物件.pid可以拿到pid號 from multiprocessing import current_process # 第二種:在子行程匯總呼叫,可以拿到子行程物件.pid可以拿到pid號 import os def task(): print(f"start..{current_process().pid}") # 第一種 time.sleep(3) print(f"end..{os.getpid()}") # 第二種 if __name__ == '__main__': p = Process(target=task) # 告訴作業系統開啟子行程 p.start() # 告訴主行程,等待子行程結束后,再執行主行程 p.join() print("主行程結束...")
執行結果:
start..5224 end..5224 主行程結束...
11、守護行程
守護行程:當主行程結束時,子行程也必須結束,并回收資源
守護行程才能必須在創建子行程(呼叫p.start())之前設定
# 守護行程演示 import time from multiprocessing import Process def demo(name): print(f"start..{name}") time.sleep(3) print(f"end..{name}") if __name__ == '__main__': p = Process(target=demo, args=("apple",)) # 守護行程才能必須在呼叫p.start()之前設定 p.daemon = True # 將子行程p設定為守護行程 # 告訴作業系統開啟子行程 p.start() time.sleep(1) print("主行程結束...")
執行結果:
start..apple
主行程結束...
12、行程間資料是相互隔離的
from multiprocessing import Process number = 10 def func(): global number number += 10 if __name__ == '__main__': p = Process(target=func) p.start() print(number) # 10
執行結果:
10
13、互斥鎖
互斥鎖是一把鎖,用來保證資料讀寫安全
多行程實作并發會造成資料不安全,可以使用互斥鎖來避免這種問題
例:
搶票例子:假設有10個用戶來查看余票,余票為1張,進行搶票操作
""" 多行程實作并發會造成資料不安全, """ from multiprocessing import Process from multiprocessing import Lock # 行程互斥鎖 import json import time import random # 1.查看余票 def search(name): # 1.讀取資料 with open(r"data\orange.json", "r", encoding="utf-8") as f: user_dic = json.load(f) print(f"用戶【{name}】查看的余票為【{user_dic.get('number')}】") # 2.若有余票,購買成功,票數減少 def buy(name): # 讀取目錄data下的orange.json檔案 with open(r"data\orange.json", "r", encoding="utf-8") as f: user_dic = json.load(f) if user_dic.get('number') > 0: user_dic["number"] -= 1 # 設定網路延遲,讓其他用戶同時參與搶票 time.sleep(random.randint(1, 3)) # 最先寫入資料檔案的證明搶到票 with open(r"data\orange.json", "w", encoding="utf-8") as f: json.dump(user_dic, f) print(f"用戶【{name}】搶票成功") else: print(f"用戶【{name}】搶票失敗") def run(name, lock): # 假設有10個用戶來查看余票 search(name) lock.acquire() # 加鎖,第一個人搶到票寫入檔案之后,其他人就不能對檔案進行修改 buy(name) lock.release() # 釋放鎖 if __name__ == '__main__': lock = Lock() # 開啟多行程:實作并發 for line in range(10): p_obj = Process(target=run, args=(f"謝廣坤{line + 1}號", lock)) p_obj.start()
執行結果:
用戶【謝廣坤1號】查看的余票為【1】 用戶【謝廣坤2號】查看的余票為【1】 用戶【謝廣坤3號】查看的余票為【1】 用戶【謝廣坤4號】查看的余票為【1】 用戶【謝廣坤6號】查看的余票為【1】 用戶【謝廣坤5號】查看的余票為【1】 用戶【謝廣坤7號】查看的余票為【1】 用戶【謝廣坤8號】查看的余票為【1】 用戶【謝廣坤9號】查看的余票為【1】 用戶【謝廣坤10號】查看的余票為【1】 用戶【謝廣坤1號】搶票成功 用戶【謝廣坤2號】搶票失敗 用戶【謝廣坤3號】搶票失敗 用戶【謝廣坤4號】搶票失敗 用戶【謝廣坤6號】搶票失敗 用戶【謝廣坤5號】搶票失敗 用戶【謝廣坤7號】搶票失敗 用戶【謝廣坤8號】搶票失敗 用戶【謝廣坤9號】搶票失敗 用戶【謝廣坤10號】搶票失敗
14、佇列
遵循先進先出原則:
先存放的資料,就先取出來
佇列相當于一個第三方的管道,可以存放資料
佇列的三種使用方法:
第一種:from multiprocessing import Queue # multiprocessing提供的行程佇列,先進先出
第二種:from multiprocessing import JoinableQueue # 基于Queue封裝的佇列,先進先出
第三種:import queue # python內置的佇列queue,先進先出
應用:讓行程之間進行資料互動
例:
第一種方法:multiprocessing提供的行程佇列
# 第一種:Queue from multiprocessing import Queue # multiprocessing提供的行程佇列,先進先出 q_obj1 = Queue(5) # q_obj1佇列物件,最多可存5個資料 # 添加資料到佇列中 q_obj1.put("decade") print("添加第一個") q_obj1.put("zio") print("添加第二個") q_obj1.put("amazon") print("添加第三個") q_obj1.put("build") print("添加第四個") q_obj1.put("kabuto") print("添加第五個") # put()添加第6個時不會提示報錯,佇列滿了,就會進入阻塞 # q_obj1.put("ooo") # put_nowait()添加第6個時會提示報錯 # q_obj1.put_nowait("ooo") # 從佇列中取資料 print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) # .get()取出第6個時不會提示報錯,全部取出之后再取,就會進入阻塞 # print(q_obj1.get()) # .get_nowait()取出第6個時會提示報錯 # print(q_obj1.get_nowait())
執行結果:
添加第一個
添加第二個
添加第三個
添加第四個
添加第五個
decade
zio
amazon
build
kabuto
第二種方法:基于Queue封裝的佇列
# 第二種:JoinableQueue from multiprocessing import JoinableQueue # 基于Queue封裝的佇列,先進先出 q_obj1 = JoinableQueue(5) # q_obj1佇列物件,最多可存5個資料 # 添加資料到佇列中 q_obj1.put("decade") print("添加第一個") q_obj1.put("zio") print("添加第二個") q_obj1.put("amazon") print("添加第三個") q_obj1.put("build") print("添加第四個") q_obj1.put("kabuto") print("添加第五個") # put()添加第6個時不會提示報錯,佇列滿了,就會進入阻塞 # q_obj1.put("ooo") # put_nowait()添加第6個時會提示報錯 # q_obj1.put_nowait("ooo") # 從佇列中取資料 print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) # .get()取出第6個時不會提示報錯,全部取出之后再取,就會進入阻塞 # print(q_obj1.get()) # .get_nowait()取出第6個時會提示報錯 # print(q_obj1.get_nowait())
執行結果:
添加第一個
添加第二個
添加第三個
添加第四個
添加第五個
decade
zio
amazon
build
kabuto
第三種方法:python內置的佇列queue
# 第三種:queue import queue # python內置的佇列,先進先出 q_obj1 = queue.Queue(5) # q_obj1佇列物件,最多可存5個資料 # 添加資料到佇列中 q_obj1.put("decade") print("添加第一個") q_obj1.put("zio") print("添加第二個") q_obj1.put("amazon") print("添加第三個") q_obj1.put("build") print("添加第四個") q_obj1.put("kabuto") print("添加第五個") # put()添加第6個時不會提示報錯,佇列滿了,就會進入阻塞 # q_obj1.put("ooo") # put_nowait()添加第6個時會提示報錯 # q_obj1.put_nowait("ooo") # 從佇列中取資料 print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) print(q_obj1.get()) # .get()取出第6個時不會提示報錯,全部取出之后再取,就會進入阻塞 # print(q_obj1.get()) # .get_nowait()取出第6個時會提示報錯 # print(q_obj1.get_nowait())
執行結果:
添加第一個
添加第二個
添加第三個
添加第四個
添加第五個
decade
zio
amazon
build
kabuto
15、IPC機制(行程間實作通信)
行程之間可以互相進行通信
例:foo和goo兩個函式之間進行資料通信
from multiprocessing import Process from multiprocessing import JoinableQueue import time def foo(q): x = 100 q.put(x) # 向佇列中添加資料 print("foo:添加資料成功!") time.sleep(2) # 遇到IO操作阻塞,執行goo子行程,goo子行程執行完成(無IO操作阻塞的情況下),繼續執行foo子行程 from_goo = q.get() # 向佇列中取資料 print(f"foo:從goo中獲取到的資料為{from_goo}") def goo(q): from_foo = q.get() # 向佇列中取資料 print(f"goo:從foo中獲取到的資料為{from_foo}") q.put(200) # 向佇列中添加資料 print("goo:添加資料成功!") if __name__ == '__main__': q = JoinableQueue(10) # 創建佇列可添加資料為10個 # 創建子行程 p1 = Process(target=foo, args=(q, )) p2 = Process(target=goo, args=(q, )) # 執行子行程 p1.start() p2.start()
執行結果:
foo:添加資料成功!
goo:從foo中獲取到的資料為100
goo:添加資料成功!
foo:從goo中獲取到的資料為200
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/203903.html
標籤:Python
上一篇:python3爬蟲應用--爬取網易云音樂(兩種辦法)
下一篇:關于frida的例外
