<style>td.subtitle { background-color: rgba(251, 251, 254, 1) } td.name { font-family: "Consolas" } td.center { text-align: center } td.name span { font-size: 13px; font-style: italic } td.name p { line-height: 2ex } td.example { font-family: "Consolas"; color: rgba(96, 96, 96, 1); font-size: 12px } td.example p { line-height: 2ex } td.example p span { color: rgba(176, 176, 176, 1); font-size: 12px } td.example a.advc { color: rgba(96, 96, 96, 1) } a.advc { border-bottom: 1px solid rgba(128, 128, 128, 1); text-decoration: none } a.subcata { border-bottom: 1px solid rgba(128, 128, 128, 1) } p.subcata { line-height: 30px; font-size: 18px } h2.dscp span { font-style: italic } .codebox { display: table-cell; width: 800px; padding-left: 20px; padding-right: 20px } .codebox span { color: rgba(144, 144, 144, 1) } .codebox i { fontsize: 8px } a.return { color: rgba(187, 187, 187, 1); font-size: 0.8em } code { font-family: "Courier New", Courier, monospace; font-size: 0.9em; line-height: 1.8em; border: 1px solid rgba(176, 176, 176, 1); border-radius: 2px; background: rgba(248, 248, 248, 1); padding: 2px; margin: 0 4px; vertical-align: middle }</style>
回傳目錄
本篇索引
(1)執行緒基本概念
(2)threading模塊
(3)執行緒間同步原語資源
(4)queue
(1)執行緒基本概念
當應用程式需要并發執行多個任務時,可以使用執行緒,多個執行緒(thread)同時運行在一個行程(process)的內部, 它們可以共享訪問本行程內的全域變數資料和資源,各個執行緒之間的調度由作業系統負責, 具體做法是:給每個執行緒分配一個小的時間片,并在所有的執行緒之間回圈切換,在具有多核的CPU上, 作業系統有時會安排盡可能使用每個CPU,從而并行執行執行緒,
并發編程的復雜性在于,多個執行緒可能同時更新一個資料,導致資料的損壞或不一致(術語叫做:競爭), 要解決這個問題,必須使用互斥鎖或其他類似的同步手段保護這些資料,
Python解釋器使用了內部的GIL(Global Interpreter Lock,全域解釋器鎖), 這限制了Python程式只能在一個處理器上運行,如果一個應用程式的大部分是I/O密集型的, 那么使用執行緒是沒有問題的,而如果是CPU密集型的,那使用多個執行緒沒任何好處,還會降低運行速度, 因此對于計算密集型的任務,最好使用C擴展模塊或multiprocessing模塊來代替, C擴展具有釋放解釋器鎖和并行運行的選項,前提是釋放鎖時不與解釋器進行互動, multiprocessing模塊將作業分派給不受鎖限制的單獨子行程,
使用多執行緒編程還要注意一個問題:開的執行緒的數量級不能太大,例如,一臺使用執行緒的網路服務器對于100個執行緒作業情況良好, 但如果增加到10000個執行緒,性能就會變得非常糟糕,因為每個執行緒都需要有自己的系統資源, 而且還會產生執行緒背景關系切換、鎖和其他相關開銷,算下來是個不小的開銷, 對這種I/O密集型應用,比較常見的做法是將程式撰寫為:異步事件處理機制的結構,比如“協程”, 使用異步和協程的方式編程,可以比較輕松地處理諸如10000個連接的情況,
沒有任何方法可以強制終止或掛起其他執行緒!這是設計上的原因,因此,執行緒只能自己掛起或自己終止,
(2)threading模塊
threading模塊提供Thread類和各種同步原語,用于撰寫多執行緒的程式, threading模塊可創建:Thread物件、Timer物件、 Lock物件、RLock物件等,
● Thread物件
Thread類用于表示單獨的控制執行緒,創建新執行緒的語法如下:
Thread(group=None, target=None, name=None, args=(), kwargs={})
此函式創建一個新的執行緒實體,target是一個可呼叫物件(執行緒啟動時,run()方法將呼叫此物件), name是執行緒名稱,默認將創建一個名為 “Target-N” 格式的唯一名稱, args、kwargs是傳遞給target函式的引數元組、引數字典,
Thread實體支持以下屬性和方法
| 屬性或方法 | 型別 | 說明 |
|---|---|---|
name |
屬性 | 執行緒名稱,這個字串用于唯一標識執行緒, |
ident |
屬性 | 整數執行緒識別符號,如果執行緒尚未啟動,它的值為None, |
daemon |
屬性 | 布林值,True表示為后臺執行緒,它的初始值從創建執行緒的執行緒繼承而來, 主執行緒(控制執行緒)不是后臺執行緒(主執行緒的daemon為 False), 通常 Python 解釋器退出之前,會等待所有執行緒終止,但不會等待daemon為 True的執行緒, 當主執行緒結束后,如果其他執行緒的的daemon都為True,Python解釋器將不等待那些執行緒, 整個Python程式將即時退出, |
| t.start() | 方法 | 在主執行緒中,呼叫此方法啟動執行緒, |
| t.run() | 方法 | 執行緒啟動時,將自動呼叫此方法,它將呼叫先前創建執行緒物件時,由target指定的目標函式, 可以在Thread的子類中,重新定義此方法, |
| t.join([timeout]) | 方法 | 在主執行緒中呼叫此方法,功能是等待直到執行緒實體 t 終止或超時為止,timeout是一個浮點數, 單位為“秒”,在執行緒啟動之前不能呼叫此函式,否則會報錯, |
| t.is_alive() | 方法 | 如果執行緒是活動的,回傳True,否則回傳False,從start()方法回傳的那一刻開始, 執行緒就是活動的,直到它的run()方法終止為止, |
● 執行緒使用實體
通常 Python 解釋器退出之前,會等待所有執行緒終止,但是,若將創建的執行緒的daemon設定為 True, 會使解釋器在主執行緒結束后立即退出程式,這些daemon為 True的執行緒將即時被銷毀,
下例中,執行緒 t 每5秒從后臺激活激活運行一次;30秒后,主執行緒結束,整個Python程式退出:
import threading
import time
def clock(interval):
while True:
print("The time is %s" % time.ctime())
time.sleep(interval)
t = threading.Thread(target=clock, args=(5,))
t.daemon = True
t.start()
time.sleep(30)
下例為將同一個執行緒定義為一個 Thread 的子類:
import threading
import time
class ClockThread(threading.Thread):
def __init__(self, interval):
threading.Thread.__init__(self)
self.daemon = True
self.interval = interval
def run(self):
while True:
print("The time is %s" % time.ctime())
time.sleep(self.interval)
t = ClockThread(5)
t.start()
time.sleep(30)
● Timer物件
Timer物件用于在稍后某個時間執行一個函式,呼叫語法如下:
Timer(interval, func [,args [,kwargs]])
此函式創建定時器物件,在interval秒之后運行函式func, args、kwargs是傳遞給func函式的引數元組、引數字典,
Timer實體支持以下方法:
| 屬性或方法 | 型別 | 說明 |
|---|---|---|
| t.start() | 方法 | 啟動定時器,func函式將在指定間隔時間后執行, |
| t.cancel() | 方法 | 如果函式尚未執行,取消定時器, |
● 實用工具函式
| 函式 | 說明 |
|---|---|
| active_count() | 回傳當前活動的Thread物件數量, |
| current_thread() | 回傳呼叫者的執行緒物件, |
| enumerate() | 列出當前所有活動的Thread物件, |
| local() | 回傳local物件,用于保存執行緒本地的資料,應該保證此物件在每個執行緒中是唯一的, |
| setprofile(func) | 設定一個組態檔函式,用于已創建的所有執行緒, func在每個執行緒開始運行之前被傳遞給 sys.setprofile()函式, |
| settrace(func) | 設定一個跟蹤函式,用于已創建的所有執行緒, func在每個執行緒開始運行前被傳遞給 sys.settrace()函式, |
| stack_size([size]) | 回傳創建新執行緒時使用的堆疊大小,可選整數引數size表示創建新執行緒時使用的堆疊大小, size的值可以是 32768(32 KB) 或更大,而且是 4096 的倍數,如果系統上不支持此操作, 將引發 ThreadError 例外, |
(3)執行緒間同步原語資源
● 鎖(Lock)
鎖(或稱為“互斥鎖”)有2個狀態:已鎖定、未鎖定,如果鎖處于已鎖定狀態,嘗試獲取鎖的執行緒將被阻塞, 直到鎖被釋放為止,如果有多個執行緒等待獲取鎖,當鎖被釋放時,只有一個執行緒能獲得它,具體哪個執行緒隨機, 創建鎖的語法如下,初始狀態為“未鎖定”:
threading.Lock()
Lock實體支持以下方法
| 實體方法 | 說明 |
|---|---|
| lock.acquire([blocking]) | 獲取鎖,成功則回傳 True,blocking引數默認為 True,當鎖為已鎖定時,則阻塞本執行緒, 若blocking設為 False,當無法獲取鎖時,將立即回傳 False,不阻塞, |
| lock.realease() | 釋放一個鎖,當鎖處于“未鎖定”狀態時、或從與原本呼叫acquire()方法的執行緒不同的執行緒呼叫此方法, 將出現錯誤, |
● 可重入鎖(RLock)
可重入鎖(RLock)類似于Lock物件,但同一個執行緒可以多次獲取它, 這允許擁有鎖的執行緒執行嵌套的acquire()和release()操作, 在這種情況下,只有最外層的release()操作才能將鎖重置為“未鎖定”狀態,
創建可重入鎖的語法如下:
threading.RLock()
RLock實體支持以下方法
| 實體方法 | 說明 |
|---|---|
| rlock.acquire([blocking]) | 獲取鎖,成功獲取則回傳 True,而且遞回級別被設定為1,如果此執行緒已擁有鎖, 則鎖的遞回級別加1,而且立即回傳,blocking的含義同上, |
| rlock.realease() | 通過減少鎖的遞回級別來釋放它,如果在減值后遞回基本為0,鎖將被置位“未鎖定”狀態, 否則,所繼續保持“已鎖定”狀態,只能由目前擁有鎖的執行緒來呼叫此函式 |
● 信號量(Semaphore)
信號量是一個基于計數器的同步原語,每次呼叫acquire()方法時此計數器減1, 每次呼叫release()方法時此計數器加1,如果計數器為0,acquire()方法將會阻塞, 直到其他執行緒呼叫release()方法為止,
創建信號量物件的語法如下(value是計數器初始值,默認為1):
threading.Semaphore([value])
以下創建有邊界的信號量物件(BoundedSemaphore),區別是release()操作的次數不能超過acquire()次數:
threading.BoundedSemaphore([value])
Semaphore實體支持以下方法
| 實體方法 | 說明 |
|---|---|
| s.acquire([blocking]) | 獲取信號量,若計數值大于0,則減1,然后立即回傳, 若計數值為0,此方法將阻塞,直到另一個執行緒呼叫release()方法為止, blocking的含義同上, |
| s.realease() | 通過將內部計數器的值加1來釋放一個信號量,如果計數值為0,而且另一執行緒在等待, 則該執行緒將被喚醒, |
“信號量”和“互斥鎖”之間的差別在于,信號量可用于發信號,例如:可以從不同執行緒呼叫 acquire()和release()方法,以便在生產者和消費者執行緒之間進行通信, 也可以用后面的“條件變數”來達成,
● 事件(Event)
“事件”用于執行緒間通信,一個執行緒發出事件,一個或多個其他執行緒等待它, Event實體內部管理著一個標志,可以用set()方法將它設為True, clear()方法設為False,wait()方法將阻塞執行緒,知道標志為True,
創建事件的語法如下:
threading.Event()
Event實體支持以下方法
| 實體方法 | 說明 |
|---|---|
| e.is_set() | 只有當內部標志為True時才回傳True, |
| e.set() | 將內部標志置為True,等待它變為True的所有執行緒都將被喚醒, |
| e.clear() | 將內部標志置為False, |
| e.wait([timeout]) | 阻塞直到內部標志為True,如果進入時內部標志為True,此方法立即回傳, timeout是一個浮點數,單位為秒,指定超時期限, |
“事件”不適合用于生產者-消費者問題,因為事件只有True和False兩種狀態, 處理程序中信號可能丟失,最好使用“條件變數”,
● 條件變數(Condition)
“條件變數”是另一種同步原語,典型用于生產者-消費者問題,
創建條件變數的語法如下,lock是可選的Lock或RLock實體,若預設則創建新的RLock實體
threading.Condition([lock])
Condition實體支持以下方法
| 實體方法 | 說明 |
|---|---|
| cv.acquire(*args) | 獲取底層鎖,此方法將呼叫底層鎖上的acquire(*args)方法, |
| cv.realease() | 釋放底層鎖,此方法將呼叫底層鎖上對應的release()方法, |
| cv.wait([timeout]) | 等待直到獲得通知或出現超時為止,此方法在呼叫執行緒已經獲取鎖之后呼叫, 呼叫時,將釋放底層鎖,而且執行緒將進入后隨眠狀態,直到另一個執行緒在條件變數上執行 notify()或notifyAll()將其喚醒為止,在執行緒被喚醒之后, 執行緒將重新獲取鎖,方法也會回傳, timeout是一個浮點數,單位為秒,指定超時期限, |
| cv.notify([n]) | 喚醒一個或多個等待此條件變數的執行緒,此方法只在呼叫執行緒已獲取條件變數內部鎖之后呼叫, 如果沒有正在等待的執行緒,它就什么也不做,n指定要喚醒的執行緒數量,默認為1, |
| cv.notify_all() | 喚醒所有等待此條件的執行緒, |
下面為使用條件變數的模板:
import threading
cv = threading.Condition()
def producer():
while True:
cv.acquire()
produce_item()
cv.notify()
cv.release()
def consumer():
while True:
cv.acquire()
while not item_is_available():
cv.wait() # 等待直到有項出現
cv.release()
consume_item()
如果存在多個執行緒等待同一個條件,notify()操作可能喚醒他們中的一個或多個, 因此某個執行緒被喚醒后,可能發現它等待的條件不存在了,所以在consumer()函式中使用while回圈, 如果執行緒醒來,但是生成的項已經消失,它就會回去等待下一個信號,
● 使用執行緒間資源的注意點
使用以上Lock等之類的執行緒間資源時,必須非常小心,依賴鎖的代碼應保證出現例外時正確地釋放鎖 否則可能導致死鎖,典型的代碼如下所示:
try:
lock.acquire()
# 關鍵部分
......
finally:
lock.release()
使用背景關系管理協議(with),更加簡潔:
with lock:
# 關鍵部分
......
另外,撰寫代碼時,一般應該避免同時獲取多個鎖,
(4)queue
queue模塊實作了各種“多生產者-多消費者佇列”,可用于在執行的多個執行緒間安全地交換資訊, 一般來說,執行緒間通信最佳的方式就是使用queue,者比前面的諸如“鎖”之類的資源都要好用,
queue模塊定義了以下3種不同的佇列型別,創建語法與說明如下:
| 創建函式 | 說明 |
|---|---|
| Queue([maxsize]) | 創建一個FIFO(先進先出)佇列,maxsize是佇列中可放入項的最大數量, 預設或置0則佇列大小為無窮大, |
| LifoQueue([maxsize]) | 創建一個LIFO(后進先出)佇列,(即:堆疊) |
| PriorityQueue([maxsize]) | 創建一個優先級佇列,使用這種佇列時,項應該是(priority, data)形式的元組, 其中priority是一個數字,數字越大優先級越高, |
佇列支持以下實體方法:
| 實體方法 | 說明 |
|---|---|
| q.qsize() | 回傳佇列的大小,因為其他執行緒可能正在更新佇列,此方法回傳的數字可能不可靠, |
| q.empty() | 如果佇列為空,則回傳True,否則回傳False, |
| q.full() | 如果佇列為滿,則回傳True,否則回傳False, |
| q.put(item [,block [,timeout]]) | 將item放入佇列,如果block為True(默認), 則呼叫者將阻塞直到佇列中出現可用的空閑位置為止;如block為False, 佇列滿時將引發Full例外,timeout提供可選的超時值,單位為秒, 如果超時則引發Full例外, |
| q.put_nowait(item) | 等價于q.put(item, False)方法, |
| q.get([block [,timeout]]) | 從佇列中取出一項回傳,如果block為True(默認), 則呼叫者將阻塞直到佇列中出現可取出的項為止;如block為False, 佇列空時將引發Full例外,timeout提供可選的超時值,單位為秒, 如果超時則引發Full例外, |
| q.get_nowait() | 等價于q.get(False)方法, |
| q.task_done() | 佇列的消費者用來指示對于項的處理已結束,如果使用此方法, 那么從佇列中每取出一項都應該手動呼叫一次, 此方法主要是輔助q.join()方法用的, |
| q.join() | 阻塞直到佇列中所有的項均被取出和處理完為止,當為佇列中的每一項都呼叫過了一次 q.task_done()方法,此方法將會直接回傳, |
● 使用佇列的示例
注意下例中q.task_done()和q.join()的用法, 它們將使得執行緒在處理完所有項后關閉,
import threading
from queue import Queue
class WorkerThread(threading.Thread):
def __init__(self, *args, **kwargs):
threading.Thread.__init__(self, *args, **kwargs)
self.input_queue = Queue()
def send(self, item):
self.input_queue.put(item)
def close(self):
self.input_queue.put(None)
self.input_queue.join()
def run(self):
while True:
item = self.input_queue.get()
if item is None:
break
# 處理項
print(item)
self.input_queue.task_done()
# 完成
self.input_queue.task_done()
return
# 使用示例
w = WorkerThread()
w.start()
w.send('hello')
w.send('world')
w.close()
回傳目錄
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/198235.html
標籤:Python
上一篇:Python3(六) 面向物件
下一篇:Python爬蟲之Scrapy
