Question: Non-daemonic process pool in Python?
Is it possible to create a non-daemonic Python pool? I want a pool to be able to call a function that has another pool inside it.
I want this because daemon processes cannot create processes. Specifically, attempting to do so raises:
AssertionError: daemonic processes are not allowed to have children
For example, consider a case where function_a has a pool that runs function_b, which in turn has a pool that runs function_c. This chain of functions fails because function_b is run in a daemon process, and daemon processes cannot create processes.
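To make the failure concrete, here is a minimal sketch of my own (the function names are chosen to match the hypothetical chain above) that raises the assertion:
import multiprocessing

def function_c(x):
    return x * x

def function_b(x):
    # function_b runs in a daemonic worker of function_a's pool, so creating
    # another pool here raises:
    # AssertionError: daemonic processes are not allowed to have children
    with multiprocessing.Pool(2) as inner:
        return sum(inner.map(function_c, range(x)))

def function_a():
    with multiprocessing.Pool(2) as outer:
        print(outer.map(function_b, range(1, 5)))

if __name__ == '__main__':
    function_a()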
Answer 0
The multiprocessing.pool.Pool class creates its worker processes in its __init__ method, makes them daemonic, and starts them; it is not possible to reset their daemon attribute to False before they are started (and afterwards it is no longer allowed). But you can create your own subclass of multiprocessing.pool.Pool (multiprocessing.Pool is just a wrapper function) and substitute your own multiprocessing.Process subclass, which is always non-daemonic, to be used for the worker processes.
Here is a full example of how to do this. The important parts are the two classes NoDaemonProcess and MyPool at the top, and calling pool.close() and pool.join() on your MyPool instance at the end.
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import multiprocessing
# We must import this explicitly, it is not imported by the top-level
# multiprocessing module.
import multiprocessing.pool
import time

from random import randint


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess

def sleepawhile(t):
    print("Sleeping %i seconds..." % t)
    time.sleep(t)
    return t

def work(num_procs):
    print("Creating %i (daemon) workers and jobs in child." % num_procs)
    pool = multiprocessing.Pool(num_procs)

    result = pool.map(sleepawhile,
                      [randint(1, 5) for x in range(num_procs)])

    # The following is not really needed, since the (daemon) workers of the
    # child's pool are killed when the child is terminated, but it's good
    # practice to cleanup after ourselves anyway.
    pool.close()
    pool.join()
    return result

def test():
    print("Creating 5 (non-daemon) workers and jobs in main process.")
    pool = MyPool(5)

    result = pool.map(work, [randint(1, 5) for x in range(5)])

    pool.close()
    pool.join()
    print(result)

if __name__ == '__main__':
    test()
Answer 1
I needed to use a non-daemonic pool in Python 3.7 and ended up adapting the code posted in the accepted answer. Below is the snippet that creates the non-daemonic pool:
import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        pass


class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)
Since the current implementation of multiprocessing has been extensively refactored to be based on contexts, we need to provide a NoDaemonContext class that has our NoDaemonProcess as an attribute. MyPool will then use that context instead of the default one.
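For reference, a minimal usage sketch of my own (assuming the classes and imports above are in the same module): the MyPool workers are non-daemonic, so each of them can create an ordinary multiprocessing.Pool of its own.
def inner_job(x):
    return x * x

def outer_job(n):
    # This runs inside a non-daemonic MyPool worker, so creating a regular
    # (daemonic) multiprocessing.Pool here is allowed.
    with multiprocessing.Pool(2) as inner:
        return sum(inner.map(inner_job, range(n)))

if __name__ == '__main__':
    with MyPool(2) as outer:
        print(outer.map(outer_job, [3, 4, 5]))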
That said, I should warn that this approach has at least two caveats:
- It still depends on implementation details of the multiprocessing package and could therefore break at any time.
- There are valid reasons why multiprocessing makes it so hard to use non-daemonic processes, many of which are explained here. The most compelling, in my opinion, is: allowing child threads to spawn children of their own using subprocesses runs the risk of creating a little army of zombie "grandchildren" if either the parent or child threads terminate before the subprocess completes and returns.
Answer 3
On some Python versions, replacing the standard Pool with a custom one can raise the error: AssertionError: group argument must be None for now.
Here I found a solution that can help:
import multiprocessing
import multiprocessing.pool


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, val):
        pass


class NoDaemonProcessPool(multiprocessing.pool.Pool):
    def Process(self, *args, **kwds):
        proc = super(NoDaemonProcessPool, self).Process(*args, **kwds)
        proc.__class__ = NoDaemonProcess
        return proc
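A hedged usage sketch of my own: NoDaemonProcessPool is used exactly like a regular Pool, and because its workers report daemon == False they may create pools of their own. Whether the Process override above is actually picked up depends on the multiprocessing internals of your Python version.
def child_task(x):
    return x + 1

def parent_task(n):
    # Runs in a NoDaemonProcessPool worker; since daemon is always False,
    # the worker can create an ordinary multiprocessing.Pool.
    with multiprocessing.Pool(2) as inner:
        return inner.map(child_task, range(n))

if __name__ == '__main__':
    with NoDaemonProcessPool(2) as outer:
        print(outer.map(parent_task, [2, 3]))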
Answer 4
concurrent.futures.ProcessPoolExecutor does not have this limitation. It can have nested process pools with no problem at all:
from concurrent.futures import ProcessPoolExecutor as Pool
from itertools import repeat
from multiprocessing import current_process
import time

def pid():
    return current_process().pid

def _square(i):  # Runs in inner_pool
    square = i ** 2
    time.sleep(i / 10)
    print(f'{pid()=} {i=} {square=}')
    return square

def _sum_squares(i, j):  # Runs in outer_pool
    with Pool(max_workers=2) as inner_pool:
        squares = inner_pool.map(_square, (i, j))
    sum_squares = sum(squares)
    time.sleep(sum_squares ** .5)
    print(f'{pid()=}, {i=}, {j=} {sum_squares=}')
    return sum_squares

def main():
    with Pool(max_workers=3) as outer_pool:
        for sum_squares in outer_pool.map(_sum_squares, range(5), repeat(3)):
            print(f'{pid()=} {sum_squares=}')

if __name__ == "__main__":
    main()
The demo code above was tested with Python 3.8.
Credit: answer by jfs
Answer 5
The issue I ran into was that importing globals between modules caused the ProcessPool() line to be evaluated multiple times.
globals.py
from processing import Manager, Lock
from pathos.multiprocessing import ProcessPool
from pathos.threading import ThreadPool


class SingletonMeta(type):
    def __new__(cls, name, bases, dict):
        dict['__deepcopy__'] = dict['__copy__'] = lambda self, *args: self
        return super(SingletonMeta, cls).__new__(cls, name, bases, dict)

    def __init__(cls, name, bases, dict):
        super(SingletonMeta, cls).__init__(name, bases, dict)
        cls.instance = None

    def __call__(cls, *args, **kw):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kw)
        return cls.instance

    def __deepcopy__(self, item):
        return item.__class__.instance


class Globals(object):
    __metaclass__ = SingletonMeta
    """
    This class is a workaround to the bug: AssertionError: daemonic processes are not allowed to have children

    The root cause is that importing this file from different modules causes this file to be reevaluated each time,
    thus ProcessPool() gets re-executed inside that child thread, thus causing the daemonic processes bug.
    """
    def __init__(self):
        print "%s::__init__()" % (self.__class__.__name__)
        self.shared_manager = Manager()
        self.shared_process_pool = ProcessPool()
        self.shared_thread_pool = ThreadPool()
        self.shared_lock = Lock()  # BUG: Windows: global name 'lock' is not defined | doesn't affect cygwin
Then import it safely from elsewhere in your code:
from globals import Globals
Globals().shared_manager
Globals().shared_process_pool
Globals().shared_thread_pool
Globals().shared_lock
Answer 6
I have seen people deal with this issue by using celery's fork of multiprocessing called billiard (multiprocessing pool extensions), which allows daemonic processes to spawn children. The workaround is to simply replace the multiprocessing module with:
import billiard as multiprocessing
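As an illustration of my own (assuming billiard is installed and does permit nested pools as the answer describes; billiard mirrors the multiprocessing API), the substitution lets ordinary nested-pool code run unchanged:
import billiard as multiprocessing

def inner(x):
    return x * 2

def outer(n):
    # Created inside a pool worker; with billiard this is reported not to
    # raise the "daemonic processes are not allowed to have children" error.
    pool = multiprocessing.Pool(2)
    try:
        return pool.map(inner, range(n))
    finally:
        pool.close()
        pool.join()

if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    try:
        print(pool.map(outer, [2, 3]))
    finally:
        pool.close()
        pool.join()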