标签归档:multiprocessing

键盘中断与python的多处理池

问题:键盘中断与python的多处理池

如何使用python的多处理池处理KeyboardInterrupt事件?这是一个简单的示例:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

当运行上面的代码时,KeyboardInterrupt按时会引发^C,但是该过程只是在此时挂起,我必须在外部将其杀死。

我希望能够随时按下^C并使所有进程正常退出。

How can I handle KeyboardInterrupt events with python’s multiprocessing Pools? Here is a simple example:

from multiprocessing import Pool
from time import sleep
from sys import exit

def slowly_square(i):
    sleep(1)
    return i*i

def go():
    pool = Pool(8)
    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        # **** THIS PART NEVER EXECUTES. ****
        pool.terminate()
        print "You cancelled the program!"
        sys.exit(1)
    print "\nFinally, here are the results: ", results

if __name__ == "__main__":
    go()

When running the code above, the KeyboardInterrupt gets raised when I press ^C, but the process simply hangs at that point and I have to kill it externally.

I want to be able to press ^C at any time and cause all of the processes to exit gracefully.


回答 0

这是一个Python错误。等待threading.Condition.wait()中的条件时,从不发送KeyboardInterrupt。复制:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

直到wait()返回,才会传递KeyboardInterrupt异常,并且它永远不会返回,因此中断永远不会发生。KeyboardInterrupt几乎应该可以中断条件等待。

请注意,如果指定了超时,则不会发生这种情况。cond.wait(1)将立即收到中断。因此,一种解决方法是指定超时。为此,请更换

    results = pool.map(slowly_square, range(40))

    results = pool.map_async(slowly_square, range(40)).get(9999999)

或类似。

This is a Python bug. When waiting for a condition in threading.Condition.wait(), KeyboardInterrupt is never sent. Repro:

import threading
cond = threading.Condition(threading.Lock())
cond.acquire()
cond.wait(None)
print "done"

The KeyboardInterrupt exception won’t be delivered until wait() returns, and it never returns, so the interrupt never happens. KeyboardInterrupt should almost certainly interrupt a condition wait.

Note that this doesn’t happen if a timeout is specified; cond.wait(1) will receive the interrupt immediately. So, a workaround is to specify a timeout. To do that, replace

    results = pool.map(slowly_square, range(40))

with

    results = pool.map_async(slowly_square, range(40)).get(9999999)

or similar.


回答 1

从我最近发现的情况来看,最好的解决方案是设置工作进程完全忽略SIGINT,并将所有清理代码限制在父进程中。这可以解决空闲和繁忙的工作进程的问题,并且在子进程中不需要错误处理代码。

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

解释和完整的示例代码分别位于http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/http://github.com/jreese/multiprocessing-keyboardinterrupt

From what I have recently found, the best solution is to set up the worker processes to ignore SIGINT altogether, and confine all the cleanup code to the parent process. This fixes the problem for both idle and busy worker processes, and requires no error handling code in your child processes.

import signal

...

def init_worker():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

...

def main()
    pool = multiprocessing.Pool(size, init_worker)

    ...

    except KeyboardInterrupt:
        pool.terminate()
        pool.join()

Explanation and full example code can be found at http://noswap.com/blog/python-multiprocessing-keyboardinterrupt/ and http://github.com/jreese/multiprocessing-keyboardinterrupt respectively.


回答 2

由于某些原因,仅Exception可正常处理从基类继承的异常。作为一种变通方法,你可能会重新提高你KeyboardInterrupt作为一个Exception实例:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

通常,您将获得以下输出:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

因此,如果您点击^C,您将获得:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

For some reasons, only exceptions inherited from the base Exception class are handled normally. As a workaround, you may re-raise your KeyboardInterrupt as an Exception instance:

from multiprocessing import Pool
import time

class KeyboardInterruptError(Exception): pass

def f(x):
    try:
        time.sleep(x)
        return x
    except KeyboardInterrupt:
        raise KeyboardInterruptError()

def main():
    p = Pool(processes=4)
    try:
        print 'starting the pool map'
        print p.map(f, range(10))
        p.close()
        print 'pool map complete'
    except KeyboardInterrupt:
        print 'got ^C while pool mapping, terminating the pool'
        p.terminate()
        print 'pool is terminated'
    except Exception, e:
        print 'got exception: %r, terminating the pool' % (e,)
        p.terminate()
        print 'pool is terminated'
    finally:
        print 'joining pool processes'
        p.join()
        print 'join complete'
    print 'the end'

if __name__ == '__main__':
    main()

Normally you would get the following output:

staring the pool map
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
pool map complete
joining pool processes
join complete
the end

So if you hit ^C, you will get:

staring the pool map
got ^C while pool mapping, terminating the pool
pool is terminated
joining pool processes
join complete
the end

回答 3

通常这种简单的结构工程CtrlC上池:

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

如几篇类似文章所述:

无需尝试即可在Python中捕获键盘中断

Usually this simple structure works for CtrlC on Pool :

def signal_handle(_signal, frame):
    print "Stopping the Jobs."

signal.signal(signal.SIGINT, signal_handle)

As was stated in few similar posts:

Capture keyboardinterrupt in Python without try-except


回答 4

投票表决的答案不能解决核心问题,但具有类似的副作用。

多重处理库的作者Jesse Noller解释了multiprocessing.Pool在旧博客中使用CTRL + C时如何正确处理。

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

The voted answer does not tackle the core issue but a similar side effect.

Jesse Noller, the author of the multiprocessing library, explains how to correctly deal with CTRL+C when using multiprocessing.Pool in a old blog post.

import signal
from multiprocessing import Pool


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


pool = Pool(initializer=initializer)

try:
    pool.map(perform_download, dowloads)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

回答 5

似乎有两个问题使多处理过程变得异常烦人。第一个(由Glenn指出)是您需要使用map_async超时而不是map为了获得即时响应(即,不要完成对整个列表的处理)。第二点(Andrey指出)是,多处理不会捕获不继承自Exception(例如SystemExit)的异常。所以这是我的解决方案,涉及这两个方面:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

It seems there are two issues that make exceptions while multiprocessing annoying. The first (noted by Glenn) is that you need to use map_async with a timeout instead of map in order to get an immediate response (i.e., don’t finish processing the entire list). The second (noted by Andrey) is that multiprocessing doesn’t catch exceptions that don’t inherit from Exception (e.g., SystemExit). So here’s my solution that deals with both of these:

import sys
import functools
import traceback
import multiprocessing

def _poolFunctionWrapper(function, arg):
    """Run function under the pool

    Wrapper around function to catch exceptions that don't inherit from
    Exception (which aren't caught by multiprocessing, so that you end
    up hitting the timeout).
    """
    try:
        return function(arg)
    except:
        cls, exc, tb = sys.exc_info()
        if issubclass(cls, Exception):
            raise # No worries
        # Need to wrap the exception with something multiprocessing will recognise
        import traceback
        print "Unhandled exception %s (%s):\n%s" % (cls.__name__, exc, traceback.format_exc())
        raise Exception("Unhandled exception: %s (%s)" % (cls.__name__, exc))

def _runPool(pool, timeout, function, iterable):
    """Run the pool

    Wrapper around pool.map_async, to handle timeout.  This is required so as to
    trigger an immediate interrupt on the KeyboardInterrupt (Ctrl-C); see
    http://stackoverflow.com/questions/1408356/keyboard-interrupts-with-pythons-multiprocessing-pool

    Further wraps the function in _poolFunctionWrapper to catch exceptions
    that don't inherit from Exception.
    """
    return pool.map_async(functools.partial(_poolFunctionWrapper, function), iterable).get(timeout)

def myMap(function, iterable, numProcesses=1, timeout=9999):
    """Run the function on the iterable, optionally with multiprocessing"""
    if numProcesses > 1:
        pool = multiprocessing.Pool(processes=numProcesses, maxtasksperchild=1)
        mapFunc = functools.partial(_runPool, pool, timeout)
    else:
        pool = None
        mapFunc = map
    results = mapFunc(function, iterable)
    if pool is not None:
        pool.close()
        pool.join()
    return results

回答 6

我发现目前最好的解决方案是不使用multiprocessing.pool功能,而是使用自己的池功能。我提供了一个使用apply_async演示该错误的示例,以及一个示例,展示了如何避免完全使用池功能。

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/

I found, for the time being, the best solution is to not use the multiprocessing.pool feature but rather roll your own pool functionality. I provided an example demonstrating the error with apply_async as well as an example showing how to avoid using the pool functionality altogether.

http://www.bryceboe.com/2010/08/26/python-multiprocessing-and-keyboardinterrupt/


回答 7

我是Python的新手。我到处都在寻找答案,却偶然发现了这个以及其他一些博客和YouTube视频。我试图将粘贴作者的代码复制到上面,并在Windows 7 64位的python 2.7.13上重现它。这接近我想要实现的目标。

我使我的子进程忽略ControlC,并使父进程终止。似乎绕过子进程确实为我避免了这个问题。

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

从头开始的那部分pool.terminate()似乎永远不会执行。

I’m a newbie in Python. I was looking everywhere for answer and stumble upon this and a few other blogs and youtube videos. I have tried to copy paste the author’s code above and reproduce it on my python 2.7.13 in windows 7 64- bit. It’s close to what I wanna achieve.

I made my child processes to ignore the ControlC and make the parent process terminate. Looks like bypassing the child process does avoid this problem for me.

#!/usr/bin/python

from multiprocessing import Pool
from time import sleep
from sys import exit


def slowly_square(i):
    try:
        print "<slowly_square> Sleeping and later running a square calculation..."
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print "<child processor> Don't care if you say CtrlC"
        pass


def go():
    pool = Pool(8)

    try:
        results = pool.map(slowly_square, range(40))
    except KeyboardInterrupt:
        pool.terminate()
        pool.close()
        print "You cancelled the program!"
        exit(1)
    print "Finally, here are the results", results


if __name__ == '__main__':
    go()

The part starting at pool.terminate() never seems to execute.


回答 8

您可以尝试使用Pool对象的apply_async方法,如下所示:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

输出:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

此方法的优点是中断之前处理的结果将返回到结果字典中:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

You can try using the apply_async method of a Pool object, like this:

import multiprocessing
import time
from datetime import datetime


def test_func(x):
    time.sleep(2)
    return x**2


def apply_multiprocessing(input_list, input_function):
    pool_size = 5
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=10)

    try:
        jobs = {}
        for value in input_list:
            jobs[value] = pool.apply_async(input_function, [value])

        results = {}
        for value, result in jobs.items():
            try:
                results[value] = result.get()
            except KeyboardInterrupt:
                print "Interrupted by user"
                pool.terminate()
                break
            except Exception as e:
                results[value] = e
        return results
    except Exception:
        raise
    finally:
        pool.close()
        pool.join()


if __name__ == "__main__":
    iterations = range(100)
    t0 = datetime.now()
    results1 = apply_multiprocessing(iterations, test_func)
    t1 = datetime.now()
    print results1
    print "Multi: {}".format(t1 - t0)

    t2 = datetime.now()
    results2 = {i: test_func(i) for i in iterations}
    t3 = datetime.now()
    print results2
    print "Non-multi: {}".format(t3 - t2)

Output:

100
Multiprocessing run time: 0:00:41.131000
100
Non-multiprocessing run time: 0:03:20.688000

An advantage of this method is that results processed before interruption will be returned in the results dictionary:

>>> apply_multiprocessing(range(100), test_func)
Interrupted by user
{0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

回答 9

奇怪的是,您似乎也必须处理KeyboardInterrupt孩子中的孩子。我本来希望它能像写的那样工作…尝试更改slowly_square为:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

那应该可以按您预期的那样工作。

Strangely enough it looks like you have to handle the KeyboardInterrupt in the children as well. I would have expected this to work as written… try changing slowly_square to:

def slowly_square(i):
    try:
        sleep(1)
        return i * i
    except KeyboardInterrupt:
        print 'You EVIL bastard!'
        return 0

That should work as you expected.


为什么在导入numpy之后多处理仅使用单个内核?

问题:为什么在导入numpy之后多处理仅使用单个内核?

我不确定这是否更多的是操作系统问题,但是我想在这里问一下,以防有人对Python有所了解。

我一直在尝试使用并行化CPU繁重的for循环joblib,但是我发现不是将每个工作进程分配给不同的内核,而是最终将所有工作进程分配给相同的内核,并且没有性能提升。

这是一个非常简单的例子…

from joblib import Parallel,delayed
import numpy as np

def testfunc(data):
    # some very boneheaded CPU work
    for nn in xrange(1000):
        for ii in data[0,:]:
            for jj in data[1,:]:
                ii*jj

def run(niter=10):
    data = (np.random.randn(2,100) for ii in xrange(niter))
    pool = Parallel(n_jobs=-1,verbose=1,pre_dispatch='all')
    results = pool(delayed(testfunc)(dd) for dd in data)

if __name__ == '__main__':
    run()

htop这是该脚本运行时看到的内容:

我在具有4核的笔记本电脑上运行Ubuntu 12.10(3.5.0-26)。显然joblib.Parallel是为不同的工作人员生成了单独的进程,但是有什么方法可以使这些进程在不同的内核上执行?

I am not sure whether this counts more as an OS issue, but I thought I would ask here in case anyone has some insight from the Python end of things.

I’ve been trying to parallelise a CPU-heavy for loop using joblib, but I find that instead of each worker process being assigned to a different core, I end up with all of them being assigned to the same core and no performance gain.

Here’s a very trivial example…

from joblib import Parallel,delayed
import numpy as np

def testfunc(data):
    # some very boneheaded CPU work
    for nn in xrange(1000):
        for ii in data[0,:]:
            for jj in data[1,:]:
                ii*jj

def run(niter=10):
    data = (np.random.randn(2,100) for ii in xrange(niter))
    pool = Parallel(n_jobs=-1,verbose=1,pre_dispatch='all')
    results = pool(delayed(testfunc)(dd) for dd in data)

if __name__ == '__main__':
    run()

…and here’s what I see in htop while this script is running:

I’m running Ubuntu 12.10 (3.5.0-26) on a laptop with 4 cores. Clearly joblib.Parallel is spawning separate processes for the different workers, but is there any way that I can make these processes execute on different cores?


回答 0

经过更多的谷歌搜索后,我在这里找到了答案。

事实证明,某些Python模块(numpyscipytablespandasskimage对进口核心相关性……)的混乱。据我所知,这个问题似乎是由它们链接到多线程OpenBLAS库引起的。

解决方法是使用

os.system("taskset -p 0xff %d" % os.getpid())

在导入模块之后粘贴了这一行,我的示例现在可以在所有内核上运行:

到目前为止,我的经验是,这似乎对numpy机器的性能没有任何负面影响,尽管这可能是特定于机器和任务的。

更新:

还有两种方法可以禁用OpenBLAS本身的CPU关联性重置行为。在运行时,您可以使用环境变量OPENBLAS_MAIN_FREE(或GOTOBLAS_MAIN_FREE),例如

OPENBLAS_MAIN_FREE=1 python myscript.py

或者,如果您要从源代码编译OpenBLAS,则可以在构建时通过编辑Makefile.rule使其包含该行来永久禁用它

NO_AFFINITY=1

After some more googling I found the answer here.

It turns out that certain Python modules (numpy, scipy, tables, pandas, skimage…) mess with core affinity on import. As far as I can tell, this problem seems to be specifically caused by them linking against multithreaded OpenBLAS libraries.

A workaround is to reset the task affinity using

os.system("taskset -p 0xff %d" % os.getpid())

With this line pasted in after the module imports, my example now runs on all cores:

My experience so far has been that this doesn’t seem to have any negative effect on numpy‘s performance, although this is probably machine- and task-specific .

Update:

There are also two ways to disable the CPU affinity-resetting behaviour of OpenBLAS itself. At run-time you can use the environment variable OPENBLAS_MAIN_FREE (or GOTOBLAS_MAIN_FREE), for example

OPENBLAS_MAIN_FREE=1 python myscript.py

Or alternatively, if you’re compiling OpenBLAS from source you can permanently disable it at build-time by editing the Makefile.rule to contain the line

NO_AFFINITY=1

回答 1

Python 3现在公开了直接设置亲和力的方法

>>> import os
>>> os.sched_getaffinity(0)
{0, 1, 2, 3}
>>> os.sched_setaffinity(0, {1, 3})
>>> os.sched_getaffinity(0)
{1, 3}
>>> x = {i for i in range(10)}
>>> x
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
>>> os.sched_setaffinity(0, x)
>>> os.sched_getaffinity(0)
{0, 1, 2, 3}

Python 3 now exposes the methods to directly set the affinity

>>> import os
>>> os.sched_getaffinity(0)
{0, 1, 2, 3}
>>> os.sched_setaffinity(0, {1, 3})
>>> os.sched_getaffinity(0)
{1, 3}
>>> x = {i for i in range(10)}
>>> x
{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
>>> os.sched_setaffinity(0, x)
>>> os.sched_getaffinity(0)
{0, 1, 2, 3}

回答 2

这似乎是Ubuntu上Python的常见问题,并不特定于joblib

我建议尝试使用CPU相似性(taskset)。


多处理中的共享内存对象

问题:多处理中的共享内存对象

假设我有一个很大的内存numpy数组,我有一个函数func将这个巨型数组作为输入(以及其他一些参数)。func具有不同参数的参数可以并行运行。例如:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

如果我使用多处理库,那么该巨型数组将多次复制到不同的进程中。

有没有办法让不同的进程共享同一数组?该数组对象是只读的,永远不会被修改。

更复杂的是,如果arr不是数组,而是任意python对象,是否可以共享它?

[编辑]

我读了答案,但仍然有些困惑。由于fork()是写时复制的,因此在python多处理库中生成新进程时,我们不应调用任何额外的开销。但是下面的代码表明存在巨大的开销:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

输出(顺便说一句,成本随着数组大小的增加而增加,所以我怀疑仍然存在与内存复制相关的开销):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

如果我们不复制阵列,为什么会有这么大的开销?共享内存可以为我节省哪一部分?

Suppose I have a large in memory numpy array, I have a function func that takes in this giant array as input (together with some other parameters). func with different parameters can be run in parallel. For example:

def func(arr, param):
    # do stuff to arr, param

# build array arr

pool = Pool(processes = 6)
results = [pool.apply_async(func, [arr, param]) for param in all_params]
output = [res.get() for res in results]

If I use multiprocessing library, then that giant array will be copied for multiple times into different processes.

Is there a way to let different processes share the same array? This array object is read-only and will never be modified.

What’s more complicated, if arr is not an array, but an arbitrary python object, is there a way to share it?

[EDITED]

I read the answer but I am still a bit confused. Since fork() is copy-on-write, we should not invoke any additional cost when spawning new processes in python multiprocessing library. But the following code suggests there is a huge overhead:

from multiprocessing import Pool, Manager
import numpy as np; 
import time

def f(arr):
    return len(arr)

t = time.time()
arr = np.arange(10000000)
print "construct array = ", time.time() - t;


pool = Pool(processes = 6)

t = time.time()
res = pool.apply_async(f, [arr,])
res.get()
print "multiprocessing overhead = ", time.time() - t;

output (and by the way, the cost increases as the size of the array increases, so I suspect there is still overhead related to memory copying):

construct array =  0.0178790092468
multiprocessing overhead =  0.252444982529

Why is there such huge overhead, if we didn’t copy the array? And what part does the shared memory save me?


回答 0

如果使用的操作系统使用写时复制fork()语义(如任何常见的unix),则只要不更改数据结构,所有子进程都可以使用它,而不会占用额外的内存。您将不必执行任何特殊操作(除非绝对确保您不更改该对象)。

可以针对问题执行的最有效的操作是将数组打包为有效的数组结构(使用numpyarray),将其放置在共享内存中,将其包装为multiprocessing.Array,然后将其传递给函数。这个答案说明了如何做到这一点

如果需要可写的共享库,则需要使用某种同步或锁定包装它。multiprocessing提供了两种方法来执行此操作:一种使用共享内存(适用于简单值,数组或ctypes)或Manager代理,其中一个进程持有该内存,而管理器则从其他进程(甚至是通过网络)仲裁对它的访问。

Manager方法可用于任意Python对象,但会比使用共享内存的等效方法慢,因为需要对对象进行序列化/反序列化并在进程之间发送。

Python提供许多并行处理库和方法multiprocessing是一个出色且全面的库,但是如果您有特殊需要,也许其他方法中的一种可能更好。

If you use an operating system that uses copy-on-write fork() semantics (like any common unix), then as long as you never alter your data structure it will be available to all child processes without taking up additional memory. You will not have to do anything special (except make absolutely sure you don’t alter the object).

The most efficient thing you can do for your problem would be to pack your array into an efficient array structure (using numpy or array), place that in shared memory, wrap it with multiprocessing.Array, and pass that to your functions. This answer shows how to do that.

If you want a writeable shared object, then you will need to wrap it with some kind of synchronization or locking. multiprocessing provides two methods of doing this: one using shared memory (suitable for simple values, arrays, or ctypes) or a Manager proxy, where one process holds the memory and a manager arbitrates access to it from other processes (even over a network).

The Manager approach can be used with arbitrary Python objects, but will be slower than the equivalent using shared memory because the objects need to be serialized/deserialized and sent between processes.

There are a wealth of parallel processing libraries and approaches available in Python. multiprocessing is an excellent and well rounded library, but if you have special needs perhaps one of the other approaches may be better.


回答 1

我遇到了同样的问题,并编写了一个共享内存实用程序类来解决该问题。

我正在使用multiprocessing.RawArray(无锁),并且对数组的访问完全不同步(无锁),请注意不要自己动手。

通过该解决方案,我在四核i7上获得了大约3倍的加速。

代码如下:随时使用和改进它,请报告所有错误。

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

I run into the same problem and wrote a little shared-memory utility class to work around it.

I’m using multiprocessing.RawArray (lockfree), and also the access to the arrays is not synchronized at all (lockfree), be careful not to shoot your own feet.

With the solution I get speedups by a factor of approx 3 on a quad-core i7.

Here’s the code: Feel free to use and improve it, and please report back any bugs.

'''
Created on 14.05.2013

@author: martin
'''

import multiprocessing
import ctypes
import numpy as np

class SharedNumpyMemManagerError(Exception):
    pass

'''
Singleton Pattern
'''
class SharedNumpyMemManager:    

    _initSize = 1024

    _instance = None

    def __new__(cls, *args, **kwargs):
        if not cls._instance:
            cls._instance = super(SharedNumpyMemManager, cls).__new__(
                                cls, *args, **kwargs)
        return cls._instance        

    def __init__(self):
        self.lock = multiprocessing.Lock()
        self.cur = 0
        self.cnt = 0
        self.shared_arrays = [None] * SharedNumpyMemManager._initSize

    def __createArray(self, dimensions, ctype=ctypes.c_double):

        self.lock.acquire()

        # double size if necessary
        if (self.cnt >= len(self.shared_arrays)):
            self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays)

        # next handle
        self.__getNextFreeHdl()        

        # create array in shared memory segment
        shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions))

        # convert to numpy array vie ctypeslib
        self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base)

        # do a reshape for correct dimensions            
        # Returns a masked array containing the same data, but with a new shape.
        # The result is a view on the original array
        self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions)

        # update cnt
        self.cnt += 1

        self.lock.release()

        # return handle to the shared memory numpy array
        return self.cur

    def __getNextFreeHdl(self):
        orgCur = self.cur
        while self.shared_arrays[self.cur] is not None:
            self.cur = (self.cur + 1) % len(self.shared_arrays)
            if orgCur == self.cur:
                raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!')

    def __freeArray(self, hdl):
        self.lock.acquire()
        # set reference to None
        if self.shared_arrays[hdl] is not None: # consider multiple calls to free
            self.shared_arrays[hdl] = None
            self.cnt -= 1
        self.lock.release()

    def __getArray(self, i):
        return self.shared_arrays[i]

    @staticmethod
    def getInstance():
        if not SharedNumpyMemManager._instance:
            SharedNumpyMemManager._instance = SharedNumpyMemManager()
        return SharedNumpyMemManager._instance

    @staticmethod
    def createArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs)

    @staticmethod
    def getArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs)

    @staticmethod    
    def freeArray(*args, **kwargs):
        return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs)

# Init Singleton on module load
SharedNumpyMemManager.getInstance()

if __name__ == '__main__':

    import timeit

    N_PROC = 8
    INNER_LOOP = 10000
    N = 1000

    def propagate(t):
        i, shm_hdl, evidence = t
        a = SharedNumpyMemManager.getArray(shm_hdl)
        for j in range(INNER_LOOP):
            a[i] = i

    class Parallel_Dummy_PF:

        def __init__(self, N):
            self.N = N
            self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double)            
            self.pool = multiprocessing.Pool(processes=N_PROC)

        def update_par(self, evidence):
            self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N))

        def update_seq(self, evidence):
            for i in range(self.N):
                propagate((i, self.arrayHdl, evidence))

        def getArray(self):
            return SharedNumpyMemManager.getArray(self.arrayHdl)

    def parallelExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_par(5)
        print(pf.getArray())

    def sequentialExec():
        pf = Parallel_Dummy_PF(N)
        print(pf.getArray())
        pf.update_seq(5)
        print(pf.getArray())

    t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec")
    t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec")

    print("Sequential: ", t1.timeit(number=1))    
    print("Parallel: ", t2.timeit(number=1))

回答 2

这是Ray的预期用例,这是一个用于并行和分布式Python的库。在后台,它使用Apache Arrow数据布局(零副本格式)序列化对象,并将其存储在共享内存对象存储中,这样多个进程可以访问它们而无需创建副本。

该代码如下所示。

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

如果您不调用ray.put该数组,则该数组仍将存储在共享内存中,但是每次调用都会完成一次func,这不是您想要的。

请注意,这不仅适用于数组,而且还适用于包含数组的对象,例如,将int映射到数组的字典,如下所示。

您可以通过在IPython中运行以下代码来比较Ray和pickle中的序列化性能。

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

使用Ray进行序列化仅比pickle快一点,但是由于使用了共享内存,反序列化的速度要快1000倍(此数字当然取决于对象)。

请参阅Ray文档。您可以阅读更多有关使用Ray和Arrow进行快速序列化的信息。注意我是Ray开发人员之一。

This is the intended use case for Ray, which is a library for parallel and distributed Python. Under the hood, it serializes objects using the Apache Arrow data layout (which is a zero-copy format) and stores them in a shared-memory object store so they can be accessed by multiple processes without creating copies.

The code would look like the following.

import numpy as np
import ray

ray.init()

@ray.remote
def func(array, param):
    # Do stuff.
    return 1

array = np.ones(10**6)
# Store the array in the shared memory object store once
# so it is not copied multiple times.
array_id = ray.put(array)

result_ids = [func.remote(array_id, i) for i in range(4)]
output = ray.get(result_ids)

If you don’t call ray.put then the array will still be stored in shared memory, but that will be done once per invocation of func, which is not what you want.

Note that this will work not only for arrays but also for objects that contain arrays, e.g., dictionaries mapping ints to arrays as below.

You can compare the performance of serialization in Ray versus pickle by running the following in IPython.

import numpy as np
import pickle
import ray

ray.init()

x = {i: np.ones(10**7) for i in range(20)}

# Time Ray.
%time x_id = ray.put(x)  # 2.4s
%time new_x = ray.get(x_id)  # 0.00073s

# Time pickle.
%time serialized = pickle.dumps(x)  # 2.6s
%time deserialized = pickle.loads(serialized)  # 1.9s

Serialization with Ray is only slightly faster than pickle, but deserialization is 1000x faster because of the use of shared memory (this number will of course depend on the object).

See the Ray documentation. You can read more about fast serialization using Ray and Arrow. Note I’m one of the Ray developers.


回答 3

就像Robert Nishihara提到的那样,Apache Arrow使得这一点变得容易,特别是使用Plasma内存对象存储库,这正是Ray所基于的。

为此,我专门制作了脑等离子体 -在Flask应用中快速加载和重新加载大对象。它是Apache Arrow可序列化对象的共享内存对象命名空间,包括pickle.d生成的’d字节字符串pickle.dumps(...)

Apache Ray和Plasma的主要区别在于,它可以为您跟踪对象ID。在本地运行的任何进程,线程或程序都可以通过从任何Brain对象中调用名称来共享变量的值。

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]

Like Robert Nishihara mentioned, Apache Arrow makes this easy, specifically with the Plasma in-memory object store, which is what Ray is built on.

I made brain-plasma specifically for this reason – fast loading and reloading of big objects in a Flask app. It’s a shared-memory object namespace for Apache Arrow-serializable objects, including pickle‘d bytestrings generated by pickle.dumps(...).

The key difference with Apache Ray and Plasma is that it keeps track of object IDs for you. Any processes or threads or programs that are running on locally can share the variables’ values by calling the name from any Brain object.

$ pip install brain-plasma
$ plasma_store -m 10000000 -s /tmp/plasma

from brain_plasma import Brain
brain = Brain(path='/tmp/plasma/)

brain['a'] = [1]*10000

brain['a']
# >>> [1,1,1,1,...]

Windows上的RuntimeError尝试Python多处理

问题:Windows上的RuntimeError尝试Python多处理

我正在尝试在Windows机器上使用Threading and Multiprocessing的第一个正式python程序。我无法启动进程,但是python给出了以下消息。问题是,我没有在模块中启动线程。线程在类内的单独模块中处理。

编辑:顺便说一句,此代码在ubuntu上运行良好。在窗户上不太

RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.

我的原始代码很长,但是我能够以节略的版本重现该错误。它分为两个文件,第一个是主模块,除了导入处理进程/线程和调用方法的模块外,几乎没有其他作用。第二个模块是代码的关键所在。


testMain.py:

import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)

parallelTestModule.py:

import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print self.name,'\n'

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr) 
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

I am trying my very first formal python program using Threading and Multiprocessing on a windows machine. I am unable to launch the processes though, with python giving the following message. The thing is, I am not launching my threads in the main module. The threads are handled in a separate module inside a class.

EDIT: By the way this code runs fine on ubuntu. Not quite on windows

RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
            This probably means that you are on Windows and you have
            forgotten to use the proper idiom in the main module:
                if __name__ == '__main__':
                    freeze_support()
                    ...
            The "freeze_support()" line can be omitted if the program
            is not going to be frozen to produce a Windows executable.

My original code is pretty long, but I was able to reproduce the error in an abridged version of the code. It is split in two files, the first is the main module and does very little other than import the module which handles processes/threads and calls a method. The second module is where the meat of the code is.


testMain.py:

import parallelTestModule

extractor = parallelTestModule.ParallelExtractor()
extractor.runInParallel(numProcesses=2, numThreads=4)

parallelTestModule.py:

import multiprocessing
from multiprocessing import Process
import threading

class ThreadRunner(threading.Thread):
    """ This class represents a single instance of a running thread"""
    def __init__(self, name):
        threading.Thread.__init__(self)
        self.name = name
    def run(self):
        print self.name,'\n'

class ProcessRunner:
    """ This class represents a single instance of a running process """
    def runp(self, pid, numThreads):
        mythreads = []
        for tid in range(numThreads):
            name = "Proc-"+str(pid)+"-Thread-"+str(tid)
            th = ThreadRunner(name)
            mythreads.append(th) 
        for i in mythreads:
            i.start()
        for i in mythreads:
            i.join()

class ParallelExtractor:    
    def runInParallel(self, numProcesses, numThreads):
        myprocs = []
        prunner = ProcessRunner()
        for pid in range(numProcesses):
            pr = Process(target=prunner.runp, args=(pid, numThreads)) 
            myprocs.append(pr) 
#        if __name__ == 'parallelTestModule':    #This didnt work
#        if __name__ == '__main__':              #This obviously doesnt work
#        multiprocessing.freeze_support()        #added after seeing error to no avail
        for i in myprocs:
            i.start()

        for i in myprocs:
            i.join()

回答 0

在Windows上,子进程将在启动时导入(即执行)主模块。您需要if __name__ == '__main__':在主模块中插入防护,以避免递归创建子流程。

已修改testMain.py

import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)

On Windows the subprocesses will import (i.e. execute) the main module at start. You need to insert an if __name__ == '__main__': guard in the main module to avoid creating subprocesses recursively.

Modified testMain.py:

import parallelTestModule

if __name__ == '__main__':    
    extractor = parallelTestModule.ParallelExtractor()
    extractor.runInParallel(numProcesses=2, numThreads=4)

回答 1

尝试将代码放入testMain.py的主函数中

import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)

文档

"For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."

哪说

“确保主模块可以由新的Python解释器安全地导入,而不会引起意外的副作用(例如,启动新进程)。”

… 通过使用 if __name__ == '__main__'

Try putting your code inside a main function in testMain.py

import parallelTestModule

if __name__ ==  '__main__':
  extractor = parallelTestModule.ParallelExtractor()
  extractor.runInParallel(numProcesses=2, numThreads=4)

See the docs:

"For an explanation of why (on Windows) the if __name__ == '__main__' 
part is necessary, see Programming guidelines."

which say

“Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such a starting a new process).”

… by using if __name__ == '__main__'


回答 2

尽管较早的答案是正确的,但有一点复杂之处将有助于进一步说明。

如果您的主模块导入了另一个模块,在该模块中定义了全局变量或类成员变量并将其初始化为(或使用)一些新对象,则可能必须以相同的方式来限制导入:

if __name__ ==  '__main__':
  import my_module

Though the earlier answers are correct, there’s a small complication it would help to remark on.

In case your main module imports another module in which global variables or class member variables are defined and initialized to (or using) some new objects, you may have to condition that import in the same way:

if __name__ ==  '__main__':
  import my_module

回答 3

正如@Ofer所说,当您使用其他库或模块时,应将所有库或模块导入其中。 if __name__ == '__main__':

因此,就我而言,这样结束:

if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd
    run_my_program()

As @Ofer said, when you are using another libraries or modules, you should import all of them inside the if __name__ == '__main__':

So, in my case, ended like this:

if __name__ == '__main__':       
    import librosa
    import os
    import pandas as pd
    run_my_program()

回答 4

就我而言,这是代码中的一个简单错误,在创建变量之前就使用了变量。在尝试上述解决方案之前,值得检查一下。上帝知道为什么我得到这个特殊的错误信息。

In my case it was a simple bug in the code, using a variable before it was created. Worth checking that out before trying the above solutions. Why I got this particular error message, Lord knows.


多重处理:如何在多个流程之间共享一个字典?

问题:多重处理:如何在多个流程之间共享一个字典?

一个程序,该程序创建在可连接队列上工作的多个进程Q,并且最终可能会操纵全局字典D来存储结果。(因此每个子进程都可以D用来存储其结果,并查看其他子进程正在产生什么结果)

如果我在子进程中打印字典D,我会看到对它进行的修改(即在D上)。但是在主流程加入Q之后,如果我打印D,那就是空洞的字典!

我了解这是同步/锁定问题。有人可以告诉我这里发生了什么,如何同步对D的访问?

A program that creates several processes that work on a join-able queue, Q, and may eventually manipulate a global dictionary D to store results. (so each child process may use D to store its result and also see what results the other child processes are producing)

If I print the dictionary D in a child process, I see the modifications that have been done on it (i.e. on D). But after the main process joins Q, if I print D, it’s an empty dict!

I understand it is a synchronization/lock issue. Can someone tell me what is happening here, and how I can synchronize access to D?


回答 0

一般的答案涉及使用Manager对象。改编自文档:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

输出:

$ python mul.py 
{1: '111', '2': 6}

A general answer involves using a Manager object. Adapted from the docs:

from multiprocessing import Process, Manager

def f(d):
    d[1] += '1'
    d['2'] += 2

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    d[1] = '1'
    d['2'] = 2

    p1 = Process(target=f, args=(d,))
    p2 = Process(target=f, args=(d,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print d

Output:

$ python mul.py 
{1: '111', '2': 6}

回答 1

多处理不像线程。每个子进程将获得主进程内存的副本。通常,状态是通过通信(管道/套接字),信号或共享内存共享的。

多重处理可为您的用例提供一些抽象-共享状态通过使用代理或共享内存被视为本地状态:http : //docs.python.org/library/multiprocessing.html#sharing-state-between-processes

相关章节:

multiprocessing is not like threading. Each child process will get a copy of the main process’s memory. Generally state is shared via communication (pipes/sockets), signals, or shared memory.

Multiprocessing makes some abstractions available for your use case – shared state that’s treated as local by use of proxies or shared memory: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

Relevant sections:


回答 2

我想分享自己的工作,该工作比Manager的指令快,并且比使用大量内存并且不适用于Mac OS的pyshmht库更简单,更稳定。虽然我的字典仅适用于纯字符串,并且目前不可变。我使用线性探测实现,并将键和值对存储在表后的单独内存块中。

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

在我的笔记本电脑上,性能结果是:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

简单用法示例:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')

I’d like to share my own work that is faster than Manager’s dict and is simpler and more stable than pyshmht library that uses tons of memory and doesn’t work for Mac OS. Though my dict only works for plain strings and is immutable currently. I use linear probing implementation and store keys and values pairs in a separate memory block after the table.

from mmap import mmap
import struct
from timeit import default_timer
from multiprocessing import Manager
from pyshmht import HashTable


class shared_immutable_dict:
    def __init__(self, a):
        self.hs = 1 << (len(a) * 3).bit_length()
        kvp = self.hs * 4
        ht = [0xffffffff] * self.hs
        kvl = []
        for k, v in a.iteritems():
            h = self.hash(k)
            while ht[h] != 0xffffffff:
                h = (h + 1) & (self.hs - 1)
            ht[h] = kvp
            kvp += self.kvlen(k) + self.kvlen(v)
            kvl.append(k)
            kvl.append(v)

        self.m = mmap(-1, kvp)
        for p in ht:
            self.m.write(uint_format.pack(p))
        for x in kvl:
            if len(x) <= 0x7f:
                self.m.write_byte(chr(len(x)))
            else:
                self.m.write(uint_format.pack(0x80000000 + len(x)))
            self.m.write(x)

    def hash(self, k):
        h = hash(k)
        h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1)
        return h

    def get(self, k, d=None):
        h = self.hash(k)
        while True:
            x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0]
            if x == 0xffffffff:
                return d
            self.m.seek(x)
            if k == self.read_kv():
                return self.read_kv()
            h = (h + 1) & (self.hs - 1)

    def read_kv(self):
        sz = ord(self.m.read_byte())
        if sz & 0x80:
            sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000
        return self.m.read(sz)

    def kvlen(self, k):
        return len(k) + (1 if len(k) <= 0x7f else 4)

    def __contains__(self, k):
        return self.get(k, None) is not None

    def close(self):
        self.m.close()

uint_format = struct.Struct('>I')


def uget(a, k, d=None):
    return to_unicode(a.get(to_str(k), d))


def uin(a, k):
    return to_str(k) in a


def to_unicode(s):
    return s.decode('utf-8') if isinstance(s, str) else s


def to_str(s):
    return s.encode('utf-8') if isinstance(s, unicode) else s


def mmap_test():
    n = 1000000
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'mmap speed: %d gets per sec' % (n / (default_timer() - start_time))


def manager_test():
    n = 100000
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'manager speed: %d gets per sec' % (n / (default_timer() - start_time))


def shm_test():
    n = 1000000
    d = HashTable('tmp', n)
    d.update({str(i * 2): '1' for i in xrange(n)})
    start_time = default_timer()
    for i in xrange(n):
        if bool(d.get(str(i))) != (i % 2 == 0):
            raise Exception(i)
    print 'shm speed: %d gets per sec' % (n / (default_timer() - start_time))


if __name__ == '__main__':
    mmap_test()
    manager_test()
    shm_test()

On my laptop performance results are:

mmap speed: 247288 gets per sec
manager speed: 33792 gets per sec
shm speed: 691332 gets per sec

simple usage example:

ht = shared_immutable_dict({'a': '1', 'b': '2'})
print ht.get('a')

回答 3

除了这里的@senderle之外,有些人可能还想知道如何使用的功能multiprocessing.Pool

令人高兴的是,实例中有一个.Pool()方法可以manager模拟所有熟悉的顶层API multiprocessing

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

输出:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

这是一个稍有不同的示例,其中每个进程仅将其进程ID记录到全局DictProxy对象中d

In addition to @senderle’s here, some might also be wondering how to use the functionality of multiprocessing.Pool.

The nice thing is that there is a .Pool() method to the manager instance that mimics all the familiar API of the top-level multiprocessing.

from itertools import repeat
import multiprocessing as mp
import os
import pprint

def f(d: dict) -> None:
    pid = os.getpid()
    d[pid] = "Hi, I was written by process %d" % pid

if __name__ == '__main__':
    with mp.Manager() as manager:
        d = manager.dict()
        with manager.Pool() as pool:
            pool.map(f, repeat(d, 10))
        # `d` is a DictProxy object that can be converted to dict
        pprint.pprint(dict(d))

Output:

$ python3 mul.py 
{22562: 'Hi, I was written by process 22562',
 22563: 'Hi, I was written by process 22563',
 22564: 'Hi, I was written by process 22564',
 22565: 'Hi, I was written by process 22565',
 22566: 'Hi, I was written by process 22566',
 22567: 'Hi, I was written by process 22567',
 22568: 'Hi, I was written by process 22568',
 22569: 'Hi, I was written by process 22569',
 22570: 'Hi, I was written by process 22570',
 22571: 'Hi, I was written by process 22571'}

This is a slightly different example where each process just logs its process ID to the global DictProxy object d.


回答 4

也许您可以尝试pyshmht,为Python共享基于内存的哈希表扩展。

注意

  1. 尚未经过全面测试,仅供参考。

  2. 当前,它缺乏用于多处理的锁定/ sem机制。

Maybe you can try pyshmht, sharing memory based hash table extension for Python.

Notice

  1. It’s not fully tested, just for your reference.

  2. It currently lacks lock/sem mechanisms for multiprocessing.


多重处理:使用tqdm显示进度条

问题:多重处理:使用tqdm显示进度条

为了使我的代码更“ Pythonic”和更快,我使用“ multiprocessing”和一个map函数向其发送a)函数和b)迭代范围。

植入的解决方案(即直接在范围tqdm.tqdm(range(0,30))上调用tqdm不适用于多重处理(如以下代码中所述)。

进度条显示为0到100%(当python读取代码时?),但是它并不表示map函数的实际进度。

如何显示进度条以指示“地图”功能在哪一步?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

欢迎任何帮助或建议…

To make my code more “pythonic” and faster, I use “multiprocessing” and a map function to send it a) the function and b) the range of iterations.

The implanted solution (i.e., call tqdm directly on the range tqdm.tqdm(range(0, 30)) does not work with multiprocessing (as formulated in the code below).

The progress bar is displayed from 0 to 100% (when python reads the code?) but it does not indicate the actual progress of the map function.

How to display a progress bar that indicates at which step the ‘map’ function is ?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Any help or suggestions are welcome…


回答 0

使用imap代替map,它返回已处理值的迭代器。

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

Use imap instead of map, which returns an iterator of processed values.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

回答 1

找到的解决方案:注意!由于进行了多处理,估计时间(每个循环的迭代次数,总时间等)可能不稳定,但是进度条可以正常工作。

注意:Pool的上下文管理器仅在Python 3.3版中可用

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()

Solution Found : Be careful! Due to multiprocessing, estimation time (iteration per loop, total time, etc.) could be unstable, but the progress bar works perfectly.

Note: Context manager for Pool is only available from Python version 3.3

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()

回答 2

您可以p_tqdm改用。

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

You can use p_tqdm instead.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

回答 3

很抱歉,迟到了,但是如果您需要的是并发映射,那么最新版本(tqdm>=4.42.0)现在具有以下内置功能:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

参考:https : //tqdm.github.io/docs/contrib.concurrent/https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

Sorry for being late but if all you need is a concurrent map, I added this functionality in tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

It supports max_workers and chunksize and you can also easily switch from process_map to thread_map.


回答 4

根据XaviMartínez的回答,我编写了该函数imap_unordered_bar。可以imap_unordered与显示处理栏的唯一区别相同的方式使用它。

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

based on the answer of Xavi Martínez I wrote the function imap_unordered_bar. It can be used in the same way as imap_unordered with the only difference that a processing bar is shown.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

回答 5

import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))

回答 6

当需要从并行执行函数中获取结果时,这是我的看法。这个函数可以做一些事情(我的另一篇文章对此做了进一步解释),但是关键是有一个任务待处理队列和一个任务完成队列。当工作人员完成挂起队列中的每个任务时,他们会将结果添加到任务完成队列中。您可以使用tqdm进度条将检查包装到任务完成队列中。我没有在这里放置do_work()函数的实现,这无关紧要,因为这里的消息是监视已完成任务的队列并在每次输入结果时更新进度条。

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

Here is my take for when you need to get results back from your parallel executing functions. This function does a few things (there is another post of mine that explains it further) but the key point is that there is a tasks pending queue and a tasks completed queue. As workers are done with each task in the pending queue they add the results in the tasks completed queue. You can wrap the check to the tasks completed queue with the tqdm progress bar. I am not putting the implementation of the do_work() function here, it is not relevant, as the message here is to monitor the tasks completed queue and update the progress bar every time a result is in.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

回答 7

这种方法简单有效。

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()

This approach simple and it works.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()

显示Python多处理池imap_unordered调用的进度?

问题:显示Python多处理池imap_unordered调用的进度?

我有一个脚本可以通过imap_unordered()调用成功完成多处理池任务集:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

但是,我num_tasks大约是250,000,因此join()锁将主线程锁定了10秒钟左右,我希望能够逐步回显命令行以显示主进程未锁定。就像是:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

是否有用于结果对象或池本身的方法来指示剩余任务数?我尝试使用multiprocessing.Value对象作为计数器(在完成任务后do_work调用counter.value += 1操作),但是在停止递增之前,计数器只能达到总值的〜85%。

I have a script that’s successfully doing a multiprocessing Pool set of tasks with a imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

However, my num_tasks is around 250,000, and so the join() locks the main thread for 10 seconds or so, and I’d like to be able to echo out to the command line incrementally to show the main process isn’t locked. Something like:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

Is there a method for the result object or the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work calls a counter.value += 1 action after doing its task), but the counter only gets to ~85% of the total value before stopping incrementing.


回答 0

无需访问结果集的私有属性:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

There is no need to access private attributes of the result set:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))

回答 1

我个人的最爱-在事物并行运行和提交时为您提供一个不错的进度条和完成ETA。

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

My personal favorite — gives you a nice little progress bar and completion ETA while things run and commit in parallel.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass

回答 2

我发现在尝试检查进度时,该工作已经完成。这就是使用tqdm对我有用的

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

这应该适用于所有类型的多处理,无论它们是否阻塞。

I found that the work was already done by the time I tried to check it’s progress. This is what worked for me using tqdm.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

This should work with all flavors of multiprocessing, whether they block or not.


回答 3

找到了答案我自己有一些挖:以一看__dict__的的imap_unordered结果对象,我发现它有一个_index属性,它与每个任务完成的增量。因此,这适用于日志记录,并包装在while循环中:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

但是,我确实发现,将结果交换imap_unordered为会map_async导致执行速度更快,尽管结果对象有所不同。相反,from的结果对象map_async具有_number_left属性和ready()方法:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

Found an answer myself with some more digging: Taking a look at the __dict__ of the imap_unordered result object, I found it has a _index attribute that increments with each task completion. So this works for logging, wrapped in the while loop:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

However, I did find that swapping the imap_unordered for a map_async resulted in much faster execution, though the result object is a bit different. Instead, the result object from map_async has a _number_left attribute, and a ready() method:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)

回答 4

我知道这是一个相当老的问题,但是当我想跟踪python中任务池的进度时,这就是我正在做的事情。

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

基本上,您将apply_async与callbak一起使用(在这种情况下,它是将返回的值附加到列表中),因此您不必等待做其他事情。然后,在一个while循环中,检查工作进度。在这种情况下,我添加了一个小部件以使其看起来更好。

输出:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

希望能帮助到你。

I know that this is a rather old question, but here is what I’m doing when I want to track the progression of a pool of tasks in python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

Basically, you use apply_async with a callbak (in this case, it is to append the returned value to a list), so you don’t have to wait to do something else. Then, within a while-loop, you check the progression of the work. In this case, I added a widget to make it look nicer.

The output:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']

Hope it helps.


回答 5

如Tim所建议,您可以使用tqdmimap解决此问题。我刚刚偶然发现了这个问题并调整了imap_unordered解决方案,以便可以访问映射结果。运作方式如下:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

如果您不关心作业返回的值,则无需将列表分配给任何变量。

As suggested by Tim, you can use tqdm and imap to solve this issue. I’ve just stumbled upon this problem and tweaked the imap_unordered solution, so that I can access the results of the mapping. Here’s how it works:

from multiprocessing import Pool
import tqdm

pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))

In case you don’t care about the values returned from your jobs, you don’t need to assign the list to any variable.


回答 6

对于寻求与Pool.apply_async()以下人员合作的简单解决方案的任何人:

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
    i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]

for anybody looking for a simple solution working with Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
    i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]

回答 7

我创建了一个自定义类来创建进度打印输出。Maby这有助于:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

I created a custom class to create a progress printout. Maby this helps:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        self.results.extend(result)
        self.completed_processes += 1
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results

回答 8

尝试这种简单的基于队列的方法,该方法也可以与池一起使用。请注意,至少在此特定进度条启动之后,打印任何东西都会导致进度条被移动。(PyPI的进度1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()

Try this simple Queue based approach, which can also be used with pooling. Be mindful that printing anything after the initiation of the progress bar will cause it to be moved, at least for this particular progress bar. (PyPI’s progress 1.5)

import time
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    for i in group:

        ... do stuff resulting in new_data

        queue_stat.put(1)

    queue_stat.put('finished')  
    queue_data.put(new_data)

def multiprocess():

    new_data = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups): 

        if i == 0:

            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat,len(groups),len(combined)))
                processes.append(p)
                p.start()

        p = multiprocessing.Process(target = process_data,
        args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get() 
        new_data += data

    for p in processes:
        p.join()

Python进程池非守护进程?

问题:Python进程池非守护进程?

是否可以创建非守护进程的python池?我希望一个池能够调用内部有另一个池的函数。

我想要这个,因为守护进程无法创建进程。具体来说,它将导致错误:

AssertionError: daemonic processes are not allowed to have children

例如,考虑这样一种情况:其中function_a有一个运行的池,function_b而有一个运行的池function_c。该功能链将失败,因为function_b它正在守护进程中运行,并且守护进程无法创建进程。

Would it be possible to create a python Pool that is non-daemonic? I want a pool to be able to call a function that has another pool inside.

I want this because deamon processes cannot create process. Specifically, it will cause the error:

AssertionError: daemonic processes are not allowed to have children

For example, consider the scenario where function_a has a pool which runs function_b which has a pool which runs function_c. This function chain will fail, because function_b is being run in a daemon process, and daemon processes cannot create processes.


回答 0

multiprocessing.pool.Pool类创建在其工作进程__init__的方法,使他们邪,开始他们,这是不可能自己重新设置daemon属性False在开始之前(事后这是不允许的了)。但是,您可以创建自己的multiprocesing.pool.Poolmultiprocessing.Pool只是包装函数)multiprocessing.Process子类,并替换您自己的子类(该子类始终是非守护程序的)用于工作进程。

这是如何执行此操作的完整示例。重要的部分是两个类NoDaemonProcessMyPool在顶部pool.close()pool.join()在您的MyPool实例的最后,是要调用的类。

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

The multiprocessing.pool.Pool class creates the worker processes in its __init__ method, makes them daemonic and starts them, and it is not possible to re-set their daemon attribute to False before they are started (and afterwards it’s not allowed anymore). But you can create your own sub-class of multiprocesing.pool.Pool (multiprocessing.Pool is just a wrapper function) and substitute your own multiprocessing.Process sub-class, which is always non-daemonic, to be used for the worker processes.

Here’s a full example of how to do this. The important parts are the two classes NoDaemonProcess and MyPool at the top and to call pool.close() and pool.join() on your MyPool instance at the end.

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
        [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()

回答 1

我必须在Python 3.7中使用非守护程序池,并最终改编了接受的答案中发布的代码。下面是创建非守护程序池的代码段:

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)

由于的当前实现multiprocessing已被广泛地重构为基于上下文,因此我们需要提供一个NoDaemonContext具有NoDaemonProcessas属性的类。MyPool然后将使用该上下文,而不是默认上下文。

就是说,我应该警告这种方法至少有两个警告:

  1. 它仍然取决于multiprocessing软件包的实现细节,因此可能随时中断。
  2. 为什么有正当的理由multiprocessing说得那么难用非恶魔的过程,其中有许多是解释在这里。我认为最引人注目的是:

    至于允许子线程使用子进程来产生自己的子进程,如果父线程或子线程在子进程完成并返回之前终止,则可能会产生一些僵尸“孙子”。

I had the necessity to employ a non-daemonic pool in Python 3.7 and ended up adapting the code posted in the accepted answer. Below there’s the snippet that creates the non-daemonic pool:

import multiprocessing.pool

class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class NestablePool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(NestablePool, self).__init__(*args, **kwargs)

As the current implementation of multiprocessing has been extensively refactored to be based on contexts, we need to provide a NoDaemonContext class that has our NoDaemonProcess as attribute. NestablePool will then use that context instead of the default one.

That said, I should warn that there are at least two caveats to this approach:

  1. It still depends on implementation details of the multiprocessing package, and could therefore break at any time.
  2. There are valid reasons why multiprocessing made it so hard to use non-daemonic processes, many of which are explained here. The most compelling in my opinion is:

As for allowing children threads to spawn off children of its own using subprocess runs the risk of creating a little army of zombie ‘grandchildren’ if either the parent or child threads terminate before the subprocess completes and returns.


回答 2

多处理模块有一个漂亮的界面使用与进程池线程。根据您当前的用例,您可能会考虑将其multiprocessing.pool.ThreadPool用于外部Pool,这将导致线程(允许从内部生成进程)而不是进程。

它可能受到GIL的限制,但是在我的特殊情况下(我都对两者进行了测试)此处Pool创建的外部进程的启动时间远远超过了解决方案的启动时间。ThreadPool


这真的很容易掉ProcessesThreads在此处此处阅读有关如何使用ThreadPool解决方案的更多信息。

The multiprocessing module has a nice interface to use pools with processes or threads. Depending on your current use case, you might consider using multiprocessing.pool.ThreadPool for your outer Pool, which will result in threads (that allow to spawn processes from within) as opposed to processes.

It might be limited by the GIL, but in my particular case (I tested both), the startup time for the processes from the outer Pool as created here far outweighed the solution with ThreadPool.


It’s really easy to swap Processes for Threads. Read more about how to use a ThreadPool solution here or here.


回答 3

在某些Python版本上,将标准Pool替换为custom会引发错误:AssertionError: group argument must be None for now

在这里,我找到了可以提供帮助的解决方案:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc

On some Python versions replacing standard Pool to custom can raise error: AssertionError: group argument must be None for now.

Here I found a solution that can help:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):

    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess

        return proc

回答 4

concurrent.futures.ProcessPoolExecutor没有这个限制。它可以有一个嵌套的过程池,完全没有问题:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

上面的演示代码已通过Python 3.8进行了测试。

信用:jfs回答

concurrent.futures.ProcessPoolExecutor doesn’t have this limitation. It can have a nested process pool with no problem at all:

from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()

The above demonstration code was tested with Python 3.8.

A limitation of ProcessPoolExecutor, however, is that it doesn’t have maxtasksperchild. If you need this, consider the answer by Massimiliano instead.

Credit: answer by jfs


回答 5

我遇到的问题是试图在模块之​​间导入全局变量,从而导致多次评估ProcessPool()行。

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

然后从代码中的其他位置安全地导入

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

The issue I encountered was in trying to import globals between modules, causing the ProcessPool() line to get evaluated multiple times.

globals.py

from processing             import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading       import ThreadPool

class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls,*args,**kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance

class Globals(object):
    __metaclass__ = SingletonMeta
    """     
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children
     
    The root cause is that importing this file from different modules causes this file to be reevalutated each time, 
    thus ProcessPool() gets reexecuted inside that child thread, thus causing the daemonic processes bug    
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager      = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool  = ThreadPool()
        self.shared_lock         = Lock()        # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin

Then import safely from elsewhere in your code

from globals import Globals
Globals().shared_manager      
Globals().shared_process_pool
Globals().shared_thread_pool  
Globals().shared_lock         

I have written a more expanded wrapper class around pathos.multiprocessing here:

As a side note, if your usecase just requires async multiprocess map as a performance optimization, then joblib will manage all your process pools behind the scenes and allow this very simple syntax:

squares = Parallel(-1)( delayed(lambda num: num**2)(x) for x in range(100) )

回答 6

我见过人们通过使用称为桌球celery的叉(多处理池扩展)来处理此问题,该叉允许守护进程生成子进程。解决方法是仅通过以下方式替换模块:multiprocessingmultiprocessing

import billiard as multiprocessing

I have seen people dealing with this issue by using celery‘s fork of multiprocessing called billiard (multiprocessing pool extensions), which allows daemonic processes to spawn children. The walkaround is to simply replace the multiprocessing module by:

import billiard as multiprocessing

Python多重处理模块的.join()方法到底在做什么?

问题:Python多重处理模块的.join()方法到底在做什么?

了解Python 多重处理(来自PMOTW文章),并且希望澄清一下join()方法的。

2008年的旧教程中,它指出,没有p.join()以下代码中的调用,“子进程将处于空闲状态而不会终止,成为必须手动杀死的僵尸”。

from multiprocessing import Process

def say_hello(name='world'):
    print "Hello, %s" % name

p = Process(target=say_hello)
p.start()
p.join()

我添加的打印输出PID,以及一个time.sleep测试,并就我所知道的,在自己的进程终止:

from multiprocessing import Process
import sys
import time

def say_hello(name='world'):
    print "Hello, %s" % name
    print 'Starting:', p.name, p.pid
    sys.stdout.flush()
    print 'Exiting :', p.name, p.pid
    sys.stdout.flush()
    time.sleep(20)

p = Process(target=say_hello)
p.start()
# no p.join()

20秒内:

936 ttys000    0:00.05 /Library/Frameworks/Python.framework/Versions/2.7/Reso
938 ttys000    0:00.00 /Library/Frameworks/Python.framework/Versions/2.7/Reso
947 ttys001    0:00.13 -bash

20秒后:

947 ttys001    0:00.13 -bash

行为与p.join()在文件末尾添加回的行为相同。本周Python模块为模块提供了非常易读的解释 ; “要等到进程完成工作并退出后,请使用join()方法。”,但看来至少OS X仍在这样做。

我也想知道该方法的名称。该.join()方法在此处串联吗?它是在连接过程的结尾吗?还是只是与Python的本地.join()方法共享一个名称?

Learning about Python Multiprocessing (from a PMOTW article) and would love some clarification on what exactly the join() method is doing.

In an old tutorial from 2008 it states that without the p.join() call in the code below, “the child process will sit idle and not terminate, becoming a zombie you must manually kill”.

from multiprocessing import Process

def say_hello(name='world'):
    print "Hello, %s" % name

p = Process(target=say_hello)
p.start()
p.join()

I added a printout of the PID as well as a time.sleep to test and as far as I can tell, the process terminates on its own:

from multiprocessing import Process
import sys
import time

def say_hello(name='world'):
    print "Hello, %s" % name
    print 'Starting:', p.name, p.pid
    sys.stdout.flush()
    print 'Exiting :', p.name, p.pid
    sys.stdout.flush()
    time.sleep(20)

p = Process(target=say_hello)
p.start()
# no p.join()

within 20 seconds:

936 ttys000    0:00.05 /Library/Frameworks/Python.framework/Versions/2.7/Reso
938 ttys000    0:00.00 /Library/Frameworks/Python.framework/Versions/2.7/Reso
947 ttys001    0:00.13 -bash

after 20 seconds:

947 ttys001    0:00.13 -bash

Behavior is the same with p.join() added back at end of the file. Python Module of the Week offers a very readable explanation of the module; “To wait until a process has completed its work and exited, use the join() method.”, but it seems like at least OS X was doing that anyway.

Am also wondering about the name of the method. Is the .join() method concatenating anything here? Is it concatenating a process with it’s end? Or does it just share a name with Python’s native .join() method?


回答 0

join()方法与threading或一起使用时multiprocessing不相关str.join()-它实际上并没有将任何内容串联在一起。相反,它仅表示“等待此[线程/进程]完成”。join之所以使用该名称,是因为该multiprocessing模块的API看起来类似于该threading模块的API,并且该threading模块join用于其Thread对象。join在许多编程语言中,使用该术语表示“等待线程完成”是很常见的,因此Python也采用了它。

现在,您看到有和没有调用都延迟20秒的原因join()是因为默认情况下,当主进程准备退出时,它将隐式调用join()所有正在运行的multiprocessing.Process实例。在multiprocessing文档中并未对此进行明确说明,但在“ 编程指南”部分中进行了提及:

还请记住,非守护进程将自动加入。

您可以通过设置覆盖此行为daemon上的标志ProcessTrue之前,要启动的过程:

p = Process(target=say_hello)
p.daemon = True
p.start()
# Both parent and child will exit here, since the main process has completed.

如果这样做,则子进程将在主进程完成后立即终止

守护程序

进程的守护程序标志,一个布尔值。必须在调用start()之前进行设置。

初始值是从创建过程继承的。

进程退出时,它将尝试终止其所有守护程序子进程。

The join() method, when used with threading or multiprocessing, is not related to str.join() – it’s not actually concatenating anything together. Rather, it just means “wait for this [thread/process] to complete”. The name join is used because the multiprocessing module’s API is meant to look as similar to the threading module’s API, and the threading module uses join for its Thread object. Using the term join to mean “wait for a thread to complete” is common across many programming languages, so Python just adopted it as well.

Now, the reason you see the 20 second delay both with and without the call to join() is because by default, when the main process is ready to exit, it will implicitly call join() on all running multiprocessing.Process instances. This isn’t as clearly stated in the multiprocessing docs as it should be, but it is mentioned in the Programming Guidelines section:

Remember also that non-daemonic processes will be automatically be joined.

You can override this behavior by setting the daemon flag on the Process to True prior to starting the process:

p = Process(target=say_hello)
p.daemon = True
p.start()
# Both parent and child will exit here, since the main process has completed.

If you do that, the child process will be terminated as soon as the main process completes:

daemon

The process’s daemon flag, a Boolean value. This must be set before start() is called.

The initial value is inherited from the creating process.

When a process exits, it attempts to terminate all of its daemonic child processes.


回答 1

没有 join(),主进程可以在子进程之前完成。我不确定在什么情况下会导致僵尸。

主要目的 join()是确保子流程在主流程执行任何依赖于子流程的工作之前完成。

的词源join()是与的相反fork,后者是Unix系列操作系统中用于创建子进程的常用术语。单个过程“分叉”成多个,然后“连接”成一个。

Without the join(), the main process can complete before the child process does. I’m not sure under what circumstances that leads to zombieism.

The main purpose of join() is to ensure that a child process has completed before the main process does anything that depends on the work of the child process.

The etymology of join() is that it’s the opposite of fork, which is the common term in Unix-family operating systems for creating child processes. A single process “forks” into several, then “joins” back into one.


回答 2

我不会详细解释join它的作用,但是这里是它的词源和直觉,这应该有助于您更轻松地记住它的含义。

这个想法是执行“ 分叉 ”到多个进程中,其中一个是主进程,其余是工人(或“奴隶”)。工作人员完成后,他们“加入”主服务器,以便可以恢复串行执行。

join方法使主进程等待工作人员加入。该方法最好称为“等待”,因为这是它在主服务器中引起的实际行为(这就是POSIX中所称的内容,尽管POSIX线程也称其为“ join”)。连接仅是由于线程正确协作而产生的,不是主服务器执行的操作

自1963年以来,名称“ fork”和“ join”已在多处理中使用此含义。

I’m not going to explain in detail what join does, but here’s the etymology and the intuition behind it, which should help you remember its meaning more easily.

The idea is that execution “forks” into multiple processes of which one is the master, the rest workers (or “slaves”). When the workers are done, they “join” the master so that serial execution may be resumed.

The join method causes the master process to wait for a worker to join it. The method might better have been called “wait”, since that’s the actual behavior it causes in the master (and that’s what it’s called in POSIX, although POSIX threads call it “join” as well). The joining only occurs as an effect of the threads cooperating properly, it’s not something the master does.

The names “fork” and “join” have been used with this meaning in multiprocessing since 1963.


回答 3

join()用于等待辅助进程退出。必须先调用close()terminate()使用join()

就像@罗素提到的 的那样,join就像fork的相反对象(子流程)。

要运行加入,您必须先运行close(),这将阻止所有其他任务提交到池中,并在所有任务完成后退出。或者,terminate()通过立即停止所有工作进程退出运行。

"the child process will sit idle and not terminate, becoming a zombie you must manually kill" 当主(父)进程退出但子进程仍在运行并且完成后,它没有父进程可以将其退出状态返回时,这是可能的。

join() is used to wait for the worker processes to exit. One must call close() or terminate() before using join().

Like @Russell mentioned join is like the opposite of fork (which Spawns sub-processes).

For join to run you have to run close() which will prevent any more tasks from being submitted to the pool and exit once all tasks complete. Alternatively, running terminate() will just exit by stopping all worker processes immediately.

"the child process will sit idle and not terminate, becoming a zombie you must manually kill" this is possible when the main (parent) process exits but the child process is still running and once completed it has no parent process to return its exit status to.


回答 4

join()调用可确保在完成所有多处理过程之前不会调用代码的后续行。

例如,如果不使用join(),则restart_program()甚至在进程完成之前,也会调用以下代码,这类似于异步,不是我们想要的(您可以尝试):

num_processes = 5

for i in range(num_processes):
    p = multiprocessing.Process(target=calculate_stuff, args=(i,))
    p.start()
    processes.append(p)
for p in processes:
    p.join() # call to ensure subsequent line (e.g. restart_program) 
             # is not called until all processes finish

restart_program()

The join() call ensures that subsequent lines of your code are not called before all the multiprocessing processes are completed.

For example, without the join(), the following code will call restart_program() even before the processes finish, which is similar to asynchronous and is not what we want (you can try):

num_processes = 5

for i in range(num_processes):
    p = multiprocessing.Process(target=calculate_stuff, args=(i,))
    p.start()
    processes.append(p)
for p in processes:
    p.join() # call to ensure subsequent line (e.g. restart_program) 
             # is not called until all processes finish

restart_program()

回答 5

要等待进程完成其工作并退出,请使用join()方法。

注意终止进程后,必须对join()进程进行处理,以便让后台机制有时间更新对象的状态以反映终止。

这是一个很好的例子,帮助我理解了:这里

我个人注意到的一件事是,我的主要过程暂停了,直到孩子使用join()方法完成了过程,这首先挫败了我的使用multiprocessing.Process()意图。

To wait until a process has completed its work and exited, use the join() method.

and

Note It is important to join() the process after terminating it in order to give the background machinery time to update the status of the object to reflect the termination.

This is a good example helped me understand it: here

One thing I noticed personally was my main process paused until the child had finished its process using the join() method which defeated the point of me using multiprocessing.Process() in the first place.


在共享内存中使用numpy数组进行多处理

问题:在共享内存中使用numpy数组进行多处理

我想在共享内存中使用一个numpy数组,以便与多处理模块一起使用。困难是像numpy数组一样使用它,而不仅仅是ctypes数组。

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

这将产生如下输出:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

可以ctypes方式访问该数组,例如arr[i]说得通。但是,它不是一个numpy数组,因此我无法执行-1*arr,或arr.sum()。我想一个解决方案是将ctypes数组转换为numpy数组。但是(除了无法完成这项工作之外),我不相信会再共享它。

对于必须解决的常见问题,似乎将有一个标准解决方案。

I would like to use a numpy array in shared memory for use with the multiprocessing module. The difficulty is using it like a numpy array, and not just as a ctypes array.

from multiprocessing import Process, Array
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child processes
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Printing out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

This produces output such as:

Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976]
Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976]

The array can be accessed in a ctypes manner, e.g. arr[i] makes sense. However, it is not a numpy array, and I cannot perform operations such as -1*arr, or arr.sum(). I suppose a solution would be to convert the ctypes array into a numpy array. However (besides not being able to make this work), I don’t believe it would be shared anymore.

It seems there would be a standard solution to what has to be a common problem.


回答 0

要添加到@unutbu(不再可用)和@Henry Gomersall的答案中。您可以shared_arr.get_lock()在需要时使用来同步访问:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

如果您不需要同步访问或创建自己的锁,则mp.Array()没有必要。mp.sharedctypes.RawArray在这种情况下,您可以使用。

To add to @unutbu’s (not available anymore) and @Henry Gomersall’s answers. You could use shared_arr.get_lock() to synchronize access when needed:

shared_arr = mp.Array(ctypes.c_double, N)
# ...
def f(i): # could be anything numpy accepts as an index such another numpy array
    with shared_arr.get_lock(): # synchronize access
        arr = np.frombuffer(shared_arr.get_obj()) # no data copying
        arr[i] = -arr[i]

Example

import ctypes
import logging
import multiprocessing as mp

from contextlib import closing

import numpy as np

info = mp.get_logger().info

def main():
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)

    # create shared array
    N, M = 100, 11
    shared_arr = mp.Array(ctypes.c_double, N)
    arr = tonumpyarray(shared_arr)

    # fill with random values
    arr[:] = np.random.uniform(size=N)
    arr_orig = arr.copy()

    # write to arr from different processes
    with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p:
        # many processes access the same slice
        stop_f = N // 10
        p.map_async(f, [slice(stop_f)]*M)

        # many processes access different slices of the same array
        assert M % 2 # odd
        step = N // 10
        p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)])
    p.join()
    assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig)

def init(shared_arr_):
    global shared_arr
    shared_arr = shared_arr_ # must be inherited, not passed as an argument

def tonumpyarray(mp_arr):
    return np.frombuffer(mp_arr.get_obj())

def f(i):
    """synchronized."""
    with shared_arr.get_lock(): # synchronize access
        g(i)

def g(i):
    """no synchronization."""
    info("start %s" % (i,))
    arr = tonumpyarray(shared_arr)
    arr[i] = -1 * arr[i]
    info("end   %s" % (i,))

if __name__ == '__main__':
    mp.freeze_support()
    main()

If you don’t need synchronized access or you create your own locks then mp.Array() is unnecessary. You could use mp.sharedctypes.RawArray in this case.


回答 1

Array对象具有get_obj()与之关联的方法,该方法返回呈现缓冲区接口的ctypes数组。我认为以下应该起作用…

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

运行时,它将打印出现在的第一个元素a10.0,显示ab只是进入同一内存的两个视图。

为了确保它仍然是多处理器安全的,我相信您将必须使用对象acquirerelease上存在的方法,以及其内置的锁以确保可以安全地访问所有对象(尽管我不是专家)多处理器模块)。Arraya

The Array object has a get_obj() method associated with it, which returns the ctypes array which presents a buffer interface. I think the following should work…

from multiprocessing import Process, Array
import scipy
import numpy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    a = Array('d', unshared_arr)
    print "Originally, the first two elements of arr = %s"%(a[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(a,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%a[:2]

    b = numpy.frombuffer(a.get_obj())

    b[0] = 10.0
    print a[0]

When run, this prints out the first element of a now being 10.0, showing a and b are just two views into the same memory.

In order to make sure it is still multiprocessor safe, I believe you will have to use the acquire and release methods that exist on the Array object, a, and its built in lock to make sure its all safely accessed (though I’m not an expert on the multiprocessor module).


回答 2

尽管已经给出了很好的答案,但是只要满足两个条件,就可以轻松解决此问题:

  1. 您使用的是POSIX兼容的操作系统(例如Linux,Mac OSX);和
  2. 您的子进程需要对共享阵列的只读访问权限

在这种情况下,您无需费心地显式地使变量共享,因为将使用派生来创建子进程。分叉的孩子会自动共享父母的内存空间。在Python多处理的上下文中,这意味着它共享所有模块级变量;请注意,这不适用于您显式传递给子进程或传递给a multiprocessing.Pool或此类函数的参数。

一个简单的例子:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))

While the answers already given are good, there is a much easier solution to this problem provided two conditions are met:

  1. You are on a POSIX-compliant operating system (e.g. Linux, Mac OSX); and
  2. Your child processes need read-only access to the shared array.

In this case you do not need to fiddle with explicitly making variables shared, as the child processes will be created using a fork. A forked child automatically shares the parent’s memory space. In the context of Python multiprocessing, this means it shares all module-level variables; note that this does not hold for arguments that you explicitly pass to your child processes or to the functions you call on a multiprocessing.Pool or so.

A simple example:

import multiprocessing
import numpy as np

# will hold the (implicitly mem-shared) data
data_array = None

# child worker function
def job_handler(num):
    # built-in id() returns unique memory ID of a variable
    return id(data_array), np.sum(data_array)

def launch_jobs(data, num_jobs=5, num_worker=4):
    global data_array
    data_array = data

    pool = multiprocessing.Pool(num_worker)
    return pool.map(job_handler, range(num_jobs))

# create some random data and execute the child jobs
mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10)))

# this will print 'True' on POSIX OS, since the data was shared
print(np.all(np.asarray(mem_ids) == id(data_array)))

回答 3

我编写了一个小的python模块,该模块使用POSIX共享内存在python解释器之间共享numpy数组。也许您会发现它很方便。

https://pypi.python.org/pypi/SharedArray

运作方式如下:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])

I’ve written a small python module that uses POSIX shared memory to share numpy arrays between python interpreters. Maybe you will find it handy.

https://pypi.python.org/pypi/SharedArray

Here’s how it works:

import numpy as np
import SharedArray as sa

# Create an array in shared memory
a = sa.create("test1", 10)

# Attach it as a different array. This can be done from another
# python interpreter as long as it runs on the same computer.
b = sa.attach("test1")

# See how they are actually sharing the same memory block
a[0] = 42
print(b[0])

# Destroying a does not affect b.
del a
print(b[0])

# See how "test1" is still present in shared memory even though we
# destroyed the array a.
sa.list()

# Now destroy the array "test1" from memory.
sa.delete("test1")

# The array b is not affected, but once you destroy it then the
# data are lost.
print(b[0])

回答 4

您可以使用以下sharedmem模块:https : //bitbucket.org/cleemesser/numpy-sharedmem

然后,这是您的原始代码,这一次使用行为类似于NumPy数组的共享内存(请注意调用NumPy sum()函数的其他最后一条语句):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()

You can use the sharedmem module: https://bitbucket.org/cleemesser/numpy-sharedmem

Here’s your original code then, this time using shared memory that behaves like a NumPy array (note the additional last statement calling a NumPy sum() function):

from multiprocessing import Process
import sharedmem
import scipy

def f(a):
    a[0] = -a[0]

if __name__ == '__main__':
    # Create the array
    N = int(10)
    unshared_arr = scipy.rand(N)
    arr = sharedmem.empty(N)
    arr[:] = unshared_arr.copy()
    print "Originally, the first two elements of arr = %s"%(arr[:2])

    # Create, start, and finish the child process
    p = Process(target=f, args=(arr,))
    p.start()
    p.join()

    # Print out the changed values
    print "Now, the first two elements of arr = %s"%arr[:2]

    # Perform some NumPy operation
    print arr.sum()