1.同步和互斥
1.目的: 對共有資源的操作會產生爭奪,同步互斥是一種解決爭奪的方案
2.臨界資源: 多個行程或執行緒都可以操作的資源
3.臨界區: 操作臨界資源的代碼段
4.同步:
同步是一種合作關系,為完成某個任務多行程或多執行緒之間形成一種協調,按照條件依次執行傳遞告知資源情況,這種協調可能是因為阻塞關系達成的
同步就是協同步調,按預定的先后次序進行運行,如:訊息佇列通信
行程(執行緒)同步可理解為行程(執行緒)A和B一塊配合,A執行到一定程度時要依靠B的某個結果于是停下來示意B運行;B執行再將結果給A;A再繼續操作
5.互斥:
互斥是一種制約關系,一個行程(執行緒)進入到臨界區會進行加鎖操作,其它行程(執行緒)在企圖操作臨界資源就會阻塞,只有當資源被釋放才能進行操作
當多個執行緒同時修改共享資料的時候,需要進行同步控制,使用互斥鎖則保證了每次只有一個執行緒進行寫入操作,保證了多執行緒情況下資料的正確性
某個執行緒要更改共享資料時,先將其鎖定此時資源即上鎖,其他執行緒不能更改;直到該執行緒釋放資源即解鎖,其他的執行緒才能再次鎖定該資源
2.行程事件-Event
1.行程事件概述: 一個行程通過對Event的事件狀態的設定,另外一個行程判斷事件狀態來確認是阻塞等待還是繼續執行
2.語法概述
from multiprocessing import Event e = Event() # 創建事件物件 e.wait() # 提供事件阻塞 e.set() # 對事件物件行程設定,此時wait判斷如果事件被set則結束阻塞 e.clear() # 清除該事件物件的set e.is_set() # 監測物件是否被設定,設定回傳True
3.臨界資源操作示例
from multiprocessing import Process from multiprocessing import current_process from multiprocessing import Event import time def wait_event(e): print("子行程%s阻塞等待主行程開放臨界區" % current_process()) e.wait() print("子行程%s開始操作臨界區" % current_process(), e.is_set()) def wait_event_timeout(e): print("子行程%s阻塞等待主行程開放臨界區" % current_process()) e.wait(3) print("子行程%s等待3秒后不再等待主行程開放臨界區" % current_process(), e.is_set()) print("子行程%s開始執行其他操作" % current_process(), e.is_set()) def main(): e = Event() p1 = Process(target=wait_event, name="block", args=(e,)) p2 = Process(target=wait_event_timeout, name="non-block", args=(e,)) p1.start() p2.start() print("主行程正在操作臨界資源") print("-" * 50) time.sleep(6) e.set() print("-" * 50) print("主行程結束對臨界資源的操作,開放臨界區") p1.join() p2.join() if __name__ == "__main__": main() """執行結果 主行程正在操作臨界資源 -------------------------------------------------- 子行程<Process(block, started)>阻塞等待主行程開放臨界區 子行程<Process(non-block, started)>阻塞等待主行程開放臨界區 子行程<Process(non-block, started)>等待3秒后不再等待主行程開放臨界區 False 子行程<Process(non-block, started)>開始執行其他操作 False -------------------------------------------------- 主行程結束對臨界資源的操作,開放臨界區 子行程<Process(block, started)>開始操作臨界區 True """
4.紅綠燈示例
from multiprocessing import Process from multiprocessing import Event import time import random def tra(e): '''信號燈函式''' # e.set() # print('\033[32m 綠燈亮! \033[0m') while 1: # 紅綠燈得一直亮著,要么是紅燈要么是綠燈 if e.is_set(): # True,代表綠燈亮,那么此時代表可以過車 time.sleep(5) # 所以在這讓燈等5秒鐘,這段時間讓車過 print('\033[31m 紅燈亮! \033[0m') # 綠燈亮了5秒后應該提示到紅燈亮 e.clear() # 把is_set設定為False else: time.sleep(5) # 此時代表紅燈亮了,此時應該紅燈亮5秒,在此等5秒 print('\033[32m 綠燈亮! \033[0m') # 紅的亮夠5秒后,該綠燈亮了 e.set() # 將is_set設定為True def Car(i,e): e.wait() # 車等在紅綠燈,此時要看是紅燈還是綠燈,如果is_set為True就是綠燈,此時可以過車 print('第%s輛車過去了'%i) if __name__ == '__main__': e = Event() triff_light = Process(target=tra, args=(e,)) # 信號燈的行程 triff_light.start() for i in range(50): # 描述50輛車的行程 if i % 3 == 0: time.sleep(2) car = Process(target=Car, args=(i + 1, e,)) car.start()
3.行程鎖-Lock
1.行程鎖概述: 在lock物件處于上鎖狀態時,再企圖上鎖則會阻塞,直到鎖被釋放才能繼續執行上鎖操作
2.語法概述
from multiprocess import Lock lock = Lock() # 創建鎖物件 lock.acquire() # 上鎖 lock.release() # 解鎖
3.背景關系管理器實作鎖管理
from multiprocess import Lock lock = Lock() # 創建鎖物件 # 給with代碼段上鎖,with代碼的結束自動解鎖 with lock: pass
4.行程鎖的實作
from multiprocessing import Process from multiprocessing import Lock from time import sleep import sys def worker1(lock): lock.acquire() # 上鎖 for _ in range(5): sleep(1) # sys.stdout為所有行程共有資源 sys.stdout.write("worker1輸出\n") lock.release() # 釋放鎖 def worker2(lock): lock.acquire() # 上鎖 for _ in range(5): sleep(1) sys.stdout.write("worker2輸出\n") lock.release() # 釋放鎖 def main(): # 創建Lock物件 lock = Lock() p1 = Process(target=worker1, args=(lock,)) p2 = Process(target=worker2, args=(lock,)) p1.start() p2.start() p1.join() p2.join() if __name__ == "__main__": main() """執行結果 worker1輸出 worker1輸出 worker1輸出 worker1輸出 worker1輸出 worker2輸出 worker2輸出 worker2輸出 worker2輸出 worker2輸出 """
4.行程條件變數-Condition
1.行程條件變數概述: Condition條件變數通常與一個鎖關聯
需要在多個Contidion中共享一個鎖時,可以傳遞一個Lock/RLock實體給構造方法,否則它將自己生成一個RLock實體
除了Lock帶有的鎖定池外,Condition還包含一個等待池,池中的執行緒處于狀態圖中的等待阻塞狀態
直到另一個執行緒呼叫notify()/notifyAll()通知;得到通知后執行緒進入鎖定池等待鎖定
2.語法
from multiprocessing import Condition con = Condition con.acquire(): 行程鎖 con.release(): 釋放鎖 con.wait(timeout) 行程掛起,直到收到一個notify通知或者超時才會被喚醒繼續運行,timeout超時時間,秒 wait()必須在已獲得Lock前提下才能呼叫否則會觸發RuntimeError con.notify(n=1) 通知其他行程,那些掛起的行程接到這個通知之后會開始運行,默認是通知一個正等待該condition的行程,最多則喚醒n個等待的行程 notify()必須在已獲得Lock前提下才能呼叫否則會觸發RuntimeError,notify()不會主動釋放Lock con.notifyAll(): 如果wait狀態行程比較多,notifyAll的作用就是通知所有行程
3.行程條件變數的實作
import multiprocessing, time def A(cond): name = multiprocessing.current_process().name print("%s行程開始執行A函式" % name) with cond: print("行程%s執行完成,通知其他行程可以執行" % name) cond.notify_all() # 通知其他行程,那些掛起的行程接到這個通知之后會開始運行 def B(cond): name = multiprocessing.current_process().name print("%s行程開始執行B函式" % name) with cond: cond.wait() # 行程掛起,直到收到一個notify通知或者超時才會被喚醒繼續運行 print("%s行程繼續執行B函式" % name) def main(): # 創建行程條件變數 cond = multiprocessing.Condition() p = multiprocessing.Process(target=A, args=(cond,)) p_list = [multiprocessing.Process(target=B, name="Process2[%d]" % i, args=(cond,)) for i in range(1, 3)] # 開始行程 for i in p_list: i.start() time.sleep(1) p.start() # 阻塞等待回收行程 p.join() for i in p_list: i.join() if __name__ == "__main__": main() """執行結果 Process2[1]行程開始執行B函式 Process2[2]行程開始執行B函式 Process-1行程開始執行A函式 行程Process-1執行完成,通知其他行程可以執行 Process2[1]行程繼續執行B函式 Process2[2]行程繼續執行B函式 """
5.執行緒事件-Event
1.語法概述
from threading import Event e = Event() # 創建事件物件 e.wait() # 提供事件阻塞 e.set() # 對事件物件行程設定,此時wait判斷如果事件被set則結束阻塞 e.clear() # 清除該事件物件的set e.is_set() # 監測物件是否被設定,設定回傳True
2.事件解決執行緒間的資源競爭
import threading from time import sleep def fun(event): print("呼叫foo") global s s = "奔波兒灞" def foo(event): print("等待口令") sleep(2) if s == "奔波兒灞": print("收到的口令是: %s" % s) else: print("口令錯誤...") event.set() # 對事件物件行程設定,此時wait判斷如果事件被set則結束阻塞 def bar(event): print("bar開始執行") sleep(1) event.wait() # 提供事件阻塞 global s s = "霸波爾奔" def main(): s = None event = threading.Event() t1 = threading.Thread(name="fun", target=fun, args=(event,)) t2 = threading.Thread(name="foo", target=foo, args=(event,)) t3 = threading.Thread(name="bar", target=bar, args=(event,)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() if __name__ == "__main__": main() """執行結果 呼叫foo 等待口令 bar開始執行 收到的口令是: 奔波兒灞 """
3.事件實作執行緒間的同步機制
from threading import Thread, Event import time, random def conn_mysql(e, i): count = 1 while count <= 3: if e.is_set(): print('第%s個人連接成功!' % i) break print('正在嘗試第%s次重新連接...' % (count)) e.wait(0.5) count += 1 def check_mysql(e): print('\033[42m 資料庫正在維護 \033[0m') time.sleep(random.randint(1, 2)) e.set() if __name__ == '__main__': e = Event() t_check = Thread(target=check_mysql, args=(e,)) t_check.start() for i in range(10): t_conn = Thread(target=conn_mysql, args=(e, i)) t_conn.start()
6.執行緒鎖-Lock
1.語法概述
lock = Lock() # 創建鎖 lock.acquire() # 加鎖 lock.release() # 解鎖
2.執行緒鎖解決執行緒間的資源競爭
import threading import time g_num = 0 def test1(num): global g_num for i in range(num): mutex.acquire() # 上鎖 g_num += 1 mutex.release() # 解鎖 print("---test1---g_num=%d" % g_num) def test2(num): global g_num for i in range(num): mutex.acquire() # 上鎖 g_num += 1 mutex.release() # 解鎖 print("---test2---g_num=%d" % g_num) # 創建一個互斥鎖,默認是未上鎖的狀態 mutex = threading.Lock() # 創建2個執行緒,讓他們各自對g_num加1000000次 p1 = threading.Thread(target=test1, args=(1000000,)) p1.start() p2 = threading.Thread(target=test2, args=(1000000,)) p2.start() # 等待計算完成 while len(threading.enumerate()) != 1: time.sleep(1) print("2個執行緒對同一個全域變數操作之后的最終結果是:%s" % g_num)
3.執行緒鎖的死鎖問題-添加超時時間避免死鎖
# 在執行緒間共享多個資源的時候,如果兩個執行緒分別占有一部分資源并且同時等待對方的資源,就會造成死鎖 # 避免死鎖:程式設計時要盡量避免(銀行家演算法), 添加超時時間等 import threading import time class MyThread1(threading.Thread): def run(self): # 對mutexA上鎖 mutexA.acquire() # mutexA上鎖后,延時1秒,等待另外那個執行緒 把mutexB上鎖 print(self.name + "----do1---up----") time.sleep(1) # 此時會堵塞,因為這個mutexB已經被另外的執行緒搶先上鎖了 mutexB.acquire() print(self.name + "----do1---down----") mutexB.release() # 對mutexA解鎖 mutexA.release() class MyThread2(threading.Thread): def run(self): # 對mutexB上鎖 mutexB.acquire() # mutexB上鎖后,延時1秒,等待另外那個執行緒 把mutexA上鎖 print(self.name + "----do2---up----") time.sleep(1) # 此時會堵塞,因為這個mutexA已經被另外的執行緒搶先上鎖了 mutexA.acquire() print(self.name + "----do2---down----") mutexA.release() # 對mutexB解鎖 mutexB.release() mutexA = threading.Lock() mutexB = threading.Lock() if __name__ == "__main__": t1 = MyThread1() t2 = MyThread2() t1.start() t2.start()
4.執行緒鎖的死鎖問題-通過遞回鎖RLock避免死鎖
from threading import Thread from threading import RLock import time # RLock是遞回鎖 --- 是無止盡的鎖,但是所有鎖都有一個共同的鑰匙 # 想解決死鎖,配一把公共的鑰匙就可以了 def man(l_tot, l_pap): l_tot.acquire() # 是男的獲得廁所資源,把廁所鎖上了 print('Kali在廁所上廁所') time.sleep(1) l_pap.acquire() # 男的拿紙資源 print('Kali拿到衛生紙了!') time.sleep(0.5) print('Kali完事了!') l_pap.release() # 男的先還紙 l_tot.release() # 男的還廁所 def woman(l_tot, l_pap): l_pap.acquire() # 女的拿紙資源 print('Coco拿到衛生紙了!') time.sleep(1) l_tot.acquire() # 是女的獲得廁所資源,把廁所鎖上了 print('Coco在廁所上廁所') time.sleep(0.5) print('Coco完事了!') l_tot.release() # 女的還廁所 l_pap.release() # 女的先還紙 if __name__ == '__main__': l_tot = RLock() l_pap = RLock() t_man = Thread(target=man, args=(l_tot, l_pap)) t_woman = Thread(target=woman, args=(l_tot, l_pap)) t_man.start() t_woman.start()
7.執行緒條件變數-Condition
語法概述
# 條件是讓程式員自己去調度執行緒的一個機制 con = threading.Condition() con.acquire() # 對資源加鎖,加鎖后其他位置再加鎖則阻塞 con.release() # 解鎖 con.wait() # wait函式只能在加鎖狀態下使用,wait函式會先解鎖然后讓執行緒處于等待通知的阻塞狀態 con.notify() # 發送通知,執行緒接收到通知后結束wait阻塞并且執行acquire加鎖操作
執行緒條件變數示例-打壓股市
import threading import time class Gov(threading.Thread): def run(self): global num con.acquire() # 對資源加鎖,加鎖后其他位置再加鎖則阻塞 while True: print("開始拉升股市") num += 1 print("拉升了%s個點" % num) time.sleep(1) if num == 5: print("暫時安全") con.notify() # 發送通知,執行緒接收到通知后結束wait阻塞并且執行acquire加鎖操作 print("不操作狀態") con.wait() # wait先解鎖然后讓執行緒處于等待通知的阻塞狀態,直到接收到t2執行緒發出的通知后結束阻塞并加鎖 con.release() class Consumers(threading.Thread): def run(self): global num con.acquire() # 再對資源加鎖,此時會阻塞在這里 while True: if num > 0: print("開始打壓股市") num -= 1 print("下降到%s個點" % num) time.sleep(1) if num == 0: print("天臺見") con.notify() # 發送通知,執行緒接收到通知后結束wait阻塞并且執行acquire加鎖操作 print("不能再下降了") con.wait() # wait先解鎖然后讓執行緒處于等待通知的阻塞狀態,直到接收到t1執行緒發出的通知后結束阻塞并加鎖 con.release() def main(): global num num = 0 # 創建條件變數 global con con = threading.Condition() t1 = Gov() t2 = Consumers() t1.start() t2.start() t1.join() t2.join() if __name__ == "__main__": main() """執行結果 開始拉升股市 拉升了1個點 開始拉升股市 拉升了2個點 開始拉升股市 拉升了3個點 開始拉升股市 拉升了4個點 開始拉升股市 拉升了5個點 暫時安全 不操作狀態 開始打壓股市 下降到4個點 開始打壓股市 下降到3個點 開始打壓股市 下降到2個點 開始打壓股市 下降到1個點 開始打壓股市 下降到0個點 天臺見 不能再下降了 開始拉升股市 拉升了1個點 """
執行緒條件變數示例-吃火鍋
import threading import time con = threading.Condition() num = 0 # 生產者 class Producer(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): # 鎖定執行緒 global num con.acquire() while True: print("開始添加!!!") num += 1 print("火鍋里面魚丸個數:%s" % str(num)) time.sleep(1) if num >= 5: print("火鍋里面里面魚丸數量已經到達5個,無法添加了!") # 喚醒等待的執行緒 con.notify() # 喚醒小伙伴開吃啦 # 等待通知 con.wait() # 釋放鎖 con.release() # 消費者 class Consumers(threading.Thread): def __init__(self): threading.Thread.__init__(self) def run(self): con.acquire() global num while True: print("開始吃啦!!!") num -= 1 print("火鍋里面剩余魚丸數量:%s" % str(num)) time.sleep(2) if num <= 0: print("鍋底沒貨了,趕緊加魚丸吧!") con.notify() # 喚醒其它執行緒 # 等待通知 con.wait() con.release() def main(): p = Producer() # 實體化一個生產者物件 c = Consumers() # 實體化一個消費者物件 p.start() c.start() if __name__ == "__main__": main()
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/13980.html
標籤:Python
上一篇:node.js/npm升級正確操作(windows和linux均有)
下一篇:0827Python練習
