标签归档:exception-handling

如何将traceback / sys.exc_info()值保存在变量中?

问题:如何将traceback / sys.exc_info()值保存在变量中?

我想将错误的名称和追溯详细信息保存到变量中。这是我的尝试。

import sys
try:
    try:
        print x
    except Exception, ex:
        raise NameError
except Exception, er:
    print "0", sys.exc_info()[0]
    print "1", sys.exc_info()[1]
    print "2", sys.exc_info()[2]

输出:

0 <type 'exceptions.NameError'>
1 
2 <traceback object at 0xbd5fc8>

所需输出:

0 NameError
1
2 Traceback (most recent call last):
  File "exception.py", line 6, in <module>
    raise NameError

PS:我知道可以使用追溯模块轻松完成此操作,但是我想在此了解sys.exc_info()[2]对象的用法。

I want to save the name of the error and the traceback details into a variable. Here’s is my attempt.

import sys
try:
    try:
        print x
    except Exception, ex:
        raise NameError
except Exception, er:
    print "0", sys.exc_info()[0]
    print "1", sys.exc_info()[1]
    print "2", sys.exc_info()[2]

Output:

0 <type 'exceptions.NameError'>
1 
2 <traceback object at 0xbd5fc8>

Desired Output:

0 NameError
1
2 Traceback (most recent call last):
  File "exception.py", line 6, in <module>
    raise NameError

P.S. I know this can be done easily using the traceback module, but I want to know usage of sys.exc_info()[2] object here.


回答 0

这是我的方法:

>>> import traceback
>>> try:
...   int('k')
... except:
...   var = traceback.format_exc()
... 
>>> print var
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
ValueError: invalid literal for int() with base 10: 'k'

但是,您应该查看traceback文档,因为您可能会发现更合适的方法,这取决于您以后要如何处理变量…

This is how I do it:

>>> import traceback
>>> try:
...   int('k')
... except:
...   var = traceback.format_exc()
... 
>>> print var
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
ValueError: invalid literal for int() with base 10: 'k'

You should however take a look at the traceback documentation, as you might find there more suitable methods, depending to how you want to process your variable afterwards…


回答 1

sys.exc_info()返回具有三个值(类型,值,回溯)的元组。

  1. 这里的类型获取正在处理的异常的异常类型
  2. 值是要传递给异常类的构造函数的参数
  3. traceback包含堆栈信息,例如发生异常的位置等。

例如,在以下程序中

try:

    a = 1/0

except Exception,e:

    exc_tuple = sys.exc_info()

现在,如果我们打印元组,则值将为this。

  1. exc_tuple [0]的值将为“ ZeroDivisionError
  2. exc_tuple [1]的值将是“ 整数除法或以零 ”(作为参数传递给异常类的字符串)
  3. exc_tuple [2]的值将是“ (某些内存地址)的引用对象

也可以通过简单地以字符串格式打印异常来获取上述详细信息。

print str(e)

sys.exc_info() returns a tuple with three values (type, value, traceback).

  1. Here type gets the exception type of the Exception being handled
  2. value is the arguments that are being passed to constructor of exception class
  3. traceback contains the stack information like where the exception occurred etc.

For Example, In the following program

try:

    a = 1/0

except Exception,e:

    exc_tuple = sys.exc_info()

Now If we print the tuple the values will be this.

  1. exc_tuple[0] value will be “ZeroDivisionError
  2. exc_tuple[1] value will be “integer division or modulo by zero” (String passed as parameter to the exception class)
  3. exc_tuple[2] value will be “trackback object at (some memory address)

The above details can also be fetched by simply printing the exception in string format.

print str(e)

回答 2

使用traceback.extract_stack(),如果你想模块和函数名和行号方便。

''.join(traceback.format_stack())如果只需要一个看起来像traceback.print_stack()输出的字符串,请使用。

请注意,即使''.join()你会得到一个多行字符串,因为的元素format_stack()包含\n。请参见下面的输出。

记住要import traceback

这是的输出traceback.extract_stack()。添加了格式以提高可读性。

>>> traceback.extract_stack()
[
   ('<string>', 1, '<module>', None),
   ('C:\\Python\\lib\\idlelib\\run.py', 126, 'main', 'ret = method(*args, **kwargs)'),
   ('C:\\Python\\lib\\idlelib\\run.py', 353, 'runcode', 'exec(code, self.locals)'),
   ('<pyshell#1>', 1, '<module>', None)
]

这是的输出''.join(traceback.format_stack())。添加了格式以提高可读性。

>>> ''.join(traceback.format_stack())
'  File "<string>", line 1, in <module>\n
   File "C:\\Python\\lib\\idlelib\\run.py", line 126, in main\n
       ret = method(*args, **kwargs)\n
   File "C:\\Python\\lib\\idlelib\\run.py", line 353, in runcode\n
       exec(code, self.locals)\n  File "<pyshell#2>", line 1, in <module>\n'

Use traceback.extract_stack() if you want convenient access to module and function names and line numbers.

Use ''.join(traceback.format_stack()) if you just want a string that looks like the traceback.print_stack() output.

Notice that even with ''.join() you will get a multi-line string, since the elements of format_stack() contain \n. See output below.

Remember to import traceback.

Here’s the output from traceback.extract_stack(). Formatting added for readability.

>>> traceback.extract_stack()
[
   ('<string>', 1, '<module>', None),
   ('C:\\Python\\lib\\idlelib\\run.py', 126, 'main', 'ret = method(*args, **kwargs)'),
   ('C:\\Python\\lib\\idlelib\\run.py', 353, 'runcode', 'exec(code, self.locals)'),
   ('<pyshell#1>', 1, '<module>', None)
]

Here’s the output from ''.join(traceback.format_stack()). Formatting added for readability.

>>> ''.join(traceback.format_stack())
'  File "<string>", line 1, in <module>\n
   File "C:\\Python\\lib\\idlelib\\run.py", line 126, in main\n
       ret = method(*args, **kwargs)\n
   File "C:\\Python\\lib\\idlelib\\run.py", line 353, in runcode\n
       exec(code, self.locals)\n  File "<pyshell#2>", line 1, in <module>\n'

回答 3

当您从异常处理程序中取出异常对象或回溯对象时要小心,因为这会导致循环引用并且gc.collect()将无法收集。在ipython / jupyter笔记本环境中,这似乎是一个特殊的问题,在该环境中无法在正确的时间清除回溯对象,甚至对gc.collect()in finallysection 的显式调用也不起作用。这就是一个很大的问题,如果您有一些大的对象因此而无法回收它们的内存(例如,没有此解决方案的CUDA内存不足异常,则需要完整的内核重新启动才能恢复)。

通常,如果要保存回溯对象,则需要从对的引用中清除它locals(),如下所示:

import sys, traceback, gc
type, val, tb = None, None, None
try:
    myfunc()
except:
    type, val, tb = sys.exc_info()
    traceback.clear_frames(tb)
# some cleanup code
gc.collect()
# and then use the tb:
if tb:
    raise type(val).with_traceback(tb)

对于jupyter notebook,您至少必须在异常处理程序中执行此操作:

try:
    myfunc()
except:
    type, val, tb = sys.exc_info()
    traceback.clear_frames(tb)
    raise type(val).with_traceback(tb)
finally:
    # cleanup code in here
    gc.collect()

经过python 3.7测试。

ps ipython或jupyter Notebook env的问题在于它具有%tb魔术功能,可以保存回溯并在以后的任何时候使用。结果locals(),参与回溯的所有帧中的任何帧都不会被释放,直到笔记本退出或另一个异常将覆盖先前存储的回溯。这是很成问题的。它不应存储没有清洗其框架的回溯。已在此处提交修订。

Be careful when you take the exception object or the traceback object out of the exception handler, since this causes circular references and gc.collect() will fail to collect. This appears to be of a particular problem in the ipython/jupyter notebook environment where the traceback object doesn’t get cleared at the right time and even an explicit call to gc.collect() in finally section does nothing. And that’s a huge problem if you have some huge objects that don’t get their memory reclaimed because of that (e.g. CUDA out of memory exceptions that w/o this solution require a complete kernel restart to recover).

In general if you want to save the traceback object, you need to clear it from references to locals(), like so:

import sys, traceback, gc
type, val, tb = None, None, None
try:
    myfunc()
except:
    type, val, tb = sys.exc_info()
    traceback.clear_frames(tb)
# some cleanup code
gc.collect()
# and then use the tb:
if tb:
    raise type(val).with_traceback(tb)

In the case of jupyter notebook, you have to do that at the very least inside the exception handler:

try:
    myfunc()
except:
    type, val, tb = sys.exc_info()
    traceback.clear_frames(tb)
    raise type(val).with_traceback(tb)
finally:
    # cleanup code in here
    gc.collect()

Tested with python 3.7.

p.s. the problem with ipython or jupyter notebook env is that it has %tb magic which saves the traceback and makes it available at any point later. And as a result any locals() in all frames participating in the traceback will not be freed until the notebook exits or another exception will overwrite the previously stored backtrace. This is very problematic. It should not store the traceback w/o cleaning its frames. Fix submitted here.


回答 4

该对象可用作Exception.with_traceback()函数中的参数:

except Exception as e:
    tb = sys.exc_info()
    print(e.with_traceback(tb[2]))

The object can be used as a parameter in Exception.with_traceback() function:

except Exception as e:
    tb = sys.exc_info()
    print(e.with_traceback(tb[2]))

“最终”是否总是在Python中执行?

问题:“最终”是否总是在Python中执行?

对于Python中任何可能的try-finally块,是否保证finally将始终执行该块?

例如,假设我在一个except街区中返回:

try:
    1/0
except ZeroDivisionError:
    return
finally:
    print("Does this code run?")

或者,也许我重新提出一个Exception

try:
    1/0
except ZeroDivisionError:
    raise
finally:
    print("What about this code?")

测试表明finally上述示例确实可以执行,但我想我还没有想到其他场景。

在任何情况下,某个finally块可能无法在Python中执行?

For any possible try-finally block in Python, is it guaranteed that the finally block will always be executed?

For example, let’s say I return while in an except block:

try:
    1/0
except ZeroDivisionError:
    return
finally:
    print("Does this code run?")

Or maybe I re-raise an Exception:

try:
    1/0
except ZeroDivisionError:
    raise
finally:
    print("What about this code?")

Testing shows that finally does get executed for the above examples, but I imagine there are other scenarios I haven’t thought of.

Are there any scenarios in which a finally block can fail to execute in Python?


回答 0

“保证”一词比任何finally应得的实现都要强大得多。什么是保证的是,如果执行全部的流出tryfinally结构,它会通过finally这样做。无法保证执行将流出tryfinally

  • finally中一台生成器或异步协同程序可能永远不会运行,如果对象根本不会执行到结束。可能有很多方式发生。这是一个:

    def gen(text):
        try:
            for line in text:
                try:
                    yield int(line)
                except:
                    # Ignore blank lines - but catch too much!
                    pass
        finally:
            print('Doing important cleanup')
    
    text = ['1', '', '2', '', '3']
    
    if any(n > 1 for n in gen(text)):
        print('Found a number')
    
    print('Oops, no cleanup.')
    

    请注意,这个示例有些棘手:当生成器被垃圾回收时,Python尝试finally通过抛出GeneratorExit异常来运行该块,但是这里我们捕获了该异常,然后yield再次出现,此时Python打印警告(“生成器忽略了GeneratorExit ”)并放弃。有关详细信息,请参见PEP 342(通过增强型生成器的协程)

    生成器或协同程序可能不会执行到结束的其他方式包括:如果对象只是从来没有GC’ed(是的,这是可能的,即使在CPython的),或者如果async with awaitS IN __aexit__,或者如果对象awaitS或yieldS IN一个finally块。此列表并非详尽无遗。

  • finally如果所有非守护程序线程都首先退出,则守护程序线程中的A 可能永远不会执行

  • os._exit将立即停止该进程而不执行finally块。

  • os.fork可能导致finally执行两次。如果您对共享资源的访问未正确同步,则可能会导致并发访问冲突(崩溃,停顿等),这不仅会发生两次常见的正常问题,还会导致并发访问冲突。

    由于multiprocessing在使用fork start方法(Unix上的默认设置)时使用fork-without-exec创建工作进程,然后os._exit在工作程序完成后调用工作程序,finally因此multiprocessing交互可能会出现问题(示例)。

  • C级分段故障将阻止finally块运行。
  • kill -SIGKILL将阻止finally块运行。SIGTERM并且SIGHUP也将阻止finally运行,除非你安装一个处理器来控制自己的关断块; 默认情况下,Python不处理SIGTERMSIGHUP
  • 中的异常finally会阻止清理完成。其中特别值得注意的情况是,如果用户点击控制-C 只是因为我们已经开始执行该finally块。Python将引发KeyboardInterrupt并跳过该finally块内容的每一行。(KeyboardInterrupt-safe代码很难编写)。
  • 如果计算机断电,或者休眠且无法唤醒,finally则无法运行数据块。

finally区块不是交易系统;它不提供原子性保证或任何种类的保证。这些示例中的一些可能看起来很明显,但是很容易忘记这种事情可能发生并且finally过于依赖。

“Guaranteed” is a much stronger word than any implementation of finally deserves. What is guaranteed is that if execution flows out of the whole tryfinally construct, it will pass through the finally to do so. What is not guaranteed is that execution will flow out of the tryfinally.

  • A finally in a generator or async coroutine might never run, if the object never executes to conclusion. There are a lot of ways that could happen; here’s one:

    def gen(text):
        try:
            for line in text:
                try:
                    yield int(line)
                except:
                    # Ignore blank lines - but catch too much!
                    pass
        finally:
            print('Doing important cleanup')
    
    text = ['1', '', '2', '', '3']
    
    if any(n > 1 for n in gen(text)):
        print('Found a number')
    
    print('Oops, no cleanup.')
    

    Note that this example is a bit tricky: when the generator is garbage collected, Python attempts to run the finally block by throwing in a GeneratorExit exception, but here we catch that exception and then yield again, at which point Python prints a warning (“generator ignored GeneratorExit”) and gives up. See PEP 342 (Coroutines via Enhanced Generators) for details.

    Other ways a generator or coroutine might not execute to conclusion include if the object is just never GC’ed (yes, that’s possible, even in CPython), or if an async with awaits in __aexit__, or if the object awaits or yields in a finally block. This list is not intended to be exhaustive.

  • A finally in a daemon thread might never execute if all non-daemon threads exit first.

  • os._exit will halt the process immediately without executing finally blocks.

  • os.fork may cause finally blocks to execute twice. As well as just the normal problems you’d expect from things happening twice, this could cause concurrent access conflicts (crashes, stalls, …) if access to shared resources is not correctly synchronized.

    Since multiprocessing uses fork-without-exec to create worker processes when using the fork start method (the default on Unix), and then calls os._exit in the worker once the worker’s job is done, finally and multiprocessing interaction can be problematic (example).

  • A C-level segmentation fault will prevent finally blocks from running.
  • kill -SIGKILL will prevent finally blocks from running. SIGTERM and SIGHUP will also prevent finally blocks from running unless you install a handler to control the shutdown yourself; by default, Python does not handle SIGTERM or SIGHUP.
  • An exception in finally can prevent cleanup from completing. One particularly noteworthy case is if the user hits control-C just as we’re starting to execute the finally block. Python will raise a KeyboardInterrupt and skip every line of the finally block’s contents. (KeyboardInterrupt-safe code is very hard to write).
  • If the computer loses power, or if it hibernates and doesn’t wake up, finally blocks won’t run.

The finally block is not a transaction system; it doesn’t provide atomicity guarantees or anything of the sort. Some of these examples might seem obvious, but it’s easy to forget such things can happen and rely on finally for too much.


回答 1

是。 最后总是胜利。

克服它的唯一方法是在finally:有机会执行之前停止执行(例如,使解释器崩溃,关闭计算机,永远暂停生成器)。

我想还有其他我没想到的情况。

您可能还没有想到以下几点:

def foo():
    # finally always wins
    try:
        return 1
    finally:
        return 2

def bar():
    # even if he has to eat an unhandled exception, finally wins
    try:
        raise Exception('boom')
    finally:
        return 'no boom'

根据您退出解释器的方式,有时您可以最终“取消”,但不是这样:

>>> import sys
>>> try:
...     sys.exit()
... finally:
...     print('finally wins!')
... 
finally wins!
$

使用不稳定的方法os._exit(在我看来,这属于“使解释器崩溃”的原因):

>>> import os
>>> try:
...     os._exit(1)
... finally:
...     print('finally!')
... 
$

我当前正在运行以下代码,以测试在宇宙热死之后,是否最终仍然可以执行:

try:
    while True:
       sleep(1)
finally:
    print('done')

但是,我仍在等待结果,因此请稍后再检查。

Yes. Finally always wins.

The only way to defeat it is to halt execution before finally: gets a chance to execute (e.g. crash the interpreter, turn off your computer, suspend a generator forever).

I imagine there are other scenarios I haven’t thought of.

Here are a couple more you may not have thought about:

def foo():
    # finally always wins
    try:
        return 1
    finally:
        return 2

def bar():
    # even if he has to eat an unhandled exception, finally wins
    try:
        raise Exception('boom')
    finally:
        return 'no boom'

Depending on how you quit the interpreter, sometimes you can “cancel” finally, but not like this:

>>> import sys
>>> try:
...     sys.exit()
... finally:
...     print('finally wins!')
... 
finally wins!
$

Using the precarious os._exit (this falls under “crash the interpreter” in my opinion):

>>> import os
>>> try:
...     os._exit(1)
... finally:
...     print('finally!')
... 
$

I’m currently running this code, to test if finally will still execute after the heat death of the universe:

try:
    while True:
       sleep(1)
finally:
    print('done')

However, I’m still waiting on the result, so check back here later.


回答 2

根据Python文档

无论以前发生了什么,一旦代码块完成并处理了所有引发的异常,便会执行final块。即使异常处理程序或else块中存在错误,并且引发了新的异常,final块中的代码仍将运行。

还应注意,如果有多个return语句,包括finally块中的一个语句,则finally块返回是唯一将执行的语句。

According to the Python documentation:

No matter what happened previously, the final-block is executed once the code block is complete and any raised exceptions handled. Even if there’s an error in an exception handler or the else-block and a new exception is raised, the code in the final-block is still run.

It should also be noted that if there are multiple return statements, including one in the finally block, then the finally block return is the only one that will execute.


回答 3

好吧,是的,不是。

可以保证的是,Python将始终尝试执行finally块。如果您从该块返回或引发未捕获的异常,则在实际返回或引发异常之前执行finally块。

(只要运行问题中的代码,您本可以控制自己的一切)

我能想象的唯一情况是在Python解释器本身崩溃(例如在C代码内部或由于断电)时,将不会执行finally块。

Well, yes and no.

What is guaranteed is that Python will always try to execute the finally block. In the case where you return from the block or raise an uncaught exception, the finally block is executed just before actually returning or raising the exception.

(what you could have controlled yourself by simply running the code in your question)

The only case I can imagine where the finally block will not be executed is when the Python interpretor itself crashes for example inside C code or because of power outage.


回答 4

我没有使用生成器功能就发现了这一点:

import multiprocessing
import time

def fun(arg):
  try:
    print("tried " + str(arg))
    time.sleep(arg)
  finally:
    print("finally cleaned up " + str(arg))
  return foo

list = [1, 2, 3]
multiprocessing.Pool().map(fun, list)

睡眠可以是可能运行时间不一致的任何代码。

这里出现的情况是,第一个完成的并行处理成功地离开了try块,但随后尝试从该函数返回一个在任何地方都未定义的值(foo),这会导致异常。该异常会杀死映射,而不允许其他进程到达其finally块。

另外,如果您bar = bazz在try块中的sleep()调用之后添加该行。然后,到达该行的第一个进程将引发异常(因为未定义bazz),这将导致运行其自己的finally块,但随后杀死该映射,从而导致其他try块消失而未到达其finally块,并且第一个过程也不到达其return语句。

这对于Python多处理意味着什么,即使哪一个进程都可能有异常,您也不能相信异常处理机制来清理所有进程中的资源。在多处理映射调用之外需要其他信号处理或管理资源。

I found this one without using a generator function:

import multiprocessing
import time

def fun(arg):
  try:
    print("tried " + str(arg))
    time.sleep(arg)
  finally:
    print("finally cleaned up " + str(arg))
  return foo

list = [1, 2, 3]
multiprocessing.Pool().map(fun, list)

The sleep can be any code that might run for inconsistent amounts of time.

What appears to be happening here is that the first parallel process to finish leaves the try block successfully, but then attempts to return from the function a value (foo) that hasn’t been defined anywhere, which causes an exception. That exception kills the map without allowing the other processes to reach their finally blocks.

Also, if you add the line bar = bazz just after the sleep() call in the try block. Then the first process to reach that line throws an exception (because bazz isn’t defined), which causes its own finally block to be run, but then kills the map, causing the other try blocks to disappear without reaching their finally blocks, and the first process not to reach its return statement, either.

What this means for Python multiprocessing is that you can’t trust the exception-handling mechanism to clean up resources in all processes if even one of the processes can have an exception. Additional signal handling or managing the resources outside the multiprocessing map call would be necessary.


回答 5

接受的答案的附录,只是为了帮助了解它的工作原理,并提供了一些示例:

  • 这个:

     try:
         1
     except:
         print 'except'
     finally:
         print 'finally'

    将输出

    最后

  •    try:
           1/0
       except:
           print 'except'
       finally:
           print 'finally'

    将输出

    除了
    最后

Addendum to the accepted answer, just to help to see how it works, with a few examples:

  • This:

     try:
         1
     except:
         print 'except'
     finally:
         print 'finally'
    

    will output

    finally

  •    try:
           1/0
       except:
           print 'except'
       finally:
           print 'finally'
    

    will output

    except
    finally


如何获取在Python中捕获的异常的名称?

问题:如何获取在Python中捕获的异常的名称?

如何获得在Python中引发的异常的名称?

例如,

try:
    foo = bar
except Exception as exception:
    name_of_exception = ???
    assert name_of_exception == 'NameError'
    print "Failed with exception [%s]" % name_of_exception

例如,我捕获了多个(或所有)异常,并想在错误消息中打印异常的名称。

How can I get the name of an exception that was raised in Python?

e.g.,

try:
    foo = bar
except Exception as exception:
    name_of_exception = ???
    assert name_of_exception == 'NameError'
    print "Failed with exception [%s]" % name_of_exception

For example, I am catching multiple (or all) exceptions, and want to print the name of the exception in an error message.


回答 0

以下是获取异常类名称的几种不同方法:

  1. type(exception).__name__
  2. exception.__class__.__name__
  3. exception.__class__.__qualname__

例如,

try:
    foo = bar
except Exception as exception:
    assert type(exception).__name__ == 'NameError'
    assert exception.__class__.__name__ == 'NameError'
    assert exception.__class__.__qualname__ == 'NameError'

Here are a few different ways to get the name of the class of the exception:

  1. type(exception).__name__
  2. exception.__class__.__name__
  3. exception.__class__.__qualname__

e.g.,

try:
    foo = bar
except Exception as exception:
    assert type(exception).__name__ == 'NameError'
    assert exception.__class__.__name__ == 'NameError'
    assert exception.__class__.__qualname__ == 'NameError'

回答 1

这行得通,但似乎必须有一种更简单,更直接的方法?

try:
    foo = bar
except Exception as exception:
    assert repr(exception) == '''NameError("name 'bar' is not defined",)'''
    name = repr(exception).split('(')[0]
    assert name == 'NameError'

This works, but it seems like there must be an easier, more direct way?

try:
    foo = bar
except Exception as exception:
    assert repr(exception) == '''NameError("name 'bar' is not defined",)'''
    name = repr(exception).split('(')[0]
    assert name == 'NameError'

回答 2

您也可以使用sys.exc_info()exc_info()返回3个值:类型,值,回溯。关于文档:https : //docs.python.org/3/library/sys.html#sys.exc_info

import sys

try:
    foo = bar
except Exception:
    exc_type, value, traceback = sys.exc_info()
    assert exc_type.__name__ == 'NameError'
    print "Failed with exception [%s]" % exc_type.__name__

You can also use sys.exc_info(). exc_info() returns 3 values: type, value, traceback. On documentation: https://docs.python.org/3/library/sys.html#sys.exc_info

import sys

try:
    foo = bar
except Exception:
    exc_type, value, traceback = sys.exc_info()
    assert exc_type.__name__ == 'NameError'
    print "Failed with exception [%s]" % exc_type.__name__

回答 3

如果您想要完全限定的类名(例如,sqlalchemy.exc.IntegrityError而不是仅使用IntegrityError),则可以使用下面的函数,该函数是我从MB对另一个问题的出色回答(我只是重命名了一些变量以适合自己的口味)而来:

def get_full_class_name(obj):
    module = obj.__class__.__module__
    if module is None or module == str.__class__.__module__:
        return obj.__class__.__name__
    return module + '.' + obj.__class__.__name__

例:

try:
    # <do something with sqlalchemy that angers the database>
except sqlalchemy.exc.SQLAlchemyError as e:
    print(get_full_class_name(e))

# sqlalchemy.exc.IntegrityError

If you want the fully qualified class name (e.g. sqlalchemy.exc.IntegrityError instead of just IntegrityError), you can use the function below, which I took from MB’s awesome answer to another question (I just renamed some variables to suit my tastes):

def get_full_class_name(obj):
    module = obj.__class__.__module__
    if module is None or module == str.__class__.__module__:
        return obj.__class__.__name__
    return module + '.' + obj.__class__.__name__

Example:

try:
    # <do something with sqlalchemy that angers the database>
except sqlalchemy.exc.SQLAlchemyError as e:
    print(get_full_class_name(e))

# sqlalchemy.exc.IntegrityError

回答 4

此处的其他答案非常适合用于探索目的,但是如果主要目标是记录异常(包括异常的名称),则可以考虑使用logging.exception而不是print?

The other answers here are great for exploration purposes, but if the primary goal is to log the exception (including the name of the exception), perhaps consider using logging.exception instead of print?


从异常对象中提取回溯信息

问题:从异常对象中提取回溯信息

给定一个Exception对象(来源不明),有没有办法获取其回溯?我有这样的代码:

def stuff():
   try:
       .....
       return useful
   except Exception as e:
       return e

result = stuff()
if isinstance(result, Exception):
    result.traceback <-- How?

获得异常后,如何从Exception对象中提取回溯?

Given an Exception object (of unknown origin) is there way to obtain its traceback? I have code like this:

def stuff():
   try:
       .....
       return useful
   except Exception as e:
       return e

result = stuff()
if isinstance(result, Exception):
    result.traceback <-- How?

How can I extract the traceback from the Exception object once I have it?


回答 0

这个问题的答案取决于您使用的Python版本。

在Python 3中

很简单:异常附带了一个__traceback__包含回溯的属性。此属性也是可写的,并且可以使用with_traceback异常方法方便地设置:

raise Exception("foo occurred").with_traceback(tracebackobj)

这些功能在raise文档中作了最少描述。

这部分答案应归功于Vyctor,后者首先发布了此信息。我之所以将其包含在此处,仅是因为此答案停留在顶部,并且Python 3变得越来越普遍。

在Python 2中

这很烦人。回溯的麻烦在于它们具有对堆栈框架的引用,而堆栈框架具有对回溯的引用,这些回溯具有对引用了…的堆栈框架的引用。这给垃圾收集器带来了问题。(感谢ecatmur首先指出这一点。)

解决此问题的一种好方法是在离开该子句后以手术方式中断循环except,这就是Python 3所做的。Python 2解决方案更加丑陋:为您提供了一个即席函数sys.exc_info(),该函数仅在 except 子句中有效。它返回一个包含异常,异常类型和当前正在处理的异常的回溯的元组。

因此,如果您在except子句中,则可以将的输出sys.exc_info()traceback模块一起使用来做各种有用的事情:

>>> import sys, traceback
>>> def raise_exception():
...     try:
...         raise Exception
...     except Exception:
...         ex_type, ex, tb = sys.exc_info()
...         traceback.print_tb(tb)
...     finally:
...         del tb
... 
>>> raise_exception()
  File "<stdin>", line 3, in raise_exception

但是,随着您的编辑表示,你正在试图获得该回溯,如果你的异常没有被处理的已打印,它之后已经被处理。这个问题要难得多。不幸的是,在不处理任何异常时sys.exc_info返回(None, None, None)。其他相关sys属性也无济于事。sys.exc_traceback不处理任何异常时不推荐使用且未定义;sys.last_traceback似乎很完美,但似乎仅在交互式会话中定义。

如果可以控制如何引发异常,则可以使用inspect自定义异常来存储某些信息。但是我不完全确定那将如何工作。

实话实说,捕获并返回异常是一件不寻常的事情。这可能表明您仍然需要进行重构。

The answer to this question depends on the version of Python you’re using.

In Python 3

It’s simple: exceptions come equipped with a __traceback__ attribute that contains the traceback. This attribute is also writable, and can be conveniently set using the with_traceback method of exceptions:

raise Exception("foo occurred").with_traceback(tracebackobj)

These features are minimally described as part of the raise documentation.

All credit for this part of the answer should go to Vyctor, who first posted this information. I’m including it here only because this answer is stuck at the top, and Python 3 is becoming more common.

In Python 2

It’s annoyingly complex. The trouble with tracebacks is that they have references to stack frames, and stack frames have references to the tracebacks that have references to stack frames that have references to… you get the idea. This causes problems for the garbage collector. (Thanks to ecatmur for first pointing this out.)

The nice way of solving this would be to surgically break the cycle after leaving the except clause, which is what Python 3 does. The Python 2 solution is much uglier: you are provided with an ad-hoc function,sys.exc_info(), which only works inside the except clause. It returns a tuple containing the exception, the exception type, and the traceback for whatever exception is currently being handled.

So if you are inside the except clause, you can use the output of sys.exc_info() along with the traceback module to do various useful things:

>>> import sys, traceback
>>> def raise_exception():
...     try:
...         raise Exception
...     except Exception:
...         ex_type, ex, tb = sys.exc_info()
...         traceback.print_tb(tb)
...     finally:
...         del tb
... 
>>> raise_exception()
  File "<stdin>", line 3, in raise_exception

But as your edit indicates, you’re trying to get the traceback that would have been printed if your exception had not been handled, after it has already been handled. That’s a much harder question. Unfortunately, sys.exc_info returns (None, None, None) when no exception is being handled. Other related sys attributes don’t help either. sys.exc_traceback is deprecated and undefined when no exception is being handled; sys.last_traceback seems perfect, but it appears only to be defined during interactive sessions.

If you can control how the exception is raised, you might be able to use inspect and a custom exception to store some of the information. But I’m not entirely sure how that would work.

To tell the truth, catching and returning an exception is kind of an unusual thing to do. This might be a sign that you need to refactor anyway.


回答 1

Python 3.0 [PEP 3109]开始,内置类Exception具有__traceback__包含的属性traceback object(对于Python 3.2.3):

>>> try:
...     raise Exception()
... except Exception as e:
...     tb = e.__traceback__
...
>>> tb
<traceback object at 0x00000000022A9208>

问题是,在谷歌搜索__traceback__一段时间后,我发现只有几篇文章,但是没有一篇描述您是否或为什么应该使用__traceback__

但是,针对的Python 3文档raise指出:

通常会在引发异常并将其附加__traceback__为可写属性的情况下自动创建回溯对象。

因此,我认为它应该被使用。

Since Python 3.0[PEP 3109] the built in class Exception has a __traceback__ attribute which contains a traceback object (with Python 3.2.3):

>>> try:
...     raise Exception()
... except Exception as e:
...     tb = e.__traceback__
...
>>> tb
<traceback object at 0x00000000022A9208>

The problem is that after Googling __traceback__ for a while I found only few articles but none of them describes whether or why you should (not) use __traceback__.

However, the Python 3 documentation for raise says that:

A traceback object is normally created automatically when an exception is raised and attached to it as the __traceback__ attribute, which is writable.

So I assume it’s meant to be used.


回答 2

一种从Python 3中的异常对象以字符串形式获取回溯的方法:

import traceback

# `e` is an exception object that you get from somewhere
traceback_str = ''.join(traceback.format_tb(e.__traceback__))

traceback.format_tb(...)返回字符串列表。''.join(...)将他们连接在一起。有关更多参考,请访问:https : //docs.python.org/3/library/traceback.html#traceback.format_tb

A way to get traceback as a string from an exception object in Python 3:

import traceback

# `e` is an exception object that you get from somewhere
traceback_str = ''.join(traceback.format_tb(e.__traceback__))

traceback.format_tb(...) returns a list of strings. ''.join(...) joins them together. For more reference, please visit: https://docs.python.org/3/library/traceback.html#traceback.format_tb


回答 3

顺便说一句,如果您希望像在终端上看到的那样真正获得完整的追溯,则需要这样做:

>>> try:
...     print(1/0)
... except Exception as e:
...     exc = e
...
>>> exc
ZeroDivisionError('division by zero')
>>> tb_str = traceback.format_exception(etype=type(exc), value=exc, tb=exc.__traceback__)
>>> tb_str
['Traceback (most recent call last):\n', '  File "<stdin>", line 2, in <module>\n', 'ZeroDivisionError: division by zero\n']
>>> print("".join(tb_str))
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
ZeroDivisionError: division by zero

如果您使用format_tb上述答案,则建议您获得的信息较少:

>>> tb_str = "".join(traceback.format_tb(exc.__traceback__))
>>> print("".join(tb_str))
  File "<stdin>", line 2, in <module>

As an aside, if you want to actually get the full traceback as you would see it printed to your terminal, you want this:

>>> try:
...     print(1/0)
... except Exception as e:
...     exc = e
...
>>> exc
ZeroDivisionError('division by zero')
>>> tb_str = traceback.format_exception(etype=type(exc), value=exc, tb=exc.__traceback__)
>>> tb_str
['Traceback (most recent call last):\n', '  File "<stdin>", line 2, in <module>\n', 'ZeroDivisionError: division by zero\n']
>>> print("".join(tb_str))
Traceback (most recent call last):
  File "<stdin>", line 2, in <module>
ZeroDivisionError: division by zero

If you use format_tb as above answers suggest you’ll get less information:

>>> tb_str = "".join(traceback.format_tb(exc.__traceback__))
>>> print("".join(tb_str))
  File "<stdin>", line 2, in <module>

回答 4

有一个很好的理由是,回溯不存储在异常中。因为回溯保留对堆栈本地的引用,所以这将导致循环引用和(临时)内存泄漏,直到循环GC启动。(这就是为什么永远不要将回溯存储在局部变量中的原因。)

关于我,我唯一想到的就是您可以进行修补 stuff的全局变量,以便当它认为正在捕获时Exception,实际上是在捕获特殊类型,并且异常作为调用者传播给您:

module_containing_stuff.Exception = type("BogusException", (Exception,), {})
try:
    stuff()
except Exception:
    import sys
    print sys.exc_info()

There’s a very good reason the traceback is not stored in the exception; because the traceback holds references to its stack’s locals, this would result in a circular reference and (temporary) memory leak until the circular GC kicks in. (This is why you should never store the traceback in a local variable.)

About the only thing I can think of would be for you to monkeypatch stuff‘s globals so that when it thinks it’s catching Exception it’s actually catching a specialised type and the exception propagates to you as the caller:

module_containing_stuff.Exception = type("BogusException", (Exception,), {})
try:
    stuff()
except Exception:
    import sys
    print sys.exc_info()

我想exceptions处理“列表索引超出范围”。

问题:我想exceptions处理“列表索引超出范围”。

我正在使用BeautifulSoup并解析一些HTML。

我从每个HTML (使用for循环)中获取特定数据,并将该数据添加到特定列表中。

问题是,某些HTML具有不同的格式(它们中没有我想要的数据)

因此,我尝试使用异常处理并将值添加null到列表中(我应该这样做,因为数据顺序很重要。)

例如,我有一个类似的代码:

soup = BeautifulSoup(links)
dlist = soup.findAll('dd', 'title')
# I'm trying to find content between <dd class='title'> and </dd>
gotdata = dlist[1]
# and what i want is the 2nd content of those
newlist.append(gotdata)
# and I add that to a newlist

并且某些链接没有任何链接<dd class='title'>,所以我想要做的是将字符串添加null到列表中。

错误出现:

list index out of range.

我尝试做的是添加一些像这样的行:

if not dlist[1]:  
   newlist.append('null')
   continue

但这行不通。它仍然显示错误:

list index out of range.

我该怎么办?我应该使用异常处理吗?还是有更简单的方法?

有什么建议?任何帮助都将非常棒!

I am using BeautifulSoup and parsing some HTMLs.

I’m getting a certain data from each HTML (using for loop) and adding that data to a certain list.

The problem is, some of the HTMLs have different format (and they don’t have the data that I want in them).

So, I was trying to use exception handling and add value null to the list (I should do this since the sequence of data is important.)

For instance, I have a code like:

soup = BeautifulSoup(links)
dlist = soup.findAll('dd', 'title')
# I'm trying to find content between <dd class='title'> and </dd>
gotdata = dlist[1]
# and what i want is the 2nd content of those
newlist.append(gotdata)
# and I add that to a newlist

and some of the links don’t have any <dd class='title'>, so what I want to do is add string null to the list instead.

The error appears:

list index out of range.

What I have done tried is to add some lines like this:

if not dlist[1]:  
   newlist.append('null')
   continue

But it doesn’t work out. It still shows error:

list index out of range.

What should I do about this? Should I use exception handling? or is there any easier way?

Any suggestions? Any help would be really great!


回答 0

处理异常的方法是:

try:
    gotdata = dlist[1]
except IndexError:
    gotdata = 'null'

当然,你也可以检查len()dlist; 但是处理异常更为直观。

Handling the exception is the way to go:

try:
    gotdata = dlist[1]
except IndexError:
    gotdata = 'null'

Of course you could also check the len() of dlist; but handling the exception is more intuitive.


回答 1

您有两个选择;处理异常或测试长度:

if len(dlist) > 1:
    newlist.append(dlist[1])
    continue

要么

try:
    newlist.append(dlist[1])
except IndexError:
    pass
continue

如果经常没有第二项,则使用第一项;如果有时没有第二项,则使用第二项。

You have two options; either handle the exception or test the length:

if len(dlist) > 1:
    newlist.append(dlist[1])
    continue

or

try:
    newlist.append(dlist[1])
except IndexError:
    pass
continue

Use the first if there often is no second item, the second if there sometimes is no second item.


回答 2

三元就足够了。更改:

gotdata = dlist[1]

gotdata = dlist[1] if len(dlist) > 1 else 'null'

这是一种较短的表达方式

if len(dlist) > 1:
    gotdata = dlist[1]
else: 
    gotdata = 'null'

A ternary will suffice. change:

gotdata = dlist[1]

to

gotdata = dlist[1] if len(dlist) > 1 else 'null'

this is a shorter way of expressing

if len(dlist) > 1:
    gotdata = dlist[1]
else: 
    gotdata = 'null'

回答 3

引用ThiefMaster♦有时,我们会得到一个错误,其值指定为’\ n’或null并执行处理ValueError所需的错误:

处理异常是解决之道

try:
    gotdata = dlist[1]
except (IndexError, ValueError):
    gotdata = 'null'

Taking reference of ThiefMaster♦ sometimes we get an error with value given as ‘\n’ or null and perform for that required to handle ValueError:

Handling the exception is the way to go

try:
    gotdata = dlist[1]
except (IndexError, ValueError):
    gotdata = 'null'

回答 4

for i in range (1, len(list))
    try:
        print (list[i])

    except ValueError:
        print("Error Value.")
    except indexError:
        print("Erorr index")
    except :
        print('error ')
for i in range (1, len(list))
    try:
        print (list[i])

    except ValueError:
        print("Error Value.")
    except indexError:
        print("Erorr index")
    except :
        print('error ')

回答 5

对于任何对较短方式感兴趣的人:

gotdata = len(dlist)>1 and dlist[1] or 'null'

但是为了获得最佳性能,我建议使用False而不是’null’,那么单行测试就足够了:

gotdata = len(dlist)>1 and dlist[1]

For anyone interested in a shorter way:

gotdata = len(dlist)>1 and dlist[1] or 'null'

But for best performance, I suggest using False instead of 'null', then a one line test will suffice:

gotdata = len(dlist)>1 and dlist[1]

如何在Python中正确获取异常消息

问题:如何在Python中正确获取异常消息

从Python标准库的组件中获取异常消息的最佳方法是什么?

我注意到在某些情况下,您可以通过如下message字段获取它:

try:
  pass
except Exception as ex:
  print(ex.message)

但在某些情况下(例如在套接字错误的情况下),您必须执行以下操作:

try:
  pass
except socket.error as ex:
  print(ex)

我想知道是否有标准方法可以涵盖大多数情况?

What is the best way to get exceptions’ messages from components of standard library in Python?

I noticed that in some cases you can get it via message field like this:

try:
  pass
except Exception as ex:
  print(ex.message)

but in some cases (for example, in case of socket errors) you have to do something like this:

try:
  pass
except socket.error as ex:
  print(ex)

I wondered is there any standard way to cover most of these situations?


回答 0

如果您查看有关内置错误文档,则会看到大多数Exception类将其第一个参数分配为message属性。并非所有人都这样做。

值得注意的是,EnvironmentError(与子类IOErrorOSError)具有的第一自变量errno,第二的strerror。没有messagestrerror大致类似于通常的message

更一般而言,的子类Exception可以执行他们想要的任何操作。它们可能具有也可能没有message属性。将来的内置Exception可能没有message属性。Exception从第三方库或用户代码导入的任何子类都可能没有message属性。

我认为处理此问题的正确方法是,确定Exception要捕获的特定子类,然后仅捕获那些子类,而不是所有带有的except Exception子类,然后利用特定子类定义的任何属性(如果需要)。

如果您必须执行print某些操作,则我认为打印捕获到的内容Exception本身很可能会执行您想要的操作,无论它是否具有message属性。

您也可以像这样检查消息属性,但是我不会真正建议它,因为它看起来很混乱:

try:
    pass
except Exception as e:
    # Just print(e) is cleaner and more likely what you want,
    # but if you insist on printing message specifically whenever possible...
    if hasattr(e, 'message'):
        print(e.message)
    else:
        print(e)

If you look at the documentation for the built-in errors, you’ll see that most Exception classes assign their first argument as a message attribute. Not all of them do though.

Notably,EnvironmentError (with subclasses IOError and OSError) has a first argument of errno, second of strerror. There is no messagestrerror is roughly analogous to what would normally be a message.

More generally, subclasses of Exception can do whatever they want. They may or may not have a message attribute. Future built-in Exceptions may not have a message attribute. Any Exception subclass imported from third-party libraries or user code may not have a message attribute.

I think the proper way of handling this is to identify the specific Exception subclasses you want to catch, and then catch only those instead of everything with an except Exception, then utilize whatever attributes that specific subclass defines however you want.

If you must print something, I think that printing the caught Exception itself is most likely to do what you want, whether it has a message attribute or not.

You could also check for the message attribute if you wanted, like this, but I wouldn’t really suggest it as it just seems messy:

try:
    pass
except Exception as e:
    # Just print(e) is cleaner and more likely what you want,
    # but if you insist on printing message specifically whenever possible...
    if hasattr(e, 'message'):
        print(e.message)
    else:
        print(e)

回答 1

为了改善@artofwarfare提供的答案,这是我考虑的一种更整洁的方法来检查message属性并打印它或将Exception对象打印为后备。

try:
    pass 
except Exception as e:
    print getattr(e, 'message', repr(e))

调用repr是可选的,但我发现在某些用例中有必要。


更新#1:

@MadPhysicist发表评论之后,这证明了为什么repr可能需要调用。尝试在解释器中运行以下代码:

try:
    raise Exception 
except Exception as e:
    print(getattr(e, 'message', repr(e)))
    print(getattr(e, 'message', str(e)))

更新#2:

这是一个具有Python 2.7和3.5细节的演示:https : //gist.github.com/takwas/3b7a6edddef783f2abddffda1439f533

To improve on the answer provided by @artofwarfare, here is what I consider a neater way to check for the message attribute and print it or print the Exception object as a fallback.

try:
    pass 
except Exception as e:
    print getattr(e, 'message', repr(e))

The call to repr is optional, but I find it necessary in some use cases.


Update #1:

Following the comment by @MadPhysicist, here’s a proof of why the call to repr might be necessary. Try running the following code in your interpreter:

try:
    raise Exception 
except Exception as e:
    print(getattr(e, 'message', repr(e)))
    print(getattr(e, 'message', str(e)))

Update #2:

Here is a demo with specifics for Python 2.7 and 3.5: https://gist.github.com/takwas/3b7a6edddef783f2abddffda1439f533


回答 2

我有同样的问题。我认为最好的解决方案是使用log.exception,它将自动打印出堆栈跟踪和错误消息,例如:

try:
    pass
    log.info('Success')
except:
    log.exception('Failed')

I had the same problem. I think the best solution is to use log.exception, which will automatically print out stack trace and error message, such as:

try:
    pass
    log.info('Success')
except:
    log.exception('Failed')

回答 3

我也有同样的问题。对此进行深入研究,我发现Exception类具有一个args属性,该属性捕获了用于创建异常的参数。如果将except的异常范围缩小到一个子集,则应该能够确定它们的构造方式,从而确定哪个参数包含消息。

try:
   # do something that may raise an AuthException
except AuthException as ex:
   if ex.args[0] == "Authentication Timeout.":
      # handle timeout
   else:
      # generic handling

I too had the same problem. Digging into this I found that the Exception class has an args attribute, which captures the arguments that were used to create the exception. If you narrow the exceptions that except will catch to a subset, you should be able to determine how they were constructed, and thus which argument contains the message.

try:
   # do something that may raise an AuthException
except AuthException as ex:
   if ex.args[0] == "Authentication Timeout.":
      # handle timeout
   else:
      # generic handling

在Python中,如何将警告视为异常?

问题:在Python中,如何将警告视为异常?

我在python代码中使用的第三方库(用C编写)正在发出警告。我希望能够使用try except语法正确处理这些警告。有没有办法做到这一点?

A third-party library (written in C) that I use in my python code is issuing warnings. I want to be able to use the try except syntax to properly handle these warnings. Is there a way to do this?


回答 0

引用python手册(27.6.4。Testing Warnings):

import warnings

def fxn():
    warnings.warn("deprecated", DeprecationWarning)

with warnings.catch_warnings(record=True) as w:
    # Cause all warnings to always be triggered.
    warnings.simplefilter("always")
    # Trigger a warning.
    fxn()
    # Verify some things
    assert len(w) == 1
    assert issubclass(w[-1].category, DeprecationWarning)
    assert "deprecated" in str(w[-1].message)

To quote from the python handbook (27.6.4. Testing Warnings):

import warnings

def fxn():
    warnings.warn("deprecated", DeprecationWarning)

with warnings.catch_warnings(record=True) as w:
    # Cause all warnings to always be triggered.
    warnings.simplefilter("always")
    # Trigger a warning.
    fxn()
    # Verify some things
    assert len(w) == 1
    assert issubclass(w[-1].category, DeprecationWarning)
    assert "deprecated" in str(w[-1].message)

回答 1

要将警告作为错误处理,只需使用以下命令:

import warnings
warnings.filterwarnings("error")

之后,您将能够捕获与错误相同的警告,例如,它将起作用:

try:
    some_heavy_calculations()
except RuntimeWarning:
    import ipdb; ipdb.set_trace()

PS添加了此答案,因为注释中的最佳答案包含拼写错误:filterwarnigns而不是filterwarnings

To handle warnings as errors simply use this:

import warnings
warnings.filterwarnings("error")

After this you will be able to catch warnings same as errors, e.g. this will work:

try:
    some_heavy_calculations()
except RuntimeWarning:
    import ipdb; ipdb.set_trace()

P.S. Added this answer because the best answer in comments contains misspelling: filterwarnigns instead of filterwarnings.


回答 2

如果只希望脚本在警告上失败,则可以使用:

python -W error foobar.py

If you just want you script to fail on warnings you can invoke python with the -W argument:

python -W error foobar.py

回答 3

这是一个变体,可让您更清楚地了解如何仅使用自定义警告。

import warnings
with warnings.catch_warnings(record=True) as w:
    # Cause all warnings to always be triggered.
    warnings.simplefilter("always")

    # Call some code that triggers a custom warning.
    functionThatRaisesWarning()

    # ignore any non-custom warnings that may be in the list
    w = filter(lambda i: issubclass(i.category, UserWarning), w)

    if len(w):
        # do something with the first warning
        email_admins(w[0].message)

Here’s a variation that makes it clearer how to work with only your custom warnings.

import warnings
with warnings.catch_warnings(record=True) as w:
    # Cause all warnings to always be triggered.
    warnings.simplefilter("always")

    # Call some code that triggers a custom warning.
    functionThatRaisesWarning()

    # ignore any non-custom warnings that may be in the list
    w = filter(lambda i: issubclass(i.category, UserWarning), w)

    if len(w):
        # do something with the first warning
        email_admins(w[0].message)

回答 4

在某些情况下,您需要使用ctypes将警告变成错误。例如:

str(b'test')  # no error
import warnings
warnings.simplefilter('error', BytesWarning)
str(b'test')  # still no error
import ctypes
ctypes.c_int.in_dll(ctypes.pythonapi, 'Py_BytesWarningFlag').value = 2
str(b'test')  # this raises an error

In some cases, you need use ctypes to turn warnings into errors. For example:

str(b'test')  # no error
import warnings
warnings.simplefilter('error', BytesWarning)
str(b'test')  # still no error
import ctypes
ctypes.c_int.in_dll(ctypes.pythonapi, 'Py_BytesWarningFlag').value = 2
str(b'test')  # this raises an error

用其他类型和消息重新引发异常,保留现有信息

问题:用其他类型和消息重新引发异常,保留现有信息

我正在编写一个模块,并希望它可以引发的异常具有统一的异常层次结构(例如,从FooError抽象类继承所有foo模块的特定异常)。这使模块的用户可以捕获那些特定的异常,并在需要时进行区别处理。但是从模块引发的许多异常是由于其他一些异常而引发的;例如,由于文件上的OSError而导致某些任务失败。

我需要的是“包装”捕获的异常,使其具有不同的类型和消息,以便通过捕获异常的方式在传播层次结构中进一步获取信息。但是我不想丢失现有的类型,消息和堆栈跟踪;这对于尝试调试问题的人来说都是有用的信息。顶级异常处理程序是不好的,因为我正在尝试在异常传播到传播堆栈之前对其进行装饰,并且顶级处理程序为时已晚。

这可以通过foo从现有类型(例如class FooPermissionError(OSError, FooError))中派生模块的特定异常类型来部分解决,但这并没有使将现有异常实例包装为新类型或修改消息变得更加容易。

Python的PEP 3134 “异常链接和嵌入式回溯”讨论了Python 3.0中接受的“链接”异常对象更改,以指示在处理现有异常期间引发了新异常。

我想做的是相关的:我需要它在早期的Python版本中也能工作,我不需要链,而只需要多态。什么是正确的方法?

I’m writing a module and want to have a unified exception hierarchy for the exceptions that it can raise (e.g. inheriting from a FooError abstract class for all the foo module’s specific exceptions). This allows users of the module to catch those particular exceptions and handle them distinctly, if needed. But many of the exceptions raised from the module are raised because of some other exception; e.g. failing at some task because of an OSError on a file.

What I need is to “wrap” the exception caught such that it has a different type and message, so that information is available further up the propagation hierarchy by whatever catches the exception. But I don’t want to lose the existing type, message, and stack trace; that’s all useful information for someone trying to debug the problem. A top-level exception handler is no good, since I’m trying to decorate the exception before it makes its way further up the propagation stack, and the top-level handler is too late.

This is partly solved by deriving my module foo‘s specific exception types from the existing type (e.g. class FooPermissionError(OSError, FooError)), but that doesn’t make it any easier to wrap the existing exception instance in a new type, nor modify the message.

Python’s PEP 3134 “Exception Chaining and Embedded Tracebacks” discusses a change accepted in Python 3.0 for “chaining” exception objects, to indicate that a new exception was raised during the handling of an existing exception.

What I’m trying to do is related: I need it also working in earlier Python versions, and I need it not for chaining, but only for polymorphism. What is the right way to do this?


回答 0

Python 3引入了异常链接(如PEP 3134中所述)。这允许在引发异常时引用现有异常作为“原因”:

try:
    frobnicate()
except KeyError as exc:
    raise ValueError("Bad grape") from exc

因此,捕获的异常成为新异常的一部分(是“原因”),并且任何捕获新异常的代码均可使用。

通过使用此功能,__cause__可以设置属性。内置的异常处理程序还知道如何报告异常的“原因”和“上下文”以及回溯。


Python 2中,该用例似乎没有很好的答案(如Ian BickingNed Batchelder所述)。笨蛋

Python 3 introduced exception chaining (as described in PEP 3134). This allows, when raising an exception, to cite an existing exception as the “cause”:

try:
    frobnicate()
except KeyError as exc:
    raise ValueError("Bad grape") from exc

The caught exception (exc, a KeyError) thereby becomes part of (is the “cause of”) the new exception, a ValueError. The “cause” is available to whatever code catches the new exception.

By using this feature, the __cause__ attribute is set. The built-in exception handler also knows how to report the exception’s “cause” and “context” along with the traceback.


In Python 2, it appears this use case has no good answer (as described by Ian Bicking and Ned Batchelder). Bummer.


回答 1

您可以使用sys.exc_info()获取回溯,并使用回溯引发新的异常(如PEP所述)。如果要保留旧的类型和消息,则可以对异常进行保留,但这仅在捕获异常的任何东西都在寻找它时才有用。

例如

import sys

def failure():
    try: 1/0
    except ZeroDivisionError, e:
        type, value, traceback = sys.exc_info()
        raise ValueError, ("You did something wrong!", type, value), traceback

当然,这确实没有那么有用。如果是这样,我们将不需要该PEP。我不建议这样做。

You can use sys.exc_info() to get the traceback, and raise your new exception with said traceback (as the PEP mentions). If you want to preserve the old type and message, you can do so on the exception, but that’s only useful if whatever catches your exception looks for it.

For example

import sys

def failure():
    try: 1/0
    except ZeroDivisionError, e:
        type, value, traceback = sys.exc_info()
        raise ValueError, ("You did something wrong!", type, value), traceback

Of course, this is really not that useful. If it was, we wouldn’t need that PEP. I’d not recommend doing it.


回答 2

您可以创建自己的异常类型,扩展您捕获的任何异常

class NewException(CaughtException):
    def __init__(self, caught):
        self.caught = caught

try:
    ...
except CaughtException as e:
    ...
    raise NewException(e)

但是,大多数情况下,我认为捕获异常,处理raise异常以及原始异常(并保留回溯)或会更简单raise NewException()。如果我正在调用您的代码,并且收到了您的自定义异常之一,那么我希望您的代码已经处理了您必须捕获的任何异常。因此,我不需要自己访问它。

编辑:我发现这种分析方法可以引发您自己的异常并保留原始异常。没有漂亮的解决方案。

You could create your own exception type that extends whichever exception you’ve caught.

class NewException(CaughtException):
    def __init__(self, caught):
        self.caught = caught

try:
    ...
except CaughtException as e:
    ...
    raise NewException(e)

But most of the time, I think it would be simpler to catch the exception, handle it, and either raise the original exception (and preserve the traceback) or raise NewException(). If I were calling your code, and I received one of your custom exceptions, I’d expect that your code has already handled whatever exception you had to catch. Thus I don’t need to access it myself.

Edit: I found this analysis of ways to throw your own exception and keep the original exception. No pretty solutions.


回答 3

我还发现,很多时候我需要对出现的错误进行“包装”。

这既包含在函数范围内,有时也仅在函数内包含一些行。

创建了要用于decorator和的包装器context manager


实作

import inspect
from contextlib import contextmanager, ContextDecorator
import functools    

class wrap_exceptions(ContextDecorator):
    def __init__(self, wrapper_exc, *wrapped_exc):
        self.wrapper_exc = wrapper_exc
        self.wrapped_exc = wrapped_exc

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not exc_type:
            return
        try:
            raise exc_val
        except self.wrapped_exc:
            raise self.wrapper_exc from exc_val

    def __gen_wrapper(self, f, *args, **kwargs):
        with self:
            for res in f(*args, **kwargs):
                yield res

    def __call__(self, f):
        @functools.wraps(f)
        def wrapper(*args, **kw):
            with self:
                if inspect.isgeneratorfunction(f):
                    return self.__gen_wrapper(f, *args, **kw)
                else:
                    return f(*args, **kw)
        return wrapper

用法示例

装饰工

@wrap_exceptions(MyError, IndexError)
def do():
   pass

调用do方法时,不必担心IndexErrorMyError

try:
   do()
except MyError as my_err:
   pass # handle error 

上下文管理器

def do2():
   print('do2')
   with wrap_exceptions(MyError, IndexError):
       do()

里面do2,在中context manager,如果IndexError被抬起,它将被包裹并抬起MyError

I also found that many times i need some “wrapping” to errors raised.

This included both in a function scope and sometimes wrap only some lines inside a function.

Created a wrapper to be used a decorator and context manager:


Implementation

import inspect
from contextlib import contextmanager, ContextDecorator
import functools    

class wrap_exceptions(ContextDecorator):
    def __init__(self, wrapper_exc, *wrapped_exc):
        self.wrapper_exc = wrapper_exc
        self.wrapped_exc = wrapped_exc

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not exc_type:
            return
        try:
            raise exc_val
        except self.wrapped_exc:
            raise self.wrapper_exc from exc_val

    def __gen_wrapper(self, f, *args, **kwargs):
        with self:
            for res in f(*args, **kwargs):
                yield res

    def __call__(self, f):
        @functools.wraps(f)
        def wrapper(*args, **kw):
            with self:
                if inspect.isgeneratorfunction(f):
                    return self.__gen_wrapper(f, *args, **kw)
                else:
                    return f(*args, **kw)
        return wrapper

Usage examples

decorator

@wrap_exceptions(MyError, IndexError)
def do():
   pass

when calling do method, don’t worry about IndexError, just MyError

try:
   do()
except MyError as my_err:
   pass # handle error 

context manager

def do2():
   print('do2')
   with wrap_exceptions(MyError, IndexError):
       do()

inside do2, in the context manager, if IndexError is raised, it will be wrapped and raised MyError


回答 4

满足您需求的最直接的解决方案应该是:

try:
     upload(file_id)
except Exception as upload_error:
     error_msg = "Your upload failed! File: " + file_id
     raise RuntimeError(error_msg, upload_error)

这样,您以后就可以打印消息以及上载功能引发的特定错误

The most straighforward solution to your needs should be this:

try:
     upload(file_id)
except Exception as upload_error:
     error_msg = "Your upload failed! File: " + file_id
     raise RuntimeError(error_msg, upload_error)

In this way you can later print your message and the specific error throwed by the upload function


更好地“尝试”某些东西并捕获异常或测试是否有可能首先避免异常?

问题:更好地“尝试”某些东西并捕获异常或测试是否有可能首先避免异常?

我应该测试if某种东西是有效的还是只是try为了做它并捕获异常?

  • 有没有可靠的文档说首选方法?
  • 还有一种方法更pythonic吗?

例如,我应该:

if len(my_list) >= 4:
    x = my_list[3]
else:
    x = 'NO_ABC'

要么:

try:
    x = my_list[3]
except IndexError:
    x = 'NO_ABC'

一些想法…
PEP 20说:

错误绝不能默默传递。
除非明确地保持沉默。

应该使用a try而不是an if解释为无声传递的错误吗?如果是这样,您是否通过以这种方式使用它来明确使其静音,从而使其正常运行?


不是指只能以一种方式做事的情况;例如:

try:
    import foo
except ImportError:
    import baz

Should I test if something is valid or just try to do it and catch the exception?

  • Is there any solid documentation saying that one way is preferred?
  • Is one way more pythonic?

For example, should I:

if len(my_list) >= 4:
    x = my_list[3]
else:
    x = 'NO_ABC'

Or:

try:
    x = my_list[3]
except IndexError:
    x = 'NO_ABC'

Some thoughts…
PEP 20 says:

Errors should never pass silently.
Unless explicitly silenced.

Should using a try instead of an if be interpreted as an error passing silently? And if so, are you explicitly silencing it by using it in this way, therefore making it OK?


I’m not referring to situations where you can only do things 1 way; for example:

try:
    import foo
except ImportError:
    import baz

回答 0

你应该更喜欢try/exceptif/else如果结果

  • 加快速度(例如,通过防止额外的查询)
  • 更清晰的代码(行数更少/更易于阅读)

通常,它们并存。


加速

如果尝试通过以下方式在长列表中查找元素:

try:
    x = my_list[index]
except IndexError:
    x = 'NO_ABC'

index可能在列表中并且通常不引发IndexError 时,尝试除外是最好的选择。这样,您就可以避免进行额外的查询if index < len(my_list)

Python鼓励使用异常,可以使用Dive Into Python中的短语来处理异常。您的示例不仅(优美地)处理异常,而不是让其静默通过,而且仅在未找到索引的特殊情况下才发生异常(因此,单词异常!)。


清洁代码

Python的官方文档中提到了EAFP比获得许可更容易获得宽恕Rob Knight指出捕获错误而不是避免错误可以使代码简洁,更易于阅读。他的示例如下所示:

更差(LBYL“跳前先看”)

#check whether int conversion will raise an error
if not isinstance(s, str) or not s.isdigit():
    return None
elif len(s) > 10:    #too many digits for int conversion
    return None
else:
    return int(s)

更好(EAFP:寻求宽恕比获得许可更容易)

try:
    return int(s)
except (TypeError, ValueError, OverflowError): #int conversion failed
    return None

You should prefer try/except over if/else if that results in

  • speed-ups (for example by preventing extra lookups)
  • cleaner code (fewer lines/easier to read)

Often, these go hand-in-hand.


speed-ups

In the case of trying to find an element in a long list by:

try:
    x = my_list[index]
except IndexError:
    x = 'NO_ABC'

the try, except is the best option when the index is probably in the list and the IndexError is usually not raised. This way you avoid the need for an extra lookup by if index < len(my_list).

Python encourages the use of exceptions, which you handle is a phrase from Dive Into Python. Your example not only handles the exception (gracefully), rather than letting it silently pass, also the exception occurs only in the exceptional case of index not being found (hence the word exception!).


cleaner code

The official Python Documentation mentions EAFP: Easier to ask for forgiveness than permission and Rob Knight notes that catching errors rather than avoiding them, can result in cleaner, easier to read code. His example says it like this:

Worse (LBYL ‘look before you leap’):

#check whether int conversion will raise an error
if not isinstance(s, str) or not s.isdigit():
    return None
elif len(s) > 10:    #too many digits for int conversion
    return None
else:
    return int(s)

Better (EAFP: Easier to ask for forgiveness than permission):

try:
    return int(s)
except (TypeError, ValueError, OverflowError): #int conversion failed
    return None

回答 1

在这种情况下,您应该完全使用其他方法:

x = myDict.get("ABC", "NO_ABC")

不过,通常来说:如果您希望测试经常失败,请使用if。如果测试相对于尝试操作并失败则捕获异常而言代价高昂,请使用try。如果以上条件均不适用,则更容易阅读。

In this particular case, you should use something else entirely:

x = myDict.get("ABC", "NO_ABC")

In general, though: If you expect the test to fail frequently, use if. If the test is expensive relative to just trying the operation and catching the exception if it fails, use try. If neither one of these conditions applies, go with whatever reads easier.


回答 2

使用tryexcept直接,而不是内侧if后卫应该始终是否有竞争条件的可能性来完成。例如,如果要确保目录存在,请不要执行以下操作:

import os, sys
if not os.path.isdir('foo'):
  try:
    os.mkdir('foo')
  except OSError, e
    print e
    sys.exit(1)

如果另一个线程或进程在isdir和之间创建目录,则将mkdir退出。相反,请执行以下操作:

import os, sys, errno
try:
  os.mkdir('foo')
except OSError, e
  if e.errno != errno.EEXIST:
    print e
    sys.exit(1)

仅当无法创建’foo’目录时,该命令才会退出。

Using try and except directly rather than inside an if guard should always be done if there is any possibility of a race condition. For example, if you want to ensure that a directory exists, do not do this:

import os, sys
if not os.path.isdir('foo'):
  try:
    os.mkdir('foo')
  except OSError, e
    print e
    sys.exit(1)

If another thread or process creates the directory between isdir and mkdir, you’ll exit. Instead, do this:

import os, sys, errno
try:
  os.mkdir('foo')
except OSError, e
  if e.errno != errno.EEXIST:
    print e
    sys.exit(1)

That will only exit if the ‘foo’ directory can’t be created.


回答 3

如果在进行某些操作之前先检查一下是否会失败,那么您可能应该赞成这样做。毕竟,构造异常(包括相关的回溯)需要花费时间。

异常应用于:

  1. 出乎意料的事情,或者…
  2. 您需要跳到不只一个逻辑层次的事情(例如a break不能使您走得太远),或者…
  3. 您不确切知道该如何提前处理异常的事情,或者…
  4. 提前检查故障的成本很高(相对于尝试操作而言)

请注意,通常,真正的答案是“都不是”-例如,在第一个示例中,您真正应该做的只是.get()提供默认值:

x = myDict.get('ABC', 'NO_ABC')

If it’s trivial to check whether something will fail before you do it, you should probably favor that. After all, constructing exceptions (including their associated tracebacks) takes time.

Exceptions should be used for:

  1. things that are unexpected, or…
  2. things where you need to jump more than one level of logic (e.g. where a break doesn’t get you far enough), or…
  3. things where you don’t know exactly what is going to be handling the exception ahead of time, or…
  4. things where checking ahead of time for failure is expensive (relative to just attempting the operation)

Note that oftentimes, the real answer is “neither” – for instance, in your first example, what you really should do is just use .get() to provide a default:

x = myDict.get('ABC', 'NO_ABC')

回答 4

正如其他职位所提到的,这取决于情况。使用try / except代替预先检查数据的有效性存在一些危险,尤其是在较大的项目中使用时。

  • 在try块中的代码可能有机会在捕获异常之前进行各种破坏-如果您事先使用if语句主动进行检查,则可以避免这种情况。
  • 如果在try块中调用的代码引发了一个常见的异常类型(如TypeError或ValueError),则您实际上可能没有捕获到您期望捕获的相同异常-可能是其他原因导致甚至在进入之前或之后引发相同的异常类可能引发异常的行。

例如,假设您有:

try:
    x = my_list[index_list[3]]
except IndexError:
    x = 'NO_ABC'

IndexError没有任何关于尝试获取index_list或my_list元素时是否发生的信息。

As the other posts mention, it depends on the situation. There are a few dangers with using try/except in place of checking the validity of your data in advance, especially when using it on bigger projects.

  • The code in the try block may have a chance to wreak all sorts of havoc before the exception is caught – if you proactively check beforehand with an if statement you can avoid this.
  • If the code called in your try block raises a common exception type, like TypeError or ValueError, you may not actually catch the same exception you were expecting to catch – it may be something else that raise the same exception class before or after even getting to the line where your exception may be raised.

e.g., suppose you had:

try:
    x = my_list[index_list[3]]
except IndexError:
    x = 'NO_ABC'

The IndexError says nothing about whether it occurred when trying to get an element of index_list or my_list.


回答 5

应该使用try而不是if来解释为无声传递的错误吗?如果是这样,您是否通过以这种方式使用它来明确使其静音,从而使其正常运行?

使用try表示可能会通过错误,这与使其静默通过相反。使用except导致它根本不通过。

try: except:if: else:逻辑更为复杂的情况下,首选使用。简单胜于复杂。复杂胜于复杂;要求宽恕比允许容易。

警告:“错误永远都不能静默传递”,是代码可能引发您所知道的异常,并且您的设计承认存在这种可能性的情况,但您并未以处理异常的方式进行设计。在我看来,明确地消除错误将像passexcept块中那样进行,仅应在了解“不做任何事情”确实是特定情况下的正确错误处理的情况下进行操作。(这是我真正需要使用编写良好的代码进行注释的少数几次。)

但是,在您的特定示例中,都不适合:

x = myDict.get('ABC', 'NO_ABC')

每个人都指出这一点的原因-即使您承认您希望总体上理解并且无法提出更好的例子-是在很多情况下实际上存在等效的避让,而寻找它们是解决问题的第一步。

Should using a try instead of an if be interpreted as an error passing silently? And if so, are you explicitly silencing it by using it in this way, therefore making it OK?

Using try is acknowledging that an error may pass, which is the opposite of having it pass silently. Using except is causing it not to pass at all.

Using try: except: is preferred in cases where if: else: logic is more complicated. Simple is better than complex; complex is better than complicated; and it’s easier to ask for forgiveness than permission.

What “errors should never pass silently” is warning about, is the case where code could raise an exception that you know about, and where your design admits the possibility, but you haven’t designed in a way to deal with the exception. Explicitly silencing an error, in my view, would be doing something like pass in an except block, which should only be done with an understanding that “doing nothing” really is the correct error handling in the particular situation. (This is one of the few times where I feel like a comment in well-written code is probably really needed.)

However, in your particular example, neither is appropriate:

x = myDict.get('ABC', 'NO_ABC')

The reason everyone is pointing this out – even though you acknowledge your desire to understand in general, and inability to come up with a better example – is that equivalent side-steps actually exist in quite a lot of cases, and looking for them is the first step in solving the problem.


回答 6

每当try/except用于控制流时,请问自己:

  1. 是否容易看到该try块何时成功以及何时失败?
  2. 您是否知道该区块内的所有副作用try
  3. 您是否知道该块引发异常的所有情况try
  4. 如果该try块的实现发生更改,您的控制流是否仍将按预期运行?

如果对这些问题中的一个或多个的回答为“否”,则可能会有很多宽容的要求。最有可能来自您未来的自我。


一个例子。我最近在一个更大的项目中看到了如下代码:

try:
    y = foo(x)
except ProgrammingError:
    y = bar(x)

与程序员交谈后,发现预期的控制流程为:

如果x是整数,则y = foo(x)。

如果x是整数列表,则y = bar(x)。

之所以foo可行,是因为进行了数据库查询,如果x为整数,则查询将成功,如果为列表,ProgrammingError则将抛出if x

try/except在这里使用是一个不好的选择:

  1. 异常的名称ProgrammingError不会给出实际的问题(x不是整数),这使得很难看到发生了什么。
  2. ProgrammingError数据库调用,浪费时间内上升。如果事实证明是foo在引发异常之前将某些内容写入数据库或更改了其他系统的状态,那么事情将变得非常可怕。
  3. 尚不清楚是否ProgrammingError仅在x整数列表时才引发。例如,假设foo的数据库查询中有错字。这可能还会引发一个ProgrammingError。结果是,bar(x)x是整数时,现在也称为。这可能会引发神秘异常或产生不可预见的结果。
  4. try/except块为的所有未来实现增加了要求foo。每当我们进行更改时foo,我们现在都必须考虑它如何处理列表,并确保它引发一个错误,ProgrammingError而不是一个AttributeError或根本不引发一个错误。

Whenever you use try/except for control flow, ask yourself:

  1. Is it easy to see when the try block succeeds and when it fails?
  2. Are you aware of all side effects inside the try block?
  3. Are you aware of all cases in which the try block throws the exception?
  4. If the implementation of the try block changes, will your control flow still behave as expected?

If the answer to one or more of these questions is ‘no’, there might be a lot of forgiveness to ask for; most likely from your future self.


An example. I recently saw code in a larger project that looked like this:

try:
    y = foo(x)
except ProgrammingError:
    y = bar(x)

Talking to the programmer it turned that the intended control flow was:

If x is an integer, do y = foo(x).

If x is a list of integers, do y = bar(x).

This worked because foo made a database query and the query would be successful if x was an integer and throw a ProgrammingError if x was a list.

Using try/except is a bad choice here:

  1. The name of the exception, ProgrammingError, does not give away the actual problem (that x is not an integer), which makes it difficult to see what is going on.
  2. The ProgrammingError is raised during a database call, which wastes time. Things would get truly horrible if it turned out that foo writes something to the database before it throws an exception, or alters the state of some other system.
  3. It is unclear if ProgrammingError is only raised when x is a list of integers. Suppose for instance that there is a typo in foo‘s database query. This might also raise a ProgrammingError. The consequence is that bar(x) is now also called when x is an integer. This might raise cryptic exceptions or produce unforeseeable results.
  4. The try/except block adds a requirement to all future implementations of foo. Whenever we change foo, we must now think about how it handles lists and make sure that it throws a ProgrammingError and not, say, an AttributeError or no error at all.

回答 7

对于一般含义,您可以考虑阅读Python中的成语和反成语:异常

在您的特定情况下,如其他人所述,您应该使用dict.get()

get(key [,默认])

如果key在字典中,则返回key的值,否则返回默认值。如果未提供default,则默认为None,因此此方法永远不会引发KeyError。

For a general meaning, you may consider reading Idioms and Anti-Idioms in Python: Exceptions.

In your particular case, as others stated, you should use dict.get():

get(key[, default])

Return the value for key if key is in the dictionary, else default. If default is not given, it defaults to None, so that this method never raises a KeyError.


在Python的调用者线程中捕获线程的异常

问题:在Python的调用者线程中捕获线程的异常

我对Python和多线程编程非常陌生。基本上,我有一个脚本可以将文件复制到另一个位置。我希望将其放置在另一个线程中,以便可以输出....以指示脚本仍在运行。

我遇到的问题是,如果无法复制文件,它将引发异常。如果在主线程中运行,这没关系;但是,具有以下代码不起作用:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新引发异常,但是它不起作用。我已经看到这里的人问类似的问题,但是他们似乎都在做比我想做的事情更具体的事情(而且我不太了解所提供的解决方案)。我见过有人提到的用法sys.exc_info(),但是我不知道在哪里或如何使用它。

非常感谢所有帮助!

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

I’m very new to Python and multithreaded programming in general. Basically, I have a script that will copy files to another location. I would like this to be placed in another thread so I can output .... to indicate that the script is still running.

The problem that I am having is that if the files cannot be copied it will throw an exception. This is ok if running in the main thread; however, having the following code does not work:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

In the thread class itself, I tried to re-throw the exception, but it does not work. I have seen people on here ask similar questions, but they all seem to be doing something more specific than what I am trying to do (and I don’t quite understand the solutions offered). I have seen people mention the usage of sys.exc_info(), however I do not know where or how to use it.

All help is greatly appreciated!

EDIT: The code for the thread class is below:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

回答 0

问题是thread_obj.start()立即返回。您产生的子线程在其自己的上下文中执行,并带有自己的堆栈。在那里发生的任何异常都在子线程的上下文中,并且在其自己的堆栈中。我现在可以想到的一种将此信息传达给父线程的方法是使用某种消息传递,因此您可能会对此进行研究。

尝试以下尺寸:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

The problem is that thread_obj.start() returns immediately. The child thread that you spawned executes in its own context, with its own stack. Any exception that occurs there is in the context of the child thread, and it is in its own stack. One way I can think of right now to communicate this information to the parent thread is by using some sort of message passing, so you might look into that.

Try this on for size:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

回答 1

通过该concurrent.futures模块,可以轻松地在单独的线程(或进程)中进行工作并处理任何导致的异常:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures包含在Python 3.2中,并且可以作为早期版本的向后移植futures模块

The concurrent.futures module makes it simple to do work in separate threads (or processes) and handle any resulting exceptions:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures is included with Python 3.2, and is available as the backported futures module for earlier versions.


回答 2

这个问题有很多非常复杂的答案。我是否对此简化了,因为这对我来说大多数事情似乎已经足够。

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

如果确定只能在一个或另一个版本的Python上运行,则可以将该run()方法缩减为仅损坏的版本(如果仅在3之前的Python版本上运行),或者只是干净的版本(如果您只能在以3开头的Python版本上运行)。

用法示例:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

当您加入时,您将看到另一个线程引发的异常。

如果仅使用sixPython 3或仅在Python 3上使用,则可以改善重新引发异常时所获得的堆栈跟踪信息。您可以将内部异常包装在新的外部异常中,而不是仅在连接时使用堆栈,并使用

six.raise_from(RuntimeError('Exception in thread'),self.exc)

要么

raise RuntimeError('Exception in thread') from self.exc

There are a lot of really weirdly complicated answers to this question. Am I oversimplifying this, because this seems sufficient for most things to me.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

If you’re certain you’ll only ever be running on one or the other version of Python, you could reduce the run() method down to just the mangled version (if you’ll only be running on versions of Python before 3), or just the clean version (if you’ll only be running on versions of Python starting with 3).

Example usage:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

And you’ll see the exception raised on the other thread when you join.

If you are using six or on Python 3 only, you can improve the stack trace information you get when the exception is re-raised. Instead of only the stack at the point of the join, you can wrap the inner exception in a new outer exception, and get both stack traces with

six.raise_from(RuntimeError('Exception in thread'),self.exc)

or

raise RuntimeError('Exception in thread') from self.exc

回答 3

尽管不可能直接捕获在不同线程中引发的异常,但是这里的代码可以透明地获取与该功能非常接近的内容。在等待线程完成其工作时,您的子线程必须ExThread代替该类的子类,threading.Thread并且父线程必须调用该child_thread.join_with_exception()方法,而不是调用该方法child_thread.join()

此实现的技术细节:当子线程引发异常时,它将通过a传递给Queue父线程,然后再次在父线程中引发。请注意,这种方法无需等待。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()

Although it is not possible to directly catch an exception thrown in a different thread, here’s a code to quite transparently obtain something very close to this functionality. Your child thread must subclass the ExThread class instead of threading.Thread and the parent thread must call the child_thread.join_with_exception() method instead of child_thread.join() when waiting for the thread to finish its job.

Technical details of this implementation: when the child thread throws an exception, it is passed to the parent through a Queue and thrown again in the parent thread. Notice that there’s no busy waiting in this approach .

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()

回答 4

如果线程中发生异常,最好的方法是在期间在调用者线程中重新引发它join。您可以使用该sys.exc_info()函数获取有关当前正在处理的异常的信息。该信息可以简单地存储为线程对象的属性,直到join被调用为止,此时可以重新引发该信息。

请注意,Queue.Queue在这种简单的情况下,线程最多引发1个异常,在引发异常后立即完成,因此不需要(如其他答案中所建议的)。我们只需等待线程完成即可避免出现竞争情况。

例如,扩展ExcThread(在下面),覆盖excRun(而不是run)。

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

的3参数形式raise在Python 3中消失了,因此将最后一行更改为:

raise new_exc.with_traceback(self.exc[2])

If an exception occurs in a thread, the best way is to re-raise it in the caller thread during join. You can get information about the exception currently being handled using the sys.exc_info() function. This information can simply be stored as a property of the thread object until join is called, at which point it can be re-raised.

Note that a Queue.Queue (as suggested in other answers) is not necessary in this simple case where the thread throws at most 1 exception and completes right after throwing an exception. We avoid race conditions by simply waiting for the thread to complete.

For example, extend ExcThread (below), overriding excRun (instead of run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

The 3 argument form for raise is gone in Python 3, so change the last line to:

raise new_exc.with_traceback(self.exc[2])

回答 5

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

以下解决方法:

  • 调用异常后立即返回主线程
  • 不需要额外的用户定义类,因为它不需要:
    • 明确的 Queue
    • 在工作线程周围添加一个else

资源:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

可能的输出:

0
0
1
1
2
2
0
Exception()
1
2
None

不幸的是,由于一个交易失败,不可能杀死期货以取消其他交易:

如果您执行以下操作:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

然后with捕获它,并等待第二个线程完成后再继续。以下行为类似:

for future in concurrent.futures.as_completed(futures):
    future.result()

因为future.result()如果发生一个异常,则会重新引发异常。

如果您想退出整个Python过程,则可以使用os._exit(0),但是这可能意味着您需要重构。

具有完美异常语义的自定义类

我最终在以下方面为自己编写了一个完美的接口:限制一次运行的最大线程数的正确方法?部分“带有错误处理的队列示例”。该类旨在既方便,又使您可以完全控制提交和结果/错误处理。

在Python 3.6.7,Ubuntu 18.04上进行了测试。

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

The following solution:

  • returns to the main thread immediately when an exception is called
  • requires no extra user defined classes because it does not need:
    • an explicit Queue
    • to add an except else around your work thread

Source:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Possible output:

0
0
1
1
2
2
0
Exception()
1
2
None

It is unfortunately not possible to kill futures to cancel the others as one fails:

If you do something like:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

then the with catches it, and waits for the second thread to finish before continuing. The following behaves similarly:

for future in concurrent.futures.as_completed(futures):
    future.result()

since future.result() re-raises the exception if one occurred.

If you want to quit the entire Python process, you might get away with os._exit(0), but this likely means you need a refactor.

Custom class with perfect exception semantics

I ended up coding the perfect interface for myself at: The right way to limit maximum number of threads running at once? section “Queue example with error handling”. That class aims to be both convenient, and give you total control over submission and result / error handling.

Tested on Python 3.6.7, Ubuntu 18.04.


回答 6

这是一个令人讨厌的小问题,我想提出自己的解决方案。我发现的其他一些解决方案(例如async.io)看起来很有希望,但也有一些黑匣子。队列/事件循环方法将您与特定实现联系在一起。但是,并发的期货源代码大约只有1000行,并且很容易理解。它使我可以轻松地解决我的问题:无需进行过多设置即可创建临时工作线程,并能够捕获主线程中的异常。

我的解决方案使用并发的期货API和线程API。它允许您创建一个工作线程,为您提供线程和未来。这样,您可以加入线程以等待结果:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

…或者您可以让工作人员在完成后仅发送回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

…或者您可以循环播放直到事件结束:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

这是代码:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

…以及测试功能:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

This was a nasty little problem, and I’d like to throw my solution in. Some other solutions I found (async.io for example) looked promising but also presented a bit of a black box. The queue / event loop approach sort of ties you to a certain implementation. The concurrent futures source code, however, is around only 1000 lines, and easy to comprehend. It allowed me to easily solve my problem: create ad-hoc worker threads without much setup, and to be able to catch exceptions in the main thread.

My solution uses the concurrent futures API and threading API. It allows you to create a worker which gives you both the thread and the future. That way, you can join the thread to wait for the result:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

…or you can let the worker just send a callback when done:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

…or you can loop until the event completes:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Here’s the code:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

…and the test function:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

回答 7

作为Threading的入门者,我花了很长时间了解如何实现Mateusz Kobos的代码(上述)。这是一个澄清的版本,可帮助您了解如何使用它。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

As a noobie to Threading, it took me a long time to understand how to implement Mateusz Kobos’s code (above). Here’s a clarified version to help understand how to use it.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

回答 8

类似于RickardSjogren的没有Queue,sys等的方式,但是也没有一些信号侦听器:直接执行与except块相对应的异常处理程序。

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

仅self._callback和run()中的except块是普通threading.Thread的附加项。

Similar way like RickardSjogren’s without Queue, sys etc. but also without some listeners to signals: execute directly an exception handler which corresponds to an except block.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Only self._callback and the except block in run() is additional to normal threading.Thread.


回答 9

我知道我在这里参加聚会有点晚了,但是我遇到了一个非常类似的问题,但是它包括使用tkinter作为GUI,并且mainloop使得无法使用任何依赖.join()的解决方案。因此,我改编了原始问题的EDIT中给出的解决方案,但使其变得更笼统,以使其他人易于理解。

这是正在使用的新线程类:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

当然,从日志记录中,您总是可以用其他方法来处理异常,例如将其打印出来或将其输出到控制台。

这使您可以像完全使用Thread类一样使用ExceptionThread类,而无需进行任何特殊修改。

I know I’m a bit late to the party here but I was having a very similar problem but it included using tkinter as a GUI, and the mainloop made it impossible to use any of the solutions that depend on .join(). Therefore I adapted the solution given in the EDIT of the original question, but made it more general to make it easier to understand for others.

Here is the new thread class in action:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Of course you can always have it handle the exception some other way from logging, such as printing it out, or having it output to the console.

This allows you to use the ExceptionThread class exactly like you would the Thread class, without any special modifications.


回答 10

我喜欢的一种方法是基于观察者模式。我定义了一个信号类,我的线程使用该信号类向侦听器发出异常。它也可以用于从线程返回值。例:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

我没有使用线程的足够经验来宣称这是一种完全安全的方法。但这对我有用,我喜欢这种灵活性。

One method I am fond of is based on the observer pattern. I define a signal class which my thread uses to emit exceptions to listeners. It can also be used to return values from threads. Example:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

I do not have enough experience of working with threads to claim that this is a completely safe method. But it has worked for me and I like the flexibility.


回答 11

使用裸露的exceptions不是一个好习惯,因为您通常会收获比讨价还价更多的东西。

我建议修改,except使其仅捕获您要处理的异常。我认为提高它并没有达到预期的效果,因为当您TheThread在外部实例化时try,如果它引发异常,则分配将永远不会发生。

相反,您可能只想提醒它并继续前进,例如:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

然后,当该异常被捕获时,您可以在那里处理它。然后,当外部程序try从中捕获到异常时TheThread,您就会知道它不会是您已经处理过的异常,它将帮助您隔离处理流程。

Using naked excepts is not a good practice because you usually catch more than you bargain for.

I would suggest modifying the except to catch ONLY the exception that you would like to handle. I don’t think that raising it has the desired effect, because when you go to instantiate TheThread in the outer try, if it raises an exception, the assignment is never going to happen.

Instead you might want to just alert on it and move on, such as:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

Then when that exception is caught, you can handle it there. Then when the outer try catches an exception from TheThread, you know it won’t be the one you already handled, and will help you isolate your process flow.


回答 12

捕获线程异常并将其传递回调用方方法的一种简单方法是将字典或列表传递给worker方法。

示例(将字典传递给worker方法):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

A simple way of catching thread’s exception and communicating back to the caller method could be by passing dictionary or a list to worker method.

Example (passing dictionary to worker method):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

回答 13

用异常存储包装线程。

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()

Wrap Thread with exception storage.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()

回答 14

pygolang提供sync.WorkGroup,尤其是将异常从生成的工作线程传播到主线程。例如:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

运行时给出以下内容:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

该问题的原始代码就是:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()

pygolang provides sync.WorkGroup which, in particular, propagates exception from spawned worker threads to the main thread. For example:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

gives the following when run:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

The original code from the question would be just:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()