问题:Python:如何并行运行python函数?

我先研究了一下,却找不到答案。我试图在Python中并行运行多个函数。

我有这样的事情:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

我想调用func1和func2并使它们同时运行。这些功能彼此之间或在同一对象上不相互作用。现在,我必须等待func1完成才能启动func2。我该如何执行以下操作:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

我希望能够几乎同时创建两个目录,因为我每分钟都在统计要创建多少个文件。如果该目录不存在,将会拖延我的时间。

I researched first and couldn’t find an answer to my question. I am trying to run multiple functions in parallel in Python.

I have something like this:

files.py

import common #common is a util class that handles all the IO stuff

dir1 = 'C:\folder1'
dir2 = 'C:\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

def func1():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir1)
       c.getFiles(dir1)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir1)
       c.getFiles(dir1)

def func2():
   c = common.Common()
   for i in range(len(addFiles)):
       c.createFiles(addFiles[i], filename, dir2)
       c.getFiles(dir2)
       time.sleep(10)
       c.removeFiles(addFiles[i], dir2)
       c.getFiles(dir2)

I want to call func1 and func2 and have them run at the same time. The functions do not interact with each other or on the same object. Right now I have to wait for func1 to finish before func2 to start. How do I do something like below:

process.py

from files import func1, func2

runBothFunc(func1(), func2())

I want to be able to create both directories pretty close to the same time because every min I am counting how many files are being created. If the directory isn’t there it will throw off my timing.


回答 0

您可以使用threadingmultiprocessing

由于CPython的特殊性threading不太可能实现真正的并行性。因此,multiprocessing通常是更好的选择。

这是一个完整的示例:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

可以按照以下方式轻松地将子进程的启动/联接机制封装为一个函数runBothFunc

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

You could use threading or multiprocessing.

Due to peculiarities of CPython, threading is unlikely to achieve true parallelism. For this reason, multiprocessing is generally a better bet.

Here is a complete example:

from multiprocessing import Process

def func1():
  print 'func1: starting'
  for i in xrange(10000000): pass
  print 'func1: finishing'

def func2():
  print 'func2: starting'
  for i in xrange(10000000): pass
  print 'func2: finishing'

if __name__ == '__main__':
  p1 = Process(target=func1)
  p1.start()
  p2 = Process(target=func2)
  p2.start()
  p1.join()
  p2.join()

The mechanics of starting/joining child processes can easily be encapsulated into a function along the lines of your runBothFunc:

def runInParallel(*fns):
  proc = []
  for fn in fns:
    p = Process(target=fn)
    p.start()
    proc.append(p)
  for p in proc:
    p.join()

runInParallel(func1, func2)

回答 1

可以使用Ray优雅地完成此任务,该系统使您可以轻松地并行化和分发Python代码。

要并行处理示例,您需要使用@ray.remote装饰器定义函数,然后使用调用它们.remote

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

如果将相同的参数传递给两个函数且参数较大,则使用的更有效方法ray.put()。这样可以避免将大参数序列化两次并为其创建两个内存副本:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

如果func1()func2()返回结果,则需要按以下方式重写代码:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func1.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

多处理模块相比,使用Ray有许多优点。特别是,相同的代码将在单台计算机以及一台计算机集群上运行。有关Ray的更多优点,请参见此相关文章

This can be done elegantly with Ray, a system that allows you to easily parallelize and distribute your Python code.

To parallelize your example, you’d need to define your functions with the @ray.remote decorator, and then invoke them with .remote.

import ray

ray.init()

dir1 = 'C:\\folder1'
dir2 = 'C:\\folder2'
filename = 'test.txt'
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45]

# Define the functions. 
# You need to pass every global variable used by the function as an argument.
# This is needed because each remote function runs in a different process,
# and thus it does not have access to the global variables defined in 
# the current process.
@ray.remote
def func1(filename, addFiles, dir):
    # func1() code here...

@ray.remote
def func2(filename, addFiles, dir):
    # func2() code here...

# Start two tasks in the background and wait for them to finish.
ray.get([func1.remote(filename, addFiles, dir1), func2.remote(filename, addFiles, dir2)]) 

If you pass the same argument to both functions and the argument is large, a more efficient way to do this is using ray.put(). This avoids the large argument to be serialized twice and to create two memory copies of it:

largeData_id = ray.put(largeData)

ray.get([func1(largeData_id), func2(largeData_id)])

Important – If func1() and func2() return results, you need to rewrite the code as follows:

ret_id1 = func1.remote(filename, addFiles, dir1)
ret_id2 = func2.remote(filename, addFiles, dir2)
ret1, ret2 = ray.get([ret_id1, ret_id2])

There are a number of advantages of using Ray over the multiprocessing module. In particular, the same code will run on a single machine as well as on a cluster of machines. For more advantages of Ray see this related post.


回答 2

如果您的函数主要用于I / O工作(而CPU工作量较少),并且您具有Python 3.2+,则可以使用ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

如果您的功能主要是在做CPU工作(而I / O则更少),并且您拥有Python 2.6+,则可以使用多处理模块:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])

If your functions are mainly doing I/O work (and less CPU work) and you have Python 3.2+, you can use a ThreadPoolExecutor:

from concurrent.futures import ThreadPoolExecutor

def run_io_tasks_in_parallel(tasks):
    with ThreadPoolExecutor() as executor:
        running_tasks = [executor.submit(task) for task in tasks]
        for running_task in running_tasks:
            running_task.result()

run_io_tasks_in_parallel([
    lambda: print('IO task 1 running!'),
    lambda: print('IO task 2 running!'),
])

If your functions are mainly doing CPU work (and less I/O work) and you have Python 2.6+, you can use the multiprocessing module:

from multiprocessing import Process

def run_cpu_tasks_in_parallel(tasks):
    running_tasks = [Process(target=task) for task in tasks]
    for running_task in running_tasks:
        running_task.start()
    for running_task in running_tasks:
        running_task.join()

run_cpu_tasks_in_parallel([
    lambda: print('CPU task 1 running!'),
    lambda: print('CPU task 2 running!'),
])

回答 3

如果您是Windows用户并且使用python 3,那么本文将帮助您在python中进行并行编程。当您运行常规的多处理库的池编程时,您将在程序中遇到关于主要功能的错误。这是因为Windows没有fork()功能。下面的帖子提供了上述问题的解决方案。

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

由于我使用的是python 3,因此我对该程序进行了如下更改:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

使用此功能后,上面的问题代码也做了如下更改:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

我得到的输出为:

[1, 8, 27, 64, 125, 216]

我认为这篇文章可能对某些Windows用户有用。

If you are a windows user and using python 3, then this post will help you to do parallel programming in python.when you run a usual multiprocessing library’s pool programming, you will get an error regarding the main function in your program. This is because the fact that windows has no fork() functionality. The below post is giving a solution to the mentioned problem .

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Since I was using the python 3, I changed the program a little like this:

from types import FunctionType
import marshal

def _applicable(*args, **kwargs):
  name = kwargs['__pw_name']
  code = marshal.loads(kwargs['__pw_code'])
  gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls'])
  defs = marshal.loads(kwargs['__pw_defs'])
  clsr = marshal.loads(kwargs['__pw_clsr'])
  fdct = marshal.loads(kwargs['__pw_fdct'])
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  del kwargs['__pw_name']
  del kwargs['__pw_code']
  del kwargs['__pw_defs']
  del kwargs['__pw_clsr']
  del kwargs['__pw_fdct']
  return func(*args, **kwargs)

def make_applicable(f, *args, **kwargs):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  kwargs['__pw_name'] = f.__name__  # edited
  kwargs['__pw_code'] = marshal.dumps(f.__code__)   # edited
  kwargs['__pw_defs'] = marshal.dumps(f.__defaults__)  # edited
  kwargs['__pw_clsr'] = marshal.dumps(f.__closure__)  # edited
  kwargs['__pw_fdct'] = marshal.dumps(f.__dict__)   # edited
  return _applicable, args, kwargs

def _mappable(x):
  x,name,code,defs,clsr,fdct = x
  code = marshal.loads(code)
  gbls = globals() #gbls = marshal.loads(gbls)
  defs = marshal.loads(defs)
  clsr = marshal.loads(clsr)
  fdct = marshal.loads(fdct)
  func = FunctionType(code, gbls, name, defs, clsr)
  func.fdct = fdct
  return func(x)

def make_mappable(f, iterable):
  if not isinstance(f, FunctionType): raise ValueError('argument must be a function')
  name = f.__name__    # edited
  code = marshal.dumps(f.__code__)   # edited
  defs = marshal.dumps(f.__defaults__)  # edited
  clsr = marshal.dumps(f.__closure__)  # edited
  fdct = marshal.dumps(f.__dict__)  # edited
  return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable)

After this function , the above problem code is also changed a little like this:

from multiprocessing import Pool
from poolable import make_applicable, make_mappable

def cube(x):
  return x**3

if __name__ == "__main__":
  pool    = Pool(processes=2)
  results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)]
  print([result.get(timeout=10) for result in results])

And I got the output as :

[1, 8, 27, 64, 125, 216]

I am thinking that this post may be useful for some of the windows users.


回答 4

无法保证两个函数将彼此同步执行,这似乎是您想要执行的操作。

最好的办法是将功能分成几个步骤,然后使用Process.join@aix的答案提及等方式在关键的同步点完成这两个步骤。

这比time.sleep(10)因为您不能保证确切的时间安排要好。在显式等待的情况下,您要说的是必须在执行该步骤之前完成功能,然后再移至下一个步骤,而不是假设它将在10毫秒内完成,而这不能根据计算机上发生的其他事情来保证。

There’s no way to guarantee that two functions will execute in sync with each other which seems to be what you want to do.

The best you can do is to split up the function into several steps, then wait for both to finish at critical synchronization points using Process.join like @aix’s answer mentions.

This is better than time.sleep(10) because you can’t guarantee exact timings. With explicitly waiting, you’re saying that the functions must be done executing that step before moving to the next, instead of assuming it will be done within 10ms which isn’t guaranteed based on what else is going on on the machine.


回答 5

似乎您只有一个函数,需要调用两个不同的参数。可以结合使用Python和3.2+ concurrent.futures和以上版本来优雅地完成map

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

现在,如果您的操作受IO限制,则可以这样使用ThreadPoolExecutor

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

注意map此处如何map将函数用于参数列表。

现在,如果您的功能受CPU限制,则可以使用 ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

如果不确定,您可以简单地尝试两者,看看哪一个可以给您更好的结果。

最后,如果您希望打印出结果,则只需执行以下操作:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)

Seems like you have a single function that you need to call on two different parameters. This can be elegantly done using a combination of concurrent.futures and map with Python 3.2+

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def sleep_secs(seconds):
  time.sleep(seconds)
  print(f'{seconds} has been processed')

secs_list = [2,4, 6, 8, 10, 12]

Now, if your operation is IO bound, then you can use the ThreadPoolExecutor as such:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

Note how map is used here to map your function to the list of arguments.

Now, If your function is CPU bound, then you can use ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)

If you are not sure, you can simply try both and see which one gives you better results.

Finally, if you are looking to print out your results, you can simply do this:

with ThreadPoolExecutor() as executor:
  results = executor.map(sleep_secs, secs_list)
  for result in results:
    print(result)

声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。