1. threading行程中管理并發操作
threading模塊提供了管理多個執行緒執行的API,允許程式在同一個行程空間并發的運行多個操作,
1.1 Thread物件
要使用Thread,最簡單的方法就是用一個目標函式實體化一個Thread物件,并呼叫start()讓它開始作業,
import threading def worker(): """thread worker function""" print('Worker') threads = [] for i in range(5): t = threading.Thread(target=worker) threads.append(t) t.start()
輸出有5行,每一行都是"Worker",

如果能夠創建一個執行緒,并向它傳遞引數告訴它要完成什么作業,那么這會很有用,任何型別的物件都可以作為引數傳遞到執行緒,下面的例子傳遞了一個數,執行緒將列印出這個數,
import threading def worker(num): """thread worker function""" print('Worker: %s' % num) threads = [] for i in range(5): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start()
現在這個整數引數會包含在各執行緒列印的訊息中,

1.2 確定當前執行緒
使用引數來標識或命名執行緒很麻煩,也沒有必要,每個Thread實體都有一個帶有默認值的名,該默認值可以在創建執行緒時改變,如果服務器行程中有多個服務執行緒處理不同的操作,那么在這樣的服務器行程中,對執行緒命名就很有用,
import threading import time def worker(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.2) print(threading.current_thread().getName(), 'Exiting') def my_service(): print(threading.current_thread().getName(), 'Starting') time.sleep(0.3) print(threading.current_thread().getName(), 'Exiting') t = threading.Thread(name='my_service', target=my_service) w = threading.Thread(name='worker', target=worker) w2 = threading.Thread(target=worker) # use default name w.start() w2.start() t.start()
除錯輸出的每一行中包含有當前執行緒的名,執行緒名列中有"Thread-1"的行對應未命名的執行緒w2,

大多數程式并不使用print來進行除錯,logging模塊支持將執行緒名嵌入到各個日志訊息中(使用格式化代碼%(threadName)s),通過把執行緒名包含在日志訊息中,就能跟蹤這些訊息的來源,
import logging import threading import time def worker(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def my_service(): logging.debug('Starting') time.sleep(0.3) logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='[%(levelname)s] (%(threadName)-10s) %(message)s', ) t = threading.Thread(name='my_service', target=my_service) w = threading.Thread(name='worker', target=worker) w2 = threading.Thread(target=worker) # use default name w.start() w2.start() t.start()
而且logging是執行緒安全的,所以來自不同執行緒的訊息在輸出中會有所區分,

1.3 守護與非守護執行緒
到目前為止,示例程式都在隱式地等待所有執行緒完成作業之后才退出,不過,程式有
時會創建一個執行緒作為守護執行緒(daemon),這個執行緒可以一直運行而不阻塞主程式退出,
如果一個服務不能很容易地中斷執行緒,或者即使讓執行緒作業到一半時中止也不會造成資料
損失或破壞(例如,為一個服務監控工具生成“心跳”的執行緒),那么對于這些服務,使用
守護執行緒就很有用,要標志一個執行緒為守護執行緒,構造執行緒時便要傳入daemon=True或
者要呼叫它的setDaemon()方法并提供引數True,默認情況下執行緒不作為守護執行緒,
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) d = threading.Thread(name='daemon', target=daemon, daemon=True) t = threading.Thread(name='non-daemon', target=non_daemon) d.start() t.start()
這個代碼的輸出中不包含守護執行緒的“Exiting“訊息,因為在從sleep()呼叫喚醒
守護執行緒之前,所有非守護執行緒(包括主執行緒)已經退出,

要等待一個守護執行緒完成作業,需要使用join()方法,
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) d = threading.Thread(name='daemon', target=daemon, daemon=True) t = threading.Thread(name='non-daemon', target=non_daemon) d.start() t.start() d.join() t.join()
使用join()等待守護執行緒退出意味著它有機會生成它的"Exiting"訊息,

默認地,join()會無限阻塞,或者,還可以傳入一個浮點值,表示等待執行緒在多長
時間(秒數)后變為不活動,即使執行緒在這個時間段內未完成,join()也會回傳,
import threading import time import logging def daemon(): logging.debug('Starting') time.sleep(0.2) logging.debug('Exiting') def non_daemon(): logging.debug('Starting') logging.debug('Exiting') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) d = threading.Thread(name='daemon', target=daemon, daemon=True) t = threading.Thread(name='non-daemon', target=non_daemon) d.start() t.start() d.join(0.1) print('d.isAlive()', d.isAlive()) t.join()
由于傳人的超時時間小于守護執行緒睡眠的時間,所以join()回傳之后這個執行緒仍是"活著",

1.4 列舉所有執行緒
沒有必要為所有守護執行緒維護一個顯示句柄來確保它們在退出主行程之前已經完成,
enumerate()會回傳活動 Thread實體的一個串列,這個串列也包括當前執行緒,由于等
待當前執行緒終止(join)會引入一種死鎖情況,所以必須跳過,
import random import threading import time import logging def worker(): """thread worker function""" pause = random.randint(1, 5) / 10 logging.debug('sleeping %0.2f', pause) time.sleep(pause) logging.debug('ending') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) for i in range(3): t = threading.Thread(target=worker, daemon=True) t.start() main_thread = threading.main_thread() for t in threading.enumerate(): if t is main_thread: continue logging.debug('joining %s', t.getName()) t.join()
由于作業執行緒睡眠的時間量是隨機的,所以這個程式的輸出可能有變化,

1.5 派生執行緒
開始時,Thread要完成一些基本初始化,然后呼叫其run()方法,這會呼叫傳遞到建構式的目標函式,要創建Thread的一個子類,需要覆寫run()來完成所需的作業,
import threading import logging class MyThread(threading.Thread): def run(self): logging.debug('running') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) for i in range(5): t = MyThread() t.start()
run()的回傳值將被忽略,

由于傳遞到Thread建構式的args和kwargs值保存在私有變數中(這些變數名都有前綴),所以不能很容易地從子類訪問這些值,要向一個定制的執行緒型別傳遞引數,需要重新定義建構式,將這些值保存在子類可見的一個實體屬性中,
import threading import logging class MyThreadWithArgs(threading.Thread): def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None): super().__init__(group=group, target=target, name=name, daemon=daemon) self.args = args self.kwargs = kwargs def run(self): logging.debug('running with %s and %s', self.args, self.kwargs) logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) for i in range(5): t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'}) t.start()
MyThreadwithArgs使用的API與Thread相同,不過類似于其他定制類,這個類可以輕松地修改建構式方法,以取得更多引數或者與執行緒用途更直接相關的不同引數,

1.6 定時器執行緒
有時出于某種原因需要派生Thread,Timer就是這樣一個例子,Timer也包含在threading中,Timer在一個延遲之后開始作業,而且可以在這個延遲期間內的任意時刻被取消,
import threading import time import logging def delayed(): logging.debug('worker running') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) t1 = threading.Timer(0.3, delayed) t1.setName('t1') t2 = threading.Timer(0.3, delayed) t2.setName('t2') logging.debug('starting timers') t1.start() t2.start() logging.debug('waiting before canceling %s', t2.getName()) time.sleep(0.2) logging.debug('canceling %s', t2.getName()) t2.cancel() logging.debug('done')
這個例子中,第二個定時器永遠不會運行,看起來第一個定時器在主程式的其余部分完成之后還會運行,由于這不是一個守護執行緒,所以在主執行緒完成時其會隱式退出,

1.7 執行緒間傳送信號
盡管使用多執行緒的目的是并發地運行單獨的操作,但有時也需要在兩個或多個執行緒中同步操作,事件物件是實作執行緒間安全通信的一種簡單方法,Event管理一個內部標志,呼叫者可以用set()和clear()方法控制這個標志,其他執行緒可以使用wait()暫停,直到這個標志被設定,可有效地阻塞行程直至允許這些執行緒繼續,
import logging import threading import time def wait_for_event(e): """Wait for the event to be set before doing anything""" logging.debug('wait_for_event starting') event_is_set = e.wait() logging.debug('event set: %s', event_is_set) def wait_for_event_timeout(e, t): """Wait t seconds and then timeout""" while not e.is_set(): logging.debug('wait_for_event_timeout starting') event_is_set = e.wait(t) logging.debug('event set: %s', event_is_set) if event_is_set: logging.debug('processing event') else: logging.debug('doing other work') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) e = threading.Event() t1 = threading.Thread( name='block', target=wait_for_event, args=(e,), ) t1.start() t2 = threading.Thread( name='nonblock', target=wait_for_event_timeout, args=(e, 2), ) t2.start() logging.debug('Waiting before calling Event.set()') time.sleep(0.3) e.set() logging.debug('Event is set')
wait()方法取一個引數,表示等待事件的時間(秒數),達到這個時間后就超時,它
會回傳一個布林值,指示事件是否已設定,使呼叫者知道wait()為什么回傳,可以對事
件單獨地使用is_set()方法而不必擔心阻塞,
在這個例子中,wait_for_event_timeout()將檢查事件狀態而不會無限阻塞,wait_for_event()在wait()呼叫的位置阻塞,事件狀態改變之前它不會回傳,

1.8 控制資源訪問
除了同步執行緒操作,還有一點很重要,要能夠控制對共享資源的訪問,從而避免破壞或丟失資料,Python的內置資料結構(串列、字典等)是執行緒安全的,這是Python使用原子位元組碼來管理這些資料結構的一個副作用(更新程序中不會釋放保護Python內部資料結構的全域解釋器鎖GIL(Global Interpreter Lock)),Python中實作的其他資料結構或更簡單的型別(如整數和浮點數)則沒有這個保護,要保證同時安全地訪問一個物件,可以使用一個Lock物件,
import logging import random import threading import time class Counter: def __init__(self, start=0): self.lock = threading.Lock() self.value = start def increment(self): logging.debug('Waiting for lock') self.lock.acquire() try: logging.debug('Acquired lock') self.value = self.value + 1 finally: self.lock.release() def worker(c): for i in range(2): pause = random.random() logging.debug('Sleeping %0.02f', pause) time.sleep(pause) c.increment() logging.debug('Done') logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) counter = Counter() for i in range(2): t = threading.Thread(target=worker, args=(counter,)) t.start() logging.debug('Waiting for worker threads') main_thread = threading.main_thread() for t in threading.enumerate(): if t is not main_thread: t.join() logging.debug('Counter: %d', counter.value)
在這個例子中,worker()函式使一個Counter實體遞增,這個實體管理著一個Lock,以避免兩個執行緒同時改變其內部狀態,如果沒有使用Lock,就有可能丟失一次對value屬性的修改,

要確定是否有另一個執行緒請求這個鎖而不影響當前執行緒,可以向acquire()的blocking引數傳入False,在下一個例子中,worker()想要分別得到3次鎖,并統計為得到鎖而嘗試的次數,與此同時,lock_holder()在占有和釋放鎖之間回圈,每個狀態會短暫暫停,以模擬負載情況,
import logging import threading import time def lock_holder(lock): logging.debug('Starting') while True: lock.acquire() try: logging.debug('Holding') time.sleep(0.5) finally: logging.debug('Not holding') lock.release() time.sleep(0.5) def worker(lock): logging.debug('Starting') num_tries = 0 num_acquires = 0 while num_acquires < 3: time.sleep(0.5) logging.debug('Trying to acquire') have_it = lock.acquire(0) try: num_tries += 1 if have_it: logging.debug('Iteration %d: Acquired', num_tries) num_acquires += 1 else: logging.debug('Iteration %d: Not acquired', num_tries) finally: if have_it: lock.release() logging.debug('Done after %d iterations', num_tries) logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) lock = threading.Lock() holder = threading.Thread( target=lock_holder, args=(lock,), name='LockHolder', daemon=True, ) holder.start() worker = threading.Thread( target=worker, args=(lock,), name='Worker', ) worker.start()
worker()需要超過3次迭代才能得到3次鎖,

1.8.1 再入鎖
正常的Lock物件不能請求多次,即使是由同一個執行緒請求也不例外,如果同一個呼叫鏈中的多個函式訪問一個鎖,則可能會產生我們不希望的副作用,
import threading lock = threading.Lock() print('First try :', lock.acquire()) print('Second try:', lock.acquire(0))
在這里,對第二個acquire()呼叫給定超時值為0,以避免阻塞,因為鎖已經被第一個呼叫獲得,

如果同一個執行緒的不同代碼需要"重新獲得"鎖,那么在這種情況下要使用RLock,
import threading lock = threading.RLock() print('First try :', lock.acquire()) print('Second try:', lock.acquire(0))
與前面的例子相比,對代碼唯一的修改就是用RLock替換Lock,

1.8.2 鎖作為背景關系管理器
鎖實作了背景關系管理器API,并與with陳述句兼容,使用with則不再需要顯式地獲得和釋放鎖,
import threading import logging def worker_with(lock): with lock: logging.debug('Lock acquired via with') def worker_no_with(lock): lock.acquire() try: logging.debug('Lock acquired directly') finally: lock.release() logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) lock = threading.Lock() w = threading.Thread(target=worker_with, args=(lock,)) nw = threading.Thread(target=worker_no_with, args=(lock,)) w.start() nw.start()
函式worker_with()和worker_no_with()用等價的方式管理鎖,

1.9 同步執行緒
除了使用Event,還可以通過使用一個Condition物件來同步執行緒,由于Condition使用了一個Lock,所以它可以系結到一個共享資源,允許多個執行緒等待資源更新,在下一個例子中,consumer()執行緒要等待設定了Condition才能繼續,producer()執行緒負責設定條件,以及通知其他執行緒繼續,
import logging import threading import time def consumer(cond): """wait for the condition and use the resource""" logging.debug('Starting consumer thread') with cond: cond.wait() logging.debug('Resource is available to consumer') def producer(cond): """set up the resource to be used by the consumer""" logging.debug('Starting producer thread') with cond: logging.debug('Making resource available') cond.notifyAll() logging.basicConfig( level=logging.DEBUG, format='%(asctime)s (%(threadName)-2s) %(message)s', ) condition = threading.Condition() c1 = threading.Thread(name='c1', target=consumer, args=(condition,)) c2 = threading.Thread(name='c2', target=consumer, args=(condition,)) p = threading.Thread(name='p', target=producer, args=(condition,)) c1.start() time.sleep(0.2) c2.start() time.sleep(0.2) p.start()
這些執行緒使用with來獲得與Condition關聯的鎖,也可以顯式地使用acquire()和release()方法,

屏障(barrier)是另一種執行緒同步機制,Barrier會建立一個控制點,所有參與執行緒會在這里阻塞,直到所有這些參與“方”都到達這一點,采用這種方法,執行緒可以單獨啟動然后暫停,直到所有執行緒都準備好才可以繼續,
import threading import time def worker(barrier): print(threading.current_thread().name, 'waiting for barrier with {} others'.format( barrier.n_waiting)) worker_id = barrier.wait() print(threading.current_thread().name, 'after barrier', worker_id) NUM_THREADS = 3 barrier = threading.Barrier(NUM_THREADS) threads = [ threading.Thread( name='worker-%s' % i, target=worker, args=(barrier,), ) for i in range(NUM_THREADS) ] for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1) for t in threads: t.join()
在這個例子中,Barrier被配置為會阻塞執行緒,直到3個執行緒都在等待,滿足這個條件時,所有執行緒被同時釋放從而越過這個控制點,wait()的回傳值指示了釋放的參與執行緒數,可以用來限制一些執行緒做清理資源等動作,

Barrier的abort()方法會使所有等待執行緒接收一個BrokenBarrierError,如果執行緒在wait()上被阻塞而停止處理,這就允許執行緒完成清理作業,
import threading import time def worker(barrier): print(threading.current_thread().name, 'waiting for barrier with {} others'.format( barrier.n_waiting)) try: worker_id = barrier.wait() except threading.BrokenBarrierError: print(threading.current_thread().name, 'aborting') else: print(threading.current_thread().name, 'after barrier', worker_id) NUM_THREADS = 3 barrier = threading.Barrier(NUM_THREADS + 1) threads = [ threading.Thread( name='worker-%s' % i, target=worker, args=(barrier,), ) for i in range(NUM_THREADS) ] for t in threads: print(t.name, 'starting') t.start() time.sleep(0.1) barrier.abort() for t in threads: t.join()
這個例子將Barrier配置為多加一個執行緒,即需要比實際啟動的執行緒再多一個參與執行緒,所以所有執行緒中的處理都會阻塞,在被阻塞的各個執行緒中,abort()呼叫會產生一個例外,

1.10 限制資源的并發訪問
有時可能需要允許多個作業執行緒同時訪問一個資源,但要限制總數,例如,連接池支持同時連接,但數目可能是固定的,或者一個網路應用可能支持固定數目的并發下載,這些連接就可以使用Semaphore來管理,
import logging import threading import time class ActivePool: def __init__(self): super(ActivePool, self).__init__() self.active = [] self.lock = threading.Lock() def makeActive(self, name): with self.lock: self.active.append(name) logging.debug('Running: %s', self.active) def makeInactive(self, name): with self.lock: self.active.remove(name) logging.debug('Running: %s', self.active) def worker(s, pool): logging.debug('Waiting to join the pool') with s: name = threading.current_thread().getName() pool.makeActive(name) time.sleep(0.1) pool.makeInactive(name) logging.basicConfig( level=logging.DEBUG, format='%(asctime)s (%(threadName)-2s) %(message)s', ) pool = ActivePool() s = threading.Semaphore(2) for i in range(4): t = threading.Thread( target=worker, name=str(i), args=(s, pool), ) t.start()
在這個例子中,ActivePool類只作為一種便利方法,用來跟蹤某個給定時刻哪些執行緒能夠運行,真正的資源池會為新的活動執行緒分配一個連接或另外某個值,并且當這個執行緒作業完成時再回收這個值,在這里,資源池只是用來保存活動執行緒的名,以顯示至少有兩個執行緒在并發運行,

1.11 執行緒特定的資料
有些資源需要鎖定以便多個執行緒使用,另外一些資源則需要保護,以使它們對并非是這些資源的“所有者”的執行緒隱藏,local()函式會創建一個物件,它能夠隱藏值,使其在不同執行緒中無法被看到,
import random import threading import logging def show_value(data): try: val = data.value except AttributeError: logging.debug('No value yet') else: logging.debug('value=https://www.cnblogs.com/liuhui0308/p/%s', val) def worker(data): show_value(data) data.value = random.randint(1, 100) show_value(data) logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) local_data = threading.local() show_value(local_data) local_data.value = 1000 show_value(local_data) for i in range(2): t = threading.Thread(target=worker, args=(local_data,)) t.start()
屬性local_data.value對所有執行緒都不可見,除非在某個執行緒中設定了這個屬性,這個執行緒才能看到它,

要初始化設定以使所有執行緒在開始時都有相同的值,可以使用一個子類,并在_init_()中設定這些屬性,
import random import threading import logging def show_value(data): try: val = data.value except AttributeError: logging.debug('No value yet') else: logging.debug('value=https://www.cnblogs.com/liuhui0308/p/%s', val) def worker(data): show_value(data) data.value = random.randint(1, 100) show_value(data) class MyLocal(threading.local): def __init__(self, value): super().__init__() logging.debug('Initializing %r', self) self.value = value logging.basicConfig( level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', ) local_data = MyLocal(1000) show_value(local_data) for i in range(2): t = threading.Thread(target=worker, args=(local_data,)) t.start()
這會在相同的物件上呼叫_init_()(注意id()值),每個執行緒中呼叫一次以設定默認值,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/162481.html
標籤:Python
上一篇:python的自學之路2
