问题:如何在Python中进行并行编程?
对于C ++,我们可以使用OpenMP进行并行编程。但是,OpenMP不适用于Python。如果要并行处理python程序的某些部分,该怎么办?
该代码的结构可以认为是:
solve1(A)
solve2(B)
其中solve1
和solve2
是两个独立的功能。为了减少运行时间,如何并行而不是按顺序运行这种代码?希望可以有人帮帮我。首先十分感谢。代码是:
def solve(Q, G, n):
i = 0
tol = 10 ** -4
while i < 1000:
inneropt, partition, x = setinner(Q, G, n)
outeropt = setouter(Q, G, n)
if (outeropt - inneropt) / (1 + abs(outeropt) + abs(inneropt)) < tol:
break
node1 = partition[0]
node2 = partition[1]
G = updateGraph(G, node1, node2)
if i == 999:
print "Maximum iteration reaches"
print inneropt
setinner和setouter是两个独立的功能。那是我要平行的地方…
回答 0
您可以使用多处理模块。对于这种情况,我可以使用一个处理池:
from multiprocessing import Pool
pool = Pool()
result1 = pool.apply_async(solve1, [A]) # evaluate "solve1(A)" asynchronously
result2 = pool.apply_async(solve2, [B]) # evaluate "solve2(B)" asynchronously
answer1 = result1.get(timeout=10)
answer2 = result2.get(timeout=10)
这将产生可以为您完成常规工作的流程。由于未通过processes
,它将为您计算机上的每个CPU内核生成一个进程。每个CPU内核可以同时执行一个进程。
如果要将列表映射到单个功能,请执行以下操作:
args = [A, B]
results = pool.map(solve1, args)
不要使用线程,因为GIL会锁定对python对象的所有操作。
回答 1
使用Ray可以非常优雅地完成此操作。
要并行处理示例,您需要使用@ray.remote
装饰器定义函数,然后使用调用它们.remote
。
import ray
ray.init()
# Define the functions.
@ray.remote
def solve1(a):
return 1
@ray.remote
def solve2(b):
return 2
# Start two tasks in the background.
x_id = solve1.remote(0)
y_id = solve2.remote(1)
# Block until the tasks are done and get the results.
x, y = ray.get([x_id, y_id])
与多处理模块相比,它有许多优点。
- 相同的代码将在多核计算机以及计算机集群上运行。
- 进程通过共享内存和零拷贝序列化有效地共享数据。
- 错误消息可以很好地传播。
这些函数调用可以组合在一起,例如,
@ray.remote def f(x): return x + 1 x_id = f.remote(1) y_id = f.remote(x_id) z_id = f.remote(y_id) ray.get(z_id) # returns 4
- 除了远程调用功能外,类还可以作为actor远程实例化。
请注意,Ray是我一直在帮助开发的框架。
回答 2
CPython使用Global Interpreter Lock,这使并行编程比C ++更加有趣
本主题提供了一些有用的示例和挑战说明:
在Linux上使用Taskset在多核系统上使用Python Global Interpreter Lock(GIL)解决方法?
回答 3
正如其他人所说,解决方案是使用多个过程。但是,哪种框架更合适取决于许多因素。除了已经提到的那些,还有charm4py和mpi4py(我是charm4py的开发人员)。
与使用工作池抽象相比,有一种更有效的方法来实现上述示例。G
在1000次迭代中,主循环不断向工作人员发送相同的参数(包括完整的图)。由于至少一个工作进程将驻留在不同的进程上,因此这涉及将参数复制并发送到其他进程。根据对象的大小,这可能会非常昂贵。相反,让工作人员存储状态并仅发送更新的信息是有意义的。
例如,在charm4py中,可以这样进行:
class Worker(Chare):
def __init__(self, Q, G, n):
self.G = G
...
def setinner(self, node1, node2):
self.updateGraph(node1, node2)
...
def solve(Q, G, n):
# create 2 workers, each on a different process, passing the initial state
worker_a = Chare(Worker, onPE=0, args=[Q, G, n])
worker_b = Chare(Worker, onPE=1, args=[Q, G, n])
while i < 1000:
result_a = worker_a.setinner(node1, node2, ret=True) # execute setinner on worker A
result_b = worker_b.setouter(node1, node2, ret=True) # execute setouter on worker B
inneropt, partition, x = result_a.get() # wait for result from worker A
outeropt = result_b.get() # wait for result from worker B
...
请注意,在此示例中,我们实际上只需要一个工人。主循环可以执行其中一个功能,而工人可以执行另一个功能。但是我的代码有助于说明以下几点:
- 工作程序A在进程0中运行(与主循环相同)。在
result_a.get()
阻塞等待结果的同时,工作者A在同一过程中进行计算。 - 参数通过引用自动传递给工作程序A,因为它处于同一过程中(不涉及复制)。
回答 4
在某些情况下,可以使用Numba自动并行化循环,尽管它仅适用于一小部分Python:
from numba import njit, prange
@njit(parallel=True)
def prange_test(A):
s = 0
# Without "parallel=True" in the jit-decorator
# the prange statement is equivalent to range
for i in prange(A.shape[0]):
s += A[i]
return s
不幸的是,似乎Numba仅适用于Numpy数组,不适用于其他Python对象。从理论上讲,也可以将Python编译为C ++,然后使用Intel C ++编译器自动并行化它,尽管我还没有尝试过。
回答 5
您可以使用joblib
库进行并行计算和多处理。
from joblib import Parallel, delayed
您可以简单地创建foo
要并行运行的函数,并基于以下代码实现并行处理:
output = Parallel(n_jobs=num_cores)(delayed(foo)(i) for i in input)
在哪里num_cores
可以从以下地址获得multiprocessing
库如下:
import multiprocessing
num_cores = multiprocessing.cpu_count()
如果您的函数具有多个输入参数,并且只想通过列表对一个参数进行迭代,则可以按以下方式使用库中的partial
函数functools
:
from joblib import Parallel, delayed
import multiprocessing
from functools import partial
def foo(arg1, arg2, arg3, arg4):
'''
body of the function
'''
return output
input = [11,32,44,55,23,0,100,...] # arbitrary list
num_cores = multiprocessing.cpu_count()
foo_ = partial(foo, arg2=arg2, arg3=arg3, arg4=arg4)
# arg1 is being fetched from input list
output = Parallel(n_jobs=num_cores)(delayed(foo_)(i) for i in input)
您可以在此处找到一些有关python和R多重处理的完整说明。