主頁 > 後端開發 > 05_行程間通信 IPC

05_行程間通信 IPC

2020-09-14 05:00:35 後端開發

1.行程間的通信方式

    1.磁盤互動: 速度慢,不安全

    2.socket套接字

    3.管道通信(Pipe)

    4.訊息佇列(Queue, Manager().Queue, JoinableQueue)

    5.共享記憶體(Value, Array)

    6.信號(os.kill, signal)

    7.信號量(Semaphore)

    8.共享資料(Manager)

2.管道通信-Pipe

    1.概述: 在記憶體中開辟一塊空間,對多個行程可見,通過管道實作多行程通信

    2.語法

from multiprocess import Pipe

fd1, fd2 = Pipe(duplex=True)  # duplex默認為True表示雙向管道,設定False則表示單向管道
    回傳值: 回傳兩個管道流物件,表示管道的兩端
        如果 duplex=True 則是雙向管道則 fd1可讀可寫,fd2可讀可寫
        如果 duplex=False 則是單向管道則 fd1只能,讀fd2只能寫
date = fd1.recv()  # 接收并回傳接收的訊息(每次接收一條),如果管道沒有訊息會阻塞等待
fd2.send(data)  # 發送訊息,可以是字串或其他型別,date為要發送的內容,如果沒有接收端則管道破裂

    3.示例1

from multiprocessing import Process
from multiprocessing import Pipe
import time
import os

# 創建一個雙向管道
fd1, fd2 = Pipe()

# 如果引數為False,則表示創建一個單向管道,此時fd1只等recv,fd2只能send
# fd1, fd2 = Pipe(False)

def fun(name):
    time.sleep(1)
    # 每個子行程都向管道中發送字串訊息
    fd1.send("hello %s" % name)  # 收發支持Python的多種資料型別,數值,字串,串列等等
    print("fun行程的pid是:%s 父行程的pid是:%s" % (os.getpid(), os.getppid()))


jobs = list()
for i in range(5):
    p = Process(target=fun, args=(i,))
    jobs.append(p)
    p.start()

# 父行程從管道中取訊息
for i in range(5):
    data = fd2.recv()
    print(data)

for i in jobs:
    i.join()
"""執行結果
    fun行程的pid是:24334 父行程的pid是:24333
    hello 0
    hello 1
    fun行程的pid是:24335 父行程的pid是:24333
    hello 2
    fun行程的pid是:24336 父行程的pid是:24333
    hello 3
    fun行程的pid是:24337 父行程的pid是:24333
    hello 4
    fun行程的pid是:24338 父行程的pid是:24333
"""

    4.示例2

from multiprocessing import Pipe
from multiprocessing import Process


def func(con):
    con1, con2 = con
    con1.close()  # 子行程使用con2和父行程通信,所以關閉con1
    while 1:
        try:
            print(con2.recv())  # 當主行程的con1發資料時,子行程要死回圈的去接收
        except EOFError:  # 如果主行程的con1發完資料并關閉con1,子行程的con2繼續接收時,就會報錯,使用try的方式,獲取錯誤
            con2.close()  # 獲取到錯誤,就是指子行程已經把管道中所有資料都接收完了,所以用這種方式去關閉管道
            break


if __name__ == '__main__':
    con1, con2 = Pipe()
    p = Process(target=func, args=((con1, con2),))
    p.start()
    con2.close()  # 在父行程中,使用con1去和子行程通信,所以不需要con2,就提前關閉
    for i in range(10):  # 生產資料
        con1.send(i)  # 給子行程的con2發送資料
    con1.close()  # 生產完資料,關閉父行程這一端的管道

3.訊息佇列通信-Queue, Manager().Queue, JoinableQueue

    1.訊息佇列概述
        在記憶體中開辟佇列模型,用來存放訊息,任何擁有佇列的行程都可以存取訊息,訊息佇列是先進先出
        from queue import Queue  # 是行程內非阻塞佇列即執行緒佇列,類似于普通串列
        from multiprocessing import Queue  # 是跨行程通信佇列,用于解決多行程間的通信問題
        from multiprocessing import Manager  # 是行程池中各子行程間的通信,使用鎖 lock = manager.Queue().Lock()

    2.語法

q = Queue(maxsize=0)  # 創建一個訊息佇列,回傳訊息佇列物件
    引數: maxsize默認為0表示佇列可存放訊息,不指定或數量為負值時容量由記憶體而定,大于0表示佇列最多存放多少條訊息
q.put(item,[block[, timeout]])  # 將item訊息寫入佇列,當佇列滿時會阻塞,要存放的訊息(字串,整數,串列)
    可選引數:
        block: 默認為True表示阻塞,這種為False則非阻塞
        timeout: 在block為True時設定超時時間,單位是秒, 例如: q.put("test", True, 3)
            如果block值為默認的True
                沒有設定timeout,訊息列隊如果已經沒有空間可寫入,此時程式將被阻塞(停在寫入狀態),直到從訊息列隊騰出空間為止
                設定了timeout,則會等待timeout秒,若還沒空間,則拋出"Queue.Full"例外
            如果block值為False:
                訊息列隊如果沒有空間可寫入,則會立刻拋出"Queue.Full"例外
q.get([block[, timeout]])  # 向佇列中取出并回傳訊息,然后將其從列隊中移除,當佇列空時會阻塞
    可選引數:
        block: 默認為True表示阻塞,設定為False則非阻塞
        timeout: 在block為True時設定超時時間,單位是秒, 例如 q.get(True, 3)
            如果block值為True
                沒有設定timeout,訊息列隊如果為空,此時程式將被阻塞(停在讀取狀態),直到從訊息列隊讀到訊息為止
                設定了timeout,則會等待timeout秒,若還沒讀取到任何訊息,則拋出"Queue.Empty"例外
            如果block值為False:
                訊息列隊如果為空,則會立刻拋出"Queue.Empty"例外
q.put_nowait(item)  # 用法相當于 q.put(item, False)
q.get_nowait()  # 用法相當于 q.get(False)
q.full()  # 判斷佇列是否為滿,滿則回傳True
q.empty()  # 判斷佇列是否為空,空則回傳True
q.qsize()  # 得到當前佇列中訊息的個數
q.close()  # 關閉佇列

    3.行程佇列

from multiprocessing import Process
from multiprocessing import Queue

import time
import random


# 寫資料行程執行的代碼:
def write(q):
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())


# 讀資料行程執行的代碼:
def read(q):
    while True:
        if not q.empty():
            value = q.get(True)
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        else:
            break


if __name__ == '__main__':
    # 父行程創建Queue,并傳給各個子行程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 啟動子行程pw,寫入:
    pw.start()
    # 等待pw結束:
    pw.join()
    # 啟動子行程pr,讀取:
    pr.start()
    pr.join()
    # pr行程里是死回圈,無法等待其結束,只能強行終止:
    print("")
    print("所有資料都寫入并且讀完")
"""執行結果
    Put A to queue...
    Put B to queue...
    Put C to queue...
    Get A from queue.
    Get B from queue.
    Get C from queue.
"""

    4.行程池佇列

# 行程池中使用佇列,修改import中的Queue為Manager
from multiprocessing import Manager
from multiprocessing import Pool

import os
import time


def reader(q):
    print("reader啟動(%s),父行程為(%s)" % (os.getpid(), os.getppid()))
    for i in range(q.qsize()):
        print("reader從Queue獲取到訊息:%s" % q.get(True))


def writer(q):
    print("writer啟動(%s),父行程為(%s)" % (os.getpid(), os.getppid()))
    for i in "itcast":
        q.put(i)


if __name__ == "__main__":
    print("(%s) start" % os.getpid())
    q = Manager().Queue()  # 使用Manager中的Queue
    po = Pool()
    po.apply_async(writer, (q,))

    time.sleep(1)  # 先讓上面的任務向Queue存入資料,然后再讓下面的任務開始從中取資料

    po.apply_async(reader, (q,))
    po.close()
    po.join()
    print("(%s) End" % os.getpid())
"""執行結果
    (24474) start
    writer啟動(24476),父行程為(24474)
    reader啟動(24477),父行程為(24474)
    reader從Queue獲取到訊息:i
    reader從Queue獲取到訊息:t
    reader從Queue獲取到訊息:c
    reader從Queue獲取到訊息:a
    reader從Queue獲取到訊息:s
    reader從Queue獲取到訊息:t
    (24474) End
"""

    5.行程佇列實作生產者消費者模型

# 實作方案一: 生產者生產結束的標識,放到生產者行程中
from multiprocessing import Queue
from multiprocessing import Process


# 生產者
def consumer(q, name):
    while 1:
        info = q.get()
        if info:
            print('%s 拿走了%s' % (name, info))
        else:  # 當消費者獲得佇列中資料時,如果獲得的是None,就是獲得到了生產者不再生產資料的標識
            break  # 此時消費者結束即可


# 消費者
def producer(q, product):
    for i in range(20):
        info = product + '的智能手機%s號' % str(i)
        q.put(info)
    q.put(None)  # 讓生產者生產完資料后,給消費者一個不再生產資料的標識


if __name__ == '__main__':
    q = Queue(10)
    p_pro = Process(target=producer, args=(q, '中國制造'))
    p_con = Process(target=consumer, args=(q, 'Kali'))
    p_pro.start()
    p_con.start()
"""執行結果
    Kali 拿走了中國制造的手辦0號
    Kali 拿走了中國制造的手辦1號
    Kali 拿走了中國制造的手辦2號
    Kali 拿走了中國制造的手辦3號
    Kali 拿走了中國制造的手辦4號
    Kali 拿走了中國制造的手辦5號
    Kali 拿走了中國制造的手辦6號
    Kali 拿走了中國制造的手辦7號
    Kali 拿走了中國制造的手辦8號
    Kali 拿走了中國制造的手辦9號
    Kali 拿走了中國制造的手辦10號
    Kali 拿走了中國制造的手辦11號
    Kali 拿走了中國制造的手辦12號
    Kali 拿走了中國制造的手辦13號
    Kali 拿走了中國制造的手辦14號
    Kali 拿走了中國制造的手辦15號
    Kali 拿走了中國制造的手辦16號
    Kali 拿走了中國制造的手辦17號
    Kali 拿走了中國制造的手辦18號
    Kali 拿走了中國制造的手辦19號
"""
# 實作方案二: 生產者生產結束的標識,放到父行程中
from multiprocessing import Queue
from multiprocessing import Process


# 生產者
def consumer(q, name, color):
    while 1:
        info = q.get()
        if info:
            print('%s%s 拿走了%s \033[0m' % (color, name, info))
        else:  # 當消費者獲得佇列中資料時,如果獲得的是None,就是獲得到了生產者不再生產資料的標識
            break  # 此時消費者結束即可


# 消費者
def producer(q, product):
    for i in range(10):
        info = product + '的智能手機%s號' % str(i)
        q.put(info)


if __name__ == '__main__':
    q = Queue(10)
    # 多個生產者行程
    p_pro1 = Process(target=producer, args=(q, '中國制造'))
    p_pro2 = Process(target=producer, args=(q, '美國制造'))
    p_pro3 = Process(target=producer, args=(q, '日本制造'))
    # 多個消費者行程
    p_con1 = Process(target=consumer, args=(q, 'Kali', '\033[31m'))
    p_con2 = Process(target=consumer, args=(q, 'Coco', '\033[32m'))

    p_l = [p_con1, p_con2, p_pro1, p_pro2, p_pro3]
    [i.start() for i in p_l]

    # 父行程通過等待生產者行程結束后再發送結束標識
    p_pro1.join()
    p_pro2.join()
    p_pro3.join()
    q.put(None)  # 幾個消費者就要接受幾個結束標識
    q.put(None)
"""執行結果
    Kali 拿走了日本制造的智能手機0號 
    Coco 拿走了日本制造的智能手機1號 
    Kali 拿走了日本制造的智能手機2號 
    Coco 拿走了日本制造的智能手機3號 
    Kali 拿走了日本制造的智能手機4號 
    Coco 拿走了日本制造的智能手機5號 
    Kali 拿走了日本制造的智能手機6號 
    Coco 拿走了日本制造的智能手機7號 
    Kali 拿走了日本制造的智能手機8號 
    Coco 拿走了日本制造的智能手機9號 
    Kali 拿走了美國制造的智能手機0號 
    Coco 拿走了美國制造的智能手機1號 
    Kali 拿走了美國制造的智能手機2號 
    Coco 拿走了美國制造的智能手機3號 
    Kali 拿走了美國制造的智能手機4號 
    Coco 拿走了美國制造的智能手機5號 
    Kali 拿走了美國制造的智能手機6號 
    Coco 拿走了美國制造的智能手機7號 
    Kali 拿走了美國制造的智能手機8號 
    Coco 拿走了美國制造的智能手機9號 
    Kali 拿走了中國制造的智能手機0號 
    Coco 拿走了中國制造的智能手機1號 
    Kali 拿走了中國制造的智能手機2號 
    Coco 拿走了中國制造的智能手機3號 
    Kali 拿走了中國制造的智能手機4號 
    Coco 拿走了中國制造的智能手機5號 
    Kali 拿走了中國制造的智能手機6號 
    Coco 拿走了中國制造的智能手機7號 
    Kali 拿走了中國制造的智能手機8號 
    Coco 拿走了中國制造的智能手機9號 
"""

    6.行程佇列JoinableQueue模塊+守護行程實作生產者消費者模型

from multiprocessing import Process,JoinableQueue

q = JoinableQueue()

# q.join()  # 用于生產者,等待 q.task_done的回傳結果,通過回傳結果,生產者就能獲得消費者當前消費了多少個資料
# q.task_done()  # 用于消費者,是指每消費佇列中一個資料,就給join回傳一個標識

# 假設生產者生產了100個資料,join就能記錄下100這個數字,每次消費者消費一個資料,就必須要task_done回傳一個標識
# 當生產者(join)接收到100個消費者回傳來的標識的時候,生產者就能知道消費者已經把所有資料都消費完了


# 消費者
def consumer(q, name, color):
    while 1:
        info = q.get()
        print('%s %s 拿走了%s \033[0m' % (color, name, info))
        q.task_done()


# 生產者
def producer(q, product):
    for i in range(20):
        info = product + '的智能手機%s號' % str(i)
        q.put(info)
    q.join()  # 記錄了生產了20個資料在佇列中,此時會阻塞等待消費者消費完佇列中所有資料

if __name__ == '__main__':
    q = JoinableQueue(10)
    p_pro1 = Process(target=producer, args=(q, '中國制造'))
    p_con1 = Process(target=consumer, args=(q, 'Kali', '\033[31m'))
    p_con1.daemon = True  # 把消費者行程設為守護行程
    p_con1.start()
    p_pro1.start()
    p_pro1.join()  # 主行程等待生產者行程結束
    # 程式有3個行程,主行程和生產者行程和消費者行程,當主行程執行到p_pro1.join()代碼時,主行程會等待生產行程結束
    # 而生產行程中執行到q.join()會等待消費者行程把所有資料消費完,生產者行程才結束
    # 現在的狀態就是主行程等待生產者行程結束,生產者行程等待消費者消費完所有資料
    # 所以,把消費者設定為守護行程,當主行程執行完,就代表生產行程已經結束,也就代表消費者行程已經把佇列中資料消費完
    # 此時,主行程一旦結束,守護行程也就是消費者行程也就跟著結束,整個程式也就能正常結束了
"""執行結果
     Kali 拿走了中國制造的智能手機0號 
     Kali 拿走了中國制造的智能手機1號 
     Kali 拿走了中國制造的智能手機2號 
     Kali 拿走了中國制造的智能手機3號 
     Kali 拿走了中國制造的智能手機4號 
     Kali 拿走了中國制造的智能手機5號 
     Kali 拿走了中國制造的智能手機6號 
     Kali 拿走了中國制造的智能手機7號 
     Kali 拿走了中國制造的智能手機8號 
     Kali 拿走了中國制造的智能手機9號 
     Kali 拿走了中國制造的智能手機10號 
     Kali 拿走了中國制造的智能手機11號 
     Kali 拿走了中國制造的智能手機12號 
     Kali 拿走了中國制造的智能手機13號 
     Kali 拿走了中國制造的智能手機14號 
     Kali 拿走了中國制造的智能手機15號 
     Kali 拿走了中國制造的智能手機16號 
     Kali 拿走了中國制造的智能手機17號 
     Kali 拿走了中國制造的智能手機18號 
     Kali 拿走了中國制造的智能手機19號 
"""

4.共享記憶體-Value, Array

    1.概述: 在記憶體中開辟一段空間存盤資料,對多個行程可見,每次寫入共享記憶體的資料會覆寫之前的內容,對記憶體格式化較少,但存取速度快

    2.語法

from multiprocess import Value
from multiprocess import Array

obj = Value(ctype, obj)  # 開辟共享記憶體空間,回傳一個共享記憶體物件
    引數:
        ctype: str要轉變的c型別(對照ctype表)
        obj: 寫入共享記憶體的初始值
obj.value  # 得到共享記憶體中的值
obj = Array(ctype, obj)  # 開辟共享記憶體空間(陣列),回傳一個共享記憶體物件
    引數:
        ctype: 要轉換的型別
        obj: 存入到共享記憶體中的資料,obj是一個串列且串列中的資料型別必須一致,obj是正整數則表示開辟一個多大的記憶體空間

    3.行程間通信-Value

from multiprocessing import Process
from multiprocessing import Value
import time
import random


# 向共享記憶體中存取
def deposit(money):
    for i in range(100):
        time.sleep(0.01)
        money.value += random.randint(1, 100)


# 向共享記憶體中取錢
def withdraw(money):
    for i in range(100):
        time.sleep(0.01)
        money.value -= random.randint(1, 50)


def main():
    money = Value("i", 1000)  # "i"在ctype對照表中對應的資料型別是Python的int型別
    de = Process(target=deposit, args=(money,))
    wi = Process(target=withdraw, args=(money,))
    de.start()
    wi.start()
    de.join()
    wi.join()
    print(money.value)  # 本次執行結果money.value的值是3401


if __name__ == "__main__":
    main()

    4.行程間通信-Array

from multiprocessing import Process
from multiprocessing import Array


def func(shm):
    for i in shm:
        print(i, end=" ")  # 子行程中列印結果: 1 2 3 4 5
    shm[0] = 11


def main():
    # 開辟記憶體共享空間,可容納5個整數,初始值是[1, 2, 3, 4, 5]
    shm = Array("i", [1, 2, 3, 4, 5])  # "i"在ctype對照表中對應的資料型別是Python的int型別

    # 子記憶體共享空間開辟一個包含5個整形的空間
    # shm = Array("i", 5)  # 沒有初始值遍歷shm結果為 0 0 0 0 0 此時可以靈活的修改資料

    p = Process(target=func, args=(shm,))
    p.start()
    p.join()
    print()
    for i in shm:
        print(i, end=" ")  # 主行程中列印結果: 11 2 3 4 5


if __name__ == "__main__":
    main()

5.管道訊息佇列共享記憶體對比

對比項 管道 訊息佇列 共享記憶體
開辟空間 記憶體 記憶體 記憶體
讀寫方式 雙向/單向 先進先出  操作覆寫記憶體 
效率 一般 一般
應用  多用于親緣行程   方便靈活廣泛  較復雜
 是否需要互斥機制 

6.信號-os.kill, signal

    1.概述
        1.信號的名字,作用和處理信號都是有系統定義的,一個行程向另一個行程通過信號傳遞某種訊息
        2.信號的默認處理方法: 系統定義,信號給接收信號的行程帶來的行為一般有 終止 暫停 忽略
        3.信號屬于異步通信方式,信號的發送不會影響行程的持續執行
        4.命令列下信號操作
            kill -l: 查看信號
            kill -signame PID: 給PID的行程發送一個信號, 例如 kell -9 2332

    2.常用信號

# 默認操作: 終止的信號
SIGHUP: 該信號在用戶終端連接(正常或非正常)結束時發出,通常是在終端的控制行程結束時,通知同一會話內的各個作業與控制終端不再關系
SIGINT: 該信號在用戶鍵入INTR字符(通常是Ctrl+c)時發出,終端驅動程式發送此信號并送到前臺行程中的每一個行程
SIGQUIT: 該信號和SIGINT類似,但由QUIT字符(通常是'"Ctrl+\"')來控制
SIGILL: 該信號在一個行程企圖執行一條非法指令時(可執行檔案本身出現錯誤,或者試圖執行資料段,堆疊溢位時)發出
SIGFPE: 該信號在發送致命的算術運算錯誤時發出,不僅包括浮點運算錯誤還包括溢位和除數為0等其他所有的算數的錯誤
SIGKILL: 該信號用來立即結束程式的運行,并且不能被阻塞,處理和忽略
SIGALRM: 該信號當一個定時器到時的時候發出
SIGABORT: 該信號用于結束行程

# 默認操作: 暫停行程
SIGSTOP: 該信號用于暫停一個行程,并且不能被阻塞,處理和忽略
SIGTSTP: 該信號用于暫停互動行程,用戶可以鍵入SUSP字符(通常是Ctrl+z)發出這個信號

# 默認操作: 忽略
SIGCHLD: 子行程改變狀態時,父行程會收到這個信號

    3.發送信號

        語法

import signal

os.kill(pid, sig)  # 向一個行程發送一個信號
    引數:
        pid: 要發送信號的行程PID
        sig: 要發送的信號
signal.alarm(sec)  # 向自身發送一個時鐘信號SIGALRM,一個行程中只能同時有一個時鐘,后面的時鐘時間會覆寫前面的時鐘時間
    引數: sec代表時鐘秒數

        示例1

import os
import signal

# 向 2332 行程發送SIGKILL信號,殺死2332行程
os.kill(2332, signal.SIGKILL)

        示例2

import time
import signal

# 3秒鐘之后向自身發送SIGALRM信號,終止行程
signal.alarm(3)
time.sleep(1)
signal.alarm(5)  # 后面的時鐘時間會覆寫前面的時鐘時間
while True:
    time.sleep(1)
    print("等待時鐘...")

    4.信號處理

        語法

import signal

signal.pause()  # 阻塞等待一個信號的發生
signal.signal(signum, handler)  # 處理一個信號
    使用說明: 是一個異步處理信號的函式,只要執行在行程中就會按照指定方法處理信號,但是不能處理 SIGSTOP 和 SIGKILL 信號
    引數:
        signum: 要處理的信號
        handler: 對該信號的處理方法
            SIG_DFL: 采用默認方法
            SIG_IGN: 忽略這個信號
            func: 回呼函式,自定義處理方法
    自定義處理方法格式要求
        def func(sig, frame):  # sig: 接收到的信號;frame: 信號物件
            pass

        示例1

import time
import signal

# 5秒鐘之后向自身發送SIGALRM信號,終止行程
signal.alarm(5)

# 采用默認的方法處理SIGALRM信號
signal.signal(signal.SIGALRM, signal.SIG_DFL)

# 采用忽略信號的方法處理SIGALRM信號
# signal.signal(signal.SIGALRM, signal.SIG_IGN)
# signal.signal(signal.SIGINT, signal.SIG_IGN)  # 忽略Ctrl+c終止行程,即行程運行時無法使用Ctrl+c來終止

while True:
    time.sleep(1)
    print("等待時鐘...")

        示例2

import time
import signal

# 5秒鐘之后向自身發送SIGALRM信號,終止行程
signal.alarm(5)


# 固定格式要求
def handler(sig, frame):
    if sig == signal.SIGALRM:
        print("收到了時鐘信號: %s" % sig)
    elif sig == signal.SIGINT:
        print("收到了時鐘信號: %s" % sig)


# 通過自定義方法handler處理SIGALRM信號
signal.signal(signal.SIGALRM, handler)
signal.signal(signal.SIGINT, handler)

while True:
    time.sleep(1)
    print("等待時鐘...")

    5.使用信號處理僵尸行程
        import signal

        原理: 在父行程中忽略子行程的發送信號
        語法: signal.signal(signal.SIGCHLD, signal.SIG_IGN)

    6.多行程實作信號通信

import multiprocessing
from signal import *
from time import sleep
import os


# 售票員處理信號
def conductor_handler(sig, frame):
    if sig == SIGINT:
        os.kill(os.getppid(), SIGUSR1)
    elif sig == SIGQUIT:
        os.kill(os.getppid(), SIGUSR2)
    elif sig == SIGUSR1:
        print("到站了")
        os._exit(0)


# 子行程中售票員發送信號
def conductor():
    signal(SIGINT, conductor_handler)
    signal(SIGQUIT, conductor_handler)
    signal(SIGUSR1, conductor_handler)

    # 子行程中忽略父行程要處理的信號
    signal(SIGTSTP, SIG_IGN)
    while True:
        sleep(3)
        print("車內開始廣播...")


# 司機處理信號
def driver_handler(sig, frame):
    if sig == SIGUSR1:
        print("老司機開車")
    elif sig == SIGUSR2:
        print("老司機以鎖死車門")
    elif sig == SIGTSTP:
        os.kill(p_pid, SIGUSR1)


def main():
    p = multiprocessing.Process(target=conductor)
    p.start()
    global p_pid
    p_pid = p.pid

    # 父行程中司機發送信號
    signal(SIGUSR1, driver_handler)
    signal(SIGUSR2, driver_handler)
    signal(SIGTSTP, driver_handler)

    # 父行程忽略子行程要處理的信號
    signal(SIGINT, SIG_IGN)
    signal(SIGQUIT, SIG_IGN)

    p.join()


if __name__ == "__main__":
    main()
"""執行結果
    ~/Desktop/python3/demo $ python3 demo5.py
    車內開始廣播...
    車內開始廣播...
    ^C老司機開車
    ^C老司機開車
    ^C老司機開車
    車內開始廣播...
    ^\老司機以鎖死車門
    ^\老司機以鎖死車門
    ^\老司機以鎖死車門
    車內開始廣播...
    車內開始廣播...
    ^Z到站了
"""

7.信號量-Semaphore

    1.概述: 給定一定的信號數量,對多個行程可見并且多個行程均可操作,行程根據信號量的多少可以有不同的行為

    2.語法

from multiprocess import Semaphore

sem = Semaphore(num)  # 定義信號量并回傳信號量物件,num是給定信號量的初始個數
sem.acquire()  # 將信號量減一,信號量為0時呼叫次方法會阻塞等待
sem.release()  # 將信號量加一

    3.示例1

import time
import multiprocessing


def func(sem):
    print("行程%s等待信號量" % multiprocessing.current_process())  # current_process獲取當前行程物件
    sem.acquire()  # 信號量減1
    print("行程%s消耗信號量" % multiprocessing.current_process())
    time.sleep(2)
    print("行程%s添加信號量" % multiprocessing.current_process())
    sem.release()  # 信號量加1


def main():
    # 創建信號量初始值為3
    sem = multiprocessing.Semaphore(3)
    jobs = list()
    for i in range(4):
        p = multiprocessing.Process(target=func, args=(sem,))
        p.start()
        jobs.append(p)
    for i in jobs:
        i.join()


if __name__ == "__main__":
    main()
"""執行結果
    行程<Process(Process-1, started)>等待信號量
    行程<Process(Process-1, started)>消耗信號量
    行程<Process(Process-2, started)>等待信號量
    行程<Process(Process-2, started)>消耗信號量
    行程<Process(Process-3, started)>等待信號量
    行程<Process(Process-3, started)>消耗信號量
    行程<Process(Process-4, started)>等待信號量
    行程<Process(Process-1, started)>添加信號量
    行程<Process(Process-2, started)>添加信號量
    行程<Process(Process-4, started)>消耗信號量
    行程<Process(Process-3, started)>添加信號量
    行程<Process(Process-4, started)>添加信號量
"""

    4.示例2

from multiprocessing import Process
from multiprocessing import Semaphore
import time
import random

def func(i, sem):
    sem.acquire()
    print('第%s個人進入小黑屋,拿了鑰匙鎖上門' % i)
    time.sleep(random.randint(3, 5))
    print('第%s個人出去小黑屋,還了鑰匙打開門' % i)
    sem.release()


if __name__ == '__main__':
    sem = Semaphore(5)  # 初始化了一把鎖5把鑰匙,也就是說允許5個人同時進入小黑屋
    # 之后其他人必須等待,等有人從小黑屋出來,還了鑰匙,才能允許后邊的人進入
    for i in range(20):
        p = Process(target=func, args=(i, sem,))
        p.start()

8.共享資料-Manager

    1.概述
        Python中提供了強大的Manager類,專門用于實作多行程之間的資料共享
        Manager類是資料不安全的,Manager類包含的常用方法和屬性與Multiprocessing中其他常用類的方法屬性一致
        Manager管理的共享資料型別有: Value, Array, dict, list, Lock, Semaphore等等,同時Manager還可以共享類的實體物件

    2.Manager實作dict功能

from multiprocessing import Manager, Process, Lock


def work(d, lock):
    with lock:  # 不加鎖而操作共享的資料,肯定會出現資料錯亂
        d['count'] -= 1


if __name__ == '__main__':
    lock = Lock()
    with Manager() as m:
        dic = m.dict({'count': 100})
        p_l = []
        for i in range(100):
            p = Process(target=work, args=(dic, lock))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

    3.Manager實作list功能

from multiprocessing import Manager, Process


def func(num):
    num[0] -= 1
    print("子行程中num的值是%s" % num)


if __name__ == "__main__":
    m = Manager()
    num = m.list([1, 2, 3])
    p = Process(target=func, args=(num,))
    p.start()
    p.join()
    print("父行程中num的值是%s" % num)

    4.通過Manager行程間共享實體物件

from multiprocessing import Process, Value, Lock
from multiprocessing.managers import BaseManager


class Employee(object):
    def __init__(self, name, salary):
        self.name = name
        self.salary = Value('i', salary)

    def increase(self):
        self.salary.value += 100

    def getPay(self):
        return self.name + ':' + str(self.salary.value)


class MyManager(BaseManager):
    pass


def Manager2():
    m = MyManager()
    m.start()
    return m


MyManager.register('Employee', Employee)


def func1(em, lock):
    with lock:
        em.increase()


if __name__ == '__main__':
    manager = Manager2()
    em = manager.Employee('zhangsan', 1000)
    lock = Lock()
    proces = [Process(target=func1, args=(em, lock)) for i in range(10)]
    for p in proces:
        p.start()
    for p in proces:
        p.join()
    print(em.getPay())

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/30870.html

標籤:Python

上一篇:python寫入二維netCDF檔案(txt轉nc檔案)

下一篇:c++中string類的用法(網上查的,分享下)

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more