问题:键盘中断与python的多处理池
如何使用python的多处理池处理KeyboardInterrupt事件?这是一个简单的示例:
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
sleep(1)
return i*i
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
# **** THIS PART NEVER EXECUTES. ****
pool.terminate()
print "You cancelled the program!"
sys.exit(1)
print "\nFinally, here are the results: ", results
if __name__ == "__main__":
go()
当运行上面的代码时,KeyboardInterrupt
按时会引发^C
,但是该过程只是在此时挂起,我必须在外部将其杀死。
我希望能够随时按下^C
并使所有进程正常退出。
回答 0
这是一个Python错误。等待threading.Condition.wait()中的条件时,从不发送KeyboardInterrupt。复制:
import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"
直到wait()返回,才会传递KeyboardInterrupt异常,并且它永远不会返回,因此中断永远不会发生。KeyboardInterrupt几乎应该可以中断条件等待。
请注意,如果指定了超时,则不会发生这种情况。cond.wait(1)将立即收到中断。因此,一种解决方法是指定超时。为此,请更换
results = pool.map(slowly_square, range(40))
与
results = pool.map_async(slowly_square, range(40)).get(9999999)
或类似。
回答 1
从我最近发现的情况来看,最好的解决方案是设置工作进程完全忽略SIGINT,并将所有清理代码限制在父进程中。这可以解决空闲和繁忙的工作进程的问题,并且在子进程中不需要错误处理代码。
import signal
...
def init_worker():
signal.signal(signal.SIGINT, signal.SIG_IGN)
...
def main()
pool = multiprocessing.Pool(size, init_worker)
...
except KeyboardInterrupt:
pool.terminate()
pool.join()
解释和完整的示例代码分别位于http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/和http://github.com/jreese/multiprocessing-keyboardinterrupt。
回答 2
由于某些原因,仅Exception
可正常处理从基类继承的异常。作为一种变通方法,你可能会重新提高你KeyboardInterrupt
作为一个Exception
实例:
from multiprocessing import Pool
import time
class KeyboardInterruptError(Exception): pass
def f(x):
try:
time.sleep(x)
return x
except KeyboardInterrupt:
raise KeyboardInterruptError()
def main():
p = Pool(processes=4)
try:
print 'starting the pool map'
print p.map(f, range(10))
p.close()
print 'pool map complete'
except KeyboardInterrupt:
print 'got ^C while pool mapping, terminating the pool'
p.terminate()
print 'pool is terminated'
except Exception, e:
print 'got exception: %r, terminating the pool' % (e,)
p.terminate()
print 'pool is terminated'
finally:
print 'joining pool processes'
p.join()
print 'join complete'
print 'the end'
if __name__ == '__main__':
main()
通常,您将获得以下输出:
staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end
因此,如果您点击^C
,您将获得:
staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end
回答 3
通常这种简单的结构工程Ctrl– C上池:
def signal_handle(_signal, frame):
print "Stopping the Jobs."
signal.signal(signal.SIGINT, signal_handle)
如几篇类似文章所述:
回答 4
投票表决的答案不能解决核心问题,但具有类似的副作用。
多重处理库的作者Jesse Noller解释了multiprocessing.Pool
在旧博客中使用CTRL + C时如何正确处理。
import signal
from multiprocessing import Pool
def initializer():
"""Ignore CTRL+C in the worker process."""
signal.signal(signal.SIGINT, signal.SIG_IGN)
pool = Pool(initializer=initializer)
try:
pool.map(perform_download, dowloads)
except KeyboardInterrupt:
pool.terminate()
pool.join()
回答 5
似乎有两个问题使多处理过程变得异常烦人。第一个(由Glenn指出)是您需要使用map_async
超时而不是map
为了获得即时响应(即,不要完成对整个列表的处理)。第二点(Andrey指出)是,多处理不会捕获不继承自Exception
(例如SystemExit
)的异常。所以这是我的解决方案,涉及这两个方面:
import sys
import functools
import traceback
import multiprocessing
def _poolFunctionWrapper(function, arg):
"""Run function under the pool
Wrapper around function to catch exceptions that don't inherit from
Exception (which aren't caught by multiprocessing, so that you end
up hitting the timeout).
"""
try:
return function(arg)
except:
cls, exc, tb = sys.exc_info()
if issubclass(cls, Exception):
raise # No worries
# Need to wrap the exception with something multiprocessing will recognise
import traceback
print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))
def _runPool(pool, timeout, function, iterable):
"""Run the pool
Wrapper around pool.map_async, to handle timeout. This is required so as to
trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool
Further wraps the function in _poolFunctionWrapper to catch exceptions
that don't inherit from Exception.
"""
return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)
def myMap(function, iterable, numProcesses=1, timeout=9999):
"""Run the function on the iterable, optionally with multiprocessing"""
if numProcesses > 1:
pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
mapFunc = functools.partial(_runPool, pool, timeout)
else:
pool = None
mapFunc = map
results = mapFunc(function, iterable)
if pool is not None:
pool.close()
pool.join()
return results
回答 6
我发现目前最好的解决方案是不使用multiprocessing.pool功能,而是使用自己的池功能。我提供了一个使用apply_async演示该错误的示例,以及一个示例,展示了如何避免完全使用池功能。
http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/
回答 7
我是Python的新手。我到处都在寻找答案,却偶然发现了这个以及其他一些博客和YouTube视频。我试图将粘贴作者的代码复制到上面,并在Windows 7 64位的python 2.7.13上重现它。这接近我想要实现的目标。
我使我的子进程忽略ControlC,并使父进程终止。似乎绕过子进程确实为我避免了这个问题。
#!/usr/bin/python
from multiprocessing import Pool
from time import sleep
from sys import exit
def slowly_square(i):
try:
print "<slowly_square> Sleeping and later running a square calculation..."
sleep(1)
return i * i
except KeyboardInterrupt:
print "<child processor> Don't care if you say CtrlC"
pass
def go():
pool = Pool(8)
try:
results = pool.map(slowly_square, range(40))
except KeyboardInterrupt:
pool.terminate()
pool.close()
print "You cancelled the program!"
exit(1)
print "Finally, here are the results", results
if __name__ == '__main__':
go()
从头开始的那部分pool.terminate()
似乎永远不会执行。
回答 8
您可以尝试使用Pool对象的apply_async方法,如下所示:
import multiprocessing
import time
from datetime import datetime
def test_func(x):
time.sleep(2)
return x**2
def apply_multiprocessing(input_list, input_function):
pool_size = 5
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)
try:
jobs = {}
for value in input_list:
jobs[value] = pool.apply_async(input_function, [value])
results = {}
for value, result in jobs.items():
try:
results[value] = result.get()
except KeyboardInterrupt:
print "Interrupted by user"
pool.terminate()
break
except Exception as e:
results[value] = e
return results
except Exception:
raise
finally:
pool.close()
pool.join()
if __name__ == "__main__":
iterations = range(100)
t0 = datetime.now()
results1 = apply_multiprocessing(iterations, test_func)
t1 = datetime.now()
print results1
print "Multi: {}".format(t1 - t0)
t2 = datetime.now()
results2 = {i: test_func(i) for i in iterations}
t3 = datetime.now()
print results2
print "Non-multi: {}".format(t3 - t2)
输出:
100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000
此方法的优点是中断之前处理的结果将返回到结果字典中:
>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}
回答 9
奇怪的是,您似乎也必须处理KeyboardInterrupt
孩子中的孩子。我本来希望它能像写的那样工作…尝试更改slowly_square
为:
def slowly_square(i):
try:
sleep(1)
return i * i
except KeyboardInterrupt:
print 'You EVIL bastard!'
return 0
那应该可以按您预期的那样工作。