How to do parallel programming in Python?

Question: How to do parallel programming in Python?

For C++, we can use OpenMP for parallel programming; however, OpenMP doesn’t work with Python. What should I do if I want to parallelize some parts of my Python program?

The structure of the code may be considered as:

solve1(A)
solve2(B)

where solve1 and solve2 are two independent functions. How can I run this kind of code in parallel instead of sequentially, to reduce the running time? The code is:

def solve(Q, G, n):
    i = 0
    tol = 10 ** -4

    while i < 1000:
        inneropt, partition, x = setinner(Q, G, n)
        outeropt = setouter(Q, G, n)

        if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
            break

        node1 = partition[0]
        node2 = partition[1]

        G = updateGraph(G, node1, node2)

        if i == 999:
            print("Maximum iteration reached")
        i += 1  # without this, the loop only ends if it converges
    print(inneropt)

where setinner and setouter are two independent functions. That’s the part I want to parallelize.


Answer 0

You can use the multiprocessing module. For this case I might use a processing pool:

from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A])    # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B])    # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)

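This will spawn worker processes that can run arbitrary functions for you. Since we did not pass the processes argument, the pool starts one process for each CPU core available on your machine. Each core can run one process at a time, so solve1(A) and solve2(B) execute truly in parallel. get(timeout=10) waits up to 10 seconds for a result and raises multiprocessing.TimeoutError if it is not ready by then.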
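A self-contained version of the snippet above, as a minimal sketch: solve1 and solve2 here are hypothetical stand-ins for the question’s functions, and run_both is an illustrative helper. Two practical details are added: the if __name__ == "__main__": guard, which is required on platforms that start workers with the "spawn" method (Windows, macOS), and using the pool as a context manager so the workers are shut down afterwards.

```python
from multiprocessing import Pool


def solve1(a):
    # Hypothetical stand-in for a CPU-bound task.
    return sum(i * i for i in range(a))


def solve2(b):
    # Hypothetical stand-in for a second, independent task.
    return sum(i * 3 for i in range(b))


def run_both(a, b):
    # Pool() without `processes` starts one worker per available CPU core.
    with Pool() as pool:
        result1 = pool.apply_async(solve1, [a])  # solve1(a) starts in a worker
        result2 = pool.apply_async(solve2, [b])  # solve2(b) runs concurrently
        # get() blocks until the result is ready; with timeout=10 it raises
        # multiprocessing.TimeoutError if a task takes longer than 10 seconds.
        return result1.get(timeout=10), result2.get(timeout=10)


if __name__ == "__main__":
    print(run_both(1000, 1000))
```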

If you want to apply a single function to every element of a list, use map:

args = [A, B]
results = pool.map(solve1, args)
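Note that pool.map(solve1, args) calls the same function on every element and returns the results in input order; to run two different functions at once, use apply_async as shown earlier. When the function takes several arguments, Pool.starmap unpacks each tuple into the call. A minimal sketch, where square, scale and run_map are hypothetical examples:

```python
from multiprocessing import Pool


def square(x):
    # Hypothetical one-argument task.
    return x * x


def scale(x, factor):
    # Hypothetical two-argument task.
    return x * factor


def run_map():
    with Pool(processes=2) as pool:
        # map: one argument per call, results come back in input order.
        squares = pool.map(square, [1, 2, 3])
        # starmap: each tuple is unpacked, e.g. (2, 10) -> scale(2, 10).
        scaled = pool.starmap(scale, [(1, 10), (2, 10), (3, 10)])
    return squares, scaled


if __name__ == "__main__":
    print(run_map())
```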

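Don’t use threads for this kind of CPU-bound work: in CPython, the GIL allows only one thread at a time to execute Python bytecode, so threads won’t make pure-Python computations run faster.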
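Applied to the loop in the question, a minimal sketch might create the pool once, outside the loop, and submit setinner and setouter together in each iteration. The bodies of setinner, setouter and updateGraph below are hypothetical toy stand-ins (the real ones are not shown in the question), and this version uses concurrent.futures.ProcessPoolExecutor, the standard library’s higher-level interface to a process pool, instead of multiprocessing.Pool. Keep in mind that Q and G are pickled and sent to the workers on every iteration, which can be expensive for a large graph.

```python
from concurrent.futures import ProcessPoolExecutor


# --- Hypothetical toy stand-ins for the question's functions ---
def setinner(Q, G, n):
    return Q - G, (0, 1), None  # (inneropt, partition, x)


def setouter(Q, G, n):
    return Q + G  # outeropt


def updateGraph(G, node1, node2):
    return G / 2  # pretend each update halves the gap


def solve(Q, G, n, max_iter=1000, tol=1e-4):
    # Start the worker processes once; creating a pool per iteration is slow.
    with ProcessPoolExecutor(max_workers=2) as executor:
        for i in range(max_iter):
            inner_future = executor.submit(setinner, Q, G, n)  # one worker
            outer_future = executor.submit(setouter, Q, G, n)  # the other
            inneropt, partition, x = inner_future.result()  # wait for both
            outeropt = outer_future.result()

            if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
                break

            node1, node2 = partition[0], partition[1]
            G = updateGraph(G, node1, node2)
        else:
            # Runs only if the loop finished without hitting `break`.
            print("Maximum iteration reached")
    return inneropt, i


if __name__ == "__main__":
    print(solve(Q=1.0, G=1.0, n=10))
```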


Answer 1

This can be done very elegantly with Ray.

To parallelize your example, you’d need to define your functions with the @ray.remote decorator, and then invoke them with .remote.

import ray

ray.init()

# Define the functions.

@ray.remote
def solve1(a):
    return 1

@ray.remote
def solve2(b):
    return 2

# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)

# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])

This approach has a number of advantages over the multiprocessing module:

  1. The same code will run on a multicore machine as well as a cluster of machines.
  2. Processes share data efficiently through shared memory and zero-copy serialization.
  3. Error messages are propagated nicely.
  4. These function calls can be composed together, e.g.,

    @ray.remote
    def f(x):
        return x + 1
    
    x_id = f.remote(1)
    y_id = f.remote(x_id)
    z_id = f.remote(y_id)
    ray.get(z_id)  # returns 4
    
  5. In addition to invoking functions remotely, classes can be instantiated remotely as actors.

Note that Ray is a framework I’ve been helping develop.


Answer 2

CPython uses the Global Interpreter Lock, which makes parallel programming a bit more interesting than in C++.

This topic has several useful examples and descriptions of the challenge:

Python Global Interpreter Lock (GIL) workaround on multi-core systems using taskset on Linux?


Answer 3

The solution, as others have said, is to use multiple processes. Which framework is more appropriate, however, depends on many factors. In addition to the ones already mentioned, there are also charm4py and mpi4py (I am the developer of charm4py).

There is a more efficient way to implement the above example than using the worker pool abstraction. The main loop sends the same parameters (including the complete graph G) over and over to workers in each of the 1000 iterations. Since at least one worker will reside on a different process, this involves copying and sending the arguments to the other process(es). This could be very costly depending on the size of the objects. Instead, it makes sense to have workers store state and simply send the updated information.

For example, in charm4py this can be done like this:

from charm4py import Chare

class Worker(Chare):

    def __init__(self, Q, G, n):
        self.G = G
        ...

    def setinner(self, node1, node2):
        self.updateGraph(node1, node2)
        ...

    # setouter(self, node1, node2) would be defined in the same way


def solve(Q, G, n):
    # create 2 workers, each on a different process, passing the initial state
    worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
    worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
    i = 0
    while i < 1000:
        result_a = worker_a.setinner(node1, node2, ret=True)  # execute setinner on worker A
        result_b = worker_b.setouter(node1, node2, ret=True)  # execute setouter on worker B

        inneropt, partition, x = result_a.get()  # wait for result from worker A
        outeropt = result_b.get()  # wait for result from worker B
        ...

Note that for this example we really only need one worker. The main loop could execute one of the functions, and have the worker execute the other. But my code helps to illustrate a couple of things:

  1. Worker A runs in process 0 (same as the main loop). While result_a.get() is blocked waiting on the result, worker A does the computation in the same process.
  2. Arguments are automatically passed by reference to worker A, since it is in the same process (there is no copying involved).
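If charm4py is not an option, the same idea (send the large state once, then only small updates) can be sketched with the standard library alone: a dedicated multiprocessing.Process keeps its own copy of the graph and talks to the parent over a Pipe. This is a swapped-in technique, not charm4py’s API; the dict-of-sets graph, the edge-adding update and the edge-count reply are hypothetical placeholders for the question’s updateGraph and setinner/setouter.

```python
from multiprocessing import Pipe, Process


def graph_worker(conn, graph):
    """Keep a private copy of `graph` and apply small updates from the parent."""
    while True:
        msg = conn.recv()
        if msg is None:  # sentinel: the parent asks us to stop
            break
        node1, node2 = msg
        # Hypothetical update: add an undirected edge node1-node2.
        graph.setdefault(node1, set()).add(node2)
        graph.setdefault(node2, set()).add(node1)
        n_edges = sum(len(nbrs) for nbrs in graph.values()) // 2
        conn.send(n_edges)  # reply with a small result, not the whole graph
    conn.close()


def run_demo(updates):
    graph = {0: {1}, 1: {0}}  # initial state: a single edge 0-1
    parent_conn, child_conn = Pipe()
    proc = Process(target=graph_worker, args=(child_conn, graph))
    proc.start()  # the full graph reaches the worker once, here
    results = []
    for edge in updates:
        parent_conn.send(edge)  # per iteration, only the edge is sent
        results.append(parent_conn.recv())
    parent_conn.send(None)
    proc.join()
    return results


if __name__ == "__main__":
    print(run_demo([(1, 2), (2, 3), (0, 2)]))
```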

Answer 4

In some cases, it’s possible to automatically parallelize loops using Numba, though it only works with a small subset of Python:

from numba import njit, prange

@njit(parallel=True)
def prange_test(A):
    s = 0
    # Without "parallel=True" in the jit-decorator
    # the prange statement is equivalent to range
    for i in prange(A.shape[0]):
        s += A[i]
    return s

Unfortunately, Numba’s nopython mode (which njit uses) only supports NumPy arrays and a limited set of built-in types, not arbitrary Python objects. In theory, it might also be possible to compile Python to C++ and then parallelize it automatically with the Intel C++ compiler, though I haven’t tried this yet.
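The prange loop above is a parallel reduction: Numba runs the iterations on several threads and recognizes s += A[i] as a reduction whose partial results are combined. For data Numba cannot compile, the same split-and-combine idea can be written by hand with processes. A minimal standard-library sketch (partial_sum, parallel_sum and the chunking scheme are illustrative choices, not what Numba does internally):

```python
from concurrent.futures import ProcessPoolExecutor


def partial_sum(chunk):
    # Each worker reduces its own slice independently.
    return sum(chunk)


def parallel_sum(values, n_chunks=4):
    # Split the input into roughly equal contiguous slices.
    size = max(1, -(-len(values) // n_chunks))  # ceiling division
    chunks = [values[i:i + size] for i in range(0, len(values), size)]
    with ProcessPoolExecutor(max_workers=n_chunks) as executor:
        # Combine the per-chunk results: the final step of the reduction.
        return sum(executor.map(partial_sum, chunks))


if __name__ == "__main__":
    print(parallel_sum(list(range(1001))))
```

For something as cheap as a sum, sending the data to other processes costs more than it saves; the pattern pays off when each chunk does substantial work.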


Answer 5

You can use the joblib library for parallel computation and multiprocessing.

from joblib import Parallel, delayed

Simply define the function foo that you want to run in parallel, then run it over your inputs with the following code:

output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)

where num_cores can be obtained from the multiprocessing library as follows:

import multiprocessing

num_cores = multiprocessing.cpu_count()

If your function takes more than one argument and you only want to iterate over one of them with a list, you can use partial from the functools library as follows:

from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
    '''
    body of the function
    '''
    return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
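The partial trick is not specific to joblib. A sketch of the same pattern with the standard library’s multiprocessing.Pool, in case joblib is not available; the body of foo, the fixed argument values and the run helper are hypothetical. A functools.partial wrapping a module-level function can be pickled, so it can be sent to worker processes:

```python
from functools import partial
from multiprocessing import Pool


def foo(arg1, arg2, arg3, arg4):
    # Hypothetical body: combine the varying argument with the fixed ones.
    return arg1 * arg2 + arg3 - arg4


def run(inputs):
    # Fix arg2..arg4; only arg1 varies from call to call.
    foo_ = partial(foo, arg2=2, arg3=10, arg4=1)
    with Pool() as pool:
        return pool.map(foo_, inputs)  # foo_(i) for each i, results in order


if __name__ == "__main__":
    print(run([11, 32, 44]))
```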

You can find a complete explanation of multiprocessing in Python and R, with a couple of examples, here.