How can I recover the return value of a function passed to multiprocessing.Process?

In the example code below, I’d like to recover the return value of the function worker. How can I go about doing this? Where is this value stored?

Example Code:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(jobs)

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

I can’t seem to find the relevant attribute in the objects stored in jobs.


Answer 0

Use a shared variable to communicate, for example like this:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

Answer 1

I think the approach suggested by @sega_sai is the better one. But it really needs a code example, so here goes:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    print(pool.map(worker, range(5)))

Which will print the return values:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

If you are familiar with map (the Python 2 built-in), this should not be too challenging. Otherwise, have a look at sega_sai’s link.

Note how little code is needed. (Also note how processes are re-used).
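
For comparison, here is a sketch of the same call shape with the serial built-in map next to the parallel Pool.map (worker as defined above):

import multiprocessing

if __name__ == '__main__':
    # Serial: the built-in map runs worker in this process, one call at a time.
    print(list(map(worker, range(5))))

    # Parallel: Pool.map has the same call shape, but fans the inputs out
    # across the pool's worker processes and gathers the return values
    # back in input order.
    with multiprocessing.Pool(processes=3) as pool:
        print(pool.map(worker, range(5)))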


Answer 2

This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print(result)
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print(result_list)

if __name__ == '__main__':
    main()

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

This solution uses fewer resources than a multiprocessing.Queue, which uses

  • a Pipe
  • at least one Lock
  • a buffer
  • a thread

or a multiprocessing.SimpleQueue, which uses

  • a Pipe
  • at least one Lock

It is very instructive to look at the source for each of these types.


Answer 3

For some reason, I couldn’t find a general example of how to do this with Queue anywhere (even Python’s doc examples don’t spawn multiple processes), so here’s what I got working after like 10 tries:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2):  # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. Something less obvious here is that you have to get() from the queue before you join the Processes, or else the queue fills up and blocks everything.
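
To make the ordering constraint concrete, here is a sketch of the deadlock-prone variant (reusing q and processes from multi_add above):

# Anti-pattern: joining before draining the queue. Each child blocks in
# queue.put() until its data fits through the underlying pipe, while the
# parent blocks in join() waiting for that child; a deadlock once the
# pipe buffer fills up.
for p in processes:
    p.join()
rets = [q.get() for _ in processes]  # never reached if join() deadlocks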

Update for those who are object-oriented (tested in Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor:

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

Answer 4

For anyone else looking for how to get a value from a Process using a Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

Note that on Windows or in a Jupyter notebook, where multiprocessing must spawn a fresh interpreter that re-imports your code, you have to save this as a file and execute the file. If you instead define worker interactively, you will see an error like this:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>
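
A minimal sketch of the workaround, assuming you move the worker into its own importable file (the module name mymod is hypothetical):

# mymod.py (hypothetical module; the spawned child imports the worker from here)
def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

# main.py (or a notebook cell)
import multiprocessing
import mymod

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put({'foo': False})
    p = multiprocessing.Process(target=mymod.worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {'foo': True}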

Answer 5

It seems that you should use the multiprocessing.Pool class instead, with its .apply(), .apply_async(), or .map() methods:

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
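
As a minimal sketch of the apply_async route (worker as defined in the question; AsyncResult.get() blocks until the child's return value is ready):

import multiprocessing

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum

if __name__ == '__main__':
    with multiprocessing.Pool(processes=3) as pool:
        # Each apply_async call returns an AsyncResult immediately.
        results = [pool.apply_async(worker, (i,)) for i in range(5)]
        print([r.get() for r in results])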


Answer 6

You can use the exit built-in to set the exit code of a process, which can then be read back from the process's exitcode attribute. Bear in mind that exit codes are truncated to the range 0-255 on POSIX, so this only transports small non-negative integers:

import multiprocessing

def worker(procnum):
    print(str(procnum) + ' represent!')
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print(result)

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

Answer 7

The pebble package has a nice abstraction leveraging multiprocessing.Pipe, which makes this quite straightforward:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Example from: https://pythonhosted.org/Pebble/#concurrent-decorators


Answer 8

Thought I’d simplify the simplest of the examples copied from above, which work for me on Python 3.6. The simplest is multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

You can set the number of processes in the pool with, e.g., Pool(processes=5). However, it defaults to the CPU count, so leave it blank for CPU-bound tasks. (I/O-bound tasks often suit threads anyway, as the threads mostly wait and so can share a CPU core.) Pool also applies a chunking optimization.
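
To make the chunking knob concrete, here is a sketch using the real chunksize parameter of Pool.map, reusing pool and worker from the block above (the value 50 is just illustrative):

# Larger chunks cut down inter-process handoffs; smaller chunks balance
# load better when individual task durations vary.
print(pool.map(worker, range(100), chunksize=50))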

(Note that the worker method cannot be nested within a method. I initially defined my worker method inside the method that makes the call to pool.map, to keep it all self-contained, but then the processes couldn’t import it, and threw “AttributeError: Can’t pickle local object 'outer_method.<locals>.inner_method'”. More here. It can be inside a class.)

(I appreciate that the original question specified printing 'represent!' rather than calling time.sleep(), but without the sleep I thought some code was running concurrently when it wasn’t.)


Py3’s ProcessPoolExecutor is also two lines (.map returns a generator so you need the list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

With plain Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Use SimpleQueue if all you need is put and get. The first loop starts all the processes before the second makes the blocking queue.get calls. I don’t think there’s any reason to call p.join() as well.


Answer 9

A simple solution:

import multiprocessing

output = []
data = range(0, 10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r = p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())
    print(output[0])

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Answer 10

If you are using Python 3, you can use concurrent.futures.ProcessPoolExecutor as a convenient abstraction:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
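
If you want results as they finish rather than in input order, a sketch with executor.submit and as_completed (same worker as above):

from concurrent.futures import ProcessPoolExecutor, as_completed

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker, i) for i in range(5)]
        for fut in as_completed(futures):
            print(fut.result())  # completion order, not submission order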

Answer 11

I modified vartec’s answer a bit, since I needed to get the error codes from the function. (Thanks vartec!!! It’s an awesome trick.)

This can also be done with a manager.list, but I think it is better to use a dict and store a list within it. That way, we keep the association between each function and its results, since we can’t be sure of the order in which a list would be populated.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print('func1: starting')
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print('func1: finishing')
    # return "func1"  # no need for a return, since Process doesn't hand it back =(

def func2(fn, m_list):
    print('func2: starting')
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print('func2: finishing')
    # return "func2"

def func3(fn, m_list):
    print('func3: starting')
    time.sleep(9)
    # If the function fails, it never populates the dict, so the entry is
    # simply missing. Everything below the raise is unreachable; it shows
    # the try/except alternative for recording the error instead.
    raise ValueError("failed here")
    # If we want the error in the manager dict, we can catch it:
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except ValueError:
        m_list[fn] = "this is third and it failed horribly"
        # print('func3: finishing')
        # return "func3"


def runInParallel(*fns):  # * accepts any number of functions
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        p = Process(target=fn, name=fn.__name__, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()

    print(datetime.datetime.now() - start_time)
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)

    # here you can check what failed
    for i in proc:
        print(i.name, i.exitcode)  # name was set via Process(name=fn.__name__)

    # this will only show the functions that worked and were able to
    # populate the manager dict
    for i, j in manager.items():
        print(i, j)