标签归档:multithreading

线程和多处理模块之间有什么区别?

问题:线程和多处理模块之间有什么区别?

我正在学习如何在Python中使用threadingmultiprocessing模块并行运行某些操作并加快代码速度。

我发现很难理解一个threading.Thread()对象与一个对象之间的区别(也许是因为我没有任何理论背景)multiprocessing.Process()

另外,对我来说,如何实例化一个作业队列并使其只有4个(例如)并行运行,而另一个则等待资源释放后再执行,对我来说也不是很清楚。

我发现文档中的示例很清楚,但并不十分详尽。一旦尝试使事情复杂化,我就会收到很多奇怪的错误(例如无法腌制的方法,等等)。

那么,什么时候应该使用threadingand multiprocessing模块?

您能否将我链接到一些资源,以解释这两个模块的概念以及如何在复杂的任务中正确使用它们?

I am learning how to use the threading and the multiprocessing modules in Python to run certain operations in parallel and speed up my code.

I am finding this hard (maybe because I don’t have any theoretical background about it) to understand what the difference is between a threading.Thread() object and a multiprocessing.Process() one.

Also, it is not entirely clear to me how to instantiate a queue of jobs and having only 4 (for example) of them running in parallel, while the other wait for resources to free before being executed.

I find the examples in the documentation clear, but not very exhaustive; as soon as I try to complicate things a bit, I receive a lot of weird errors (like a method that can’t be pickled, and so on).

So, when should I use the threading and multiprocessing modules?

Can you link me to some resources that explain the concepts behind these two modules and how to use them properly for complex tasks?


回答 0

什么朱利奥·佛朗哥说,对于多线程多处理与真一般

但是,Python *还有一个问题:有一个全局解释器锁,可以防止同一进程中的两个线程同时运行Python代码。这意味着,如果您有8个核心,并且将代码更改为使用8个线程,则它将无法使用800%的CPU并无法以8倍的速度运行;它会使用相同的100%CPU,并以相同的速度运行。(实际上,它的运行速度会稍慢一些,因为即使您没有任何共享数据,线程处理也会带来额外的开销,但是现在暂时忽略它。)

也有exceptions。如果您的代码繁重的计算实际上不是在Python中发生的,而是在某些具有自定义C代码的库中执行的,这些代码可以正确地进行GIL处理,例如numpy应用程序,则您将从线程中获得预期的性能收益。如果繁重的计算是由运行并等待的某些子进程完成的,则情况也是如此。

更重要的是,在某些情况下,这无关紧要。例如,网络服务器花费大部分时间来读取网络中的数据包,而GUI应用花费大部分时间来等待用户事件。在网络服务器或GUI应用程序中使用线程的原因之一是允许您执行长时间运行的“后台任务”,而不会阻止主线程继续为网络数据包或GUI事件提供服务。这在Python线程中工作得很好。(从技术上讲,这意味着Python线程为您提供了并发性,即使它们没有为您提供核心并行性。)

但是,如果您使用纯Python编写受CPU约束的程序,则使用更多线程通常无济于事。

对于GIL,使用单独的进程没有这种问题,因为每个进程都有自己的单独的GIL。当然,线程和进程之间仍然具有与其他任何语言相同的权衡取舍–在进程之间共享数据比在线程之间共享更加困难,而且成本更高,运行大量进程或创建和销毁这些开销可能会很高等等。但是GIL在处理方面的平衡上权衡沉重,这对于C或Java而言并非如此。因此,您会发现自己在Python中比在C或Java中使用多处理的频率更高。


同时,Python的“含电池”理念带来了一些好消息:编写代码很容易,只需进行一次更改即可在线程和进程之间来回切换。

如果您根据独立的“作业”来设计代码,除了输入和输出,这些作业不与其他作业(或主程序)共享任何内容,则可以使用该concurrent.futures库在线程池周围编写代码,如下所示:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(job, argument)
    executor.map(some_function, collection_of_independent_things)
    # ...

您甚至可以获取这些作业的结果,并将其传递给其他作业,按执行顺序或完成顺序等待;等等。阅读有关Future对象的部分以获取详细信息。

现在,如果事实证明您的程序一直在使用100%CPU,并且添加更多线程只会使其速度变慢,那么您就遇到了GIL问题,因此您需要切换到进程。您要做的就是更改第一行:

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

唯一真正的警告是,作业的自变量和返回值必须可腌制(而不需要花费太多时间或内存来腌制)才能使用跨进程。通常这不是问题,但有时是问题。


但是,如果您的工作不能自给自足怎么办?如果您可以根据将消息从一个传递到另一个的工作来设计代码,那仍然很容易。您可能必须使用threading.Threadmultiprocessing.Process代替依赖池。并且您将必须显式创建queue.Queuemultiprocessing.Queue对象。(还有很多其他选择,例如管道,套接字,带有斑点的文件等等。但是,要点是,如果执行器的自动魔力不足,则必须手动执行某些操作。)

但是,如果您甚至不能依靠消息传递怎么办?如果您需要两项工作来同时改变同一个结构并看到彼此的变化,该怎么办?在这种情况下,您将需要进行手动同步(锁定,信号量,条件等),并且,如果要使用进程,则需要显式的共享内存对象进行引导。这是当多线程(或多处理)变得困难时。如果可以避免,那就太好了;如果不能,那么您将需要阅读的内容超过某人可以提供的答案。


通过评论,您想了解Python中的线程和进程之间的区别。的确,如果您阅读了朱利奥·佛朗哥(Giulio Franco)的答案和我的知识以及我们所有的链接,那应该涵盖了所有内容……但是总结肯定会很有用,所以这里是:

  1. 线程默认共享数据;流程没有。
  2. 作为(1)的结果,在进程之间发送数据通常需要对其进行酸洗和酸洗。**
  3. (1)的另一个结果是,在进程之间直接共享数据通常需要将其放入低级格式,如Value,Array和ctypesTypes。
  4. 流程不受GIL约束。
  5. 在某些平台(主要是Windows)上,创建和销毁进程的成本要高得多。
  6. 对流程有一些额外的限制,其中某些限制在不同平台上有所不同。有关详细信息,请参见编程指南
  7. threading模块不具有该模块的某些功能multiprocessing。(您可以使用multiprocessing.dummy大多数缺少的API放在线程之上,也可以使用更高级别的模块,例如concurrent.futures,不必担心。)

*出现此问题的实际上不是Python语言,而是该语言的“标准”实现CPython。其他一些实现没有JIL,例如Jython。

**如果您正在使用fork start方法进行多处理(在大多数非Windows平台上可以使用),则每个子进程都将获得启动子级时父级拥有的任何资源,这可能是将数据传递给子级的另一种方式。

What Giulio Franco says is true for multithreading vs. multiprocessing in general.

However, Python* has an added issue: There’s a Global Interpreter Lock that prevents two threads in the same process from running Python code at the same time. This means that if you have 8 cores, and change your code to use 8 threads, it won’t be able to use 800% CPU and run 8x faster; it’ll use the same 100% CPU and run at the same speed. (In reality, it’ll run a little slower, because there’s extra overhead from threading, even if you don’t have any shared data, but ignore that for now.)

There are exceptions to this. If your code’s heavy computation doesn’t actually happen in Python, but in some library with custom C code that does proper GIL handling, like a numpy app, you will get the expected performance benefit from threading. The same is true if the heavy computation is done by some subprocess that you run and wait on.

More importantly, there are cases where this doesn’t matter. For example, a network server spends most of its time reading packets off the network, and a GUI app spends most of its time waiting for user events. One reason to use threads in a network server or GUI app is to allow you to do long-running “background tasks” without stopping the main thread from continuing to service network packets or GUI events. And that works just fine with Python threads. (In technical terms, this means Python threads give you concurrency, even though they don’t give you core-parallelism.)

But if you’re writing a CPU-bound program in pure Python, using more threads is generally not helpful.

Using separate processes has no such problems with the GIL, because each process has its own separate GIL. Of course you still have all the same tradeoffs between threads and processes as in any other languages—it’s more difficult and more expensive to share data between processes than between threads, it can be costly to run a huge number of processes or to create and destroy them frequently, etc. But the GIL weighs heavily on the balance toward processes, in a way that isn’t true for, say, C or Java. So, you will find yourself using multiprocessing a lot more often in Python than you would in C or Java.


Meanwhile, Python’s “batteries included” philosophy brings some good news: It’s very easy to write code that can be switched back and forth between threads and processes with a one-liner change.

If you design your code in terms of self-contained “jobs” that don’t share anything with other jobs (or the main program) except input and output, you can use the concurrent.futures library to write your code around a thread pool like this:

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(job, argument)
    executor.map(some_function, collection_of_independent_things)
    # ...

You can even get the results of those jobs and pass them on to further jobs, wait for things in order of execution or in order of completion, etc.; read the section on Future objects for details.

Now, if it turns out that your program is constantly using 100% CPU, and adding more threads just makes it slower, then you’re running into the GIL problem, so you need to switch to processes. All you have to do is change that first line:

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

The only real caveat is that your jobs’ arguments and return values have to be pickleable (and not take too much time or memory to pickle) to be usable cross-process. Usually this isn’t a problem, but sometimes it is.


But what if your jobs can’t be self-contained? If you can design your code in terms of jobs that pass messages from one to another, it’s still pretty easy. You may have to use threading.Thread or multiprocessing.Process instead of relying on pools. And you will have to create queue.Queue or multiprocessing.Queue objects explicitly. (There are plenty of other options—pipes, sockets, files with flocks, … but the point is, you have to do something manually if the automatic magic of an Executor is insufficient.)

But what if you can’t even rely on message passing? What if you need two jobs to both mutate the same structure, and see each others’ changes? In that case, you will need to do manual synchronization (locks, semaphores, conditions, etc.) and, if you want to use processes, explicit shared-memory objects to boot. This is when multithreading (or multiprocessing) gets difficult. If you can avoid it, great; if you can’t, you will need to read more than someone can put into an SO answer.


From a comment, you wanted to know what’s different between threads and processes in Python. Really, if you read Giulio Franco’s answer and mine and all of our links, that should cover everything… but a summary would definitely be useful, so here goes:

  1. Threads share data by default; processes do not.
  2. As a consequence of (1), sending data between processes generally requires pickling and unpickling it.**
  3. As another consequence of (1), directly sharing data between processes generally requires putting it into low-level formats like Value, Array, and ctypes types.
  4. Processes are not subject to the GIL.
  5. On some platforms (mainly Windows), processes are much more expensive to create and destroy.
  6. There are some extra restrictions on processes, some of which are different on different platforms. See Programming guidelines for details.
  7. The threading module doesn’t have some of the features of the multiprocessing module. (You can use multiprocessing.dummy to get most of the missing API on top of threads, or you can use higher-level modules like concurrent.futures and not worry about it.)

* It’s not actually Python, the language, that has this issue, but CPython, the “standard” implementation of that language. Some other implementations don’t have a GIL, like Jython.

** If you’re using the fork start method for multiprocessing—which you can on most non-Windows platforms—each child process gets any resources the parent had when the child was started, which can be another way to pass data to children.


回答 1

一个进程中可以存在多个线程。属于同一进程的线程共享同一内存区域(可以读取和写入相同的变量,并且可以互相干扰)。相反,不同的进程驻留在不同的内存区域中,并且每个进程都有自己的变量。为了进行通信,进程必须使用其他通道(文件,管道或套接字)。

如果要并行化计算,则可能需要多线程处理,因为您可能希望线程在同一内存上进行协作。

说到性能,线程的创建和管理速度比进程要快(因为操作系统不需要分配整个新的虚拟内存区域),并且线程间通信通常比进程间通信快。但是线程很难编程。线程可以互相干扰,并且可以互相写入内存,但是这种情况并不总是很明显(由于多种因素,主要是指令重新排序和内存缓存),因此您将需要同步原语来控制访问您的变量。

Multiple threads can exist in a single process. The threads that belong to the same process share the same memory area (can read from and write to the very same variables, and can interfere with one another). On the contrary, different processes live in different memory areas, and each of them has its own variables. In order to communicate, processes have to use other channels (files, pipes or sockets).

If you want to parallelize a computation, you’re probably going to need multithreading, because you probably want the threads to cooperate on the same memory.

Speaking about performance, threads are faster to create and manage than processes (because the OS doesn’t need to allocate a whole new virtual memory area), and inter-thread communication is usually faster than inter-process communication. But threads are harder to program. Threads can interfere with one another, and can write to each other’s memory, but the way this happens is not always obvious (due to several factors, mainly instruction reordering and memory caching), and so you are going to need synchronization primitives to control access to your variables.


回答 2

我相信此链接可以优雅地回答您的问题。

简而言之,如果您的一个子问题不得不等待另一个子问题完成,那么多线程就可以了(例如,在I / O繁重的操作中);相反,如果您的子问题确实可能同时发生,则建议进行多处理。但是,创建的进程数不会超过核心数。

I believe this link answers your question in an elegant way.

To be short, if one of your sub-problems has to wait while another finishes, multithreading is good (in I/O heavy operations, for example); by contrast, if your sub-problems could really happen at the same time, multiprocessing is suggested. However, you won’t create more processes than your number of cores.


回答 3

Python文档引号

我在以下位置突出了有关Process vs Threads和GIL的重要Python文档报价:CPython中的全局解释器锁(GIL)是什么?

进程与线程实验

我做了一些基准测试,以便更具体地显示差异。

在基准测试中,我为8个超线程上的各种线程的CPU和IO绑定时间计时 CPU。每个线程提供的功总是相同的,因此,更多线程意味着提供更多的总功。

结果是:

在此处输入图片说明

绘制数据

结论:

  • 对于CPU限制的工作,多处理总是更快,大概是由于GIL

  • 用于IO绑定工作。两者的速度完全一样

  • 由于我在8个超线程计算机上,因此线程最多只能扩展到大约4倍,而不是预期的8倍。

    与C POSIX绑定的CPU工作达到预期的8倍加速相比,它是什么time(1)输出中的“ real”,“ user”和“ sys”是什么意思?

    TODO:我不知道是什么原因,一定还有其他Python低效率正在发挥作用。

测试代码:

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import sys

def cpu_func(result, niters):
    '''
    A useless CPU bound function.
    '''
    for i in range(niters):
        result = (result * result * i + 2 * result * i * i + 3) % 10000000
    return result

class CpuThread(threading.Thread):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class CpuProcess(multiprocessing.Process):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class IoThread(threading.Thread):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

class IoProcess(multiprocessing.Process):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

if __name__ == '__main__':
    cpu_n_iters = int(sys.argv[1])
    sleep = 1
    cpu_count = multiprocessing.cpu_count()
    input_params = [
        (CpuThread, cpu_n_iters),
        (CpuProcess, cpu_n_iters),
        (IoThread, sleep),
        (IoProcess, sleep),
    ]
    header = ['nthreads']
    for thread_class, _ in input_params:
        header.append(thread_class.__name__)
    print(' '.join(header))
    for nthreads in range(1, 2 * cpu_count):
        results = [nthreads]
        for thread_class, work_size in input_params:
            start_time = time.time()
            threads = []
            for i in range(nthreads):
                thread = thread_class(work_size)
                threads.append(thread)
                thread.start()
            for i, thread in enumerate(threads):
                thread.join()
            results.append(time.time() - start_time)
        print(' '.join('{:.6e}'.format(result) for result in results))

GitHub上游+在同一目录上绘制代码

在带有CPU的Lenovo ThinkPad P51笔记本电脑上的Ubuntu 18.10,Python 3.6.7上进行了测试:Intel Core i7-7820HQ CPU(4核/ 8线程),RAM:2x三星M471A2K43BB1-CRC(2x 16GiB),SSD:Samsung MZVLB512HAJQ- 000L7(3,000 MB / s)。

可视化在给定时间正在运行的线程

这篇帖子https://rohanvarma.me/GIL/告诉我,只要调度线程的target=参数threading.Thread与相同,就可以运行回调multiprocessing.Process

这使我们可以精确查看每次运行哪个线程。完成此操作后,我们将看到类似的内容(我制作了此特定图形):

            +--------------------------------------+
            + Active threads / processes           +
+-----------+--------------------------------------+
|Thread   1 |********     ************             |
|         2 |        *****            *************|
+-----------+--------------------------------------+
|Process  1 |***  ************** ******  ****      |
|         2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
            + Time -->                             +
            +--------------------------------------+

这将表明:

  • 线程由GIL完全序列化
  • 进程可以并行运行

Python documentation quotes

I’ve highlighted the key Python documentation quotes about Process vs Threads and the GIL at: What is the global interpreter lock (GIL) in CPython?

Process vs thread experiments

I did a bit of benchmarking in order to show the difference more concretely.

In the benchmark, I timed CPU and IO bound work for various numbers of threads on an 8 hyperthread CPU. The work supplied per thread is always the same, such that more threads means more total work supplied.

The results were:

enter image description here

Plot data.

Conclusions:

  • for CPU bound work, multiprocessing is always faster, presumably due to the GIL

  • for IO bound work. both are exactly the same speed

  • threads only scale up to about 4x instead of the expected 8x since I’m on an 8 hyperthread machine.

    Contrast that with a C POSIX CPU-bound work which reaches the expected 8x speedup: What do ‘real’, ‘user’ and ‘sys’ mean in the output of time(1)?

    TODO: I don’t know the reason for this, there must be other Python inefficiencies coming into play.

Test code:

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import sys

def cpu_func(result, niters):
    '''
    A useless CPU bound function.
    '''
    for i in range(niters):
        result = (result * result * i + 2 * result * i * i + 3) % 10000000
    return result

class CpuThread(threading.Thread):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class CpuProcess(multiprocessing.Process):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class IoThread(threading.Thread):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

class IoProcess(multiprocessing.Process):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

if __name__ == '__main__':
    cpu_n_iters = int(sys.argv[1])
    sleep = 1
    cpu_count = multiprocessing.cpu_count()
    input_params = [
        (CpuThread, cpu_n_iters),
        (CpuProcess, cpu_n_iters),
        (IoThread, sleep),
        (IoProcess, sleep),
    ]
    header = ['nthreads']
    for thread_class, _ in input_params:
        header.append(thread_class.__name__)
    print(' '.join(header))
    for nthreads in range(1, 2 * cpu_count):
        results = [nthreads]
        for thread_class, work_size in input_params:
            start_time = time.time()
            threads = []
            for i in range(nthreads):
                thread = thread_class(work_size)
                threads.append(thread)
                thread.start()
            for i, thread in enumerate(threads):
                thread.join()
            results.append(time.time() - start_time)
        print(' '.join('{:.6e}'.format(result) for result in results))

GitHub upstream + plotting code on same directory.

Tested on Ubuntu 18.10, Python 3.6.7, in a Lenovo ThinkPad P51 laptop with CPU: Intel Core i7-7820HQ CPU (4 cores / 8 threads), RAM: 2x Samsung M471A2K43BB1-CRC (2x 16GiB), SSD: Samsung MZVLB512HAJQ-000L7 (3,000 MB/s).

Visualize which threads are running at a given time

This post https://rohanvarma.me/GIL/ taught me that you can run a callback whenever a thread is scheduled with the target= argument of threading.Thread and the same for multiprocessing.Process.

This allows us to view exactly which thread runs at each time. When this is done, we would see something like (I made this particular graph up):

            +--------------------------------------+
            + Active threads / processes           +
+-----------+--------------------------------------+
|Thread   1 |********     ************             |
|         2 |        *****            *************|
+-----------+--------------------------------------+
|Process  1 |***  ************** ******  ****      |
|         2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
            + Time -->                             +
            +--------------------------------------+

which would show that:

  • threads are fully serialized by the GIL
  • processes can run in parallel

回答 4

这是python 2.6.x的一些性能数据,这些数据使人们质疑在IO绑定的情况下线程比多处理的性能更高。这些结果来自40个处理器的IBM System x3650 M4 BD。

IO绑定处理:进程池的性能优于线程池

>>> do_work(50, 300, 'thread','fileio')
do_work function took 455.752 ms

>>> do_work(50, 300, 'process','fileio')
do_work function took 319.279 ms

CPU限制处理:进程池的性能优于线程池

>>> do_work(50, 2000, 'thread','square')
do_work function took 338.309 ms

>>> do_work(50, 2000, 'process','square')
do_work function took 287.488 ms

这些不是严格的测试,但是它们告诉我,与线程处理相比,多处理并非完全没有表现。

交互式python控制台中用于上述测试的代码

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time
import sys
import os
from glob import glob

text_for_test = str(range(1,100000))

def fileio(i):
 try :
  os.remove(glob('./test/test-*'))
 except : 
  pass
 f=open('./test/test-'+str(i),'a')
 f.write(text_for_test)
 f.close()
 f=open('./test/test-'+str(i),'r')
 text = f.read()
 f.close()


def square(i):
 return i*i

def timing(f):
 def wrap(*args):
  time1 = time.time()
  ret = f(*args)
  time2 = time.time()
  print '%s function took %0.3f ms' % (f.func_name, (time2-time1)*1000.0)
  return ret
 return wrap

result = None

@timing
def do_work(process_count, items, process_type, method) :
 pool = None
 if process_type == 'process' :
  pool = Pool(processes=process_count)
 else :
  pool = ThreadPool(processes=process_count)
 if method == 'square' : 
  multiple_results = [pool.apply_async(square,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]
 else :
  multiple_results = [pool.apply_async(fileio,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]


do_work(50, 300, 'thread','fileio')
do_work(50, 300, 'process','fileio')

do_work(50, 2000, 'thread','square')
do_work(50, 2000, 'process','square')

Here’s some performance data for python 2.6.x that calls to question the notion that threading is more performant that multiprocessing in IO-bound scenarios. These results are from a 40-processor IBM System x3650 M4 BD.

IO-Bound Processing : Process Pool performed better than Thread Pool

>>> do_work(50, 300, 'thread','fileio')
do_work function took 455.752 ms

>>> do_work(50, 300, 'process','fileio')
do_work function took 319.279 ms

CPU-Bound Processing : Process Pool performed better than Thread Pool

>>> do_work(50, 2000, 'thread','square')
do_work function took 338.309 ms

>>> do_work(50, 2000, 'process','square')
do_work function took 287.488 ms

These aren’t rigorous tests, but they tell me that multiprocessing isn’t entirely unperformant in comparison to threading.

Code used in the interactive python console for the above tests

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time
import sys
import os
from glob import glob

text_for_test = str(range(1,100000))

def fileio(i):
 try :
  os.remove(glob('./test/test-*'))
 except : 
  pass
 f=open('./test/test-'+str(i),'a')
 f.write(text_for_test)
 f.close()
 f=open('./test/test-'+str(i),'r')
 text = f.read()
 f.close()


def square(i):
 return i*i

def timing(f):
 def wrap(*args):
  time1 = time.time()
  ret = f(*args)
  time2 = time.time()
  print '%s function took %0.3f ms' % (f.func_name, (time2-time1)*1000.0)
  return ret
 return wrap

result = None

@timing
def do_work(process_count, items, process_type, method) :
 pool = None
 if process_type == 'process' :
  pool = Pool(processes=process_count)
 else :
  pool = ThreadPool(processes=process_count)
 if method == 'square' : 
  multiple_results = [pool.apply_async(square,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]
 else :
  multiple_results = [pool.apply_async(fileio,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]


do_work(50, 300, 'thread','fileio')
do_work(50, 300, 'process','fileio')

do_work(50, 2000, 'thread','square')
do_work(50, 2000, 'process','square')

回答 5

好吧,朱利奥·佛朗哥(Giulio Franco)回答了大多数问题。我将进一步阐述消费者-生产者问题,我想这将使您走上使用多线程应用程序的解决方案的正确轨道。

fill_count = Semaphore(0) # items produced
empty_count = Semaphore(BUFFER_SIZE) # remaining space
buffer = Buffer()

def producer(fill_count, empty_count, buffer):
    while True:
        item = produceItem()
        empty_count.down();
        buffer.push(item)
        fill_count.up()

def consumer(fill_count, empty_count, buffer):
    while True:
        fill_count.down()
        item = buffer.pop()
        empty_count.up()
        consume_item(item)

您可以从以下网站阅读有关同步原语的更多信息:

 http://linux.die.net/man/7/sem_overview
 http://docs.python.org/2/library/threading.html

伪代码在上面。我想您应该搜索生产者-消费者问题以获取更多参考。

Well, most of the question is answered by Giulio Franco. I will further elaborate on the consumer-producer problem, which I suppose will put you on the right track for your solution to using a multithreaded app.

fill_count = Semaphore(0) # items produced
empty_count = Semaphore(BUFFER_SIZE) # remaining space
buffer = Buffer()

def producer(fill_count, empty_count, buffer):
    while True:
        item = produceItem()
        empty_count.down();
        buffer.push(item)
        fill_count.up()

def consumer(fill_count, empty_count, buffer):
    while True:
        fill_count.down()
        item = buffer.pop()
        empty_count.up()
        consume_item(item)

You could read more on the synchronization primitives from:

 http://linux.die.net/man/7/sem_overview
 http://docs.python.org/2/library/threading.html

The pseudocode is above. I suppose you should search the producer-consumer-problem to get more references.


使用多重处理Pool.map()时无法腌制

问题:使用多重处理Pool.map()时无法腌制

我正在尝试使用multiprocessingPool.map()功能同时划分工作。当我使用以下代码时,它可以正常工作:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

但是,当我以更加面向对象的方式使用它时,它将无法正常工作。它给出的错误信息是:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

当以下是我的主程序时,会发生这种情况:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

这是我的someClass课:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

任何人都知道问题可能是什么,或解决问题的简单方法?

I’m trying to use multiprocessing‘s Pool.map() function to divide out work simultaneously. When I use the following code, it works fine:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

However, when I use it in a more object-oriented approach, it doesn’t work. The error message it gives is:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

This occurs when the following is my main program:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

and the following is my someClass class:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Anyone know what the problem could be, or an easy way around it?


回答 0

问题在于,多处理必须使进程中的事物腌制,并且绑定的方法不可腌制。解决方法(无论您是否认为它“简单” ;-)是向您的程序中添加基础结构,以允许对这些方法进行腌制,并使用copy_reg标准库方法进行注册。

例如,史蒂文·贝萨德(Steven Bethard)对这个线程的贡献(接近线程的结尾)显示了一种非常可行的方法,该方法允许通过进行酸洗/取消酸洗copy_reg

The problem is that multiprocessing must pickle things to sling them among processes, and bound methods are not picklable. The workaround (whether you consider it “easy” or not;-) is to add the infrastructure to your program to allow such methods to be pickled, registering it with the copy_reg standard library method.

For example, Steven Bethard’s contribution to this thread (towards the end of the thread) shows one perfectly workable approach to allow method pickling/unpickling via copy_reg.


回答 1

所有这些解决方案都是丑陋的,因为除非您跳出标准库,否则多重处理和酸洗会受到破坏和限制。

如果您使用的一个分支multiprocessingpathos.multiprocesssing,你可以直接使用类和类方法在多处理的map功能。这是因为dill用代替picklecPickle,并且dill可以在python中序列化几乎所有内容。

pathos.multiprocessing还提供了异步映射功能…,并且可以map使用多个参数(例如map(math.pow, [1,2,3], [4,5,6]))运行

请参阅: 多重处理和莳萝可以一起做什么?

和:http : //matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

明确地说,您可以完全按照自己的意愿进行操作,如果需要,可以从解释器中进行操作。

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

在此处获取代码:https : //github.com/uqfoundation/pathos

All of these solutions are ugly because multiprocessing and pickling is broken and limited unless you jump outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocesssing, you can directly use classes and class methods in multiprocessing’s map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

And just to be explicit, you can do exactly want you wanted to do in the first place, and you can do it from the interpreter, if you wanted to.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Get the code here: https://github.com/uqfoundation/pathos


回答 2

您还可以__call__()在中定义一个方法someClass(),该方法会调用someClass.go(),然后将的实例传递someClass()给池。该对象是可腌制的,并且对我来说很好用…

You could also define a __call__() method inside your someClass(), which calls someClass.go() and then pass an instance of someClass() to the pool. This object is pickleable and it works fine (for me)…


回答 3

尽管对史蒂文·贝萨德的解决方案有一些限制:

当您将类方法注册为函数时,每次您的方法处理完成时,都会令人惊讶地调用类的析构函数。因此,如果您有一个类的实例调用其方法的n倍,则成员可能在两次运行之间消失,并且可能会收到一条消息malloc: *** error for object 0x...: pointer being freed was not allocated(例如,打开的成员文件)或pure virtual method called, terminate called without an active exception(这意味着我使用的成员对象的生存期短于我的想法)。当处理大于池大小的n时,我得到了这个。这是一个简短的示例:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

输出:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

__call__方法不是那么等效,因为从结果中读取了[None,…]:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

所以这两种方法都不令人满意…

Some limitations though to Steven Bethard’s solution :

When you register your class method as a function, the destructor of your class is surprisingly called every time your method processing is finished. So if you have 1 instance of your class that calls n times its method, members may disappear between 2 runs and you may get a message malloc: *** error for object 0x...: pointer being freed was not allocated (e.g. open member file) or pure virtual method called, terminate called without an active exception (which means than the lifetime of a member object I used was shorter than what I thought). I got this when dealing with n greater than the pool size. Here is a short example :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Output:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

The __call__ method is not so equivalent, because [None,…] are read from the results :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

So none of both methods is satisfying…


回答 4

您可以使用另一种快捷方式,尽管根据类实例中的内容,效率可能会很低。

就像大家都说过的那样,问题在于multiprocessing代码必须对发送给它已启动的子流程的内容进行腌制,并且腌制者不执行实例方法。

但是,除了发送实例方法之外,您可以将实际的类实例以及要调用的函数的名称发送到一个普通函数,该普通函数随后用于getattr调用该实例方法,从而在Pool子进程中创建绑定方法。这类似于定义__call__方法,不同之处在于您可以调用多个成员函数。

从所有答案中窃取@EricH。的代码并对其进行注释(我重新输入了代码,因此所有名称都更改了,因此,由于某种原因,这似乎比剪切粘贴更容易:-))来说明所有魔术:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

输出显示,确实,构造函数被调用一次(在原始pid中),而析构函数被调用9次(每次复制一次=根据需要每个pool-worker-process 2到3次,在原始副本中被调用一次处理)。通常是可以的,因为在这种情况下,默认的选择器会复制整个实例,然后(半)秘密地重新填充它,在这种情况下,请执行以下操作:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

这就是为什么即使在三个工作进程中都调用了析构函数八次,每次都将析构函数从1递减为0的原因,但是当然您仍然会遇到这种麻烦。如有必要,您可以提供自己的__setstate__

    def __setstate__(self, adict):
        self.count = adict['count']

例如在这种情况下。

There’s another short-cut you can use, although it can be inefficient depending on what’s in your class instances.

As everyone has said the problem is that the multiprocessing code has to pickle the things that it sends to the sub-processes it has started, and the pickler doesn’t do instance-methods.

However, instead of sending the instance-method, you can send the actual class instance, plus the name of the function to call, to an ordinary function that then uses getattr to call the instance-method, thus creating the bound method in the Pool subprocess. This is similar to defining a __call__ method except that you can call more than one member function.

Stealing @EricH.’s code from his answer and annotating it a bit (I retyped it hence all the name changes and such, for some reason this seemed easier than cut-and-paste :-) ) for illustration of all the magic:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

The output shows that, indeed, the constructor is called once (in the original pid) and the destructor is called 9 times (once for each copy made = 2 or 3 times per pool-worker-process as needed, plus once in the original process). This is often OK, as in this case, since the default pickler makes a copy of the entire instance and (semi-) secretly re-populates it—in this case, doing:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

—that’s why even though the destructor is called eight times in the three worker processes, it counts down from 1 to 0 each time—but of course you can still get into trouble this way. If necessary, you can provide your own __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

in this case for instance.


回答 5

您还可以__call__()在中定义一个方法someClass(),该方法会调用someClass.go(),然后将的实例传递someClass()给池。该对象是可腌制的,并且对我来说很好用…

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

You could also define a __call__() method inside your someClass(), which calls someClass.go() and then pass an instance of someClass() to the pool. This object is pickleable and it works fine (for me)…

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

回答 6

上面parisjohn的解决方案对我来说很好用。另外,代码看起来干净而且易于理解。就我而言,有一些使用Pool调用的函数,因此我在下面修改了parisjohn的代码。我进行了调用,以便能够调用多个函数,并且函数名称从传入自变量dict中go()

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

The solution from parisjohn above works fine with me. Plus the code looks clean and easy to understand. In my case there are a few functions to call using Pool, so I modified parisjohn’s code a bit below. I made call to be able to call several functions, and the function names are passed in the argument dict from go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

回答 7

一个可能的解决方案是切换到using multiprocessing.dummy。这是多处理接口的基于线程的实现,在python 2.7中似乎没有此问题。我在这里经验不足,但是快速的导入更改使我可以在类方法上调用apply_async。

以下是一些很好的资源multiprocessing.dummy

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

A potentially trivial solution to this is to switch to using multiprocessing.dummy. This is a thread based implementation of the multiprocessing interface that doesn’t seem to have this problem in Python 2.7. I don’t have a lot of experience here, but this quick import change allowed me to call apply_async on a class method.

A few good resources on multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/


回答 8

在这种简单的情况下,someClass.f不从类继承任何数据并且不向类附加任何内容,一种可能的解决方案是将out分离出来f,以便对其进行腌制:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

In this simple case, where someClass.f is not inheriting any data from the class and not attaching anything to the class, a possible solution would be to separate out f, so it can be pickled:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

回答 9

为什么不使用单独的func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

Why not to use separate func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

回答 10

我遇到了同样的问题,但发现有一个JSON编码器可用于在进程之间移动这些对象。

from pyVmomi.VmomiSupport import VmomiJSONEncoder

使用它来创建您的列表:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

然后在映射函数中,使用它来恢复对象:

pfVmomiObj = json.loads(jsonSerialized)

I ran into this same issue but found out that there is a JSON encoder that can be used to move these objects between processes.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Use this to create your list:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Then in the mapped function, use this to recover the object:

pfVmomiObj = json.loads(jsonSerialized)

回答 11

更新:截至撰写本文之日,namedTuples可供选择(从python 2.7开始)

这里的问题是子进程无法导入对象的类-在这种情况下,是类P-,在多模型项目的情况下,类P应该可以在使用子进程的任何地方导入

一个快速的解决方法是通过将其影响到globals()使其可导入

globals()["P"] = P

Update: as of the day of this writing, namedTuples are pickable (starting with python 2.7)

The issue here is the child processes aren’t able to import the class of the object -in this case, the class P-, in the case of a multi-model project the Class P should be importable anywhere the child process get used

a quick workaround is to make it importable by affecting it to globals()

globals()["P"] = P

在Python的调用者线程中捕获线程的异常

问题:在Python的调用者线程中捕获线程的异常

我对Python和多线程编程非常陌生。基本上,我有一个脚本可以将文件复制到另一个位置。我希望将其放置在另一个线程中,以便可以输出....以指示脚本仍在运行。

我遇到的问题是,如果无法复制文件,它将引发异常。如果在主线程中运行,这没关系;但是,具有以下代码不起作用:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

在线程类本身中,我试图重新引发异常,但是它不起作用。我已经看到这里的人问类似的问题,但是他们似乎都在做比我想做的事情更具体的事情(而且我不太了解所提供的解决方案)。我见过有人提到的用法sys.exc_info(),但是我不知道在哪里或如何使用它。

非常感谢所有帮助!

编辑:线程类的代码如下:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

I’m very new to Python and multithreaded programming in general. Basically, I have a script that will copy files to another location. I would like this to be placed in another thread so I can output .... to indicate that the script is still running.

The problem that I am having is that if the files cannot be copied it will throw an exception. This is ok if running in the main thread; however, having the following code does not work:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

In the thread class itself, I tried to re-throw the exception, but it does not work. I have seen people on here ask similar questions, but they all seem to be doing something more specific than what I am trying to do (and I don’t quite understand the solutions offered). I have seen people mention the usage of sys.exc_info(), however I do not know where or how to use it.

All help is greatly appreciated!

EDIT: The code for the thread class is below:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

回答 0

问题是thread_obj.start()立即返回。您产生的子线程在其自己的上下文中执行,并带有自己的堆栈。在那里发生的任何异常都在子线程的上下文中,并且在其自己的堆栈中。我现在可以想到的一种将此信息传达给父线程的方法是使用某种消息传递,因此您可能会对此进行研究。

尝试以下尺寸:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

The problem is that thread_obj.start() returns immediately. The child thread that you spawned executes in its own context, with its own stack. Any exception that occurs there is in the context of the child thread, and it is in its own stack. One way I can think of right now to communicate this information to the parent thread is by using some sort of message passing, so you might look into that.

Try this on for size:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()

回答 1

通过该concurrent.futures模块,可以轻松地在单独的线程(或进程)中进行工作并处理任何导致的异常:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures包含在Python 3.2中,并且可以作为早期版本的向后移植futures模块

The concurrent.futures module makes it simple to do work in separate threads (or processes) and handle any resulting exceptions:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures is included with Python 3.2, and is available as the backported futures module for earlier versions.


回答 2

这个问题有很多非常复杂的答案。我是否对此简化了,因为这对我来说大多数事情似乎已经足够。

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

如果确定只能在一个或另一个版本的Python上运行,则可以将该run()方法缩减为仅损坏的版本(如果仅在3之前的Python版本上运行),或者只是干净的版本(如果您只能在以3开头的Python版本上运行)。

用法示例:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

当您加入时,您将看到另一个线程引发的异常。

如果仅使用sixPython 3或仅在Python 3上使用,则可以改善重新引发异常时所获得的堆栈跟踪信息。您可以将内部异常包装在新的外部异常中,而不是仅在连接时使用堆栈,并使用

six.raise_from(RuntimeError('Exception in thread'),self.exc)

要么

raise RuntimeError('Exception in thread') from self.exc

There are a lot of really weirdly complicated answers to this question. Am I oversimplifying this, because this seems sufficient for most things to me.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

If you’re certain you’ll only ever be running on one or the other version of Python, you could reduce the run() method down to just the mangled version (if you’ll only be running on versions of Python before 3), or just the clean version (if you’ll only be running on versions of Python starting with 3).

Example usage:

def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck at this')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

And you’ll see the exception raised on the other thread when you join.

If you are using six or on Python 3 only, you can improve the stack trace information you get when the exception is re-raised. Instead of only the stack at the point of the join, you can wrap the inner exception in a new outer exception, and get both stack traces with

six.raise_from(RuntimeError('Exception in thread'),self.exc)

or

raise RuntimeError('Exception in thread') from self.exc

回答 3

尽管不可能直接捕获在不同线程中引发的异常,但是这里的代码可以透明地获取与该功能非常接近的内容。在等待线程完成其工作时,您的子线程必须ExThread代替该类的子类,threading.Thread并且父线程必须调用该child_thread.join_with_exception()方法,而不是调用该方法child_thread.join()

此实现的技术细节:当子线程引发异常时,它将通过a传递给Queue父线程,然后再次在父线程中引发。请注意,这种方法无需等待。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()

Although it is not possible to directly catch an exception thrown in a different thread, here’s a code to quite transparently obtain something very close to this functionality. Your child thread must subclass the ExThread class instead of threading.Thread and the parent thread must call the child_thread.join_with_exception() method instead of child_thread.join() when waiting for the thread to finish its job.

Technical details of this implementation: when the child thread throws an exception, it is passed to the parent through a Queue and thrown again in the parent thread. Notice that there’s no busy waiting in this approach .

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()

回答 4

如果线程中发生异常,最好的方法是在期间在调用者线程中重新引发它join。您可以使用该sys.exc_info()函数获取有关当前正在处理的异常的信息。该信息可以简单地存储为线程对象的属性,直到join被调用为止,此时可以重新引发该信息。

请注意,Queue.Queue在这种简单的情况下,线程最多引发1个异常,在引发异常后立即完成,因此不需要(如其他答案中所建议的)。我们只需等待线程完成即可避免出现竞争情况。

例如,扩展ExcThread(在下面),覆盖excRun(而不是run)。

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

的3参数形式raise在Python 3中消失了,因此将最后一行更改为:

raise new_exc.with_traceback(self.exc[2])

If an exception occurs in a thread, the best way is to re-raise it in the caller thread during join. You can get information about the exception currently being handled using the sys.exc_info() function. This information can simply be stored as a property of the thread object until join is called, at which point it can be re-raised.

Note that a Queue.Queue (as suggested in other answers) is not necessary in this simple case where the thread throws at most 1 exception and completes right after throwing an exception. We avoid race conditions by simply waiting for the thread to complete.

For example, extend ExcThread (below), overriding excRun (instead of run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

The 3 argument form for raise is gone in Python 3, so change the last line to:

raise new_exc.with_traceback(self.exc[2])

回答 5

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

以下解决方法:

  • 调用异常后立即返回主线程
  • 不需要额外的用户定义类,因为它不需要:
    • 明确的 Queue
    • 在工作线程周围添加一个else

资源:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

可能的输出:

0
0
1
1
2
2
0
Exception()
1
2
None

不幸的是,由于一个交易失败,不可能杀死期货以取消其他交易:

如果您执行以下操作:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

然后with捕获它,并等待第二个线程完成后再继续。以下行为类似:

for future in concurrent.futures.as_completed(futures):
    future.result()

因为future.result()如果发生一个异常,则会重新引发异常。

如果您想退出整个Python过程,则可以使用os._exit(0),但是这可能意味着您需要重构。

具有完美异常语义的自定义类

我最终在以下方面为自己编写了一个完美的接口:限制一次运行的最大线程数的正确方法?部分“带有错误处理的队列示例”。该类旨在既方便,又使您可以完全控制提交和结果/错误处理。

在Python 3.6.7,Ubuntu 18.04上进行了测试。

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

The following solution:

  • returns to the main thread immediately when an exception is called
  • requires no extra user defined classes because it does not need:
    • an explicit Queue
    • to add an except else around your work thread

Source:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Possible output:

0
0
1
1
2
2
0
Exception()
1
2
None

It is unfortunately not possible to kill futures to cancel the others as one fails:

If you do something like:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

then the with catches it, and waits for the second thread to finish before continuing. The following behaves similarly:

for future in concurrent.futures.as_completed(futures):
    future.result()

since future.result() re-raises the exception if one occurred.

If you want to quit the entire Python process, you might get away with os._exit(0), but this likely means you need a refactor.

Custom class with perfect exception semantics

I ended up coding the perfect interface for myself at: The right way to limit maximum number of threads running at once? section “Queue example with error handling”. That class aims to be both convenient, and give you total control over submission and result / error handling.

Tested on Python 3.6.7, Ubuntu 18.04.


回答 6

这是一个令人讨厌的小问题,我想提出自己的解决方案。我发现的其他一些解决方案(例如async.io)看起来很有希望,但也有一些黑匣子。队列/事件循环方法将您与特定实现联系在一起。但是,并发的期货源代码大约只有1000行,并且很容易理解。它使我可以轻松地解决我的问题:无需进行过多设置即可创建临时工作线程,并能够捕获主线程中的异常。

我的解决方案使用并发的期货API和线程API。它允许您创建一个工作线程,为您提供线程和未来。这样,您可以加入线程以等待结果:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

…或者您可以让工作人员在完成后仅发送回调:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

…或者您可以循环播放直到事件结束:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

这是代码:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

…以及测试功能:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

This was a nasty little problem, and I’d like to throw my solution in. Some other solutions I found (async.io for example) looked promising but also presented a bit of a black box. The queue / event loop approach sort of ties you to a certain implementation. The concurrent futures source code, however, is around only 1000 lines, and easy to comprehend. It allowed me to easily solve my problem: create ad-hoc worker threads without much setup, and to be able to catch exceptions in the main thread.

My solution uses the concurrent futures API and threading API. It allows you to create a worker which gives you both the thread and the future. That way, you can join the thread to wait for the result:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

…or you can let the worker just send a callback when done:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

…or you can loop until the event completes:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Here’s the code:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

…and the test function:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')

回答 7

作为Threading的入门者,我花了很长时间了解如何实现Mateusz Kobos的代码(上述)。这是一个澄清的版本,可帮助您了解如何使用它。

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

As a noobie to Threading, it took me a long time to understand how to implement Mateusz Kobos’s code (above). Here’s a clarified version to help understand how to use it.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException

回答 8

类似于RickardSjogren的没有Queue,sys等的方式,但是也没有一些信号侦听器:直接执行与except块相对应的异常处理程序。

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

仅self._callback和run()中的except块是普通threading.Thread的附加项。

Similar way like RickardSjogren’s without Queue, sys etc. but also without some listeners to signals: execute directly an exception handler which corresponds to an except block.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Only self._callback and the except block in run() is additional to normal threading.Thread.


回答 9

我知道我在这里参加聚会有点晚了,但是我遇到了一个非常类似的问题,但是它包括使用tkinter作为GUI,并且mainloop使得无法使用任何依赖.join()的解决方案。因此,我改编了原始问题的EDIT中给出的解决方案,但使其变得更笼统,以使其他人易于理解。

这是正在使用的新线程类:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

当然,从日志记录中,您总是可以用其他方法来处理异常,例如将其打印出来或将其输出到控制台。

这使您可以像完全使用Thread类一样使用ExceptionThread类,而无需进行任何特殊修改。

I know I’m a bit late to the party here but I was having a very similar problem but it included using tkinter as a GUI, and the mainloop made it impossible to use any of the solutions that depend on .join(). Therefore I adapted the solution given in the EDIT of the original question, but made it more general to make it easier to understand for others.

Here is the new thread class in action:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Of course you can always have it handle the exception some other way from logging, such as printing it out, or having it output to the console.

This allows you to use the ExceptionThread class exactly like you would the Thread class, without any special modifications.


回答 10

我喜欢的一种方法是基于观察者模式。我定义了一个信号类,我的线程使用该信号类向侦听器发出异常。它也可以用于从线程返回值。例:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

我没有使用线程的足够经验来宣称这是一种完全安全的方法。但这对我有用,我喜欢这种灵活性。

One method I am fond of is based on the observer pattern. I define a signal class which my thread uses to emit exceptions to listeners. It can also be used to return values from threads. Example:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

I do not have enough experience of working with threads to claim that this is a completely safe method. But it has worked for me and I like the flexibility.


回答 11

使用裸露的exceptions不是一个好习惯,因为您通常会收获比讨价还价更多的东西。

我建议修改,except使其仅捕获您要处理的异常。我认为提高它并没有达到预期的效果,因为当您TheThread在外部实例化时try,如果它引发异常,则分配将永远不会发生。

相反,您可能只想提醒它并继续前进,例如:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

然后,当该异常被捕获时,您可以在那里处理它。然后,当外部程序try从中捕获到异常时TheThread,您就会知道它不会是您已经处理过的异常,它将帮助您隔离处理流程。

Using naked excepts is not a good practice because you usually catch more than you bargain for.

I would suggest modifying the except to catch ONLY the exception that you would like to handle. I don’t think that raising it has the desired effect, because when you go to instantiate TheThread in the outer try, if it raises an exception, the assignment is never going to happen.

Instead you might want to just alert on it and move on, such as:

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

Then when that exception is caught, you can handle it there. Then when the outer try catches an exception from TheThread, you know it won’t be the one you already handled, and will help you isolate your process flow.


回答 12

捕获线程异常并将其传递回调用方方法的一种简单方法是将字典或列表传递给worker方法。

示例(将字典传递给worker方法):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

A simple way of catching thread’s exception and communicating back to the caller method could be by passing dictionary or a list to worker method.

Example (passing dictionary to worker method):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])

回答 13

用异常存储包装线程。

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()

Wrap Thread with exception storage.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()

回答 14

pygolang提供sync.WorkGroup,尤其是将异常从生成的工作线程传播到主线程。例如:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

运行时给出以下内容:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

该问题的原始代码就是:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()

pygolang provides sync.WorkGroup which, in particular, propagates exception from spawned worker threads to the main thread. For example:

#!/usr/bin/env python
"""This program demostrates how with sync.WorkGroup an exception raised in
spawned thread is propagated into main thread which spawned the worker."""

from __future__ import print_function
from golang import sync, context

def T1(ctx, *argv):
    print('T1: run ... %r' % (argv,))
    raise RuntimeError('T1: problem')

def T2(ctx):
    print('T2: ran ok')

def main():
    wg = sync.WorkGroup(context.background())
    wg.go(T1, [1,2,3])
    wg.go(T2)

    try:
        wg.wait()
    except Exception as e:
        print('Tmain: caught exception: %r\n' %e)
        # reraising to see full traceback
        raise

if __name__ == '__main__':
    main()

gives the following when run:

T1: run ... ([1, 2, 3],)
T2: ran ok
Tmain: caught exception: RuntimeError('T1: problem',)

Traceback (most recent call last):
  File "./x.py", line 28, in <module>
    main()
  File "./x.py", line 21, in main
    wg.wait()
  File "golang/_sync.pyx", line 198, in golang._sync.PyWorkGroup.wait
    pyerr_reraise(pyerr)
  File "golang/_sync.pyx", line 178, in golang._sync.PyWorkGroup.go.pyrunf
    f(pywg._pyctx, *argv, **kw)
  File "./x.py", line 10, in T1
    raise RuntimeError('T1: problem')
RuntimeError: T1: problem

The original code from the question would be just:

    wg = sync.WorkGroup(context.background())

    def _(ctx):
        shul.copytree(sourceFolder, destFolder)
    wg.go(_)

    # waits for spawned worker to complete and, on error, reraises
    # its exception on the main thread.
    wg.wait()

Python线程中的join()有什么用?

问题:Python线程中的join()有什么用?

我正在研究python线程并遇到了join()

作者告诉我们,如果线程处于守护程序模式,那么我需要使用它,join()以便线程可以在主线程终止之前完成自身。

但我一直在使用也没见他t.join()即使tdaemon

示例代码是这个

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

我不知道这是什么用途,t.join()因为它不是守护程序,即使删除它也看不到任何变化

I was studying the python threading and came across join().

The author told that if thread is in daemon mode then i need to use join() so that thread can finish itself before main thread terminates.

but I have also seen him using t.join() even though t was not daemon

example code is this

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

i don’t know what is use of t.join() as it is not daemon and i can see no change even if i remove it


回答 0

展示这种机制的技术有些笨拙:join()大概是由主线程调用的。也可以由另一个线程调用它,但是会不必要地使该图复杂化。

join-调用应该放在主线程的轨道上,但是为了表达线程关系并使其尽可能简单,我选择将其放在子线程中。

without join:
+---+---+------------------                     main-thread
    |   |
    |   +...........                            child-thread(short)
    +..................................         child-thread(long)

with join
+---+---+------------------***********+###      main-thread
    |   |                             |
    |   +...........join()            |         child-thread(short)
    +......................join()......         child-thread(long)

with join and daemon thread
+-+--+---+------------------***********+###     parent-thread
  |  |   |                             |
  |  |   +...........join()            |        child-thread(short)
  |  +......................join()......        child-thread(long)
  +,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,     child-thread(long + daemonized)

'-' main-thread/parent-thread/main-program execution
'.' child-thread execution
'#' optional parent-thread execution after join()-blocked parent-thread could 
    continue
'*' main-thread 'sleeping' in join-method, waiting for child-thread to finish
',' daemonized thread - 'ignores' lifetime of other threads;
    terminates when main-programs exits; is normally meant for 
    join-independent tasks

因此,您看不到任何更改的原因是因为您的主线程在之后没有执行任何操作join。您可以说join(仅)与主线程的执行流程相关。

例如,如果要同时下载一堆页面以将它们串联成一个大页面,则可以使用线程开始并发下载,但是需要等到最后一页/线程完成后才能开始组装单个页面在很多。那是您使用的时间join()

A somewhat clumsy ascii-art to demonstrate the mechanism: The join() is presumably called by the main-thread. It could also be called by another thread, but would needlessly complicate the diagram.

join-calling should be placed in the track of the main-thread, but to express thread-relation and keep it as simple as possible, I choose to place it in the child-thread instead.

without join:
+---+---+------------------                     main-thread
    |   |
    |   +...........                            child-thread(short)
    +..................................         child-thread(long)

with join
+---+---+------------------***********+###      main-thread
    |   |                             |
    |   +...........join()            |         child-thread(short)
    +......................join()......         child-thread(long)

with join and daemon thread
+-+--+---+------------------***********+###     parent-thread
  |  |   |                             |
  |  |   +...........join()            |        child-thread(short)
  |  +......................join()......        child-thread(long)
  +,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,     child-thread(long + daemonized)

'-' main-thread/parent-thread/main-program execution
'.' child-thread execution
'#' optional parent-thread execution after join()-blocked parent-thread could 
    continue
'*' main-thread 'sleeping' in join-method, waiting for child-thread to finish
',' daemonized thread - 'ignores' lifetime of other threads;
    terminates when main-programs exits; is normally meant for 
    join-independent tasks

So the reason you don’t see any changes is because your main-thread does nothing after your join. You could say join is (only) relevant for the execution-flow of the main-thread.

If, for example, you want to concurrently download a bunch of pages to concatenate them into a single large page, you may start concurrent downloads using threads, but need to wait until the last page/thread is finished before you start assembling a single page out of many. That’s when you use join().


回答 1

直接来自文档

join([timeout])等待线程终止。这将阻塞调用线程,直到被调用join()方法的线程终止(正常或通过未处理的异常终止),或者直到发生可选的超时。

这意味着,主线程其产卵td,等待t完成,直到它完成。

根据您的程序采用的逻辑,您可能要等到线程完成后再继续执行主线程。

另外从文档:

线程可以标记为“守护程序线程”。该标志的重要性在于,仅保留守护程序线程时,整个Python程序都会退出。

一个简单的例子,说我们有这个:

def non_daemon():
    time.sleep(5)
    print 'Test non-daemon'

t = threading.Thread(name='non-daemon', target=non_daemon)

t.start()

结束于:

print 'Test one'
t.join()
print 'Test two'

这将输出:

Test one
Test non-daemon
Test two

在此,主线程显式等待t线程完成,直到print第二次调用为止。

或者,如果我们有这个:

print 'Test one'
print 'Test two'
t.join()

我们将得到以下输出:

Test one
Test two
Test non-daemon

在这里,我们在主线程中完成工作,然后等待t线程完成。在这种情况下,我们甚至可以删除显式连接t.join(),并且程序将隐式等待t完成。

Straight from the docs

join([timeout]) Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.

This means that the main thread which spawns t and d, waits for t to finish until it finishes.

Depending on the logic your program employs, you may want to wait until a thread finishes before your main thread continues.

Also from the docs:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left.

A simple example, say we have this:

def non_daemon():
    time.sleep(5)
    print 'Test non-daemon'

t = threading.Thread(name='non-daemon', target=non_daemon)

t.start()

Which finishes with:

print 'Test one'
t.join()
print 'Test two'

This will output:

Test one
Test non-daemon
Test two

Here the master thread explicitly waits for the t thread to finish until it calls print the second time.

Alternatively if we had this:

print 'Test one'
print 'Test two'
t.join()

We’ll get this output:

Test one
Test two
Test non-daemon

Here we do our job in the main thread and then we wait for the t thread to finish. In this case we might even remove the explicit joining t.join() and the program will implicitly wait for t to finish.


回答 2

感谢您提供此主题-它对我也有很大帮助。

我今天学到了有关.join()的知识。

这些线程并行运行:

d.start()
t.start()
d.join()
t.join()

这些依次运行(不是我想要的):

d.start()
d.join()
t.start()
t.join()

特别是,我试图变得聪明和整洁:

class Kiki(threading.Thread):
    def __init__(self, time):
        super(Kiki, self).__init__()
        self.time = time
        self.start()
        self.join()

这可行!但是它顺序运行。我可以将self.start()放在__ init __中,但是不能将self.join()放入。必须每个线程启动之后执行此操作。

join()是导致主线程等待线程完成的原因。否则,您的线程将独自运行。

因此,可以将join()视为主线程上的一个“保留”方法-它可以对线程进行解线程,并在主线程可以继续之前在主线程中按顺序执行。它可以确保您的线程在主线程向前移动之前是完整的。请注意,这意味着在调用join()之前线程已经完成是可以的-在调用join()时立即立即释放主线程。

实际上,我刚才想到的是,主线程在d.join()上等待,直到线程d完成,然后才移至t.join()。

实际上,非常清楚,请考虑以下代码:

import threading
import time

class Kiki(threading.Thread):
    def __init__(self, time):
        super(Kiki, self).__init__()
        self.time = time
        self.start()

    def run(self):
        print self.time, " seconds start!"
        for i in range(0,self.time):
            time.sleep(1)
            print "1 sec of ", self.time
        print self.time, " seconds finished!"


t1 = Kiki(3)
t2 = Kiki(2)
t3 = Kiki(1)
t1.join()
print "t1.join() finished"
t2.join()
print "t2.join() finished"
t3.join()
print "t3.join() finished"

它产生此输出(请注意print语句如何彼此穿线。)

$ python test_thread.py
32   seconds start! seconds start!1

 seconds start!
1 sec of  1
 1 sec of 1  seconds finished!
 21 sec of
3
1 sec of  3
1 sec of  2
2  seconds finished!
1 sec of  3
3  seconds finished!
t1.join() finished
t2.join() finished
t3.join() finished
$ 

t1.join()正在阻止主线程。所有三个线程在t1.join()完成之前完成,并且主线程继续执行打印,然后执行t2.join()然后执行打印,然后执行t3.join()然后进行打印。

欢迎更正。我也是线程新手。

(注意:如果您有兴趣,我正在为DrinkBot编写代码,并且我需要线程来同时运行配料泵,而不是顺序运行-等待每杯饮料的时间更少。)

Thanks for this thread — it helped me a lot too.

I learned something about .join() today.

These threads run in parallel:

d.start()
t.start()
d.join()
t.join()

and these run sequentially (not what I wanted):

d.start()
d.join()
t.start()
t.join()

In particular, I was trying to clever and tidy:

class Kiki(threading.Thread):
    def __init__(self, time):
        super(Kiki, self).__init__()
        self.time = time
        self.start()
        self.join()

This works! But it runs sequentially. I can put the self.start() in __ init __, but not the self.join(). That has to be done after every thread has been started.

join() is what causes the main thread to wait for your thread to finish. Otherwise, your thread runs all by itself.

So one way to think of join() as a “hold” on the main thread — it sort of de-threads your thread and executes sequentially in the main thread, before the main thread can continue. It assures that your thread is complete before the main thread moves forward. Note that this means it’s ok if your thread is already finished before you call the join() — the main thread is simply released immediately when join() is called.

In fact, it just now occurs to me that the main thread waits at d.join() until thread d finishes before it moves on to t.join().

In fact, to be very clear, consider this code:

import threading
import time

class Kiki(threading.Thread):
    def __init__(self, time):
        super(Kiki, self).__init__()
        self.time = time
        self.start()

    def run(self):
        print self.time, " seconds start!"
        for i in range(0,self.time):
            time.sleep(1)
            print "1 sec of ", self.time
        print self.time, " seconds finished!"


t1 = Kiki(3)
t2 = Kiki(2)
t3 = Kiki(1)
t1.join()
print "t1.join() finished"
t2.join()
print "t2.join() finished"
t3.join()
print "t3.join() finished"

It produces this output (note how the print statements are threaded into each other.)

$ python test_thread.py
32   seconds start! seconds start!1

 seconds start!
1 sec of  1
 1 sec of 1  seconds finished!
 21 sec of
3
1 sec of  3
1 sec of  2
2  seconds finished!
1 sec of  3
3  seconds finished!
t1.join() finished
t2.join() finished
t3.join() finished
$ 

The t1.join() is holding up the main thread. All three threads complete before the t1.join() finishes and the main thread moves on to execute the print then t2.join() then print then t3.join() then print.

Corrections welcome. I’m also new to threading.

(Note: in case you’re interested, I’m writing code for a DrinkBot, and I need threading to run the ingredient pumps concurrently rather than sequentially — less time to wait for each drink.)


回答 3

方法join()

阻塞调用线程,直到调用join()方法的线程终止。

来源:http : //docs.python.org/2/library/threading.html

The method join()

blocks the calling thread until the thread whose join() method is called is terminated.

Source : http://docs.python.org/2/library/threading.html


回答 4

简单了解

与join-解释器将等待,直到您的过程完成终止

>>> from threading import Thread
>>> import time
>>> def sam():
...   print 'started'
...   time.sleep(10)
...   print 'waiting for 10sec'
... 
>>> t = Thread(target=sam)
>>> t.start()
started

>>> t.join() # with join interpreter will wait until your process get completed or terminated
done?   # this line printed after thread execution stopped i.e after 10sec
waiting for 10sec
>>> done?

没有连接解释器不会等到进程被终止

>>> t = Thread(target=sam)
>>> t.start()
started
>>> print 'yes done' #without join interpreter wont wait until process get terminated
yes done
>>> waiting for 10sec

With join – interpreter will wait until your process get completed or terminated

>>> from threading import Thread
>>> import time
>>> def sam():
...   print 'started'
...   time.sleep(10)
...   print 'waiting for 10sec'
... 
>>> t = Thread(target=sam)
>>> t.start()
started

>>> t.join() # with join interpreter will wait until your process get completed or terminated
done?   # this line printed after thread execution stopped i.e after 10sec
waiting for 10sec
>>> done?

without join – interpreter wont wait until process get terminated,

>>> t = Thread(target=sam)
>>> t.start()
started
>>> print 'yes done' #without join interpreter wont wait until process get terminated
yes done
>>> waiting for 10sec

回答 5

join(t)同时为非守护程序线程和守护程序线程创建函数时,主线程(或主进程)应等待t几秒钟,然后才能进一步在其自己的进程上工作。在t几秒钟的等待时间内,两个子线程都应该做他们可以做的事情,例如打印一些文本。在t几秒钟之后,如果非守护程序线程仍然没有完成其工作,并且它仍可以在主进程完成其工作之后完成它,但是对于守护程序线程,它只是错过了机会窗口。但是,它将在python程序退出后最终消失。如果有问题,请纠正我。

When making join(t) function for both non-daemon thread and daemon thread, the main thread (or main process) should wait t seconds, then can go further to work on its own process. During the t seconds waiting time, both of the children threads should do what they can do, such as printing out some text. After the t seconds, if non-daemon thread still didn’t finish its job, and it still can finish it after the main process finishes its job, but for daemon thread, it just missed its opportunity window. However, it will eventually die after the python program exits. Please correct me if there is something wrong.


回答 6

在python 3.x中,join()用于将线程与主线程连接,即,当join()用于特定线程时,主线程将停止执行,直到连接线程的执行完成为止。

#1 - Without Join():
import threading
import time
def loiter():
    print('You are loitering!')
    time.sleep(5)
    print('You are not loitering anymore!')

t1 = threading.Thread(target = loiter)
t1.start()
print('Hey, I do not want to loiter!')
'''
Output without join()--> 
You are loitering!
Hey, I do not want to loiter!
You are not loitering anymore! #After 5 seconds --> This statement will be printed

'''
#2 - With Join():
import threading
import time
def loiter():
    print('You are loitering!')
    time.sleep(5)
    print('You are not loitering anymore!')

t1 = threading.Thread(target = loiter)
t1.start()
t1.join()
print('Hey, I do not want to loiter!')

'''
Output with join() -->
You are loitering!
You are not loitering anymore! #After 5 seconds --> This statement will be printed
Hey, I do not want to loiter! 

'''

In python 3.x join() is used to join a thread with the main thread i.e. when join() is used for a particular thread the main thread will stop executing until the execution of joined thread is complete.

#1 - Without Join():
import threading
import time
def loiter():
    print('You are loitering!')
    time.sleep(5)
    print('You are not loitering anymore!')

t1 = threading.Thread(target = loiter)
t1.start()
print('Hey, I do not want to loiter!')
'''
Output without join()--> 
You are loitering!
Hey, I do not want to loiter!
You are not loitering anymore! #After 5 seconds --> This statement will be printed

'''
#2 - With Join():
import threading
import time
def loiter():
    print('You are loitering!')
    time.sleep(5)
    print('You are not loitering anymore!')

t1 = threading.Thread(target = loiter)
t1.start()
t1.join()
print('Hey, I do not want to loiter!')

'''
Output with join() -->
You are loitering!
You are not loitering anymore! #After 5 seconds --> This statement will be printed
Hey, I do not want to loiter! 

'''

回答 7

此示例演示了该.join()操作:

import threading
import time

def threaded_worker():
    for r in range(10):
        print('Other: ', r)
        time.sleep(2)

thread_ = threading.Timer(1, threaded_worker)
thread_.daemon = True  # If the main thread kills, this thread will be killed too. 
thread_.start()

flag = True

for i in range(10):
    print('Main: ', i)
    time.sleep(2)
    if flag and i > 4:
        print(
            '''
            Threaded_worker() joined to the main thread. 
            Now we have a sequential behavior instead of concurrency.
            ''')
        thread_.join()
        flag = False

出:

Main:  0
Other:  0
Main:  1
Other:  1
Main:  2
Other:  2
Main:  3
Other:  3
Main:  4
Other:  4
Main:  5
Other:  5

            Threaded_worker() joined to the main thread. 
            Now we have a sequential behavior instead of concurrency.

Other:  6
Other:  7
Other:  8
Other:  9
Main:  6
Main:  7
Main:  8
Main:  9

This example demonstrate the .join() action:

import threading
import time

def threaded_worker():
    for r in range(10):
        print('Other: ', r)
        time.sleep(2)

thread_ = threading.Timer(1, threaded_worker)
thread_.daemon = True  # If the main thread kills, this thread will be killed too. 
thread_.start()

flag = True

for i in range(10):
    print('Main: ', i)
    time.sleep(2)
    if flag and i > 4:
        print(
            '''
            Threaded_worker() joined to the main thread. 
            Now we have a sequential behavior instead of concurrency.
            ''')
        thread_.join()
        flag = False

Out:

Main:  0
Other:  0
Main:  1
Other:  1
Main:  2
Other:  2
Main:  3
Other:  3
Main:  4
Other:  4
Main:  5
Other:  5

            Threaded_worker() joined to the main thread. 
            Now we have a sequential behavior instead of concurrency.

Other:  6
Other:  7
Other:  8
Other:  9
Main:  6
Main:  7
Main:  8
Main:  9

回答 8

主线程(或任何其他线程)加入其他线程的原因有几个

  1. 线程可能已经创建或持有(锁定)了一些资源。加入调用线程可能能够代表它清除资源

  2. join()是自然的阻塞调用,用于在被调用线程终止后继续继续调用join调用线程。

如果python程序未加入其他线程,则python解释器仍将代表其加入非守护程序线程。

There are a few reasons for the main thread (or any other thread) to join other threads

  1. A thread may have created or holding (locking) some resources. The join-calling thread may be able to clear the resources on its behalf

  2. join() is a natural blocking call for the join-calling thread to continue after the called thread has terminated.

If a python program does not join other threads, the python interpreter will still join non-daemon threads on its behalf.


回答 9

“使用join()有什么用?” 你说。确实,答案与“关闭文件有什么用,因为当程序退出时python和OS会为我关闭我的文件吗?”。

这仅仅是一个良好的编程问题。您应该在代码中不应运行该线程的位置加入join()线程,这是因为您肯定要确保该线程未在运行以干扰您自己的代码,或者您想在一个线程中正确运行更大的系统。

您可能因为join()可能需要额外的时间而说“我不希望我的代码延迟给出答案”。在某些情况下,这可能是完全正确的,但是现在您需要考虑到您的代码“为Python和OS清理留下了麻烦”。如果出于性能原因执行此操作,强烈建议您记录该行为。如果您要构建其他人希望使用的库/包,则尤其如此。

除了性能原因外,没有任何理由不加入join(),我认为您的代码不需要表现得那么好。

“What’s the use of using join()?” you say. Really, it’s the same answer as “what’s the use of closing files, since python and the OS will close my file for me when my program exits?”.

It’s simply a matter of good programming. You should join() your threads at the point in the code that the thread should not be running anymore, either because you positively have to ensure the thread is not running to interfere with your own code, or that you want to behave correctly in a larger system.

You might say “I don’t want my code to delay giving an answer” just because of the additional time that the join() might require. This may be perfectly valid in some scenarios, but you now need to take into account that your code is “leaving cruft around for python and the OS to clean up”. If you do this for performance reasons, I strongly encourage you to document that behavior. This is especially true if you’re building a library/package that others are expected to utilize.

There’s no reason to not join(), other than performance reasons, and I would argue that your code does not need to perform that well.


在python中创建线程

问题:在python中创建线程

我有一个脚本,我希望一个函数与另一个函数同时运行。

我看过的示例代码:

import threading

def MyThread (threading.thread):
    # doing something........

def MyThread2 (threading.thread):
    # doing something........

MyThread().start()
MyThread2().start()

我在进行这项工作时遇到了麻烦。我更愿意使用线程函数而不是类来实现这一点。

这是工作脚本:

from threading import Thread

class myClass():

    def help(self):
        os.system('./ssh.py')

    def nope(self):
        a = [1,2,3,4,5,6,67,78]
        for i in a:
            print i
            sleep(1)


if __name__ == "__main__":
    Yep = myClass()
    thread = Thread(target = Yep.help)
    thread2 = Thread(target = Yep.nope)
    thread.start()
    thread2.start()
    thread.join()
    print 'Finished'

I have a script and I want one function to run at the same time as the other.

The example code I have looked at:

import threading

def MyThread (threading.thread):
    # doing something........

def MyThread2 (threading.thread):
    # doing something........

MyThread().start()
MyThread2().start()

I am having trouble getting this working. I would prefer to get this going using a threaded function rather than a class.

This is the working script:

from threading import Thread

class myClass():

    def help(self):
        os.system('./ssh.py')

    def nope(self):
        a = [1,2,3,4,5,6,67,78]
        for i in a:
            print i
            sleep(1)


if __name__ == "__main__":
    Yep = myClass()
    thread = Thread(target = Yep.help)
    thread2 = Thread(target = Yep.nope)
    thread.start()
    thread2.start()
    thread.join()
    print 'Finished'

回答 0

您无需使用的子类Thread即可完成这项工作-请看一下我在下面发布的简单示例,了解如何:

from threading import Thread
from time import sleep

def threaded_function(arg):
    for i in range(arg):
        print("running")
        sleep(1)


if __name__ == "__main__":
    thread = Thread(target = threaded_function, args = (10, ))
    thread.start()
    thread.join()
    print("thread finished...exiting")

在这里,我展示了如何使用线程模块创建一个线程,该线程调用普通函数作为其目标。您可以看到如何在线程构造函数中将所需的任何参数传递给它。

You don’t need to use a subclass of Thread to make this work – take a look at the simple example I’m posting below to see how:

from threading import Thread
from time import sleep

def threaded_function(arg):
    for i in range(arg):
        print("running")
        sleep(1)


if __name__ == "__main__":
    thread = Thread(target = threaded_function, args = (10, ))
    thread.start()
    thread.join()
    print("thread finished...exiting")

Here I show how to use the threading module to create a thread which invokes a normal function as its target. You can see how I can pass whatever arguments I need to it in the thread constructor.


回答 1

您的代码存在一些问题:

def MyThread ( threading.thread ):
  • 您不能使用函数子类化;只有一个Class
  • 如果要使用子类,则需要threading.Thread,而不是threading.thread

如果您真的只想使用函数来执行此操作,则有两个选择:

使用线程:

import threading
def MyThread1():
    pass
def MyThread2():
    pass

t1 = threading.Thread(target=MyThread1, args=[])
t2 = threading.Thread(target=MyThread2, args=[])
t1.start()
t2.start()

带螺纹:

import thread
def MyThread1():
    pass
def MyThread2():
    pass

thread.start_new_thread(MyThread1, ())
thread.start_new_thread(MyThread2, ())

Thread.start_new_thread的文档

There are a few problems with your code:

def MyThread ( threading.thread ):
  • You can’t subclass with a function; only with a class
  • If you were going to use a subclass you’d want threading.Thread, not threading.thread

If you really want to do this with only functions, you have two options:

With threading:

import threading
def MyThread1():
    pass
def MyThread2():
    pass

t1 = threading.Thread(target=MyThread1, args=[])
t2 = threading.Thread(target=MyThread2, args=[])
t1.start()
t2.start()

With thread:

import thread
def MyThread1():
    pass
def MyThread2():
    pass

thread.start_new_thread(MyThread1, ())
thread.start_new_thread(MyThread2, ())

Doc for thread.start_new_thread


回答 2

我试图添加另一个join(),似乎可行。这是代码

from threading import Thread
from time import sleep

def function01(arg,name):
    for i in range(arg):
        print(name,'i---->',i,'\n')
        print (name,"arg---->",arg,'\n')
        sleep(1)

def test01():
    thread1 = Thread(target = function01, args = (10,'thread1', ))
    thread1.start()
    thread2 = Thread(target = function01, args = (10,'thread2', ))
    thread2.start()
    thread1.join()
    thread2.join()
    print ("thread finished...exiting")

test01()

I tried to add another join(), and it seems worked. Here is code

from threading import Thread
from time import sleep

def function01(arg,name):
    for i in range(arg):
        print(name,'i---->',i,'\n')
        print (name,"arg---->",arg,'\n')
        sleep(1)

def test01():
    thread1 = Thread(target = function01, args = (10,'thread1', ))
    thread1.start()
    thread2 = Thread(target = function01, args = (10,'thread2', ))
    thread2.start()
    thread1.join()
    thread2.join()
    print ("thread finished...exiting")

test01()

回答 3

您可以targetThread构造函数中使用参数直接传递被调用而不是的函数run

You can use the target argument in the Thread constructor to directly pass in a function that gets called instead of run.


回答 4

您是否覆盖了run()方法?如果您改写了__init__,您确定要呼叫基地threading.Thread.__init__()吗?

启动两个线程后,主线程是否继续在子线程上无限期地工作/阻塞/联接,以使主线程在子线程完成其任务之前不会结束执行?

最后,您是否遇到任何未处理的异常?

Did you override the run() method? If you overrided __init__, did you make sure to call the base threading.Thread.__init__()?

After starting the two threads, does the main thread continue to do work indefinitely/block/join on the child threads so that main thread execution does not end before the child threads complete their tasks?

And finally, are you getting any unhandled exceptions?


回答 5

Python 3具有启动并行任务的功能。这使我们的工作更加轻松。

它具有线程池进程池

以下提供了一个见解:

ThreadPoolExecutor示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

另一个例子

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Python 3 has the facility of Launching parallel tasks. This makes our work easier.

It has for thread pooling and Process pooling.

The following gives an insight:

ThreadPoolExecutor Example

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

Another Example

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

为什么Java虚拟机中没有GIL?为什么Python需要这么糟糕?

问题:为什么Java虚拟机中没有GIL?为什么Python需要这么糟糕?

我希望有人能够提供一些有关Java虚拟机的根本差异的见解,从而使Java虚拟机能够很好地实现线程而无需使用全局解释器锁(GIL),而Python则需要这样做。

I’m hoping someone can provide some insight as to what’s fundamentally different about the Java Virtual Machine that allows it to implement threads nicely without the need for a Global Interpreter Lock (GIL), while Python necessitates such an evil.


回答 0

Python(该语言)不需要GIL(这就是为什么它可以在JVM [Jython]和.NET [IronPython]上完美实现的原因,并且这些实现可以自由地使用多线程)。CPython(流行的实现)一直使用GIL来简化编码(尤其是垃圾收集机制的编码)和非线程安全的C编码库的集成(过去有很多这样的库); -)。

空载燕子项目,其它的宏伟目标中,做计划一个GIL -免费的虚拟机为Python -引用该网站,“此外,我们打算移除GIL和修复在Python多线程的状态,我们认为,这是可以通过实施更复杂的GC系统来实现,例如IBM的Recycler(Bacon等,2001)。”

Python (the language) doesn’t need a GIL (which is why it can perfectly be implemented on JVM [Jython] and .NET [IronPython], and those implementations multithread freely). CPython (the popular implementation) has always used a GIL for ease of coding (esp. the coding of the garbage collection mechanisms) and of integration of non-thread-safe C-coded libraries (there used to be a ton of those around;-).

The Unladen Swallow project, among other ambitious goals, does plan a GIL-free virtual machine for Python — to quote that site, “In addition, we intend to remove the GIL and fix the state of multithreading in Python. We believe this is possible through the implementation of a more sophisticated GC system, something like IBM’s Recycler (Bacon et al, 2001).”


回答 1

JVM(至少是热点)的确与“ GIL”具有类似的概念,它的锁定粒度要好得多,其中大部分来自更先进的GC热点。

在CPython中,这是一个很大的锁(可能并非如此,但对于参数而言已经足够好了),在JVM中,它的使用范围更广,涉及不同的概念。

例如,查看热点代码中的vm / runtime / safepoint.hpp,这实际上是一个障碍。一旦到达安全点,整个VM就Java代码而言都已停止,就像python VM在GIL处停止一样。

在Java世界中,此类VM暂停事件被称为“世界停止”,在这些时候,只有绑定到某些条件的本机代码可以自由运行,其余的VM已停止。

另外,由于Java中缺少粗略的锁,因此JNI的编写变得更加困难,因为JVM对其FFI调用的环境的保证较少,这是cpython相当容易的事情之一(尽管不像使用ctypes那样容易)。

The JVM (at least hotspot) does have a similar concept to the “GIL”, it’s just much finer in its lock granularity, most of this comes from the GC’s in hotspot which are more advanced.

In CPython it’s one big lock (probably not that true, but good enough for arguments sake), in the JVM it’s more spread about with different concepts depending on where it is used.

Take a look at, for example, vm/runtime/safepoint.hpp in the hotspot code, which is effectively a barrier. Once at a safepoint the entire VM has stopped with regard to java code, much like the python VM stops at the GIL.

In the Java world such VM pausing events are known as “stop-the-world”, at these points only native code that is bound to certain criteria is free running, the rest of the VM has been stopped.

Also the lack of a coarse lock in java makes JNI much more difficult to write, as the JVM makes less guarantees about its environment for FFI calls, one of the things that cpython makes fairly easy (although not as easy as using ctypes).


回答 2

以下是此博客文章http://www.grouplens.org/node/244中的注释,它暗示了为什么为IronPython或Jython取消GIL这么容易的原因,这是CPython使用引用计数,而另外2个VM具有垃圾回收器。

为什么我不知道这是为什么的确切机制,但这听起来似乎是合理的原因。

There is a comment down below in this blog post http://www.grouplens.org/node/244 that hints at the reason why it was so easy dispense with a GIL for IronPython or Jython, it is that CPython uses reference counting whereas the other 2 VMs have garbage collectors.

The exact mechanics of why this is so I don’t get, but it does sounds like a plausible reason.


回答 3

在此链接中,它们具有以下解释:

…“解释器的部分不是线程安全的,尽管主要是因为通过大量使用锁使它们全部成为线程安全,这会极大地减慢单线程()。这似乎与使用引用计数(JVM)的CPython垃圾收集器有关。而CLR不需要,因此不需要每次都锁定/释放引用计数。但是,即使有人想到了可接受的解决方案并实施了该解决方案,第三方库仍然会遇到同样的问题。”

In this link they have the following explanation:

… “Parts of the Interpreter aren’t threadsafe, though mostly because making them all threadsafe by massive lock usage would slow single-threaded extremely (source). This seems to be related to the CPython garbage collector using reference counting (the JVM and CLR don’t, and therefore don’t need to lock/release a reference count every time). But even if someone thought of an acceptable solution and implemented it, third party libraries would still have the same problems.”


回答 4

Python缺少jit / aot,并且它在多线程处理器上编写的时间框架不存在。另外,您可以重新编译缺少GIL的Julia lang中的所有内容,并提高Python代码的速度。而且Jython有点烂,它比Cpython和Java慢。如果您想使用Python并考虑使用并行插件,则不会立即提高速度,但是可以使用合适的插件进行并行编程。

Python lacks jit/aot and the time frame it was written at multithreaded processors didn’t exist. Alternatively you could recompile everything in Julia lang which lacks GIL and gain some speed boost on your Python code. Also Jython kind of sucks it’s slower than Cpython and Java. If you want to stick to Python consider using parallel plugins, you won’t gain an instant speed boost but you can do parallel programming with the right plugin.


守护程序线程说明

问题:守护程序线程说明

Python文档中 它说:

线程可以标记为“守护程序线程”。该标志的重要性在于,仅保留守护程序线程时,整个Python程序都会退出。初始值是从创建线程继承的。

没有人对这意味着什么有更清楚的解释,或者有实际的示例显示了将线程设置为的位置daemonic

为我澄清一下:因此,您唯一不希望将线程设置为的情况daemonic是,您希望它们在主线程退出后继续运行吗?

In the Python documentation it says:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread.

Does anyone have a clearer explanation of what that means or a practical example showing where you would set threads as daemonic?

Clarify it for me: so the only situation you wouldn’t set threads as daemonic, is when you want them to continue running after the main thread exits?


回答 0

一些线程执行后台任务,例如发送keepalive数据包,执行定期垃圾回收等。这些仅在主程序正在运行时才有用,并且可以在其他非守护程序线程退出后将其杀死。

如果没有守护程序线程,则必须跟踪它们,并告诉它们退出,然后程序才能完全退出。通过将它们设置为守护程序线程,可以让它们运行并忘记它们,并且在程序退出时,所有守护程序线程都会自动终止。

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it’s okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you’d have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.


回答 1

假设您正在制作某种仪表板小部件。作为其一部分,您希望它在您的电子邮件框中显示未读邮件数。因此,您将创建一个小线程,该线程将:

  1. 连接到邮件服务器,并询问您有多少未读邮件。
  2. 用更新的计数向GUI发出信号。
  3. 睡一会儿。

当您的小部件启动时,它将创建此线程,将其指定为守护程序,然后启动它。因为它是一个守护程序,所以您不必考虑它。当您的小部件退出时,线程将自动停止。

Let’s say you’re making some kind of dashboard widget. As part of this, you want it to display the unread message count in your email box. So you make a little thread that will:

  1. Connect to the mail server and ask how many unread messages you have.
  2. Signal the GUI with the updated count.
  3. Sleep for a little while.

When your widget starts up, it would create this thread, designate it a daemon, and start it. Because it’s a daemon, you don’t have to think about it; when your widget exits, the thread will stop automatically.


回答 2

其他张贴者提供了一些有关使用守护程序线程的情况的示例。但是,我的建议是永远不要使用它们。

不是因为它们没有用,而是因为如果使用它们,您会遇到一些不良的副作用。在Python运行时开始拆除主线程中的内容之后,守护程序线程仍然可以执行,从而导致一些非常奇怪的异常。

更多信息在这里:

https://joeshaw.org/python-daemon-threads-considered-harmful/

https://mail.python.org/pipermail/python-list/2005-February/343697.html

严格来说,您永远不需要它们,在某些情况下,它只是使实施更容易。

Other posters gave some examples for situations in which you’d use daemon threads. My recommendation, however, is never to use them.

It’s not because they’re not useful, but because there are some bad side effects you can experience if you use them. Daemon threads can still execute after the Python runtime starts tearing down things in the main thread, causing some pretty bizarre exceptions.

More info here:

https://joeshaw.org/python-daemon-threads-considered-harmful/

https://mail.python.org/pipermail/python-list/2005-February/343697.html

Strictly speaking you never need them, it just makes implementation easier in some cases.


回答 3

一种更简单的思考方式可能是:当main返回时,如果仍有非守护进程线程仍在运行,则您的进程将不会退出。

一点建议:涉及线程和同步时,干净关闭很容易出错-如果可以避免,请这样做。尽可能使用守护程序线程。

A simpler way to think about it, perhaps: when main returns, your process will not exit if there are non-daemon threads still running.

A bit of advice: Clean shutdown is easy to get wrong when threads and synchronization are involved – if you can avoid it, do so. Use daemon threads whenever possible.


回答 4

克里斯已经解释了什么是守护程序线程,所以让我们谈谈实际用法。许多线程池实现将守护程序线程用于任务工作者。工作者是从任务队列执行任务的线程。

工作人员需要无限期地等待任务队列中的任务,因为他们不知道何时出现新任务。分配任务的线程(例如主线程)仅知道任务何时结束。主线程等待任务队列变空,然后退出。如果worker是用户线程,即非守护程序,则程序不会终止。即使工人没有做任何有用的事情,它将继续等待这些无限期地运转的工人。标记工人守护程序线程,主线程将在处理完任务后立即杀死它们。

Chris already explained what daemon threads are, so let’s talk about practical usage. Many thread pool implementations use daemon threads for task workers. Workers are threads which execute tasks from task queue.

Worker needs to keep waiting for tasks in task queue indefinitely as they don’t know when new task will appear. Thread which assigns tasks (say main thread) only knows when tasks are over. Main thread waits on task queue to get empty and then exits. If workers are user threads i.e. non-daemon, program won’t terminate. It will keep waiting for these indefinitely running workers, even though workers aren’t doing anything useful. Mark workers daemon threads, and main thread will take care of killing them as soon as it’s done handling tasks.


回答 5

引用克里斯:“ …程序退出时,所有守护程序线程都会被自动杀死。”。我认为可以总结一下。使用它们时应小心,因为它们会在主程序执行完成时突然终止。

Quoting Chris: “… when your program quits, any daemon threads are killed automatically.”. I think that sums it up. You should be careful when you use them as they abruptly terminate when main program executes to completion.


回答 6

当您的第二个线程是非守护程序线程时,您的应用程序的主主线程无法退出,因为它的退出条件也与非守护程序线程的退出绑定在一起。无法在python中强制杀死线程,因此您的应用程序将必须真正等待非守护进程线程退出。如果这种行为不是您想要的,则将您的第二个线程设置为守护程序,这样它就不会阻止您的应用程序退出。

When your second thread is non-Daemon, your application’s primary main thread cannot quit because its exit criteria is being tied to the exit also of non-Daemon thread(s). Threads cannot be forcibly killed in python, therefore your app will have to really wait for the non-Daemon thread(s) to exit. If this behavior is not what you want, then set your second thread as daemon so that it won’t hold back your application from exiting.


函数调用超时

问题:函数调用超时

我正在Python中调用一个函数,该函数可能会停滞并迫使我重新启动脚本。

如何调用该函数或将其包装在其中,以便如果花费的时间超过5秒,脚本将取消该函数并执行其他操作?

I’m calling a function in Python which I know may stall and force me to restart the script.

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?


回答 0

如果您在UNIX上运行,则可以使用信号包:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

调用后10秒钟,将调用alarm.alarm(10)处理程序。这引发了一个异常,您可以从常规Python代码中拦截该异常。

该模块不能很好地与线程配合使用(但是,谁可以呢?)

请注意,由于发生超时时会引发异常,因此它可能最终在函数内部被捕获并被忽略,例如一个这样的函数:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

You may use the signal package if you are running on UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 seconds after the call signal.alarm(10), the handler is called. This raises an exception that you can intercept from the regular Python code.

This module doesn’t play well with threads (but then, who does?)

Note that since we raise an exception when timeout happens, it may end up caught and ignored inside the function, for example of one such function:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue

回答 1

您可以multiprocessing.Process用来精确地做到这一点。

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate
        p.terminate()
        p.join()

You can use multiprocessing.Process to do exactly that.

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

回答 2

如何调用该函数或将其包装起来,以便如果花费的时间超过5秒钟,脚本将取消该函数?

我发布了要点,用装饰器和来解决此问题threading.Timer。这是一个细分。

导入和设置以实现兼容性

它已经通过Python 2和3进行了测试。它也应该在Unix / Linux和Windows下运行。

首先是进口。这些尝试使代码保持一致,而不管Python版本如何:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

使用版本无关代码:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

现在,我们已经从标准库中导入了我们的功能。

exit_after 装饰工

接下来,我们需要一个函数来终止main()子线程:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

这是装饰器本身:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

用法

这是直接回答您有关5秒后退出的问题的用法!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

演示:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

第二个函数调用将不会结束,而是该过程应退出并回溯!

KeyboardInterrupt 并不总是停止休眠线程

请注意,在Windows上的Python 2上,睡眠不会总是被键盘中断打断,例如:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

除非它明确检查PyErr_CheckSignals(),否则也不太可能中断在扩展程序中运行的代码,请参见 Cython,Python和KeyboardInterrupt被忽略

在任何情况下,我都避免将线程休眠超过一秒钟-这是处理器时间的永恒。

如何调用该函数或将其包装在其中,以便如果花费的时间超过5秒,脚本将取消该函数并执行其他操作?

要捕获它并执行其他操作,可以捕获KeyboardInterrupt。

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it?

I posted a gist that solves this question/problem with a decorator and a threading.Timer. Here it is with a breakdown.

Imports and setups for compatibility

It was tested with Python 2 and 3. It should also work under Unix/Linux and Windows.

First the imports. These attempt to keep the code consistent regardless of the Python version:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Use version independent code:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Now we have imported our functionality from the standard library.

exit_after decorator

Next we need a function to terminate the main() from the child thread:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

And here is the decorator itself:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Usage

And here’s the usage that directly answers your question about exiting after 5 seconds!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

The second function call will not finish, instead the process should exit with a traceback!

KeyboardInterrupt does not always stop a sleeping thread

Note that sleep will not always be interrupted by a keyboard interrupt, on Python 2 on Windows, e.g.:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

nor is it likely to interrupt code running in extensions unless it explicitly checks for PyErr_CheckSignals(), see Cython, Python and KeyboardInterrupt ignored

I would avoid sleeping a thread more than a second, in any case – that’s an eon in processor time.

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?

To catch it and do something else, you can catch the KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

回答 3

我有一个不同的建议,它是一个纯函数(具有与线程建议相同的API),并且似乎可以正常工作(基于此线程的建议)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

I have a different proposal which is a pure function (with the same API as the threading suggestion) and seems to work fine (based on suggestions on this thread)

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result

回答 4

在搜索单元测试的超时调用时,我遇到了这个线程。我没有在答案或第三方软件包中找到任何简单的东西,因此我在下面编写了装饰器,您可以直接进入代码:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

然后,使测试或您喜欢的任何功能超时就这么简单:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

I ran across this thread when searching for a timeout call on unit tests. I didn’t find anything simple in the answers or 3rd party packages so I wrote the decorator below you can drop right into code:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Then it’s as simple as this to timeout a test or any function you like:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

回答 5

stopit在pypi上找到软件包似乎可以很好地处理超时问题。

我喜欢@stopit.threading_timeoutable装饰器,它增加了一个timeout向装饰的函数参数,该参数完成了您所期望的操作,从而停止了该函数。

在pypi上查看:https ://pypi.python.org/pypi/stopit

The stopit package, found on pypi, seems to handle timeouts well.

I like the @stopit.threading_timeoutable decorator, which adds a timeout parameter to the decorated function, which does what you expect, it stops the function.

Check it out on pypi: https://pypi.python.org/pypi/stopit


回答 6

有很多建议,但没有一个建议使用current.futures,我认为这是处理此问题的最清晰的方法。

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

超级简单的阅读和维护。

我们创建一个池,提交一个进程,然后等待最多5秒钟,然后引发一个TimeoutError,您可以根据需要捕获并处理它。

原生于python 3.2+,并反向移植到2.7(点安装期货)。

线程和进程之间的切换很简单,只要更换ProcessPoolExecutorThreadPoolExecutor

如果您想在超时时终止进程,建议您调查Pebble

There are a lot of suggestions, but none using concurrent.futures, which I think is the most legible way to handle this.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super simple to read and maintain.

We make a pool, submit a single process and then wait up to 5 seconds before raising a TimeoutError that you could catch and handle however you needed.

Native to python 3.2+ and backported to 2.7 (pip install futures).

Switching between threads and processes is as simple as replacing ProcessPoolExecutor with ThreadPoolExecutor.

If you want to terminate the Process on timeout I would suggest looking into Pebble.


回答 7

出色,易于使用且可靠的PyPi项目超时装饰器https://pypi.org/project/timeout-decorator/

安装方式

pip install timeout-decorator

用法

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

Great, easy to use and reliable PyPi project timeout-decorator (https://pypi.org/project/timeout-decorator/)

installation:

pip install timeout-decorator

Usage:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

回答 8

我是wrapt_timeout_decorator的作者

乍一看,这里介绍的大多数解决方案在Linux上都无法正常工作-因为我们有fork()和signal()-但是在Windows上,情况看起来有些不同。当涉及到Linux上的子线程时,您将无法再使用Signals。

为了在Windows下产生一个进程,它必须是可挑选的-许多修饰函数或Class方法不是。

因此,您需要使用更好的Pickler,如莳萝和多进程(而不是Pickle和多进程)-这就是为什么您不能使用ProcessPoolExecutor(或仅在功能有限的情况下)的原因。

对于超时本身-您需要定义超时的含义-因为在Windows上将花费大量(且不确定)的时间来生成该进程。如果超时时间短,这可能会很棘手。让我们假设,生成该过程大约需要0.5秒(很容易!!!)。如果您给出0.2秒的超时时间,应该怎么办?函数是否应在0.5 + 0.2秒后超时(让方法运行0.2秒)?还是被调用的进程应该在0.2秒后超时(在这种情况下,修饰的函数将始终超时,因为在那个时间内它甚至没有生成)?

嵌套的装饰器也很讨厌,您不能在子线程中使用Signals。如果要创建真正的通用跨平台装饰器,则需要考虑所有这些因素(并进行测试)。

其他问题会将异常传递回调用者以及日志记录问题(如果在修饰的函数中使用-不支持在另一个进程中记录文件)

我试图涵盖所有的极端情况,您可能会考虑wrapt_timeout_decorator包,或者至少测试那里使用的单元测试启发的自己的解决方案。

@Alexis Eggermont-很遗憾,我没有足够的意见要发表-也许其他人可以通知您-我想我已经解决了您的导入问题。

I am the author of wrapt_timeout_decorator

Most of the solutions presented here work wunderfully under Linux on the first glance – because we have fork() and signals() – but on windows the things look a bit different. And when it comes to subthreads on Linux, You cant use Signals anymore.

In order to spawn a process under Windows, it needs to be picklable – and many decorated functions or Class methods are not.

So You need to use a better pickler like dill and multiprocess (not pickle and multiprocessing) – thats why You cant use ProcessPoolExecutor (or only with limited functionality).

For the timeout itself – You need to define what timeout means – because on Windows it will take considerable (and not determinable) time to spawn the process. This can be tricky on short timeouts. Lets assume, spawning the process takes about 0.5 seconds (easily !!!). If You give a timeout of 0.2 seconds what should happen ? Should the function time out after 0.5 + 0.2 seconds (so let the method run for 0.2 seconds)? Or should the called process time out after 0.2 seconds (in that case, the decorated function will ALWAYS timeout, because in that time it is not even spawned) ?

Also nested decorators can be nasty and You cant use Signals in a subthread. If You want to create a truly universal, cross-platform decorator, all this needs to be taken into consideration (and tested).

Other issues are passing exceptions back to the caller, as well as logging issues (if used in the decorated function – logging to files in another process is NOT supported)

I tried to cover all edge cases, You might look into the package wrapt_timeout_decorator, or at least test Your own solutions inspired by the unittests used there.

@Alexis Eggermont – unfortunately I dont have enough points to comment – maybe someone else can notify You – I think I solved Your import issue.


回答 9

timeout-decorator不能在Windows系统上正常运行,因为Windows不能signal很好地支持。

如果您在Windows系统中使用timeout-decorator,则会得到以下信息

AttributeError: module 'signal' has no attribute 'SIGALRM'

有些人建议使用use_signals=False但对我没有用。

作者@bitranox创建了以下软件包:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

代码样例:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

给出以下异常:

TimeoutError: Function mytest timed out after 5 seconds

timeout-decorator don’t work on windows system as , windows didn’t support signal well.

If you use timeout-decorator in windows system you will get the following

AttributeError: module 'signal' has no attribute 'SIGALRM'

Some suggested to use use_signals=False but didn’t worked for me.

Author @bitranox created the following package:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Code Sample:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Gives the following exception:

TimeoutError: Function mytest timed out after 5 seconds

回答 10

我们可以使用相同的信号。我认为以下示例对您有用。与线程相比,它非常简单。

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

We can use signals for the same. I think the below example will be useful for you. It is very simple compared to threads.

import signal

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

回答 11

#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)

回答 12

我需要不会被time.sleep(基于线程的方法无法做到)阻止的可嵌套定时中断(SIGALARM无法做到)。我最终从这里复制并修改了代码:http : //code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

和用法示例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

I had a need for nestable timed interrupts (which SIGALARM can’t do) that won’t get blocked by time.sleep (which the thread-based approach can’t do). I ended up copying and lightly modifying code from here: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

The code itself:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

and a usage example:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

回答 13

这是对给定的基于线程的解决方案的一点改进。

下面的代码支持异常

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

在5秒钟的超时时间内调用它:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

Here is a slight improvement to the given thread-based solution.

The code below supports exceptions:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Invoking it with a 5 second timeout:

result = timeout(remote_calculate, (myarg,), timeout_duration=5)

multiprocessing.Pool:何时使用apply,apply_async或map?

问题:multiprocessing.Pool:何时使用apply,apply_async或map?

我还没有看到关于Pool.applyPool.apply_asyncPool.map用例的清晰示例。我主要使用Pool.map; 别人的优势是什么?

I have not seen clear examples with use-cases for Pool.apply, Pool.apply_async and Pool.map. I am mainly using Pool.map; what are the advantages of others?


回答 0

在Python的早期,要使用任意参数调用函数,可以使用apply

apply(f,args,kwargs)

apply尽管在Python2.7中仍然存在,但在Python3中仍然存在,并且通常不再使用。如今,

f(*args,**kwargs)

是首选。这些multiprocessing.Pool模块尝试提供类似的接口。

Pool.apply就像Python一样apply,不同之处在于函数调用是在单独的进程中执行的。Pool.apply直到功能完成为止。

Pool.apply_async也类似于Python的内置函数apply,除了调用立即返回而不是等待结果而已。AsyncResult返回一个对象。您调用其get()方法以检索函数调用的结果。该get()方法将阻塞直到功能完成。因此,pool.apply(func, args, kwargs)等效于pool.apply_async(func, args, kwargs).get()

与相比Pool.apply,该Pool.apply_async方法还具有一个回调,如果提供该回调,则在函数完成时调用该回调。可以使用它来代替get()

例如:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args = (i, ), callback = log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

可能会产生如下结果

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]

请注意,与不同pool.map,结果的顺序可能与pool.apply_async调用的顺序不同。


因此,如果您需要在一个单独的进程中运行一个函数,但是希望当前进程在该函数返回之前一直阻塞,请使用Pool.apply。像一样Pool.applyPool.map阻塞直到返回完整的结果。

如果希望工作进程池异步执行许多功能调用,请使用Pool.apply_async。结果的顺序不能保证与调用的顺序相同Pool.apply_async

还要注意,您可以使用调用许多不同的函数Pool.apply_async(并非所有调用都需要使用同一函数)。

相反,Pool.map将相同的函数应用于许多参数。但是,与不同Pool.apply_async,结果按与参数顺序相对应的顺序返回。

Back in the old days of Python, to call a function with arbitrary arguments, you would use apply:

apply(f,args,kwargs)

apply still exists in Python2.7 though not in Python3, and is generally not used anymore. Nowadays,

f(*args,**kwargs)

is preferred. The multiprocessing.Pool modules tries to provide a similar interface.

Pool.apply is like Python apply, except that the function call is performed in a separate process. Pool.apply blocks until the function is completed.

Pool.apply_async is also like Python’s built-in apply, except that the call returns immediately instead of waiting for the result. An AsyncResult object is returned. You call its get() method to retrieve the result of the function call. The get() method blocks until the function is completed. Thus, pool.apply(func, args, kwargs) is equivalent to pool.apply_async(func, args, kwargs).get().

In contrast to Pool.apply, the Pool.apply_async method also has a callback which, if supplied, is called when the function is complete. This can be used instead of calling get().

For example:

import multiprocessing as mp
import time

def foo_pool(x):
    time.sleep(2)
    return x*x

result_list = []
def log_result(result):
    # This is called whenever foo_pool(i) returns a result.
    # result_list is modified only by the main process, not the pool workers.
    result_list.append(result)

def apply_async_with_callback():
    pool = mp.Pool()
    for i in range(10):
        pool.apply_async(foo_pool, args = (i, ), callback = log_result)
    pool.close()
    pool.join()
    print(result_list)

if __name__ == '__main__':
    apply_async_with_callback()

may yield a result such as

[1, 0, 4, 9, 25, 16, 49, 36, 81, 64]

Notice, unlike pool.map, the order of the results may not correspond to the order in which the pool.apply_async calls were made.


So, if you need to run a function in a separate process, but want the current process to block until that function returns, use Pool.apply. Like Pool.apply, Pool.map blocks until the complete result is returned.

If you want the Pool of worker processes to perform many function calls asynchronously, use Pool.apply_async. The order of the results is not guaranteed to be the same as the order of the calls to Pool.apply_async.

Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function).

In contrast, Pool.map applies the same function to many arguments. However, unlike Pool.apply_async, the results are returned in an order corresponding to the order of the arguments.


回答 1

关于applyvs map

pool.apply(f, args)f仅在池中的一个工作线程中执行。因此,池中的一个进程将运行f(args)

pool.map(f, iterable):此方法将可迭代项分为多个块,将其作为单独的任务提交给流程池。因此,您可以利用池中的所有进程。

Regarding apply vs map:

pool.apply(f, args): f is only executed in ONE of the workers of the pool. So ONE of the processes in the pool will run f(args).

pool.map(f, iterable): This method chops the iterable into a number of chunks which it submits to the process pool as separate tasks. So you take advantage of all the processes in the pool.


回答 2

以下是在一个表的格式,以显示之间的差异的概述Pool.applyPool.apply_asyncPool.mapPool.map_async。选择一个时,必须考虑多个参数,并发性,阻塞和排序:

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.map_async    | no           yes            no           yes
Pool.apply        | yes          no             yes          no
Pool.apply_async  | yes          yes            no           no
Pool.starmap      | yes          yes            yes          yes
Pool.starmap_async| yes          yes            no           no

笔记:

  • Pool.imapPool.imap_async–地图和map_async的惰性版本。

  • Pool.starmap 方法,除了接受多个参数外,与map方法非常相似。

  • Async方法一次提交所有流程,并在完成后检索结果。使用get方法获取结果。

  • Pool.map(或Pool.apply)方法与Python内置map(或套用)非常相似。它们阻塞主流程,直到所有流程完成并返回结果。

例子:

地图

一次调用一份工作清单

results = pool.map(func, [1, 2, 3])

应用

只能被要求一份工作

for x, y in [[1, 1], [2, 2]]:
    results.append(pool.apply(func, (x, y)))

def collect_result(result):
    results.append(result)

map_async

一次调用一份工作清单

pool.map_async(func, jobs, callback=collect_result)

apply_async

只能调用一个作业并在后台并行执行一个作业

for x, y in [[1, 1], [2, 2]]:
    pool.apply_async(worker, (x, y), callback=collect_result)

星图

pool.map支持多个参数的变体

pool.starmap(func, [(1, 1), (2, 1), (3, 1)])

starmap_async

starmap()和map_async()的组合,它对可迭代的可迭代对象进行迭代,并在未包装可迭代对象的情况下调用func。返回结果对象。

pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)

参考:

在此处找到完整的文档:https : //docs.python.org/3/library/multiprocessing.html

Here is an overview in a table format in order to show the differences between Pool.apply, Pool.apply_async, Pool.map and Pool.map_async. When choosing one, you have to take multi-args, concurrency, blocking, and ordering into account:

                  | Multi-args   Concurrence    Blocking     Ordered-results
---------------------------------------------------------------------
Pool.map          | no           yes            yes          yes
Pool.map_async    | no           yes            no           yes
Pool.apply        | yes          no             yes          no
Pool.apply_async  | yes          yes            no           no
Pool.starmap      | yes          yes            yes          yes
Pool.starmap_async| yes          yes            no           no

Notes:

  • Pool.imap and Pool.imap_async – lazier version of map and map_async.

  • Pool.starmap method, very much similar to map method besides it acceptance of multiple arguments.

  • Async methods submit all the processes at once and retrieve the results once they are finished. Use get method to obtain the results.

  • Pool.map(or Pool.apply)methods are very much similar to Python built-in map(or apply). They block the main process until all the processes complete and return the result.

Examples:

map

Is called for a list of jobs in one time

results = pool.map(func, [1, 2, 3])

apply

Can only be called for one job

for x, y in [[1, 1], [2, 2]]:
    results.append(pool.apply(func, (x, y)))

def collect_result(result):
    results.append(result)

map_async

Is called for a list of jobs in one time

pool.map_async(func, jobs, callback=collect_result)

apply_async

Can only be called for one job and executes a job in the background in parallel

for x, y in [[1, 1], [2, 2]]:
    pool.apply_async(worker, (x, y), callback=collect_result)

starmap

Is a variant of pool.map which support multiple arguments

pool.starmap(func, [(1, 1), (2, 1), (3, 1)])

starmap_async

A combination of starmap() and map_async() that iterates over iterable of iterables and calls func with the iterables unpacked. Returns a result object.

pool.starmap_async(calculate_worker, [(1, 1), (2, 1), (3, 1)], callback=collect_result)

Reference:

Find complete documentation here: https://docs.python.org/3/library/multiprocessing.html


time.sleep —休眠线程或进程?

问题:time.sleep —休眠线程或进程?

在Python for * nix中,是否time.sleep()阻塞线程或进程?

In Python for *nix, does time.sleep() block the thread or the process?


回答 0

它阻塞线程。如果在Python源代码中查看Modules / timemodule.c,您会看到在调用中floatsleep(),睡眠操作的实质部分包装在Py_BEGIN_ALLOW_THREADS和Py_END_ALLOW_THREADS块中,从而允许其他线程在当前线程继续执行一睡觉。您也可以使用简单的python程序进行测试:

import time
from threading import Thread

class worker(Thread):
    def run(self):
        for x in xrange(0,11):
            print x
            time.sleep(1)

class waiter(Thread):
    def run(self):
        for x in xrange(100,103):
            print x
            time.sleep(5)

def run():
    worker().start()
    waiter().start()

将打印:

>>> thread_test.run()
0
100
>>> 1
2
3
4
5
101
6
7
8
9
10
102

It blocks the thread. If you look in Modules/timemodule.c in the Python source, you’ll see that in the call to floatsleep(), the substantive part of the sleep operation is wrapped in a Py_BEGIN_ALLOW_THREADS and Py_END_ALLOW_THREADS block, allowing other threads to continue to execute while the current one sleeps. You can also test this with a simple python program:

import time
from threading import Thread

class worker(Thread):
    def run(self):
        for x in xrange(0,11):
            print x
            time.sleep(1)

class waiter(Thread):
    def run(self):
        for x in xrange(100,103):
            print x
            time.sleep(5)

def run():
    worker().start()
    waiter().start()

Which will print:

>>> thread_test.run()
0
100
>>> 1
2
3
4
5
101
6
7
8
9
10
102

回答 1

它只会休眠线程,除非您的应用程序只有一个线程,在这种情况下,它将休眠线程并有效地进程。

睡眠中的python文档未指定此内容,因此我当然可以理解混淆!

http://docs.python.org/2/library/time.html

It will just sleep the thread except in the case where your application has only a single thread, in which case it will sleep the thread and effectively the process as well.

The python documentation on sleep doesn’t specify this however, so I can certainly understand the confusion!

http://docs.python.org/2/library/time.html


回答 2

只是线程。

Just the thread.


回答 3

该线程将阻塞,但是该进程仍然有效。

在单线程应用程序中,这意味着您在睡眠时一切都被阻止了。在多线程应用程序中,只有您显式“睡眠”的线程将被阻塞,其他线程仍在进程中运行。

The thread will block, but the process is still alive.

In a single threaded application, this means everything is blocked while you sleep. In a multithreaded application, only the thread you explicitly ‘sleep’ will block and the other threads still run within the process.


回答 4

只有线程,除非您的进程具有单个线程。

Only the thread unless your process has a single thread.


回答 5

进程本身无法运行。关于执行,进程只是线程的容器。这意味着您根本无法暂停该过程。它根本不适用于过程。

Process is not runnable by itself. In regard to execution, process is just a container for threads. Meaning you can’t pause the process at all. It is simply not applicable to process.


回答 6

如果它在同一线程中执行,则它将阻塞一个线程,如果不是从主代码中执行,则它将阻塞该线程

it blocks a thread if it is executed in the same thread not if it is executed from the main code