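I'm using multiprocessing.Pool from the Python standard library to run a bunch of workers. Each worker starts subprocesses using Python's subprocess library, and each worker is responsible for managing its subprocesses and cleaning them up when they finish.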
import multiprocessing as mp
import time

def main():
    processes = 3
    jobs = 6
    pool = mp.Pool(processes)
    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()

def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)

if __name__ == "__main__":
    main()
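The worker above contains only placeholder comments. Below is a rough sketch of what "start processes / wait for completion / clean up" could look like inside one worker. It assumes Python 3.3+ for `Popen.wait(timeout=...)`, and `run_child`, `should_stop`, `poll_interval`, and `grace` are hypothetical names used only for illustration. The important part is that the cleanup lives in a `finally` block, so the child is reaped on every exit path:

```python
import subprocess
import sys
import time


def run_child(cmd, should_stop, poll_interval=0.5, grace=5.0):
    """Run cmd until it exits or should_stop() returns True, then reap it.

    Returns the child's exit status; on POSIX a child killed by a signal
    reports the negative signal number.
    """
    proc = subprocess.Popen(cmd)
    try:
        # wait for completion, checking periodically whether to give up
        while proc.poll() is None:
            if should_stop():
                break
            time.sleep(poll_interval)
    finally:
        # clean up: runs on normal exit, early stop, or an exception
        if proc.poll() is None:
            proc.terminate()  # SIGTERM on POSIX
            try:
                proc.wait(timeout=grace)
            except subprocess.TimeoutExpired:
                proc.kill()  # SIGKILL if the child ignores SIGTERM
                proc.wait()
    return proc.returncode


if __name__ == "__main__":
    quick = [sys.executable, "-c", "print('child finished')"]
    slow = [sys.executable, "-c", "import time; time.sleep(30)"]
    print(run_child(quick, should_stop=lambda: False))
    print(run_child(slow, should_stop=lambda: True))
```

`should_stop` is deliberately just a callable. That way the same loop can check any kind of shared stop flag without changing the cleanup logic.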
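When I quit the script from the command line with Ctrl C after it has started, I'm trying to handle the KeyboardInterrupt in a sane way. Note: this example is a scaled-down version of my actual program that tries its best to illustrate the problem I'm having. I found these related posts on Stack Overflow:
- Catch Ctrl C / SIGINT and exit multiprocesses gracefully in python
- Keyboard Interrupts with python's multiprocessing Pool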
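The former is more applicable than the latter. Investigating, I found that pressing Ctrl C on the command line sends signal.SIGINT to all processes: the parent (main) process, all child processes, and all grandchild processes. More precisely, the terminal sends it to every process in its foreground process group. FYI, I'm using Ubuntu 18.04 in a bash terminal.
I adopted the suggested approach and ignored the interrupt signal in the child processes. For convenience and my own sanity, I wrote myself a context manager.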
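This small check shows why every process gets the signal. A process started with fork/exec stays in its parent's process group unless it calls `setsid()` or `setpgid()`, and the terminal delivers Ctrl C to the whole foreground group. The sketch assumes a POSIX system and Python 3.7+, and `child_pgid` is a hypothetical helper:

```python
import os
import subprocess
import sys

# The child prints the process-group ID it belongs to.
REPORT_PGID = "import os; print(os.getpgrp())"


def child_pgid(new_session=False):
    """Start a Python child and return the process-group ID it reports."""
    result = subprocess.run(
        [sys.executable, "-c", REPORT_PGID],
        capture_output=True,
        text=True,
        check=True,
        start_new_session=new_session,  # True -> the child calls setsid()
    )
    return int(result.stdout)


if __name__ == "__main__":
    print("parent:", os.getpgrp())
    print("child (default):", child_pgid())
    print("child (new session):", child_pgid(new_session=True))
```

By default the child reports the parent's group ID, so it receives the same Ctrl C. With `start_new_session=True` it lands in a group of its own.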
import multiprocessing as mp
import contextlib  # <---
import signal  # <---
import time

def main():
    processes = 3
    jobs = 6
    with ignore_interrupt_signals():  # <---
        pool = mp.Pool(processes)  # workers inherit the ignored SIGINT
    for i in range(jobs):
        args = (i,)
        pool.apply_async(worker, args)
    pool.close()
    pool.join()

@contextlib.contextmanager
def ignore_interrupt_signals():  # <---
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        # restore the previous handler even if the body raises
        signal.signal(signal.SIGINT, previous_handler)

def worker(i):
    # start processes
    # wait for completion
    # clean up
    time.sleep(1)

if __name__ == "__main__":
    main()
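Here is why this protects the workers and their own subprocesses. On POSIX, a signal set to `SIG_IGN` stays ignored across both `fork()` and `exec()`. The pool workers forked inside the block therefore inherit it, and so does anything they later start with `subprocess`, whose `restore_signals` option only resets SIGPIPE, SIGXFZ and SIGXFSZ. This is a minimal check assuming Python 3.7+ on Linux; `child_ignores_sigint` is a hypothetical helper:

```python
import contextlib
import signal
import subprocess
import sys


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        # restore the previous handler even if the body raises
        signal.signal(signal.SIGINT, previous_handler)


# Asks a fresh interpreter whether it started out ignoring SIGINT.
CHECK = "import signal; print(signal.getsignal(signal.SIGINT) == signal.SIG_IGN)"


def child_ignores_sigint():
    result = subprocess.run(
        [sys.executable, "-c", CHECK], capture_output=True, text=True, check=True
    )
    return result.stdout.strip() == "True"


if __name__ == "__main__":
    with ignore_interrupt_signals():
        print("started inside the block, ignores SIGINT:", child_ignores_sigint())
    print("started after the block, ignores SIGINT:", child_ignores_sigint())
```

The context manager here adds a `try`/`finally`, so the parent gets its handler back even if creating the pool raises.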
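This works well, except that the workers need a way to know they should shut down and, importantly, clean up all the subprocesses they spawned. For context, each worker takes about 45 minutes to complete.
I decided the best way to do this was to use a multiprocessing.Event. The event would be called stop_event and would be set once the main process received the SIGINT (as a KeyboardInterrupt). A plain multiprocessing.Event cannot be passed to pool workers as a task argument. So to share one event between the main process and the workers, it had to be created by a multiprocessing.Manager and accessed through a proxy.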
import multiprocessing as mp
import contextlib
import signal
import sys
import time

def main():
    processes = 3
    jobs = 6
    with ignore_interrupt_signals():
        pool = mp.Pool(processes)
    manager = mp.Manager()  # <---
    stop_event = manager.Event()  # <---
    try:
        for i in range(jobs):
            args = (i, stop_event)  # pass the proxy to each worker
            pool.apply_async(worker, args)
        pool.close()
        pool.join()
    except KeyboardInterrupt:  # <---
        stop_event.set()  # <---
        pool.close()
        pool.join()
        sys.exit()  # <---

@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)

def worker(i, stop_event):
    # start processes
    while not stop_event.is_set():  # <---
        # wait for completion (placeholder: poll until done or asked to stop)
        time.sleep(1)
    # clean up

if __name__ == "__main__":
    main()
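Two details of the worker loop are worth spelling out. First, `Event.set()` returns `None`, so a loop written as `while not stop_event.set():` would set the event and never exit; the check has to use `is_set()`. Second, the event's own `wait(timeout)` returns `True` as soon as the event is set, which lets a worker react immediately instead of sleeping out the rest of the interval. Below is a sketch under those assumptions. `wait_for_completion` and `is_done` are hypothetical, with `is_done` standing in for whatever "the job has finished" means in the real program. It is exercised with a `threading.Event`, which offers the same `wait`/`is_set` interface as a `Manager().Event()` proxy:

```python
import threading
import time


def wait_for_completion(is_done, stop_event, poll_interval=1.0):
    """Block until is_done() is true or stop_event is set.

    Returns True if the work finished, False if a stop was requested.
    """
    while not is_done():
        # wait() returns True as soon as the event is set, so the worker
        # does not sleep out the remainder of poll_interval
        if stop_event.wait(poll_interval):
            return False
    return True


if __name__ == "__main__":
    stop = threading.Event()
    threading.Timer(0.2, stop.set).start()  # pretend Ctrl C arrives after 0.2 s
    start = time.monotonic()
    finished = wait_for_completion(lambda: False, stop, poll_interval=5.0)
    print("finished:", finished, "elapsed:", round(time.monotonic() - start, 1))
```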
Now the workers were protected from being interrupted before cleaning up their subprocesses, and the main process catches the KeyboardInterrupt after the pool is created. When the exception is caught, the stop event is set, the pool is closed, and the worker processes are joined. I thought this would work. However, I got an IOError from the stop_event.set() call:
Termination started due to Ctrl C, cleaning up...
Traceback (most recent call last):
  File "...", line 1109, in <module>
    main()
  File "...", line 42, in main
    args.func(args)
  File "...", line 189, in run_command
    stop_event.set()
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 1011, in set
    return self._callmethod('set')
  File "/usr/lib/python2.7/multiprocessing/managers.py", line 758, in _callmethod
    conn.send((self._id, methodname, args, kwds))
IOError: [Errno 32] Broken pipe
I removed many other tracebacks from this output. The one of interest is the broken pipe raised when setting stop_event through the manager proxy.
Answer:
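multiprocessing.Manager starts a server, and that server runs as a separate process. The manager process therefore also receives the signal.SIGINT from Ctrl C and terminates. This closes the pipe that the stop_event proxy uses to talk to the manager, so the next call through the proxy fails with a broken pipe. The best way I found to avoid the problem is to start the manager while signal.SIGINT is ignored.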
import multiprocessing as mp
import contextlib
import signal
import sys
import time

def main():
    processes = 3
    jobs = 6
    with ignore_interrupt_signals():
        pool = mp.Pool(processes)
        manager = mp.Manager()  # <--- now started while SIGINT is ignored
    stop_event = manager.Event()
    try:
        for i in range(jobs):
            args = (i, stop_event)
            pool.apply_async(worker, args)
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        stop_event.set()
        pool.close()
        pool.join()
        sys.exit()

@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)

def worker(i, stop_event):
    # start processes
    while not stop_event.is_set():
        # wait for completion (placeholder: poll until done or asked to stop)
        time.sleep(1)
    # clean up

if __name__ == "__main__":
    main()
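You can check the fix without pressing Ctrl C. Start the manager inside the ignore block, send SIGINT straight to its server process, and confirm that `stop_event.set()` still goes through. This is a sketch assuming Linux and Python 3, and it forces the "fork" start method. `start_manager_ignoring_sigint` and `survives_ctrl_c` are hypothetical helpers, and the server's PID is found by comparing `multiprocessing.active_children()` before and after the manager starts. As far as I can tell, newer CPython 3 releases also make the manager server ignore SIGINT on their own, so there this check would pass even without the block. On the Python 2.7 from the traceback above, the block is what keeps the server alive.

```python
import contextlib
import multiprocessing as mp
import os
import signal
import time


@contextlib.contextmanager
def ignore_interrupt_signals():
    previous_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
    try:
        yield
    finally:
        signal.signal(signal.SIGINT, previous_handler)


def start_manager_ignoring_sigint(ctx):
    """Start a Manager with SIGINT ignored; return (manager, server_pid)."""
    known = {p.pid for p in mp.active_children()}
    with ignore_interrupt_signals():
        manager = ctx.Manager()  # the server process inherits SIG_IGN
    new = [p for p in mp.active_children() if p.pid not in known]
    return manager, new[0].pid


def survives_ctrl_c():
    ctx = mp.get_context("fork")  # assumption: Linux/POSIX, as in the question
    manager, server_pid = start_manager_ignoring_sigint(ctx)
    try:
        stop_event = manager.Event()
        os.kill(server_pid, signal.SIGINT)  # simulate Ctrl C reaching the server
        time.sleep(0.5)
        stop_event.set()  # a dead server would make this call fail
        return stop_event.is_set()
    finally:
        manager.shutdown()


if __name__ == "__main__":
    print("stop_event set after SIGINT:", survives_ctrl_c())
```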
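I'm answering my own question in the hope that it helps others. I spent a lot of time tracking socket and pipe errors through my codebase before arriving at this fairly simple solution.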
Please credit the source when reposting. Article link: https://www.uj5u.com/houduan/417971.html