1.行程間的通信方式
1.磁盤互動: 速度慢,不安全
2.socket套接字
3.管道通信(Pipe)
4.訊息佇列(Queue, Manager().Queue, JoinableQueue)
5.共享記憶體(Value, Array)
6.信號(os.kill, signal)
7.信號量(Semaphore)
8.共享資料(Manager)
2.管道通信-Pipe
1.概述: 在記憶體中開辟一塊空間,對多個行程可見,通過管道實作多行程通信
2.語法
from multiprocess import Pipe fd1, fd2 = Pipe(duplex=True) # duplex默認為True表示雙向管道,設定False則表示單向管道 回傳值: 回傳兩個管道流物件,表示管道的兩端 如果 duplex=True 則是雙向管道則 fd1可讀可寫,fd2可讀可寫 如果 duplex=False 則是單向管道則 fd1只能,讀fd2只能寫 date = fd1.recv() # 接收并回傳接收的訊息(每次接收一條),如果管道沒有訊息會阻塞等待 fd2.send(data) # 發送訊息,可以是字串或其他型別,date為要發送的內容,如果沒有接收端則管道破裂
3.示例1
from multiprocessing import Process from multiprocessing import Pipe import time import os # 創建一個雙向管道 fd1, fd2 = Pipe() # 如果引數為False,則表示創建一個單向管道,此時fd1只等recv,fd2只能send # fd1, fd2 = Pipe(False) def fun(name): time.sleep(1) # 每個子行程都向管道中發送字串訊息 fd1.send("hello %s" % name) # 收發支持Python的多種資料型別,數值,字串,串列等等 print("fun行程的pid是:%s 父行程的pid是:%s" % (os.getpid(), os.getppid())) jobs = list() for i in range(5): p = Process(target=fun, args=(i,)) jobs.append(p) p.start() # 父行程從管道中取訊息 for i in range(5): data = fd2.recv() print(data) for i in jobs: i.join() """執行結果 fun行程的pid是:24334 父行程的pid是:24333 hello 0 hello 1 fun行程的pid是:24335 父行程的pid是:24333 hello 2 fun行程的pid是:24336 父行程的pid是:24333 hello 3 fun行程的pid是:24337 父行程的pid是:24333 hello 4 fun行程的pid是:24338 父行程的pid是:24333 """
4.示例2
from multiprocessing import Pipe from multiprocessing import Process def func(con): con1, con2 = con con1.close() # 子行程使用con2和父行程通信,所以關閉con1 while 1: try: print(con2.recv()) # 當主行程的con1發資料時,子行程要死回圈的去接收 except EOFError: # 如果主行程的con1發完資料并關閉con1,子行程的con2繼續接收時,就會報錯,使用try的方式,獲取錯誤 con2.close() # 獲取到錯誤,就是指子行程已經把管道中所有資料都接收完了,所以用這種方式去關閉管道 break if __name__ == '__main__': con1, con2 = Pipe() p = Process(target=func, args=((con1, con2),)) p.start() con2.close() # 在父行程中,使用con1去和子行程通信,所以不需要con2,就提前關閉 for i in range(10): # 生產資料 con1.send(i) # 給子行程的con2發送資料 con1.close() # 生產完資料,關閉父行程這一端的管道
3.訊息佇列通信-Queue, Manager().Queue, JoinableQueue
1.訊息佇列概述
在記憶體中開辟佇列模型,用來存放訊息,任何擁有佇列的行程都可以存取訊息,訊息佇列是先進先出
from queue import Queue # 是行程內非阻塞佇列即執行緒佇列,類似于普通串列
from multiprocessing import Queue # 是跨行程通信佇列,用于解決多行程間的通信問題
from multiprocessing import Manager # 是行程池中各子行程間的通信,使用鎖 lock = manager.Queue().Lock()
2.語法
q = Queue(maxsize=0) # 創建一個訊息佇列,回傳訊息佇列物件 引數: maxsize默認為0表示佇列可存放訊息,不指定或數量為負值時容量由記憶體而定,大于0表示佇列最多存放多少條訊息 q.put(item,[block[, timeout]]) # 將item訊息寫入佇列,當佇列滿時會阻塞,要存放的訊息(字串,整數,串列) 可選引數: block: 默認為True表示阻塞,這種為False則非阻塞 timeout: 在block為True時設定超時時間,單位是秒, 例如: q.put("test", True, 3) 如果block值為默認的True 沒有設定timeout,訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),直到從訊息列隊騰出空間為止 設定了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"例外 如果block值為False: 訊息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"例外 q.get([block[, timeout]]) # 向佇列中取出并回傳訊息,然后將其從列隊中移除,當佇列空時會阻塞 可選引數: block: 默認為True表示阻塞,設定為False則非阻塞 timeout: 在block為True時設定超時時間,單位是秒, 例如 q.get(True, 3) 如果block值為True 沒有設定timeout,訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為止 設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則拋出"Queue.Empty"例外 如果block值為False: 訊息列隊如果為空,則會立刻拋出"Queue.Empty"例外 q.put_nowait(item) # 用法相當于 q.put(item, False) q.get_nowait() # 用法相當于 q.get(False) q.full() # 判斷佇列是否為滿,滿則回傳True q.empty() # 判斷佇列是否為空,空則回傳True q.qsize() # 得到當前佇列中訊息的個數 q.close() # 關閉佇列
3.行程佇列
from multiprocessing import Process from multiprocessing import Queue import time import random # 寫資料行程執行的代碼: def write(q): for value in ['A', 'B', 'C']: print('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) # 讀資料行程執行的代碼: def read(q): while True: if not q.empty(): value = q.get(True) print('Get %s from queue.' % value) time.sleep(random.random()) else: break if __name__ == '__main__': # 父行程創建Queue,并傳給各個子行程: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子行程pw,寫入: pw.start() # 等待pw結束: pw.join() # 啟動子行程pr,讀取: pr.start() pr.join() # pr行程里是死回圈,無法等待其結束,只能強行終止: print("") print("所有資料都寫入并且讀完") """執行結果 Put A to queue... Put B to queue... Put C to queue... Get A from queue. Get B from queue. Get C from queue. """
4.行程池佇列
# 行程池中使用佇列,修改import中的Queue為Manager from multiprocessing import Manager from multiprocessing import Pool import os import time def reader(q): print("reader啟動(%s),父行程為(%s)" % (os.getpid(), os.getppid())) for i in range(q.qsize()): print("reader從Queue獲取到訊息:%s" % q.get(True)) def writer(q): print("writer啟動(%s),父行程為(%s)" % (os.getpid(), os.getppid())) for i in "itcast": q.put(i) if __name__ == "__main__": print("(%s) start" % os.getpid()) q = Manager().Queue() # 使用Manager中的Queue po = Pool() po.apply_async(writer, (q,)) time.sleep(1) # 先讓上面的任務向Queue存入資料,然后再讓下面的任務開始從中取資料 po.apply_async(reader, (q,)) po.close() po.join() print("(%s) End" % os.getpid()) """執行結果 (24474) start writer啟動(24476),父行程為(24474) reader啟動(24477),父行程為(24474) reader從Queue獲取到訊息:i reader從Queue獲取到訊息:t reader從Queue獲取到訊息:c reader從Queue獲取到訊息:a reader從Queue獲取到訊息:s reader從Queue獲取到訊息:t (24474) End """
5.行程佇列實作生產者消費者模型
# 實作方案一: 生產者生產結束的標識,放到生產者行程中 from multiprocessing import Queue from multiprocessing import Process # 生產者 def consumer(q, name): while 1: info = q.get() if info: print('%s 拿走了%s' % (name, info)) else: # 當消費者獲得佇列中資料時,如果獲得的是None,就是獲得到了生產者不再生產資料的標識 break # 此時消費者結束即可 # 消費者 def producer(q, product): for i in range(20): info = product + '的智能手機%s號' % str(i) q.put(info) q.put(None) # 讓生產者生產完資料后,給消費者一個不再生產資料的標識 if __name__ == '__main__': q = Queue(10) p_pro = Process(target=producer, args=(q, '中國制造')) p_con = Process(target=consumer, args=(q, 'Kali')) p_pro.start() p_con.start() """執行結果 Kali 拿走了中國制造的手辦0號 Kali 拿走了中國制造的手辦1號 Kali 拿走了中國制造的手辦2號 Kali 拿走了中國制造的手辦3號 Kali 拿走了中國制造的手辦4號 Kali 拿走了中國制造的手辦5號 Kali 拿走了中國制造的手辦6號 Kali 拿走了中國制造的手辦7號 Kali 拿走了中國制造的手辦8號 Kali 拿走了中國制造的手辦9號 Kali 拿走了中國制造的手辦10號 Kali 拿走了中國制造的手辦11號 Kali 拿走了中國制造的手辦12號 Kali 拿走了中國制造的手辦13號 Kali 拿走了中國制造的手辦14號 Kali 拿走了中國制造的手辦15號 Kali 拿走了中國制造的手辦16號 Kali 拿走了中國制造的手辦17號 Kali 拿走了中國制造的手辦18號 Kali 拿走了中國制造的手辦19號 """ # 實作方案二: 生產者生產結束的標識,放到父行程中 from multiprocessing import Queue from multiprocessing import Process # 生產者 def consumer(q, name, color): while 1: info = q.get() if info: print('%s%s 拿走了%s \033[0m' % (color, name, info)) else: # 當消費者獲得佇列中資料時,如果獲得的是None,就是獲得到了生產者不再生產資料的標識 break # 此時消費者結束即可 # 消費者 def producer(q, product): for i in range(10): info = product + '的智能手機%s號' % str(i) q.put(info) if __name__ == '__main__': q = Queue(10) # 多個生產者行程 p_pro1 = Process(target=producer, args=(q, '中國制造')) p_pro2 = Process(target=producer, args=(q, '美國制造')) p_pro3 = Process(target=producer, args=(q, '日本制造')) # 多個消費者行程 p_con1 = Process(target=consumer, args=(q, 'Kali', '\033[31m')) p_con2 = Process(target=consumer, args=(q, 'Coco', '\033[32m')) p_l = [p_con1, p_con2, p_pro1, p_pro2, p_pro3] [i.start() for i in p_l] # 父行程通過等待生產者行程結束后再發送結束標識 p_pro1.join() p_pro2.join() p_pro3.join() q.put(None) # 幾個消費者就要接受幾個結束標識 q.put(None) """執行結果 Kali 拿走了日本制造的智能手機0號 Coco 拿走了日本制造的智能手機1號 Kali 拿走了日本制造的智能手機2號 Coco 拿走了日本制造的智能手機3號 Kali 拿走了日本制造的智能手機4號 Coco 拿走了日本制造的智能手機5號 Kali 拿走了日本制造的智能手機6號 Coco 拿走了日本制造的智能手機7號 Kali 拿走了日本制造的智能手機8號 Coco 拿走了日本制造的智能手機9號 Kali 拿走了美國制造的智能手機0號 Coco 拿走了美國制造的智能手機1號 Kali 拿走了美國制造的智能手機2號 Coco 拿走了美國制造的智能手機3號 Kali 拿走了美國制造的智能手機4號 Coco 拿走了美國制造的智能手機5號 Kali 拿走了美國制造的智能手機6號 Coco 拿走了美國制造的智能手機7號 Kali 拿走了美國制造的智能手機8號 Coco 拿走了美國制造的智能手機9號 Kali 拿走了中國制造的智能手機0號 Coco 拿走了中國制造的智能手機1號 Kali 拿走了中國制造的智能手機2號 Coco 拿走了中國制造的智能手機3號 Kali 拿走了中國制造的智能手機4號 Coco 拿走了中國制造的智能手機5號 Kali 拿走了中國制造的智能手機6號 Coco 拿走了中國制造的智能手機7號 Kali 拿走了中國制造的智能手機8號 Coco 拿走了中國制造的智能手機9號 """
6.行程佇列JoinableQueue模塊+守護行程實作生產者消費者模型
from multiprocessing import Process,JoinableQueue q = JoinableQueue() # q.join() # 用于生產者,等待 q.task_done的回傳結果,通過回傳結果,生產者就能獲得消費者當前消費了多少個資料 # q.task_done() # 用于消費者,是指每消費佇列中一個資料,就給join回傳一個標識 # 假設生產者生產了100個資料,join就能記錄下100這個數字,每次消費者消費一個資料,就必須要task_done回傳一個標識 # 當生產者(join)接收到100個消費者回傳來的標識的時候,生產者就能知道消費者已經把所有資料都消費完了 # 消費者 def consumer(q, name, color): while 1: info = q.get() print('%s %s 拿走了%s \033[0m' % (color, name, info)) q.task_done() # 生產者 def producer(q, product): for i in range(20): info = product + '的智能手機%s號' % str(i) q.put(info) q.join() # 記錄了生產了20個資料在佇列中,此時會阻塞等待消費者消費完佇列中所有資料 if __name__ == '__main__': q = JoinableQueue(10) p_pro1 = Process(target=producer, args=(q, '中國制造')) p_con1 = Process(target=consumer, args=(q, 'Kali', '\033[31m')) p_con1.daemon = True # 把消費者行程設為守護行程 p_con1.start() p_pro1.start() p_pro1.join() # 主行程等待生產者行程結束 # 程式有3個行程,主行程和生產者行程和消費者行程,當主行程執行到p_pro1.join()代碼時,主行程會等待生產行程結束 # 而生產行程中執行到q.join()會等待消費者行程把所有資料消費完,生產者行程才結束 # 現在的狀態就是主行程等待生產者行程結束,生產者行程等待消費者消費完所有資料 # 所以,把消費者設定為守護行程,當主行程執行完,就代表生產行程已經結束,也就代表消費者行程已經把佇列中資料消費完 # 此時,主行程一旦結束,守護行程也就是消費者行程也就跟著結束,整個程式也就能正常結束了 """執行結果 Kali 拿走了中國制造的智能手機0號 Kali 拿走了中國制造的智能手機1號 Kali 拿走了中國制造的智能手機2號 Kali 拿走了中國制造的智能手機3號 Kali 拿走了中國制造的智能手機4號 Kali 拿走了中國制造的智能手機5號 Kali 拿走了中國制造的智能手機6號 Kali 拿走了中國制造的智能手機7號 Kali 拿走了中國制造的智能手機8號 Kali 拿走了中國制造的智能手機9號 Kali 拿走了中國制造的智能手機10號 Kali 拿走了中國制造的智能手機11號 Kali 拿走了中國制造的智能手機12號 Kali 拿走了中國制造的智能手機13號 Kali 拿走了中國制造的智能手機14號 Kali 拿走了中國制造的智能手機15號 Kali 拿走了中國制造的智能手機16號 Kali 拿走了中國制造的智能手機17號 Kali 拿走了中國制造的智能手機18號 Kali 拿走了中國制造的智能手機19號 """
4.共享記憶體-Value, Array
1.概述: 在記憶體中開辟一段空間存盤資料,對多個行程可見,每次寫入共享記憶體的資料會覆寫之前的內容,對記憶體格式化較少,但存取速度快
2.語法
from multiprocess import Value from multiprocess import Array obj = Value(ctype, obj) # 開辟共享記憶體空間,回傳一個共享記憶體物件 引數: ctype: str要轉變的c型別(對照ctype表) obj: 寫入共享記憶體的初始值 obj.value # 得到共享記憶體中的值 obj = Array(ctype, obj) # 開辟共享記憶體空間(陣列),回傳一個共享記憶體物件 引數: ctype: 要轉換的型別 obj: 存入到共享記憶體中的資料,obj是一個串列且串列中的資料型別必須一致,obj是正整數則表示開辟一個多大的記憶體空間
3.行程間通信-Value
from multiprocessing import Process from multiprocessing import Value import time import random # 向共享記憶體中存取 def deposit(money): for i in range(100): time.sleep(0.01) money.value += random.randint(1, 100) # 向共享記憶體中取錢 def withdraw(money): for i in range(100): time.sleep(0.01) money.value -= random.randint(1, 50) def main(): money = Value("i", 1000) # "i"在ctype對照表中對應的資料型別是Python的int型別 de = Process(target=deposit, args=(money,)) wi = Process(target=withdraw, args=(money,)) de.start() wi.start() de.join() wi.join() print(money.value) # 本次執行結果money.value的值是3401 if __name__ == "__main__": main()
4.行程間通信-Array
from multiprocessing import Process from multiprocessing import Array def func(shm): for i in shm: print(i, end=" ") # 子行程中列印結果: 1 2 3 4 5 shm[0] = 11 def main(): # 開辟記憶體共享空間,可容納5個整數,初始值是[1, 2, 3, 4, 5] shm = Array("i", [1, 2, 3, 4, 5]) # "i"在ctype對照表中對應的資料型別是Python的int型別 # 子記憶體共享空間開辟一個包含5個整形的空間 # shm = Array("i", 5) # 沒有初始值遍歷shm結果為 0 0 0 0 0 此時可以靈活的修改資料 p = Process(target=func, args=(shm,)) p.start() p.join() print() for i in shm: print(i, end=" ") # 主行程中列印結果: 11 2 3 4 5 if __name__ == "__main__": main()
5.管道訊息佇列共享記憶體對比
| 對比項 | 管道 | 訊息佇列 | 共享記憶體 |
| 開辟空間 | 記憶體 | 記憶體 | 記憶體 |
| 讀寫方式 | 雙向/單向 | 先進先出 | 操作覆寫記憶體 |
| 效率 | 一般 | 一般 | 快 |
| 應用 | 多用于親緣行程 | 方便靈活廣泛 | 較復雜 |
| 是否需要互斥機制 | 否 | 否 | 是 |
6.信號-os.kill, signal
1.概述
1.信號的名字,作用和處理信號都是有系統定義的,一個行程向另一個行程通過信號傳遞某種訊息
2.信號的默認處理方法: 系統定義,信號給接收信號的行程帶來的行為一般有 終止 暫停 忽略
3.信號屬于異步通信方式,信號的發送不會影響行程的持續執行
4.命令列下信號操作
kill -l: 查看信號
kill -signame PID: 給PID的行程發送一個信號, 例如 kell -9 2332
2.常用信號
# 默認操作: 終止的信號 SIGHUP: 該信號在用戶終端連接(正常或非正常)結束時發出,通常是在終端的控制行程結束時,通知同一會話內的各個作業與控制終端不再關系 SIGINT: 該信號在用戶鍵入INTR字符(通常是Ctrl+c)時發出,終端驅動程式發送此信號并送到前臺行程中的每一個行程 SIGQUIT: 該信號和SIGINT類似,但由QUIT字符(通常是'"Ctrl+\"')來控制 SIGILL: 該信號在一個行程企圖執行一條非法指令時(可執行檔案本身出現錯誤,或者試圖執行資料段,堆疊溢位時)發出 SIGFPE: 該信號在發送致命的算術運算錯誤時發出,不僅包括浮點運算錯誤還包括溢位和除數為0等其他所有的算數的錯誤 SIGKILL: 該信號用來立即結束程式的運行,并且不能被阻塞,處理和忽略 SIGALRM: 該信號當一個定時器到時的時候發出 SIGABORT: 該信號用于結束行程 # 默認操作: 暫停行程 SIGSTOP: 該信號用于暫停一個行程,并且不能被阻塞,處理和忽略 SIGTSTP: 該信號用于暫停互動行程,用戶可以鍵入SUSP字符(通常是Ctrl+z)發出這個信號 # 默認操作: 忽略 SIGCHLD: 子行程改變狀態時,父行程會收到這個信號
3.發送信號
語法
import signal os.kill(pid, sig) # 向一個行程發送一個信號 引數: pid: 要發送信號的行程PID sig: 要發送的信號 signal.alarm(sec) # 向自身發送一個時鐘信號SIGALRM,一個行程中只能同時有一個時鐘,后面的時鐘時間會覆寫前面的時鐘時間 引數: sec代表時鐘秒數
示例1
import os import signal # 向 2332 行程發送SIGKILL信號,殺死2332行程 os.kill(2332, signal.SIGKILL)
示例2
import time import signal # 3秒鐘之后向自身發送SIGALRM信號,終止行程 signal.alarm(3) time.sleep(1) signal.alarm(5) # 后面的時鐘時間會覆寫前面的時鐘時間 while True: time.sleep(1) print("等待時鐘...")
4.信號處理
語法
import signal signal.pause() # 阻塞等待一個信號的發生 signal.signal(signum, handler) # 處理一個信號 使用說明: 是一個異步處理信號的函式,只要執行在行程中就會按照指定方法處理信號,但是不能處理 SIGSTOP 和 SIGKILL 信號 引數: signum: 要處理的信號 handler: 對該信號的處理方法 SIG_DFL: 采用默認方法 SIG_IGN: 忽略這個信號 func: 回呼函式,自定義處理方法 自定義處理方法格式要求 def func(sig, frame): # sig: 接收到的信號;frame: 信號物件 pass
示例1
import time import signal # 5秒鐘之后向自身發送SIGALRM信號,終止行程 signal.alarm(5) # 采用默認的方法處理SIGALRM信號 signal.signal(signal.SIGALRM, signal.SIG_DFL) # 采用忽略信號的方法處理SIGALRM信號 # signal.signal(signal.SIGALRM, signal.SIG_IGN) # signal.signal(signal.SIGINT, signal.SIG_IGN) # 忽略Ctrl+c終止行程,即行程運行時無法使用Ctrl+c來終止 while True: time.sleep(1) print("等待時鐘...")
示例2
import time import signal # 5秒鐘之后向自身發送SIGALRM信號,終止行程 signal.alarm(5) # 固定格式要求 def handler(sig, frame): if sig == signal.SIGALRM: print("收到了時鐘信號: %s" % sig) elif sig == signal.SIGINT: print("收到了時鐘信號: %s" % sig) # 通過自定義方法handler處理SIGALRM信號 signal.signal(signal.SIGALRM, handler) signal.signal(signal.SIGINT, handler) while True: time.sleep(1) print("等待時鐘...")
5.使用信號處理僵尸行程
import signal
原理: 在父行程中忽略子行程的發送信號
語法: signal.signal(signal.SIGCHLD, signal.SIG_IGN)
6.多行程實作信號通信
import multiprocessing from signal import * from time import sleep import os # 售票員處理信號 def conductor_handler(sig, frame): if sig == SIGINT: os.kill(os.getppid(), SIGUSR1) elif sig == SIGQUIT: os.kill(os.getppid(), SIGUSR2) elif sig == SIGUSR1: print("到站了") os._exit(0) # 子行程中售票員發送信號 def conductor(): signal(SIGINT, conductor_handler) signal(SIGQUIT, conductor_handler) signal(SIGUSR1, conductor_handler) # 子行程中忽略父行程要處理的信號 signal(SIGTSTP, SIG_IGN) while True: sleep(3) print("車內開始廣播...") # 司機處理信號 def driver_handler(sig, frame): if sig == SIGUSR1: print("老司機開車") elif sig == SIGUSR2: print("老司機以鎖死車門") elif sig == SIGTSTP: os.kill(p_pid, SIGUSR1) def main(): p = multiprocessing.Process(target=conductor) p.start() global p_pid p_pid = p.pid # 父行程中司機發送信號 signal(SIGUSR1, driver_handler) signal(SIGUSR2, driver_handler) signal(SIGTSTP, driver_handler) # 父行程忽略子行程要處理的信號 signal(SIGINT, SIG_IGN) signal(SIGQUIT, SIG_IGN) p.join() if __name__ == "__main__": main() """執行結果 ~/Desktop/python3/demo $ python3 demo5.py 車內開始廣播... 車內開始廣播... ^C老司機開車 ^C老司機開車 ^C老司機開車 車內開始廣播... ^\老司機以鎖死車門 ^\老司機以鎖死車門 ^\老司機以鎖死車門 車內開始廣播... 車內開始廣播... ^Z到站了 """
7.信號量-Semaphore
1.概述: 給定一定的信號數量,對多個行程可見并且多個行程均可操作,行程根據信號量的多少可以有不同的行為
2.語法
from multiprocess import Semaphore sem = Semaphore(num) # 定義信號量并回傳信號量物件,num是給定信號量的初始個數 sem.acquire() # 將信號量減一,信號量為0時呼叫次方法會阻塞等待 sem.release() # 將信號量加一
3.示例1
import time import multiprocessing def func(sem): print("行程%s等待信號量" % multiprocessing.current_process()) # current_process獲取當前行程物件 sem.acquire() # 信號量減1 print("行程%s消耗信號量" % multiprocessing.current_process()) time.sleep(2) print("行程%s添加信號量" % multiprocessing.current_process()) sem.release() # 信號量加1 def main(): # 創建信號量初始值為3 sem = multiprocessing.Semaphore(3) jobs = list() for i in range(4): p = multiprocessing.Process(target=func, args=(sem,)) p.start() jobs.append(p) for i in jobs: i.join() if __name__ == "__main__": main() """執行結果 行程<Process(Process-1, started)>等待信號量 行程<Process(Process-1, started)>消耗信號量 行程<Process(Process-2, started)>等待信號量 行程<Process(Process-2, started)>消耗信號量 行程<Process(Process-3, started)>等待信號量 行程<Process(Process-3, started)>消耗信號量 行程<Process(Process-4, started)>等待信號量 行程<Process(Process-1, started)>添加信號量 行程<Process(Process-2, started)>添加信號量 行程<Process(Process-4, started)>消耗信號量 行程<Process(Process-3, started)>添加信號量 行程<Process(Process-4, started)>添加信號量 """
4.示例2
from multiprocessing import Process from multiprocessing import Semaphore import time import random def func(i, sem): sem.acquire() print('第%s個人進入小黑屋,拿了鑰匙鎖上門' % i) time.sleep(random.randint(3, 5)) print('第%s個人出去小黑屋,還了鑰匙打開門' % i) sem.release() if __name__ == '__main__': sem = Semaphore(5) # 初始化了一把鎖5把鑰匙,也就是說允許5個人同時進入小黑屋 # 之后其他人必須等待,等有人從小黑屋出來,還了鑰匙,才能允許后邊的人進入 for i in range(20): p = Process(target=func, args=(i, sem,)) p.start()
8.共享資料-Manager
1.概述
Python中提供了強大的Manager類,專門用于實作多行程之間的資料共享
Manager類是資料不安全的,Manager類包含的常用方法和屬性與Multiprocessing中其他常用類的方法屬性一致
Manager管理的共享資料型別有: Value, Array, dict, list, Lock, Semaphore等等,同時Manager還可以共享類的實體物件
2.Manager實作dict功能
from multiprocessing import Manager, Process, Lock def work(d, lock): with lock: # 不加鎖而操作共享的資料,肯定會出現資料錯亂 d['count'] -= 1 if __name__ == '__main__': lock = Lock() with Manager() as m: dic = m.dict({'count': 100}) p_l = [] for i in range(100): p = Process(target=work, args=(dic, lock)) p_l.append(p) p.start() for p in p_l: p.join() print(dic)
3.Manager實作list功能
from multiprocessing import Manager, Process def func(num): num[0] -= 1 print("子行程中num的值是%s" % num) if __name__ == "__main__": m = Manager() num = m.list([1, 2, 3]) p = Process(target=func, args=(num,)) p.start() p.join() print("父行程中num的值是%s" % num)
4.通過Manager行程間共享實體物件
from multiprocessing import Process, Value, Lock from multiprocessing.managers import BaseManager class Employee(object): def __init__(self, name, salary): self.name = name self.salary = Value('i', salary) def increase(self): self.salary.value += 100 def getPay(self): return self.name + ':' + str(self.salary.value) class MyManager(BaseManager): pass def Manager2(): m = MyManager() m.start() return m MyManager.register('Employee', Employee) def func1(em, lock): with lock: em.increase() if __name__ == '__main__': manager = Manager2() em = manager.Employee('zhangsan', 1000) lock = Lock() proces = [Process(target=func1, args=(em, lock)) for i in range(10)] for p in proces: p.start() for p in proces: p.join() print(em.getPay())
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/30870.html
標籤:Python
