【目錄】
一、 multiprocessing模塊介紹
二、 process類的介紹
三、 process類的使用
四、僵尸行程和孤兒行程
五 、守護行程
六 、行程同步(互斥鎖)
七 、佇列--生產者消費者模型
八、死鎖現象與遞回鎖(見 多執行緒-應用部分 )
一、 multiprocessing模塊介紹
# python中的多執行緒無法利用多核優勢,如果想要充分地使用多核CPU的資源(os.cpu_count()查看),在python中大部分情況需要使用多行程,
# Python提供了multiprocessing 模塊——
# 作用:multiprocessing模塊用來開啟子行程,并在子行程中執行我們定制的任務(比如函式),該模塊與多執行緒模塊threading的編程介面類似
# 功能:multiprocessing模塊的功能眾多:支持子行程、通信和共享資料、執行不同形式的同步,提供了Process、Queue、Pipe、Lock等組件,
PS:需要再次強調的一點是:與執行緒不同,行程沒有任何共享狀態,行程修改的資料,改動僅限于該行程內,
二、 process類的介紹
1、創建行程的 類 process
Process([group [, target [, name [, args [, kwargs]]]]]),
由該類實體化得到的物件,表示一個子行程中的任務(尚未啟動)
強調:
1、需要使用關鍵字的方式來指定引數 eg: Process(target=task, args=('jason',))
2、group 引數未使用,值始終為None
3、target 表示呼叫物件,即子行程要執行的任務
4、args 表示呼叫物件target函式的位置引數元組,args=(1,2,'egon',) ,(元組形式,用逗號隔開元素,末尾一定必須有逗號)
5、kwargs 表示呼叫物件的字典,kwargs={'name':'egon','age':18}
6、name 為子行程的名稱
2、方法介紹
1、p.start()
啟動行程,并呼叫該子行程中的p.run()
2、p.run()行程啟動時運行的方法,正是它去呼叫target指定的函式,我們自定義類的類中一定要實作該方法
3、p.terminate()
強制終止行程p,不會進行任何清理操作,如果p創建了子行程,該子行程就成了僵尸行程,使用該方法需要特別小心這種情況,
如果p還保存了一個鎖那么也將不會被釋放,進而導致死鎖
4、p.is_alive()
如果p仍然運行,回傳True
5、p.join([timeout])
主執行緒等待p終止(強調:是主執行緒處于等的狀態,而p是處于運行的狀態),
timeout是可選的超時時間,需要強調的是,p.join只能join住start開啟的行程,而不能join住run開啟的行程
3、屬性介紹
1、p.daemon
默認值為False,如果設為True,代表p為后臺運行的守護行程,當p的父行程終止時,p也隨之終止,
并且設定為True后,p不能創建自己的新行程,必須在p.start()之前設定
2、p.name 行程的名稱
3、p.pid 當前行程的pid
4、p.ppid 當前行程的父行程的pid
5、p.exitcode
行程在運行時為None、如果為–N,表示被信號N結束(了解即可)
6、p.authkey
行程的身份驗證鍵,默認是由os.urandom()隨機生成的32字符的字串,
這個鍵的用途是為涉及網路連接的底層行程間通信提供安全性,這類連接只有在具有相同的身份驗證鍵時才能成功(了解即可)
三、 process類的使用
# windows作業系統下 創建行程一定要在main內創建,
因為windows下創建行程類似于模塊匯入的方式,會從上往下依次執行代碼
# linux中則是直接將代碼完整地拷貝一份
注意:在windows中,Process() 必須放到 # if __name__ == '__main__':下
1、創建行程的兩種方式
(1)、類實體化產生物件
# 第一種 from multiprocessing import Process import time def task(name): print('%s is running'%name) time.sleep(3) print('%s is over'%name) if __name__ == '__main__': # 1 創建一個物件 p = Process(target=task, args=('jason',)) # 容器型別哪怕里面只有1個元素 建議要用逗號隔開 # 2 開啟行程 p.start() # 告訴作業系統幫你創建一個行程 異步 print('主')
(2)、類的繼承 run方法
# 第二種方式 類的繼承 from multiprocessing import Process import time class MyProcess(Process): def run(self): print('hello bf girl') time.sleep(1) print('get out!') if __name__ == '__main__': p = MyProcess() p.start() print('主')
2、行程之間的記憶體空間是隔離的
from multiprocessing import Process n=100 #在windows系統中應該把全域變數定義在if __name__ == '__main__'之上就可以了 def work(): global n n=0 print('子行程內: ',n) if __name__ == '__main__': p=Process(target=work) p.start() print('主行程內: ',n)
3、Process物件的join方法——join:是主行程在等,等待子行程結束
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() p.join(0.0001) #等待p停止,等0.0001秒就不再等了 print('開始') join:主行程等,等待子行程結束例子一
from multiprocessing import Process import time,os def task(): print('%s is running' %os.getpid()) time.sleep(3) if __name__ == '__main__': p=Process(target=task) p.start() p.join() # 等待行程p結束后,join函式內部會發送系統呼叫wait,去告訴作業系統回收掉行程p的id號 print(p.pid) # ???此時能否看到子行程p的id號 print('主') # ———————————————————————————— #答案:可以 #分析: p.join()是向作業系統發送請求,告知作業系統p的id號不需要再占用了,回收就可以, 此時在父行程內還可以看到p.pid,但此時的p.pid是一個無意義的id號,因為作業系統已經將該編號回收 打個比方: 我黨相當于作業系統,控制著整個中國的硬體,每個人相當于一個行程,每個人都需要跟我黨申請一個身份證號 該號碼就相當于行程的pid,人死后應該到我黨那里注銷身份證號,p.join()就相當于要求我黨回收身份證號,但p的家人(相當于主行程) 仍然持有p的身份證,但此刻的身份證已經沒有意義例子二
4、Process物件的其他方法或屬性(了解)
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,5)) print('%s is piao end' %self.name) p1=Piao('egon1') p1.start() p1.terminate()#關閉行程,不會立即關閉,所以is_alive立刻查看的結果可能還是存活 print(p1.is_alive()) #結果為True print('開始') print(p1.is_alive()) #結果為Falseterminate 與 is_alive
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): # self.name=name # super().__init__() #Process的__init__方法會執行self.name=Piao-1, # #所以加到這里,會覆寫我們的self.name=name #為我們開啟的行程設定名字的做法 super().__init__() self.name=name def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.start() print('開始') print(p.pid) #查看pidname與pid
四、僵尸行程和孤兒行程
1、僵尸行程——“死了沒死透”
僵尸行程(有害)
一個行程使用fork創建子行程,如果子行程退出,而父行程并沒有呼叫wait或waitpid獲取子行程的狀態資訊,那么子行程的行程描述符仍然保存在系統中,這種行程稱之為僵死行程,
詳解:我們知道在unix/linux中,正常情況下子行程是通過父行程創建的,子行程在創建新的行程,子行程的結束和父行程的運行是一個異步程序,即父行程永遠無法預測子行程到底什么時候結束,如果子行程一結束就立刻回收其全部資源,那么在父行程內將無法獲取子行程的狀態資訊,
#coding:utf-8 from multiprocessing import Process import time,os def run(): print('子',os.getpid()) if __name__ == '__main__': p=Process(target=run) p.start() print('主',os.getpid()) time.sleep(1000)僵尸行程
2、孤兒行程——“沒爹沒娘”
孤兒行程(無害)
一個父行程退出,而它的一個或多個子行程還在運行,那么那些子行程將成為孤兒行程,孤兒行程將被init行程(行程號為1)所收養,并由init行程對它們完成狀態收集作業,
孤兒行程是沒有父行程的行程,孤兒行程這個重任就落到了init行程身上,init行程就好像是一個民政局,專門負責處理孤兒行程的善后作業,每當出現一個孤兒行程的時候,內核就把孤 兒行程的父行程設定為init,而init行程會回圈地wait()它的已經退出的子行程,這樣,當一個孤兒行程凄涼地結束了其生命周期的時候,init行程就會代表黨和政府出面處理它的一切善后作業,因此孤兒行程并不會有什么危害,
孤兒行程-測驗
五 、守護行程
主行程創建守護行程——“皇帝(主行程)駕崩,好多陪葬(守護行程)”
其一:守護行程會在主行程代碼執行結束后就終止
其二:守護行程內無法再開啟子行程,
否則拋出例外:AssertionError: daemonic processes are not allowed to have children
注意:行程之間是互相獨立的,主行程代碼運行結束,守護行程隨即終止
from multiprocessing import Process import time import random class Piao(Process): def __init__(self,name): self.name=name super().__init__() def run(self): print('%s is piaoing' %self.name) time.sleep(random.randrange(1,3)) print('%s is piao end' %self.name) p=Piao('egon') p.daemon=True #一定要在p.start()前設定,設定p為守護行程,禁止p創建子行程,并且父行程代碼執行結束,p即終止運行 p.start() print('主')栗子1
#主行程代碼運行完畢,守護行程就會結束 from multiprocessing import Process from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #列印該行則主行程代碼結束,則守護行程p1應該被終止,可能會有p1任務執行的列印資訊123,因為主行程列印main----時,p1也執行了,但是隨即被終止迷人的例子
六 、行程同步(互斥鎖)
行程之間資料不共享,但是共享同一套檔案系統,所以訪問 同一個檔案,或 同一個列印終端,是沒有問題的,
而共享帶來的是競爭,競爭帶來的結果就是錯亂,如何控制——加鎖處理——犧牲效率,保證有序和資料安全
1、多個行程共享同一列印終端
#并發運行,效率高,但競爭同一列印終端,帶來了列印錯亂 from multiprocessing import Process import os,time def work(): print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) if __name__ == '__main__': for i in range(3): p=Process(target=work) p.start()不加鎖:并發運行,效率高,但競爭同一列印終端,帶來了列印錯亂
#由并發變成了串行,犧牲了運行效率,但避免了競爭 from multiprocessing import Process,Lock import os,time def work(lock): lock.acquire() print('%s is running' %os.getpid()) time.sleep(2) print('%s is done' %os.getpid()) lock.release() if __name__ == '__main__': lock=Lock() for i in range(3): p=Process(target=work,args=(lock,)) p.start()加鎖:由并發變成了串行,犧牲了運行效率,但避免了競爭
2、多個行程共享同一檔案-- 檔案當資料庫,模擬搶票
#檔案db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模擬讀資料的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫資料的網路延遲 json.dump(dic,open('db.txt','w')) print('\033[43m購票成功\033[0m') def task(lock): search() get() if __name__ == '__main__': lock=Lock() for i in range(100): #模擬并發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()不加鎖:并發運行,效率高,但競爭寫同一檔案,資料寫入錯亂
#檔案db的內容為:{"count":1} #注意一定要用雙引號,不然json無法識別 from multiprocessing import Process,Lock import time,json,random def search(): dic=json.load(open('db.txt')) print('\033[43m剩余票數%s\033[0m' %dic['count']) def get(): dic=json.load(open('db.txt')) time.sleep(0.1) #模擬讀資料的網路延遲 if dic['count'] >0: dic['count']-=1 time.sleep(0.2) #模擬寫資料的網路延遲 json.dump(dic,open('db.txt','w')) print('\033[43m購票成功\033[0m') def task(lock): search() lock.acquire() get() lock.release() if __name__ == '__main__': lock=Lock() for i in range(100): #模擬并發100個客戶端搶票 p=Process(target=task,args=(lock,)) p.start()加鎖:購票行為由并發變成了串行,犧牲了運行效率,但保證了資料安全
3、總結——拋‘互斥鎖’磚,引出‘管道佇列’玉(請看第七部分--佇列)
#加鎖可以保證多個行程修改同一塊資料時,同一時間只能有一個任務可以進行修改,即串行的修改,沒錯,速度是慢了,但犧牲了速度卻保證了資料安全,
雖然可以用檔案共享資料實作行程間通信,但問題是:
1.效率低(共享資料基于檔案,而檔案是硬碟上的資料)
2.需要自己加鎖處理
#因此我們最好找尋一種解決方案能夠兼顧——基于訊息的IPC通信機制:佇列和管道
1、效率高(多個行程共享一塊記憶體的資料)
2、幫我們處理好鎖問題,這就是mutiprocessing模塊為我們提供的基于訊息的IPC通信機制:佇列和管道,
佇列和管道都是將資料存放于記憶體中,
佇列又是基于(管道+鎖)實作的,可以讓我們從復雜的鎖問題中解脫出來,
我們應該盡量避免使用共享資料,盡可能使用訊息傳遞和佇列,避免處理復雜的同步和鎖問題,而且在行程數目增多時,往往可以獲得更好的可獲展性,
七 、佇列--生產者消費者模型
行程彼此之間互相隔離,要實作行程間通信(IPC :Inter-Process Communication),
multiprocessing模塊支持兩種形式:佇列和管道,這兩種方式都是使用訊息傳遞的
1、類 Queue 的介紹與基本使用
(1)創建佇列的類(底層就是以管道和鎖定的方式實作):
Queue([maxsize]):創建共享的行程佇列,Queue是多行程安全的佇列,可以使用Queue實作多行程之間的資料傳遞,
引數介紹:
maxsize是佇列中允許最大項數,省略則無大小限制,
(2)主要方法介紹:
q.put()
用以插入資料到佇列中,put方法還有兩個可選引數:blocked和timeout,
如果blocked為True(默認值),并且timeout為正值,該方法會阻塞timeout指定的時間,直到該佇列有剩余的空間,如果超時,會拋出Queue.Full例外,
如果blocked為False,但該Queue已滿,會立即拋出Queue.Full例外,
q.get()
可以從佇列讀取并且洗掉一個元素,同樣,get方法有兩個可選引數:blocked和timeout,
如果blocked為True(默認值),并且timeout為正值,那么在等待時間內沒有取到任何元素,會拋出Queue.Empty例外,
如果blocked為False,有兩種情況存在,如果Queue有一個值可用,則立即回傳該值,否則,如果佇列為空,則立即拋出Queue.Empty例外.
q.get_nowait() 同q.get(False)
q.put_nowait() 同q.put(False)
q.empty()
呼叫此方法時q為空則回傳True,該結果不可靠,比如在回傳True的程序中,如果佇列中又加入了專案,
q.full()
呼叫此方法時q已滿則回傳True,該結果不可靠,比如在回傳True的程序中,如果佇列中的專案被取走,
q.qsize()
回傳佇列中目前專案的正確數量,結果也不可靠,理由同q.empty()和q.full()一樣
''' multiprocessing模塊支持行程間通信的兩種主要形式:管道和佇列 都是基于訊息傳遞實作的,但是佇列介面 ''' from multiprocessing import Process,Queue import time q=Queue(3) #put ,get ,put_nowait,get_nowait,full,empty q.put(3) q.put(3) q.put(3) print(q.full()) #滿了 print(q.get()) print(q.get()) print(q.get()) print(q.empty()) #空了應用舉例
2、生產者消費者模型
生產者消費者模型
在并發編程中使用生產者和消費者模式(通過一個容器來解決生產者和消費者的強耦合問題,)能夠解決絕大多數并發問題,
該模式通過平衡生產執行緒和消費執行緒的作業能力來提高程式的整體處理資料的速度,
#生產者消費者模型總結
#程式中有兩類角色
一類負責生產資料(生產者)
一類負責處理資料(消費者)
#引入生產者消費者模型為了解決的問題是:
平衡生產者與消費者之間的作業能力,從而提高程式整體處理資料的速度
#如何實作:
生產者<——>佇列<——>消費者
#生產者消費者模型 實作類程式的解耦和
from multiprocessing import Process,JoinableQueue import time,random,os def consumer(q): while True: res=q.get() time.sleep(random.randint(1,3)) print('\033[45m%s 吃 %s\033[0m' %(os.getpid(),res)) q.task_done() #向q.join()發送一次信號,證明一個資料已經被取走了 def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res='%s%s' %(name,i) q.put(res) print('\033[44m%s 生產了 %s\033[0m' %(os.getpid(),res)) q.join() if __name__ == '__main__': q=JoinableQueue() #生產者們:即廚師們 p1=Process(target=producer,args=('包子',q)) p2=Process(target=producer,args=('骨頭',q)) p3=Process(target=producer,args=('泔水',q)) #消費者們:即吃貨們 c1=Process(target=consumer,args=(q,)) c2=Process(target=consumer,args=(q,)) c1.daemon=True c2.daemon=True #開始 p_l=[p1,p2,p3,c1,c2] for p in p_l: p.start() p1.join() p2.join() p3.join() print('主') #主行程等--->p1,p2,p3等---->c1,c2 #p1,p2,p3結束了,證明c1,c2肯定全都收完了p1,p2,p3發到佇列的資料 #因而c1,c2也沒有存在的價值了,應該隨著主行程的結束而結束,所以設定成守護行程View Code
參考資料:
https://www.cnblogs.com/linhaifeng/articles/7428874.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/154835.html
標籤:Python
