Show the progress of a Python multiprocessing Pool imap_unordered call?

I have a script that’s successfully doing a multiprocessing Pool set of tasks with an imap_unordered() call:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion

However, my num_tasks is around 250,000, and so the join() locks the main thread for 10 seconds or so, and I’d like to be able to echo out to the command line incrementally to show the main process isn’t locked. Something like:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
  if (remaining == 0): break # Jump out of while loop
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(2)

Is there a method for the result object or the pool itself that indicates the number of tasks remaining? I tried using a multiprocessing.Value object as a counter (do_work calls a counter.value += 1 action after doing its task), but the counter only gets to ~85% of the total value before stopping incrementing.
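
A side note on the failing counter: counter.value += 1 is a read-modify-write that isn't atomic across processes, so concurrent workers can lose updates, which matches the count stalling around 85%. A minimal sketch of a locked version, assuming the counter is passed to each worker through the pool's initializer (init_worker is an illustrative name; do_work and num_tasks stand in for the question's own):

import multiprocessing

num_tasks = 250000
counter = None

def init_worker(shared_counter):
    # Runs once in every worker process; exposes the shared counter there.
    global counter
    counter = shared_counter

def do_work(task):
    # ... the real task work goes here ...
    with counter.get_lock():  # serialize the read-modify-write
        counter.value += 1

if __name__ == '__main__':
    shared = multiprocessing.Value('i', 0)
    p = multiprocessing.Pool(initializer=init_worker, initargs=(shared,))
    rs = p.imap_unordered(do_work, xrange(num_tasks))
    p.close()
    p.join()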


Answer 0

There is no need to access private attributes of the result set:

from __future__ import division
import sys

for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
    sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
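
On Python 3 the same approach needs neither the __future__ import nor xrange; a sketch of the equivalent, again assuming do_work and num_tasks from the question:

import sys
from multiprocessing import Pool

with Pool() as p:
    for i, _ in enumerate(p.imap_unordered(do_work, range(num_tasks)), 1):
        sys.stderr.write('\rdone {0:%}'.format(i / num_tasks))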

Answer 1

My personal favorite — gives you a nice little progress bar and completion ETA while things run and complete in parallel.

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
    pass
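
With a task count as large as the asker's 250,000, it may also help to pass a chunksize so items are dispatched to the workers in batches rather than one at a time; a variant sketch (100 is an illustrative value worth tuning):

from multiprocessing import Pool
import tqdm

pool = Pool(processes=8)
# chunksize batches dispatch and result collection, cutting IPC overhead
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks, chunksize=100),
                   total=len(tasks)):
    pass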

Answer 2

I found that the work was already done by the time I tried to check its progress. This is what worked for me using tqdm.

pip install tqdm

from multiprocessing import Pool
from tqdm import tqdm

tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))

def do_work(x):
    # do something with x
    pbar.update(1)  # runs in the worker; relies on the child inheriting pbar via fork

pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()

This should work with all flavors of multiprocessing, whether they block or not.


Answer 3

Found an answer myself with some more digging: taking a look at the __dict__ of the imap_unordered result object, I found it has an _index attribute that increments with each task completion. So this works for logging, wrapped in a while loop:

p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  completed = rs._index
  if (completed == num_tasks): break
  print "Waiting for", num_tasks-completed, "tasks to complete..."
  time.sleep(2)

However, I did find that swapping the imap_unordered for a map_async resulted in much faster execution, though the result object is a bit different. Instead, the result object from map_async has a _number_left attribute, and a ready() method:

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
  if (rs.ready()): break
  remaining = rs._number_left
  print "Waiting for", remaining, "tasks to complete..."
  time.sleep(0.5)
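
One caveat with this polling approach: _number_left is a private attribute, and in CPython it counts pending chunks rather than individual tasks, so with the automatic chunking map_async applies to large iterables the printed number understates the remaining work. A sketch that scales by the (equally private) _chunksize, assuming a CPython version where both attributes exist:

import time
import multiprocessing

p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close()
while not rs.ready():
    # _number_left counts chunks, not tasks; scale to approximate tasks left
    print "Waiting for ~", rs._number_left * rs._chunksize, "tasks to complete..."
    time.sleep(0.5)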

Answer 4

I know that this is a rather old question, but here is what I’m doing when I want to track the progression of a pool of tasks in Python.

from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter+letter

dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)

results = []

pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()

r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]

while len(results) != len(dummy_args):
    pbar.update(len(results))
    sleep(0.5)
pbar.finish()

print results

Basically, you use apply_async with a callback (in this case, it is to append the returned value to a list), so you don’t have to wait to do something else. Then, within a while-loop, you check the progression of the work. In this case, I added a widget to make it look nicer.

The output:

4 of 4                                                                         
['AA', 'BB', 'CC', 'DD']
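
The same apply_async-plus-callback pattern also works with no third-party progress bar at all; a stdlib-only sketch that rewrites one stderr line with carriage returns:

import sys
import multiprocessing as mp
from time import sleep

def my_function(letter):
    sleep(2)
    return letter + letter

if __name__ == '__main__':
    dummy_args = ["A", "B", "C", "D"]
    pool = mp.Pool(processes=2)
    results = []
    for x in dummy_args:
        # the callback appends each result as soon as it arrives
        pool.apply_async(my_function, (x,), callback=results.append)
    while len(results) != len(dummy_args):
        sys.stderr.write('\r%d of %d' % (len(results), len(dummy_args)))
        sleep(0.5)
    sys.stderr.write('\r%d of %d\n' % (len(results), len(dummy_args)))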

Hope it helps.


Answer 5

As suggested by Tim, you can use tqdm and imap to solve this issue. I’ve just stumbled upon this problem and tweaked the imap_unordered solution, so that I can access the results of the mapping. Here’s how it works:

from multiprocessing import Pool
import tqdm

pool = Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)),
                               total=num_tasks))

In case you don’t care about the values returned from your jobs, you don’t need to assign the list to any variable.


Answer 6

For anybody looking for a simple solution working with Pool.apply_async():

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep


def work(x):
    sleep(0.5)
    return x**2

n = 10

p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(i,), callback=lambda _: pbar.update(1))
       for i in range(n)]
results = [r.get() for r in res]  # get() re-raises any worker exception
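
One caveat: apply_async only fires the callback on success, so if work ever raises, the bar silently stops short of n. On Python 3.2+ apply_async also accepts an error_callback, which can keep the bar moving; a sketch:

from multiprocessing import Pool
from tqdm import tqdm
from time import sleep

def work(x):
    sleep(0.5)
    return x**2

if __name__ == '__main__':
    n = 10
    p = Pool(4)
    pbar = tqdm(total=n)
    res = [p.apply_async(work, args=(i,),
                         callback=lambda _: pbar.update(1),
                         error_callback=lambda _: pbar.update(1))
           for i in range(n)]
    results = [r.get() for r in res]  # re-raises any worker exception
    pbar.close()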

Answer 7

I created a custom class to create a progress printout. Maybe this helps:

from multiprocessing import Pool, cpu_count


class ParallelSim(object):
    def __init__(self, processes=cpu_count()):
        self.pool = Pool(processes=processes)
        self.total_processes = 0
        self.completed_processes = 0
        self.results = []

    def add(self, func, args):
        self.pool.apply_async(func=func, args=args, callback=self.complete)
        self.total_processes += 1

    def complete(self, result):
        # runs in the parent via the callback; expects an iterable result
        self.results.extend(result)
        self.completed_processes += 1
        # note: assumes Python 3, where / is true division
        print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))

    def run(self):
        self.pool.close()
        self.pool.join()

    def get_results(self):
        return self.results
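
A hypothetical usage of the class might look like this; square is an illustrative worker, and it returns a list because complete() calls extend() on each result:

def square(x):
    # return an iterable, since ParallelSim.complete() uses extend()
    return [x * x]

if __name__ == '__main__':
    sim = ParallelSim(processes=4)
    for i in range(10):
        sim.add(square, (i,))
    sim.run()
    print(sim.get_results())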

Answer 8

Try this simple Queue-based approach, which can also be used with pooling. Be mindful that printing anything after the progress bar has started will cause it to be moved, at least with this particular progress bar. (PyPI’s progress 1.5)

import time
import multiprocessing
from progress.bar import Bar

def status_bar( queue_stat, n_groups, n ):

    bar = Bar('progress', max = n)  

    finished = 0
    while finished < n_groups:

        while queue_stat.empty():
            time.sleep(0.01)

        gotten = queue_stat.get()
        if gotten == 'finished':
            finished += 1
        else:
            bar.next()
    bar.finish()


def process_data( queue_data, queue_stat, group):

    new_data = []
    for i in group:

        # ... do stuff resulting in new_data ...
        new_data.append(i * i)  # placeholder computation

        queue_stat.put(1)

    queue_stat.put('finished')
    queue_data.put(new_data)

def multiprocess():

    new_data = []
    processes = []

    groups = [[1,2,3],[4,5,6],[7,8,9]]
    combined = sum(groups,[])

    queue_data = multiprocessing.Queue()
    queue_stat = multiprocessing.Queue()

    for i, group in enumerate(groups):

        if i == 0:
            # start the status bar process once, before the first worker
            p = multiprocessing.Process(target = status_bar,
                args=(queue_stat, len(groups), len(combined)))
            processes.append(p)
            p.start()

        p = multiprocessing.Process(target = process_data,
            args=(queue_data, queue_stat, group))
        processes.append(p)
        p.start()

    for i in range(len(groups)):
        data = queue_data.get()
        new_data += data

    for p in processes:
        p.join()