Question: multiprocessing: Sharing a large read-only object between processes?

Do child processes spawned via multiprocessing share objects created earlier in the program?

I have the following setup:

import glob
import marshal
from multiprocessing import Pool

def do_some_processing(filename):
    for line in file(filename):
        if line.split(',')[0] in big_lookup_object:
            pass  # something here

if __name__ == '__main__':
    big_lookup_object = marshal.load(open('file.bin', 'rb'))
    pool = Pool(processes=4)
    print pool.map(do_some_processing, glob.glob('*.data'))

I'm loading some big object into memory, then creating a pool of workers that need to make use of that big object. The big object is accessed read-only; I don't need to pass modifications of it between processes.

My question is: is the big object loaded into shared memory, as it would be if I spawned a process in unix/c, or does each process load its own copy of the big object?

Update: to clarify further – big_lookup_object is a shared lookup object. I don't need to split that up and process it separately. I need to keep a single copy of it. The work that I need to split up is reading lots of other large files and looking up the items in those large files against the lookup object.

Further update: a database is a fine solution, memcached might be a better solution, and a file on disk (shelve or dbm) might be even better. In this question I am particularly interested in an in-memory solution. For the final solution I'll be using Hadoop, but I wanted to see if I can have a local in-memory version as well.


Answer 0

Do child processes spawned via multiprocessing share objects created earlier in the program?

No (Python before 3.8), and yes in 3.8 (see https://docs.python.org/3/library/multiprocessing.shared_memory.html#module-multiprocessing.shared_memory).

Processes have independent memory space.
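
A minimal sketch of the Python 3.8+ shared_memory approach linked above; the worker function and the sample values are illustrative, not part of the original answer:

# A sketch only (Python 3.8+).
from multiprocessing import Pool, shared_memory

def worker(args):
    shm_name, index = args
    # Attach to the existing shared block by name -- no copy of the data.
    sl = shared_memory.ShareableList(name=shm_name)
    try:
        return sl[index]
    finally:
        sl.shm.close()

if __name__ == '__main__':
    # The parent creates the shared, read-only lookup data once.
    lookup = shared_memory.ShareableList(['alpha', 'beta', 'gamma'])
    try:
        with Pool(processes=4) as pool:
            print(pool.map(worker, [(lookup.shm.name, i) for i in range(3)]))
    finally:
        lookup.shm.close()
        lookup.shm.unlink()   # release the shared block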

Solution 1

To make best use of a large structure with lots of workers, do this.

  1. Write each worker as a “filter” – reads intermediate results from stdin, does work, writes intermediate results on stdout.

  2. Connect all the workers as a pipeline:

    process1 <source | process2 | process3 | ... | processn >result
    

Each process reads, does work and writes.

This is remarkably efficient since all processes are running concurrently. The writes and reads pass directly through shared buffers between the processes.
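
A minimal sketch of one such filter stage, assuming the lookup table only needs to live in this one stage; the file name 'file.bin' and the pass-through output policy are assumptions for illustration:

#!/usr/bin/env python3
# filter_stage.py -- one "filter" worker: read records on stdin, consult the
# lookup table, write matching records on stdout.
import marshal
import sys

with open('file.bin', 'rb') as f:
    big_lookup_object = marshal.load(f)

for line in sys.stdin:
    key = line.split(',')[0]
    if key in big_lookup_object:
        sys.stdout.write(line)   # pass matching records downstream

It would then be wired up in the shell, e.g. cat *.data | ./filter_stage.py > result.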


Solution 2

In some cases, you have a more complex structure – often a fan-out structure. In this case you have a parent with multiple children.

  1. Parent opens source data. Parent forks a number of children.

  2. Parent reads source, farms parts of the source out to each concurrently running child.

  3. When the parent reaches the end, it closes the pipes. Each child gets end-of-file and finishes normally.

The child parts are pleasant to write because each child simply reads sys.stdin.

The parent has a little bit of fancy footwork in spawning all the children and retaining the pipes properly, but it’s not too bad.
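
A sketch of that fan-out step, assuming a hypothetical child_filter.py worker that reads sys.stdin as in Solution 1; the round-robin split is just one possible policy:

#!/usr/bin/env python3
# Parent: open the source, start N children, farm lines out round-robin.
import subprocess
import sys

N_CHILDREN = 4

children = [
    subprocess.Popen([sys.executable, 'child_filter.py'],
                     stdin=subprocess.PIPE)
    for _ in range(N_CHILDREN)
]

with open('source.data') as source:
    for i, line in enumerate(source):
        children[i % N_CHILDREN].stdin.write(line.encode())

# Closing the pipes gives each child end-of-file, so it finishes normally.
for child in children:
    child.stdin.close()
for child in children:
    child.wait()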

Fan-in is the opposite structure. A number of independently running processes need to interleave their inputs into a common process. The collector is not as easy to write, since it has to read from many sources.

Reading from many named pipes is often done using the select module to see which pipes have pending input.
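
A rough, Unix-only sketch of such a collector; the producers are faked with os.fork so the example is self-contained:

#!/usr/bin/env python3
# Fan-in collector: several producers each write to their own pipe, and the
# parent uses select() to read whichever pipes have pending input.
import os
import select

N_PRODUCERS = 3
readers = []

for worker_id in range(N_PRODUCERS):
    r, w = os.pipe()
    if os.fork() == 0:                        # child: a producer
        os.close(r)
        for n in range(5):
            os.write(w, b'worker %d line %d\n' % (worker_id, n))
        os.close(w)
        os._exit(0)
    os.close(w)                               # parent keeps only the read end
    readers.append(r)

while readers:
    ready, _, _ = select.select(readers, [], [])
    for r in ready:
        chunk = os.read(r, 4096)
        if chunk:
            os.write(1, chunk)                # interleave onto stdout
        else:                                 # EOF: that producer is done
            os.close(r)
            readers.remove(r)

for _ in range(N_PRODUCERS):
    os.wait()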


Solution 3

Shared lookup is the definition of a database.

Solution 3A – load a database. Let the workers process the data in the database.

Solution 3B – create a very simple server using werkzeug (or similar) to provide WSGI applications that respond to HTTP GET so the workers can query the server.
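
A very small sketch of 3B, assuming werkzeug is installed; the URL scheme (/<key>) and the sample dictionary are made up for illustration:

#!/usr/bin/env python3
# lookup_server.py -- serve big_lookup_object over HTTP GET.
from werkzeug.wrappers import Request, Response
from werkzeug.serving import run_simple

big_lookup_object = {'alpha': '1', 'beta': '2'}   # stand-in for the real data

@Request.application
def application(request):
    key = request.path.lstrip('/')
    if key in big_lookup_object:
        return Response(big_lookup_object[key])
    return Response('not found', status=404)

if __name__ == '__main__':
    run_simple('127.0.0.1', 5000, application)

Workers then query it with plain HTTP, e.g. urllib.request.urlopen('http://127.0.0.1:5000/alpha').read().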


Solution 4

Shared filesystem object. Unix OS offers shared memory objects. These are just files that are mapped to memory so that swapping I/O is done instead of more conventional buffered reads.

You can do this from a Python context in several ways

  1. Write a startup program that (1) breaks your original gigantic object into smaller objects, and (2) starts workers, each with a smaller object. The smaller objects could be pickled Python objects to save a tiny bit of file reading time.

  2. Write a startup program that (1) reads your original gigantic object and writes a page-structured, byte-coded file using seek operations to assure that individual sections are easy to find with simple seeks. This is what a database engine does – break the data into pages, make each page easy to locate via a seek.

Spawn workers with access to this large page-structured file. Each worker can seek to the relevant parts and do its work there.
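
A sketch of the worker-side access to such a page-structured file. It assumes fixed-width, NUL-padded records so a record number maps directly to a file offset; the 64-byte record size and the file name are illustrative:

#!/usr/bin/env python3
import mmap

RECORD_SIZE = 64

def read_record(path, record_number):
    """Return one record without loading the whole file into this process."""
    with open(path, 'rb') as f:
        # mmap uses the OS page cache, which is shared between processes.
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mm:
            offset = record_number * RECORD_SIZE
            return mm[offset:offset + RECORD_SIZE].rstrip(b'\x00')

# e.g. inside a worker:
# record = read_record('lookup.pages', 12345)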


Answer 1

Do child processes spawned via multiprocessing share objects created earlier in the program?

It depends. For global read-only variables it can often be considered so (apart from the memory consumed); otherwise it should not be.

multiprocessing's documentation says:

Better to inherit than pickle/unpickle

On Windows many types from multiprocessing need to be picklable so that child processes can use them. However, one should generally avoid sending shared objects to other processes using pipes or queues. Instead you should arrange the program so that a process which needs access to a shared resource created elsewhere can inherit it from an ancestor process.

Explicitly pass resources to child processes

On Unix a child process can make use of a shared resource created in a parent process using a global resource. However, it is better to pass the object as an argument to the constructor for the child process.

Apart from making the code (potentially) compatible with Windows this also ensures that as long as the child process is still alive the object will not be garbage collected in the parent process. This might be important if some resource is freed when the object is garbage collected in the parent process.

Global variables

Bear in mind that if code run in a child process tries to access a global variable, then the value it sees (if any) may not be the same as the value in the parent process at the time that Process.start() was called.

Example

On Windows (single CPU):

#!/usr/bin/env python
import os, sys, time
from multiprocessing import Pool

x = 23000 # use 23000 rather than 23: small integers share a cached representation
z = []    # integers are immutable, so also try a mutable object

def printx(y):
    global x
    if y == 3:
       x = -x
    z.append(y)
    print os.getpid(), x, id(x), z, id(z) 
    print y
    if len(sys.argv) == 2 and sys.argv[1] == "sleep":
       time.sleep(.1) # should make the effect more apparent

if __name__ == '__main__':
    pool = Pool(processes=4)
    pool.map(printx, (1,2,3,4))

With sleep:

$ python26 test_share.py sleep
2504 23000 11639492 [1] 10774408
1
2564 23000 11639492 [2] 10774408
2
2504 -23000 11639384 [1, 3] 10774408
3
4084 23000 11639492 [4] 10774408
4

Without sleep:

$ python26 test_share.py
1148 23000 11639492 [1] 10774408
1
1148 23000 11639492 [1, 2] 10774408
2
1148 -23000 11639324 [1, 2, 3] 10774408
3
1148 -23000 11639324 [1, 2, 3, 4] 10774408
4
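
For contrast, a minimal sketch of the "explicitly pass resources to child processes" advice quoted above; the dictionary is a stand-in for big_lookup_object:

from multiprocessing import Process

def worker(lookup, key):
    # The child gets the object it was handed (pickled on Windows,
    # inherited via fork on Unix) instead of reaching for a global.
    print(key, key in lookup)

if __name__ == '__main__':
    big_lookup_object = {'alpha': 1, 'beta': 2}
    p = Process(target=worker, args=(big_lookup_object, 'alpha'))
    p.start()
    p.join()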

Answer 2

S.Lott is correct. Python’s multiprocessing shortcuts effectively give you a separate, duplicated chunk of memory.

On most *nix systems, using a lower-level call to os.fork() will, in fact, give you copy-on-write memory, which might be what you’re thinking. AFAIK, in theory, in the most simplistic of programs possible, you could read from that data without having it duplicated.

However, things aren't quite that simple in the Python interpreter. Object data and meta-data are stored in the same memory segment, so even if the object never changes, something like a reference counter for that object being incremented will cause a memory write, and therefore a copy. Almost any Python program that is doing more than "print 'hello'" will cause reference count increments, so you will likely never realize the benefit of copy-on-write.

Even if someone did manage to hack a shared-memory solution in Python, trying to coordinate garbage collection across processes would probably be pretty painful.


Answer 3

If you’re running under Unix, they may share the same object, due to how fork works (i.e., the child processes have separate memory but it’s copy-on-write, so it may be shared as long as nobody modifies it). I tried the following:

#!/usr/bin/env python
import multiprocessing

x = 23

def printx(y):
    print x, id(x)
    print y

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    pool.map(printx, (1,2,3,4))

and got the following output:

$ ./mtest.py
23 22995656
1
23 22995656
2
23 22995656
3
23 22995656
4

Of course this doesn’t prove that a copy hasn’t been made, but you should be able to verify that in your situation by looking at the output of ps to see how much real memory each subprocess is using.
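
One way to make that check from inside the workers themselves (a sketch, Unix only; note that ru_maxrss is reported in kilobytes on Linux and in bytes on macOS):

import multiprocessing
import os
import resource

x = list(range(10 ** 6))   # a reasonably large read-only object

def report(_):
    # Peak resident set size of this worker process.
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return os.getpid(), rss

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    for pid, rss in pool.map(report, range(4)):
        print(pid, rss)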


Answer 4

Different processes have different address spaces; it's like running different instances of the interpreter. That's what IPC (inter-process communication) is for.

You can use either queues or pipes for this purpose. You can also use RPC over TCP if you want to distribute the processes over a network later.

http://docs.python.org/dev/library/multiprocessing.html#exchanging-objects-between-processes
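
A minimal example in the spirit of that documentation section; the payload dictionary is arbitrary:

from multiprocessing import Process, Queue

def worker(q):
    q.put({'answer': 42})   # any picklable object can be sent back

if __name__ == '__main__':
    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()
    print(q.get())          # {'answer': 42}
    p.join()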


Answer 5

Not directly related to multiprocessing per se, but from your example, it would seem you could just use the shelve module or something like that. Does the “big_lookup_object” really have to be completely in memory?
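
A sketch of that idea; the shelf file name and key are placeholders. The lookup is built once on disk and each worker opens it read-only:

import shelve

# One-time build step:
with shelve.open('big_lookup.shelf') as db:
    db['some_key'] = 'some_value'

# Inside each worker:
with shelve.open('big_lookup.shelf', flag='r') as db:
    print('some_key' in db, db.get('some_key'))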


Answer 6

No, but you can load your data in a dedicated child process and allow it to share the data with the other children. See below.

import time
import multiprocessing

def load_data(queue_load, n_processes):

    # ... load data here into some_variable

    """
    Store multiple copies of the data into
    the data queue. There needs to be enough
    copies available for each process to access.
    """

    for i in range(n_processes):
        queue_load.put(some_variable)


def work_with_data(queue_data, queue_load):

    # Wait for load_data() to complete
    while queue_load.empty():
        time.sleep(1)

    some_variable = queue_load.get()

    """
    ! Tuples can also be used here
    if you have multiple data files
    you wish to keep separate.
    a, b = queue_load.get()
    """

    # ... do some stuff, resulting in new_data

    # store it in the queue
    queue_data.put(new_data)


def start_multiprocess():

    n_processes = 5

    processes = []
    stored_data = []

    # Create two Queues
    queue_load = multiprocessing.Queue()
    queue_data = multiprocessing.Queue()

    for i in range(n_processes):

        if i == 0:
            # Your big data file will be loaded here...
            p = multiprocessing.Process(target=load_data,
                                        args=(queue_load, n_processes))
            processes.append(p)
            p.start()

        # ... and then it will be used here with each process
        p = multiprocessing.Process(target=work_with_data,
                                    args=(queue_data, queue_load))
        processes.append(p)
        p.start()

    for i in range(n_processes):
        new_data = queue_data.get()
        stored_data.append(new_data)

    for p in processes:
        p.join()
    print(processes)

Answer 7

For Linux/Unix/macOS platforms, forkmap is a quick-and-dirty solution.

