问题:多处理:在进程之间共享一个大型只读对象?
通过多处理程序生成的子进程是否共享程序早期创建的对象?
我有以下设置:
do_some_processing(filename):
for line in file(filename):
if line.split(',')[0] in big_lookup_object:
# something here
if __name__ == '__main__':
big_lookup_object = marshal.load('file.bin')
pool = Pool(processes=4)
print pool.map(do_some_processing, glob.glob('*.data'))
我正在将一些大对象加载到内存中,然后创建一个需要利用该大对象的工作池。大对象是只读访问的,我不需要在进程之间传递对它的修改。
我的问题是:大对象是否已加载到共享内存中,就像我在unix / c中生成进程一样,还是每个进程都加载了自己的大对象副本?
更新:进一步说明-big_lookup_object是共享的查找对象。我不需要将其拆分并单独处理。我需要保留一个副本。我需要拆分的工作是读取许多其他大文件,并针对查找对象查找那些大文件中的项目。
进一步更新:数据库是一个很好的解决方案,memcached可能是一个更好的解决方案,磁盘上的文件(机架或dbm)可能更好。在这个问题上,我对内存解决方案特别感兴趣。对于最终的解决方案,我将使用hadoop,但我想看看是否也可以具有本地内存版本。
回答 0
“子进程是否通过多进程共享对象而在程序中早先创建?”
否(3.8之前的python),以及3.8中的是(https://docs.python.org/3/library/multiprocessing.shared_memory.html#module-multiprocessing.shared_memory)
进程具有独立的内存空间。
解决方案1
为了充分利用有很多工人的大型结构,请执行此操作。
将每个工作程序写为“过滤器” –从stdin读取中间结果,执行工作,在stdout上写入中间结果。
将所有工作人员连接为管道:
process1 <source | process2 | process3 | ... | processn >result
每个过程都读取,执行和写入。
由于所有进程同时运行,因此非常高效。读写直接通过进程之间的共享缓冲区。
解决方案2
在某些情况下,您的结构更复杂-通常是“扇形”结构。在这种情况下,您的父母有多个孩子。
父级打开源数据。父母分叉了许多孩子。
父级读取源,将源的一部分分配给每个同时运行的子级。
当父级到达末尾时,关闭管道。子档结束并正常完成。
孩子的部分写作愉快,因为每个孩子都简单阅读sys.stdin
。
父母在产卵所有孩子和正确固定管道方面有一些花哨的步法,但这还不错。
扇入是相反的结构。许多独立运行的流程需要将其输入交织到一个通用流程中。收集器不那么容易编写,因为它必须从许多来源读取。
通常使用该select
模块从许多命名管道中进行读取,以查看哪些管道具有待处理的输入。
解决方案3
共享查找是数据库的定义。
解决方案3A –加载数据库。让工作人员处理数据库中的数据。
解决方案3B –使用werkzeug(或类似工具)创建一个非常简单的服务器,以提供响应HTTP GET的WSGI应用程序,以便工作人员可以查询服务器。
解决方案4
共享文件系统对象。Unix OS提供共享内存对象。这些只是映射到内存的文件,因此可以完成交换I / O的工作,而不是更多的常规缓冲读取。
您可以通过多种方式在Python上下文中执行此操作
编写一个启动程序,该程序(1)将原始的巨大对象分解为较小的对象,(2)启动工作程序,每个工作程序均具有较小的对象。较小的对象可以用Python对象腌制,以节省一小部分文件读取时间。
编写一个启动程序,该程序(1)使用
seek
操作来确保您可以通过简单的查找轻松找到各个部分,从而读取原始的巨大对象并写入页面结构的字节编码文件。这就是数据库引擎的工作–将数据分成页面,使每个页面都可以通过轻松定位seek
。具有此大型页面结构文件的Spawn工人可以访问。每个工人都可以查找相关部分并在那里进行工作。
回答 1
通过多处理程序生成的子进程是否共享程序早期创建的对象?
这取决于。对于全局只读变量,通常可以这样考虑(除了消耗的内存),否则应该不这样做。
multiprocessing的文档说:
Better to inherit than pickle/unpickle
在Windows上,多处理中的许多类型需要可腌制,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他位置创建的共享资源的进程可以从祖先进程继承该程序。
Explicitly pass resources to child processes
在Unix上,子进程可以利用在父进程中使用全局资源创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。
除了使代码(可能)与Windows兼容之外,这还确保只要子进程仍然存在,就不会在父进程中垃圾收集对象。如果在父进程中垃圾回收对象时释放了一些资源,这可能很重要。
Global variables
请记住,如果在子进程中运行的代码尝试访问全局变量,则它看到的值(如果有)可能与调用Process.start()时父进程中的值不同。 。
例
在Windows(单CPU)上:
#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool
x = 23000 # replace `23` due to small integers share representation
z = [] # integers are immutable, let's try mutable object
def printx(y):
global x
if y == 3:
x = -x
z.append(y)
print os.getpid(), x, id(x), z, id(z)
print y
if len(sys.argv) == 2 and sys.argv[1] == "sleep":
time.sleep(.1) # should make more apparant the effect
if __name__ == '__main__':
pool = Pool(processes=4)
pool.map(printx, (1,2,3,4))
与sleep
:
$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4
没有sleep
:
$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4
回答 2
S.Lott是正确的。Python的多处理快捷方式有效地为您提供了一个单独的重复内存块。
os.fork()
实际上,在大多数* nix系统上,使用低级调用将为您提供写时复制内存,这可能就是您正在考虑的内容。从理论上讲,AFAIK在最简单的程序中,您可以读取数据而不必重复数据。
但是,Python解释器中的事情并不是那么简单。对象数据和元数据存储在同一内存段中,因此,即使对象永不更改,类似该对象的参考计数器之类的操作也会导致内存写入,从而导致复制。几乎所有比“ print’hello’”做更多事情的Python程序都会导致引用计数增加,因此您可能永远不会意识到写时复制的好处。
即使有人确实设法用Python破解了共享内存解决方案,尝试在各个进程之间协调垃圾回收也可能很痛苦。
回答 3
如果您在Unix下运行,由于fork的工作方式,它们可能共享同一个对象(即,子进程具有单独的内存,但是它是写时复制的,因此只要没有人修改,它就可以共享)。我尝试了以下方法:
import multiprocessing
x = 23
def printx(y):
print x, id(x)
print y
if __name__ == '__main__':
pool = multiprocessing.Pool(processes=4)
pool.map(printx, (1,2,3,4))
并得到以下输出:
$ ./mtest.py 23 22995656 1个 23 22995656 2 23 22995656 3 23 22995656 4
当然,这并不能证明尚未创建副本,但是您应该能够通过查看输出ps
以查看每个子进程使用了多少实际内存来验证您的情况。
回答 4
不同的进程具有不同的地址空间。就像运行解释器的不同实例一样。这就是IPC(进程间通信)的目的。
您可以为此使用队列或管道。如果要稍后在网络上分发进程,也可以在TCP上使用rpc。
http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes
回答 5
本身与多处理并没有直接关系,但是从您的示例来看,您似乎可以只使用搁置模块或类似的模块。“ big_lookup_object”是否真的必须完全在内存中?
回答 6
否,但是您可以将数据作为子进程加载,并允许其与其他子进程共享数据。见下文。
import time
import multiprocessing
def load_data( queue_load, n_processes )
... load data here into some_variable
"""
Store multiple copies of the data into
the data queue. There needs to be enough
copies available for each process to access.
"""
for i in range(n_processes):
queue_load.put(some_variable)
def work_with_data( queue_data, queue_load ):
# Wait for load_data() to complete
while queue_load.empty():
time.sleep(1)
some_variable = queue_load.get()
"""
! Tuples can also be used here
if you have multiple data files
you wish to keep seperate.
a,b = queue_load.get()
"""
... do some stuff, resulting in new_data
# store it in the queue
queue_data.put(new_data)
def start_multiprocess():
n_processes = 5
processes = []
stored_data = []
# Create two Queues
queue_load = multiprocessing.Queue()
queue_data = multiprocessing.Queue()
for i in range(n_processes):
if i == 0:
# Your big data file will be loaded here...
p = multiprocessing.Process(target = load_data,
args=(queue_load, n_processes))
processes.append(p)
p.start()
# ... and then it will be used here with each process
p = multiprocessing.Process(target = work_with_data,
args=(queue_data, queue_load))
processes.append(p)
p.start()
for i in range(n_processes)
new_data = queue_data.get()
stored_data.append(new_data)
for p in processes:
p.join()
print(processes)
回答 7
对于Linux / Unix / MacOS平台,forkmap是一种快捷的解决方案。