标签归档:keyboardinterrupt

键盘中断与python的多处理池

问题:键盘中断与python的多处理池

如何使用python的多处理池处理KeyboardInterrupt事件?这是一个简单的示例:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

当运行上面的代码时,KeyboardInterrupt按时会引发^C,但是该过程只是在此时挂起,我必须在外部将其杀死。

我希望能够随时按下^C并使所有进程正常退出。

How can I handle KeyboardInterrupt events with python’s multiprocessing Pools? Here is a simple example:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

When running the code above, the KeyboardInterrupt gets raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.

I want to be able to press ^C at any time and cause all of the processes to exit gracefully.


回答 0

这是一个Python错误。等待threading.Condition.wait()中的条件时,从不发送KeyboardInterrupt。复制:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

直到wait()返回,才会传递KeyboardInterrupt异常,并且它永远不会返回,因此中断永远不会发生。KeyboardInterrupt几乎应该可以中断条件等待。

请注意,如果指定了超时,则不会发生这种情况。cond.wait(1)将立即收到中断。因此,一种解决方法是指定超时。为此,请更换

    results = pool.map(slowly_square, range(40))

    results = pool.map_async(slowly_square, range(40)).get(9999999)

或类似。

This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

The KeyboardInterrupt exception won’t be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.

Note that this doesn’t happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

or similar.


回答 1

从我最近发现的情况来看,最好的解决方案是设置工作进程完全忽略SIGINT,并将所有清理代码限制在父进程中。这可以解决空闲和繁忙的工作进程的问题,并且在子进程中不需要错误处理代码。

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

解释和完整的示例代码分别位于http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterrupt

From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.


回答 2

由于某些原因,仅Exception可正常处理从基类继承的异常。作为一种变通方法,你可能会重新提高你KeyboardInterrupt作为一个Exception实例:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

通常,您将获得以下输出:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

因此,如果您点击^C,您将获得:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

For some reasons, only exceptions inherited from the base Exception class are handled normally. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normally you would get the following output:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

So if you hit ^C, you will get:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

回答 3

通常这种简单的结构工程CtrlC上池:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

如几篇类似文章所述:

无需尝试即可在Python中捕获键盘中断

Usually this simple structure works for CtrlC on Pool :

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

As was stated in few similar posts:

Capture keyboardinterrupt in Python without try-except


回答 4

投票表决的答案不能解决核心问题,但具有类似的副作用。

多重处理库的作者Jesse Noller解释了multiprocessing.Pool在旧博客中使用CTRL + C时如何正确处理。

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

The voted answer does not tackle the core issue but a similar side effect.

Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in a old blog post.

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

回答 5

似乎有两个问题使多处理过程变得异常烦人。第一个(由Glenn指出)是您需要使用map_async超时而不是map为了获得即时响应(即,不要完成对整个列表的处理)。第二点(Andrey指出)是,多处理不会捕获不继承自Exception(例如SystemExit)的异常。所以这是我的解决方案,涉及这两个方面:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

It seems there are two issues that make exceptions while multiprocessing annoying. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map in order to get an immediate response (i.e., don’t finish processing the entire list). The second (noted by Andrey) is that multiprocessing doesn’t catch exceptions that don’t inherit from Exception (e.g., SystemExit). So here’s my solution that deals with both of these:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

回答 6

我发现目前最好的解决方案是不使用multiprocessing.pool功能,而是使用自己的池功能。我提供了一个使用apply_async演示该错误的示例,以及一个示例,展示了如何避免完全使用池功能。

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

I found, for the time being, the best solution is to not use the multiprocessing.pool feature but rather roll your own pool functionality. I provided an example demonstrating the error with apply_async as well as an example showing how to avoid using the pool functionality altogether.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/


回答 7

我是Python的新手。我到处都在寻找答案,却偶然发现了这个以及其他一些博客和YouTube视频。我试图将粘贴作者的代码复制到上面,并在Windows 7 64位的python 2.7.13上重现它。这接近我想要实现的目标。

我使我的子进程忽略ControlC,并使父进程终止。似乎绕过子进程确实为我避免了这个问题。

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

从头开始的那部分pool.terminate()似乎永远不会执行。

I’m a newbie in Python. I was looking everywhere for answer and stumble upon this and a few other blogs and youtube videos. I have tried to copy paste the author’s code above and reproduce it on my python 2.7.13 in windows 7 64- bit. It’s close to what I wanna achieve.

I made my child processes to ignore the ControlC and make the parent process terminate. Looks like bypassing the child process does avoid this problem for me.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

The part starting at pool.terminate() never seems to execute.


回答 8

您可以尝试使用Pool对象的apply_async方法,如下所示:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

输出:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

此方法的优点是中断之前处理的结果将返回到结果字典中:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

You can try using the apply_async method of a Pool object, like this:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Output:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

An advantage of this method is that results processed before interruption will be returned in the results dictionary:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

回答 9

奇怪的是,您似乎也必须处理KeyboardInterrupt孩子中的孩子。我本来希望它能像写的那样工作…尝试更改slowly_square为:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

那应该可以按您预期的那样工作。

Strangely enough it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written… try changing slowly_square to:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

That should work as you expected.


无需尝试即可在Python中捕获键盘中断

问题:无需尝试即可在Python中捕获键盘中断

Python中是否有某种方法可以捕获KeyboardInterrupt事件而不将所有代码放入tryexcept语句中?

如果用户按下Ctrl+,我想干净无痕地退出C

Is there some way in Python to capture KeyboardInterrupt event without putting all the code inside a tryexcept statement?

I want to cleanly exit without trace if user presses Ctrl+C.


回答 0

是的,您可以使用模块signal安装中断处理程序,并使用threading.Event永远等待:

import signal
import sys
import time
import threading

def signal_handler(signal, frame):
    print('You pressed Ctrl+C!')
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
forever = threading.Event()
forever.wait()

Yes, you can install an interrupt handler using the module signal, and wait forever using a threading.Event:

import signal
import sys
import time
import threading

def signal_handler(signal, frame):
    print('You pressed Ctrl+C!')
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
print('Press Ctrl+C')
forever = threading.Event()
forever.wait()

回答 1

如果您只想不显示回溯,则使代码如下所示:

## all your app logic here
def main():
   ## whatever your app does.


if __name__ == "__main__":
   try:
      main()
   except KeyboardInterrupt:
      # do nothing here
      pass

(是的,我知道这并不能直接回答问题,但是还不清楚为什么需要try / except块会令人反感-也许这会使OP变得不那么烦人了)

If all you want is to not show the traceback, make your code like this:

## all your app logic here
def main():
   ## whatever your app does.


if __name__ == "__main__":
   try:
      main()
   except KeyboardInterrupt:
      # do nothing here
      pass

(Yes, I know that this doesn’t directly answer the question, but it’s not really clear why needing a try/except block is objectionable — maybe this makes it less annoying to the OP)


回答 2

设置自己的信号处理程序的另一种方法是使用上下文管理器来捕获异常并忽略它:

>>> class CleanExit(object):
...     def __enter__(self):
...             return self
...     def __exit__(self, exc_type, exc_value, exc_tb):
...             if exc_type is KeyboardInterrupt:
...                     return True
...             return exc_type is None
... 
>>> with CleanExit():
...     input()    #just to test it
... 
>>>

这将删除tryexcept块,同时保留一些明确的说明。

这还允许您仅在代码的某些部分中忽略中断,而不必每次都设置和重置信号处理程序。

An alternative to setting your own signal handler is to use a context-manager to catch the exception and ignore it:

>>> class CleanExit(object):
...     def __enter__(self):
...             return self
...     def __exit__(self, exc_type, exc_value, exc_tb):
...             if exc_type is KeyboardInterrupt:
...                     return True
...             return exc_type is None
... 
>>> with CleanExit():
...     input()    #just to test it
... 
>>>

This removes the tryexcept block while preserving some explicit mention of what is going on.

This also allows you to ignore the interrupt only in some portions of your code without having to set and reset again the signal handlers everytime.


回答 3

我知道这是一个古老的问题,但是我首先来到这里,然后发现了该atexit模块。我还不知道它的跨平台跟踪记录或完整的警告说明,但是到目前为止,这正是我KeyboardInterrupt在Linux上尝试进行后期清理时一直在寻找的东西。只是想以另一种方式解决问题。

我想在Fabric操作的上下文中进行退出后清理,因此将所有内容都包装在try/ except中对我来说也不是一种选择。我觉得atexit这种情况可能非常适合,因为您的代码不在控制流的最高级别。

atexit 具有非常强大的功能并且易于使用,例如:

import atexit

def goodbye():
    print "You are now leaving the Python sector."

atexit.register(goodbye)

您还可以将其用作装饰器(从2.6开始;该示例来自docs):

import atexit

@atexit.register
def goodbye():
    print "You are now leaving the Python sector."

如果您只想使其特定KeyboardInterrupt,那么另一个人对此问题的答案可能会更好。

但是请注意,该atexit模块只有约70行代码,并且创建类似版本以不同方式对待异常(例如将异常作为参数传递给回调函数)并不难。(这样做的局限性是atexit需要修改后的版本:目前,我无法为exit-callback-functions知道异常的方法;atexit处理程序捕获异常,调用回调,然后重新引发该exceptions。但是您可以采取不同的方法。)

有关更多信息,请参见:

I know this is an old question but I came here first and then discovered the atexit module. I do not know about its cross-platform track record or a full list of caveats yet, but so far it is exactly what I was looking for in trying to handle post-KeyboardInterrupt cleanup on Linux. Just wanted to throw in another way of approaching the problem.

I want to do post-exit clean-up in the context of Fabric operations, so wrapping everything in try/except wasn’t an option for me either. I feel like atexit may be a good fit in such a situation, where your code is not at the top level of control flow.

atexit is very capable and readable out of the box, for example:

import atexit

def goodbye():
    print "You are now leaving the Python sector."

atexit.register(goodbye)

You can also use it as a decorator (as of 2.6; this example is from the docs):

import atexit

@atexit.register
def goodbye():
    print "You are now leaving the Python sector."

If you wanted to make it specific to KeyboardInterrupt only, another person’s answer to this question is probably better.

But note that the atexit module is only ~70 lines of code and it would not be hard to create a similar version that treats exceptions differently, for example passing the exceptions as arguments to the callback functions. (The limitation of atexit that would warrant a modified version: currently I can’t conceive of a way for the exit-callback-functions to know about the exceptions; the atexit handler catches the exception, calls your callback(s), then re-raises that exception. But you could do this differently.)

For more info see:


回答 4

您可以通过替换来防止打印堆栈跟踪KeyboardInterrupt,而无需try: ... except KeyboardInterrupt: pass(最明显,最恰当的“最佳”解决方案,但您已经知道并要求其他东西)sys.excepthook。就像是

def custom_excepthook(type, value, traceback):
    if type is KeyboardInterrupt:
        return # do nothing
    else:
        sys.__excepthook__(type, value, traceback)

You can prevent printing a stack trace for KeyboardInterrupt, without try: ... except KeyboardInterrupt: pass (the most obvious and propably “best” solution, but you already know it and asked for something else) by replacing sys.excepthook. Something like

def custom_excepthook(type, value, traceback):
    if type is KeyboardInterrupt:
        return # do nothing
    else:
        sys.__excepthook__(type, value, traceback)

回答 5

我尝试了每个人提出的建议解决方案,但我必须自己临时编写代码才能真正起作用。以下是我的即兴代码:

import signal
import sys
import time

def signal_handler(signal, frame):
    print('You pressed Ctrl+C!')
    print(signal) # Value is 2 for CTRL + C
    print(frame) # Where your execution of program is at moment - the Line Number
    sys.exit(0)

#Assign Handler Function
signal.signal(signal.SIGINT, signal_handler)

# Simple Time Loop of 5 Seconds
secondsCount = 5
print('Press Ctrl+C in next '+str(secondsCount))
timeLoopRun = True 
while timeLoopRun:  
    time.sleep(1)
    if secondsCount < 1:
        timeLoopRun = False
    print('Closing in '+ str(secondsCount)+ ' seconds')
    secondsCount = secondsCount - 1

I tried the suggested solutions by everyone, but I had to improvise code myself to actually make it work. Following is my improvised code:

import signal
import sys
import time

def signal_handler(signal, frame):
    print('You pressed Ctrl+C!')
    print(signal) # Value is 2 for CTRL + C
    print(frame) # Where your execution of program is at moment - the Line Number
    sys.exit(0)

#Assign Handler Function
signal.signal(signal.SIGINT, signal_handler)

# Simple Time Loop of 5 Seconds
secondsCount = 5
print('Press Ctrl+C in next '+str(secondsCount))
timeLoopRun = True 
while timeLoopRun:  
    time.sleep(1)
    if secondsCount < 1:
        timeLoopRun = False
    print('Closing in '+ str(secondsCount)+ ' seconds')
    secondsCount = secondsCount - 1

回答 6

如果有人正在寻找快速的最小解决方案,

import signal

# The code which crashes program on interruption

signal.signal(signal.SIGINT, call_this_function_if_interrupted)

# The code skipped if interrupted

If someone is in search for a quick minimal solution,

import signal

# The code which crashes program on interruption

signal.signal(signal.SIGINT, call_this_function_if_interrupted)

# The code skipped if interrupted