Tag archive: parallel-processing

Shared-memory objects in multiprocessing

Question: Shared-memory objects in multiprocessing

Suppose I have a large in-memory numpy array and a function func that takes this giant array as input (together with some other parameters). func can be run in parallel with different parameters. For example:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

If I use the multiprocessing library, that giant array will be copied multiple times into different processes.

Is there a way to let different processes share the same array? This array object is read-only and will never be modified.

To make things more complicated: if arr is not an array but an arbitrary Python object, is there a way to share it?

[EDITED]

I read the answer but I am still a bit confused. Since fork() is copy-on-write, we should not incur any additional cost when spawning new processes with the Python multiprocessing library. But the following code suggests there is a huge overhead:

from multiprocessing import Pool
import numpy as np
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print("construct array =", time.time() - t)


pool = Pool(processes=6)

t = time.time()
res = pool.apply_async(f, [arr])
res.get()
print("multiprocessing overhead =", time.time() - t)

Output (and by the way, the cost increases as the size of the array increases, so I suspect there is still overhead related to memory copying):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Why is there such a huge overhead if we didn't copy the array? And what part would shared memory save me?


Answer 0

If you use an operating system that uses copy-on-write fork() semantics (like any common unix), then as long as you never alter your data structure it will be available to all child processes without taking up additional memory. You will not have to do anything special (except make absolutely sure you don’t alter the object).

The most efficient thing you can do for your problem would be to pack your array into an efficient array structure (using numpy or array), place that in shared memory, wrap it with multiprocessing.Array, and pass that to your functions. This answer shows how to do that.
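
A minimal sketch of that pattern, using RawArray (the lock-free variant of multiprocessing.Array); the initializer-based setup here is illustrative, not taken from the linked answer:

import ctypes
import multiprocessing as mp
import numpy as np

def init_worker(shared_base, shape):
    # Re-wrap the shared buffer as a numpy array once in each worker.
    global shared_arr
    shared_arr = np.frombuffer(shared_base, dtype=np.float64).reshape(shape)

def func(param):
    # Reads shared_arr directly from shared memory; nothing is copied.
    return shared_arr.sum() * param

if __name__ == '__main__':
    shape = (1000, 1000)
    base = mp.RawArray(ctypes.c_double, shape[0] * shape[1])
    arr = np.frombuffer(base, dtype=np.float64).reshape(shape)
    arr[:] = 1.0  # fill the shared array before starting the pool

    with mp.Pool(processes=4, initializer=init_worker,
                 initargs=(base, shape)) as pool:
        print(pool.map(func, [1, 2, 3]))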

If you want a writeable shared object, then you will need to wrap it with some kind of synchronization or locking. multiprocessing provides two methods of doing this: one using shared memory (suitable for simple values, arrays, or ctypes) or a Manager proxy, where one process holds the memory and a manager arbitrates access to it from other processes (even over a network).

The Manager approach can be used with arbitrary Python objects, but will be slower than the equivalent using shared memory because the objects need to be serialized/deserialized and sent between processes.
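
For illustration, a minimal Manager-based sketch (slower, but it works for arbitrary picklable objects; the names are made up):

from multiprocessing import Manager, Process

def work(d, i):
    d[i] = i * i  # every access goes through the manager process

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        procs = [Process(target=work, args=(d, i)) for i in range(3)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(dict(d))  # {0: 0, 1: 1, 2: 4}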

There is a wealth of parallel processing libraries and approaches available in Python. multiprocessing is an excellent and well-rounded library, but if you have special needs, perhaps one of the other approaches may be better.


Answer 1

I ran into the same problem and wrote a little shared-memory utility class to work around it.

I'm using multiprocessing.RawArray (lock-free), and access to the arrays is not synchronized at all (also lock-free); be careful not to shoot yourself in the foot.

With this solution I get a speedup of approximately 3x on a quad-core i7.

Here’s the code: Feel free to use and improve it, and please report back any bugs.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to a numpy array via ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # reshape to the requested dimensions;
        # the result is a view on the original flat array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cur].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

Answer 2

This is the intended use case for Ray, which is a library for parallel and distributed Python. Under the hood, it serializes objects using the Apache Arrow data layout (which is a zero-copy format) and stores them in a shared-memory object store so they can be accessed by multiple processes without creating copies.

The code would look like the following.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

If you don’t call ray.put then the array will still be stored in shared memory, but that will be done once per invocation of func, which is not what you want.

Note that this will work not only for arrays but also for objects that contain arrays, e.g., dictionaries mapping ints to arrays as below.

You can compare the performance of serialization in Ray versus pickle by running the following in IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Serialization with Ray is only slightly faster than pickle, but deserialization is 1000x faster because of the use of shared memory (this number will of course depend on the object).

See the Ray documentation. You can read more about fast serialization using Ray and Arrow. Note I’m one of the Ray developers.


Answer 3

Like Robert Nishihara mentioned, Apache Arrow makes this easy, specifically with the Plasma in-memory object store, which is what Ray is built on.

I made brain-plasma specifically for this reason – fast loading and reloading of big objects in a Flask app. It's a shared-memory object namespace for Apache Arrow-serializable objects, including pickled bytestrings generated by pickle.dumps(...).

The key difference from Apache Ray and Plasma is that brain-plasma keeps track of object IDs for you. Any process, thread, or program running locally can share the variables' values by looking up the name through any Brain object.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma')

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]

Deciding between subprocess, multiprocessing, and threading in Python?

Question: Deciding between subprocess, multiprocessing, and threading in Python?

I’d like to parallelize my Python program so that it can make use of multiple processors on the machine that it runs on. My parallelization is very simple, in that all the parallel “threads” of the program are independent and write their output to separate files. I don’t need the threads to exchange information but it is imperative that I know when the threads finish since some steps of my pipeline depend on their output.

Portability is important, in that I’d like this to run on any Python version on Mac, Linux, and Windows. Given these constraints, which is the most appropriate Python module for implementing this? I am trying to decide between thread, subprocess, and multiprocessing, which all seem to provide related functionality.

Any thoughts on this? I’d like the simplest solution that’s portable.


Answer 0

multiprocessing is a great Swiss-army knife type of module. It is more general than threads, as you can even perform remote computations. This is therefore the module I would suggest you use.

The subprocess module would also allow you to launch multiple processes, but I found it to be less convenient to use than the new multiprocessing module.

Threads are notoriously subtle, and with CPython you are often limited to one core when using them (even though, as noted in one of the comments, the Global Interpreter Lock (GIL) can be released in C code called from Python code).

I believe that most of the functions of the three modules you cite can be used in a platform-independent way. On the portability side, note that multiprocessing has only been in the standard library since Python 2.6 (a version for some older versions of Python does exist, though). But it's a great module!


Answer 1

For me this is actually pretty simple:

The subprocess option:

subprocess is for running other executables — it's basically a wrapper around os.fork() and os.execve(), with some support for optional plumbing (setting up PIPEs to and from the subprocesses). Obviously you could use other inter-process communication (IPC) mechanisms, such as sockets, or POSIX or SysV shared memory. But you're going to be limited to whatever interfaces and IPC channels are supported by the programs you're calling.

Commonly, one uses subprocess synchronously — simply calling some external utility and reading back its output or awaiting its completion (perhaps reading its results from a temporary file, or after it's posted them to some database).

However, one can spawn hundreds of subprocesses and poll them. My own personal favorite utility, classh, does exactly that. The biggest disadvantage of the subprocess module is that I/O support is generally blocking. There is a draft PEP-3145 to fix that in some future version of Python 3.x, and an alternative, asyncproc (warning: the link leads right to the download, not to any sort of documentation or README). I've also found that it's relatively easy to just import fcntl and manipulate your Popen PIPE file descriptors directly — though I don't know if this is portable to non-UNIX platforms.

(Update, 7 August 2019: Python 3 supports asyncio subprocesses: asyncio Subprocesses)

subprocess has almost no event handling support — though you can use the signal module and plain old-school UNIX/Linux signals — killing your processes softly, as it were.
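
A minimal sketch of both the synchronous and the polled styles (the commands are placeholders):

import subprocess
import time

# Synchronous use: call an external utility and read back its output.
out = subprocess.run(['echo', 'hello'], capture_output=True, text=True).stdout

# Spawn several children and poll them until all have finished.
procs = [subprocess.Popen(['sleep', '1']) for _ in range(5)]
while procs:
    procs = [p for p in procs if p.poll() is None]  # keep only the running ones
    time.sleep(0.1)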

The multiprocessing option:

multiprocessing is for running functions within your existing (Python) code with support for more flexible communications among this family of processes. In particular it’s best to build your multiprocessing IPC around the module’s Queue objects where possible, but you can also use Event objects and various other features (some of which are, presumably, built around mmap support on the platforms where that support is sufficient).

Python’s multiprocessing module is intended to provide interfaces and features which are very similar to threading while allowing CPython to scale your processing among multiple CPUs/cores despite the GIL (Global Interpreter Lock). It leverages all the fine-grained SMP locking and coherency effort that was done by developers of your OS kernel.
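
A minimal sketch of the Queue-based IPC style described above (the worker function is made up):

import multiprocessing as mp

def worker(inq, outq):
    for item in iter(inq.get, None):  # None is the shutdown sentinel
        outq.put(item * item)

if __name__ == '__main__':
    inq, outq = mp.Queue(), mp.Queue()
    p = mp.Process(target=worker, args=(inq, outq))
    p.start()
    for i in range(5):
        inq.put(i)
    inq.put(None)  # tell the worker to stop
    print([outq.get() for _ in range(5)])
    p.join()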

The threading option:

threading is for a fairly narrow range of applications which are I/O bound (don’t need to scale across multiple CPU cores) and which benefit from the extremely low latency and switching overhead of thread switching (with shared core memory) vs. process/context switching. On Linux this is almost the empty set (Linux process switch times are extremely close to its thread-switches).

threading suffers from two major disadvantages in Python.

One, of course, is implementation specific — mostly affecting CPython. That’s the GIL. For the most part, most CPython programs will not benefit from the availability of more than two CPUs (cores) and often performance will suffer from the GIL locking contention.

The larger issue which is not implementation specific, is that threads share the same memory, signal handlers, file descriptors and certain other OS resources. Thus the programmer must be extremely careful about object locking, exception handling and other aspects of their code which are both subtle and which can kill, stall, or deadlock the entire process (suite of threads).

By comparison the multiprocessing model gives each process its own memory, file descriptors, etc. A crash or unhandled exception in any one of them will only kill that resource and robustly handling the disappearance of a child or sibling process can be considerably easier than debugging, isolating and fixing or working around similar issues in threads.

  • (Note: use of threading with major Python systems, such as NumPy, may suffer considerably less from GIL contention than most of your own Python code would. That’s because they’ve been specifically engineered to do so; the native/binary portions of NumPy, for example, will release the GIL when that’s safe).

The twisted option:

It’s also worth noting that Twisted offers yet another alternative which is both elegant and very challenging to understand. Basically, at the risk of over simplifying to the point where fans of Twisted may storm my home with pitchforks and torches, Twisted provides event-driven co-operative multi-tasking within any (single) process.

To understand how this is possible one should read about the features of select() (which can be built around the select() or poll() or similar OS system calls). Basically it’s all driven by the ability to make a request of the OS to sleep pending any activity on a list of file descriptors or some timeout.

Awakening from each of these calls to select() is an event — either one involving input available (readable) on some number of sockets or file descriptors, or buffering space becoming available on some other (writable) descriptors or sockets, some exceptional conditions (TCP out-of-band PUSH’d packets, for example), or a TIMEOUT.

Thus the Twisted programming model is built around handling these events then looping on the resulting “main” handler, allowing it to dispatch the events to your handlers.

I personally think of the name, Twisted as evocative of the programming model … since your approach to the problem must be, in some sense, “twisted” inside out. Rather than conceiving of your program as a series of operations on input data and outputs or results, you’re writing your program as a service or daemon and defining how it reacts to various events. (In fact the core “main loop” of a Twisted program is (usually? always?) a reactor()).

The major challenges to using Twisted involve twisting your mind around the event driven model and also eschewing the use of any class libraries or toolkits which are not written to co-operate within the Twisted framework. This is why Twisted supplies its own modules for SSH protocol handling, for curses, and its own subprocess/Popen functions, and many other modules and protocol handlers which, at first blush, would seem to duplicate things in the Python standard libraries.

I think it’s useful to understand Twisted on a conceptual level even if you never intend to use it. It may give insights into performance, contention, and event handling in your threading, multiprocessing and even subprocess handling as well as any distributed processing you undertake.

(Note: Newer versions of Python 3.x are including asyncio (asynchronous I/O) features such as async def, the @async.coroutine decorator, and the await keyword, and yield from future support. All of these are roughly similar to Twisted from a process (co-operative multitasking) perspective). (For the current status of Twisted support for Python 3, check out: https://twistedmatrix.com/documents/current/core/howto/python3.html)

The distributed option:

Yet another realm of processing you haven’t asked about, but which is worth considering, is that of distributed processing. There are many Python tools and frameworks for distributed processing and parallel computation. Personally I think the easiest to use is one which is least often considered to be in that space.

It is almost trivial to build distributed processing around Redis. The entire key store can be used to store work units and results, Redis LISTs can be used as a Queue()-like object, and the PUB/SUB support can be used for Event-like handling. You can hash your keys and use values, replicated across a loose cluster of Redis instances, to store the topology and hash-token mappings to provide consistent hashing and fail-over for scaling beyond the capacity of any single instance for co-ordinating your workers and marshaling data (pickled, JSON, BSON, or YAML) among them.
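
As a rough sketch of that idea, assuming the third-party redis-py client and a Redis server running locally:

import pickle
import redis  # third-party redis-py package

r = redis.Redis()

# Producer: push a pickled work unit onto a Redis LIST used as a queue.
r.lpush('work', pickle.dumps({'job': 42}))

# Worker (possibly on another machine): block until a unit is available.
_, raw = r.brpop('work')
job = pickle.loads(raw)
print(job)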

Of course as you start to build a larger scale and more sophisticated solution around Redis you are re-implementing many of the features that have already been solved using Celery, Apache Spark and Hadoop, Zookeeper, etcd, Cassandra and so on. Those all have modules for Python access to their services.

[Update: A couple of resources for consideration if you're considering Python for computationally intensive work across distributed systems: IPython Parallel and PySpark. While these are general-purpose distributed computing systems, they are particularly accessible and popular subsystems for data science and analytics.]

Conclusion

There you have the gamut of processing alternatives for Python, from single threaded, with simple synchronous calls to sub-processes, pools of polled subprocesses, threaded and multiprocessing, event-driven co-operative multi-tasking, and out to distributed processing.


Answer 2

In a similar case I opted for separate processes and the little bit of necessary communication through network sockets. It is highly portable and quite simple to do with Python, but probably not the simplest (in my case I also had another constraint: communication with other processes written in C++).

In your case I would probably go for multiprocessing, as Python threads, at least when using CPython, are not real parallel threads. Well, they are native system threads, but C modules called from Python may or may not release the GIL and allow other threads to run when calling blocking code.


Answer 3

To use multiple processors in CPython, your only choice is the multiprocessing module. CPython keeps a lock on its internals (the GIL) which prevents threads on other CPUs from working in parallel. The multiprocessing module creates new processes (like subprocess) and manages communication between them.
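
For example, a minimal sketch of two processes communicating over a multiprocessing Pipe:

from multiprocessing import Pipe, Process

def child(conn):
    conn.send('hello from the child process')
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=child, args=(child_conn,))
    p.start()
    print(parent_conn.recv())
    p.join()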


Answer 4

Shell out and let Unix do your jobs:

use iterpipes to wrap subprocess and then:

From Ted Ziuba’s site

INPUTS_FROM_YOU | xargs -n1 -0 -P NUM ./process #NUM parallel processes

OR

GNU Parallel will also serve.

You hang out with the GIL while you send the backroom boys out to do your multicore work.


How to do parallel programming in Python?

Question: How to do parallel programming in Python?

For C++, we can use OpenMP to do parallel programming; however, OpenMP will not work for Python. What should I do if I want to parallelize some parts of my Python program?

The structure of the code may be considered as:

solve1(A)
solve2(B)

where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of in sequence, in order to reduce the running time? The code is:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print("Maximum iteration reached")
        i += 1
    print(inneropt)

where setinner and setouter are two independent functions. That's where I want to parallelize…


Answer 0

You can use the multiprocessing module. For this case I might use a processing pool:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

This will spawn processes that can do generic work for you. Since we did not pass processes, it will spawn one process for each CPU core on your machine. Each CPU core can execute one process simultaneously.

If you want to map a list to a single function you would do this:

args = [A, B]
results = pool.map(solve1, args)

Don't use threads here, because the GIL locks any operations on Python objects.


Answer 1

This can be done very elegantly with Ray.

To parallelize your example, you’d need to define your functions with the @ray.remote decorator, and then invoke them with .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

There are a number of advantages of this over the multiprocessing module.

  1. The same code will run on a multicore machine as well as a cluster of machines.
  2. Processes share data efficiently through shared memory and zero-copy serialization.
  3. Error messages are propagated nicely.
  4. These function calls can be composed together, e.g.,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. In addition to invoking functions remotely, classes can be instantiated remotely as actors.

Note that Ray is a framework I’ve been helping develop.


Answer 2

CPython uses the Global Interpreter Lock, which makes parallel programming a bit more interesting than in C++.

This topic has several useful examples and descriptions of the challenge:

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?


Answer 3

The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there is also charm4py and mpi4py (I am the developer of charm4py).

There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to workers in each of the 1000 iterations. Since at least one worker will reside on a different process, this involves copying and sending the arguments to the other process(es). This could be very costly depending on the size of the objects. Instead, it makes sense to have workers store state and simply send the updated information.

For example, in charm4py this can be done like this:

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Note that for this example we really only need one worker. The main loop could execute one of the functions, and have the worker execute the other. But my code helps to illustrate a couple of things:

  1. Worker A runs in process 0 (same as the main loop). While result_a.get() is blocked waiting on the result, worker A does the computation in the same process.
  2. Arguments are automatically passed by reference to worker A, since it is in the same process (there is no copying involved).

Answer 4

In some cases, it’s possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Unfortunately, it seems that Numba only works with Numpy arrays, but not with other Python objects. In theory, it might also be possible to compile Python to C++ and then automatically parallelize it using the Intel C++ compiler, though I haven’t tried this yet.


Answer 5

You can use the joblib library to do parallel computation and multiprocessing.

from joblib import Parallel, delayed

You can simply create a function foo that you want to run in parallel and implement parallel processing based on the following piece of code:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

where num_cores can be obtained from the multiprocessing library as follows:

import multiprocessing

num_cores = multiprocessing.cpu_count()

If you have a function with more than one input argument, and you just want to iterate over one of the arguments by a list, you can use the partial function from the functools library as follows:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)

You can find a complete explanation of Python and R multiprocessing with a couple of examples here.


What are the differences between the threading and multiprocessing modules?

Question: What are the differences between the threading and multiprocessing modules?

I am learning how to use the threading and the multiprocessing modules in Python to run certain operations in parallel and speed up my code.

I am finding this hard (maybe because I don’t have any theoretical background about it) to understand what the difference is between a threading.Thread() object and a multiprocessing.Process() one.

Also, it is not entirely clear to me how to instantiate a queue of jobs and have only 4 (for example) of them running in parallel, while the others wait for resources to free up before being executed.

I find the examples in the documentation clear, but not very exhaustive; as soon as I try to complicate things a bit, I receive a lot of weird errors (like a method that can’t be pickled, and so on).

So, when should I use the threading and multiprocessing modules?

Can you link me to some resources that explain the concepts behind these two modules and how to use them properly for complex tasks?


Answer 0

What Giulio Franco says is true for multithreading vs. multiprocessing in general.

However, Python* has an added issue: There’s a Global Interpreter Lock that prevents two threads in the same process from running Python code at the same time. This means that if you have 8 cores, and change your code to use 8 threads, it won’t be able to use 800% CPU and run 8x faster; it’ll use the same 100% CPU and run at the same speed. (In reality, it’ll run a little slower, because there’s extra overhead from threading, even if you don’t have any shared data, but ignore that for now.)

There are exceptions to this. If your code’s heavy computation doesn’t actually happen in Python, but in some library with custom C code that does proper GIL handling, like a numpy app, you will get the expected performance benefit from threading. The same is true if the heavy computation is done by some subprocess that you run and wait on.

More importantly, there are cases where this doesn’t matter. For example, a network server spends most of its time reading packets off the network, and a GUI app spends most of its time waiting for user events. One reason to use threads in a network server or GUI app is to allow you to do long-running “background tasks” without stopping the main thread from continuing to service network packets or GUI events. And that works just fine with Python threads. (In technical terms, this means Python threads give you concurrency, even though they don’t give you core-parallelism.)

But if you’re writing a CPU-bound program in pure Python, using more threads is generally not helpful.

Using separate processes has no such problems with the GIL, because each process has its own separate GIL. Of course you still have all the same tradeoffs between threads and processes as in any other languages—it’s more difficult and more expensive to share data between processes than between threads, it can be costly to run a huge number of processes or to create and destroy them frequently, etc. But the GIL weighs heavily on the balance toward processes, in a way that isn’t true for, say, C or Java. So, you will find yourself using multiprocessing a lot more often in Python than you would in C or Java.


Meanwhile, Python’s “batteries included” philosophy brings some good news: It’s very easy to write code that can be switched back and forth between threads and processes with a one-liner change.

If you design your code in terms of self-contained “jobs” that don’t share anything with other jobs (or the main program) except input and output, you can use the concurrent.futures library to write your code around a thread pool like this:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    executor.submit(job, argument)
    executor.map(some_function, collection_of_independent_things)
    # ...

You can even get the results of those jobs and pass them on to further jobs, wait for things in order of execution or in order of completion, etc.; read the section on Future objects for details.

Now, if it turns out that your program is constantly using 100% CPU, and adding more threads just makes it slower, then you’re running into the GIL problem, so you need to switch to processes. All you have to do is change that first line:

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

The only real caveat is that your jobs’ arguments and return values have to be pickleable (and not take too much time or memory to pickle) to be usable cross-process. Usually this isn’t a problem, but sometimes it is.


But what if your jobs can’t be self-contained? If you can design your code in terms of jobs that pass messages from one to another, it’s still pretty easy. You may have to use threading.Thread or multiprocessing.Process instead of relying on pools. And you will have to create queue.Queue or multiprocessing.Queue objects explicitly. (There are plenty of other options—pipes, sockets, files with flocks, … but the point is, you have to do something manually if the automatic magic of an Executor is insufficient.)

But what if you can’t even rely on message passing? What if you need two jobs to both mutate the same structure, and see each others’ changes? In that case, you will need to do manual synchronization (locks, semaphores, conditions, etc.) and, if you want to use processes, explicit shared-memory objects to boot. This is when multithreading (or multiprocessing) gets difficult. If you can avoid it, great; if you can’t, you will need to read more than someone can put into an SO answer.


From a comment, you wanted to know what’s different between threads and processes in Python. Really, if you read Giulio Franco’s answer and mine and all of our links, that should cover everything… but a summary would definitely be useful, so here goes:

  1. Threads share data by default; processes do not.
  2. As a consequence of (1), sending data between processes generally requires pickling and unpickling it.**
  3. As another consequence of (1), directly sharing data between processes generally requires putting it into low-level formats like Value, Array, and ctypes types (see the sketch after the footnotes below).
  4. Processes are not subject to the GIL.
  5. On some platforms (mainly Windows), processes are much more expensive to create and destroy.
  6. There are some extra restrictions on processes, some of which are different on different platforms. See Programming guidelines for details.
  7. The threading module doesn’t have some of the features of the multiprocessing module. (You can use multiprocessing.dummy to get most of the missing API on top of threads, or you can use higher-level modules like concurrent.futures and not worry about it.)

* It’s not actually Python, the language, that has this issue, but CPython, the “standard” implementation of that language. Some other implementations don’t have a GIL, like Jython.

** If you’re using the fork start method for multiprocessing—which you can on most non-Windows platforms—each child process gets any resources the parent had when the child was started, which can be another way to pass data to children.
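
As a small sketch of points (1) and (3) from the summary above, sharing a low-level Value across processes (illustrative, not part of the original answer):

from multiprocessing import Process, Value

def add_one(counter):
    with counter.get_lock():  # explicit synchronization across processes
        counter.value += 1

if __name__ == '__main__':
    counter = Value('i', 0)  # a shared C int, zero-initialized
    procs = [Process(target=add_one, args=(counter,)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4: every process mutated the same object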


Answer 1

Multiple threads can exist in a single process. The threads that belong to the same process share the same memory area (can read from and write to the very same variables, and can interfere with one another). On the contrary, different processes live in different memory areas, and each of them has its own variables. In order to communicate, processes have to use other channels (files, pipes or sockets).

If you want to parallelize a computation, you’re probably going to need multithreading, because you probably want the threads to cooperate on the same memory.

Speaking about performance, threads are faster to create and manage than processes (because the OS doesn’t need to allocate a whole new virtual memory area), and inter-thread communication is usually faster than inter-process communication. But threads are harder to program. Threads can interfere with one another, and can write to each other’s memory, but the way this happens is not always obvious (due to several factors, mainly instruction reordering and memory caching), and so you are going to need synchronization primitives to control access to your variables.
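
A tiny sketch of why those synchronization primitives matter; without the lock, the read-modify-write on the shared counter can race (illustrative, not from the original answer):

import threading

counter = 0
lock = threading.Lock()

def add_one():
    global counter
    with lock:  # serialize access to the shared variable
        counter += 1

threads = [threading.Thread(target=add_one) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 100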


回答 2

我相信此链接可以优雅地回答您的问题。

简而言之,如果您的一个子问题必须等待另一个子问题完成,那么多线程是个好选择(例如在I/O密集型操作中);相反,如果您的子问题确实可以同时进行,则建议使用多处理。但是,创建的进程数不应超过核心数。

I believe this link answers your question in an elegant way.

To be short, if one of your sub-problems has to wait while another finishes, multithreading is good (in I/O heavy operations, for example); by contrast, if your sub-problems could really happen at the same time, multiprocessing is suggested. However, you shouldn't create more processes than you have cores.
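
In practice that usually means sizing a process pool to the machine rather than to the number of tasks, along these lines (a generic sketch, not from the linked article):

import multiprocessing

# one worker per core; more would just contend for the CPUs
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())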


回答 3

Python文档引文

我在这里汇总了有关Process vs Threads和GIL的关键Python文档引文:CPython中的全局解释器锁(GIL)是什么?

进程与线程实验

我做了一些基准测试,以便更具体地显示差异。

在基准测试中,我在一台8超线程CPU的机器上,对不同线程数下的CPU密集型和IO密集型工作分别计时。每个线程承担的工作量总是相同的,因此线程越多,总工作量就越大。

结果是:

绘图数据

结论:

  • 对于CPU密集型工作,多处理总是更快,这大概是由于GIL的缘故

  • 对于IO密集型工作,两者的速度完全相同

  • 由于我用的是8超线程机器,线程最多只能扩展到大约4倍,而不是预期的8倍。

    与之相对,C语言的POSIX CPU密集型工作达到了预期的8倍加速:time(1)输出中的“real”、“user”和“sys”是什么意思?

    TODO:我不知道原因,一定还有其他Python层面的低效因素在起作用。

测试代码:

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import sys

def cpu_func(result, niters):
    '''
    A useless CPU bound function.
    '''
    for i in range(niters):
        result = (result * result * i + 2 * result * i * i + 3) % 10000000
    return result

class CpuThread(threading.Thread):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class CpuProcess(multiprocessing.Process):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class IoThread(threading.Thread):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

class IoProcess(multiprocessing.Process):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

if __name__ == '__main__':
    cpu_n_iters = int(sys.argv[1])
    sleep = 1
    cpu_count = multiprocessing.cpu_count()
    input_params = [
        (CpuThread, cpu_n_iters),
        (CpuProcess, cpu_n_iters),
        (IoThread, sleep),
        (IoProcess, sleep),
    ]
    header = ['nthreads']
    for thread_class, _ in input_params:
        header.append(thread_class.__name__)
    print(' '.join(header))
    for nthreads in range(1, 2 * cpu_count):
        results = [nthreads]
        for thread_class, work_size in input_params:
            start_time = time.time()
            threads = []
            for i in range(nthreads):
                thread = thread_class(work_size)
                threads.append(thread)
                thread.start()
            for i, thread in enumerate(threads):
                thread.join()
            results.append(time.time() - start_time)
        print(' '.join('{:.6e}'.format(result) for result in results))

GitHub上游代码 + 同一目录下的绘图代码。

测试环境:Ubuntu 18.10,Python 3.6.7,Lenovo ThinkPad P51笔记本电脑;CPU:Intel Core i7-7820HQ(4核/8线程),内存:2x Samsung M471A2K43BB1-CRC(2x 16GiB),SSD:Samsung MZVLB512HAJQ-000L7(3,000 MB/s)。

可视化在给定时间正在运行的线程

这篇帖子 https://rohanvarma.me/GIL/ 告诉我,可以通过包装threading.Thread的target=参数,在线程每次被调度时运行一个回调,multiprocessing.Process同理。

这使我们可以精确地看到每个时刻运行的是哪个线程。做完之后,我们会看到类似下面的内容(这张特定的图是我虚构的):

            +--------------------------------------+
            + Active threads / processes           +
+-----------+--------------------------------------+
|Thread   1 |********     ************             |
|         2 |        *****            *************|
+-----------+--------------------------------------+
|Process  1 |***  ************** ******  ****      |
|         2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
            + Time -->                             +
            +--------------------------------------+

这将表明:

  • 线程由GIL完全序列化
  • 进程可以并行运行

Python documentation quotes

I’ve highlighted the key Python documentation quotes about Process vs Threads and the GIL at: What is the global interpreter lock (GIL) in CPython?

Process vs thread experiments

I did a bit of benchmarking in order to show the difference more concretely.

In the benchmark, I timed CPU and IO bound work for various numbers of threads on an 8 hyperthread CPU. The work supplied per thread is always the same, such that more threads means more total work supplied.

The results were:

Plot data.

Conclusions:

  • for CPU bound work, multiprocessing is always faster, presumably due to the GIL

  • for IO bound work, both are exactly the same speed

  • threads only scale up to about 4x instead of the expected 8x since I’m on an 8 hyperthread machine.

    Contrast that with C POSIX CPU-bound work, which reaches the expected 8x speedup: What do ‘real’, ‘user’ and ‘sys’ mean in the output of time(1)?

    TODO: I don’t know the reason for this; there must be other Python inefficiencies coming into play.

Test code:

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import sys

def cpu_func(result, niters):
    '''
    A useless CPU bound function.
    '''
    for i in range(niters):
        result = (result * result * i + 2 * result * i * i + 3) % 10000000
    return result

class CpuThread(threading.Thread):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class CpuProcess(multiprocessing.Process):
    def __init__(self, niters):
        super().__init__()
        self.niters = niters
        self.result = 1
    def run(self):
        self.result = cpu_func(self.result, self.niters)

class IoThread(threading.Thread):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

class IoProcess(multiprocessing.Process):
    def __init__(self, sleep):
        super().__init__()
        self.sleep = sleep
        self.result = self.sleep
    def run(self):
        time.sleep(self.sleep)

if __name__ == '__main__':
    cpu_n_iters = int(sys.argv[1])
    sleep = 1
    cpu_count = multiprocessing.cpu_count()
    input_params = [
        (CpuThread, cpu_n_iters),
        (CpuProcess, cpu_n_iters),
        (IoThread, sleep),
        (IoProcess, sleep),
    ]
    header = ['nthreads']
    for thread_class, _ in input_params:
        header.append(thread_class.__name__)
    print(' '.join(header))
    for nthreads in range(1, 2 * cpu_count):
        results = [nthreads]
        for thread_class, work_size in input_params:
            start_time = time.time()
            threads = []
            for i in range(nthreads):
                thread = thread_class(work_size)
                threads.append(thread)
                thread.start()
            for i, thread in enumerate(threads):
                thread.join()
            results.append(time.time() - start_time)
        print(' '.join('{:.6e}'.format(result) for result in results))

GitHub upstream + plotting code in the same directory.

Tested on Ubuntu 18.10, Python 3.6.7, in a Lenovo ThinkPad P51 laptop with CPU: Intel Core i7-7820HQ CPU (4 cores / 8 threads), RAM: 2x Samsung M471A2K43BB1-CRC (2x 16GiB), SSD: Samsung MZVLB512HAJQ-000L7 (3,000 MB/s).

Visualize which threads are running at a given time

This post https://rohanvarma.me/GIL/ taught me that you can run a callback whenever a thread is scheduled, by wrapping the target= argument of threading.Thread, and the same for multiprocessing.Process.

This allows us to view exactly which thread runs at each time. When this is done, we would see something like (I made this particular graph up):

            +--------------------------------------+
            + Active threads / processes           +
+-----------+--------------------------------------+
|Thread   1 |********     ************             |
|         2 |        *****            *************|
+-----------+--------------------------------------+
|Process  1 |***  ************** ******  ****      |
|         2 |** **** ****** ** ********* **********|
+-----------+--------------------------------------+
            + Time -->                             +
            +--------------------------------------+

which would show that:

  • threads are fully serialized by the GIL
  • processes can run in parallel

回答 4

这是Python 2.6.x的一些性能数据,它们使人质疑“在IO密集型场景中线程比多处理更高效”这一看法。这些结果来自一台40处理器的IBM System x3650 M4 BD。

IO密集型处理:进程池的性能优于线程池

>>> do_work(50, 300, 'thread','fileio')
do_work function took 455.752 ms

>>> do_work(50, 300, 'process','fileio')
do_work function took 319.279 ms

CPU密集型处理:进程池的性能优于线程池

>>> do_work(50, 2000, 'thread','square')
do_work function took 338.309 ms

>>> do_work(50, 2000, 'process','square')
do_work function took 287.488 ms

这些并不是严格的测试,但它们表明,与多线程相比,多处理在性能上并非毫无竞争力。

交互式python控制台中用于上述测试的代码

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time
import sys
import os
from glob import glob

text_for_test = str(range(1,100000))

def fileio(i):
 try :
  for fn in glob('./test/test-*'):  # glob returns a list; os.remove takes one path
   os.remove(fn)
 except : 
  pass
 f=open('./test/test-'+str(i),'a')
 f.write(text_for_test)
 f.close()
 f=open('./test/test-'+str(i),'r')
 text = f.read()
 f.close()


def square(i):
 return i*i

def timing(f):
 def wrap(*args):
  time1 = time.time()
  ret = f(*args)
  time2 = time.time()
  print '%s function took %0.3f ms' % (f.func_name, (time2-time1)*1000.0)
  return ret
 return wrap

result = None

@timing
def do_work(process_count, items, process_type, method) :
 pool = None
 if process_type == 'process' :
  pool = Pool(processes=process_count)
 else :
  pool = ThreadPool(processes=process_count)
 if method == 'square' : 
  multiple_results = [pool.apply_async(square,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]
 else :
  multiple_results = [pool.apply_async(fileio,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]


do_work(50, 300, 'thread','fileio')
do_work(50, 300, 'process','fileio')

do_work(50, 2000, 'thread','square')
do_work(50, 2000, 'process','square')

Here’s some performance data for Python 2.6.x that calls into question the notion that threading is more performant than multiprocessing in IO-bound scenarios. These results are from a 40-processor IBM System x3650 M4 BD.

IO-Bound Processing : Process Pool performed better than Thread Pool

>>> do_work(50, 300, 'thread','fileio')
do_work function took 455.752 ms

>>> do_work(50, 300, 'process','fileio')
do_work function took 319.279 ms

CPU-Bound Processing : Process Pool performed better than Thread Pool

>>> do_work(50, 2000, 'thread','square')
do_work function took 338.309 ms

>>> do_work(50, 2000, 'process','square')
do_work function took 287.488 ms

These aren’t rigorous tests, but they tell me that multiprocessing isn’t entirely unperformant in comparison to threading.

Code used in the interactive python console for the above tests

from multiprocessing import Pool
from multiprocessing.pool import ThreadPool
import time
import sys
import os
from glob import glob

text_for_test = str(range(1,100000))

def fileio(i):
 try :
  for fn in glob('./test/test-*'):  # glob returns a list; os.remove takes one path
   os.remove(fn)
 except : 
  pass
 f=open('./test/test-'+str(i),'a')
 f.write(text_for_test)
 f.close()
 f=open('./test/test-'+str(i),'r')
 text = f.read()
 f.close()


def square(i):
 return i*i

def timing(f):
 def wrap(*args):
  time1 = time.time()
  ret = f(*args)
  time2 = time.time()
  print '%s function took %0.3f ms' % (f.func_name, (time2-time1)*1000.0)
  return ret
 return wrap

result = None

@timing
def do_work(process_count, items, process_type, method) :
 pool = None
 if process_type == 'process' :
  pool = Pool(processes=process_count)
 else :
  pool = ThreadPool(processes=process_count)
 if method == 'square' : 
  multiple_results = [pool.apply_async(square,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]
 else :
  multiple_results = [pool.apply_async(fileio,(a,)) for a in range(1,items)]
  result = [res.get()  for res in multiple_results]


do_work(50, 300, 'thread','fileio')
do_work(50, 300, 'process','fileio')

do_work(50, 2000, 'thread','square')
do_work(50, 2000, 'process','square')

回答 5

好吧,朱利奥·佛朗哥(Giulio Franco)已经回答了大部分问题。我将进一步阐述生产者-消费者问题,我想这会让您在多线程应用的解决方案上走上正确的轨道。

fill_count = Semaphore(0) # items produced
empty_count = Semaphore(BUFFER_SIZE) # remaining space
buffer = Buffer()

def producer(fill_count, empty_count, buffer):
    while True:
        item = produceItem()
        empty_count.acquire()   # "down": wait for free space
        buffer.push(item)
        fill_count.release()    # "up": signal a new item

def consumer(fill_count, empty_count, buffer):
    while True:
        fill_count.acquire()    # "down": wait for an item
        item = buffer.pop()
        empty_count.release()   # "up": signal free space
        consume_item(item)

您可以从以下网站阅读有关同步原语的更多信息:

 http://linux.die.net/man/7/sem_overview
 http://docs.python.org/2/library/threading.html

伪代码在上面(其中用Python的acquire/release对应经典信号量的down/up操作)。我想您可以搜索生产者-消费者问题以获取更多参考。

Well, most of the question is answered by Giulio Franco. I will further elaborate on the consumer-producer problem, which I suppose will put you on the right track for your solution to using a multithreaded app.

fill_count = Semaphore(0) # items produced
empty_count = Semaphore(BUFFER_SIZE) # remaining space
buffer = Buffer()

def producer(fill_count, empty_count, buffer):
    while True:
        item = produceItem()
        empty_count.acquire()   # "down": wait for free space
        buffer.push(item)
        fill_count.release()    # "up": signal a new item

def consumer(fill_count, empty_count, buffer):
    while True:
        fill_count.acquire()    # "down": wait for an item
        item = buffer.pop()
        empty_count.release()   # "up": signal free space
        consume_item(item)

You could read more on the synchronization primitives from:

 http://linux.die.net/man/7/sem_overview
 http://docs.python.org/2/library/threading.html

The pseudocode is above (with Python’s acquire/release standing in for the classic down/up semaphore operations). I suppose you should search for the producer-consumer problem to get more references.
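
In practice, Python’s queue.Queue already implements this bounded-buffer pattern for you (its blocking put/get replace the two semaphores). A minimal runnable sketch, with my own stand-ins for produceItem and consume_item:

import threading
import queue

buf = queue.Queue(maxsize=10)    # bounded buffer: put() blocks when full

def producer():
    for item in range(20):       # stand-in for produceItem()
        buf.put(item)            # blocks while the buffer is full
    buf.put(None)                # sentinel for the consumer

def consumer():
    while True:
        item = buf.get()
        if item is None:
            break
        print('consumed', item)  # stand-in for consume_item(item)

threads = [threading.Thread(target=producer), threading.Thread(target=consumer)]
for t in threads:
    t.start()
for t in threads:
    t.join()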


如何并行化一个简单的Python循环?

问题:如何并行化一个简单的Python循环?

这可能是一个琐碎的问题,但是如何在python中并行化以下循环?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

我知道如何在Python中启动单线程,但我不知道如何“收集”结果。

多进程也可以,就这种情况而言,怎么简单怎么来。我目前用的是Linux,但代码也应能在Windows和Mac上运行。

并行化此代码的最简单方法是什么?

This is probably a trivial question, but how do I parallelize the following loop in python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

I know how to start single threads in Python but I don’t know how to “collect” the results.

Multiple processes would be fine too, whatever is easiest for this case. I’m currently using Linux, but the code should run on Windows and Mac as well.

What’s the easiest way to parallelize this code?


回答 0

由于全局解释器锁(GIL),在CPython上使用多个线程不会为纯Python代码带来更好的性能。我建议改用multiprocessing模块:

import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

请注意,这在交互式解释器中不起作用。

为了避免围绕GIL的常见FUD:就本示例而言,使用线程本来也不会有任何好处。这里应该用进程而不是线程,因为进程可以避免一大堆问题。

Using multiple threads on CPython won’t give you better performance for pure-Python code due to the global interpreter lock (GIL). I suggest using the multiprocessing module instead:

import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Note that this won’t work in the interactive interpreter.

To avoid the usual FUD around the GIL: There wouldn’t be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.
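
For completeness, a minimal runnable version of that snippet, with a stand-in calc_stuff and the __main__ guard that makes it work as a script (which is also why it fails in the interactive interpreter):

import multiprocessing

def calc_stuff(parameter):
    # stand-in for the question's real calculation
    return parameter / 2, parameter % 3, parameter ** 2

if __name__ == '__main__':
    offset = 2   # assumed value; the question never defines it
    pool = multiprocessing.Pool(4)
    out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
    pool.close()
    pool.join()
    print(out1, out2, out3)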


回答 1

要并行化一个简单的for循环,joblib相比直接使用multiprocessing有很多优势:不仅语法简洁,还提供了诸如在迭代非常快时透明地将其分批执行(以消除开销)、捕获子进程的回溯以获得更好的错误报告等功能。

免责声明:我是joblib的原始作者。

To parallelize a simple for loop, joblib brings a lot of value over raw use of multiprocessing: not only the short syntax, but also things like transparent batching of iterations when they are very fast (to remove the overhead), and capturing of child-process tracebacks for better error reporting.

Disclaimer: I am the original author of joblib.
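
For reference, a minimal sketch of what that short syntax looks like applied to the question's loop (calc_stuff and offset are stand-ins borrowed from the question, not joblib APIs):

from joblib import Parallel, delayed

def calc_stuff(parameter):
    return parameter / 2, parameter % 3, parameter ** 2   # placeholder work

offset = 2   # assumed value
results = Parallel(n_jobs=4)(delayed(calc_stuff)(j * offset) for j in range(10))
output1, output2, output3 = map(list, zip(*results))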


回答 2

并行化此代码的最简单方法是什么?

对此我真的很喜欢concurrent.futures,它从3.2版开始包含在Python 3中,并且通过PyPI上的向后移植也可用于2.6和2.7。

您可以使用线程或进程,并使用完全相同的接口。

多处理

将其放在文件中-futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

这是输出:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallelizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

多线程

现在将ProcessPoolExecutor更改为ThreadPoolExecutor,然后再次运行该模块:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallelizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

现在,您已经完成了多线程和多处理!

关于性能以及两者结合使用的说明。

采样量太小,无法比较结果。

但是,我怀疑多线程通常会比多处理快,尤其是在Windows上,因为Windows不支持fork,每个新进程的启动都需要额外的时间;在Linux或Mac上,两者可能会更接近。

您可以在多个进程中嵌套多个线程,但不建议从多个线程中再派生多个进程。

What’s the easiest way to parallelize this code?

I really like concurrent.futures for this, available in Python3 since version 3.2 – and via backport to 2.6 and 2.7 on PyPi.

You can use threads or processes and use the exact same interface.

Multiprocessing

Put this in a file – futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

And here’s the output:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallelizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Multithreading

Now change ProcessPoolExecutor to ThreadPoolExecutor, and run the module again:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallelizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Now you have done both multithreading and multiprocessing!

Note on performance and using both together.

Sampling is far too small to compare the results.

However, I suspect that multithreading will be faster than multiprocessing in general, especially on Windows, since Windows doesn’t support forking so each new process has to take time to launch. On Linux or Mac they’ll probably be closer.

You can nest multiple threads inside multiple processes, but it’s recommended to not use multiple threads to spin off multiple processes.


回答 3

from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

上面的代码在我的机器上运行得很好(Ubuntu;软件包joblib已预装,也可以通过pip install joblib安装)。

取自https://blog.dominodatalab.com/simple-parallelization/

from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

The above works beautifully on my machine (Ubuntu, package joblib was pre-installed, but can be installed via pip install joblib).

Taken from https://blog.dominodatalab.com/simple-parallelization/


回答 4

使用Ray有许多优点:

  • 除了多个内核之外,您还可以用同样的代码在多台机器上并行。
  • 通过共享内存(以及零拷贝序列化)高效地处理数值数据。
  • 借助分布式调度实现高任务吞吐量。
  • 容错能力。

就您而言,您可以启动Ray并定义一个远程函数

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

然后并行调用

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

为了在集群上运行相同的示例,唯一会改变的行是对ray.init()的调用。相关文档可在此处找到。

请注意,我正在帮助开发Ray。

There are a number of advantages to using Ray:

  • You can parallelize over multiple machines in addition to multiple cores (with the same code).
  • Efficient handling of numerical data through shared memory (and zero-copy serialization).
  • High task throughput with distributed scheduling.
  • Fault tolerance.

In your case, you could start Ray and define a remote function

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

and then invoke it in parallel

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

To run the same example on a cluster, the only line that would change would be the call to ray.init(). The relevant documentation can be found here.

Note that I’m helping to develop Ray.


回答 5

这是最简单的方法!

您可以使用asyncio(文档可在此处找到)。它被用作多个Python异步框架的基础,这些框架提供高性能的网络和Web服务器、数据库连接库、分布式任务队列等。此外,它还提供高级和低级API,足以应对各种问题。

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

现在,此函数在每次被调用时都会并行运行,而不会让主程序进入等待状态。您也可以用它来并行化循环:在for循环中调用时,循环本身是顺序执行的,但解释器一执行到某次迭代,该迭代就会与主程序并行运行。例如:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

这将产生以下输出:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1

This is the easiest way to do it!

You can use asyncio. (Documentation can be found here). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc. Plus it has both high-level and low-level APIs to accommodate any kind of problem.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    #code

Now this function will be run in parallel whenever called, without putting the main program into a wait state. You can use it to parallelize a for loop as well: the loop itself stays sequential, but every iteration runs in parallel to the main program as soon as the interpreter reaches it. For instance:

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

This produces following output:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
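
One thing the answer glosses over: the decorated function returns a future, so if you need the results rather than fire-and-forget printing, you have to collect and await them. A sketch in the same pre-3.10 asyncio style the answer uses (square and its sleep are stand-ins of mine):

import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped

@background
def square(x):
    time.sleep(1)       # stand-in for blocking work
    return x * x

loop = asyncio.get_event_loop()
futures = [square(i) for i in range(10)]          # all start immediately
results = loop.run_until_complete(asyncio.gather(*futures))
print(results)          # results come back in input order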

回答 6

您为什么不使用线程和一个互斥锁来保护一个全局列表?

from threading import Thread, Lock

class thread_it(Thread):
    def __init__(self, param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        result = calc_stuff(self.param)  # do the work outside the lock, so threads overlap
        mutex.acquire()
        output.append(result)            # the lock protects only the shared list
        mutex.release()


threads = []
output = []
mutex = Lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

请记住,整体速度取决于最慢的那个线程。

Why don’t you use threads, and one mutex to protect one global list?

from threading import Thread, Lock

class thread_it(Thread):
    def __init__(self, param):
        Thread.__init__(self)
        self.param = param
    def run(self):
        result = calc_stuff(self.param)  # do the work outside the lock, so threads overlap
        mutex.acquire()
        output.append(result)            # the lock protects only the shared list
        mutex.release()


threads = []
output = []
mutex = Lock()

for j in range(0, 10):
    current = thread_it(j * offset)
    threads.append(current)
    current.start()

for t in threads:
    t.join()

#here you have output list filled with data

keep in mind, you will only be as fast as your slowest thread


回答 7

我发现joblib对我非常有用。请参见以下示例:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs = -1:使用所有可用的内核

I found joblib very useful. Please see the following example:

from joblib import Parallel, delayed
def yourfunction(k):   
    s=3.14*k*k
    print "Area of a circle with a radius ", k, " is:", s

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))

n_jobs=-1: use all available cores


回答 8

假设我们有一个异步函数

async def work_async(self, student_name: str, code: str, loop):
    """
    Some async function
    """
    # Do some async processing

它需要在一个大数组上运行。一些属性由程序传入,另一些则取自数组中字典元素的属性。

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

Let’s say we have an async function

async def work_async(self, student_name: str, code: str, loop):
    """
    Some async function
    """
    # Do some async processing

That needs to be run on a large array. Some attributes are passed in to the program, and some come from properties of the dictionary elements in the array.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...] #Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name,
                                           sub['Code'],
                                           loop)
                       for sub in batch))

回答 9

看看这个;

http://docs.python.org/library/queue.html

这可能不是正确的方法,但我会做类似的事情;

实际代码;

from multiprocessing import Process, JoinableQueue as Queue
from Queue import Empty  # the Empty exception lives in the Queue module ("queue" in Python 3)

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
        while True:
            try:
                value = self.input.get(True, 1)  # block up to 1s, then assume the work is done
                #value modifier
                temp1,temp2,temp3 = self.calc_stuff(value)
                self.out1.put(temp1)
                self.out2.put(temp2)
                self.out3.put(temp3)
                self.input.task_done()
            except Empty:
                return
                #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

希望能有所帮助。

Have a look at this;

http://docs.python.org/library/queue.html

This might not be the right way to do it, but I’d do something like;

Actual code;

from multiprocessing import Process, JoinableQueue as Queue
from Queue import Empty  # the Empty exception lives in the Queue module ("queue" in Python 3)

class CustomWorker(Process):
    def __init__(self,workQueue, out1,out2,out3):
        Process.__init__(self)
        self.input=workQueue
        self.out1=out1
        self.out2=out2
        self.out3=out3
    def run(self):
        while True:
            try:
                value = self.input.get(True, 1)  # block up to 1s, then assume the work is done
                #value modifier
                temp1,temp2,temp3 = self.calc_stuff(value)
                self.out1.put(temp1)
                self.out2.put(temp2)
                self.out3.put(temp3)
                self.input.task_done()
            except Empty:
                return
                #Catch things better here
    def calc_stuff(self,param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1,out2,out3
def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
          p = CustomWorker(inputQueue,out1,out2,out3)
          p.daemon = True
          p.start()
          processes.append(p)
    inputQueue.join()
    while(not out1.empty()):
        print out1.get()
        print out2.get()
        print out3.get()
if __name__ == '__main__':
    Main()

Hope that helps.


回答 10

这在用Python实现多处理和并行/分布式计算时可能会很有用。

YouTube关于使用Techila软件包的教程

Techila是一种分布式计算中间件,它通过techila软件包直接与Python集成。包中的peach函数可用于并行化循环结构。(以下代码段来自Techila社区论坛)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

This could be useful when implementing multiprocessing and parallel/ distributed computing in Python.

YouTube tutorial on using techila package

Techila is a distributed computing middleware, which integrates directly with Python using the techila package. The peach function in the package can be useful in parallelizing loop structures. (Following code snippet is from the Techila Community Forums)

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
    jobs = jobcount # Number of Jobs in the Project
    )

回答 11

谢谢@iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # no more tasks may be submitted
    pool.join()   # wait for the workers to exit

thanks @iuryxavier

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))
    pool.close()  # no more tasks may be submitted
    pool.join()   # wait for the workers to exit
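
A caveat on this snippet: pool.map collects the complete result list in memory, so with range(10**12) the parent would run out of RAM long before the workers finish. Where results can be consumed one at a time, imap_unordered with a chunksize streams them instead; a sketch with a more modest input:

from multiprocessing import Pool, cpu_count

def add_1(x):
    return x + 1

if __name__ == "__main__":
    with Pool(cpu_count()) as pool:
        # results arrive lazily; chunksize amortizes the IPC cost
        for y in pool.imap_unordered(add_1, range(10**7), chunksize=10000):
            pass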

回答 12

并行处理的一个非常简单的例子是

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=yourfunction)  # note: lists appended in the child process are not shared with the parent
    p.start()
    p.join()

A very simple example of parallel processing is

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=yourfunction)  # note: lists appended in the child process are not shared with the parent
    p.start()
    p.join()