Tag Archives: python-multithreading

How to find the thread id in Python

Question: How to find the thread id in Python

I have a multi-threading Python program, and a utility function, writeLog(message), that writes out a timestamp followed by the message. Unfortunately, the resultant log file gives no indication of which thread is generating which message.

I would like writeLog() to be able to add something to the message to identify which thread is calling it. Obviously I could just make the threads pass this information in, but that would be a lot more work. Is there some thread equivalent of os.getpid() that I could use?


Answer 0

threading.get_ident() works, or threading.current_thread().ident (or threading.currentThread().ident for Python < 2.6).
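
Applied to the question's writeLog(message) helper, a minimal sketch (Python 3; the log file name and line format here are illustrative assumptions, not from the original):

import threading
import time

def writeLog(message, log_file="app.log"):  # log_file is an assumed parameter
    # Tag each entry with a timestamp plus the calling thread's name and ident.
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    name = threading.current_thread().name
    ident = threading.get_ident()
    with open(log_file, "a") as f:
        f.write("%s [%s:%d] %s\n" % (stamp, name, ident, message))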


Answer 1

Using the logging module you can automatically add the current thread identifier in each log entry. Just use one of these LogRecord mapping keys in your logger format string:

%(thread)d : Thread ID (if available).

%(threadName)s : Thread name (if available).

and set up your default handler with it:

logging.basicConfig(format="%(threadName)s:%(message)s")
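
A runnable sketch of this approach (thread names default to Thread-1, Thread-2, ...; exact names and output order may vary by Python version):

import logging
import threading

logging.basicConfig(format="%(threadName)s:%(message)s", level=logging.INFO)

def work():
    logging.info("hello from a worker")

for _ in range(2):
    threading.Thread(target=work).start()
logging.info("hello from the main thread")

# Possible output:
# Thread-1:hello from a worker
# Thread-2:hello from a worker
# MainThread:hello from the main thread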

Answer 2

The thread.get_ident() function returns a long integer on Linux. It’s not really a thread id.

I use this method to really get the thread id on Linux:

import ctypes
libc = ctypes.cdll.LoadLibrary('libc.so.6')

# System dependent, see e.g. /usr/include/x86_64-linux-gnu/asm/unistd_64.h
SYS_gettid = 186

def getThreadId():
    """Returns OS thread id - specific to Linux."""
    return libc.syscall(SYS_gettid)
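
A side note beyond this answer: since Python 3.8 the standard library exposes the kernel-assigned thread id directly, so the syscall trick is only needed on older versions:

import threading

tid = threading.get_native_id()  # OS-level thread id, Python 3.8+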

Answer 4

I saw examples of thread IDs like this:

class myThread(threading.Thread):
    def __init__(self, threadID, name, counter):
        self.threadID = threadID
        ...

The threading module docs list the name attribute as well:

...

A thread has a name. 
The name can be passed to the constructor, 
and read or changed through the name attribute.

...

Thread.name

A string used for identification purposes only. 
It has no semantics. Multiple threads may
be given the same name. The initial name is set by the constructor.

Answer 5

You can get the ident of the currently running thread. The ident could be reused for other threads if the current thread ends.

When you create an instance of Thread, a name is given implicitly to the thread, following the pattern Thread-number.

The name has no meaning and the name doesn't have to be unique. The idents of all running threads are unique.

import threading


def worker():
    print(threading.current_thread().name)
    print(threading.get_ident())


threading.Thread(target=worker).start()
threading.Thread(target=worker, name='foo').start()

The function threading.current_thread() returns the current running thread. This object holds the whole information of the thread.


Answer 6

I created multiple threads in Python, printed the thread objects, and printed the id using the ident variable. I see all the ids are the same:

<Thread(Thread-1, stopped 140500807628544)>
<Thread(Thread-2, started 140500807628544)>
<Thread(Thread-3, started 140500807628544)>
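
The likely explanation: an ident may be recycled once its thread exits, and these threads ran one after another. Among threads that are alive at the same time, idents are unique. A small sketch contrasting the two cases:

import threading

def worker():
    print(threading.get_ident())

# Sequential: each thread exits before the next starts,
# so the same ident can be reused and the values may repeat.
for _ in range(3):
    t = threading.Thread(target=worker)
    t.start()
    t.join()

# Concurrent: all threads are alive at once, so their idents are distinct.
threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()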

Answer 7

Similarly to @brucexin, I needed to get the OS-level thread identifier (which != thread.get_ident()) and used something like the code below, so as not to depend on particular syscall numbers and not to be amd64-only:

---- 8< ---- (xos.pyx)
"""module xos complements standard module os""" 

cdef extern from "<sys/syscall.h>":                                                             
    long syscall(long number, ...)                                                              
    const int SYS_gettid                                                                        

# gettid returns current OS thread identifier.                                                  
def gettid():                                                                                   
    return syscall(SYS_gettid)                                                                  

and

---- 8< ---- (test.py)
import pyximport; pyximport.install()
import xos

...

print 'my tid: %d' % xos.gettid()

This depends on Cython, though.


Are lists thread-safe?

Question: Are lists thread-safe?

I notice that it is often suggested to use queues with multiple threads, instead of lists and .pop(). Is this because lists are not thread-safe, or for some other reason?


Answer 0

Lists themselves are thread-safe. In CPython the GIL protects against concurrent accesses to them, and other implementations take care to use a fine-grained lock or a synchronized datatype for their list implementations. However, while lists themselves can't become corrupt through attempts at concurrent access, the lists' data is not protected. For example:

L[0] += 1

is not guaranteed to actually increase L[0] by one if another thread does the same thing, because += is not an atomic operation. (Very, very few operations in Python are actually atomic, because most of them can cause arbitrary Python code to be called.) You should use Queues because if you just use an unprotected list, you may get or delete the wrong item because of race conditions.
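
A minimal sketch of that race (how reliably it shows lost updates depends on the interpreter version and timing):

import threading

L = [0]

def bump():
    for _ in range(100000):
        L[0] += 1  # read-modify-write: several bytecodes, not atomic

threads = [threading.Thread(target=bump) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(L[0])  # frequently less than 400000: some increments were lost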


Answer 1

To clarify a point in Thomas’ excellent answer, it should be mentioned that append() is thread safe.

This is because there is no concern that data being read will be in the same place once we go to write to it. The append() operation does not read data, it only writes data to the list.
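
To illustrate (CPython-specific: the GIL makes a single list.append effectively atomic):

import threading

L = []

def producer():
    for i in range(10000):
        L.append(i)

threads = [threading.Thread(target=producer) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(L))  # always 40000 in CPython: no appends are lost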


Answer 2

Here’s a comprehensive yet non-exhaustive list of examples of list operations and whether or not they are thread safe. Hoping to get an answer regarding the obj in a_list language construct here.


Answer 3

I recently had a case where I needed to append to a list continuously in one thread, loop through the items in another, check whether an item was ready (an AsyncResult in my case), and remove it from the list only if it was ready. I could not find any examples that demonstrated the problem clearly, so here is one: it adds to a list continuously in one thread and removes from the same list continuously in another thread. The flawed version runs fine on small numbers, but make the numbers big enough, run it a few times, and you will see the error.

The FLAWED version

import threading
import time

# Change this number as you please, bigger numbers will get the error quickly
count = 1000
l = []

def add():
    for i in range(count):
        l.append(i)
        time.sleep(0.0001)

def remove():
    for i in range(count):
        l.remove(i)
        time.sleep(0.0001)


t1 = threading.Thread(target=add)
t2 = threading.Thread(target=remove)
t1.start()
t2.start()
t1.join()
t2.join()

print(l)

Output when ERROR

Exception in thread Thread-63:
Traceback (most recent call last):
  File "/Users/zup/.pyenv/versions/3.6.8/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/Users/zup/.pyenv/versions/3.6.8/lib/python3.6/threading.py", line 864, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-30-ecfbac1c776f>", line 13, in remove
    l.remove(i)
ValueError: list.remove(x): x not in list

Version that uses locks

import threading
import time
count = 1000
l = []
lock = threading.RLock()
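# NOTE: each with-block holds the lock for its entire loop, so the two
# threads effectively run one after the other; this only empties the list
# because add() happens to acquire the lock before remove() does.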
def add():
    with lock:
        for i in range(count):
            l.append(i)
            time.sleep(0.0001)

def remove():
    with lock:
        for i in range(count):
            l.remove(i)
            time.sleep(0.0001)


t1 = threading.Thread(target=add)
t2 = threading.Thread(target=remove)
t1.start()
t2.start()
t1.join()
t2.join()

print(l)

Output

[] # Empty list

Conclusion

As mentioned in the earlier answers, while the act of appending to or popping elements from the list itself is thread-safe, what is not thread-safe is appending in one thread and popping in another.
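
For this append-in-one-thread, remove-in-another pattern, a queue.Queue avoids manual locking entirely; a minimal sketch:

import queue
import threading

count = 1000
q = queue.Queue()

def add():
    for i in range(count):
        q.put(i)

def remove():
    for _ in range(count):
        q.get()       # blocks until an item is available
        q.task_done()

t1 = threading.Thread(target=add)
t2 = threading.Thread(target=remove)
t1.start()
t2.start()
t1.join()
t2.join()

print(q.qsize())  # 0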


What is the use of join() in Python threading?

Question: What is the use of join() in Python threading?

I was studying Python threading and came across join().

The author said that if a thread is in daemon mode then I need to use join() so that the thread can finish itself before the main thread terminates.

But I have also seen him using t.join() even though t was not a daemon.

The example code is this:

import threading
import time
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def daemon():
    logging.debug('Starting')
    time.sleep(2)
    logging.debug('Exiting')

d = threading.Thread(name='daemon', target=daemon)
d.setDaemon(True)

def non_daemon():
    logging.debug('Starting')
    logging.debug('Exiting')

t = threading.Thread(name='non-daemon', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

I don't know what the use of t.join() is, as it is not a daemon, and I can see no change even if I remove it.


Answer 0

Some somewhat clumsy ASCII art to demonstrate the mechanism: join() is presumably called by the main thread. It could also be called by another thread, but that would needlessly complicate the diagram.

The join() calls should be placed in the track of the main thread, but to express the thread relation and keep it as simple as possible, I chose to place them in the child threads instead.

without join:
+---+---+------------------                     main-thread
    |   |
    |   +...........                            child-thread(short)
    +..................................         child-thread(long)

with join
+---+---+------------------***********+###      main-thread
    |   |                             |
    |   +...........join()            |         child-thread(short)
    +......................join()......         child-thread(long)

with join and daemon thread
+-+--+---+------------------***********+###     parent-thread
  |  |   |                             |
  |  |   +...........join()            |        child-thread(short)
  |  +......................join()......        child-thread(long)
  +,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,     child-thread(long + daemonized)

'-' main-thread/parent-thread/main-program execution
'.' child-thread execution
'#' optional parent-thread execution after join()-blocked parent-thread could 
    continue
'*' main-thread 'sleeping' in join-method, waiting for child-thread to finish
',' daemonized thread - 'ignores' lifetime of other threads;
    terminates when main-programs exits; is normally meant for 
    join-independent tasks

So the reason you don’t see any changes is because your main-thread does nothing after your join. You could say join is (only) relevant for the execution-flow of the main-thread.

If, for example, you want to concurrently download a bunch of pages to concatenate them into a single large page, you may start concurrent downloads using threads, but need to wait until the last page/thread is finished before you start assembling a single page out of many. That’s when you use join().
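
A minimal sketch of that download-then-assemble pattern (the URLs and fetch logic are illustrative):

import threading
import urllib.request

urls = ["https://example.com/a", "https://example.com/b"]  # illustrative
pages = [None] * len(urls)

def fetch(i, url):
    with urllib.request.urlopen(url) as resp:
        pages[i] = resp.read()

threads = [threading.Thread(target=fetch, args=(i, u))
           for i, u in enumerate(urls)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for the last download before assembling

combined = b"".join(pages)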


Answer 1

Straight from the docs

join([timeout]) Wait until the thread terminates. This blocks the calling thread until the thread whose join() method is called terminates – either normally or through an unhandled exception – or until the optional timeout occurs.

This means that the main thread, which spawns t and d, waits for t to finish before it continues.

Depending on the logic your program employs, you may want to wait until a thread finishes before your main thread continues.

Also from the docs:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left.

A simple example, say we have this:

def non_daemon():
    time.sleep(5)
    print 'Test non-daemon'

t = threading.Thread(name='non-daemon', target=non_daemon)

t.start()

Which finishes with:

print 'Test one'
t.join()
print 'Test two'

This will output:

Test one
Test non-daemon
Test two

Here the main thread explicitly waits for the t thread to finish before it calls print the second time.

Alternatively if we had this:

print 'Test one'
print 'Test two'
t.join()

We’ll get this output:

Test one
Test two
Test non-daemon

Here we do our job in the main thread and then we wait for the t thread to finish. In this case we might even remove the explicit joining t.join() and the program will implicitly wait for t to finish.


Answer 2

Thanks for this thread — it helped me a lot too.

I learned something about .join() today.

These threads run in parallel:

d.start()
t.start()
d.join()
t.join()

and these run sequentially (not what I wanted):

d.start()
d.join()
t.start()
t.join()

In particular, I was trying to be clever and tidy:

class Kiki(threading.Thread):
    def __init__(self, time):
        super(Kiki, self).__init__()
        self.time = time
        self.start()
        self.join()

This works! But it runs sequentially. I can put the self.start() in __init__, but not the self.join(). That has to be done after every thread has been started.

join() is what causes the main thread to wait for your thread to finish. Otherwise, your thread runs all by itself.

So one way to think of join() is as a “hold” on the main thread — it sort of de-threads your thread and executes it sequentially in the main thread, before the main thread can continue. It assures that your thread is complete before the main thread moves forward. Note that this means it’s ok if your thread is already finished before you call join() — the main thread is simply released immediately when join() is called.

In fact, it just now occurs to me that the main thread waits at d.join() until thread d finishes before it moves on to t.join().

In fact, to be very clear, consider this code:

import threading
import time

class Kiki(threading.Thread):
    def __init__(self, time):
        super(Kiki, self).__init__()
        self.time = time
        self.start()

    def run(self):
        print self.time, " seconds start!"
        for i in range(0,self.time):
            time.sleep(1)
            print "1 sec of ", self.time
        print self.time, " seconds finished!"


t1 = Kiki(3)
t2 = Kiki(2)
t3 = Kiki(1)
t1.join()
print "t1.join() finished"
t2.join()
print "t2.join() finished"
t3.join()
print "t3.join() finished"

It produces this output (note how the print statements are threaded into each other.)

$ python test_thread.py
32   seconds start! seconds start!1

 seconds start!
1 sec of  1
 1 sec of 1  seconds finished!
 21 sec of
3
1 sec of  3
1 sec of  2
2  seconds finished!
1 sec of  3
3  seconds finished!
t1.join() finished
t2.join() finished
t3.join() finished
$ 

The t1.join() is holding up the main thread. All three threads complete before the t1.join() finishes and the main thread moves on to execute the print then t2.join() then print then t3.join() then print.

Corrections welcome. I’m also new to threading.

(Note: in case you’re interested, I’m writing code for a DrinkBot, and I need threading to run the ingredient pumps concurrently rather than sequentially — less time to wait for each drink.)


Answer 3

The method join()

blocks the calling thread until the thread whose join() method is called is terminated.

Source: http://docs.python.org/2/library/threading.html


Answer 4

With join – the interpreter will wait until your process gets completed or terminated:

>>> from threading import Thread
>>> import time
>>> def sam():
...   print 'started'
...   time.sleep(10)
...   print 'waiting for 10sec'
... 
>>> t = Thread(target=sam)
>>> t.start()
started

>>> t.join() # with join interpreter will wait until your process get completed or terminated
done?   # this line printed after thread execution stopped i.e after 10sec
waiting for 10sec
>>> done?

Without join – the interpreter won't wait until the process gets terminated:

>>> t = Thread(target=sam)
>>> t.start()
started
>>> print 'yes done' #without join interpreter wont wait until process get terminated
yes done
>>> waiting for 10sec

Answer 5

When calling join(t) for both a non-daemon thread and a daemon thread, the main thread (or main process) waits t seconds, and can then go further to work on its own process. During the t-second wait, both of the child threads should do what they can, such as printing out some text. After the t seconds, if the non-daemon thread still hasn't finished its job, it can still finish after the main process finishes its own job; but the daemon thread has just missed its window of opportunity, and it will eventually die after the Python program exits. Please correct me if there is something wrong.


Answer 6

In Python 3.x, join() is used to join a thread with the main thread, i.e., when join() is called on a particular thread, the main thread will stop executing until the execution of the joined thread is complete.

#1 - Without Join():
import threading
import time
def loiter():
    print('You are loitering!')
    time.sleep(5)
    print('You are not loitering anymore!')

t1 = threading.Thread(target = loiter)
t1.start()
print('Hey, I do not want to loiter!')
'''
Output without join()--> 
You are loitering!
Hey, I do not want to loiter!
You are not loitering anymore! #After 5 seconds --> This statement will be printed

'''
#2 - With Join():
import threading
import time
def loiter():
    print('You are loitering!')
    time.sleep(5)
    print('You are not loitering anymore!')

t1 = threading.Thread(target = loiter)
t1.start()
t1.join()
print('Hey, I do not want to loiter!')

'''
Output with join() -->
You are loitering!
You are not loitering anymore! #After 5 seconds --> This statement will be printed
Hey, I do not want to loiter! 

'''

Answer 7

This example demonstrates the .join() action:

import threading
import time

def threaded_worker():
    for r in range(10):
        print('Other: ', r)
        time.sleep(2)

thread_ = threading.Timer(1, threaded_worker)
thread_.daemon = True  # If the main thread exits, this thread will be killed too.
thread_.start()

flag = True

for i in range(10):
    print('Main: ', i)
    time.sleep(2)
    if flag and i > 4:
        print(
            '''
            Threaded_worker() joined to the main thread. 
            Now we have a sequential behavior instead of concurrency.
            ''')
        thread_.join()
        flag = False

Out:

Main:  0
Other:  0
Main:  1
Other:  1
Main:  2
Other:  2
Main:  3
Other:  3
Main:  4
Other:  4
Main:  5
Other:  5

            Threaded_worker() joined to the main thread. 
            Now we have a sequential behavior instead of concurrency.

Other:  6
Other:  7
Other:  8
Other:  9
Main:  6
Main:  7
Main:  8
Main:  9

Answer 8

There are a few reasons for the main thread (or any other thread) to join other threads:

  1. A thread may have created or holding (locking) some resources. The join-calling thread may be able to clear the resources on its behalf

  2. join() is a natural blocking call for the join-calling thread to continue after the called thread has terminated.

If a python program does not join other threads, the python interpreter will still join non-daemon threads on its behalf.
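
A tiny sketch of that implicit join:

import threading
import time

def worker():
    time.sleep(1)
    print("worker done")  # still prints: the interpreter waits for it

threading.Thread(target=worker).start()
print("main done")  # printed first; the process exits only after worker finishes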


Answer 9

“What’s the use of using join()?” you say. Really, it’s the same answer as “what’s the use of closing files, since python and the OS will close my file for me when my program exits?”.

It’s simply a matter of good programming. You should join() your threads at the point in the code where the thread should not be running anymore, either because you positively have to ensure the thread is not running and interfering with your own code, or because you want to behave correctly in a larger system.

You might say “I don’t want my code to delay giving an answer” just because of the additional time that the join() might require. This may be perfectly valid in some scenarios, but you now need to take into account that your code is “leaving cruft around for python and the OS to clean up”. If you do this for performance reasons, I strongly encourage you to document that behavior. This is especially true if you’re building a library/package that others are expected to utilize.

There’s no reason to not join(), other than performance reasons, and I would argue that your code does not need to perform that well.


Daemon threads explained

Question: Daemon threads explained

In the Python documentation it says:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread.

Does anyone have a clearer explanation of what that means or a practical example showing where you would set threads as daemonic?

Clarify it for me: so the only situation you wouldn’t set threads as daemonic, is when you want them to continue running after the main thread exits?


Answer 0

Some threads do background tasks, like sending keepalive packets, or performing periodic garbage collection, or whatever. These are only useful when the main program is running, and it’s okay to kill them off once the other, non-daemon, threads have exited.

Without daemon threads, you’d have to keep track of them, and tell them to exit, before your program can completely quit. By setting them as daemon threads, you can let them run and forget about them, and when your program quits, any daemon threads are killed automatically.


Answer 1

Let’s say you’re making some kind of dashboard widget. As part of this, you want it to display the unread message count in your email box. So you make a little thread that will:

  1. Connect to the mail server and ask how many unread messages you have.
  2. Signal the GUI with the updated count.
  3. Sleep for a little while.

When your widget starts up, it would create this thread, designate it a daemon, and start it. Because it’s a daemon, you don’t have to think about it; when your widget exits, the thread will stop automatically.
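
A sketch of that poller as a daemon thread (the function body is a placeholder):

import threading
import time

def check_mail():
    while True:
        # connect to the mail server, signal the GUI with the count...
        time.sleep(60)

t = threading.Thread(target=check_mail, daemon=True)
t.start()
# When the widget exits, the poller dies with it; no cleanup needed.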


Answer 2

Other posters gave some examples for situations in which you’d use daemon threads. My recommendation, however, is never to use them.

It’s not because they’re not useful, but because there are some bad side effects you can experience if you use them. Daemon threads can still execute after the Python runtime starts tearing down things in the main thread, causing some pretty bizarre exceptions.

More info here:

https://joeshaw.org/python-daemon-threads-considered-harmful/

https://mail.python.org/pipermail/python-list/2005-February/343697.html

Strictly speaking you never need them, it just makes implementation easier in some cases.


Answer 3

A simpler way to think about it, perhaps: when main returns, your process will not exit if there are non-daemon threads still running.

A bit of advice: Clean shutdown is easy to get wrong when threads and synchronization are involved – if you can avoid it, do so. Use daemon threads whenever possible.


Answer 4

Chris already explained what daemon threads are, so let’s talk about practical usage. Many thread pool implementations use daemon threads for task workers. Workers are threads which execute tasks from a task queue.

Workers need to keep waiting for tasks in the task queue indefinitely, as they don’t know when a new task will appear. The thread which assigns tasks (say, the main thread) only knows when the tasks are over. The main thread waits for the task queue to become empty and then exits. If the workers are user threads, i.e. non-daemon, the program won’t terminate: it will keep waiting for these indefinitely running workers, even though the workers aren’t doing anything useful. Mark the workers as daemon threads, and the main thread will take care of killing them as soon as it’s done handling tasks.
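
A minimal sketch of such a daemon worker pool, assuming the tasks are plain callables:

import queue
import threading

tasks = queue.Queue()

def worker():
    while True:            # waits forever; safe only because it is a daemon
        task = tasks.get()
        task()
        tasks.task_done()

for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

for i in range(10):
    tasks.put(lambda i=i: print("task", i))

tasks.join()  # main thread waits for the queue to drain, then simply exits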


Answer 5

Quoting Chris: “… when your program quits, any daemon threads are killed automatically.”. I think that sums it up. You should be careful when you use them as they abruptly terminate when main program executes to completion.


Answer 6

When your second thread is non-daemon, your application’s primary main thread cannot quit, because its exit criterion is tied to the exit of the non-daemon thread(s) as well. Threads cannot be forcibly killed in Python, therefore your app will have to actually wait for the non-daemon thread(s) to exit. If this behavior is not what you want, then set your second thread as a daemon so that it won’t hold your application back from exiting.


Timeout on a function call

Question: Timeout on a function call

I’m calling a function in Python which I know may stall and force me to restart the script.

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?


Answer 0

You may use the signal package if you are running on UNIX:

In [1]: import signal

# Register an handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...: 

# This function *may* run for an indetermined time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:         
   ...:         

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception, exc: 
   ...:     print(exc)
   ....: 
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0

10 seconds after the call signal.alarm(10), the handler is called. This raises an exception that you can intercept from the regular Python code.

This module doesn’t play well with threads (but then, who does?)

Note that since we raise an exception when timeout happens, it may end up caught and ignored inside the function, for example of one such function:

def loop_forever():
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
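
For reuse, the same SIGALRM idea can be packaged as a context manager; a minimal sketch (Unix-only, Python 3, and it must run in the main thread):

import signal
from contextlib import contextmanager

@contextmanager
def time_limit(seconds):
    def handler(signum, frame):
        raise TimeoutError("timed out")
    old_handler = signal.signal(signal.SIGALRM, handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)                             # cancel any pending alarm
        signal.signal(signal.SIGALRM, old_handler)  # restore previous handler

# with time_limit(5):
#     loop_forever()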

Answer 1

You can use multiprocessing.Process to do exactly that.

Code

import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print "Tick"
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print "running... let's kill it..."

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()

Answer 2

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it?

I posted a gist that solves this question/problem with a decorator and a threading.Timer. Here it is with a breakdown.

Imports and setups for compatibility

It was tested with Python 2 and 3. It should also work under Unix/Linux and Windows.

First the imports. These attempt to keep the code consistent regardless of the Python version:

from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread

Use version independent code:

try:
    range, _print = xrange, print
    def print(*args, **kwargs): 
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()            
except NameError:
    pass

Now we have imported our functionality from the standard library.

exit_after decorator

Next we need a function to terminate the main() from the child thread:

def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt

And here is the decorator itself:

def exit_after(s):
    '''
    use as decorator to exit process if 
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer

Usage

And here’s the usage that directly answers your question about exiting after 5 seconds!:

@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')

Demo:

>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 6, in countdown
KeyboardInterrupt

The second function call will not finish, instead the process should exit with a traceback!

KeyboardInterrupt does not always stop a sleeping thread

Note that sleep will not always be interrupted by a keyboard interrupt, on Python 2 on Windows, e.g.:

@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')

>>> sleep10()
sleep10 took too long         # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 11, in inner
  File "<stdin>", line 3, in sleep10
KeyboardInterrupt

nor is it likely to interrupt code running in extensions unless it explicitly checks for PyErr_CheckSignals(), see Cython, Python and KeyboardInterrupt ignored

I would avoid sleeping a thread more than a second, in any case – that’s an eon in processor time.

How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?

To catch it and do something else, you can catch the KeyboardInterrupt.

>>> try:
...     countdown(10)
... except KeyboardInterrupt:
...     print('do something else')
... 
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else

Answer 3

I have a different proposal: a pure function (with the same API as the threading suggestion) that seems to work fine (based on suggestions in this thread):

def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler) 
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
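
A usage sketch of the function above (Unix-only and main-thread-only, like the other SIGALRM approaches here):

import time

def slow(n):
    time.sleep(n)
    return "finished"

print(timeout(slow, args=(10,), timeout_duration=2, default="gave up"))
# prints "gave up" after about 2 seconds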

Answer 4

I ran across this thread when searching for a timeout call on unit tests. I didn’t find anything simple in the answers or 3rd party packages so I wrote the decorator below you can drop right into code:

import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator

Then it’s as simple as this to timeout a test or any function you like:

@timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
    ...

Answer 5

The stopit package, found on pypi, seems to handle timeouts well.

I like the @stopit.threading_timeoutable decorator, which adds a timeout parameter to the decorated function, which does what you expect, it stops the function.

Check it out on pypi: https://pypi.python.org/pypi/stopit


Answer 6

There are a lot of suggestions, but none using concurrent.futures, which I think is the most legible way to handle this.

from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

Super simple to read and maintain.

We make a pool, submit a single process and then wait up to 5 seconds before raising a TimeoutError that you could catch and handle however you needed.

Native to python 3.2+ and backported to 2.7 (pip install futures).

Switching between threads and processes is as simple as replacing ProcessPoolExecutor with ThreadPoolExecutor.

If you want to terminate the Process on timeout I would suggest looking into Pebble.
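
A usage sketch; note that result()'s timeout raises concurrent.futures.TimeoutError, and, per the warning in the code above, exiting the with-block still waits for the stuck worker before control returns:

import time
from concurrent.futures import TimeoutError as FuturesTimeout

def slow():
    time.sleep(10)

if __name__ == '__main__':  # required for ProcessPoolExecutor on Windows
    try:
        timeout_five(slow)
    except FuturesTimeout:
        # Raised after 5 s, but only reached once slow() actually finishes,
        # because the executor's shutdown waits for the worker process.
        print("timed out")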


Answer 7

A great, easy-to-use and reliable PyPI project: timeout-decorator (https://pypi.org/project/timeout-decorator/).

Installation:

pip install timeout-decorator

Usage:

import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print "Start"
    for i in range(1,10):
        time.sleep(1)
        print "%d seconds have passed" % i

if __name__ == '__main__':
    mytest()

Answer 8

I am the author of wrapt_timeout_decorator

Most of the solutions presented here work wonderfully under Linux at first glance – because we have fork() and signals – but on Windows things look a bit different. And when it comes to subthreads on Linux, you can’t use signals anymore.

In order to spawn a process under Windows, it needs to be picklable – and many decorated functions or class methods are not.

So you need to use a better pickler like dill and multiprocess (not pickle and multiprocessing) – that’s why you can’t use ProcessPoolExecutor (or only with limited functionality).

For the timeout itself – you need to define what timeout means – because on Windows it will take considerable (and not determinable) time to spawn the process. This can be tricky on short timeouts. Let’s assume spawning the process takes about 0.5 seconds (easily!!!). If you give a timeout of 0.2 seconds, what should happen? Should the function time out after 0.5 + 0.2 seconds (so let the method run for 0.2 seconds)? Or should the called process time out after 0.2 seconds (in that case, the decorated function will ALWAYS time out, because in that time it is not even spawned)?

Also, nested decorators can be nasty, and you can’t use signals in a subthread. If you want to create a truly universal, cross-platform decorator, all this needs to be taken into consideration (and tested).

Other issues are passing exceptions back to the caller, as well as logging issues (if used in the decorated function – logging to files in another process is NOT supported).

I tried to cover all the edge cases; you might look into the package wrapt_timeout_decorator, or at least test your own solutions inspired by the unittests used there.

@Alexis Eggermont – unfortunately I don’t have enough points to comment – maybe someone else can notify you – I think I solved your import issue.


Answer 9

timeout-decorator doesn't work on Windows systems, as Windows doesn't support signal well.

If you use timeout-decorator on a Windows system, you will get the following:

AttributeError: module 'signal' has no attribute 'SIGALRM'

Some suggested using use_signals=False, but it didn't work for me.

Author @bitranox created the following package:

pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip

Code Sample:

import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1,10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')


if __name__ == '__main__':
    main()

Gives the following exception:

TimeoutError: Function mytest timed out after 5 seconds

回答 10

我们也可以使用信号来实现同样的目的。我认为下面的示例会对您有用。与线程相比,它非常简单。

import signal

# Define the exception that the handler raises (missing in the original snippet)
class myException(Exception):
    pass

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"

We can use signals for the same. I think the below example will be useful for you. It is very simple compared to threads.

import signal

# Define the exception that the handler raises (missing in the original snippet)
class myException(Exception):
    pass

def timeout(signum, frame):
    raise myException

#this is an infinite loop, never ending under normal circumstances
def main():
    print 'Starting Main ',
    while 1:
        print 'in main ',

#SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

#change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print "whoops"
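
One detail worth adding to the sketch above: if the guarded call returns before the alarm fires, the pending alarm should be cancelled with signal.alarm(0), otherwise it can fire later in unrelated code. A minimal pattern (do_work is a placeholder for whatever you want to bound):

import signal

class myException(Exception):
    pass

def handler(signum, frame):
    raise myException

def do_work():
    pass  # placeholder for the operation you want to bound

signal.signal(signal.SIGALRM, handler)
signal.alarm(5)        # arm a 5-second alarm
try:
    do_work()
except myException:
    print("whoops")
finally:
    signal.alarm(0)    # disarm, so a finished call cannot leave an alarm pending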

回答 11

#!/usr/bin/python2
# Run a command with a time limit: terminate the child process if it
# has not finished after the given number of seconds.
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])                        # sys.argv[2:] is the command to run
timer = threading.Timer(float(sys.argv[1]), proc.terminate)  # sys.argv[1] is the timeout in seconds
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
#!/usr/bin/python2
# Run a command with a time limit: terminate the child process if it
# has not finished after the given number of seconds.
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])                        # sys.argv[2:] is the command to run
timer = threading.Timer(float(sys.argv[1]), proc.terminate)  # sys.argv[1] is the timeout in seconds
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
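
As a usage note: assuming the script above is saved as timeout.py (a hypothetical name), python timeout.py 5 ping example.com would run ping example.com, terminate it after five seconds, and exit with the child's return code.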

回答 12

我需要可嵌套的定时中断(SIGALRM 做不到),而且它不能被 time.sleep 阻塞(基于线程的方法做不到)。我最终从这里复制并稍作修改了代码:http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

代码本身:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

和用法示例:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

I had a need for nestable timed interrupts (which SIGALRM can't do) that won't get blocked by time.sleep (which the thread-based approach can't do). I ended up copying and lightly modifying code from here: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

The code itself:

#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/


"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm) 
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()

and a usage example:

import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print 'raised', e.id_
        sleep(30)
except alarm.TimeoutError as e:
    print 'raised', e.id_
else:
    print 'nope.'

回答 13

这是对前面给出的基于线程的解决方案的一点改进。

下面的代码支持异常:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

在5秒钟的超时时间内调用它:

result = runFunctionWithTimeout(remote_calculate, (myarg,), timeout_duration=5)

Here is a slight improvement to the given thread-based solution.

The code below supports exceptions:

def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception, message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading
    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)
    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.isAlive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]

Invoking it with a 5 second timeout:

result = runFunctionWithTimeout(remote_calculate, (myarg,), timeout_duration=5)

如何在Python中使用线程?

问题:如何在Python中使用线程?

我试图了解Python中的线程。我看过文档和示例,但坦率地说,许多示例过于复杂,我难以理解它们。

您如何清楚地显示为多线程而划分的任务?

I am trying to understand threading in Python. I’ve looked at the documentation and examples, but quite frankly, many examples are overly sophisticated and I’m having trouble understanding them.

How do you clearly show tasks being divided for multi-threading?


回答 0

自 2010 年提出这个问题以来,如何用 Python 的 map 和 pool 进行简单的多线程处理已经有了真正的简化。

下面的代码来自一篇您绝对应该读一读的文章/博客文章(与我无关):《一行代码实现并行:更适合日常线程任务的模型》。我会在下面进行总结,最终只有几行代码:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

这是以下内容的多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

map 是一个很棒的小函数,是轻松为 Python 代码注入并行性的关键。对于不熟悉的人来说,map 源自 Lisp 之类的函数式语言,它是一个把另一个函数映射到序列上的函数。

map 为我们处理对序列的迭代,应用函数,并在最后把所有结果存入一个方便的列表。


实现

map 函数的并行版本由两个库提供:multiprocessing,以及它鲜为人知但同样出色的“继子”:multiprocessing.dummy。

multiprocessing.dummy 与 multiprocessing 模块完全相同,只是改用线程(这是一个重要区别:CPU 密集型任务用多个进程,I/O 用线程):

multiprocessing.dummy 复制了 multiprocessing 的 API,但只不过是 threading 模块的一个包装器。

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

以及计时结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数(只有 Python 3.3 及更高版本才支持这种方式):

要传递多个数组:

results = pool.starmap(function, zip(list_a, list_b))

或传递一个常量和一个数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是较早版本的 Python,则可以通过此变通方法传递多个参数。

(感谢user136036的有用评论。)

Since this question was asked in 2010, there has been real simplification in how to do simple multithreading with Python with map and pool.

The code below comes from an article/blog post that you should definitely check out (no affiliation) – Parallelism in one line: A Better Model for Day to Day Threading Tasks. I’ll summarize below – it ends up being just a few lines of code:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

Which is the multithreaded version of:

results = []
for item in my_array:
    results.append(my_function(item))

Description

Map is a cool little function, and the key to easily injecting parallelism into your Python code. For those unfamiliar, map is something lifted from functional languages like Lisp. It is a function which maps another function over a sequence.

Map handles the iteration over the sequence for us, applies the function, and stores all of the results in a handy list at the end.


Implementation

Parallel versions of the map function are provided by two libraries:multiprocessing, and also its little known, but equally fantastic step child:multiprocessing.dummy.

multiprocessing.dummy is exactly the same as multiprocessing module, but uses threads instead (an important distinction – use multiple processes for CPU-intensive tasks; threads for (and during) I/O):

multiprocessing.dummy replicates the API of multiprocessing, but is no more than a wrapper around the threading module.

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

And the timing results:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Passing multiple arguments (works like this only in Python 3.3 and later):

To pass multiple arrays:

results = pool.starmap(function, zip(list_a, list_b))

Or to pass a constant and an array:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

If you are using an earlier version of Python, you can pass multiple arguments via this workaround.

(Thanks to user136036 for the helpful comment.)
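
For reference, a minimal sketch of a wrapper-style workaround for older Pythons (not necessarily the exact one linked above; my_function, list_a, and list_b are placeholder names). Since map() passes a single argument, zip the inputs into tuples and unpack them in a small named wrapper:

from multiprocessing.dummy import Pool as ThreadPool

def my_function(a, b):
    return a + b

def my_function_star(args):
    # map() hands us one tuple; unpack it for the real function
    return my_function(*args)

list_a = [1, 2, 3]
list_b = [10, 20, 30]

pool = ThreadPool(4)
results = pool.map(my_function_star, zip(list_a, list_b))
pool.close()
pool.join()
print(results)  # [11, 22, 33]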


回答 1

这是一个简单的示例:您需要尝试几个候选 URL,并返回第一个做出响应的 URL 的内容。

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

在这种情况下,线程被用作一种简单的优化:每个子线程都在等待某个 URL 解析并响应,以便把其内容放入队列;每个线程都是守护线程(如果主线程结束,它们不会让进程继续存活,这种情况比较常见);主线程启动所有子线程,在队列上执行 get 以等待其中某个线程完成 put,然后输出结果并终止(由于它们是守护线程,这会带走可能仍在运行的所有子线程)。

在 Python 中正确使用线程总是与 I/O 操作相关(因为 CPython 反正不会用多个内核来运行 CPU 密集型任务,使用线程的唯一理由是在等待某些 I/O 时不阻塞进程)。顺便说一句,队列几乎总是把工作分发给线程和/或收集工作结果的最佳方式,而且它们本质上是线程安全的,使您不必操心锁、条件变量、事件、信号量以及其他线程间协调/通信的概念。

Here’s a simple example: you need to try a few alternative URLs and return the contents of the first one to respond.

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

This is a case where threading is used as a simple optimization: each subthread is waiting for a URL to resolve and respond, in order to put its contents on the queue; each thread is a daemon (won’t keep the process up if main thread ends — that’s more common than not); the main thread starts all subthreads, does a get on the queue to wait until one of them has done a put, then emits the results and terminates (which takes down any subthreads that might still be running, since they’re daemon threads).

Proper use of threads in Python is invariably connected to I/O operations (since CPython doesn’t use multiple cores to run CPU-bound tasks anyway, the only reason for threading is not blocking the process while there’s a wait for some I/O). Queues are almost invariably the best way to farm out work to threads and/or collect the work’s results, by the way, and they’re intrinsically threadsafe, so they save you from worrying about locks, conditions, events, semaphores, and other inter-thread coordination/communication concepts.


回答 2

注意:对于 Python 中的真正并行化,您应该使用 multiprocessing 模块来派生多个并行执行的进程(由于全局解释器锁,Python 线程只提供交错执行,它们实际上是串行而非并行执行的,仅在交错 I/O 操作时才有用)。

但是,如果您只是想要交错执行(或者正在进行尽管存在全局解释器锁也可以并行化的 I/O 操作),那么 threading 模块就是起点。作为一个非常简单的示例,让我们考虑通过并行对各个子区间求和来对一个大区间求和的问题:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

请注意,以上是一个非常愚蠢的示例,因为它完全不执行任何 I/O;由于全局解释器锁,它在 CPython 中虽然是交错执行的(还带有上下文切换的额外开销),但仍然是串行执行。

NOTE: For actual parallelization in Python, you should use the multiprocessing module to fork multiple processes that execute in parallel (due to the global interpreter lock, Python threads provide interleaving, but they are in fact executed serially, not in parallel, and are only useful when interleaving I/O operations).

However, if you are merely looking for interleaving (or are doing I/O operations that can be parallelized despite the global interpreter lock), then the threading module is the place to start. As a really simple example, let’s consider the problem of summing a large range by summing subranges in parallel:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Note that the above is a very stupid example, as it does absolutely no I/O and will be executed serially albeit interleaved (with the added overhead of context switching) in CPython due to the global interpreter lock.


回答 3

像其他答案提到的那样,由于 GIL,CPython 只能在等待 I/O 时使用线程。

如果想让 CPU 密集型任务从多个内核中受益,请使用 multiprocessing:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Like others mentioned, CPython can use threads only for I/O waits due to GIL.

If you want to benefit from multiple cores for CPU-bound tasks, use multiprocessing:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

回答 4

仅需注意:线程并不一定需要队列。

这是我能想到的最简单的示例,展示了多个线程并发运行。

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

Just a note: A queue is not required for threading.

This is the simplest example I could imagine, showing several threads running concurrently.

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

回答 5

Alex Martelli的回答对我有所帮助。但是,这是我认为更有用的修改版本(至少对我而言)。

更新:在Python 2和Python 3中均可使用

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

The answer from Alex Martelli helped me. However, here is a modified version that I thought was more useful (at least to me).

Updated: works in both Python 2 and Python 3

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()
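
One caveat with the snippet above: the main thread never join()s the workers. Because they are non-daemon threads, the interpreter still waits for them to finish, but joining makes the intent explicit. A minimal adjustment to the last loop:

threads = []
for i in range(thread_count):
    t = threading.Thread(target=worker, args=(q,))
    threads.append(t)
    t.start()

for t in threads:
    t.join()  # wait until every worker has drained the queue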

回答 6

给定一个函数 f,可以像这样在线程中运行它:

import threading
threading.Thread(target=f).start()

将参数传递给 f

threading.Thread(target=f, args=(a,b,c)).start()

Given a function, f, thread it like this:

import threading
threading.Thread(target=f).start()

To pass arguments to f

threading.Thread(target=f, args=(a,b,c)).start()
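
To round this out, keyword arguments and waiting for completion follow the same pattern (f and its parameters here are placeholders):

import threading

def f(a, b, c=0):
    print(a + b + c)

t = threading.Thread(target=f, args=(1, 2), kwargs={'c': 3})
t.start()
t.join()  # block until f has returned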

回答 7

我发现这非常有用:创建与内核数量相同的线程,并让它们执行(大量)任务(在这种情况下,是调用 shell 程序):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done

I found this very useful: create as many threads as cores and let them execute a (large) number of tasks (in this case, calling a shell program):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done

回答 8

Python 3 提供了启动并行任务的功能。这使我们的工作更加轻松。

它提供了线程池和进程池。

下面的示例可以让您有一个直观的认识:

ThreadPoolExecutor示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Python 3 has the facility of launching parallel tasks. This makes our work easier.

It has thread pooling and process pooling.

The following gives an insight:

ThreadPoolExecutor Example (source)

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor (source)

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

回答 9

使用全新的 concurrent.futures 模块

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

对于以前用过 Java 的人来说,这种 executor 方式会很熟悉。

另外附带说明:为了让世界保持清净,如果您不使用 with 上下文,请不要忘记关闭您的池/执行器(with 非常好用,它会替您完成这件事)。

Using the blazing new concurrent.futures module

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

The executor approach might seem familiar to all those who have gotten their hands dirty with Java before.

Also on a side note: to keep the universe sane, don't forget to close your pools/executors if you don't use a with context (which is so awesome that it does it for you).
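
For completeness, a minimal sketch of the manual variant hinted at above; executor.shutdown(wait=True) is the call that the with block would otherwise issue for you:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
try:
    future = executor.submit(pow, 2, 10)
    print(future.result())  # 1024
finally:
    # What the with block does on exit: wait for pending work,
    # then free the worker threads.
    executor.shutdown(wait=True)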


回答 10

对我而言,线程的完美示例是监视异步事件。看这段代码。

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3

您可以通过打开IPython会话并执行以下操作来处理此代码:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

等几分钟

>>> a[0] = 2
Mon = 2

For me, the perfect example for threading is monitoring asynchronous events. Look at this code.

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3

You can play with this code by opening an IPython session and doing something like:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Wait a few minutes

>>> a[0] = 2
Mon = 2
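
A caveat on the Monitor class above: its run() loop busy-waits and will pin a CPU core. In practice you would add a short sleep inside the loop (time is already imported); a minimal tweak, keeping the original Python 2 style:

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3
            time.sleep(0.1)  # yield the CPU instead of spinning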

回答 11

大多数文档和教程都使用 Python 的 Threading 和 Queue 模块,对初学者来说它们可能显得令人望而生畏。

可以考虑改用 Python 3 的 concurrent.futures.ThreadPoolExecutor 模块。

结合 with 语句和推导式,它用起来非常顺手。

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()

Most documentation and tutorials use Python’s Threading and Queue module, and they could seem overwhelming for beginners.

Perhaps consider the concurrent.futures.ThreadPoolExecutor module of Python 3.

Combined with the with clause and a comprehension, it can be a real charm.

from concurrent.futures import ThreadPoolExecutor, as_completed

def get_url(url):
    # Your actual program here. Using threading.Lock() if necessary
    return ""

# List of URLs to fetch
urls = ["url1", "url2"]

with ThreadPoolExecutor(max_workers = 5) as executor:

    # Create threads
    futures = {executor.submit(get_url, url) for url in urls}

    # as_completed() gives you the threads once finished
    for f in as_completed(futures):
        # Get the results
        rs = f.result()

回答 12

我在这里看到很多示例,它们并没有执行真正的工作,而且大多是 CPU 密集型的。下面是一个 CPU 密集型任务的示例:计算 1000 万到 1005 万之间的所有素数。我在这里使用了全部四种方法:

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check if any number from 2 to the square root + 1 divides the current number under consideration
        for number in range(2, sqrt_n + 1):

            # If divisible, we have found a factor, hence this is not a prime number; let's move to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

这是我的Mac OS X四核计算机上的结果

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds

I saw a lot of examples here where no real work was being performed, and they were mostly CPU-bound. Here is an example of a CPU-bound task that computes all prime numbers between 10 million and 10.05 million. I have used all four methods here:

import math
import timeit
import threading
import multiprocessing
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


def time_stuff(fn):
    """
    Measure time of execution of a function
    """
    def wrapper(*args, **kwargs):
        t0 = timeit.default_timer()
        fn(*args, **kwargs)
        t1 = timeit.default_timer()
        print("{} seconds".format(t1 - t0))
    return wrapper

def find_primes_in(nmin, nmax):
    """
    Compute a list of prime numbers between the given minimum and maximum arguments
    """
    primes = []

    # Loop from minimum to maximum
    for current in range(nmin, nmax + 1):

        # Take the square root of the current number
        sqrt_n = int(math.sqrt(current))
        found = False

        # Check if any number from 2 to the square root + 1 divides the current number under consideration
        for number in range(2, sqrt_n + 1):

            # If divisible, we have found a factor, hence this is not a prime number; let's move to the next one
            if current % number == 0:
                found = True
                break

        # If not divisible, add this number to the list of primes that we have found so far
        if not found:
            primes.append(current)

    # I am merely printing the length of the array containing all the primes, but feel free to do what you want
    print(len(primes))

@time_stuff
def sequential_prime_finder(nmin, nmax):
    """
    Use the main process and main thread to compute everything in this case
    """
    find_primes_in(nmin, nmax)

@time_stuff
def threading_prime_finder(nmin, nmax):
    """
    If the minimum is 1000 and the maximum is 2000 and we have four workers,
    1000 - 1250 to worker 1
    1250 - 1500 to worker 2
    1500 - 1750 to worker 3
    1750 - 2000 to worker 4
    so let’s split the minimum and maximum values according to the number of workers
    """
    nrange = nmax - nmin
    threads = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)

        # Start the thread with the minimum and maximum split up to compute
        # Parallel computation will not work here due to the GIL since this is a CPU-bound task
        t = threading.Thread(target = find_primes_in, args = (start, end))
        threads.append(t)
        t.start()

    # Don’t forget to wait for the threads to finish
    for t in threads:
        t.join()

@time_stuff
def processing_prime_finder(nmin, nmax):
    """
    Split the minimum, maximum interval similar to the threading method above, but use processes this time
    """
    nrange = nmax - nmin
    processes = []
    for i in range(8):
        start = int(nmin + i * nrange/8)
        end = int(nmin + (i + 1) * nrange/8)
        p = multiprocessing.Process(target = find_primes_in, args = (start, end))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

@time_stuff
def thread_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use a thread pool executor this time.
    This method is slightly faster than using pure threading as the pools manage threads more efficiently.
    This method is still slow due to the GIL limitations since we are doing a CPU-bound task.
    """
    nrange = nmax - nmin
    with ThreadPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

@time_stuff
def process_executor_prime_finder(nmin, nmax):
    """
    Split the min max interval similar to the threading method, but use the process pool executor.
    This is the fastest method recorded so far as it manages process efficiently + overcomes GIL limitations.
    RECOMMENDED METHOD FOR CPU-BOUND TASKS
    """
    nrange = nmax - nmin
    with ProcessPoolExecutor(max_workers = 8) as e:
        for i in range(8):
            start = int(nmin + i * nrange/8)
            end = int(nmin + (i + 1) * nrange/8)
            e.submit(find_primes_in, start, end)

def main():
    nmin = int(1e7)
    nmax = int(1.05e7)
    print("Sequential Prime Finder Starting")
    sequential_prime_finder(nmin, nmax)
    print("Threading Prime Finder Starting")
    threading_prime_finder(nmin, nmax)
    print("Processing Prime Finder Starting")
    processing_prime_finder(nmin, nmax)
    print("Thread Executor Prime Finder Starting")
    thread_executor_prime_finder(nmin, nmax)
    print("Process Executor Finder Starting")
    process_executor_prime_finder(nmin, nmax)

main()

Here are the results on my Mac OS X four-core machine

Sequential Prime Finder Starting
9.708213827005238 seconds
Threading Prime Finder Starting
9.81836523200036 seconds
Processing Prime Finder Starting
3.2467174359990167 seconds
Thread Executor Prime Finder Starting
10.228896902000997 seconds
Process Executor Finder Starting
2.656402041000547 seconds

回答 13

这是一个使用线程导入 CSV 的非常简单的示例。(根据用途不同,需要引入的库也可能不同。)

辅助函数:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                pass  # DB operation/query goes here

驱动函数:

import_handler(csv_file_name)

Here is a very simple example of CSV import using threading. (Library inclusion may differ for different purposes.)

Helper Functions:

from threading import Thread
from project import app
import csv


def import_handler(csv_file_name):
    thr = Thread(target=dump_async_csv_data, args=[csv_file_name])
    thr.start()

def dump_async_csv_data(csv_file_name):
    with app.app_context():
        with open(csv_file_name) as File:
            reader = csv.DictReader(File)
            for row in reader:
                pass  # DB operation/query goes here

Driver Function:

import_handler(csv_file_name)

回答 14

我想分享一个简单的示例,以及我自己解决这个问题时觉得有用的一些解释。

在此答案中,您将找到有关 Python 的 GIL(全局解释器锁)的一些信息,以及一个使用 multiprocessing.dummy 编写的简单日常示例和一些简单的基准测试。

全局解释器锁(GIL)

Python 并不允许真正意义上的多线程。它有一个多线程包,但如果您想用多线程来加速代码,使用它通常不是一个好主意。

Python 有一个称为全局解释器锁(GIL)的机制。GIL 确保任一时刻只有一个“线程”在执行。一个线程获取 GIL,做一点工作,然后把 GIL 传给下一个线程。

这个过程发生得非常快,在人眼看来线程似乎在并行执行,但实际上它们只是轮流使用同一个 CPU 内核。

所有这些 GIL 传递都会增加执行开销。这意味着,如果您想让代码跑得更快,使用 threading 包通常不是一个好主意。

当然也有使用 Python threading 包的理由。如果您想同时运行一些任务,而且不关心效率,那么它很好用也很方便。或者,如果您运行的代码需要等待某些东西(例如某些 I/O),那么它就很有意义。但是 threading 库不会让您用上额外的 CPU 内核。

多线程可以外包出去:交给操作系统(通过多进程),交给调用您 Python 代码的外部应用(例如 Spark 或 Hadoop),或者交给您的 Python 代码所调用的代码(例如:让您的 Python 代码调用一个 C 函数来完成昂贵的多线程工作)。

为什么这很重要

因为很多人在了解 GIL 是什么之前,会花费大量时间试图在他们精心编写的 Python 多线程代码中寻找瓶颈。

弄清楚这些信息之后,这是我的代码:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

I would like to contribute with a simple example and the explanations I’ve found useful when I had to tackle this problem myself.

In this answer you will find some information about Python’s GIL (global interpreter lock) and a simple day-to-day example written using multiprocessing.dummy plus some simple benchmarks.

Global Interpreter Lock (GIL)

Python doesn’t allow multi-threading in the truest sense of the word. It has a multi-threading package, but if you want to multi-thread to speed your code up, then it’s usually not a good idea to use it.

Python has a construct called the global interpreter lock (GIL). The GIL makes sure that only one of your ‘threads’ can execute at any one time. A thread acquires the GIL, does a little work, then passes the GIL onto the next thread.

This happens very quickly so to the human eye it may seem like your threads are executing in parallel, but they are really just taking turns using the same CPU core.

All this GIL passing adds overhead to execution. This means that if you want to make your code run faster then using the threading package often isn’t a good idea.

There are reasons to use Python’s threading package. If you want to run some things simultaneously, and efficiency is not a concern, then it’s totally fine and convenient. Or if you are running code that needs to wait for something (like some I/O) then it could make a lot of sense. But the threading library won’t let you use extra CPU cores.

Multi-threading can be outsourced to the operating system (by doing multi-processing), and some external application that calls your Python code (for example, Spark or Hadoop), or some code that your Python code calls (for example: you could have your Python code call a C function that does the expensive multi-threaded stuff).

Why This Matters

Because lots of people spend a lot of time trying to find bottlenecks in their fancy Python multi-threaded code before they learn what the GIL is.

Once this information is clear, here’s my code:

#!/bin/python
from multiprocessing.dummy import Pool
from subprocess import PIPE,Popen
import time
import os

# In the variable pool_size we define the "parallelness".
# For CPU-bound tasks, it doesn't make sense to create more Pool processes
# than you have cores to run them on.
#
# On the other hand, if you are using I/O-bound tasks, it may make sense
# to create a quite a few more Pool processes than cores, since the processes
# will probably spend most their time blocked (waiting for I/O to complete).
pool_size = 8

def do_ping(ip):
    if os.name == 'nt':
        print ("Using Windows Ping to " + ip)
        proc = Popen(['ping', ip], stdout=PIPE)
        return proc.communicate()[0]
    else:
        print ("Using Linux / Unix Ping to " + ip)
        proc = Popen(['ping', ip, '-c', '4'], stdout=PIPE)
        return proc.communicate()[0]


os.system('cls' if os.name=='nt' else 'clear')
print ("Running using threads\n")
start_time = time.time()
pool = Pool(pool_size)
website_names = ["www.google.com","www.facebook.com","www.pinterest.com","www.microsoft.com"]
result = {}
for website_name in website_names:
    result[website_name] = pool.apply_async(do_ping, args=(website_name,))
pool.close()
pool.join()
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Now we do the same without threading, just to compare time
print ("\nRunning NOT using threads\n")
start_time = time.time()
for website_name in website_names:
    do_ping(website_name)
print ("\n--- Execution took {} seconds ---".format((time.time() - start_time)))

# Here's one way to print the final output from the threads
output = {}
for key, value in result.items():
    output[key] = value.get()
print ("\nOutput aggregated in a Dictionary:")
print (output)
print ("\n")

print ("\nPretty printed output: ")
for key, value in output.items():
    print (key + "\n")
    print (value)

回答 15

这是一个简单的多线程示例,会很有帮助。您可以运行它,轻松了解多线程在 Python 中的工作方式。我使用了一个信号量,在前面的线程完成工作之前阻止其他线程访问。通过使用这一行代码,

tLock = threading.BoundedSemaphore(value=4)

您可以一次允许一定数量的线程运行,并让其余线程等到前面的线程完成之后再执行。

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releasing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()

Here is multi-threading with a simple example which will be helpful. You can run it and easily understand how multi-threading works in Python. I used a semaphore to prevent other threads from proceeding until the previous threads finished their work. By the use of this line of code,

tLock = threading.BoundedSemaphore(value=4)

you can allow a number of threads at a time and hold back the rest, which will run later, after the previous ones have finished.

import threading
import time

#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
    print  "\r\nTimer: ", name, " Started"
    tLock.acquire()
    print "\r\n", name, " has acquired the lock"
    while repeat > 0:
        time.sleep(delay)
        print "\r\n", name, ": ", str(time.ctime(time.time()))
        repeat -= 1

    print "\r\n", name, " is releasing the lock"
    tLock.release()
    print "\r\nTimer: ", name, " Completed"

def Main():
    t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
    t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
    t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
    t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
    t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))

    t1.start()
    t2.start()
    t3.start()
    t4.start()
    t5.start()

    print "\r\nMain Complete"

if __name__ == "__main__":
    Main()

回答 16

借鉴这篇文章,我们了解了如何在多线程、多进程和 async/asyncio 之间进行选择,以及它们各自的用法。

Python 3 有一个用于并发和并行的新内置库:concurrent.futures

因此,我将通过一个实验演示如何以线程池(Thread-Pool)的方式运行四个任务(即 .sleep() 方法):

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker=1):
    futures = []

    tick = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))

        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())

    print('Total elapsed time by {} workers:'.format(max_worker), time()-tick)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

输出:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[注意]:

  • 从以上结果可以看出,对这四个任务而言,最佳情况是 3 个工作线程。
  • 如果您的任务是计算密集型而非 I/O 密集型或阻塞型(multiprocessing 与 threading 的区别),则可以把 ThreadPoolExecutor 换成 ProcessPoolExecutor,见下文示例。

Borrowing from this post, we know about choosing between multithreading, multiprocessing, and async/asyncio, and their usage.

Python 3 has a new built-in library for concurrency and parallelism: concurrent.futures

So I'll demonstrate, through an experiment, running four tasks (i.e. the .sleep() method) in a thread-pool manner:

from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep, time

def concurrent(max_worker=1):
    futures = []

    tick = time()
    with ThreadPoolExecutor(max_workers=max_worker) as executor:
        futures.append(executor.submit(sleep, 2))  # Two seconds sleep
        futures.append(executor.submit(sleep, 1))
        futures.append(executor.submit(sleep, 7))
        futures.append(executor.submit(sleep, 3))

        for future in as_completed(futures):
            if future.result() is not None:
                print(future.result())

    print('Total elapsed time by {} workers:'.format(max_worker), time()-tick)

concurrent(5)
concurrent(4)
concurrent(3)
concurrent(2)
concurrent(1)

Output:

Total elapsed time by 5 workers: 7.007831811904907
Total elapsed time by 4 workers: 7.007944107055664
Total elapsed time by 3 workers: 7.003149509429932
Total elapsed time by 2 workers: 8.004627466201782
Total elapsed time by 1 workers: 13.013478994369507

[NOTE]:

  • As you can see in the above results, the best case was 3 workers for those four tasks.
  • If you have a CPU-bound task instead of an I/O-bound or blocking one (multiprocessing vs threading), you could change the ThreadPoolExecutor to ProcessPoolExecutor, as sketched below.
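
A minimal sketch of that swap, under the assumption of a CPU-bound placeholder task (burn is a hypothetical function; the __main__ guard is required for process spawning on Windows):

from concurrent.futures import ProcessPoolExecutor, as_completed

def burn(n):
    return sum(i * i for i in range(n))  # placeholder CPU-bound work

def parallel(max_worker=4):
    with ProcessPoolExecutor(max_workers=max_worker) as executor:
        futures = [executor.submit(burn, 10**6) for _ in range(4)]
        for future in as_completed(futures):
            print(future.result())

if __name__ == '__main__':
    parallel()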

回答 17

先前的解决方案均未在我的 GNU/Linux 服务器(我没有管理员权限)上真正使用多个内核,它们只是在单个内核上运行。

我使用了更底层的 os.fork 接口来派生多个进程。这是对我有用的代码:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

None of the previous solutions actually used multiple cores on my GNU/Linux server (where I don’t have administrator rights). They just ran on a single core.

I used the lower level os.fork interface to spawn multiple processes. This is the code that worked for me:

from os import fork

values = ['different', 'values', 'for', 'threads']

for i in range(len(values)):
    p = fork()
    if p == 0:
        my_function(values[i])
        break

回答 18

import threading
import requests

def send():
    r = requests.get('https://www.stackoverlow.com')

thread = []
# Pass the function itself (send), not the result of calling it (send())
t = threading.Thread(target=send)
thread.append(t)
t.start()
import threading
import requests

def send():
    r = requests.get('https://www.stackoverlow.com')

thread = []
# Pass the function itself (send), not the result of calling it (send())
t = threading.Thread(target=send)
thread.append(t)
t.start()