问题:如何并行化一个简单的Python循环?
这可能是一个琐碎的问题,但是如何在python中并行化以下循环?
# setup output lists
output1 = list()
output2 = list()
output3 = list()
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter = parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
我知道如何在Python中启动单线程,但我不知道如何“收集”结果。
多个过程也可以-在这种情况下最简单的方法。我目前使用的是Linux,但代码也应同时在Windows和Mac上运行。
并行化此代码的最简单方法是什么?
回答 0
由于全局解释器锁(GIL),在CPython上使用多个线程不会为纯Python代码带来更好的性能。我建议改用multiprocessing
模块:
pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
请注意,这在交互式解释器中不起作用。
为了避免在GIL周围出现常见的FUD:对于本示例,无论如何都使用线程没有任何好处。您想在这里使用进程而不是线程,因为它们避免了很多问题。
回答 1
为了并行化一个简单的for循环,joblib为多处理的原始使用带来了很多价值。不仅简短的语法,而且还包括诸如非常快速的迭代透明捆绑(以消除开销)或捕获子进程的回溯之类的东西,以获得更好的错误报告。
免责声明:我是joblib的原始作者。
回答 2
并行化此代码的最简单方法是什么?
我真的很喜欢concurrent.futures
这一点,它从3.2版开始在Python3中可用-并通过回传到PyPi上的2.6和2.7 。
您可以使用线程或进程,并使用完全相同的接口。
多处理
将其放在文件中-futuretest.py:
import concurrent.futures
import time, random # add some random sleep time
offset = 2 # you don't supply these so
def calc_stuff(parameter=None): # these are examples.
sleep_time = random.choice([0, 1, 2, 3, 4, 5])
time.sleep(sleep_time)
return parameter / 2, sleep_time, parameter * parameter
def procedure(j): # just factoring out the
parameter = j * offset # procedure
# call the calculation
return calc_stuff(parameter=parameter)
def main():
output1 = list()
output2 = list()
output3 = list()
start = time.time() # let's see how long this takes
# we can swap out ProcessPoolExecutor for ThreadPoolExecutor
with concurrent.futures.ProcessPoolExecutor() as executor:
for out1, out2, out3 in executor.map(procedure, range(0, 10)):
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
finish = time.time()
# these kinds of format strings are only available on Python 3.6:
# time to upgrade!
print(f'original inputs: {repr(output1)}')
print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
print(f'returned in order given: {repr(output3)}')
if __name__ == '__main__':
main()
这是输出:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallellizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
多线程
现在更改ProcessPoolExecutor
为ThreadPoolExecutor
,然后再次运行该模块:
$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallellizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]
现在,您已经完成了多线程和多处理!
注意性能,并一起使用。
采样量太小,无法比较结果。
但是,我怀疑多线程通常比多处理要快,尤其是在Windows上,因为Windows不支持分支,因此每个新进程都需要花费一些时间才能启动。在Linux或Mac上,它们可能会更接近。
您可以在多个进程中嵌套多个线程,但是建议不要使用多个线程来剥离多个进程。
回答 3
from joblib import Parallel, delayed
import multiprocessing
inputs = range(10)
def processInput(i):
return i * i
num_cores = multiprocessing.cpu_count()
results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)
上面的代码在我的机器上很漂亮(Ubuntu,软件包joblib已预安装,但可以通过安装pip install joblib
)。
回答 4
使用Ray有许多优点:
- 除了多个内核(具有相同的代码)之外,您还可以并行处理多台计算机。
- 通过共享内存有效地处理数字数据(以及零拷贝序列化)。
- 具有分布式调度的高任务吞吐量。
- 容错能力。
就您而言,您可以启动Ray并定义一个远程功能
import ray
ray.init()
@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
# Do something.
return 1, 2, 3
然后并行调用
output1, output2, output3 = [], [], []
# Launch the tasks.
for j in range(10):
id1, id2, id3 = calc_stuff.remote(parameter=j)
output1.append(id1)
output2.append(id2)
output3.append(id3)
# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)
为了在集群上运行相同的示例,唯一会改变的行是对ray.init()的调用。相关文档可在此处找到。
请注意,我正在帮助开发Ray。
回答 5
这是最简单的方法!
您可以使用asyncio。(可在此处找到文档)。它用作多个Python异步框架的基础,这些框架提供了高性能的网络和Web服务器,数据库连接库,分布式任务队列等。此外,它还具有高级和低级API来解决任何类型的问题。 。
import asyncio
def background(f):
def wrapped(*args, **kwargs):
return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
return wrapped
@background
def your_function(argument):
#code
现在,此函数将在每次调用时并行运行,而不会使主程序进入等待状态。您也可以使用它并行化循环。当调用for循环时,尽管循环是顺序的,但是每次迭代都在解释器到达主程序后与主程序并行运行。 例如:
@background
def your_function(argument):
time.sleep(5)
print('function finished for '+str(argument))
for i in range(10):
your_function(i)
print('loop finished')
这将产生以下输出:
loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
回答 6
您为什么不使用线程和一个互斥锁来保护一个全局列表?
import os
import re
import time
import sys
import thread
from threading import Thread
class thread_it(Thread):
def __init__ (self,param):
Thread.__init__(self)
self.param = param
def run(self):
mutex.acquire()
output.append(calc_stuff(self.param))
mutex.release()
threads = []
output = []
mutex = thread.allocate_lock()
for j in range(0, 10):
current = thread_it(j * offset)
threads.append(current)
current.start()
for t in threads:
t.join()
#here you have output list filled with data
请记住,您将与最慢的线程一样快
回答 7
我发现joblib
对我非常有用。请参见以下示例:
from joblib import Parallel, delayed
def yourfunction(k):
s=3.14*k*k
print "Area of a circle with a radius ", k, " is:", s
element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1,10))
n_jobs = -1:使用所有可用的内核
回答 8
假设我们有一个异步函数
async def work_async(self, student_name: str, code: str, loop):
"""
Some async function
"""
# Do some async procesing
这需要在大型阵列上运行。一些属性被传递给程序,一些属性从数组中的dictionary元素的属性使用。
async def process_students(self, student_name: str, loop):
market = sys.argv[2]
subjects = [...] #Some large array
batchsize = 5
for i in range(0, len(subjects), batchsize):
batch = subjects[i:i+batchsize]
await asyncio.gather(*(self.work_async(student_name,
sub['Code'],
loop)
for sub in batch))
回答 9
看看这个;
http://docs.python.org/library/queue.html
这可能不是正确的方法,但我会做类似的事情;
实际代码;
from multiprocessing import Process, JoinableQueue as Queue
class CustomWorker(Process):
def __init__(self,workQueue, out1,out2,out3):
Process.__init__(self)
self.input=workQueue
self.out1=out1
self.out2=out2
self.out3=out3
def run(self):
while True:
try:
value = self.input.get()
#value modifier
temp1,temp2,temp3 = self.calc_stuff(value)
self.out1.put(temp1)
self.out2.put(temp2)
self.out3.put(temp3)
self.input.task_done()
except Queue.Empty:
return
#Catch things better here
def calc_stuff(self,param):
out1 = param * 2
out2 = param * 4
out3 = param * 8
return out1,out2,out3
def Main():
inputQueue = Queue()
for i in range(10):
inputQueue.put(i)
out1 = Queue()
out2 = Queue()
out3 = Queue()
processes = []
for x in range(2):
p = CustomWorker(inputQueue,out1,out2,out3)
p.daemon = True
p.start()
processes.append(p)
inputQueue.join()
while(not out1.empty()):
print out1.get()
print out2.get()
print out3.get()
if __name__ == '__main__':
Main()
希望能有所帮助。
回答 10
当在Python中实现多处理和并行/分布式计算时,这可能很有用。
Techila是一种分布式计算中间件,它使用techila软件包直接与Python集成。包中的peach函数可用于并行化循环结构。(以下代码段来自Techila社区论坛)
techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
files = 'theheavyalgorithm.py', # Python-file that will be sourced on Workers
jobs = jobcount # Number of Jobs in the Project
)
回答 11
谢谢@iuryxavier
from multiprocessing import Pool
from multiprocessing import cpu_count
def add_1(x):
return x + 1
if __name__ == "__main__":
pool = Pool(cpu_count())
results = pool.map(add_1, range(10**12))
pool.close() # 'TERM'
pool.join() # 'KILL'
回答 12
并行处理的一个非常简单的例子是
from multiprocessing import Process
output1 = list()
output2 = list()
output3 = list()
def yourfunction():
for j in range(0, 10):
# calc individual parameter value
parameter = j * offset
# call the calculation
out1, out2, out3 = calc_stuff(parameter=parameter)
# put results into correct output list
output1.append(out1)
output2.append(out2)
output3.append(out3)
if __name__ == '__main__':
p = Process(target=pa.yourfunction, args=('bob',))
p.start()
p.join()