Tag Archives: python-multiprocessing

When should we call multiprocessing.Pool.join?

Question: When should we call multiprocessing.Pool.join?

I am using 'multiprocessing.Pool.imap_unordered' as follows:

from multiprocessing import Pool
pool = Pool()
for mapped_result in pool.imap_unordered(mapping_func, args_iter):
    pass  # do some additional processing on mapped_result

Do I need to call pool.close or pool.join after the for loop?


Answer 0

No, you don’t, but it’s probably a good idea if you aren’t going to use the pool anymore.

Reasons for calling pool.close or pool.join are well said by Tim Peters in this SO post:

As to Pool.close(), you should call that when – and only when – you’re never going to submit more work to the Pool instance. So Pool.close() is typically called when the parallelizable part of your main program is finished. Then the worker processes will terminate when all work already assigned has completed.

It’s also excellent practice to call Pool.join() to wait for the worker processes to terminate. Among other reasons, there’s often no good way to report exceptions in parallelized code (exceptions occur in a context only vaguely related to what your main program is doing), and Pool.join() provides a synchronization point that can report some exceptions that occurred in worker processes that you’d otherwise never see.
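
Applied to the loop from the question, a minimal sketch might look like this (mapping_func and the input range are stand-ins for whatever you actually run):

from multiprocessing import Pool

def mapping_func(x):  # stand-in for the real worker function
    return x * x

if __name__ == '__main__':
    pool = Pool()
    for mapped_result in pool.imap_unordered(mapping_func, range(10)):
        print(mapped_result)      # do some additional processing here
    pool.close()   # no more work will be submitted to the pool
    pool.join()    # wait for the worker processes to exit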


Answer 1

I had the same memory issue as the question “Memory usage keep growing with Python’s multiprocessing.pool” describes, when I didn’t use pool.close() and pool.join() while using pool.map() with a function that calculated Levenshtein distance. The function worked fine, but wasn’t garbage collected properly on a Win7 64 machine, and the memory usage kept growing out of control every time the function was called until it took the whole operating system down. Here’s the code that fixed the leak:

stringList = []
for possible_string in stringArray:
    stringList.append((searchString,possible_string))

pool = Pool(5)
results = pool.map(myLevenshteinFunction, stringList)
pool.close()
pool.join()

After closing and joining the pool the memory leak went away.
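
As an aside, on Python 3.3 and later the pool can also be used as a context manager, which calls terminate() when the block exits. A rough sketch under that assumption; my_levenshtein below is only a placeholder, not a real Levenshtein implementation:

from multiprocessing import Pool

def my_levenshtein(pair):              # placeholder for myLevenshteinFunction
    search_string, candidate = pair
    return abs(len(search_string) - len(candidate))   # not a real edit distance

if __name__ == '__main__':
    string_list = [('searchString', s) for s in ('spam', 'eggs', 'bacon')]
    with Pool(5) as pool:                              # terminate()s the pool on exit (Python 3.3+)
        results = pool.map(my_levenshtein, string_list)  # map blocks until all work is done
    print(results)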


How do I recover the return value of a function passed to multiprocessing.Process?

Question: How do I recover the return value of a function passed to multiprocessing.Process?

In the example code below, I’d like to recover the return value of the function worker. How can I go about doing this? Where is this value stored?

Example Code:

import multiprocessing

def worker(procnum):
    '''worker function'''
    print str(procnum) + ' represent!'
    return procnum


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print jobs

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[<Process(Process-1, stopped)>, <Process(Process-2, stopped)>, <Process(Process-3, stopped)>, <Process(Process-4, stopped)>, <Process(Process-5, stopped)>]

I can’t seem to find the relevant attribute in the objects stored in jobs.


Answer 0

Use a shared variable to communicate. For example, like this:

import multiprocessing


def worker(procnum, return_dict):
    """worker function"""
    print(str(procnum) + " represent!")
    return_dict[procnum] = procnum


if __name__ == "__main__":
    manager = multiprocessing.Manager()
    return_dict = manager.dict()
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i, return_dict))
        jobs.append(p)
        p.start()

    for proc in jobs:
        proc.join()
    print(return_dict.values())

Answer 1

I think the approach suggested by @sega_sai is the better one. But it really needs a code example, so here goes:

import multiprocessing
from os import getpid

def worker(procnum):
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes = 3)
    print(pool.map(worker, range(5)))

Which will print the return values:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

If you are familiar with map (the Python 2 built-in) this should not be too challenging. Otherwise have a look at @sega_sai’s link.

Note how little code is needed. (Also note how processes are re-used).


Answer 2

This example shows how to use a list of multiprocessing.Pipe instances to return strings from an arbitrary number of processes:

import multiprocessing

def worker(procnum, send_end):
    '''worker function'''
    result = str(procnum) + ' represent!'
    print result
    send_end.send(result)

def main():
    jobs = []
    pipe_list = []
    for i in range(5):
        recv_end, send_end = multiprocessing.Pipe(False)
        p = multiprocessing.Process(target=worker, args=(i, send_end))
        jobs.append(p)
        pipe_list.append(recv_end)
        p.start()

    for proc in jobs:
        proc.join()
    result_list = [x.recv() for x in pipe_list]
    print result_list

if __name__ == '__main__':
    main()

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
['0 represent!', '1 represent!', '2 represent!', '3 represent!', '4 represent!']

This solution uses fewer resources than a multiprocessing.Queue which uses

  • a Pipe
  • at least one Lock
  • a buffer
  • a thread

or a multiprocessing.SimpleQueue which uses

  • a Pipe
  • at least one Lock

It is very instructive to look at the source for each of these types.


Answer 3

For some reason, I couldn’t find a general example of how to do this with Queue anywhere (even Python’s doc examples don’t spawn multiple processes), so here’s what I got working after like 10 tries:

from multiprocessing import Process, Queue

def add_helper(queue, arg1, arg2):  # the func called in child processes
    ret = arg1 + arg2
    queue.put(ret)

def multi_add(): # spawns child processes
    q = Queue()
    processes = []
    rets = []
    for _ in range(0, 100):
        p = Process(target=add_helper, args=(q, 1, 2))
        processes.append(p)
        p.start()
    for p in processes:
        ret = q.get() # will block
        rets.append(ret)
    for p in processes:
        p.join()
    return rets

Queue is a blocking, thread-safe queue that you can use to store the return values from the child processes. So you have to pass the queue to each process. Something less obvious here is that you have to get() from the queue before you join the Processes or else the queue fills up and blocks everything.

Update for those who are object-oriented (tested in Python 3.4):

from multiprocessing import Process, Queue

class Multiprocessor():

    def __init__(self):
        self.processes = []
        self.queue = Queue()

    @staticmethod
    def _wrapper(func, queue, args, kwargs):
        ret = func(*args, **kwargs)
        queue.put(ret)

    def run(self, func, *args, **kwargs):
        args2 = [func, self.queue, args, kwargs]
        p = Process(target=self._wrapper, args=args2)
        self.processes.append(p)
        p.start()

    def wait(self):
        rets = []
        for p in self.processes:
            ret = self.queue.get()
            rets.append(ret)
        for p in self.processes:
            p.join()
        return rets

# tester
if __name__ == "__main__":
    mp = Multiprocessor()
    num_proc = 64
    for _ in range(num_proc): # queue up multiple tasks running `sum`
        mp.run(sum, [1, 2, 3, 4, 5])
    ret = mp.wait() # get all results
    print(ret)
    assert len(ret) == num_proc and all(r == 15 for r in ret)

Answer 4

For anyone else who is looking for how to get a value from a Process using a Queue:

import multiprocessing

ret = {'foo': False}

def worker(queue):
    ret = queue.get()
    ret['foo'] = True
    queue.put(ret)

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    queue.put(ret)
    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()
    p.join()
    print(queue.get())  # Prints {"foo": True}

Note that on Windows or in a Jupyter Notebook, with multiprocessing you have to save this as a file and execute the file. If you run it interactively instead, you will see an error like this:

 AttributeError: Can't get attribute 'worker' on <module '__main__' (built-in)>

Answer 5

It seems that you should use the multiprocessing.Pool class instead, and use the methods .apply(), .apply_async(), or .map():

http://docs.python.org/library/multiprocessing.html?highlight=pool#multiprocessing.pool.AsyncResult
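
The answer stops at the pointer; a minimal sketch of the apply_async route (reusing the worker function from the question) might look like this:

import multiprocessing

def worker(procnum):
    '''worker function'''
    return procnum

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    async_results = [pool.apply_async(worker, (i,)) for i in range(5)]
    pool.close()
    pool.join()
    print([res.get() for res in async_results])   # [0, 1, 2, 3, 4]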


Answer 6

You can use the exit built-in to set the exit code of a process. It can be obtained from the exitcode attribute of the process:

import multiprocessing

def worker(procnum):
    print str(procnum) + ' represent!'
    exit(procnum)

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

    result = []
    for proc in jobs:
        proc.join()
        result.append(proc.exitcode)
    print result

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]

Answer 7

The pebble package has a nice abstraction leveraging multiprocessing.Pipe which makes this quite straightforward:

from pebble import concurrent

@concurrent.process
def function(arg, kwarg=0):
    return arg + kwarg

future = function(1, kwarg=1)

print(future.result())

Example from: https://pythonhosted.org/Pebble/#concurrent-decorators


Answer 8

Thought I’d simplify the simplest examples copied from above, working for me on Py3.6. Simplest is multiprocessing.Pool:

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

pool = multiprocessing.Pool()
print(pool.map(worker, range(10)))

You can set the number of processes in the pool with, e.g., Pool(processes=5). However, it defaults to the CPU count, so leave it blank for CPU-bound tasks. (I/O-bound tasks often suit threads anyway, as the threads are mostly waiting and so can share a CPU core.) Pool also applies a chunking optimization.
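
For example, both the pool size and the chunk size can be set explicitly (a sketch, not part of the original answer; the numbers are arbitrary):

import multiprocessing
import time

def worker(x):
    time.sleep(1)
    return x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=5) as pool:
        # chunksize=2 hands each worker batches of two tasks instead of single items
        print(pool.map(worker, range(10), chunksize=2))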

(Note that the worker method cannot be nested within a method. I initially defined my worker method inside the method that makes the call to pool.map, to keep it all self-contained, but then the processes couldn’t import it and threw “AttributeError: Can’t pickle local object outer_method.<locals>.inner_method”. More here. It can be inside a class.)

(I appreciate that the original question specified printing 'represent!' rather than time.sleep(), but without the sleep I thought some code was running concurrently when it wasn’t.)


Py3’s ProcessPoolExecutor is also two lines (.map returns a generator so you need the list()):

from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
    print(list(executor.map(worker, range(10))))

With plain Processes:

import multiprocessing
import time

def worker(x, queue):
    time.sleep(1)
    queue.put(x)

queue = multiprocessing.SimpleQueue()
tasks = range(10)

for task in tasks:
    multiprocessing.Process(target=worker, args=(task, queue,)).start()

for _ in tasks:
    print(queue.get())

Use SimpleQueue if all you need is put and get. The first loop starts all the processes, before the second makes the blocking queue.get calls. I don’t think there’s any reason to call p.join() too.


Answer 9

A simple solution:

import multiprocessing

output=[]
data = range(0,10)

def f(x):
    return x**2

def handler():
    p = multiprocessing.Pool(64)
    r=p.map(f, data)
    return r

if __name__ == '__main__':
    output.append(handler())
    print(output[0])

Output:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

Answer 10

If you are using Python 3, you can use concurrent.futures.ProcessPoolExecutor as a convenient abstraction:

from concurrent.futures import ProcessPoolExecutor

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum


if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        print(list(executor.map(worker, range(5))))

Output:

0 represent!
1 represent!
2 represent!
3 represent!
4 represent!
[0, 1, 2, 3, 4]
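
If you want results one at a time as they finish rather than as a single mapped list, executor.submit returns Future objects; a small sketch (my addition, not part of the original answer):

from concurrent.futures import ProcessPoolExecutor, as_completed

def worker(procnum):
    '''worker function'''
    print(str(procnum) + ' represent!')
    return procnum

if __name__ == '__main__':
    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(worker, i) for i in range(5)]
        for future in as_completed(futures):    # yields each future as soon as it finishes
            print(future.result())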

Answer 11

I modified vartec’s answer a bit since I needed to get the error codes from the function. (Thanks vartec!!! It’s an awesome trick.)

This can also be done with a manager.list, but I think it is better to have it in a dict and store a list within it. That way we keep the function and the results, since we can’t be sure of the order in which the list will be populated.

from multiprocessing import Process
import time
import datetime
import multiprocessing


def func1(fn, m_list):
    print 'func1: starting'
    time.sleep(1)
    m_list[fn] = "this is the first function"
    print 'func1: finishing'
    # return "func1"  # no need for return since Multiprocess doesnt return it =(

def func2(fn, m_list):
    print 'func2: starting'
    time.sleep(3)
    m_list[fn] = "this is function 2"
    print 'func2: finishing'
    # return "func2"

def func3(fn, m_list):
    print 'func3: starting'
    time.sleep(9)
    # if fail wont join the rest because it never populate the dict
    # or do a try/except to get something in return.
    raise ValueError("failed here")
    # if we want to get the error in the manager dict we can catch the error
    try:
        raise ValueError("failed here")
        m_list[fn] = "this is third"
    except:
        m_list[fn] = "this is third and it fail horrible"
        # print 'func3: finishing'
        # return "func3"


def runInParallel(*fns):  # * is to accept any input in list
    start_time = datetime.datetime.now()
    proc = []
    manager = multiprocessing.Manager()
    m_list = manager.dict()
    for fn in fns:
        # print fn
        # print dir(fn)
        p = Process(target=fn, name=fn.func_name, args=(fn, m_list))
        p.start()
        proc.append(p)
    for p in proc:
        p.join()  # a timeout (e.g. 5 seconds) could be passed here

    print datetime.datetime.now() - start_time
    return m_list, proc

if __name__ == '__main__':
    manager, proc = runInParallel(func1, func2, func3)
    # print dir(proc[0])
    # print proc[0]._name
    # print proc[0].name
    # print proc[0].exitcode

    # here you can check which processes failed
    for i in proc:
        print i.name, i.exitcode  # the name was set in the Process(...) call above

    # this will only show the functions that worked and were able to populate the
    # manager dict
    for i, j in manager.items():
        print dir(i)  # things you can do to the function
        print i, j

multiprocessing.Pool: What’s the difference between map_async and imap?

Question: multiprocessing.Pool: What’s the difference between map_async and imap?

I’m trying to learn how to use Python’s multiprocessing package, but I don’t understand the difference between map_async and imap. I noticed that both map_async and imap are executed asynchronously. So when should I use one over the other? And how should I retrieve the result returned by map_async?

Should I use something like this?

def test():
    result = pool.map_async()
    pool.close()
    pool.join()
    return result.get()

result=test()
for i in result:
    print i

Answer 0

There are two key differences between imap/imap_unordered and map/map_async:

  1. The way they consume the iterable you pass to them.
  2. The way they return the result back to you.

map consumes your iterable by converting the iterable to a list (assuming it isn’t a list already), breaking it into chunks, and sending those chunks to the worker processes in the Pool. Breaking the iterable into chunks performs better than passing each item in the iterable between processes one item at a time – particularly if the iterable is large. However, turning the iterable into a list in order to chunk it can have a very high memory cost, since the entire list will need to be kept in memory.

imap doesn’t turn the iterable you give it into a list, nor does it break it into chunks (by default). It will iterate over the iterable one element at a time, and send each item to a worker process. This means you don’t take the memory hit of converting the whole iterable to a list, but it also means that performance is slower for large iterables, because of the lack of chunking. This can be mitigated by passing a chunksize argument larger than the default of 1, however.
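
A sketch of that mitigation (my addition, not from the original answer; the chunk size is arbitrary):

import multiprocessing

def func(x):
    return x + 2

if __name__ == "__main__":
    with multiprocessing.Pool() as p:
        # chunksize=100 batches the lazily-consumed items, trading a little memory for less IPC overhead
        for result in p.imap(func, range(10000), chunksize=100):
            pass  # handle each result as it arrives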

The other major difference between imap/imap_unordered and map/map_async, is that with imap/imap_unordered, you can start receiving results from workers as soon as they’re ready, rather than having to wait for all of them to be finished. With map_async, an AsyncResult is returned right away, but you can’t actually retrieve results from that object until all of them have been processed, at which point it returns the same list that map does (map is actually implemented internally as map_async(...).get()). There’s no way to get partial results; you either have the entire result, or nothing.
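
To make that all-or-nothing behaviour concrete, here is a small sketch (my addition) using map_async’s callback parameter, which fires exactly once, with the complete list, after every task has finished:

import multiprocessing

def func(x):
    return x + 2

def on_done(result_list):
    # called a single time with the whole list, never with partial results
    print(result_list)

if __name__ == "__main__":
    p = multiprocessing.Pool()
    async_result = p.map_async(func, [1, 5, 3], callback=on_done)
    async_result.wait()       # or async_result.get() to obtain [3, 7, 5] directly
    p.close()
    p.join()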

imap and imap_unordered both return iterables right away. With imap, the results will be yielded from the iterable as soon as they’re ready, while still preserving the ordering of the input iterable. With imap_unordered, results will be yielded as soon as they’re ready, regardless of the order of the input iterable. So, say you have this:

import multiprocessing
import time

def func(x):
    time.sleep(x)
    return x + 2

if __name__ == "__main__":    
    p = multiprocessing.Pool()
    start = time.time()
    for x in p.imap(func, [1,5,3]):
        print("{} (Time elapsed: {}s)".format(x, int(time.time() - start)))

This will output:

3 (Time elapsed: 1s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

If you use p.imap_unordered instead of p.imap, you’ll see:

3 (Time elapsed: 1s)
5 (Time elapsed: 3s)
7 (Time elapsed: 5s)

If you use p.map or p.map_async().get(), you’ll see:

3 (Time elapsed: 5s)
7 (Time elapsed: 5s)
5 (Time elapsed: 5s)

So, the primary reasons to use imap/imap_unordered over map_async are:

  1. Your iterable is large enough that converting it to a list would cause you to run out of/use too much memory.
  2. You want to be able to start processing the results before all of them are completed.

Python multiprocessing PicklingError: Can't pickle <type 'function'>

Question: Python multiprocessing PicklingError: Can't pickle <type 'function'>

I am sorry that I can’t reproduce the error with a simpler example, and my code is too complicated to post. If I run the program in IPython shell instead of the regular Python, things work out well.

I looked up some previous notes on this problem. They were all caused by using a pool to call a function defined within a class function. But this is not the case for me.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

I would appreciate any help.

Update: The function I pickle is defined at the top level of the module, though it calls a function that contains a nested function. That is, f() calls g(), which calls h(), which has a nested function i(), and I am calling pool.apply_async(f). f(), g(), and h() are all defined at the top level. I tried a simpler example with this pattern, though, and it works.


Answer 0

Here is a list of what can be pickled. In particular, functions are only picklable if they are defined at the top-level of a module.

This piece of code:

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

yields an error almost identical to the one you posted:

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

The problem is that the pool methods all use an mp.SimpleQueue to pass tasks to the worker processes. Everything that goes through the mp.SimpleQueue must be picklable, and foo.work is not picklable since it is not defined at the top level of the module.

It can be fixed by defining a function at the top level, which calls foo.work():

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

Notice that foo is picklable, since Foo is defined at the top level and foo.__dict__ is picklable.
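
For completeness, here is the failing example reassembled with that top-level wrapper (a sketch of the same fix, with the @staticmethod decoration dropped so work behaves like an ordinary method):

import multiprocessing as mp

class Foo():
    def work(self):
        print('working')

def work(foo):              # defined at the top level of the module, so it is picklable
    foo.work()

if __name__ == '__main__':
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(work, args=(foo,))
    pool.close()
    pool.join()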


Answer 1

I’d use pathos.multiprocessing instead of multiprocessing. pathos.multiprocessing is a fork of multiprocessing that uses dill. dill can serialize almost anything in Python, so you are able to send a lot more around in parallel. The pathos fork also has the ability to work directly with multiple-argument functions, as you need for class methods.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> x, y = [0, 1, 2, 3], [4, 5, 6, 7]
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Get pathos (and if you like, dill) here: https://github.com/uqfoundation


Answer 2

As others have said, multiprocessing can only transfer Python objects that can be pickled to the worker processes. If you cannot reorganize your code as described by unutbu, you can use dill’s extended pickling/unpickling capabilities for transferring data (especially code data), as I show below.

This solution requires only the installation of dill and no other libraries such as pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

Answer 3

I have found that I can also generate exactly that error output on a perfectly working piece of code by attempting to use the profiler on it.

Note that this was on Windows (where the forking is a bit less elegant).

I was running:

python -m profile -o output.pstats <script> 

I found that removing the profiling removed the error and putting the profiling back restored it. It was driving me batty too because I knew the code used to work. I was checking to see if something had updated pool.py… then had a sinking feeling, eliminated the profiling, and that was it.

Posting here for the archives in case anybody else runs into it.


Answer 4

When this problem comes up with multiprocessing, a simple solution is to switch from Pool to ThreadPool. This can be done with no change of code other than the import:

from multiprocessing.pool import ThreadPool as Pool

This works because ThreadPool shares memory with the main thread, rather than creating a new process, which means that pickling is not required.

The downside to this method is that Python isn’t the greatest language at handling threads: it uses something called the Global Interpreter Lock to stay thread-safe, which can slow down some use cases here. However, if you’re primarily interacting with other systems (running HTTP commands, talking with a database, writing to filesystems) then your code is likely not CPU-bound and won’t take much of a hit. In fact, when writing HTTP/HTTPS benchmarks I’ve found that the threaded model used here has less overhead and delay, since the overhead of creating new processes is much higher than the overhead of creating new threads.

So if you’re processing a ton of stuff in python userspace this might not be the best method.
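
Since only the import changes, a sketch of the swap applied to the earlier failing example looks like this (my assembly, not part of the original answer):

from multiprocessing.pool import ThreadPool as Pool

class Foo():
    def work(self):
        return 'working'

if __name__ == '__main__':
    pool = Pool(4)
    foo = Foo()
    result = pool.apply_async(foo.work)   # no pickling needed, since workers are threads
    print(result.get())
    pool.close()
    pool.join()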


Answer 5

This solution requires only the installation of dill and no other libraries such as pathos:

import dill

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returns a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

It also works for numpy arrays.


Answer 6

Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

This error will also occur if you have any inbuilt function inside the model object that is passed to the async job.

So make sure to check that the model objects being passed don’t have inbuilt functions. (In our case, we were using the FieldTracker() function of django-model-utils inside the model to track a certain field.) Here is the link to the relevant GitHub issue.


Answer 7

Building on @rocksportrocker’s solution, it would make sense to dill the results when sending and receiving them as well.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)