我使用 pytest 創建了一個小測驗用例來演示該問題:
from threading import Thread, get_ident
def test_threading():
threads = []
ids = []
def append_id():
# time.sleep(1)
ids.append(get_ident())
for _ in range(5):
thread = Thread(target=append_id)
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
assert len(set(ids)) == 5
測驗失敗,因為get_ident為不同的執行緒回傳相同的 ID。但是當我添加time.sleep(1)每個執行緒時,測驗通過了。我不確定我明白為什么。
我正在使用 Python 3.9.0 和 Pytest 7.1.2。
uj5u.com熱心網友回復:
從以下檔案:get_ident
回傳當前執行緒的“執行緒識別符號”。這是一個非零整數。它的價值沒有直接的意義;它旨在作為一種神奇的 cookie,例如用于索引執行緒特定資料的字典。當一個執行緒退出并創建另一個執行緒時,執行緒識別符號可能會被回收。
由于您的執行緒運行得太快(沒有time.sleep(1)),ID 正在被回收。
您可以為name每個執行緒提供一個。此名稱不必是唯一的,但您可以在測驗中使用它(或者在您的應用程式中,如果您需要在背景關系中唯一的東西):
from threading import Thread, get_ident, current_thread
def test_threading():
threads = []
names = []
ids = []
def append_id():
# time.sleep(1)
ids.append(get_ident())
names.append(current_thread().name)
for i in range(5):
thread = Thread(target=append_id, name=f'Thread {i}')
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
assert len(set(names)) == 5
print(f'Names: {names}')
print(f'Ids: {set(ids)}')
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/496627.html
