我有n執行緒同時運行。這些執行緒正在處理包含m測驗用例的串列。例如,執行緒n-1正在處理 itemm[i-1]而執行緒n正在處理 item m[i]。如果例如執行緒n-1失敗或回傳信號,我想停止所有執行緒。我怎樣才能做到這一點?
這是一個 MWE:
這是我的處理函式
def process(input_addr):
i = 1
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 ' input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
return True
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
return True
except Exception as e:
print("thread.\nMessage:{1}".format(e))
這是我的游泳池:
def pre_run_test_files(self):
with Pool(10) as p:
p.map(process, self.test_files)
我在用:
from multiprocessing import Pool
謝謝。
uj5u.com熱心網友回復:
您可以擁有自己的作業函式,process只需引發例外并使用error_callback函式對池進行apply_async呼叫terminate,如下面的演示所示:
from multiprocessing import Pool
def process(i):
import time
time.sleep(1)
if i == 6:
raise ValueError(f'Bad value: {i}')
print(i, flush=True)
def my_error_callback(e):
pool.terminate()
print(e)
if __name__ == '__main__':
pool = Pool(4)
for i in range(20):
pool.apply_async(process, args=(i,), error_callback=my_error_callback)
# wait for all tasks to complete
pool.close()
pool.join()
印刷:
0
1
3
2
4
5
7
Bad value: 6
您應該能夠根據您的特定問題調整上述代碼。
uj5u.com熱心網友回復:
我找到了解決方案:
def process(i, input_addr, event):
kill_flag = False
if not event.is_set():
print('Total number of executed unit tests: {}'.format(i))
print("executed {}. thread".format(input_addr))
try:
command = 'python3 ' input_addr
result = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
msg, err = result.communicate()
if msg.decode('utf-8') != '':
stat = parse_shell(msg.decode('utf-8'))
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
else:
stat = parse_shell(err)
if stat:
print('Test Failed')
kill_flag = True
# all_run.append(input_addr)
#write_list_to_txt(input_addr, valid_tests)
else:
kill_flag = False
except Exception as e:
print("thread.\nMessage:{1}".format(e))
if kill_flag:
event.set()
def manager():
p= multiprocessing.Pool(10)
m = multiprocessing.Manager()
event = m.Event()
for i,f in enumerate(self.test_files):
p.apply_async(process, (i, f, event))
p.close()
event.wait()
p.terminate()
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/393745.html
