标签归档:queue

多处理-管道与队列

问题:多处理-管道与队列

Python的多处理程序包中的队列和管道之间的根本区别是什么?

在什么情况下应该选择一种?什么时候使用比较有利Pipe()?什么时候使用比较有利Queue()

What are the fundamental differences between queues and pipes in Python’s multiprocessing package?

In what scenarios should one choose one over the other? When is it advantageous to use Pipe()? When is it advantageous to use Queue()?


回答 0

  • A Pipe()只能有两个端点。

  • 一个Queue()可以有多个生产者和消费者。

何时使用它们

如果您需要两个以上的交流点,请使用Queue()

如果您需要绝对的性能,那么a Pipe()会更快,因为Queue()它建立在之上Pipe()

绩效基准

假设您要生成两个进程并在它们之间尽快发送消息。这些是使用Pipe()和进行类似测试之间的拖动竞赛的计时结果Queue()。这是在运行Ubuntu 11.10和Python 2.7.2的ThinkpadT61上进行的。

仅供参考,我把成绩JoinableQueue()作为奖金;JoinableQueue()queue.task_done()调用时说明任务(它甚至不知道特定任务,它只计算队列中未完成的任务),以便queue.join()知道工作已完成。

每个答案底部的代码…

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

概括而言,Pipe()它的速度比速度快3倍Queue()JoinableQueue()除非您确实必须拥有这些好处,否则请不要考虑。

奖励材料2

除非您知道一些捷径,否则多处理会在信息流中引入微妙的变化,使调试变得困难。例如,在许多情况下,当您通过字典建立索引时,您的脚本可能运行良好,但是某些输入很少会失败。

通常,当整个python进程崩溃时,我们会获得有关失败的线索;但是,如果多处理功能崩溃,则不会在控制台上打印未经请求的崩溃回溯。很难找到未知的多处理崩溃,而又不知道导致进程崩溃的线索。

我发现跟踪多处理崩溃信息的最简单方法是将整个多处理功能包装在try/中except并使用traceback.print_exc()

import traceback
def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

现在,当您发现崩溃时,您会看到类似以下内容的信息:

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'

源代码:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in xrange(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))

"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))
  • A Pipe() can only have two endpoints.

  • A Queue() can have multiple producers and consumers.

When to use them

If you need more than two points to communicate, use a Queue().

If you need absolute performance, a Pipe() is much faster because Queue() is built on top of Pipe().

Performance Benchmarking

Let’s assume you want to spawn two processes and send messages between them as quickly as possible. These are the timing results of a drag race between similar tests using Pipe() and Queue()… This is on a ThinkpadT61 running Ubuntu 11.10, and Python 2.7.2.

FYI, I threw in results for JoinableQueue() as a bonus; JoinableQueue() accounts for tasks when queue.task_done() is called (it doesn’t even know about the specific task, it just counts unfinished tasks in the queue), so that queue.join() knows the work is finished.

The code for each at bottom of this answer…

mpenning@mpenning-T61:~$ python multi_pipe.py 
Sending 10000 numbers to Pipe() took 0.0369849205017 seconds
Sending 100000 numbers to Pipe() took 0.328398942947 seconds
Sending 1000000 numbers to Pipe() took 3.17266988754 seconds
mpenning@mpenning-T61:~$ python multi_queue.py 
Sending 10000 numbers to Queue() took 0.105256080627 seconds
Sending 100000 numbers to Queue() took 0.980564117432 seconds
Sending 1000000 numbers to Queue() took 10.1611330509 seconds
mpnening@mpenning-T61:~$ python multi_joinablequeue.py 
Sending 10000 numbers to JoinableQueue() took 0.172781944275 seconds
Sending 100000 numbers to JoinableQueue() took 1.5714070797 seconds
Sending 1000000 numbers to JoinableQueue() took 15.8527247906 seconds
mpenning@mpenning-T61:~$

In summary Pipe() is about three times faster than a Queue(). Don’t even think about the JoinableQueue() unless you really must have the benefits.

BONUS MATERIAL 2

Multiprocessing introduces subtle changes in information flow that make debugging hard unless you know some shortcuts. For instance, you might have a script that works fine when indexing through a dictionary in under many conditions, but infrequently fails with certain inputs.

Normally we get clues to the failure when the entire python process crashes; however, you don’t get unsolicited crash tracebacks printed to the console if the multiprocessing function crashes. Tracking down unknown multiprocessing crashes is hard without a clue to what crashed the process.

The simplest way I have found to track down multiprocessing crash informaiton is to wrap the entire multiprocessing function in a try / except and use traceback.print_exc():

import traceback
def run(self, args):
    try:
        # Insert stuff to be multiprocessed here
        return args[0]['that']
    except:
        print "FATAL: reader({0}) exited while multiprocessing".format(args) 
        traceback.print_exc()

Now, when you find a crash you see something like:

FATAL: reader([{'crash': 'this'}]) exited while multiprocessing
Traceback (most recent call last):
  File "foo.py", line 19, in __init__
    self.run(args)
  File "foo.py", line 46, in run
    KeyError: 'that'

Source Code:


"""
multi_pipe.py
"""
from multiprocessing import Process, Pipe
import time

def reader_proc(pipe):
    ## Read from the pipe; this will be spawned as a separate Process
    p_output, p_input = pipe
    p_input.close()    # We are only reading
    while True:
        msg = p_output.recv()    # Read from the output pipe and do nothing
        if msg=='DONE':
            break

def writer(count, p_input):
    for ii in xrange(0, count):
        p_input.send(ii)             # Write 'count' numbers into the input pipe
    p_input.send('DONE')

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        # Pipes are unidirectional with two endpoints:  p_input ------> p_output
        p_output, p_input = Pipe()  # writer() writes to p_input from _this_ process
        reader_p = Process(target=reader_proc, args=((p_output, p_input),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process

        p_output.close()       # We no longer need this part of the Pipe()
        _start = time.time()
        writer(count, p_input) # Send a lot of stuff to reader_proc()
        p_input.close()
        reader_p.join()
        print("Sending {0} numbers to Pipe() took {1} seconds".format(count,
            (time.time() - _start)))

"""
multi_queue.py
"""

from multiprocessing import Process, Queue
import time
import sys

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        if (msg == 'DONE'):
            break

def writer(count, queue):
    ## Write to the queue
    for ii in range(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue
    queue.put('DONE')

if __name__=='__main__':
    pqueue = Queue() # writer() writes to pqueue from _this_ process
    for count in [10**4, 10**5, 10**6]:             
        ### reader_proc() reads from pqueue as a separate process
        reader_p = Process(target=reader_proc, args=((pqueue),))
        reader_p.daemon = True
        reader_p.start()        # Launch reader_proc() as a separate python process

        _start = time.time()
        writer(count, pqueue)    # Send a lot of stuff to reader()
        reader_p.join()         # Wait for the reader to finish
        print("Sending {0} numbers to Queue() took {1} seconds".format(count, 
            (time.time() - _start)))

"""
multi_joinablequeue.py
"""
from multiprocessing import Process, JoinableQueue
import time

def reader_proc(queue):
    ## Read from the queue; this will be spawned as a separate Process
    while True:
        msg = queue.get()         # Read from the queue and do nothing
        queue.task_done()

def writer(count, queue):
    for ii in xrange(0, count):
        queue.put(ii)             # Write 'count' numbers into the queue

if __name__=='__main__':
    for count in [10**4, 10**5, 10**6]:
        jqueue = JoinableQueue() # writer() writes to jqueue from _this_ process
        # reader_proc() reads from jqueue as a different process...
        reader_p = Process(target=reader_proc, args=((jqueue),))
        reader_p.daemon = True
        reader_p.start()     # Launch the reader process
        _start = time.time()
        writer(count, jqueue) # Send a lot of stuff to reader_proc() (in different process)
        jqueue.join()         # Wait for the reader to finish
        print("Sending {0} numbers to JoinableQueue() took {1} seconds".format(count, 
            (time.time() - _start)))

回答 1

另一个Queue()值得注意的功能是进纸器线程。节指出:“当进程首先将项目放入队列时,将启动一个供料器线程,该线程将对象从缓冲区转移到管道中。” 可以插入无数(或maxsize)个项目,Queue()而无需任何queue.put()阻塞。这使您可以将多个项目存储在中Queue(),直到程序准备处理它们为止。

Pipe()另一方面,对于已发送到一个连接但尚未从另一连接接收的项目,则具有有限的存储量。存储空间用完后,对的调用connection.send()将阻塞,直到有空间写入整个项目为止。这将使线程停止写入操作,直到从管道读取其他线程为止。Connection对象使您可以访问基础文件描述符。在* nix系统上,您可以connection.send()使用os.set_blocking()函数防止呼叫阻塞。但是,如果您尝试发送不适合管道文件的单个项目,这将导致问题。Linux的最新版本允许您增加文件的大小,但是允许的最大大小根据系统配置而有所不同。因此,您永远不应依赖于Pipe()缓冲数据。调用connection.send 可能会阻塞,直到从其他管道读取数据为止。

总之,当您需要缓冲数据时,队列是比管道更好的选择。即使您只需要在两点之间进行交流。

One additional feature of Queue() that is worth noting is the feeder thread. This section notes “When a process first puts an item on the queue a feeder thread is started which transfers objects from a buffer into the pipe.” An infinite number of (or maxsize) items can be inserted into Queue() without any calls to queue.put() blocking. This allows you to store multiple items in a Queue(), until your program is ready to process them.

Pipe(), on the other hand, has a finite amount of storage for items that have been sent to one connection, but have not been received from the other connection. After this storage is used up, calls to connection.send() will block until there is space to write the entire item. This will stall the thread doing the writing until some other thread reads from the pipe. Connection objects give you access to the underlying file descriptor. On *nix systems, you can prevent connection.send() calls from blocking using the os.set_blocking() function. However, this will cause problems if you try to send a single item that does not fit in the pipe’s file. Recent versions of Linux allow you to increase the size of a file, but the maximum size allowed varies based on system configurations. You should therefore never rely on Pipe() to buffer data. Calls to connection.send could block until data gets read from the pipe somehwere else.

In conclusion, Queue is a better choice than pipe when you need to buffer data. Even when you only need to communicate between two points.


Queue.Queue与collections.deque

问题:Queue.Queue与collections.deque

我需要一个队列,多个线程可以将内容放入其中,并且多个线程可以读取。

Python至少有两个队列类,Queue.Queue和collections.deque,前者似乎在内部使用后者。两者都声称在文档中是线程安全的。

但是,队列文档还指出:

collections.deque是具有无限原子append()和popleft()操作的无界队列的替代实现,不需要锁定。

我猜我不太理解:这是否意味着双端队列毕竟不是完全线程安全的?

如果是这样,我可能无法完全理解两个类之间的区别。我可以看到Queue添加了阻止功能。另一方面,它失去了一些过时的功能,例如对操作员的支持。

直接访问内部双端队列对象是

Queue()中的x

线程安全的?

另外,当双端队列已经是线程安全的了,为什么Queue在操作上使用互斥锁?

I need a queue which multiple threads can put stuff into, and multiple threads may read from.

Python has at least two queue classes, Queue.Queue and collections.deque, with the former seemingly using the latter internally. Both claim to be thread-safe in the documentation.

However, the Queue docs also state:

collections.deque is an alternative implementation of unbounded queues with fast atomic append() and popleft() operations that do not require locking.

Which I guess I don’t quite unterstand: Does this mean deque isn’t fully thread-safe after all?

If it is, I may not fully understand the difference between the two classes. I can see that Queue adds blocking functionality. On the other hand, it loses some deque features like support for the in-operator.

Accessing the internal deque object directly, is

x in Queue().deque

thread-safe?

Also, why does Queue employ a mutex for it’s operations when deque is thread-safe already?


回答 0

Queue.Queuecollections.deque达到不同的目的。Queue.Queue旨在允许不同的线程使用排队的消息/数据进行通信,而collections.deque仅仅是作为数据结构。这就是为什么Queue.Queue有类似的方法put_nowait()get_nowait()join(),而collections.deque不会。Queue.Queue不打算用作集合,这就是为什么它缺少in运算符之类的原因。

归结为:如果您有多个线程,并且希望它们能够在不需要锁的情况下进行通信,那么您正在寻找Queue.Queue;如果您只想将队列或双端队列作为数据结构,请使用collections.deque

最后,访问和处理内部的双端队列 Queue.Queue正在玩火-您确实不想这样做。

Queue.Queue and collections.deque serve different purposes. Queue.Queue is intended for allowing different threads to communicate using queued messages/data, whereas collections.deque is simply intended as a datastructure. That’s why Queue.Queue has methods like put_nowait(), get_nowait(), and join(), whereas collections.deque doesn’t. Queue.Queue isn’t intended to be used as a collection, which is why it lacks the likes of the in operator.

It boils down to this: if you have multiple threads and you want them to be able to communicate without the need for locks, you’re looking for Queue.Queue; if you just want a queue or a double-ended queue as a datastructure, use collections.deque.

Finally, accessing and manipulating the internal deque of a Queue.Queue is playing with fire – you really don’t want to be doing that.


回答 1

如果您要寻找的是一种在线程之间传输对象的线程安全方法,那么两者都将起作用(对于FIFO和LIFO都适用)。对于FIFO:

注意:

  • 的其他操作 deque我不确定可能不是线程安全的。
  • deque并未阻挡pop()popleft()让你无法立足于阻塞,直到一个新项目到达您的消费者线程流。

但是,似乎双端队列具有明显的效率优势。这是使用CPython 2.7.3在几秒钟内插入和删除100k项的一些基准测试结果

deque 0.0747888759791
Queue 1.60079066852

这是基准代码:

import time
import Queue
import collections

q = collections.deque()
t0 = time.clock()
for i in xrange(100000):
    q.append(1)
for i in xrange(100000):
    q.popleft()
print 'deque', time.clock() - t0

q = Queue.Queue(200000)
t0 = time.clock()
for i in xrange(100000):
    q.put(1)
for i in xrange(100000):
    q.get()
print 'Queue', time.clock() - t0

If all you’re looking for is a thread-safe way to transfer objects between threads, then both would work (both for FIFO and LIFO). For FIFO:

Note:

  • Other operations on deque might not be thread safe, I’m not sure.
  • deque does not block on pop() or popleft() so you can’t base your consumer thread flow on blocking till a new item arrives.

However, it seems that deque has a significant efficiency advantage. Here are some benchmark results in seconds using CPython 2.7.3 for inserting and removing 100k items

deque 0.0747888759791
Queue 1.60079066852

Here’s the benchmark code:

import time
import Queue
import collections

q = collections.deque()
t0 = time.clock()
for i in xrange(100000):
    q.append(1)
for i in xrange(100000):
    q.popleft()
print 'deque', time.clock() - t0

q = Queue.Queue(200000)
t0 = time.clock()
for i in xrange(100000):
    q.put(1)
for i in xrange(100000):
    q.get()
print 'Queue', time.clock() - t0

回答 2

有关信息,请参阅Python票证中的双端线程安全性(https://bugs.python.org/issue15329)。标题“阐明哪些双端队列方法是线程安全的”

底线在这里:https : //bugs.python.org/issue15329#msg199368

双端队列的append(),appendleft(),pop(),popleft()和len(d)操作在CPython中是线程安全的。append方法的末尾有一个DECREF(对于已设置maxlen的情况),但这会在所有结构更新完成并且不变量已恢复之后发生,因此可以将这些操作视为原子操作。

无论如何,如果您不确定100%的可靠性,而宁愿选择可靠性而不是性能,则只需放一个类似Lock的锁即可;)

For information there is a Python ticket referenced for deque thread-safety (https://bugs.python.org/issue15329). Title “clarify which deque methods are thread-safe”

Bottom line here: https://bugs.python.org/issue15329#msg199368

The deque’s append(), appendleft(), pop(), popleft(), and len(d) operations are thread-safe in CPython. The append methods have a DECREF at the end (for cases where maxlen has been set), but this happens after all of the structure updates have been made and the invariants have been restored, so it is okay to treat these operations as atomic.

Anyway, if you are not 100% sure and you prefer reliability over performance, just put a like Lock ;)


回答 3

所有启用的单元素方法deque都是原子和线程安全的。所有其他方法也是线程安全的。之类的东西len(dq)dq[4]产生瞬间的正确的价值观。但是想想一下dq.extend(mylist)mylist当其他线程也在同一侧附加元素时,您不能保证所有元素都连续提交,但这通常不是线程间通信和有问题的任务所必需的。

因此,a的deque速度要快20倍左右Queue(后者dequemaxsize幕后使用),除非您不需要“舒适的”同步API(阻止/超时),严格遵守或“覆盖这些方法(_put,_get,.. )来实现其他队列组织的子类化行为,或者当您自己处理此类事情时,光秃秃的deque是高速线程间通信的好方法。

实际上,大量使用额外的互斥锁和额外的方法._get()Queue.py是由于向后兼容性限制,过去的过度设计以及缺乏为线程间通信中这一重要的速度瓶颈问题提供有效解决方案的注意。在较旧的Python版本中使用了列表-但是list.append()/。pop(0)甚至是&都是原子和线程安全的…

All single-element methods on deque are atomic and thread-safe. All other methods are thread-safe too. Things like len(dq), dq[4] yield momentary correct values. But think e.g. about dq.extend(mylist): you don’t get a guarantee that all elements in mylist are filed in a row when other threads also append elements on the same side – but thats usually not a requirement in inter-thread communication and for the questioned task.

So a deque is ~20x faster than Queue (which uses a deque under the hood) and unless you don’t need the “comfortable” synchronization API (blocking / timeout), the strict maxsize obeyance or the “Override these methods (_put, _get, ..) to implement other queue organizations” sub-classing behavior, or when you take care of such things yourself, then a bare deque is a good and efficient deal for high-speed inter-thread communication.

In fact the heavy usage of an extra mutex and extra method ._get() etc. method calls in Queue.py is due to backwards compatibility constraints, past over-design and lack of care for providing an efficient solution for this important speed bottleneck issue in inter-thread communication. A list was used in older Python versions – but even list.append()/.pop(0) was & is atomic and threadsafe …


回答 4

默认行为的20倍改进相比notify_all()每个结果相加会导致更差的结果:deque appendpopleftdequedeque

deque + notify_all: 0.469802
Queue:              0.667279

@Jonathan稍微修改了他的代码,我使用cPython 3.6.2获得了基准,并在双端队列中添加了条件以模拟Queue的行为。

import time
from queue import Queue
import threading
import collections

mutex = threading.Lock()
condition = threading.Condition(mutex)
q = collections.deque()
t0 = time.clock()
for i in range(100000):
    with condition:
        q.append(1)
        condition.notify_all()
for _ in range(100000):
    with condition:
        q.popleft()
        condition.notify_all()
print('deque', time.clock() - t0)

q = Queue(200000)
t0 = time.clock()
for _ in range(100000):
    q.put(1)
for _ in range(100000):
    q.get()
print('Queue', time.clock() - t0)

而且似乎该功能限制了性能 condition.notify_all()

collections.deque是具有无限原子append()和popleft()操作的无界队列的替代实现,不需要锁定。 docs队列

Adding notify_all() to each deque append and popleft results in far worse results for deque than the 20x improvement achieved by default deque behavior:

deque + notify_all: 0.469802
Queue:              0.667279

@Jonathan modify his code a little and I get the benchmark using cPython 3.6.2 and add condition in deque loop to simulate the behaviour Queue do.

import time
from queue import Queue
import threading
import collections

mutex = threading.Lock()
condition = threading.Condition(mutex)
q = collections.deque()
t0 = time.clock()
for i in range(100000):
    with condition:
        q.append(1)
        condition.notify_all()
for _ in range(100000):
    with condition:
        q.popleft()
        condition.notify_all()
print('deque', time.clock() - t0)

q = Queue(200000)
t0 = time.clock()
for _ in range(100000):
    q.put(1)
for _ in range(100000):
    q.get()
print('Queue', time.clock() - t0)

And it seems the performance limited by this function condition.notify_all()

collections.deque is an alternative implementation of unbounded queues with fast atomic append() and popleft() operations that do not require locking. docs Queue


回答 5

deque是线程安全的。“不需要锁定的操作”意味着您不必自己进行锁定,deque多多关照。

以一看Queue源,内部双端队列被称为self.queue并使用存取和突变互斥,所以Queue().queue线程安全的使用。

如果您正在寻找“ in”运算符,则双端队列或队列可能不是最适合您问题的数据结构。

deque is thread-safe. “operations that do not require locking” means that you don’t have to do the locking yourself, the deque takes care of it.

Taking a look at the Queue source, the internal deque is called self.queue and uses a mutex for accessors and mutations, so Queue().queue is not thread-safe to use.

If you’re looking for an “in” operator, then a deque or queue is possibly not the most appropriate data structure for your problem.


回答 6

(似乎我没有信誉可言…)您需要注意从不同线程使用的双端队列的哪些方法。

deque.get()似乎是线程安全的,但是我发现这样做

for item in a_deque:
   process(item)

如果另一个线程同时添加项目,则失败。我收到一个RuntimeException,它抱怨“迭代期间双端队列已变异”。

检查collectionsmodule.c以查看哪些操作受此影响

(seems I don’t have reputation to comment…) You need to be careful which methods of the deque you use from different threads.

deque.get() appears to be threadsafe, but I have found that doing

for item in a_deque:
   process(item)

can fail if another thread is adding items at the same time. I got an RuntimeException that complained “deque mutated during iteration”.

Check collectionsmodule.c to see which operations are affected by this