使用多重处理Pool.map()时无法腌制

问题:使用多重处理Pool.map()时无法腌制

我正在尝试使用multiprocessingPool.map()功能同时划分工作。当我使用以下代码时,它可以正常工作:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

但是,当我以更加面向对象的方式使用它时,它将无法正常工作。它给出的错误信息是:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

当以下是我的主程序时,会发生这种情况:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

这是我的someClass课:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

任何人都知道问题可能是什么,或解决问题的简单方法?

I’m trying to use multiprocessing‘s Pool.map() function to divide out work simultaneously. When I use the following code, it works fine:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

However, when I use it in a more object-oriented approach, it doesn’t work. The error message it gives is:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

This occurs when the following is my main program:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

and the following is my someClass class:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Anyone know what the problem could be, or an easy way around it?


回答 0

问题在于,多处理必须使进程中的事物腌制,并且绑定的方法不可腌制。解决方法(无论您是否认为它“简单” ;-)是向您的程序中添加基础结构,以允许对这些方法进行腌制,并使用copy_reg标准库方法进行注册。

例如,史蒂文·贝萨德(Steven Bethard)对这个线程的贡献(接近线程的结尾)显示了一种非常可行的方法,该方法允许通过进行酸洗/取消酸洗copy_reg

The problem is that multiprocessing must pickle things to sling them among processes, and bound methods are not picklable. The workaround (whether you consider it “easy” or not;-) is to add the infrastructure to your program to allow such methods to be pickled, registering it with the copy_reg standard library method.

For example, Steven Bethard’s contribution to this thread (towards the end of the thread) shows one perfectly workable approach to allow method pickling/unpickling via copy_reg.


回答 1

所有这些解决方案都是丑陋的,因为除非您跳出标准库,否则多重处理和酸洗会受到破坏和限制。

如果您使用的一个分支multiprocessingpathos.multiprocesssing,你可以直接使用类和类方法在多处理的map功能。这是因为dill用代替picklecPickle,并且dill可以在python中序列化几乎所有内容。

pathos.multiprocessing还提供了异步映射功能…,并且可以map使用多个参数(例如map(math.pow, [1,2,3], [4,5,6]))运行

请参阅: 多重处理和莳萝可以一起做什么?

和:http : //matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

明确地说,您可以完全按照自己的意愿进行操作,如果需要,可以从解释器中进行操作。

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

在此处获取代码:https : //github.com/uqfoundation/pathos

All of these solutions are ugly because multiprocessing and pickling is broken and limited unless you jump outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocesssing, you can directly use classes and class methods in multiprocessing’s map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

And just to be explicit, you can do exactly want you wanted to do in the first place, and you can do it from the interpreter, if you wanted to.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Get the code here: https://github.com/uqfoundation/pathos


回答 2

您还可以__call__()在中定义一个方法someClass(),该方法会调用someClass.go(),然后将的实例传递someClass()给池。该对象是可腌制的,并且对我来说很好用…

You could also define a __call__() method inside your someClass(), which calls someClass.go() and then pass an instance of someClass() to the pool. This object is pickleable and it works fine (for me)…


回答 3

尽管对史蒂文·贝萨德的解决方案有一些限制:

当您将类方法注册为函数时,每次您的方法处理完成时,都会令人惊讶地调用类的析构函数。因此,如果您有一个类的实例调用其方法的n倍,则成员可能在两次运行之间消失,并且可能会收到一条消息malloc: *** error for object 0x...: pointer being freed was not allocated(例如,打开的成员文件)或pure virtual method called, terminate called without an active exception(这意味着我使用的成员对象的生存期短于我的想法)。当处理大于池大小的n时,我得到了这个。这是一个简短的示例:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

输出:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

__call__方法不是那么等效,因为从结果中读取了[None,…]:

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

所以这两种方法都不令人满意…

Some limitations though to Steven Bethard’s solution :

When you register your class method as a function, the destructor of your class is surprisingly called every time your method processing is finished. So if you have 1 instance of your class that calls n times its method, members may disappear between 2 runs and you may get a message malloc: *** error for object 0x...: pointer being freed was not allocated (e.g. open member file) or pure virtual method called, terminate called without an active exception (which means than the lifetime of a member object I used was shorter than what I thought). I got this when dealing with n greater than the pool size. Here is a short example :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Output:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

The __call__ method is not so equivalent, because [None,…] are read from the results :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

So none of both methods is satisfying…


回答 4

您可以使用另一种快捷方式,尽管根据类实例中的内容,效率可能会很低。

就像大家都说过的那样,问题在于multiprocessing代码必须对发送给它已启动的子流程的内容进行腌制,并且腌制者不执行实例方法。

但是,除了发送实例方法之外,您可以将实际的类实例以及要调用的函数的名称发送到一个普通函数,该普通函数随后用于getattr调用该实例方法,从而在Pool子进程中创建绑定方法。这类似于定义__call__方法,不同之处在于您可以调用多个成员函数。

从所有答案中窃取@EricH。的代码并对其进行注释(我重新输入了代码,因此所有名称都更改了,因此,由于某种原因,这似乎比剪切粘贴更容易:-))来说明所有魔术:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

输出显示,确实,构造函数被调用一次(在原始pid中),而析构函数被调用9次(每次复制一次=根据需要每个pool-worker-process 2到3次,在原始副本中被调用一次处理)。通常是可以的,因为在这种情况下,默认的选择器会复制整个实例,然后(半)秘密地重新填充它,在这种情况下,请执行以下操作:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

这就是为什么即使在三个工作进程中都调用了析构函数八次,每次都将析构函数从1递减为0的原因,但是当然您仍然会遇到这种麻烦。如有必要,您可以提供自己的__setstate__

    def __setstate__(self, adict):
        self.count = adict['count']

例如在这种情况下。

There’s another short-cut you can use, although it can be inefficient depending on what’s in your class instances.

As everyone has said the problem is that the multiprocessing code has to pickle the things that it sends to the sub-processes it has started, and the pickler doesn’t do instance-methods.

However, instead of sending the instance-method, you can send the actual class instance, plus the name of the function to call, to an ordinary function that then uses getattr to call the instance-method, thus creating the bound method in the Pool subprocess. This is similar to defining a __call__ method except that you can call more than one member function.

Stealing @EricH.’s code from his answer and annotating it a bit (I retyped it hence all the name changes and such, for some reason this seemed easier than cut-and-paste :-) ) for illustration of all the magic:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

The output shows that, indeed, the constructor is called once (in the original pid) and the destructor is called 9 times (once for each copy made = 2 or 3 times per pool-worker-process as needed, plus once in the original process). This is often OK, as in this case, since the default pickler makes a copy of the entire instance and (semi-) secretly re-populates it—in this case, doing:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

—that’s why even though the destructor is called eight times in the three worker processes, it counts down from 1 to 0 each time—but of course you can still get into trouble this way. If necessary, you can provide your own __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

in this case for instance.


回答 5

您还可以__call__()在中定义一个方法someClass(),该方法会调用someClass.go(),然后将的实例传递someClass()给池。该对象是可腌制的,并且对我来说很好用…

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

You could also define a __call__() method inside your someClass(), which calls someClass.go() and then pass an instance of someClass() to the pool. This object is pickleable and it works fine (for me)…

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

回答 6

上面parisjohn的解决方案对我来说很好用。另外,代码看起来干净而且易于理解。就我而言,有一些使用Pool调用的函数,因此我在下面修改了parisjohn的代码。我进行了调用,以便能够调用多个函数,并且函数名称从传入自变量dict中go()

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

The solution from parisjohn above works fine with me. Plus the code looks clean and easy to understand. In my case there are a few functions to call using Pool, so I modified parisjohn’s code a bit below. I made call to be able to call several functions, and the function names are passed in the argument dict from go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

回答 7

一个可能的解决方案是切换到using multiprocessing.dummy。这是多处理接口的基于线程的实现,在python 2.7中似乎没有此问题。我在这里经验不足,但是快速的导入更改使我可以在类方法上调用apply_async。

以下是一些很好的资源multiprocessing.dummy

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/

A potentially trivial solution to this is to switch to using multiprocessing.dummy. This is a thread based implementation of the multiprocessing interface that doesn’t seem to have this problem in Python 2.7. I don’t have a lot of experience here, but this quick import change allowed me to call apply_async on a class method.

A few good resources on multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/


回答 8

在这种简单的情况下,someClass.f不从类继承任何数据并且不向类附加任何内容,一种可能的解决方案是将out分离出来f,以便对其进行腌制:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

In this simple case, where someClass.f is not inheriting any data from the class and not attaching anything to the class, a possible solution would be to separate out f, so it can be pickled:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

回答 9

为什么不使用单独的func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

Why not to use separate func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

回答 10

我遇到了同样的问题,但发现有一个JSON编码器可用于在进程之间移动这些对象。

from pyVmomi.VmomiSupport import VmomiJSONEncoder

使用它来创建您的列表:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

然后在映射函数中,使用它来恢复对象:

pfVmomiObj = json.loads(jsonSerialized)

I ran into this same issue but found out that there is a JSON encoder that can be used to move these objects between processes.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Use this to create your list:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Then in the mapped function, use this to recover the object:

pfVmomiObj = json.loads(jsonSerialized)

回答 11

更新:截至撰写本文之日,namedTuples可供选择(从python 2.7开始)

这里的问题是子进程无法导入对象的类-在这种情况下,是类P-,在多模型项目的情况下,类P应该可以在使用子进程的任何地方导入

一个快速的解决方法是通过将其影响到globals()使其可导入

globals()["P"] = P

Update: as of the day of this writing, namedTuples are pickable (starting with python 2.7)

The issue here is the child processes aren’t able to import the class of the object -in this case, the class P-, in the case of a multi-model project the Class P should be importable anywhere the child process get used

a quick workaround is to make it importable by affecting it to globals()

globals()["P"] = P