How do I parallelize a simple Python loop?

This is probably a trivial question, but how do I parallelize the following loop in python?

# setup output lists
output1 = list()
output2 = list()
output3 = list()

for j in range(0, 10):
    # calc individual parameter value
    parameter = j * offset
    # call the calculation
    out1, out2, out3 = calc_stuff(parameter = parameter)

    # put results into correct output list
    output1.append(out1)
    output2.append(out2)
    output3.append(out3)

I know how to start single threads in Python but I don’t know how to “collect” the results.

Multiple processes would be fine too – whatever is easiest for this case. I’m currently using Linux, but the code should run on Windows and Mac as well.

What’s the easiest way to parallelize this code?


Answer 0

Using multiple threads on CPython won’t give you better performance for pure-Python code due to the global interpreter lock (GIL). I suggest using the multiprocessing module instead:

import multiprocessing

pool = multiprocessing.Pool(4)
out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))

Note that this won’t work in the interactive interpreter: the worker processes have to be able to import calc_stuff, so it needs to be defined in an importable module.

To avoid the usual FUD around the GIL: There wouldn’t be any advantage to using threads for this example anyway. You want to use processes here, not threads, because they avoid a whole bunch of problems.
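A more complete, self-contained sketch (calc_stuff and offset are placeholders standing in for the question's code, not part of multiprocessing) that also adds the __main__ guard needed on Windows, where child processes are spawned rather than forked:

import multiprocessing

offset = 2                          # placeholder; the question does not supply this

def calc_stuff(parameter=None):     # placeholder for the real calculation
    return parameter / 2, parameter % 3, parameter ** 2

if __name__ == '__main__':          # required on Windows, where spawned processes re-import this module
    with multiprocessing.Pool(4) as pool:
        out1, out2, out3 = zip(*pool.map(calc_stuff, range(0, 10 * offset, offset)))
    print(out1, out2, out3)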


Answer 1

To parallelize a simple for loop, joblib brings a lot of value over raw use of multiprocessing: not only the short syntax, but also things like transparent bunching of iterations when they are very fast (to remove the overhead) and capturing the traceback of the child process, for better error reporting.

Disclaimer: I am the original author of joblib.
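As a rough sketch of what that could look like for the loop in the question (calc_stuff and offset are placeholders taken from the question, not part of joblib):

from joblib import Parallel, delayed

offset = 2                          # placeholder; the question does not supply this

def calc_stuff(parameter=None):     # placeholder for the real calculation
    return parameter / 2, parameter % 3, parameter ** 2

results = Parallel(n_jobs=-1)(
    delayed(calc_stuff)(parameter=j * offset) for j in range(10)
)
output1, output2, output3 = map(list, zip(*results))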


Answer 2

What’s the easiest way to parallelize this code?

I really like concurrent.futures for this. It has been available in Python 3 since version 3.2 – and via a backport to 2.6 and 2.7 on PyPI.

You can use threads or processes and use the exact same interface.

Multiprocessing

Put this in a file – futuretest.py:

import concurrent.futures
import time, random               # add some random sleep time

offset = 2                        # you don't supply these so
def calc_stuff(parameter=None):   # these are examples.
    sleep_time = random.choice([0, 1, 2, 3, 4, 5])
    time.sleep(sleep_time)
    return parameter / 2, sleep_time, parameter * parameter

def procedure(j):                 # just factoring out the
    parameter = j * offset        # procedure
    # call the calculation
    return calc_stuff(parameter=parameter)

def main():
    output1 = list()
    output2 = list()
    output3 = list()
    start = time.time()           # let's see how long this takes

    # we can swap out ProcessPoolExecutor for ThreadPoolExecutor
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for out1, out2, out3 in executor.map(procedure, range(0, 10)):
            # put results into correct output list
            output1.append(out1)
            output2.append(out2)
            output3.append(out3)
    finish = time.time()
    # these kinds of format strings are only available on Python 3.6:
    # time to upgrade!
    print(f'original inputs: {repr(output1)}')
    print(f'total time to execute {sum(output2)} = sum({repr(output2)})')
    print(f'time saved by parallelizing: {sum(output2) - (finish-start)}')
    print(f'returned in order given: {repr(output3)}')

if __name__ == '__main__':
    main()

And here’s the output:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 33 = sum([0, 3, 3, 4, 3, 5, 1, 5, 5, 4])
time saved by parallelizing: 27.68999981880188
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Multithreading

Now change ProcessPoolExecutor to ThreadPoolExecutor, and run the module again:

$ python3 -m futuretest
original inputs: [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
total time to execute 19 = sum([0, 2, 3, 5, 2, 0, 0, 3, 3, 1])
time saved by parallelizing: 13.992000102996826
returned in order given: [0, 4, 16, 36, 64, 100, 144, 196, 256, 324]

Now you have done both multithreading and multiprocessing!

Note on performance and using both together.

The sample size is far too small to compare the results.

However, I suspect that multithreading will be faster than multiprocessing for a workload like this one, where the work is mostly sleeping, especially on Windows, since Windows doesn’t support forking, so each new process takes time to launch. On Linux or Mac they’ll probably be closer.

You can nest multiple threads inside multiple processes, but it’s recommended not to use multiple threads to spin off multiple processes.
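A minimal sketch of that nesting (a thread pool inside each worker process); io_bound_call is a hypothetical stand-in for per-item work such as a network request:

import concurrent.futures

def io_bound_call(x):
    # hypothetical per-item work, e.g. a network request
    return x * 2

def process_chunk(chunk):
    # each worker process runs its own thread pool over its chunk
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as threads:
        return list(threads.map(io_bound_call, chunk))

if __name__ == '__main__':
    chunks = [range(0, 5), range(5, 10)]
    with concurrent.futures.ProcessPoolExecutor() as processes:
        results = [r for chunk_results in processes.map(process_chunk, chunks)
                     for r in chunk_results]
    print(results)   # [0, 2, 4, ..., 18]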


Answer 3

from joblib import Parallel, delayed
import multiprocessing

inputs = range(10) 
def processInput(i):
    return i * i

num_cores = multiprocessing.cpu_count()

results = Parallel(n_jobs=num_cores)(delayed(processInput)(i) for i in inputs)
print(results)

The above works beautifully on my machine (Ubuntu; the joblib package was pre-installed, but it can be installed via pip install joblib).

Taken from https://blog.dominodatalab.com/simple-parallelization/


Answer 4

There are a number of advantages to using Ray:

  • You can parallelize over multiple machines in addition to multiple cores (with the same code).
  • Efficient handling of numerical data through shared memory (and zero-copy serialization).
  • High task throughput with distributed scheduling.
  • Fault tolerance.

In your case, you could start Ray and define a remote function

import ray

ray.init()

@ray.remote(num_return_vals=3)
def calc_stuff(parameter=None):
    # Do something.
    return 1, 2, 3

and then invoke it in parallel

output1, output2, output3 = [], [], []

# Launch the tasks.
for j in range(10):
    id1, id2, id3 = calc_stuff.remote(parameter=j)
    output1.append(id1)
    output2.append(id2)
    output3.append(id3)

# Block until the results have finished and get the results.
output1 = ray.get(output1)
output2 = ray.get(output2)
output3 = ray.get(output3)

To run the same example on a cluster, the only line that would change would be the call to ray.init(). See the Ray documentation for details.

Note that I’m helping to develop Ray.


Answer 5

This is the easiest way to do it!

You can use asyncio (see the asyncio documentation). It is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web servers, database connection libraries, distributed task queues, etc. Plus it has both high-level and low-level APIs to accommodate any kind of problem.

import asyncio

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)

    return wrapped

@background
def your_function(argument):
    pass  # your code here

Now this function will run in parallel whenever it is called, without putting the main program into a wait state. You can use it to parallelize a for loop as well: the loop itself is still sequential, but every iteration runs in parallel to the main program as soon as the interpreter reaches it. For instance:

import time

@background
def your_function(argument):
    time.sleep(5)
    print('function finished for '+str(argument))


for i in range(10):
    your_function(i)


print('loop finished')

This produces the following output:

loop finished
function finished for 4
function finished for 8
function finished for 0
function finished for 3
function finished for 6
function finished for 2
function finished for 5
function finished for 7
function finished for 9
function finished for 1
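One thing the snippet above does not show is how to collect the return values, which is what the question asks about. A rough sketch, assuming the same background decorator (whose wrapper returns the future produced by run_in_executor) and a Python version where asyncio.get_event_loop() still creates a loop when none is running:

import asyncio
import time

def background(f):
    def wrapped(*args, **kwargs):
        return asyncio.get_event_loop().run_in_executor(None, f, *args, **kwargs)
    return wrapped

@background
def your_function(argument):
    time.sleep(1)
    return argument * argument

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    futures = [your_function(i) for i in range(10)]   # the work starts immediately
    results = loop.run_until_complete(asyncio.gather(*futures))
    print(results)                                    # results come back in submission order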

Answer 6

Why don’t you use threads, and one mutex to protect one global list?

from threading import Thread, Lock

class thread_it(Thread):
    def __init__(self, param):
        Thread.__init__(self)
        self.param = param

    def run(self):
        # do the work outside the lock so the threads can actually overlap
        result = calc_stuff(self.param)   # calc_stuff comes from the question
        # only the append to the shared list needs the mutex
        with mutex:
            output.append(result)


threads = []
output = []
mutex = Lock()

for j in range(0, 10):
    current = thread_it(j * offset)       # offset comes from the question
    threads.append(current)
    current.start()

for t in threads:
    t.join()

# here you have the output list filled with data

Keep in mind that you will only be as fast as your slowest thread.


Answer 7

I found joblib very useful. Please see the following example:

from joblib import Parallel, delayed

def yourfunction(k):
    s = 3.14 * k * k
    print("Area of a circle with a radius ", k, " is:", s)

element_run = Parallel(n_jobs=-1)(delayed(yourfunction)(k) for k in range(1, 10))

n_jobs=-1: use all available cores


Answer 8

Let’s say we have an async function

async def work_async(self, student_name: str, code: str, loop):
    """
    Some async function
    """
    # Do some async processing

That needs to be run over a large array. Some arguments are passed in directly, and some come from properties of the dictionary elements in the array.

async def process_students(self, student_name: str, loop):
    market = sys.argv[2]
    subjects = [...]  # Some large array
    batchsize = 5
    for i in range(0, len(subjects), batchsize):
        batch = subjects[i:i+batchsize]
        await asyncio.gather(*(self.work_async(student_name, sub['Code'], loop)
                               for sub in batch))

Answer 9

Have a look at this:

http://docs.python.org/library/queue.html

This might not be the right way to do it, but I’d do something like this:

Actual code:

from multiprocessing import Process, JoinableQueue as Queue
from queue import Empty

class CustomWorker(Process):
    def __init__(self, workQueue, out1, out2, out3):
        Process.__init__(self)
        self.input = workQueue
        self.out1 = out1
        self.out2 = out2
        self.out3 = out3

    def run(self):
        while True:
            try:
                value = self.input.get(timeout=1)
                # value modifier
                temp1, temp2, temp3 = self.calc_stuff(value)
                self.out1.put(temp1)
                self.out2.put(temp2)
                self.out3.put(temp3)
                self.input.task_done()
            except Empty:
                return
                # Catch things better here

    def calc_stuff(self, param):
        out1 = param * 2
        out2 = param * 4
        out3 = param * 8
        return out1, out2, out3

def Main():
    inputQueue = Queue()
    for i in range(10):
        inputQueue.put(i)
    out1 = Queue()
    out2 = Queue()
    out3 = Queue()
    processes = []
    for x in range(2):
        p = CustomWorker(inputQueue, out1, out2, out3)
        p.daemon = True
        p.start()
        processes.append(p)
    inputQueue.join()
    while not out1.empty():
        print(out1.get())
        print(out2.get())
        print(out3.get())

if __name__ == '__main__':
    Main()

Hope that helps.


Answer 10

This could be useful when implementing multiprocessing and parallel/distributed computing in Python.

There is a YouTube tutorial on using the Techila package.

Techila is a distributed computing middleware, which integrates directly with Python using the techila package. The peach function in the package can be useful in parallelizing loop structures. (The following code snippet is from the Techila Community Forums.)

import techila

techila.peach(funcname = 'theheavyalgorithm', # Function that will be called on the compute nodes/ Workers
    files = 'theheavyalgorithm.py',           # Python-file that will be sourced on Workers
    jobs = jobcount                           # Number of Jobs in the Project
    )

Answer 11

Thanks, @iuryxavier.

from multiprocessing import Pool
from multiprocessing import cpu_count


def add_1(x):
    return x + 1

if __name__ == "__main__":
    pool = Pool(cpu_count())
    results = pool.map(add_1, range(10**12))  # careful: 10**12 items is an enormous amount of work
    pool.close()  # no more tasks will be submitted to this pool
    pool.join()   # wait for the worker processes to exit

Answer 12

A very simple example of parallel processing is:

from multiprocessing import Process

output1 = list()
output2 = list()
output3 = list()

def yourfunction():
    for j in range(0, 10):
        # calc individual parameter value
        parameter = j * offset
        # call the calculation
        out1, out2, out3 = calc_stuff(parameter=parameter)

        # put results into correct output list
        output1.append(out1)
        output2.append(out2)
        output3.append(out3)

if __name__ == '__main__':
    p = Process(target=yourfunction)   # offset and calc_stuff come from the question
    p.start()
    p.join()
    # note: the child process fills its own copies of output1/2/3; use a
    # multiprocessing.Queue or a Pool if the parent needs the results back