我有一個主函式 run_tests,它首先啟動一個新的、單獨的執行緒來逐一啟動新行程,然後在主迴圈中我嘗試檢測哪些行程已經完成、哪些行程已經超時。
import time
import traceback
from typing import List
from threading import Thread
from multiprocess import (Semaphore,
Process,
Pipe)
from hanging_threads import start_monitoring
class S_Process(Process):
    """Process subclass whose target runs between a semaphore acquire/release.

    Two messages are written to ``pipe_conn``: ``0`` immediately after the
    semaphore has been acquired, and ``test_name`` after the target returned.
    """

    def __init__(self,
                 test_name: str,
                 semaphore: Semaphore,
                 pipe_conn: Pipe,
                 *args,
                 **kwargs
                 ) -> None:
        """Keep the name, semaphore and pipe end; pass everything else to Process."""
        super().__init__(*args, **kwargs)
        self.__test_name = test_name
        self.__semaphore = semaphore
        self.__child_conn = pipe_conn

    def run(self) -> None:
        """Acquire the semaphore, report the start, run the target, report the name."""
        gate = self.__semaphore
        channel = self.__child_conn
        gate.acquire()
        # First message: tells the receiving side the semaphore is now held.
        channel.send(0)
        super().run()
        # Second message: the name of the test that just finished.
        channel.send(self.__test_name)
        # Not reached if the target raises: there is no try/finally here.
        gate.release()

    def terminate(self) -> None:
        """Release the semaphore, then terminate the process."""
        # NOTE(review): the release is unconditional - it happens whether or
        # not this process currently holds the semaphore.
        self.__semaphore.release()
        Process.terminate(self)

    @property
    def test_name(self) -> str:
        """Name given to this process at construction time (read-only)."""
        return self.__test_name
class Task:
    """One test process plus the pipe end used to receive its messages.

    ``status`` moves from ``'NOTRUN'`` to ``'RUNNING'`` (in ``run``) and then
    to ``'ENDED'`` (in ``set_result``).
    """

    def __init__(self,
                 process: S_Process,
                 pipe_conn: Pipe,
                 ) -> None:
        """Remember the process and pipe end; nothing is started yet."""
        self.status = 'NOTRUN'
        self.test_name = None
        # Set by run() to the time.perf_counter() value taken at start.
        self.duration = None
        self.pipe_conn = pipe_conn
        self.process = process

    def run(self) -> None:
        """Start the process and block until its first message arrives."""
        proc, conn = self.process, self.pipe_conn
        proc.start()
        # The received value itself is discarded; only its arrival matters.
        conn.recv()
        self.duration = time.perf_counter()
        self.status = 'RUNNING'

    def join(self) -> None:
        """Join the process, kill it if it is still alive, then read the result."""
        proc = self.process
        proc.join()
        still_alive = proc.is_alive()
        if still_alive:
            proc.kill()
        self.set_result()

    def terminate(self) -> None:
        """Terminate the underlying process; ``status`` is left untouched."""
        self.process.terminate()

    def set_result(self) -> None:
        """Receive the next pipe message as ``test_name`` and mark the task ended."""
        # Blocking receive: waits until a message is available on the pipe.
        received = self.pipe_conn.recv()
        self.test_name = received
        self.status = 'ENDED'
class Tasks:
    """Two lists of Task objects: those still pending and those completed."""

    def __init__(self) -> None:
        """Start with both lists empty."""
        self.completed: List[Task] = []
        self.remaining: List[Task] = []

    def add(self,
            process: S_Process,
            pipe_conn: Pipe
            ) -> None:
        """Wrap ``process`` and ``pipe_conn`` in a Task and queue it as remaining."""
        self.remaining.append(Task(process, pipe_conn))

    def complete(self, task: Task) -> None:
        """Move ``task`` from ``remaining`` to ``completed``."""
        # Appended first, removed second - same order as a plain move.
        self.completed.append(task)
        self.remaining.remove(task)

    def info(self) -> List[str]:
        """Return one summary line per completed task."""
        # NOTE(review): every field is read through ``task.result``; the Task
        # class shown in this listing never assigns a ``result`` attribute.
        return [
            f"Test Name: {task.result.test_name} "
            f"Result: {task.result.status} "
            f"Duration: {task.result.duration} "
            f"Retries: {task.result.retries}"
            for task in self.completed
        ]
def run_tests() -> None:
    """Create eight test processes gated by ``Semaphore(2)`` and poll them.

    A helper thread starts the tasks one after another while the calling
    thread loops until ``tasks.remaining`` is empty, joining tasks whose
    process has exited and terminating tasks that ran longer than TIMEOUT.
    """
    # From the hanging_threads package; presumably reports threads that stop
    # making progress - see that package's documentation.
    start_monitoring()
    tasks = Tasks()
    semaphore = Semaphore(2)
    for i in range(8):
        # parent_conn is kept by the Task, child_conn is handed to the process.
        parent_conn, child_conn = Pipe()
        process = S_Process(
            target=test_function,
            args=(),
            test_name=f'test_{i}',
            semaphore=semaphore,
            pipe_conn=child_conn
        )
        tasks.add(process, parent_conn)
    def runner(tasks):
        # Thread body: start each task in turn. Task.run() blocks on the pipe
        # until the child reports, so the next task is not started before that.
        try:
            for task in tasks:
                print('running task')
                task.run()
        except Exception:
            print(traceback.format_exc())
    # Seconds a RUNNING task may take before it is terminated.
    TIMEOUT = 5
    # Rebinds the name ``runner`` from the function to the Thread object.
    # NOTE(review): the thread receives the very same list object that the
    # loop below shrinks through tasks.complete(); removing items from a list
    # while it is being iterated makes the iterator skip elements.
    runner = Thread(target=runner, args=(tasks.remaining,))
    runner.start()
    while tasks.remaining:
        # NOTE(review): this loop also iterates tasks.remaining while
        # tasks.complete() removes from it.
        for task in tasks.remaining:
            if not task.process.is_alive() and task.status == 'RUNNING':
                print('JOINING:', task.process.test_name)
                task.join()
                tasks.complete(task)
            # A separate ``if`` (not ``elif``): after a join above the status
            # is 'ENDED', so this branch is skipped for that task.
            if task.status == "RUNNING":
                # task.duration holds the perf_counter() value taken at start.
                check_time = time.perf_counter() - task.duration
                if (check_time > TIMEOUT):
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)
        print('Rem:', len(tasks.remaining))
        print('End:', len(tasks.completed))
        # Pause between polling passes.
        time.sleep(0.2)
def test_function():
    """Placeholder test body: print a marker, then sleep for three seconds."""
    busy_seconds = 3
    print('test_func')
    time.sleep(busy_seconds)
# Script entry point: run_tests() is called only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    run_tests()
方法 task.run() 啟動行程并等待 pipe_conn.recv() 獲取行程確實已獲取信號量并開始作業的資訊,以便我可以測量其持續時間。
當我將信號量設定為“2”(最多 2 個行程可以同時運行)、準備 7-8 個任務并啟動 run_tests 時,它運行良好,直到第三/第四個行程被加入/終止。感謝 hanging_threads 套件,我發現我的 runner 執行緒因此錯誤而死:
---------- Thread 9068 "Thread-2 (runner)" hangs ----------
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 973, in _bootstrap
self._bootstrap_inner()
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 1016, in _bootstrap_inner
self.run()
File "C:\Program Files\WindowsApps\PythonSoftwareFoundation.Python.3.10_3.10.2032.0_x64__qbz5n2kfra8p0\lib\threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 261, in runner
task.run()
File "c:\Users\<user>\Documents\Projects\eightest\eightest\runner.py", line 64, in run
if self.pipe_conn.recv() == 'started':
File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 258, in recv
buf = self._recv_bytes()
File "C:\Users\<user>\Documents\Projects\eightest\.venv\lib\site-packages\multiprocess\connection.py", line 313, in _recv_bytes
waitres = _winapi.WaitForMultipleObjects(
為什么前幾個行程開始和結束都很好,然后在某些時候執行緒無法處理管道?此外,回圈掛在 6 個任務結束和 2 個剩余任務上,而這 2 個任務永遠無法啟動。
uj5u.com熱心網友回復:
我花了很長時間試圖弄清楚這一點。問題是您的任務運行執行緒具有:
for task in tasks:
print('running task')
task.run()
其中 tasks 是對 tasks.remaining 的參考。當主執行緒從同一個串列中移除任務時,另一個執行緒同時迭代 tasks.remaining 的內容就會出現問題。結果(在我的桌面上)有兩個任務永遠不會被迭代到,因此永遠不會啟動。解決方案是讓任務運行執行緒迭代 tasks.remaining 串列的副本。
我對您的代碼進行了其他更改。我所有的修改都用#Booboo 注釋。另外,我沒有hanging_threads模塊,所以我注釋掉了與監視器相關的代碼:
import time
import traceback
from typing import List
from threading import Thread
from multiprocess import (Semaphore,
Process,
Pipe)
#from hanging_threads import start_monitoring #Booboo
class S_Process(Process):
    """Process that runs its target while holding one permit of a semaphore.

    Messages written to ``pipe_conn`` (the child end of a pipe):

    * ``0`` as soon as the permit has been acquired, and
    * ``test_name`` once the target has finished, whether it returned or
      raised.
    """

    def __init__(self,
                 test_name: str,
                 semaphore: Semaphore,
                 pipe_conn: Pipe,
                 *args,
                 **kwargs
                 ) -> None:
        """Store the name, semaphore and pipe end; forward the rest to Process.

        Args:
            test_name: Name reported back over the pipe when the target ends.
            semaphore: Shared semaphore limiting how many targets run at once.
            pipe_conn: Child end of the pipe used for the two messages above.
            *args: Positional arguments for ``Process``.
            **kwargs: Keyword arguments for ``Process`` (``target``, ``args``, ...).
        """
        Process.__init__(self, *args, **kwargs)
        self.__child_conn = pipe_conn
        self.__test_name = test_name
        self.__semaphore = semaphore

    def run(self) -> None:
        """Run the target under the semaphore and report start and end."""
        with self.__semaphore: #Booboo
            self.__child_conn.send(0)
            #Booboo we must catch any possible exceptions raise by the
            # target function to ensure we do the send below:
            try:
                Process.run(self)
            except Exception:
                # Same reporting style as the runner thread: keep the
                # exception type and traceback, not just the message.
                print(traceback.format_exc())
            finally:
                # ``finally`` also covers SystemExit/KeyboardInterrupt, which
                # ``except Exception`` lets through; without this message the
                # receiving side would wait for a name that never comes.
                self.__child_conn.send(self.__test_name)

    def terminate(self) -> None:
        """Terminate the child and give back the permit it was holding.

        A terminated process is stopped without unwinding the ``with`` block
        in ``run``, so the permit would otherwise be lost and later processes
        could wait on the semaphore forever.

        Only call this after the child has reported its start (the ``0``
        message): a live child that is still waiting for the semaphore holds
        no permit, and releasing on its behalf would raise the limit.
        """
        was_alive = self.is_alive()
        super().terminate()
        if was_alive:
            # Release after the kill so a waiting process cannot start while
            # the old one is still running.
            self.__semaphore.release()

    @property
    def test_name(self) -> str:
        """Name given to this process at construction time (read-only)."""
        return self.__test_name
class Task(object):
    """One test process plus the parent end of its pipe and its bookkeeping.

    ``status`` is ``'NOTRUN'`` until ``run`` returns, ``'RUNNING'`` while the
    child works, and finally ``'ENDED'`` (child exited) or ``'TERMINATED'``
    (killed through ``terminate``).
    """

    def __init__(self,
                 process: "S_Process",
                 pipe_conn: "Pipe",
                 ) -> None:
        """Remember the process and pipe end; nothing is started yet.

        Args:
            process: The process to run; must expose ``start``, ``join``,
                ``terminate`` and a ``test_name`` attribute.
            pipe_conn: Parent end of the pipe the process reports on.
        """
        self.process = process
        self.pipe_conn = pipe_conn
        # While RUNNING this is the perf_counter() stamp taken at start (the
        # polling loop in run_tests subtracts it from "now"); once the task is
        # finished it is the elapsed time in seconds.
        self.duration = None
        self.test_name = None
        self.status = 'NOTRUN'
        # Private copy of the start stamp so the elapsed time can always be
        # computed, whatever ``duration`` currently holds.
        self._started_at = None

    def _elapsed(self):
        """Return seconds since ``run`` finished starting, or None if never run."""
        if self._started_at is None:
            return None
        return time.perf_counter() - self._started_at

    def run(self) -> None:
        """Start the process and block until it reports that it is running."""
        self.process.start()
        # The value is ignored; its arrival means the child holds the semaphore.
        self.pipe_conn.recv()
        self._started_at = time.perf_counter()
        self.duration = self._started_at
        self.status = 'RUNNING'

    def join(self) -> None:
        """Join the (already exited) process and record its result."""
        self.process.join()
        #Booboo This method is only called if the process is not alive:
        #assert not self.process.is_alive()
        self.set_result()

    def terminate(self) -> None:
        """Terminate the process and record the task as ``'TERMINATED'``.

        A killed child never sends its name, so the name is taken from the
        process object and the elapsed time is computed here; otherwise the
        task would be listed as still RUNNING with a raw timestamp.
        """
        self.process.terminate()
        self.test_name = self.process.test_name
        self.duration = self._elapsed()
        self.status = 'TERMINATED'

    def set_result(self) -> None:
        """Record the reported test name and the elapsed time; mark ``'ENDED'``.

        Call this only after the child has exited. Everything it sent is then
        already in the pipe, so a non-blocking ``poll()`` is enough. If the
        child died without reporting, the name falls back to
        ``process.test_name`` instead of blocking forever in ``recv()``.
        """
        reported = None
        try:
            if self.pipe_conn.poll():
                reported = self.pipe_conn.recv()
        except EOFError:
            # Other end closed with nothing left to read.
            reported = None
        if reported is None:
            reported = self.process.test_name
        self.test_name = reported
        self.duration = self._elapsed() #Booboo
        self.status = 'ENDED'
class Tasks(object):
    """Bookkeeping for test tasks: those still pending and those finished."""

    def __init__(self) -> None:
        """Start with both lists empty."""
        self.remaining: List["Task"] = []
        self.completed: List["Task"] = []

    def add(self,
            process: "S_Process",
            pipe_conn: "Pipe",
            ) -> None:
        """Wrap ``process`` and ``pipe_conn`` in a Task and queue it as remaining."""
        task = Task(process, pipe_conn)
        self.remaining.append(task)

    def complete(self, task: "Task") -> None:
        """Move ``task`` from ``remaining`` to ``completed``.

        Raises:
            ValueError: If ``task`` is not in ``remaining``; ``completed`` is
                left unchanged in that case.
        """
        # Remove first: if the task is not pending, the ValueError is raised
        # before ``completed`` gains a duplicate entry.
        self.remaining.remove(task)
        self.completed.append(task)

    def info(self) -> List[str]:
        """Return one summary line (name, status, duration) per completed task."""
        # The "Retries" field of the earlier version is gone: Task keeps no
        # retry count. #Booboo
        return [
            f"Test Name: {task.test_name} " #Booboo
            f"Result: {task.status} " #Booboo
            f"Duration: {task.duration}" #Booboo
            for task in self.completed
        ]
def run_tests(num_tests: int = 8,
              max_parallel: int = 2,
              timeout: float = 5,
              poll_interval: float = 0.2
              ) -> None:
    """Run ``test_function`` in separate processes and supervise them.

    A helper thread starts the processes one after another while the calling
    thread polls. Tasks whose process has exited are joined, and tasks running
    longer than ``timeout`` are terminated.

    Args:
        num_tests: How many test processes to create.
        max_parallel: Initial value of the semaphore that limits how many
            targets run at the same time.
        timeout: Seconds a running task may take before it is terminated.
        poll_interval: Seconds to sleep between two polling passes.
    """
    #start_monitoring() #Booboo
    tasks = Tasks()
    semaphore = Semaphore(max_parallel)
    for i in range(num_tests):
        # The Task keeps parent_conn; the process reports on child_conn.
        parent_conn, child_conn = Pipe()
        process = S_Process(
            target=test_function,
            args=(),
            test_name=f'test_{i}',
            semaphore=semaphore,
            pipe_conn=child_conn
        )
        tasks.add(process, parent_conn)

    def runner(tasks):
        # Thread body: start each task in turn. Task.run() blocks until the
        # child reports, so the next one is not started before that.
        try:
            for task in tasks:
                task.run()
                print('running task', task.process.test_name) #Booboo
        except Exception:
            print(traceback.format_exc())

    # The thread gets its own copy of the list: the loop below removes
    # finished tasks from tasks.remaining, and removing items from a list that
    # another thread is iterating makes that iteration skip elements.
    runner_thread = Thread(target=runner, args=(tasks.remaining.copy(),)) #Booboo
    runner_thread.start()
    while tasks.remaining:
        # Sample liveness BEFORE the scan: once the thread is seen dead, every
        # status it was going to write has been written.
        runner_alive = runner_thread.is_alive()
        # Iterate a copy, because tasks.complete() mutates tasks.remaining.
        for task in tasks.remaining.copy():
            if not task.process.is_alive() and task.status == 'RUNNING':
                print('JOINING:', task.process.test_name)
                task.join()
                tasks.complete(task)
            elif task.status == "RUNNING": #Booboo
                # While RUNNING, task.duration is the perf_counter() start stamp.
                check_time = time.perf_counter() - task.duration
                if check_time > timeout:
                    print('TERMINATING:', task.process.test_name)
                    task.terminate()
                    tasks.complete(task)
        print('Rem:', len(tasks.remaining))
        print('End:', len(tasks.completed))
        if not runner_alive and not any(
                task.status == 'RUNNING' for task in tasks.remaining):
            # The starter thread is gone and nothing is running, so the tasks
            # still pending can never start. Stop instead of polling forever.
            if tasks.remaining:
                print('ABORTING: runner thread ended with',
                      len(tasks.remaining), 'task(s) never started')
            break
        time.sleep(poll_interval)
    # Every task has been handled (or the thread already ended), so this
    # returns promptly; it just makes sure the helper thread is done.
    runner_thread.join()
def test_function():
    """Placeholder test body: print a marker, then sleep for three seconds."""
    busy_seconds = 3
    print('test_func')
    time.sleep(busy_seconds)
# Script entry point: run_tests() is called only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    run_tests()
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/517033.html
