




How can I handle KeyboardInterrupt events with Python's multiprocessing Pools? Here is a simple example:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

When running the code above, the KeyboardInterrupt gets raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.

I want to be able to press ^C at any time and cause all of the processes to exit gracefully.

Answer 0




This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)  # blocks forever; ^C is not delivered while waiting
print "done"

The KeyboardInterrupt exception won’t be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.

Note that this doesn’t happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

or similar.
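
As a minimal sketch of this workaround in Python 3 syntax, the blocking call can be wrapped in a helper that always waits with a timeout. The helper name interruptible_map, the one-hour default and the shortened sleep are illustrative assumptions, not part of the original post.

```python
from multiprocessing import Pool
from time import sleep


def slowly_square(i):
    sleep(0.1)
    return i * i


def interruptible_map(pool, func, iterable, timeout=3600):
    """Like pool.map, but wait with a timeout so ^C is noticed promptly.

    `timeout` is in seconds; multiprocessing.TimeoutError is raised if the
    results are not ready by then.
    """
    return pool.map_async(func, iterable).get(timeout)


def go():
    pool = Pool(8)
    try:
        results = interruptible_map(pool, slowly_square, range(40))
    except KeyboardInterrupt:
        # Stop the workers immediately instead of letting them finish.
        pool.terminate()
        print("You cancelled the program!")
    else:
        pool.close()
        print("Finally, here are the results:", results)
    finally:
        pool.join()


if __name__ == "__main__":
    go()
```

The timeout only needs to be longer than the job is ever expected to take; its purpose here is to keep the wait interruptible, not to bound the runtime.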

Answer 1




From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.

import multiprocessing
import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main():
    pool = multiprocessing.Pool(size, init_worker)

    try:
        ...
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.
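
For readers who cannot follow the links, here is a small self-contained sketch of the same idea in Python 3 syntax. The work function, the pool size and the 60-second timeout are assumptions made for illustration; they are not taken from the linked post.

```python
import multiprocessing
import signal
import time


def init_worker():
    # Runs once in every worker: ignore ^C there so that only the
    # parent process sees KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def work(i):
    time.sleep(0.1)
    return i * i


def main(size=4):
    pool = multiprocessing.Pool(size, init_worker)
    try:
        # Waiting with a timeout keeps the parent responsive to ^C.
        results = pool.map_async(work, range(20)).get(60)
        pool.close()
        print(results)
    except KeyboardInterrupt:
        print("Caught ^C, terminating workers")
        pool.terminate()
    finally:
        pool.join()


if __name__ == "__main__":
    main()
```

All cleanup lives in the parent, which matches the approach described above: the workers never need their own error handling for the interrupt.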

Answer 2



For some reason, only exceptions inherited from the base Exception class are handled normally. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normally you would get the following output:

starting the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

So if you hit ^C, you will get:

starting the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

Answer 3





Usually this simple structure works for Ctrl-C on a Pool:

import signal

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

As was stated in a few similar posts:

Capture keyboardinterrupt in Python without try-except

Answer 4



The voted answer does not tackle the core issue but a similar side effect.

Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in an old blog post.

import signal
from multiprocessing import Pool

def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, downloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

Answer 5



It seems there are two issues that make exceptions while multiprocessing annoying. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map in order to get an immediate response (i.e., don’t finish processing the entire list). The second (noted by Andrey) is that multiprocessing doesn’t catch exceptions that don’t inherit from Exception (e.g., SystemExit). So here’s my solution that deals with both of these:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C).

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

Answer 6



I found, for the time being, the best solution is to not use the multiprocessing.pool feature but rather roll your own pool functionality. I provided an example demonstrating the error with apply_async as well as an example showing how to avoid using the pool functionality altogether.
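
The linked examples are not reproduced here. As a rough sketch of what a hand-rolled pool could look like in Python 3, the code below uses plain Process and Queue objects. Every name in it (worker, _worker_main, run, STOP) is a hypothetical choice for this illustration and is not the code the answer refers to.

```python
import multiprocessing
import signal

STOP = None  # sentinel telling a worker to exit


def worker(tasks, results):
    """Pull (index, value) pairs from `tasks` until the sentinel arrives."""
    while True:
        item = tasks.get()
        if item is STOP:
            break
        index, value = item
        results.put((index, value * value))


def _worker_main(tasks, results):
    # Child processes ignore ^C; the parent decides when to stop them.
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    worker(tasks, results)


def run(values, num_workers=4):
    """Square every item of the list `values` using worker processes."""
    tasks = multiprocessing.Queue()
    results = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=_worker_main, args=(tasks, results))
             for _ in range(num_workers)]
    for p in procs:
        p.daemon = True
        p.start()
    for item in enumerate(values):
        tasks.put(item)
    for _ in procs:
        tasks.put(STOP)
    out = [None] * len(values)
    try:
        for _ in values:
            index, squared = results.get()
            out[index] = squared
    except KeyboardInterrupt:
        # The parent is the only process that reacts to ^C.
        for p in procs:
            p.terminate()
        raise
    finally:
        for p in procs:
            p.join()
    return out


if __name__ == "__main__":
    print(run(list(range(10))))
```

Because the parent owns the processes directly, it can terminate them as soon as the interrupt arrives, without going through the Pool machinery.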


Answer 7



I'm new to Python. I was looking everywhere for an answer and stumbled upon this and a few other blogs and YouTube videos. I copied the author's code above and reproduced the problem with Python 2.7.13 on 64-bit Windows 7. It's close to what I want to achieve.

I made the child processes ignore Ctrl-C and let the parent process terminate. Bypassing the interrupt in the child processes does seem to avoid the problem for me.


from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"

def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results

if __name__ == '__main__':
    go()

The part starting at pool.terminate() never seems to execute.

Answer 8



You can try using the apply_async method of a Pool object, like this:

import multiprocessing
import time
from datetime import datetime

def test_func(x):
    time.sleep(2)
    return x**2

def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()

if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multiprocessing run time: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multiprocessing run time: {}".format(t3 - t2)


Multiprocessing run time: 0:00:41.131000
Non-multiprocessing run time: 0:03:20.688000

An advantage of this method is that results processed before interruption will be returned in the results dictionary:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

Answer 9




Strangely enough it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written… try changing slowly_square to:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

That should work as you expected.







I am not sure whether this counts more as an OS issue, but I thought I would ask here in case anyone has some insight from the Python end of things.

I’ve been trying to parallelise a CPU-heavy for loop using joblib, but I find that instead of each worker process being assigned to a different core, I end up with all of them being assigned to the same core and no performance gain.

Here’s a very trivial example…

from joblib import Parallel,delayed
import numpy as np

def testfunc(data):
    # some very boneheaded CPU work
    for nn in xrange(1000):
        for ii in data[0,:]:
            for jj in data[1,:]:
                ii*jj

def run(niter=10):
    data = (np.random.randn(2,100) for ii in xrange(niter))
    pool = Parallel(n_jobs=-1,verbose=1,pre_dispatch='all')
    results = pool(delayed(testfunc)(dd) for dd in data)

if __name__ == '__main__':
    run()

…and here’s what I see in htop while this script is running:

I’m running Ubuntu 12.10 (3.5.0-26) on a laptop with 4 cores. Clearly joblib.Parallel is spawning separate processes for the different workers, but is there any way that I can make these processes execute on different cores?

Answer 0







After some more googling I found the answer here.

It turns out that certain Python modules (numpy, scipy, tables, pandas, skimage…) mess with core affinity on import. As far as I can tell, this problem seems to be specifically caused by them linking against multithreaded OpenBLAS libraries.

A workaround is to reset the task affinity using

import os

os.system("taskset -p 0xff %d" % os.getpid())

With this line pasted in after the module imports, my example now runs on all cores:

My experience so far has been that this doesn't seem to have any negative effect on numpy's performance, although this is probably machine- and task-specific.


There are also two ways to disable the CPU affinity-resetting behaviour of OpenBLAS itself. At run-time you can use the environment variable OPENBLAS_MAIN_FREE (or GOTOBLAS_MAIN_FREE), for example

OPENBLAS_MAIN_FREE=1 python myscript.py

Or alternatively, if you’re compiling OpenBLAS from source you can permanently disable it at build-time by editing the Makefile.rule to contain the line


Answer 1


Python 3 now exposes methods to set the affinity directly:

>>> import os
>>> os.sched_getaffinity(0)
{0, 1, 2, 3}
>>> os.sched_setaffinity(0, {1, 3})
>>> os.sched_getaffinity(0)
{1, 3}
>>> x = {i for i in range(10)}
>>> x
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
>>> os.sched_setaffinity(0, x)
>>> os.sched_getaffinity(0)
{0, 1, 2, 3}
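
Building on the session above, a small helper could reset the affinity right after the offending imports. This is only a sketch: the function name reset_affinity is made up for this example, and os.sched_setaffinity is not available on every platform, so the helper returns None where it is missing.

```python
import os


def reset_affinity(pid=0):
    """Let process `pid` (0 means the current one) run on every visible CPU.

    Returns the resulting set of CPU indices, or None on platforms that do
    not provide os.sched_setaffinity.
    """
    if not hasattr(os, "sched_setaffinity"):
        return None
    # Ask for every CPU index the OS reports; as the session above shows,
    # indices that do not exist are dropped from the resulting set.
    os.sched_setaffinity(pid, range(os.cpu_count() or 1))
    return os.sched_getaffinity(pid)


if __name__ == "__main__":
    print(reset_affinity())
```

Calling reset_affinity() after importing numpy and friends would play the same role as the taskset workaround from the previous answer, without shelling out.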



Suppose I have a large in-memory numpy array and a function func that takes this giant array as input (together with some other parameters). func can be run in parallel with different parameters. For example:

from multiprocessing import Pool

def func(arr, param):
    # do stuff to arr, param
    pass

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

If I use the multiprocessing library, that giant array will be copied multiple times into different processes.

Is there a way to let different processes share the same array? This array object is read-only and will never be modified.

To complicate matters further, if arr is not an array but an arbitrary Python object, is there a way to share it?


I read the answer but I am still a bit confused. Since fork() is copy-on-write, we should not incur any additional cost when spawning new processes with the Python multiprocessing library. But the following code suggests there is a huge overhead:

from multiprocessing import Pool, Manager
import numpy as np
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t

pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()  # wait for the task so the timing covers the whole round trip
print "multiprocessing overhead = ", time.time() - t

output (and by the way, the cost increases as the size of the array increases, so I suspect there is still overhead related to memory copying):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Why is there such a huge overhead if we didn't copy the array? And which part of the cost does shared memory save me?

Answer 0






If you use an operating system that uses copy-on-write fork() semantics (like any common unix), then as long as you never alter your data structure it will be available to all child processes without taking up additional memory. You will not have to do anything special (except make absolutely sure you don’t alter the object).

The most efficient thing you can do for your problem would be to pack your array into an efficient array structure (using numpy or array), place that in shared memory, wrap it with multiprocessing.Array, and pass that to your functions. This answer shows how to do that.
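
A minimal sketch of that idea in Python 3, using only the standard library: the data lives in a multiprocessing.RawArray (the lock-free sibling of multiprocessing.Array, which is enough for read-only data) and is handed to each worker once through the pool initializer instead of once per task. The names init_worker, partial_sum and main, and the sizes used, are assumptions for illustration.

```python
import multiprocessing

_shared = None  # set in every worker by init_worker


def init_worker(shared):
    # Keep a reference to the shared buffer instead of receiving a copy per task.
    global _shared
    _shared = shared


def partial_sum(bounds):
    # Each task only receives two small integers, not the data itself.
    start, stop = bounds
    return sum(_shared[start:stop])


def main(n=100000, chunks=4):
    # RawArray allocates the doubles in shared memory, without a lock,
    # which is fine for read-only access.
    shared = multiprocessing.RawArray("d", n)
    shared[:] = [1.0] * n
    step = n // chunks
    bounds = [(i * step, (i + 1) * step) for i in range(chunks)]
    with multiprocessing.Pool(chunks, initializer=init_worker,
                              initargs=(shared,)) as pool:
        return sum(pool.map(partial_sum, bounds))


if __name__ == "__main__":
    print(main())
```

With numpy, np.frombuffer(shared, dtype=np.float64) gives an array view over the same memory without copying, so workers can use normal array operations on it.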

If you want a writeable shared object, then you will need to wrap it with some kind of synchronization or locking. multiprocessing provides two methods of doing this: one using shared memory (suitable for simple values, arrays, or ctypes) or a Manager proxy, where one process holds the memory and a manager arbitrates access to it from other processes (even over a network).

The Manager approach can be used with arbitrary Python objects, but will be slower than the equivalent using shared memory because the objects need to be serialized/deserialized and sent between processes.

There is a wealth of parallel processing libraries and approaches available in Python. multiprocessing is an excellent and well-rounded library, but if you have special needs, one of the other approaches may be better.

Answer 1






I ran into the same problem and wrote a little shared-memory utility class to work around it.

I'm using multiprocessing.RawArray (lock-free), and access to the arrays is not synchronized at all (also lock-free), so be careful not to shoot yourself in the foot.

With the solution I get speedups by a factor of approx 3 on a quad-core i7.

Here’s the code: Feel free to use and improve it, and please report back any bugs.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np


class SharedNumpyMemManagerError(Exception):
    pass


class SharedNumpyMemManager(object):
    '''Singleton Pattern'''

    _initSize = 1024

    _instance = None

    def __new__(cls):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()

        # create array in shared memory segment
        # (RawArray expects a plain int size, hence the int() conversion)
        shared_array_base = multiprocessing.RawArray(ctype, int(np.prod(dimensions)))

        # convert to numpy array via ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions
        # reshape returns an array with the same data but a new shape;
        # the result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cur].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))
    print("Parallel: ", t2.timeit(number=1))

Answer 2




This is the intended use case for Ray, which is a library for parallel and distributed Python. Under the hood, it serializes objects using the Apache Arrow data layout (which is a zero-copy format) and stores them in a shared-memory object store so they can be accessed by multiple processes without creating copies.

The code would look like the following.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

If you don’t call ray.put then the array will still be stored in shared memory, but that will be done once per invocation of func, which is not what you want.

Note that this will work not only for arrays but also for objects that contain arrays, e.g., dictionaries mapping ints to arrays as below.

You can compare the performance of serialization in Ray versus pickle by running the following in IPython.

import numpy as np
import pickle
import ray

ray.init()


x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Serialization with Ray is only slightly faster than pickle, but deserialization is 1000x faster because of the use of shared memory (this number will of course depend on the object).

See the Ray documentation. You can read more about fast serialization using Ray and Arrow. Note I’m one of the Ray developers.

Answer 3

Like Robert Nishihara mentioned, Apache Arrow makes this easy, specifically with the Plasma in-memory object store, which is what Ray is built on.

I made brain-plasma specifically for this reason – fast loading and reloading of big objects in a Flask app. It’s a shared-memory object namespace for Apache Arrow-serializable objects, including pickle‘d bytestrings generated by pickle.dumps(...).

The key difference from Ray and Plasma is that brain-plasma keeps track of object IDs for you. Any processes, threads, or programs running locally can share the variables' values by calling the name from any Brain object.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/')

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]



I am trying my very first formal python program using Threading and Multiprocessing on a windows machine. I am unable to launch the processes though, with python giving the following message. The thing is, I am not launching my threads in the main module. The threads are handled in a separate module inside a class.

EDIT: By the way, this code runs fine on Ubuntu, but not on Windows.

            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.

My original code is pretty long, but I was able to reproduce the error in an abridged version of the code. It is split in two files, the first is the main module and does very little other than import the module which handles processes/threads and calls a method. The second module is where the meat of the code is.


testMain.py:

import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)


parallelTestModule.py:

import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print self.name,'\n'

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th)
        # Start all threads, then wait for all of them to finish
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr)
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

Answer 0

On Windows the subprocesses will import (i.e. execute) the main module at start. You need to insert an if __name__ == '__main__': guard in the main module to avoid creating subprocesses recursively.

Modified testMain.py:

import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)
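
To see the same behaviour on platforms other than Windows, one can request the "spawn" start method explicitly. The following is only a minimal sketch under that assumption, not the asker's code; `work` and `main` are hypothetical names chosen for illustration.

```python
import multiprocessing as mp


def work(n):
    # Defined at module level so a freshly started child can import it.
    print("child computed", n * n)


def main():
    # "spawn" starts a fresh interpreter for every child, which is the
    # default on Windows; the child re-imports this module on startup.
    ctx = mp.get_context("spawn")
    procs = [ctx.Process(target=work, args=(i,)) for i in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return [p.exitcode for p in procs]


if __name__ == "__main__":
    # Without this guard, every spawned child would call main() again
    # while importing the module, which triggers the bootstrapping error.
    print("exit codes:", main())
```

Moving the two lines under the guard into the module's top level should reproduce the RuntimeError from the question, since each child would then try to start children of its own during import.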

Answer 1


Try putting your code inside a main function in testMain.py

import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)

See the docs:

"For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."

which say

“Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such a starting a new process).”

… by using if __name__ == '__main__'

Answer 2



Though the earlier answers are correct, there’s a small complication it would help to remark on.

In case your main module imports another module in which global variables or class member variables are defined and initialized to (or using) some new objects, you may have to condition that import in the same way:

if __name__ ==  '__main__':
  import my_module

Answer 3

As @Ofer said, when you use other libraries or modules, you should import all of them inside the if __name__ == '__main__': block.

In my case, it ended up like this:

if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd

Answer 4


In my case it was a simple bug in the code, using a variable before it was created. Worth checking that out before trying the above solutions. Why I got this particular error message, Lord knows.






I have a program that creates several processes that work on a join-able queue, Q, and may eventually manipulate a global dictionary D to store results (so each child process may use D to store its result and also see what results the other child processes are producing).

If I print the dictionary D in a child process, I see the modifications that have been done on it (i.e. on D). But after the main process joins Q, if I print D, it’s an empty dict!

I understand it is a synchronization/lock issue. Can someone tell me what is happening here, and how I can synchronize access to D?

Answer 0


A general answer involves using a Manager object. Adapted from the docs:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print(d)


$ python mul.py 
{1: '111', '2': 6}
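
On the synchronization part of the question: an update such as d[key] += 1 on a manager dict is a separate read and write through the proxy, so concurrent updates can be lost. A minimal sketch of guarding it with a manager lock follows; `add_one`, the "count" key, and the iteration counts are assumptions made for illustration only.

```python
from multiprocessing import Manager, Process


def add_one(d, lock, key, times):
    for _ in range(times):
        # The lock makes the read-then-write pair atomic with respect
        # to the other processes that share the same dict proxy.
        with lock:
            d[key] += 1


def main():
    with Manager() as manager:
        d = manager.dict({"count": 0})
        lock = manager.Lock()
        workers = [Process(target=add_one, args=(d, lock, "count", 100))
                   for _ in range(4)]
        for w in workers:
            w.start()
        for w in workers:
            w.join()
        # Copy into a plain dict before the manager shuts down.
        return dict(d)


if __name__ == "__main__":
    print(main())  # {'count': 400}
```

Without the lock the final count may come out lower, because two processes can read the same old value before either writes back.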

Answer 1



multiprocessing is not like threading. Each child process will get a copy of the main process’s memory. Generally state is shared via communication (pipes/sockets), signals, or shared memory.

Multiprocessing makes some abstractions available for your use case – shared state that’s treated as local by use of proxies or shared memory: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes
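
A small sketch of the point above, assuming nothing beyond the standard library: a plain module-level dict is changed only in the child's copy, while a value sent through a Queue does reach the parent. The names `results`, `record`, and `record_via_queue` are hypothetical.

```python
import multiprocessing as mp

results = {}  # plain module-level dict; every process has its own copy


def record(key):
    # Runs in the child and modifies the child's copy only.
    results[key] = key * 2


def record_via_queue(key, q):
    # Sends the result back to the parent explicitly.
    q.put((key, key * 2))


def main():
    p = mp.Process(target=record, args=(1,))
    p.start()
    p.join()

    q = mp.Queue()
    p2 = mp.Process(target=record_via_queue, args=(1, q))
    p2.start()
    item = q.get()  # fetch before join so the child can flush its data
    p2.join()
    return dict(results), dict([item])


if __name__ == "__main__":
    local, communicated = main()
    print(local)         # {}
    print(communicated)  # {1: 2}
```

This is the same effect described in the question: the child sees its own modifications, but the parent's dictionary stays empty unless the data is communicated back.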

Relevant sections:

Answer 2

I’d like to share my own work, which is faster than Manager’s dict and is simpler and more stable than the pyshmht library, which uses tons of memory and doesn’t work on Mac OS. However, my dict only works for plain strings and is currently immutable. I use a linear probing implementation and store the key/value pairs in a separate memory block after the table.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable

class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
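            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        # Write the hash table (offsets), then the length-prefixed keys and values
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)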

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')

def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))

def uin(a, k):
    return to_str(k) in a

def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s

def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s

def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))

def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))

def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))

if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

On my laptop performance results are:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

Simple usage example:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')
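
For readers unfamiliar with linear probing, here is a simplified sketch of the idea in Python 3, using an ordinary list instead of an mmap block. It is not the implementation above; `build_table` and `lookup` are hypothetical helpers, and the table size is assumed to be a power of two larger than the number of items.

```python
EMPTY = None


def build_table(items, size):
    """Place (key, value) pairs into a fixed-size list using linear probing."""
    assert size & (size - 1) == 0 and size > len(items)
    table = [EMPTY] * size
    for key, value in items.items():
        slot = hash(key) & (size - 1)
        # On a collision, step to the next slot (wrapping around).
        while table[slot] is not EMPTY:
            slot = (slot + 1) & (size - 1)
        table[slot] = (key, value)
    return table


def lookup(table, key, default=None):
    size = len(table)
    slot = hash(key) & (size - 1)
    # Follow the same probe sequence until the key or an empty slot is found.
    while table[slot] is not EMPTY:
        if table[slot][0] == key:
            return table[slot][1]
        slot = (slot + 1) & (size - 1)
    return default


table = build_table({'a': '1', 'b': '2'}, 8)
print(lookup(table, 'a'))  # 1
```

Because the table is never modified after construction, readers in several processes need no locking, which is presumably why the immutable design above can be shared so cheaply.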

Answer 3


In addition to @senderle’s answer here, some might also be wondering how to use the functionality of multiprocessing.Pool.

The nice thing is that the manager instance has a .Pool() method that mimics the familiar API of the top-level multiprocessing module.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))


$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

This is a slightly different example where each process just logs its process ID to the global DictProxy object d.

Answer 4



Maybe you can try pyshmht, a shared-memory-based hash table extension for Python.


  1. It’s not fully tested, just for your reference.

  2. It currently lacks lock/sem mechanisms for multiprocessing.



To make my code more “pythonic” and faster, I use “multiprocessing” and a map function to send it a) the function and b) the range of iterations.

The solution I implemented (i.e., calling tqdm directly on the range, tqdm.tqdm(range(0, 30))) does not work with multiprocessing (as formulated in the code below).

The progress bar goes from 0 to 100% (when Python reads the code?), but it does not indicate the actual progress of the map function.

How can I display a progress bar that indicates which step the map function has reached?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))

Any help or suggestions are welcome…

Answer 0


Use imap instead of map; imap returns an iterator over the processed values, so tqdm can advance as each result arrives.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
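
The reason this works can be shown without tqdm: map() takes the whole input up front and returns only when everything is finished, whereas imap() yields each result as soon as it is ready, in input order. Below is a minimal sketch with a hand-rolled counter. It uses ThreadPool purely so that it runs anywhere without extra setup; `run_with_progress` and `square` are hypothetical names.

```python
from multiprocessing.pool import ThreadPool


def square(n):
    return n * n


def run_with_progress(func, items, workers=2):
    items = list(items)
    results = []
    with ThreadPool(workers) as pool:
        # imap yields results one by one, so the counter reflects
        # work that has actually completed.
        for done, value in enumerate(pool.imap(func, items), 1):
            results.append(value)
            print("\r%d/%d done" % (done, len(items)), end="")
    print()
    return results


if __name__ == "__main__":
    print(run_with_progress(square, range(5)))  # [0, 1, 4, 9, 16]
```

Swapping ThreadPool for multiprocessing.Pool should behave the same way for picklable module-level functions; the loop body is where a tqdm update would go.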

Answer 1


Solution found. Be careful: because of multiprocessing, the time estimates (iterations per second, total time, etc.) can be unstable, but the progress bar works perfectly.

Note: The context manager for Pool is only available from Python 3.3 onward.

from multiprocessing import Pool
import time
from tqdm import tqdm

def _foo(my_number):
   square = my_number * my_number
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                # Advance the bar once per completed task
                pbar.update()

Answer 2



You can use p_tqdm instead.


from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

Answer 3


Sorry for being late but if all you need is a concurrent map, I added this functionality in tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

It supports max_workers and chunksize and you can also easily switch from process_map to thread_map.

Answer 4


Based on Xavi Martínez’s answer, I wrote the function imap_unordered_bar. It can be used in the same way as imap_unordered, the only difference being that a progress bar is shown.

from multiprocessing import Pool
import time
from tqdm import tqdm

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        # One bar is enough; update it as each result arrives
        for res in p.imap_unordered(func, args):
            pbar.update()
            res_list.append(res)
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

Answer 5

import multiprocessing as mp
import tqdm

some_iterable = ...

def some_func(item):
    # your logic
    ...

if __name__ == '__main__':
    # Leave a couple of cores free, but always use at least one worker
    with mp.Pool(max(1, mp.cpu_count()-2)) as p:
        list(tqdm.tqdm(p.imap(some_func, some_iterable), total=len(some_iterable)))

Answer 6


Here is my take for when you need to get results back from your parallel executing functions. This function does a few things (there is another post of mine that explains it further) but the key point is that there is a tasks pending queue and a tasks completed queue. As workers are done with each task in the pending queue they add the results in the tasks completed queue. You can wrap the check to the tasks completed queue with the tqdm progress bar. I am not putting the implementation of the do_work() function here, it is not relevant, as the message here is to monitor the tasks completed queue and update the progress bar every time a result is in.

import multiprocessing as mp
import pickle

import psutil
from tqdm import tqdm

# Assumed sentinel value; do_work() (not shown) must stop when it receives it.
SENTINEL = None

def par_proc(job_list, num_cpus=None, verbose=False):

    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set-up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Set the number of workers here
    num_workers = min(num_cpus, num_tasks)

    # We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
    # work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set-up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0

    with tqdm(total=num_tasks) as bar:
        while completed_tasks_counter < num_tasks:
            # Block until a worker reports a result, then advance the bar by one
            results.append(tasks_completed.get())
            completed_tasks_counter = completed_tasks_counter + 1
            bar.update(1)

    for p in processes:
        p.join()

    return results
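
Since do_work() is not shown, here is one possible shape for it, purely as a sketch. It assumes that each queued job is a dict with a pickled 'func' and a single 'task' argument, that the function is called as func(task), and that the sentinel is None; none of these details are confirmed by the answer.

```python
import pickle
import queue

SENTINEL = None  # assumed sentinel value, must match what the producer puts


def do_work(tasks_pending, tasks_completed, verbose=False):
    while True:
        job = tasks_pending.get()
        if job is SENTINEL:
            # No more work for this worker.
            break
        func = pickle.loads(job['func'])
        result = func(job['task'])  # assumed calling convention
        if verbose:
            print('finished task', job['task'])
        # Every put() here is what lets the parent advance its progress bar.
        tasks_completed.put(result)


if __name__ == "__main__":
    # In-process demonstration with queue.Queue, which has the same
    # get()/put() interface as multiprocessing.Queue.
    pending, completed = queue.Queue(), queue.Queue()
    pending.put({'func': pickle.dumps(abs), 'task': -3})
    pending.put(SENTINEL)
    do_work(pending, completed)
    print(completed.get())  # 3
```

The parent only needs to count the items arriving on the completed queue, which is why the progress bar does not depend on what do_work() computes.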

Answer 7


This approach is simple, and it works.

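from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)  # stand-in for real work
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    # Wait for all jobs before the bar is closed
    pool.close()
    pool.join()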




I have a script that’s successfully doing a multiprocessing Pool set of tasks with an imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

However, my num_tasks is around 250,000, and so the join() locks the main thread for 10 seconds or so, and I’d like to be able to echo out to the command line incrementally to show the main process isn’t locked. Something like:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."

Is there a method for the result object or the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work calls a counter.value += 1 action after doing its task), but the counter only gets to ~85% of the total value before stopping incrementing.
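
A likely explanation for the counter stalling short of the total is that counter.value += 1 is a read followed by a write, so concurrent increments can overwrite each other. A minimal sketch of a locked increment follows; `bump` and the counts are made up for illustration, and the Value is handed to plain Process objects rather than to a Pool.

```python
import multiprocessing as mp


def bump(counter, times):
    for _ in range(times):
        # get_lock() returns the lock that guards this Value; holding it
        # makes the read-modify-write safe across processes.
        with counter.get_lock():
            counter.value += 1


def main():
    counter = mp.Value("i", 0)
    procs = [mp.Process(target=bump, args=(counter, 1000)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(main())  # 4000
```

Dropping the with block would typically make the final value come out below 4000, which matches the shortfall described above.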

Answer 0


There is no need to access private attributes of the result set:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

Answer 1


My personal favorite — gives you a nice little progress bar and completion ETA while things run and commit in parallel.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

Answer 2


I found that the work was already done by the time I tried to check its progress. This is what worked for me using tqdm.

pip install tqdm

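from multiprocessing import Pool
from tqdm import tqdm

def do_work(x):
    # do something with x
    return x

if __name__ == '__main__':
    tasks = range(5)
    pbar = tqdm(total=len(tasks))
    with Pool() as pool:
        # advance the bar in the parent as each result arrives
        for _ in pool.imap_unordered(do_work, tasks):
            pbar.update(1)
    pbar.close()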

This should work with all flavors of multiprocessing, whether they block or not.

Answer 3


Found an answer myself with some more digging: Taking a look at the __dict__ of the imap_unordered result object, I found it has a _index attribute that increments with each task completion. So this works for logging, wrapped in the while loop:

import time

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2) # poll periodically instead of spinning

However, I did find that swapping the imap_unordered for a map_async resulted in much faster execution, though the result object is a bit different. Instead, the result object from map_async has a _number_left attribute, and a ready() method:

import time

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2) # poll periodically instead of spinning
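
The `_index` and `_number_left` attributes are private and may change between Python versions. If a running count is not required, a heartbeat can be had with documented calls only: `AsyncResult.wait(timeout)`, `AsyncResult.ready()` and `AsyncResult.get()`. A minimal sketch, assuming the hypothetical names `wait_with_heartbeat` and `slow_double`:

```python
import time
from multiprocessing.pool import ThreadPool


def slow_double(x):
    """Stand-in for do_work (hypothetical)."""
    time.sleep(0.01)
    return 2 * x


def wait_with_heartbeat(async_result, interval=0.5, on_tick=None):
    """Block until async_result is ready, calling on_tick() every interval.

    Returns the results and the number of heartbeats emitted.
    """
    ticks = 0
    while True:
        async_result.wait(interval)   # returns early once the work is done
        if async_result.ready():
            break
        ticks += 1
        if on_tick is not None:
            on_tick()
    return async_result.get(), ticks


if __name__ == '__main__':
    # ThreadPool keeps the demo light; multiprocessing.Pool offers the same calls.
    with ThreadPool(4) as pool:
        rs = pool.map_async(slow_double, range(200))
        results, ticks = wait_with_heartbeat(
            rs, interval=0.1, on_tick=lambda: print('still working...'))
    print(len(results), 'results after', ticks, 'heartbeats')
```

This shows only that the main process is alive, not how many tasks remain; for a count, iterate over `imap_unordered` or use callbacks.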

Answer 4


I know that this is a rather old question, but here is what I’m doing when I want to track the progress of a pool of tasks in Python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

Basically, you use apply_async with a callback (in this case, it appends the returned value to a list), so you don’t have to wait to do something else. Then, within a while-loop, you check the progress of the work. In this case, I added a widget to make it look nicer.

The output:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Hope it helps.

Answer 5


As suggested by Tim, you can use tqdm and imap to solve this issue. I’ve just stumbled upon this problem and tweaked the imap_unordered solution, so that I can access the results of the mapping. Here’s how it works:

from multiprocessing import Pool
import tqdm

pool = Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=num_tasks))

In case you don’t care about the values returned from your jobs, you don’t need to assign the list to any variable.

Answer 6


For anybody looking for a simple solution that works with Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

def work(x):
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(i,), callback=lambda _: pbar.update(1))
       for i in range(n)]
results = [r.get() for r in res]  # 'r', so the pool 'p' is not shadowed
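
The callback approach does not depend on tqdm. The sketch below uses only the standard library and counts completions in the callback; `ProgressCounter`, `run_all` and `work` are hypothetical names. A lock guards the counter because CPython's pool invokes callbacks from a helper thread of the parent process rather than from the main thread.

```python
import threading
from multiprocessing.pool import ThreadPool


def work(x):
    return x ** 2


class ProgressCounter:
    """Counts finished tasks; safe to call from the pool's callback thread."""

    def __init__(self, total):
        self.total = total
        self.done = 0
        self._lock = threading.Lock()

    def __call__(self, _result):
        with self._lock:
            self.done += 1
            print('Progress: {}/{}'.format(self.done, self.total))


def run_all(pool, func, args_list):
    counter = ProgressCounter(len(args_list))
    pending = [pool.apply_async(func, args=(a,), callback=counter)
               for a in args_list]
    results = [r.get() for r in pending]   # results in submission order
    return results, counter


if __name__ == '__main__':
    with ThreadPool(4) as pool:   # multiprocessing.Pool(4) offers the same calls
        results, counter = run_all(pool, work, list(range(10)))
    print(results)
```

Callbacks should return quickly, since the thread that runs them also hands out the remaining results.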

Answer 7


I created a custom class to create a progress printout. Maybe this helps:

from multiprocessing import Pool, cpu_count

class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

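    def complete(self, result):
        self.results.append(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        # Stop accepting new tasks and wait for the submitted ones to finish
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results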

Answer 8


Try this simple Queue based approach, which can also be used with pooling. Be mindful that printing anything after the initiation of the progress bar will cause it to be moved, at least for this particular progress bar. (PyPI’s progress 1.5)

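import multiprocessing
import time
from progress.bar import Bar

def status_bar(queue_stat, n_groups, n):
    # Runs in its own process and advances the bar as workers report progress.
    bar = Bar('progress', max=n)

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)  # avoid a busy loop while waiting for updates

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()

def process_data(queue_data, queue_stat, group):

    new_data = []
    for i in group:

        # ... do stuff resulting in new_data

        queue_stat.put(1)        # report one processed item

    queue_stat.put('finished')   # tell the status bar this group is done
    queue_data.put(new_data)

def multiprocess():

    new_data = []
    processes = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    # one extra process drives the progress bar
    p = multiprocessing.Process(target = status_bar,
        args=(queue_stat, len(groups), len(combined)))
    processes.append(p)
    p.start()

    for group in groups:
        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get()
        new_data += data

    for p in processes:
        p.join()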





Would it be possible to create a Python Pool that is non-daemonic? I want a pool to be able to call a function that has another pool inside.

I want this because daemon processes cannot create processes. Specifically, it will cause the error:

AssertionError: daemonic processes are not allowed to have children

For example, consider the scenario where function_a has a pool which runs function_b which has a pool which runs function_c. This function chain will fail, because function_b is being run in a daemon process, and daemon processes cannot create processes.

Answer 0



The multiprocessing.pool.Pool class creates the worker processes in its __init__ method, makes them daemonic and starts them, and it is not possible to re-set their daemon attribute to False before they are started (and afterwards it’s not allowed anymore). But you can create your own sub-class of multiprocessing.pool.Pool (multiprocessing.Pool is just a wrapper function) and substitute your own multiprocessing.Process sub-class, which is always non-daemonic, to be used for the worker processes.

Here’s a full example of how to do this. The important parts are the two classes NoDaemonProcess and MyPool at the top and to call pool.close() and pool.join() on your MyPool instance at the end.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

Answer 1

I had the necessity to employ a non-daemonic pool in Python 3.7 and ended up adapting the code posted in the accepted answer. Below there’s the snippet that creates the non-daemonic pool:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

As the current implementation of multiprocessing has been extensively refactored to be based on contexts, we need to provide a NoDaemonContext class that has our NoDaemonProcess as attribute. NestablePool will then use that context instead of the default one.

That said, I should warn that there are at least two caveats to this approach:

  1. It still depends on implementation details of the multiprocessing package, and could therefore break at any time.
  2. There are valid reasons why multiprocessing made it so hard to use non-daemonic processes, many of which are explained here. The most compelling in my opinion is:

As for allowing children threads to spawn off children of its own using subprocess runs the risk of creating a little army of zombie ‘grandchildren’ if either the parent or child threads terminate before the subprocess completes and returns.

Answer 2




The multiprocessing module has a nice interface for using pools with either processes or threads. Depending on your use case, you might consider using multiprocessing.pool.ThreadPool for your outer Pool, which gives you threads (which are allowed to spawn processes from within) instead of processes.

It might be limited by the GIL, but in my particular case (I tested both), the startup time for the processes of the outer Pool, created as described here, was far greater than with the ThreadPool solution.

It’s really easy to swap Processes for Threads. Read more about how to use a ThreadPool solution here or here.
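
Why this works can be checked directly. The "daemonic processes are not allowed to have children" assertion looks at the `daemon` flag of the current process, and a `ThreadPool` worker is a thread inside the parent process, so the flag it sees is the parent's. A minimal sketch (the name `worker_daemon_flag` is hypothetical):

```python
import multiprocessing
from multiprocessing.pool import ThreadPool


def worker_daemon_flag(_):
    # Inside a ThreadPool worker, current_process() is still the parent
    # process, so this reports the parent's daemon flag.
    return multiprocessing.current_process().daemon


if __name__ == '__main__':
    with ThreadPool(3) as outer:
        print(outer.map(worker_daemon_flag, range(3)))
```

Run from an ordinary script, every flag should be `False`, which is why code running in those workers may create its own process pools.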

Answer 3

On some Python versions, replacing the standard Pool with a custom one can raise the error: AssertionError: group argument must be None for now.

Here I found a solution that can help:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass

class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc

Answer 4


concurrent.futures.ProcessPoolExecutor doesn’t have this limitation. It can have a nested process pool with no problem at all:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

The above demonstration code was tested with Python 3.8.

A limitation of ProcessPoolExecutor, however, is that it doesn’t have maxtasksperchild. If you need this, consider the answer by Massimiliano instead.

Credit: answer by jfs

Answer 5



The issue I encountered was in trying to import globals between modules, causing the ProcessPool() line to get evaluated multiple times.


from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevaluated each time,
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Then import safely from elsewhere in your code

from globals import Globals

I have written a more expanded wrapper class around pathos.multiprocessing here:

As a side note, if your use case just requires an async multiprocess map as a performance optimization, then joblib will manage all your process pools behind the scenes and allow this very simple syntax:

from joblib import Parallel, delayed

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )

Answer 6


I have seen people dealing with this issue by using Celery’s fork of multiprocessing called billiard (multiprocessing pool extensions), which allows daemonic processes to spawn children. The workaround is to simply replace the multiprocessing module with:

import billiard as multiprocessing



Learning about Python Multiprocessing (from a PMOTW article) and would love some clarification on what exactly the join() method is doing.

In an old tutorial from 2008 it states that without the p.join() call in the code below, “the child process will sit idle and not terminate, becoming a zombie you must manually kill”.

from multiprocessing import Process

def say_hello(name='world'):
    print "Hello, %s" % name

p = Process(target=say_hello)
p.start()
p.join()

I added a printout of the PID as well as a time.sleep to test and as far as I can tell, the process terminates on its own:

from multiprocessing import Process
import sys
import time

def say_hello(name='world'):
    print "Hello, %s" % name
    print 'Starting:', p.name, p.pid
    sys.stdout.flush()
    print 'Exiting :', p.name, p.pid
    time.sleep(20)

p = Process(target=say_hello)
p.start()
# no p.join()

within 20 seconds:

936 ttys000    0:00.05 /Library/Frameworks/Python.framework/Versions/2.7/Reso
938 ttys000    0:00.00 /Library/Frameworks/Python.framework/Versions/2.7/Reso
947 ttys001    0:00.13 -bash

after 20 seconds:

947 ttys001    0:00.13 -bash

Behavior is the same with p.join() added back at end of the file. Python Module of the Week offers a very readable explanation of the module; “To wait until a process has completed its work and exited, use the join() method.”, but it seems like at least OS X was doing that anyway.

I am also wondering about the name of the method. Is the .join() method concatenating anything here? Is it concatenating a process with its end? Or does it just share a name with Python’s native .join() method?

Answer 0


The join() method, when used with threading or multiprocessing, is not related to str.join() – it’s not actually concatenating anything together. Rather, it just means “wait for this [thread/process] to complete”. The name join is used because the multiprocessing module’s API is meant to look as similar as possible to the threading module’s API, and the threading module uses join for its Thread object. Using the term join to mean “wait for a thread to complete” is common across many programming languages, so Python just adopted it as well.

Now, the reason you see the 20 second delay both with and without the call to join() is because by default, when the main process is ready to exit, it will implicitly call join() on all running multiprocessing.Process instances. This isn’t as clearly stated in the multiprocessing docs as it should be, but it is mentioned in the Programming Guidelines section:

Remember also that non-daemonic processes will be automatically be joined.

You can override this behavior by setting the daemon flag on the Process to True prior to starting the process:

p = Process(target=say_hello)
p.daemon = True
p.start()
# Both parent and child will exit here, since the main process has completed.

If you do that, the child process will be terminated as soon as the main process completes:


The process’s daemon flag, a Boolean value. This must be set before start() is called.

The initial value is inherited from the creating process.

When a process exits, it attempts to terminate all of its daemonic child processes.
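
To make the meaning of join() concrete, here is a small sketch. It uses threading.Thread because, as noted above, multiprocessing.Process mirrors that API; the names `worker` and `run_and_join` are hypothetical.

```python
import threading
import time


def worker(log):
    time.sleep(0.2)
    log.append('worker done')


def run_and_join():
    log = []
    t = threading.Thread(target=worker, args=(log,))
    t.start()
    alive_before = t.is_alive()   # normally True: the worker is still sleeping
    t.join()                      # block until the worker has finished
    alive_after = t.is_alive()    # False: join() returned, so the thread ended
    log.append('after join')
    return alive_before, alive_after, log


if __name__ == '__main__':
    print(run_and_join())
```

Nothing is concatenated; join() only guarantees that 'worker done' is recorded before 'after join'.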

Answer 1

Without the join(), the main process can complete before the child process does. I’m not sure under what circumstances that leads to zombieism.

The main purpose of join() is to ensure that a child process has completed before the main process does anything that depends on the work of the child process.

The etymology of join() is that it’s the opposite of fork, which is the common term in Unix-family operating systems for creating child processes. A single process “forks” into several, then “joins” back into one.

Answer 2


I’m not going to explain in detail what join does, but here’s the etymology and the intuition behind it, which should help you remember its meaning more easily.

The idea is that execution “forks” into multiple processes of which one is the master, the rest workers (or “slaves”). When the workers are done, they “join” the master so that serial execution may be resumed.

The join method causes the master process to wait for a worker to join it. The method might better have been called “wait”, since that’s the actual behavior it causes in the master (and that’s what it’s called in POSIX, although POSIX threads call it “join” as well). The joining only occurs as an effect of the threads cooperating properly; it’s not something the master does.

The names “fork” and “join” have been used with this meaning in multiprocessing since 1963.

Answer 3


join() is used to wait for the worker processes to exit. One must call close() or terminate() before using join().

As @Russell mentioned, join is like the opposite of fork (which spawns sub-processes).

Before join() can run, you have to call close(), which prevents any more tasks from being submitted to the pool; the workers exit once all tasks complete. Alternatively, calling terminate() stops all worker processes immediately.
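
The ordering requirement can be seen in a short sketch. It uses multiprocessing.pool.ThreadPool, which shares the close()/terminate()/join() lifecycle with Pool, to keep the example light; `join_too_early` is a hypothetical helper name. Two exception types are caught because, as far as I know, older Python 3 releases raised AssertionError here while newer ones raise ValueError.

```python
from multiprocessing.pool import ThreadPool


def join_too_early(pool):
    """Return True if join() is refused because the pool is still open."""
    try:
        pool.join()
    except (ValueError, AssertionError):
        return True
    return False


if __name__ == '__main__':
    pool = ThreadPool(2)
    print(join_too_early(pool))      # join() before close() is rejected
    values = pool.map(abs, [-1, -2, -3])
    pool.close()                     # no more tasks will be submitted
    pool.join()                      # now waits for the workers to exit
    print(values)
```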

"the child process will sit idle and not terminate, becoming a zombie you must manually kill" this is possible when the main (parent) process exits but the child process is still running and once completed it has no parent process to return its exit status to.

Answer 4



The join() call ensures that subsequent lines of your code are not called before all the multiprocessing processes are completed.

For example, without the join(), the following code will call restart_program() even before the processes finish, which is similar to asynchronous and is not what we want (you can try):

num_processes = 5
processes = []

for i in range(num_processes):
    p = multiprocessing.Process(target=calculate_stuff, args=(i,))
    processes.append(p)
    p.start()
for p in processes:
    p.join() # call to ensure subsequent line (e.g. restart_program)
             # is not called until all processes finish
restart_program()

Answer 5





To wait until a process has completed its work and exited, use the join() method.


Note: It is important to join() the process after terminating it in order to give the background machinery time to update the status of the object to reflect the termination.

This is a good example that helped me understand it: here

One thing I noticed personally was my main process paused until the child had finished its process using the join() method which defeated the point of me using multiprocessing.Process() in the first place.




I would like to use a numpy array in shared memory for use with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

This produces output such as:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, e.g. arr[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*arr, or arr.sum(). I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to make this work), I don’t believe it would be shared anymore.

It seems there would be a standard solution to what has to be a common problem.

Answer 0


To add to @unutbu’s (no longer available) and @Henry Gomersall’s answers, you can use shared_arr.get_lock() to synchronize access when needed:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index, such as another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]


import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join() # wait for the workers to finish before checking the result
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    main()

If you don’t need synchronized access or you create your own locks then mp.Array() is unnecessary. You could use mp.sharedctypes.RawArray in this case.
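As a rough sketch of the lock-free variant mentioned above, the following wraps a RawArray in a numpy view and lets a child process modify it. The function names negate and demo are hypothetical, the code uses Python 3 syntax, and it assumes a POSIX system where the "fork" start method is available.

```python
import ctypes
import multiprocessing as mp
from multiprocessing.sharedctypes import RawArray

import numpy as np

def negate(raw, index):
    # Wrap the shared buffer in a numpy view; no data is copied.
    arr = np.frombuffer(raw, dtype=np.float64)
    arr[index] = -arr[index]

def demo():
    # Assumption: POSIX system, so the "fork" start method exists.
    ctx = mp.get_context("fork")

    raw = RawArray(ctypes.c_double, 4)  # shared memory, no lock attached
    arr = np.frombuffer(raw, dtype=np.float64)
    arr[:] = [1.0, 2.0, 3.0, 4.0]

    # The child writes through its own view of the same memory.
    p = ctx.Process(target=negate, args=(raw, 0))
    p.start()
    p.join()
    return arr.tolist()

if __name__ == "__main__":
    print(demo())
```

Because no lock is involved, this is only safe when processes never write to the same elements concurrently, or when you coordinate access yourself.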

Answer 1





The Array object has a get_obj() method, which returns the underlying ctypes array; that array exposes a buffer interface. I think the following should work…

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

When run, this prints out the first element of a now being 10.0, showing a and b are just two views into the same memory.

To make sure access is still multiprocessing-safe, I believe you will have to use the acquire and release methods that exist on the Array object, a, and its built-in lock to make sure it’s all safely accessed (though I’m not an expert on the multiprocessing module).
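The locking idea can be sketched roughly as follows, with several processes incrementing the same element through a numpy view while holding the Array’s built-in lock. The names add_one and demo are hypothetical, the code uses Python 3 syntax, and it assumes a POSIX system with the "fork" start method.

```python
import multiprocessing as mp

import numpy as np

def add_one(shared, repeats):
    view = np.frombuffer(shared.get_obj())  # float64 view, no copy
    for _ in range(repeats):
        shared.acquire()        # take the Array's built-in lock
        try:
            view[0] += 1.0      # read-modify-write is now exclusive
        finally:
            shared.release()    # always release, even on error

def demo(workers=4, repeats=1000):
    # Assumption: POSIX system, so the "fork" start method exists.
    ctx = mp.get_context("fork")
    shared = ctx.Array("d", 1)  # one double, zero-initialised, with a lock

    procs = [ctx.Process(target=add_one, args=(shared, repeats))
             for _ in range(workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return shared[0]

if __name__ == "__main__":
    print(demo())
```

Without the acquire/release pair, concurrent increments could overwrite each other and the final count could come out lower than workers * repeats.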

Answer 2



While the answers already given are good, there is a much easier solution to this problem provided two conditions are met:

  1. You are on a POSIX-compliant operating system (e.g. Linux, Mac OSX); and
  2. Your child processes need read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, as the child processes will be created using a fork. A forked child automatically shares the parent’s memory space (copy-on-write, so pages are only duplicated when written to). In the context of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a multiprocessing.Pool or similar.

A simple example:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))
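Since this approach depends on children being created with fork, one hedged variation is to request that start method explicitly rather than rely on the platform default (on macOS, for example, the default has been "spawn" since Python 3.8). This is a sketch under that assumption, not the answer’s original code; it is written for Python 3 and requires a POSIX system.

```python
import multiprocessing as mp

import numpy as np

# Module-level variable; forked workers inherit whatever it holds
# at the moment the pool is created.
data_array = None

def job_handler(num):
    # Reads the inherited array; the array itself is never pickled.
    return float(np.sum(data_array)) + num

def launch_jobs(data, num_jobs=5, num_workers=2):
    global data_array
    data_array = data  # must be set before the workers are forked

    # Ask for "fork" explicitly instead of using the default start method.
    ctx = mp.get_context("fork")
    with ctx.Pool(num_workers) as pool:
        return pool.map(job_handler, range(num_jobs))

if __name__ == "__main__":
    print(launch_jobs(np.ones(10)))
```

On a platform without fork, get_context("fork") raises ValueError, which makes the dependency visible instead of silently handing each worker an unset data_array.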

Answer 3





I’ve written a small python module that uses POSIX shared memory to share numpy arrays between python interpreters. Maybe you will find it handy.


Here’s how it works:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
print(sa.list())

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])
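For comparison, a similar create/attach/destroy pattern can be sketched with the standard library’s multiprocessing.shared_memory module (Python 3.8+). This is a different technique from the SharedArray module above, shown only as an illustration; the function name demo is hypothetical.

```python
from multiprocessing import shared_memory

import numpy as np

def demo():
    # Create a block large enough for 10 float64 values;
    # the operating system picks a unique name for it.
    shm_a = shared_memory.SharedMemory(create=True, size=10 * 8)
    a = np.ndarray((10,), dtype=np.float64, buffer=shm_a.buf)
    a[:] = 0.0

    # Attach to the same block by name, as another interpreter could.
    shm_b = shared_memory.SharedMemory(name=shm_a.name)
    b = np.ndarray((10,), dtype=np.float64, buffer=shm_b.buf)

    # A write through one view is visible through the other.
    a[0] = 42
    seen = float(b[0])

    # Drop the numpy views before closing, then remove the block.
    del a, b
    shm_b.close()
    shm_a.close()
    shm_a.unlink()
    return seen

if __name__ == "__main__":
    print(demo())
```

The numpy views must be released before close() is called, otherwise the underlying buffer is still exported and closing fails.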

Answer 4


You can use the sharedmem module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here’s your original code then, this time using shared memory that behaves like a NumPy array (note the additional last statement calling a NumPy sum() function):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()