主頁 > 後端開發 > Python多執行緒

Python多執行緒

2020-10-06 14:49:00 後端開發

多執行緒基礎概念

并行與并發

  • 并行:同時處理多個任務,必須在多核環境下
  • 一段時間內同時處理多個任務,單核也可以并發

并發手段

  • 執行緒:內核空間的調度
  • 行程:內核空間的調度
  • 協程:用戶空間的調度

執行緒可以允許程式在同一行程空間中并發運行多個操作,本次主要介紹Python標準庫中的多執行緒模塊threading,

threading模塊

執行緒初始化

使用threading模塊的Thread類初始化物件然后呼叫start方法啟動執行緒,

import threading
import time

def worker(num):
    time.sleep(1)
    print('worker-{}'.format(num))

# 創建執行緒物件 target引數是一個函式, 這個函式即執行緒要執行的邏輯
threads = [threading.Thread(target=worker, args=(i, ))for i in range(5)]
for t in threads:
    t.start()
    # start 方法啟動一個執行緒, 當這個執行緒的邏輯執行完畢的時候,執行緒自動退出, Python 沒有提供主動退出執行緒的方法

# 輸出以下結果
worker-0worker-1worker-2worker-3



worker-4

初始化的五個執行緒的執行邏輯中的print方法列印字串及換行符出現了隨機分布,即出現了資源競爭,

給執行緒傳遞引數

import threading
import time

def worker(*args, **kwargs):
    time.sleep(1)
    print(args)
    print(kwargs)

threads = threading.Thread(target=worker, args=(1, 2, 3), kwargs={'a':'b'}).start()

# 輸出
(1, 2, 3)
{'a': 'b'}

args傳遞位置引數,kwargs傳遞關鍵字引數,

Thread常用引數和方法

>>> help(threading.Thread)

可以看到Thread函式的初始化方法中的引數如下:

 |  __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)
 |      This constructor should always be called with keyword arguments. Arguments are:
 |      
 |      *group* should be None; reserved for future extension when a ThreadGroup
 |      class is implemented.
 |      
 |      *target* is the callable object to be invoked by the run()
 |      method. Defaults to None, meaning nothing is called.
 |      
 |      *name* is the thread name. By default, a unique name is constructed of
 |      the form "Thread-N" where N is a small decimal number.
 |      
 |      *args* is the argument tuple for the target invocation. Defaults to ().
 |      
 |      *kwargs* is a dictionary of keyword arguments for the target
 |      invocation. Defaults to {}.

name

表示執行緒名稱,默認情況下,執行緒名稱是Thread-N,N是一個較小的十進制數,我們可以傳遞name引數,控制執行緒名稱,

以下會匯入logging模塊來顯示執行緒的名稱等詳細資訊

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(num):
    time.sleep(1)
    logging.info('worker-{}'.format(num))

threads = [threading.Thread(target=worker, args=(i, ), name='workerthread-{}'.format(i)) for i in range(5)]
for t in threads:
    t.start()

# 輸出
2017-03-20 21:39:29,339 INFO [workerthread-0] worker-0
2017-03-20 21:39:29,340 INFO [workerthread-1] worker-1
2017-03-20 21:39:29,340 INFO [workerthread-2] worker-2
2017-03-20 21:39:29,340 INFO [workerthread-3] worker-3
2017-03-20 21:39:29,346 INFO [workerthread-4] worker-4

其中logging模塊的basicConfig函式的format中的%(threadName)s就是用來輸出當前執行緒的名稱的,

執行緒可以重名, 執行緒名并不是執行緒的唯一標識,但是通常應該避免執行緒重名,通常的處理手段是加前綴

daemon

Daemon:守護

和Daemon執行緒相對應的還有Non-Daemon執行緒,在此Thread初始化函式中的daemon引數即表示執行緒是否是Daemon執行緒,

  • Daemon執行緒:會伴隨主執行緒結束而結束(可以理解為主執行緒結束,守護執行緒結束)
  • Non-Daemon執行緒:不會隨著主執行緒結束而結束,主執行緒需要等待Non-Daemon結束
import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')


def worker():
    logging.info('starting')
    time.sleep(2)
    logging.info('stopping')


if __name__ == '__main__':
    logging.info('starting')
    t1 = threading.Thread(target=worker, name='worker1', daemon=False)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=worker, name='worker2', daemon=True)
    t2.start()
    logging.info('stopping')

# 輸出
2017-03-20 23:28:06,404 INFO [MainThread] starting
2017-03-20 23:28:06,436 INFO [worker1] starting
2017-03-20 23:28:07,492 INFO [worker2] starting
2017-03-20 23:28:07,492 INFO [MainThread] stopping  # 主執行緒執行完成
2017-03-20 23:28:08,439 INFO [worker1] stopping  # 主執行緒執行完成之后會等Non-Daemon執行緒執行完成,但是并不會等Daemon執行緒執行完成,即Daemon執行緒會隨著主執行緒執行完成而釋放

Thread.join()

如果想等Daemon執行緒執行完成之后主執行緒再退出,可以使用執行緒物件的join()方法

import logging
import time
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')


def worker():
    logging.info('starting')
    time.sleep(2)
    logging.info('stopping')


if __name__ == '__main__':
    logging.info('starting')
    t1 = threading.Thread(target=worker, name='worker1', daemon=False)
    t1.start()
    time.sleep(1)
    t2 = threading.Thread(target=worker, name='worker2', daemon=True)
    t2.start()
    logging.info('stopping')
    t1.join()
    t2.join()

# 輸出
2017-03-20 23:41:07,217 INFO [MainThread] starting
2017-03-20 23:41:07,243 INFO [worker1] starting
2017-03-20 23:41:08,245 INFO [worker2] starting
2017-03-20 23:41:08,246 INFO [MainThread] stopping
2017-03-20 23:41:09,243 INFO [worker1] stopping
2017-03-20 23:41:10,248 INFO [worker2] stopping

使用join函式只有主執行緒就需要等待Daemon執行緒執行完成在推出,

join函式的原型:join(self, timeout=None)

join方法會阻塞直到執行緒退出或者超時, timeout 是可選的,如果不設定timeout, 會一直等待執行緒退出,如果設定了timeout,會在超時之后退出或者執行緒執行完成退出,

因為join函式總是回傳None,因此在超時時間到達之后如果要知道執行緒是否還是存活的,可以呼叫is_alive()方法判斷執行緒是否存活,

threading常用方法

enumerate()

列出當前所有的存活的執行緒

>>> threading.enumerate()
[<_MainThread(MainThread, started 140209670301504)>, <Thread(worker1, started 140209545410304)>, <Thread(worker2, started daemon 140209537017600)>]

local()

import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

ctx = threading.local()
ctx.data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/5
data ='a'

def worker():
    logging.info(data)
    logging.info(ctx.data)

worker()
threading.Thread(target=worker).start()

# 輸出
2017-03-21 00:02:08,102 INFO [MainThread] a
2017-03-21 00:02:08,113 INFO [MainThread] 5
2017-03-21 00:02:08,119 INFO [Thread-34] a
Exception in thread Thread-34:
Traceback (most recent call last):
  File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run()
  File "/home/clg/.pyenv/versions/3.5.2/lib/python3.5/threading.py", line 862, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-28-5395bd925d87>", line 7, in worker
    logging.info(ctx.data)
AttributeError: '_thread._local' object has no attribute 'data'

執行緒共享記憶體、狀態和資源,但是thread模塊的local類的物件的屬性, 只在當前執行緒可見,

Thread類的派生

Python中可以通過繼承 Thread 類并重寫 run 方法來撰寫多執行緒的邏輯,此時邏輯函式就是run,

class mythread(threading.Thread):
    def run(self):
        print('mythread run')

t = mythread()
t.run()  # 輸出mythread run
t.start()  # 輸出mythread run

通過繼承方式派生而來的子類物件可以同時執行start方法和run方法,結果是一樣的,都是執行子類的run方法,但是非繼承的方式不能同時使用start方法和run方法,會報錯,

派生時邏輯函式的引數傳遞

class mythread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__()  # 需要呼叫父類的初始化方法初始化
        self.args = args
        self.kwargs = kwargs

    def run(self):
        print('mythread run', self.args, self.kwargs)

t = mythread(1, 2, 3, a='b')
t.start()  # 輸出mythread run (1, 2, 3) {'a': 'b'}

Timer類

Timer類:Thread類的派生類,也在threading模塊中,意為定時器,用作執行緒的延遲執行,

>>> help(threading.Timer)

Timer類的初始化方法:__init__(self, interval, function, args=None, kwargs=None)

  • interval:時間間隔,即幾秒之后開始執行function
  • function:執行緒執行的邏輯函式
  • args:位置引數
  • kwargs:關鍵字引數

代碼

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker():
    logging.info('worker running')

t1 = threading.Timer(interval=3, function=worker)
t2 = threading.Timer(interval=3, function=worker)
t1.setName('t1')
t2.setName('t2')
logging.info('start')
t1.start()
t2.start()
time.sleep(2)
logging.info('canceling {}'.format(t1.name))
t1.cancel()  # 2s之后仍然可以取消t1
logging.info('end')

# 輸出
2017-03-21 19:28:52,801 INFO [MainThread] start
2017-03-21 19:28:54,811 INFO [MainThread] canceling t1
2017-03-21 19:28:54,819 INFO [MainThread] end
2017-03-21 19:28:55,808 INFO [t2] worker running

Timer.cancel():取消仍然存活的定時器,如果定時器已經開始執行function,則無法取消,

Timer.setDaemon(True):設定定時器為守護執行緒

執行緒同步

當使用多個執行緒來訪問同一個資料時,會經常出現資源爭用等執行緒安全問題(比如多個執行緒都在操作同一資料導致資料不一致),這時候我們就可以使用一些同步技術來解決這類問題,比如Event,Lock,Condition,Barrier,Semaphore等等,

Event

>>> help(threading.Event)

Event物件內置一個標志,這個標志可以由set()方法和clear()方法設定,執行緒可以使用wait()方法進行阻塞等待,知道Event物件內置標志被set,

  1. clear(self):設定內置標志為False
  2. set(self):設定內置標志為True
  3. wait(self, timeout=None):開始阻塞,直到內置標志被設定為True(即wait會阻塞執行緒直到set方法被呼叫或者超時)
  4. is_set(self):當且僅當內置標志為True的時候回傳True

代碼

以下代碼實作的邏輯是:一個boss和五個睡覺工人,只要有一個工人完成了睡覺任務,那么就喚醒boss和其他工人,

import datetime
import threading
import logging
import random

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(event: threading.Event):
    s = random.randint(1, 5)
    event.wait(s)  # wait方法而不使用sleep方法,可以讓其他工人收到通知后不再等待
    logging.info('sleep {}'.format(s))
    event.set()


def boss(event:threading.Event):
    start = datetime.datetime.now()
    event.wait()
    end = datetime.datetime.now()
    logging.info('that boss exit takes {}s'.format(end - start))

def start():
    event = threading.Event()
    b = threading.Thread(target=boss, args=(event, ), name='boss')
    b.start()
    for i in range(5):
        t = threading.Thread(target=worker, args=(event, ), name='worker-{}'.format(i))
        t.start()

執行start()方法,測驗結果

>>> start()

2017-03-21 21:20:17,195 INFO [worker-2] sleep 1
2017-03-21 21:20:17,198 INFO [boss] that boss exit takes 0:00:01.004954s
2017-03-21 21:20:17,199 INFO [worker-0] sleep 2
2017-03-21 21:20:17,199 INFO [worker-1] sleep 3
2017-03-21 21:20:17,199 INFO [worker-3] sleep 2
2017-03-21 21:20:17,198 INFO [worker-4] sleep 1

可以看到:worker-2退出之后,boss和另外四個worker也瞬間就退出了,所以event物件的內置狀態被set之后,相關執行緒就不再wait了,

  • event:在執行緒之間發送信號,通常用于某個執行緒需要等待其他執行緒處理完成某些動作之后才能啟動

wait()方法的timeout引數

def worker(event: threading.Event):
    while not event.wait(3):
        logging.info('run run run')

event = threading.Event()
threading.Thread(target=worker, args=(event, )).start()

# 輸出
2017-03-21 21:32:47,275 INFO [Thread-8] run run run
2017-03-21 21:32:50,277 INFO [Thread-8] run run run
2017-03-21 21:32:53,281 INFO [Thread-8] run run run
2017-03-21 21:32:56,284 INFO [Thread-8] run run run
...

程式每隔3s就會輸出一次結果,直到執行set()方法才會停止,因此我們可以寫一個定時器(類似于Thread類的派生類Timer),

代碼

class Timer:
    def __init__(self, interval, function, *args, **kwargs):
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.event = threading.Event()
        self.thread = threading.Thread(target=self.__target(), args=args, kwargs=kwargs)

    def __target(self):
        if not self.event.wait(self.interval):
            return self.function

    def start(self):
        self.thread.start()

    def cancel(self):
        self.event.set()

def worker(act):
    logging.info('run-{}'.format(act))

t = Timer(5, worker, 'hahaha')
t.start()  # 輸出2017-03-21 22:14:59,645 INFO [Thread-20] run-hahaha

延遲5s之后執行了邏輯函式,也可以使用cancel函式取消,(要注意引數的傳遞,此處Timer初始化不能使用關鍵字引數

Lock

event是用來同步執行緒之間的操作的,但是如果要控制共享資源的訪問那就需要用到鎖機制了,在Python標準庫中的實作就是內置的lock類,

>>> help(threading.Lock)

threading.Lock()函式會創建一個lock類的物件,

>>> help(threading.Lock())

鎖物件是一個同步原語(synchronization primitive),lock物件主要有以下三個方法:

  1. acquire(): acquire(blocking=True, timeout=-1) -> bool 獲得鎖(即鎖定鎖),成功獲得鎖回傳True,沒有獲得鎖則回傳False,
  2. release(): release() 釋放鎖
  3. locked(): locked() -> bool 檢查鎖是否被鎖住

代碼

以下代碼實作了在多個行程同時對資源進行訪問時,進行加鎖和解鎖的操作,保證加減操作和賦值操作組合之后的原子性,

class Counter:  # 計時器有加減方法,都會修改value值,因此都需要加鎖處理
    def __init__(self, start=0):
        self.value = https://www.cnblogs.com/CHLL55/archive/2020/10/06/start
        self.lock = threading.Lock()

    def inc(self):
        self.lock.acquire()
        try:
            self.value += 1
        finally:
            self.lock.release()  # 需要用finally陳述句保證鎖一定會被釋放,否則資源永遠不可訪問

    def dec(self):
        self.lock.acquire()
        try:
            self.value -= 1
        finally:
            self.lock.release()

def inc_worker(c: Counter):
    pause = random.random()
    logging.info('sleeping-{}'.format(pause))
    time.sleep(pause)
    c.inc()
    logging.info('cur_value:{}'.format(c.value))

def dec_worker(c: Counter):
    pause = random.random()
    logging.info('sleeping-{}'.format(pause))
    time.sleep(pause)
    c.dec()
    logging.info('cur_value:{}'.format(c.value))

c = Counter()
for i in range(2):
    threading.Thread(target=inc_worker, args=(c, ), name='inc_worker-{}'.format(i)).start()

for i in range(3):
    threading.Thread(target=dec_worker, args=(c, ), name='dec_worker-{}'.format(i)).start()

測驗輸出

2017-03-21 23:17:44,761 INFO [inc_worker-0] sleeping-0.6542416949220327
2017-03-21 23:17:44,766 INFO [inc_worker-1] sleeping-0.48615543229897873
2017-03-21 23:17:44,771 INFO [dec_worker-0] sleeping-0.12355589507242459
2017-03-21 23:17:44,776 INFO [dec_worker-1] sleeping-0.5276710391905681
2017-03-21 23:17:44,784 INFO [dec_worker-2] sleeping-0.5546251407611247
2017-03-21 23:17:44,900 INFO [dec_worker-0] cur_value:-1
2017-03-21 23:17:45,258 INFO [inc_worker-1] cur_value:0
2017-03-21 23:17:45,312 INFO [dec_worker-1] cur_value:-1
2017-03-21 23:17:45,351 INFO [dec_worker-2] cur_value:-2
2017-03-21 23:17:45,421 INFO [inc_worker-0] cur_value:-1

可見,各項操作之間保持相互原子性,沒有出現干擾,

因為lock類實作了__enter____exit__兩個魔術方法,因此支持背景關系管理器,可以修改以上Counter類的實作方法如下:

class Counter:
    def __init__(self, start=0):
        self.value = https://www.cnblogs.com/CHLL55/archive/2020/10/06/start
        self.lock = threading.Lock()

    def inc(self):
        self.lock.acquire()
        with self.lock:
            self.value += 1

    def dec(self):
        self.lock.acquire()
        with self.lock:
            self.value -= 1

即使用背景關系管理器來代替try...finally...陳述句,測驗輸出應該以以上結果一致,

acquire方法的blocking引數

當blocking=True時,A執行緒中執行了lock.acquire()方法之后并且沒有執行到lock.release()方法,如果在B執行緒中再次執行lock.acquire()方法,則B執行緒阻塞,

  • 正如以上代碼實作,當有n個執行緒需要修改一個共享資源的時候,其他執行緒在獲取鎖之前都處于阻塞狀態,(python的阻塞都會讓出cpu的時間片,因此不是忙等待)

當blocking=Fasle時,A執行緒中執行了lock.acquire()方法之后并且沒有執行到lock.release()方法,如果在B執行緒中再次執行lock.acquire()方法,則B執行緒不會阻塞,并且acquire函式回傳False,

acquire方法的timeout引數

當blocking=True并且timeout>0時,acquire會一直阻塞到超時或者鎖被釋放,

acquire(0)的引數傳遞

模擬acquire方法的默認引數,撰寫一下函式進行模擬引數傳遞的程序:

def print1(blocking=True, timeout=-1):
    print(blocking, timeout)

print1(0)  # 輸出0 -1
print1(10)  # 輸出10 -1

可見第一個位置引數,替代了blocking,也就是說lock.acquire(0)等效于lock.acquire(blocking=False)

RLock

正常的lock物件是不能多次呼叫acquire的,但是可重用鎖RLock可以多次呼叫 acquire 而不阻塞,而且 release 時也要執行和 acquire 一樣的次數,

Condition

除了Event物件之外,執行緒同步還可以使用條件同步機制Condition,一類執行緒等待特定條件,而另一類執行緒發出特定條件滿足的信號,

>>> help(threading.Condition)

在Condition的幫助中有以下幾個方法:

  • 初始化方法:init(self, lock=None),如果給定了lock引數,那么必須是Lock或者Rlock物件,并且被當做底層鎖來使用,如果沒有指定,那么會創建一個RLock物件的鎖,也被當做底層鎖來使用,
  • 實作了__enter____exit__方法,支持背景關系管理器,
  • notify(self, n=1):喚醒一個或多個在當前Condition上等待的其他執行緒,如果此方法的呼叫執行緒沒有獲得鎖,那么在呼叫的時候就會報錯RuntimeError
  • notify_all(self):喚醒所有執行緒
  • wait(self, timeout=None):一直等待著知道被notifyed或者發生超時

實體代碼

以下代碼實作的是:有一個生產者執行緒,會生產若干次,每次生產結束后需要通知所有的消費者執行緒來消費,因此下面代碼使用的是notify_all方法,

import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:
    def __init__(self):
        self.data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/None
        self.event = threading.Event()  # 用來控制消費者退出
        self.condition = threading.Condition()

    def Consumer(self):
        while not self.event.is_set():
            with self.condition:
                self.condition.wait()  # 一直等待直到收到生產者通知notify_all
                logging.info(self.data)  # 收到通知之后,開始執行消費者的業務邏輯部分

    def Producer(self):
        for _ in range(4):  # 每個生產者生產4次
            data = random.randint(0, 100)
            logging.info(data)
            with self.condition:
                self.data = data  # 寫入成功就表示生產成功,因此需要在此加鎖并且能夠通知消費者執行緒去消費,因此選擇使用condition來處理
                self.condition.notify_all()  # 生產成功之后通知所有的消費者去消費
            self.event.wait(1)  # 沒生產一次等待1s
        self.event.set()  # 所有的生產完成之后通知消費者退出

m = Producer_Consumer_Model()
for i in range(3):
    threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()

p = threading.Thread(target=m.Producer, name='Producer')
p.start()

測驗結果(一個生產者,三個消費者)

2017-03-22 22:07:42,875 INFO [Producer] 16
2017-03-22 22:07:42,883 INFO [Consumer-0] 16
2017-03-22 22:07:42,890 INFO [Consumer-2] 16
2017-03-22 22:07:42,894 INFO [Consumer-1] 16
2017-03-22 22:07:43,884 INFO [Producer] 76
2017-03-22 22:07:43,888 INFO [Consumer-0] 76
2017-03-22 22:07:43,895 INFO [Consumer-2] 76
2017-03-22 22:07:43,898 INFO [Consumer-1] 76
2017-03-22 22:07:44,889 INFO [Producer] 31
2017-03-22 22:07:44,891 INFO [Consumer-0] 31
2017-03-22 22:07:44,911 INFO [Consumer-2] 31
2017-03-22 22:07:44,913 INFO [Consumer-1] 31
2017-03-22 22:07:45,892 INFO [Producer] 17
2017-03-22 22:07:45,894 INFO [Consumer-0] 17
2017-03-22 22:07:45,907 INFO [Consumer-2] 17
2017-03-22 22:07:45,910 INFO [Consumer-1] 17

可見,生產者每生產一次,所有的消費者就會去消費,如果想控制每次生產之后通知幾個消費者來消費,那么就可以使用notify方法,指定消費者執行緒個數,

代碼如下

import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Producer_Consumer_Model:
    def __init__(self):
        self.data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/None
        self.event = threading.Event()  # 用來控制消費者退出
        self.condition = threading.Condition()

    def Consumer(self):
        while not self.event.is_set():
            with self.condition:
                self.condition.wait()  # 一直等待直到收到生產者通知notify_all
                logging.info(self.data)  # 收到通知之后,開始執行消費者的業務邏輯部分

    def Producer(self):
        for _ in range(4):  # 每個生產者生產4次
            data = random.randint(0, 100)
            logging.info(data)
            with self.condition:
                self.data = data  # 寫入成功就表示生產成功,因此需要在此加鎖并且能夠通知消費者執行緒去消費,因此選擇使用condition來處理
                self.condition.notify(1)  # 生產成功之后通知所有的消費者去消費
            self.event.wait(1)  # 沒生產一次等待1s
        self.event.set()  # 所有的生產完成之后通知消費者退出

m = Producer_Consumer_Model()
for i in range(3):
    threading.Thread(target=m.Consumer, name='Consumer-{}'.format(i)).start()

p = threading.Thread(target=m.Producer, name='Producer')
p.start()

測驗結果(一個生產者,三個消費者,每次生產之后只通知一個消費者去消費)

2017-03-22 22:24:52,933 INFO [Producer] 11
2017-03-22 22:24:52,948 INFO [Consumer-0] 11
2017-03-22 22:24:53,949 INFO [Producer] 47
2017-03-22 22:24:53,967 INFO [Consumer-1] 47
2017-03-22 22:24:54,967 INFO [Producer] 14
2017-03-22 22:24:54,983 INFO [Consumer-2] 14
2017-03-22 22:24:55,986 INFO [Producer] 54
2017-03-22 22:24:55,993 INFO [Consumer-0] 54
  1. Condition 通常用于生產者消費者模式, 生產者生產訊息之后, 使用notify 或者 notify_all 通知消費者消費,
  2. 消費者使用wait方法阻塞等待生產者通知
  3. notify通知指定個wait的執行緒, notify_all通知所有的wait執行緒
  4. 無論notify/notify_all還是wait 都必須先acqurie, 完成后必須確保release, 通常使用with語法

Barrier

Barrier類存在于threading模塊中,中文可以翻譯成柵欄

>>> help(threading.Barrier)

可以看到Barrier的主要方法和屬性:

  • __init__(self, parties, action=None, timeout=None):初始化方法,創建一個Barrier
    • parties:所有參與的執行緒的數量
    • action:所有的執行緒都wait之后并且在執行緒釋放之前就會執行這個action函式,相當于集結之后要做的事情,
    • timeout:相當于給需要等待的每個執行緒的wait方法加上timeout引數,超時則barrier不再生效
  • abort(self):將Barrier設定成broken狀態
  • reset(self):將Barrier重置為最初狀態
  • wait(self, timeout=None):在Barrier前等待,回傳在Barrier前等待的下標,從0到parties-1
  • broken:如果Barrier處于broken狀態則回傳True
  • n_waiting:當前已經在Barrier處等待的執行緒的數量
  • parties:需要在Barrier處等待的執行緒的數量

示例代碼

import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

barrier = threading.Barrier(parties=3)

def worker(barrier: threading.Barrier):
    logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
    worker_id = barrier.wait()
    logging.info('after barrier {}'.format(worker_id))

for i in range(3):
    threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start()

測驗結果

2017-03-22 23:25:03,992 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:25:03,995 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:25:03,998 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:25:04,001 INFO [worker-2] after barrier 2
2017-03-22 23:25:04,001 INFO [worker-0] after barrier 0
2017-03-22 23:25:04,001 INFO [worker-1] after barrier 1

可見,所有的執行緒都會一直等待,知道所有的執行緒都到期了,然后就通過barrier,繼續執行后續操作,

Barrier會建立一個控制點,所有參與的執行緒都會阻塞,直到所有參與的“各方”達到這一點, 它讓執行緒分開啟動,然后暫停,直到它們都準備好再繼續,因此,這一點可以理解為各個執行緒的一個集結點,

abort函式的使用

將Barrier設定成broken狀態,所有執行緒在參與集結程序中,只要執行了barrier.abort方法,那么正在等待的執行緒都會拋出threading.BrokenBarrierError例外,可以理解為,只要有一個執行緒確定已經到不了Barrier并且通知了Barrier,那么Barrier就會執行abort方法,通知所有正在wait的執行緒放棄集結,

實體代碼

import threading
import logging

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

def worker(barrier: threading.Barrier):
    logging.info('waiting for barrier with {} others'.format(barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        logging.info('aborting')
    else:
        logging.info('after barrier {}'.format(worker_id))

barrier = threading.Barrier(4)  # 需要等待4個執行緒
for i in range(3):
    threading.Thread(target=worker, args=(barrier, ), name='worker-{}'.format(i)).start()  # 3個執行緒都開始wait
barrier.abort()  # 還有一個執行緒沒有到wait,此時執行abort方法,則所有正在wait的執行緒都拋出例外

測驗結果

2017-03-22 23:47:43,184 INFO [worker-0] waiting for barrier with 0 others
2017-03-22 23:47:43,192 INFO [worker-1] waiting for barrier with 1 others
2017-03-22 23:47:43,201 INFO [worker-2] waiting for barrier with 2 others
2017-03-22 23:47:43,211 INFO [worker-2] aborting
2017-03-22 23:47:43,207 INFO [worker-1] aborting
2017-03-22 23:47:43,207 INFO [worker-0] aborting

Semaphore

Semaphore類存在于threading模塊中

help(threading.Semaphore)

信號量內部管理者一個計數器,這個計數器的值等于release()方法呼叫的次數減去acquire()方法呼叫的次數然后再加上初始值value,value默認為1,

可以看到Semaphore的主要方法:

  • __init__(self, value=https://www.cnblogs.com/CHLL55/archive/2020/10/06/1):初始化一個信號量,value為內部計數器賦初值,默認為1
  • acquire(self, blocking=True, timeout=None):獲取信號量,內部計數器減一
  • release(self):釋放信號量,內部計數器加一

示例代碼

import threading
import time
import logging
import random
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s [%(threadName)s] %(message)s')

class Pool:
    def __init__(self, num):
        self.num = num
        self.conns = [self._make_connect(i) for i in range(num)]  # 存放連接
        self.sem = threading.Semaphore(num)  # 信號量內部計數器初始為連接數

    def _make_connect(self, name):  # 根據連接名稱創建連接
        conn = 'connect-{}'.format(name)
        return conn

    def get_connect(self):  # 從連接池獲取連接
        self.sem.acquire()
        return self.conns.pop()

    def return_connect(self, conn):  # 將連接conn返還到連接池中
        self.conns.insert(0, conn)
        self.sem.release()

def worker(pool: Pool):
    logging.info('starting')
    conn = pool.get_connect()  # 如果獲取不到則會阻塞在acquire處
    logging.info('get connect {}'.format(conn))
    t = random.randint(1, 3)
    time.sleep(t)
    logging.info('takes {}s'.format(t))
    pool.return_connect(conn)
    logging.info('return connect {}'.format(conn))

pool = Pool(2)  # 連接池中有兩個連接可以使用
for i in range(3):  # 三個執行緒使用兩個連接開始任務
    threading.Thread(target=worker, args=(pool, ), name='worker-{}'.format(i)).start()

測驗結果

2017-03-23 00:54:36,056 INFO [worker-0] starting
2017-03-23 00:54:36,062 INFO [worker-0] get connect connect-1
2017-03-23 00:54:36,074 INFO [worker-1] starting
2017-03-23 00:54:36,079 INFO [worker-1] get connect connect-0
2017-03-23 00:54:36,089 INFO [worker-2] starting
2017-03-23 00:54:39,074 INFO [worker-0] takes 3s
2017-03-23 00:54:39,076 INFO [worker-0] return connect connect-1
2017-03-23 00:54:39,076 INFO [worker-2] get connect connect-1
2017-03-23 00:54:39,093 INFO [worker-1] takes 3s
2017-03-23 00:54:39,097 INFO [worker-1] return connect connect-0
2017-03-23 00:54:40,093 INFO [worker-2] takes 1s
2017-03-23 00:54:40,107 INFO [worker-2] return connect connect-1

這個測驗結果顯示:三個執行緒獲取連接池中的兩個連接,結果出現了一個執行緒等待其他執行緒執行完成之后再獲取連接的程序,

Queue

Condition執行緒同步部分用來傳遞資料的是一個封裝在生產者消費者模型中的元素data(正常使用情況下一般封裝的都是一個串列,類似與Barrier部分的連接池中的conns串列),

Python的queue模塊中提供了同步的、執行緒安全的佇列類,包括三種佇列:

  • FIFO(先入先出)佇列Queue
  • LIFO(后入先出)佇列LifoQueue
  • 優先級佇列PriorityQueue

這些佇列都實作了鎖原語,能夠在多執行緒中直接使用,可以使用佇列來實作執行緒間的同步,因此我們可以使用queue模塊來替換掉生產者消費者中的全域元素,代碼如下:

import random
import queue
import threading

class Producer_Consumer_Model:
    def __init__(self):
        self.q = queue.Queue()
        self.event = threading.Event()

    def Consumer(self):
        while not self.event.is_set():
            logging.info(self.q.get())

    def Producer(self):
        while not self.event.wait(3):
            data = https://www.cnblogs.com/CHLL55/archive/2020/10/06/random.randint(1, 100)
            logging.info(data)
            self.q.put(data)

m = Producer_Consumer_Model()
threading.Thread(target=m.Consumer, name='Consumer').start()
threading.Thread(target=m.Producer, name='Producer').start()

測驗結果

2017-03-23 10:11:22,990 INFO [Producer] 26
2017-03-23 10:11:22,993 INFO [Consumer] 26
2017-03-23 10:11:25,993 INFO [Producer] 89
2017-03-23 10:11:26,003 INFO [Consumer] 89
2017-03-23 10:11:29,004 INFO [Producer] 14
2017-03-23 10:11:29,006 INFO [Consumer] 14
2017-03-23 10:11:32,007 INFO [Producer] 17
2017-03-23 10:11:32,009 INFO [Consumer] 17

每生產一次,消費者就會消費一次,當消費者執行緒,讀取Queue則呼叫Queue.get()方法,若Queue為空時消費者執行緒獲取不到內容,就會阻塞在這里,直到成功獲取內容,

執行緒同步總結

  • Event:主要用于執行緒之間的事件通知
  • Lock,Rlock:主要用于保護共享資源
  • Condition:主要用于生產者消費者模型,可以理解為Event和Lock的結合體
  • Barrier:同步指定個等待的執行緒
  • Semaphore:主要用于保護資源,和Lock的區別在于可以多個執行緒訪問共享資源,而鎖一次只能一個執行緒訪問到共享資源,即鎖是value=https://www.cnblogs.com/CHLL55/archive/2020/10/06/1的信號量
  • Queue:使用FIFO佇列進行同步,適用于生產者消費者模型

GIL

GIL(Global Interpreter Lock):全域解釋器鎖

Python代碼的執行由Python 主回圈來控制,Python 在設計之初就考慮到要在解釋器的主回圈中,同時只有一個執行緒在執行,即在任意時刻,只有一個執行緒在解釋器中運行,對Python 主回圈的訪問由全域解釋器鎖(GIL)來控制,正是這個鎖能保證同一時刻只有一個執行緒在運行,

因此Python多執行緒程式的執行順序如下:

  1. 設定GIL
  2. 切換到一個執行緒去運行
  3. 運行
  4. 結束執行緒
  5. 解鎖GIL
  6. 重復以上步驟

因此,Python的多執行緒并沒有實作并行,只是實作了并發而已,如果要實作真正的并行,那就需要使用Python的多行程模塊multiprocessing(multiprocessing模塊的宗旨是像管理執行緒一樣來管理行程),


參考資料

  1. threading — Manage Concurrent Operations Within a Process
  2. Python執行緒同步機制: Locks, RLocks, Semaphores, Conditions, Events和Queues

記得幫我點贊哦!

精心整理了計算機各個方向的從入門、進階、實戰的視頻課程和電子書,按照目錄合理分類,總能找到你需要的學習資料,還在等什么?快去關注下載吧!!!

resource-introduce

念念不忘,必有回響,小伙伴們幫我點個贊吧,非常感謝,

我是職場亮哥,YY高級軟體工程師、四年作業經驗,拒絕咸魚爭當龍頭的斜杠程式員,

聽我說,進步多,程式人生一把梭

如果有幸能幫到你,請幫我點個【贊】,給個關注,如果能順帶評論給個鼓勵,將不勝感激,

職場亮哥文章串列:更多文章

wechat-platform-guide-attention

本人所有文章、回答都與著作權保護平臺有合作,著作權歸職場亮哥所有,未經授權,轉載必究!

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/159499.html

標籤:其他

上一篇:DateTimePicker1,DateTimePicker2,如何得到下個月的第一天和最后一天?

下一篇:基于Python通過OpenCV實作的口罩識別系統操作篇

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more