在Python的调用者线程中捕获线程的异常

问题:在Python的调用者线程中捕获线程的异常

我对Python和多线程编程非常陌生。基本上,我有一个脚本可以将文件复制到另一个位置。我希望将其放置在另一个线程中,以便可以输出....以指示脚本仍在运行。

我遇到的问题是,如果无法复制文件,它将引发异常。如果在主线程中运行,这没关系;但是,具有以下代码不起作用:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新引发异常,但是它不起作用。我已经看到这里的人问类似的问题,但是他们似乎都在做比我想做的事情更具体的事情(而且我不太了解所提供的解决方案)。我见过有人提到的用法sys.exc_info(),但是我不知道在哪里或如何使用它。

非常感谢所有帮助!

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

I’m very new to Python and multithreaded programming in general. Basically, I have a script that will copy files to another location. I would like this to be placed in another thread so I can output .... to indicate that the script is still running.

The problem that I am having is that if the files cannot be copied it will throw an exception. This is ok if running in the main thread; however, having the following code does not work:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

In the thread class itself, I tried to re-throw the exception, but it does not work. I have seen people on here ask similar questions, but they all seem to be doing something more specific than what I am trying to do (and I don’t quite understand the solutions offered). I have seen people mention the usage of sys.exc_info(), however I do not know where or how to use it.

All help is greatly appreciated!

EDIT: The code for the thread class is below:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

回答 0

问题是thread_obj.start()立即返回。您产生的子线程在其自己的上下文中执行,并带有自己的堆栈。在那里发生的任何异常都在子线程的上下文中,并且在其自己的堆栈中。我现在可以想到的一种将此信息传达给父线程的方法是使用某种消息传递,因此您可能会对此进行研究。

尝试以下尺寸:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

The problem is that thread_obj.start() returns immediately. The child thread that you spawned executes in its own context, with its own stack. Any exception that occurs there is in the context of the child thread, and it is in its own stack. One way I can think of right now to communicate this information to the parent thread is by using some sort of message passing, so you might look into that.

Try this on for size:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

回答 1

通过该concurrent.futures模块,可以轻松地在单独的线程(或进程)中进行工作并处理任何导致的异常:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures包含在Python 3.2中,并且可以作为早期版本的向后移植futures模块

The concurrent.futures module makes it simple to do work in separate threads (or processes) and handle any resulting exceptions:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures is included with Python 3.2, and is available as the backported futures module for earlier versions.


回答 2

这个问题有很多非常复杂的答案。我是否对此简化了,因为这对我来说大多数事情似乎已经足够。

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

如果确定只能在一个或另一个版本的Python上运行,则可以将该run()方法缩减为仅损坏的版本(如果仅在3之前的Python版本上运行),或者只是干净的版本(如果您只能在以3开头的Python版本上运行)。

用法示例:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

当您加入时,您将看到另一个线程引发的异常。

如果仅使用sixPython 3或仅在Python 3上使用,则可以改善重新引发异常时所获得的堆栈跟踪信息。您可以将内部异常包装在新的外部异常中,而不是仅在连接时使用堆栈,并使用

six.raise_from(RuntimeError('Exception in thread'),self.exc)

要么

raise RuntimeError('Exception in thread') from self.exc

There are a lot of really weirdly complicated answers to this question. Am I oversimplifying this, because this seems sufficient for most things to me.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

If you’re certain you’ll only ever be running on one or the other version of Python, you could reduce the run() method down to just the mangled version (if you’ll only be running on versions of Python before 3), or just the clean version (if you’ll only be running on versions of Python starting with 3).

Example usage:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

And you’ll see the exception raised on the other thread when you join.

If you are using six or on Python 3 only, you can improve the stack trace information you get when the exception is re-raised. Instead of only the stack at the point of the join, you can wrap the inner exception in a new outer exception, and get both stack traces with

six.raise_from(RuntimeError('Exception in thread'),self.exc)

or

raise RuntimeError('Exception in thread') from self.exc

回答 3

尽管不可能直接捕获在不同线程中引发的异常,但是这里的代码可以透明地获取与该功能非常接近的内容。在等待线程完成其工作时,您的子线程必须ExThread代替该类的子类,threading.Thread并且父线程必须调用该child_thread.join_with_exception()方法,而不是调用该方法child_thread.join()

此实现的技术细节:当子线程引发异常时,它将通过a传递给Queue父线程,然后再次在父线程中引发。请注意,这种方法无需等待。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()

Although it is not possible to directly catch an exception thrown in a different thread, here’s a code to quite transparently obtain something very close to this functionality. Your child thread must subclass the ExThread class instead of threading.Thread and the parent thread must call the child_thread.join_with_exception() method instead of child_thread.join() when waiting for the thread to finish its job.

Technical details of this implementation: when the child thread throws an exception, it is passed to the parent through a Queue and thrown again in the parent thread. Notice that there’s no busy waiting in this approach .

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()

回答 4

如果线程中发生异常,最好的方法是在期间在调用者线程中重新引发它join。您可以使用该sys.exc_info()函数获取有关当前正在处理的异常的信息。该信息可以简单地存储为线程对象的属性,直到join被调用为止,此时可以重新引发该信息。

请注意,Queue.Queue在这种简单的情况下,线程最多引发1个异常,在引发异常后立即完成,因此不需要(如其他答案中所建议的)。我们只需等待线程完成即可避免出现竞争情况。

例如,扩展ExcThread(在下面),覆盖excRun(而不是run)。

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

的3参数形式raise在Python 3中消失了,因此将最后一行更改为:

raise new_exc.with_traceback(self.exc[2])

If an exception occurs in a thread, the best way is to re-raise it in the caller thread during join. You can get information about the exception currently being handled using the sys.exc_info() function. This information can simply be stored as a property of the thread object until join is called, at which point it can be re-raised.

Note that a Queue.Queue (as suggested in other answers) is not necessary in this simple case where the thread throws at most 1 exception and completes right after throwing an exception. We avoid race conditions by simply waiting for the thread to complete.

For example, extend ExcThread (below), overriding excRun (instead of run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

The 3 argument form for raise is gone in Python 3, so change the last line to:

raise new_exc.with_traceback(self.exc[2])

回答 5

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

以下解决方法:

  • 调用异常后立即返回主线程
  • 不需要额外的用户定义类,因为它不需要:
    • 明确的 Queue
    • 在工作线程周围添加一个else

资源:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

可能的输出:

0
0
1
1
2
2
0
Exception()
1
2
None

不幸的是,由于一个交易失败,不可能杀死期货以取消其他交易:

如果您执行以下操作:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

然后with捕获它,并等待第二个线程完成后再继续。以下行为类似:

for future in concurrent.futures.as_completed(futures):
    future.result()

因为future.result()如果发生一个异常,则会重新引发异常。

如果您想退出整个Python过程,则可以使用os._exit(0),但是这可能意味着您需要重构。

具有完美异常语义的自定义类

我最终在以下方面为自己编写了一个完美的接口:限制一次运行的最大线程数的正确方法?部分“带有错误处理的队列示例”。该类旨在既方便,又使您可以完全控制提交和结果/错误处理。

在Python 3.6.7,Ubuntu 18.04上进行了测试。

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

The following solution:

  • returns to the main thread immediately when an exception is called
  • requires no extra user defined classes because it does not need:
    • an explicit Queue
    • to add an except else around your work thread

Source:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Possible output:

0
0
1
1
2
2
0
Exception()
1
2
None

It is unfortunately not possible to kill futures to cancel the others as one fails:

If you do something like:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

then the with catches it, and waits for the second thread to finish before continuing. The following behaves similarly:

for future in concurrent.futures.as_completed(futures):
    future.result()

since future.result() re-raises the exception if one occurred.

If you want to quit the entire Python process, you might get away with os._exit(0), but this likely means you need a refactor.

Custom class with perfect exception semantics

I ended up coding the perfect interface for myself at: The right way to limit maximum number of threads running at once? section “Queue example with error handling”. That class aims to be both convenient, and give you total control over submission and result / error handling.

Tested on Python 3.6.7, Ubuntu 18.04.


回答 6

这是一个令人讨厌的小问题,我想提出自己的解决方案。我发现的其他一些解决方案(例如async.io)看起来很有希望,但也有一些黑匣子。队列/事件循环方法将您与特定实现联系在一起。但是,并发的期货源代码大约只有1000行,并且很容易理解。它使我可以轻松地解决我的问题:无需进行过多设置即可创建临时工作线程,并能够捕获主线程中的异常。

我的解决方案使用并发的期货API和线程API。它允许您创建一个工作线程,为您提供线程和未来。这样,您可以加入线程以等待结果:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

…或者您可以让工作人员在完成后仅发送回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

…或者您可以循环播放直到事件结束:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

这是代码:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

…以及测试功能:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

This was a nasty little problem, and I’d like to throw my solution in. Some other solutions I found (async.io for example) looked promising but also presented a bit of a black box. The queue / event loop approach sort of ties you to a certain implementation. The concurrent futures source code, however, is around only 1000 lines, and easy to comprehend. It allowed me to easily solve my problem: create ad-hoc worker threads without much setup, and to be able to catch exceptions in the main thread.

My solution uses the concurrent futures API and threading API. It allows you to create a worker which gives you both the thread and the future. That way, you can join the thread to wait for the result:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

…or you can let the worker just send a callback when done:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

…or you can loop until the event completes:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Here’s the code:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

…and the test function:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

回答 7

作为Threading的入门者,我花了很长时间了解如何实现Mateusz Kobos的代码(上述)。这是一个澄清的版本,可帮助您了解如何使用它。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

As a noobie to Threading, it took me a long time to understand how to implement Mateusz Kobos’s code (above). Here’s a clarified version to help understand how to use it.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

回答 8

类似于RickardSjogren的没有Queue,sys等的方式,但是也没有一些信号侦听器:直接执行与except块相对应的异常处理程序。

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

仅self._callback和run()中的except块是普通threading.Thread的附加项。

Similar way like RickardSjogren’s without Queue, sys etc. but also without some listeners to signals: execute directly an exception handler which corresponds to an except block.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Only self._callback and the except block in run() is additional to normal threading.Thread.


回答 9

我知道我在这里参加聚会有点晚了,但是我遇到了一个非常类似的问题,但是它包括使用tkinter作为GUI,并且mainloop使得无法使用任何依赖.join()的解决方案。因此,我改编了原始问题的EDIT中给出的解决方案,但使其变得更笼统,以使其他人易于理解。

这是正在使用的新线程类:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

当然,从日志记录中,您总是可以用其他方法来处理异常,例如将其打印出来或将其输出到控制台。

这使您可以像完全使用Thread类一样使用ExceptionThread类,而无需进行任何特殊修改。

I know I’m a bit late to the party here but I was having a very similar problem but it included using tkinter as a GUI, and the mainloop made it impossible to use any of the solutions that depend on .join(). Therefore I adapted the solution given in the EDIT of the original question, but made it more general to make it easier to understand for others.

Here is the new thread class in action:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Of course you can always have it handle the exception some other way from logging, such as printing it out, or having it output to the console.

This allows you to use the ExceptionThread class exactly like you would the Thread class, without any special modifications.


回答 10

我喜欢的一种方法是基于观察者模式。我定义了一个信号类,我的线程使用该信号类向侦听器发出异常。它也可以用于从线程返回值。例:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

我没有使用线程的足够经验来宣称这是一种完全安全的方法。但这对我有用,我喜欢这种灵活性。

One method I am fond of is based on the observer pattern. I define a signal class which my thread uses to emit exceptions to listeners. It can also be used to return values from threads. Example:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

I do not have enough experience of working with threads to claim that this is a completely safe method. But it has worked for me and I like the flexibility.


回答 11

使用裸露的exceptions不是一个好习惯,因为您通常会收获比讨价还价更多的东西。

我建议修改,except使其仅捕获您要处理的异常。我认为提高它并没有达到预期的效果,因为当您TheThread在外部实例化时try,如果它引发异常,则分配将永远不会发生。

相反,您可能只想提醒它并继续前进,例如:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

然后,当该异常被捕获时,您可以在那里处理它。然后,当外部程序try从中捕获到异常时TheThread,您就会知道它不会是您已经处理过的异常,它将帮助您隔离处理流程。

Using naked excepts is not a good practice because you usually catch more than you bargain for.

I would suggest modifying the except to catch ONLY the exception that you would like to handle. I don’t think that raising it has the desired effect, because when you go to instantiate TheThread in the outer try, if it raises an exception, the assignment is never going to happen.

Instead you might want to just alert on it and move on, such as:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

Then when that exception is caught, you can handle it there. Then when the outer try catches an exception from TheThread, you know it won’t be the one you already handled, and will help you isolate your process flow.


回答 12

捕获线程异常并将其传递回调用方方法的一种简单方法是将字典或列表传递给worker方法。

示例(将字典传递给worker方法):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

A simple way of catching thread’s exception and communicating back to the caller method could be by passing dictionary or a list to worker method.

Example (passing dictionary to worker method):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

回答 13

用异常存储包装线程。

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()

Wrap Thread with exception storage.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()

回答 14

pygolang提供sync.WorkGroup,尤其是将异常从生成的工作线程传播到主线程。例如:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

运行时给出以下内容:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

该问题的原始代码就是:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()

pygolang provides sync.WorkGroup which, in particular, propagates exception from spawned worker threads to the main thread. For example:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

gives the following when run:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

The original code from the question would be just:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()