多執行緒基礎概念
并行與并發
- 并行:同時處理多個任務,必須在多核環境下
- 一段時間內同時處理多個任務,單核也可以并發
并發手段
- 執行緒:內核空間的調度
- 行程:內核空間的調度
- 協程:用戶空間的調度
執行緒可以允許程式在同一行程空間中并發運行多個操作,本次主要介紹Python標準庫中的多執行緒模塊threading,
threading模塊
執行緒初始化
使用threading模塊的Thread類初始化物件然后呼叫start方法啟動執行緒,
import threading
import time
def worker(num):
time.sleep(1)
print('worker-{}'.format(num))
# 創建執行緒物件 target引數是一個函式, 這個函式即執行緒要執行的邏輯
threads = [threading.Thread(target=worker, args=(i, ))for i in range(5)]
for t in threads:
t.start()
# start 方法啟動一個執行緒, 當這個執行緒的邏輯執行完畢的時候,執行緒自動退出, Python 沒有提供主動退出執行緒的方法
# 輸出以下結果
worker-0worker-1worker-2worker-3
worker-4
初始化的五個執行緒的執行邏輯中的print方法列印字串及換行符出現了隨機分布,即出現了資源競爭,
給執行緒傳遞引數
import threading
import time
def worker(*args, **kwargs):
time.sleep(1)
print(args)
print(kwargs)
threads = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a':'b'}).start()
# 輸出
(1, 2, 3)
{'a': 'b'}
args傳遞位置引數,kwargs傳遞關鍵字引數,
Thread常用引數和方法
>>> help(threading.Thread)
可以看到Thread函式的初始化方法中的引數如下:
| __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
| This constructor should always be called with keyword arguments. Arguments are:
|
| *group* should be None; reserved for future extension when a ThreadGroup
| class is implemented.
|
| *target* is the callable object to be invoked by the run()
| method. Defaults to None, meaning nothing is called.
|
| *name* is the thread name. By default, a unique name is constructed of
| the form "Thread-N" where N is a small decimal number.
|
| *args* is the argument tuple for the target invocation. Defaults to ().
|
| *kwargs* is a dictionary of keyword arguments for the target
| invocation. Defaults to {}.
name
表示執行緒名稱,默認情況下,執行緒名稱是Thread-N,N是一個較小的十進制數,我們可以傳遞name引數,控制執行緒名稱,
以下會匯入logging模塊來顯示執行緒的名稱等詳細資訊
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker(num):
time.sleep(1)
logging.info('worker-{}'.format(num))
threads = [threading.Thread(target=worker, args=(i, ), name='workerthread-{}'.format(i)) for i in range(5)]
for t in threads:
t.start()
# 輸出
2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0
2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1
2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2
2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3
2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4
其中logging模塊的basicConfig函式的format中的%(threadName)s就是用來輸出當前執行緒的名稱的,
執行緒可以重名, 執行緒名并不是執行緒的唯一標識,但是通常應該避免執行緒重名,通常的處理手段是加前綴
daemon
Daemon:守護
和Daemon執行緒相對應的還有Non-Daemon執行緒,在此Thread初始化函式中的daemon引數即表示執行緒是否是Daemon執行緒,
- Daemon執行緒:會伴隨主執行緒結束而結束(可以理解為主執行緒結束,守護執行緒結束)
- Non-Daemon執行緒:不會隨著主執行緒結束而結束,主執行緒需要等待Non-Daemon結束
import logging
import time
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker():
logging.info('starting')
time.sleep(2)
logging.info('stopping')
if __name__ == '__main__':
logging.info('starting')
t1 = threading.Thread(target=worker, name='worker1', daemon=False)
t1.start()
time.sleep(1)
t2 = threading.Thread(target=worker, name='worker2', daemon=True)
t2.start()
logging.info('stopping')
# 輸出
2017-03-20 23:28:06,404 INFO [MainThread] starting
2017-03-20 23:28:06,436 INFO [worker1] starting
2017-03-20 23:28:07,492 INFO [worker2] starting
2017-03-20 23:28:07,492 INFO [MainThread] stopping # 主執行緒執行完成
2017-03-20 23:28:08,439 INFO [worker1] stopping # 主執行緒執行完成之后會等Non-Daemon執行緒執行完成,但是并不會等Daemon執行緒執行完成,即Daemon執行緒會隨著主執行緒執行完成而釋放
Thread.join()
如果想等Daemon執行緒執行完成之后主執行緒再退出,可以使用執行緒物件的join()方法
import logging
import time
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker():
logging.info('starting')
time.sleep(2)
logging.info('stopping')
if __name__ == '__main__':
logging.info('starting')
t1 = threading.Thread(target=worker, name='worker1', daemon=False)
t1.start()
time.sleep(1)
t2 = threading.Thread(target=worker, name='worker2', daemon=True)
t2.start()
logging.info('stopping')
t1.join()
t2.join()
# 輸出
2017-03-20 23:41:07,217 INFO [MainThread] starting
2017-03-20 23:41:07,243 INFO [worker1] starting
2017-03-20 23:41:08,245 INFO [worker2] starting
2017-03-20 23:41:08,246 INFO [MainThread] stopping
2017-03-20 23:41:09,243 INFO [worker1] stopping
2017-03-20 23:41:10,248 INFO [worker2] stopping
使用join函式只有主執行緒就需要等待Daemon執行緒執行完成在推出,
join函式的原型:join(self, timeout=None)
join方法會阻塞直到執行緒退出或者超時, timeout 是可選的,如果不設定timeout, 會一直等待執行緒退出,如果設定了timeout,會在超時之后退出或者執行緒執行完成退出,
因為join函式總是回傳None,因此在超時時間到達之后如果要知道執行緒是否還是存活的,可以呼叫is_alive()方法判斷執行緒是否存活,
threading常用方法
enumerate()
列出當前所有的存活的執行緒
>>> threading.enumerate()
[<_MainThread(MainThread, started 140209670301504)>, <Thread(worker1, started 140209545410304)>, <Thread(worker2, started daemon 140209537017600)>]
local()
import logging
import threading
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
ctx = threading.local()
ctx.data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/5
data ='a'
def worker():
logging.info(data)
logging.info(ctx.data)
worker()
threading.Thread(target=worker).start()
# 輸出
2017-03-21 00:02:08,102 INFO [MainThread] a
2017-03-21 00:02:08,113 INFO [MainThread] 5
2017-03-21 00:02:08,119 INFO [Thread-34] a
Exception in thread Thread-34:
Traceback (most recent call last):
File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run()
File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "<ipython-input-28-5395bd925d87>", line 7, in worker
logging.info(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data'
執行緒共享記憶體、狀態和資源,但是thread模塊的local類的物件的屬性, 只在當前執行緒可見,
Thread類的派生
Python中可以通過繼承 Thread 類并重寫 run 方法來撰寫多執行緒的邏輯,此時邏輯函式就是run,
class mythread(threading.Thread):
def run(self):
print('mythread run')
t = mythread()
t.run() # 輸出mythread run
t.start() # 輸出mythread run
通過繼承方式派生而來的子類物件可以同時執行start方法和run方法,結果是一樣的,都是執行子類的run方法,但是非繼承的方式不能同時使用start方法和run方法,會報錯,
派生時邏輯函式的引數傳遞
class mythread(threading.Thread):
def __init__(self, *args, **kwargs):
super().__init__() # 需要呼叫父類的初始化方法初始化
self.args = args
self.kwargs = kwargs
def run(self):
print('mythread run', self.args, self.kwargs)
t = mythread(1, 2, 3, a='b')
t.start() # 輸出mythread run (1, 2, 3) {'a': 'b'}
Timer類
Timer類:Thread類的派生類,也在threading模塊中,意為定時器,用作執行緒的延遲執行,
>>> help(threading.Timer)
Timer類的初始化方法:__init__(self, interval, function, args=None, kwargs=None)
- interval:時間間隔,即幾秒之后開始執行function
- function:執行緒執行的邏輯函式
- args:位置引數
- kwargs:關鍵字引數
代碼
import threading
import time
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker():
logging.info('worker running')
t1 = threading.Timer(interval=3, function=worker)
t2 = threading.Timer(interval=3, function=worker)
t1.setName('t1')
t2.setName('t2')
logging.info('start')
t1.start()
t2.start()
time.sleep(2)
logging.info('canceling {}'.format(t1.name))
t1.cancel() # 2s之后仍然可以取消t1
logging.info('end')
# 輸出
2017-03-21 19:28:52,801 INFO [MainThread] start
2017-03-21 19:28:54,811 INFO [MainThread] canceling t1
2017-03-21 19:28:54,819 INFO [MainThread] end
2017-03-21 19:28:55,808 INFO [t2] worker running
Timer.cancel():取消仍然存活的定時器,如果定時器已經開始執行function,則無法取消,
Timer.setDaemon(True):設定定時器為守護執行緒
執行緒同步
當使用多個執行緒來訪問同一個資料時,會經常出現資源爭用等執行緒安全問題(比如多個執行緒都在操作同一資料導致資料不一致),這時候我們就可以使用一些同步技術來解決這類問題,比如Event,Lock,Condition,Barrier,Semaphore等等,
Event
>>> help(threading.Event)
Event物件內置一個標志,這個標志可以由set()方法和clear()方法設定,執行緒可以使用wait()方法進行阻塞等待,知道Event物件內置標志被set,
- clear(self):設定內置標志為False
- set(self):設定內置標志為True
- wait(self, timeout=None):開始阻塞,直到內置標志被設定為True(即wait會阻塞執行緒直到set方法被呼叫或者超時)
- is_set(self):當且僅當內置標志為True的時候回傳True
代碼
以下代碼實作的邏輯是:一個boss和五個睡覺工人,只要有一個工人完成了睡覺任務,那么就喚醒boss和其他工人,
import datetime
import threading
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker(event: threading.Event):
s = random.randint(1, 5)
event.wait(s) # wait方法而不使用sleep方法,可以讓其他工人收到通知后不再等待
logging.info('sleep {}'.format(s))
event.set()
def boss(event:threading.Event):
start = datetime.datetime.now()
event.wait()
end = datetime.datetime.now()
logging.info('that boss exit takes {}s'.format(end - start))
def start():
event = threading.Event()
b = threading.Thread(target=boss, args=(event, ), name='boss')
b.start()
for i in range(5):
t = threading.Thread(target=worker, args=(event, ), name='worker-{}'.format(i))
t.start()
執行start()方法,測驗結果
>>> start()
2017-03-21 21:20:17,195 INFO [worker-2] sleep 1
2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s
2017-03-21 21:20:17,199 INFO [worker-0] sleep 2
2017-03-21 21:20:17,199 INFO [worker-1] sleep 3
2017-03-21 21:20:17,199 INFO [worker-3] sleep 2
2017-03-21 21:20:17,198 INFO [worker-4] sleep 1
可以看到:worker-2退出之后,boss和另外四個worker也瞬間就退出了,所以event物件的內置狀態被set之后,相關執行緒就不再wait了,
- event:在執行緒之間發送信號,通常用于某個執行緒需要等待其他執行緒處理完成某些動作之后才能啟動
wait()方法的timeout引數
def worker(event: threading.Event):
while not event.wait(3):
logging.info('run run run')
event = threading.Event()
threading.Thread(target=worker, args=(event, )).start()
# 輸出
2017-03-21 21:32:47,275 INFO [Thread-8] run run run
2017-03-21 21:32:50,277 INFO [Thread-8] run run run
2017-03-21 21:32:53,281 INFO [Thread-8] run run run
2017-03-21 21:32:56,284 INFO [Thread-8] run run run
...
程式每隔3s就會輸出一次結果,直到執行set()方法才會停止,因此我們可以寫一個定時器(類似于Thread類的派生類Timer),
代碼
class Timer:
def __init__(self, interval, function, *args, **kwargs):
self.interval = interval
self.function = function
self.args = args
self.kwargs = kwargs
self.event = threading.Event()
self.thread = threading.Thread(target=self.__target(), args=args, kwargs=kwargs)
def __target(self):
if not self.event.wait(self.interval):
return self.function
def start(self):
self.thread.start()
def cancel(self):
self.event.set()
def worker(act):
logging.info('run-{}'.format(act))
t = Timer(5, worker, 'hahaha')
t.start() # 輸出2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha
延遲5s之后執行了邏輯函式,也可以使用cancel函式取消,(要注意引數的傳遞,此處Timer初始化不能使用關鍵字引數)
Lock
event是用來同步執行緒之間的操作的,但是如果要控制共享資源的訪問那就需要用到鎖機制了,在Python標準庫中的實作就是內置的lock類,
>>> help(threading.Lock)
threading.Lock()函式會創建一個lock類的物件,
>>> help(threading.Lock())
鎖物件是一個同步原語(synchronization primitive),lock物件主要有以下三個方法:
- acquire(): acquire(blocking=True, timeout=-1) -> bool 獲得鎖(即鎖定鎖),成功獲得鎖回傳True,沒有獲得鎖則回傳False,
- release(): release() 釋放鎖
- locked(): locked() -> bool 檢查鎖是否被鎖住
代碼
以下代碼實作了在多個行程同時對資源進行訪問時,進行加鎖和解鎖的操作,保證加減操作和賦值操作組合之后的原子性,
class Counter: # 計時器有加減方法,都會修改value值,因此都需要加鎖處理
def __init__(self, start=0):
self.value = https://www.cnblogs.com/CHLL55/archive/2020/10/06/start
self.lock = threading.Lock()
def inc(self):
self.lock.acquire()
try:
self.value += 1
finally:
self.lock.release() # 需要用finally陳述句保證鎖一定會被釋放,否則資源永遠不可訪問
def dec(self):
self.lock.acquire()
try:
self.value -= 1
finally:
self.lock.release()
def inc_worker(c: Counter):
pause = random.random()
logging.info('sleeping-{}'.format(pause))
time.sleep(pause)
c.inc()
logging.info('cur_value:{}'.format(c.value))
def dec_worker(c: Counter):
pause = random.random()
logging.info('sleeping-{}'.format(pause))
time.sleep(pause)
c.dec()
logging.info('cur_value:{}'.format(c.value))
c = Counter()
for i in range(2):
threading.Thread(target=inc_worker, args=(c, ), name='inc_worker-{}'.format(i)).start()
for i in range(3):
threading.Thread(target=dec_worker, args=(c, ), name='dec_worker-{}'.format(i)).start()
測驗輸出
2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327
2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873
2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459
2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681
2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247
2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1
2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0
2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1
2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2
2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1
可見,各項操作之間保持相互原子性,沒有出現干擾,
因為lock類實作了__enter__和__exit__兩個魔術方法,因此支持背景關系管理器,可以修改以上Counter類的實作方法如下:
class Counter:
def __init__(self, start=0):
self.value = https://www.cnblogs.com/CHLL55/archive/2020/10/06/start
self.lock = threading.Lock()
def inc(self):
self.lock.acquire()
with self.lock:
self.value += 1
def dec(self):
self.lock.acquire()
with self.lock:
self.value -= 1
即使用背景關系管理器來代替try...finally...陳述句,測驗輸出應該以以上結果一致,
acquire方法的blocking引數
當blocking=True時,A執行緒中執行了lock.acquire()方法之后并且沒有執行到lock.release()方法,如果在B執行緒中再次執行lock.acquire()方法,則B執行緒阻塞,
- 正如以上代碼實作,當有n個執行緒需要修改一個共享資源的時候,其他執行緒在獲取鎖之前都處于阻塞狀態,(python的阻塞都會讓出cpu的時間片,因此不是忙等待)
當blocking=Fasle時,A執行緒中執行了lock.acquire()方法之后并且沒有執行到lock.release()方法,如果在B執行緒中再次執行lock.acquire()方法,則B執行緒不會阻塞,并且acquire函式回傳False,
acquire方法的timeout引數
當blocking=True并且timeout>0時,acquire會一直阻塞到超時或者鎖被釋放,
acquire(0)的引數傳遞
模擬acquire方法的默認引數,撰寫一下函式進行模擬引數傳遞的程序:
def print1(blocking=True, timeout=-1):
print(blocking, timeout)
print1(0) # 輸出0 -1
print1(10) # 輸出10 -1
可見第一個位置引數,替代了blocking,也就是說lock.acquire(0)等效于lock.acquire(blocking=False)
RLock
正常的lock物件是不能多次呼叫acquire的,但是可重用鎖RLock可以多次呼叫 acquire 而不阻塞,而且 release 時也要執行和 acquire 一樣的次數,
Condition
除了Event物件之外,執行緒同步還可以使用條件同步機制Condition,一類執行緒等待特定條件,而另一類執行緒發出特定條件滿足的信號,
>>> help(threading.Condition)
在Condition的幫助中有以下幾個方法:
- 初始化方法:init(self, lock=None),如果給定了lock引數,那么必須是Lock或者Rlock物件,并且被當做底層鎖來使用,如果沒有指定,那么會創建一個RLock物件的鎖,也被當做底層鎖來使用,
- 實作了
__enter__和__exit__方法,支持背景關系管理器, - notify(self, n=1):喚醒一個或多個在當前Condition上等待的其他執行緒,如果此方法的呼叫執行緒沒有獲得鎖,那么在呼叫的時候就會報錯RuntimeError
- notify_all(self):喚醒所有執行緒
- wait(self, timeout=None):一直等待著知道被notifyed或者發生超時
實體代碼
以下代碼實作的是:有一個生產者執行緒,會生產若干次,每次生產結束后需要通知所有的消費者執行緒來消費,因此下面代碼使用的是notify_all方法,
import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
class Producer_Consumer_Model:
def __init__(self):
self.data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/None
self.event = threading.Event() # 用來控制消費者退出
self.condition = threading.Condition()
def Consumer(self):
while not self.event.is_set():
with self.condition:
self.condition.wait() # 一直等待直到收到生產者通知notify_all
logging.info(self.data) # 收到通知之后,開始執行消費者的業務邏輯部分
def Producer(self):
for _ in range(4): # 每個生產者生產4次
data = random.randint(0, 100)
logging.info(data)
with self.condition:
self.data = data # 寫入成功就表示生產成功,因此需要在此加鎖并且能夠通知消費者執行緒去消費,因此選擇使用condition來處理
self.condition.notify_all() # 生產成功之后通知所有的消費者去消費
self.event.wait(1) # 沒生產一次等待1s
self.event.set() # 所有的生產完成之后通知消費者退出
m = Producer_Consumer_Model()
for i in range(3):
threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()
p = threading.Thread(target=m.Producer, name='Producer')
p.start()
測驗結果(一個生產者,三個消費者)
2017-03-22 22:07:42,875 INFO [Producer] 16
2017-03-22 22:07:42,883 INFO [Consumer-0] 16
2017-03-22 22:07:42,890 INFO [Consumer-2] 16
2017-03-22 22:07:42,894 INFO [Consumer-1] 16
2017-03-22 22:07:43,884 INFO [Producer] 76
2017-03-22 22:07:43,888 INFO [Consumer-0] 76
2017-03-22 22:07:43,895 INFO [Consumer-2] 76
2017-03-22 22:07:43,898 INFO [Consumer-1] 76
2017-03-22 22:07:44,889 INFO [Producer] 31
2017-03-22 22:07:44,891 INFO [Consumer-0] 31
2017-03-22 22:07:44,911 INFO [Consumer-2] 31
2017-03-22 22:07:44,913 INFO [Consumer-1] 31
2017-03-22 22:07:45,892 INFO [Producer] 17
2017-03-22 22:07:45,894 INFO [Consumer-0] 17
2017-03-22 22:07:45,907 INFO [Consumer-2] 17
2017-03-22 22:07:45,910 INFO [Consumer-1] 17
可見,生產者每生產一次,所有的消費者就會去消費,如果想控制每次生產之后通知幾個消費者來消費,那么就可以使用notify方法,指定消費者執行緒個數,
代碼如下
import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
class Producer_Consumer_Model:
def __init__(self):
self.data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/None
self.event = threading.Event() # 用來控制消費者退出
self.condition = threading.Condition()
def Consumer(self):
while not self.event.is_set():
with self.condition:
self.condition.wait() # 一直等待直到收到生產者通知notify_all
logging.info(self.data) # 收到通知之后,開始執行消費者的業務邏輯部分
def Producer(self):
for _ in range(4): # 每個生產者生產4次
data = random.randint(0, 100)
logging.info(data)
with self.condition:
self.data = data # 寫入成功就表示生產成功,因此需要在此加鎖并且能夠通知消費者執行緒去消費,因此選擇使用condition來處理
self.condition.notify(1) # 生產成功之后通知所有的消費者去消費
self.event.wait(1) # 沒生產一次等待1s
self.event.set() # 所有的生產完成之后通知消費者退出
m = Producer_Consumer_Model()
for i in range(3):
threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()
p = threading.Thread(target=m.Producer, name='Producer')
p.start()
測驗結果(一個生產者,三個消費者,每次生產之后只通知一個消費者去消費)
2017-03-22 22:24:52,933 INFO [Producer] 11
2017-03-22 22:24:52,948 INFO [Consumer-0] 11
2017-03-22 22:24:53,949 INFO [Producer] 47
2017-03-22 22:24:53,967 INFO [Consumer-1] 47
2017-03-22 22:24:54,967 INFO [Producer] 14
2017-03-22 22:24:54,983 INFO [Consumer-2] 14
2017-03-22 22:24:55,986 INFO [Producer] 54
2017-03-22 22:24:55,993 INFO [Consumer-0] 54
- Condition 通常用于生產者消費者模式, 生產者生產訊息之后, 使用notify 或者 notify_all 通知消費者消費,
- 消費者使用wait方法阻塞等待生產者通知
- notify通知指定個wait的執行緒, notify_all通知所有的wait執行緒
- 無論notify/notify_all還是wait 都必須先acqurie, 完成后必須確保release, 通常使用with語法
Barrier
Barrier類存在于threading模塊中,中文可以翻譯成柵欄
>>> help(threading.Barrier)
可以看到Barrier的主要方法和屬性:
__init__(self, parties, action=None, timeout=None):初始化方法,創建一個Barrierparties:所有參與的執行緒的數量action:所有的執行緒都wait之后并且在執行緒釋放之前就會執行這個action函式,相當于集結之后要做的事情,timeout:相當于給需要等待的每個執行緒的wait方法加上timeout引數,超時則barrier不再生效
abort(self):將Barrier設定成broken狀態reset(self):將Barrier重置為最初狀態wait(self, timeout=None):在Barrier前等待,回傳在Barrier前等待的下標,從0到parties-1broken:如果Barrier處于broken狀態則回傳Truen_waiting:當前已經在Barrier處等待的執行緒的數量parties:需要在Barrier處等待的執行緒的數量
示例代碼
import threading
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
barrier = threading.Barrier(parties=3)
def worker(barrier: threading.Barrier):
logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
worker_id = barrier.wait()
logging.info('after barrier {}'.format(worker_id))
for i in range(3):
threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start()
測驗結果
2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2
2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0
2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1
可見,所有的執行緒都會一直等待,知道所有的執行緒都到期了,然后就通過barrier,繼續執行后續操作,
Barrier會建立一個控制點,所有參與的執行緒都會阻塞,直到所有參與的“各方”達到這一點, 它讓執行緒分開啟動,然后暫停,直到它們都準備好再繼續,因此,這一點可以理解為各個執行緒的一個集結點,
abort函式的使用
將Barrier設定成broken狀態,所有執行緒在參與集結程序中,只要執行了barrier.abort方法,那么正在等待的執行緒都會拋出threading.BrokenBarrierError例外,可以理解為,只要有一個執行緒確定已經到不了Barrier并且通知了Barrier,那么Barrier就會執行abort方法,通知所有正在wait的執行緒放棄集結,
實體代碼
import threading
import logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
def worker(barrier: threading.Barrier):
logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
try:
worker_id = barrier.wait()
except threading.BrokenBarrierError:
logging.info('aborting')
else:
logging.info('after barrier {}'.format(worker_id))
barrier = threading.Barrier(4) # 需要等待4個執行緒
for i in range(3):
threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start() # 3個執行緒都開始wait
barrier.abort() # 還有一個執行緒沒有到wait,此時執行abort方法,則所有正在wait的執行緒都拋出例外
測驗結果
2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:47:43,211 INFO [worker-2] aborting
2017-03-22 23:47:43,207 INFO [worker-1] aborting
2017-03-22 23:47:43,207 INFO [worker-0] aborting
Semaphore
Semaphore類存在于threading模塊中
help(threading.Semaphore)
信號量內部管理者一個計數器,這個計數器的值等于release()方法呼叫的次數減去acquire()方法呼叫的次數然后再加上初始值value,value默認為1,
可以看到Semaphore的主要方法:
__init__(self, value=https://www.cnblogs.com/CHLL55/archive/2020/10/06/1):初始化一個信號量,value為內部計數器賦初值,默認為1acquire(self, blocking=True, timeout=None):獲取信號量,內部計數器減一release(self):釋放信號量,內部計數器加一
示例代碼
import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')
class Pool:
def __init__(self, num):
self.num = num
self.conns = [self._make_connect(i) for i in range(num)] # 存放連接
self.sem = threading.Semaphore(num) # 信號量內部計數器初始為連接數
def _make_connect(self, name): # 根據連接名稱創建連接
conn = 'connect-{}'.format(name)
return conn
def get_connect(self): # 從連接池獲取連接
self.sem.acquire()
return self.conns.pop()
def return_connect(self, conn): # 將連接conn返還到連接池中
self.conns.insert(0, conn)
self.sem.release()
def worker(pool: Pool):
logging.info('starting')
conn = pool.get_connect() # 如果獲取不到則會阻塞在acquire處
logging.info('get connect {}'.format(conn))
t = random.randint(1, 3)
time.sleep(t)
logging.info('takes {}s'.format(t))
pool.return_connect(conn)
logging.info('return connect {}'.format(conn))
pool = Pool(2) # 連接池中有兩個連接可以使用
for i in range(3): # 三個執行緒使用兩個連接開始任務
threading.Thread(target=worker, args=(pool, ), name='worker-{}'.format(i)).start()
測驗結果
2017-03-23 00:54:36,056 INFO [worker-0] starting
2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1
2017-03-23 00:54:36,074 INFO [worker-1] starting
2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0
2017-03-23 00:54:36,089 INFO [worker-2] starting
2017-03-23 00:54:39,074 INFO [worker-0] takes 3s
2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1
2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1
2017-03-23 00:54:39,093 INFO [worker-1] takes 3s
2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0
2017-03-23 00:54:40,093 INFO [worker-2] takes 1s
2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1
這個測驗結果顯示:三個執行緒獲取連接池中的兩個連接,結果出現了一個執行緒等待其他執行緒執行完成之后再獲取連接的程序,
Queue
Condition執行緒同步部分用來傳遞資料的是一個封裝在生產者消費者模型中的元素data(正常使用情況下一般封裝的都是一個串列,類似與Barrier部分的連接池中的conns串列),
Python的queue模塊中提供了同步的、執行緒安全的佇列類,包括三種佇列:
- FIFO(先入先出)佇列Queue
- LIFO(后入先出)佇列LifoQueue
- 優先級佇列PriorityQueue
這些佇列都實作了鎖原語,能夠在多執行緒中直接使用,可以使用佇列來實作執行緒間的同步,因此我們可以使用queue模塊來替換掉生產者消費者中的全域元素,代碼如下:
import random
import queue
import threading
class Producer_Consumer_Model:
def __init__(self):
self.q = queue.Queue()
self.event = threading.Event()
def Consumer(self):
while not self.event.is_set():
logging.info(self.q.get())
def Producer(self):
while not self.event.wait(3):
data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/random.randint(1, 100)
logging.info(data)
self.q.put(data)
m = Producer_Consumer_Model()
threading.Thread(target=m.Consumer, name='Consumer').start()
threading.Thread(target=m.Producer, name='Producer').start()
測驗結果
2017-03-23 10:11:22,990 INFO [Producer] 26
2017-03-23 10:11:22,993 INFO [Consumer] 26
2017-03-23 10:11:25,993 INFO [Producer] 89
2017-03-23 10:11:26,003 INFO [Consumer] 89
2017-03-23 10:11:29,004 INFO [Producer] 14
2017-03-23 10:11:29,006 INFO [Consumer] 14
2017-03-23 10:11:32,007 INFO [Producer] 17
2017-03-23 10:11:32,009 INFO [Consumer] 17
每生產一次,消費者就會消費一次,當消費者執行緒,讀取Queue則呼叫Queue.get()方法,若Queue為空時消費者執行緒獲取不到內容,就會阻塞在這里,直到成功獲取內容,
執行緒同步總結
- Event:主要用于執行緒之間的事件通知
- Lock,Rlock:主要用于保護共享資源
- Condition:主要用于生產者消費者模型,可以理解為Event和Lock的結合體
- Barrier:同步指定個等待的執行緒
- Semaphore:主要用于保護資源,和Lock的區別在于可以多個執行緒訪問共享資源,而鎖一次只能一個執行緒訪問到共享資源,即鎖是value=https://www.cnblogs.com/CHLL55/archive/2020/10/06/1的信號量
- Queue:使用FIFO佇列進行同步,適用于生產者消費者模型
GIL
GIL(Global Interpreter Lock):全域解釋器鎖
Python代碼的執行由Python 主回圈來控制,Python 在設計之初就考慮到要在解釋器的主回圈中,同時只有一個執行緒在執行,即在任意時刻,只有一個執行緒在解釋器中運行,對Python 主回圈的訪問由全域解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個執行緒在運行,
因此Python多執行緒程式的執行順序如下:
- 設定GIL
- 切換到一個執行緒去運行
- 運行
- 結束執行緒
- 解鎖GIL
- 重復以上步驟
因此,Python的多執行緒并沒有實作并行,只是實作了并發而已,如果要實作真正的并行,那就需要使用Python的多行程模塊multiprocessing(multiprocessing模塊的宗旨是像管理執行緒一樣來管理行程),
參考資料
- threading — Manage Concurrent Operations Within a Process
- Python執行緒同步機制: Locks, RLocks, Semaphores, Conditions, Events和Queues
記得幫我點贊哦!
精心整理了計算機各個方向的從入門、進階、實戰的視頻課程和電子書,按照目錄合理分類,總能找到你需要的學習資料,還在等什么?快去關注下載吧!!!

念念不忘,必有回響,小伙伴們幫我點個贊吧,非常感謝,
我是職場亮哥,YY高級軟體工程師、四年作業經驗,拒絕咸魚爭當龍頭的斜杠程式員,
聽我說,進步多,程式人生一把梭
如果有幸能幫到你,請幫我點個【贊】,給個關注,如果能順帶評論給個鼓勵,將不勝感激,
職場亮哥文章串列:更多文章

本人所有文章、回答都與著作權保護平臺有合作,著作權歸職場亮哥所有,未經授權,轉載必究!
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/159499.html
標籤:其他
