问题:显示Python多处理池imap_unordered调用的进度?
我有一个脚本可以通过imap_unordered()
调用成功完成多处理池任务集:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
p.join() # Wait for completion
但是,我num_tasks
大约是250,000,因此join()
锁将主线程锁定了10秒钟左右,我希望能够逐步回显命令行以显示主进程未锁定。就像是:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
remaining = rs.tasks_remaining() # How many of the map call haven't been done yet?
if (remaining == 0): break # Jump out of while loop
print "Waiting for", remaining, "tasks to complete..."
time.sleep(2)
是否有用于结果对象或池本身的方法来指示剩余任务数?我尝试使用multiprocessing.Value
对象作为计数器(在完成任务后do_work
调用counter.value += 1
操作),但是在停止递增之前,计数器只能达到总值的〜85%。
回答 0
无需访问结果集的私有属性:
from __future__ import division
import sys
for i, _ in enumerate(p.imap_unordered(do_work, xrange(num_tasks)), 1):
sys.stderr.write('\rdone {0:%}'.format(i/num_tasks))
回答 1
我个人的最爱-在事物并行运行和提交时为您提供一个不错的进度条和完成ETA。
from multiprocessing import Pool
import tqdm
pool = Pool(processes=8)
for _ in tqdm.tqdm(pool.imap_unordered(do_work, tasks), total=len(tasks)):
pass
回答 2
我发现在尝试检查进度时,该工作已经完成。这就是使用tqdm对我有用的。
pip install tqdm
from multiprocessing import Pool
from tqdm import tqdm
tasks = range(5)
pool = Pool()
pbar = tqdm(total=len(tasks))
def do_work(x):
# do something with x
pbar.update(1)
pool.imap_unordered(do_work, tasks)
pool.close()
pool.join()
pbar.close()
这应该适用于所有类型的多处理,无论它们是否阻塞。
回答 3
找到了答案我自己有一些挖:以一看__dict__
的的imap_unordered
结果对象,我发现它有一个_index
属性,它与每个任务完成的增量。因此,这适用于日志记录,并包装在while
循环中:
p = multiprocessing.Pool()
rs = p.imap_unordered(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
completed = rs._index
if (completed == num_tasks): break
print "Waiting for", num_tasks-completed, "tasks to complete..."
time.sleep(2)
但是,我确实发现,将结果交换map_async
具有_number_left
属性和ready()
方法:
p = multiprocessing.Pool()
rs = p.map_async(do_work, xrange(num_tasks))
p.close() # No more work
while (True):
if (rs.ready()): break
remaining = rs._number_left
print "Waiting for", remaining, "tasks to complete..."
time.sleep(0.5)
回答 4
我知道这是一个相当老的问题,但是当我想跟踪python中任务池的进度时,这就是我正在做的事情。
from progressbar import ProgressBar, SimpleProgress
import multiprocessing as mp
from time import sleep
def my_function(letter):
sleep(2)
return letter+letter
dummy_args = ["A", "B", "C", "D"]
pool = mp.Pool(processes=2)
results = []
pbar = ProgressBar(widgets=[SimpleProgress()], maxval=len(dummy_args)).start()
r = [pool.apply_async(my_function, (x,), callback=results.append) for x in dummy_args]
while len(results) != len(dummy_args):
pbar.update(len(results))
sleep(0.5)
pbar.finish()
print results
基本上,您将apply_async与callbak一起使用(在这种情况下,它是将返回的值附加到列表中),因此您不必等待做其他事情。然后,在一个while循环中,检查工作进度。在这种情况下,我添加了一个小部件以使其看起来更好。
输出:
4 of 4
['AA', 'BB', 'CC', 'DD']
希望能帮助到你。
回答 5
如Tim所建议,您可以使用tqdm
和imap
解决此问题。我刚刚偶然发现了这个问题并调整了imap_unordered
解决方案,以便可以访问映射结果。运作方式如下:
from multiprocessing import Pool
import tqdm
pool = multiprocessing.Pool(processes=4)
mapped_values = list(tqdm.tqdm(pool.imap_unordered(do_work, range(num_tasks)), total=len(values)))
如果您不关心作业返回的值,则无需将列表分配给任何变量。
回答 6
对于寻求与Pool.apply_async()
以下人员合作的简单解决方案的任何人:
from multiprocessing import Pool
from tqdm import tqdm
from time import sleep
def work(x):
sleep(0.5)
return x**2
n = 10
p = Pool(4)
pbar = tqdm(total=n)
res = [p.apply_async(work, args=(
i,), callback=lambda _: pbar.update(1)) for i in range(n)]
results = [p.get() for p in res]
回答 7
我创建了一个自定义类来创建进度打印输出。Maby这有助于:
from multiprocessing import Pool, cpu_count
class ParallelSim(object):
def __init__(self, processes=cpu_count()):
self.pool = Pool(processes=processes)
self.total_processes = 0
self.completed_processes = 0
self.results = []
def add(self, func, args):
self.pool.apply_async(func=func, args=args, callback=self.complete)
self.total_processes += 1
def complete(self, result):
self.results.extend(result)
self.completed_processes += 1
print('Progress: {:.2f}%'.format((self.completed_processes/self.total_processes)*100))
def run(self):
self.pool.close()
self.pool.join()
def get_results(self):
return self.results
回答 8
尝试这种简单的基于队列的方法,该方法也可以与池一起使用。请注意,至少在此特定进度条启动之后,打印任何东西都会导致进度条被移动。(PyPI的进度1.5)
import time
from progress.bar import Bar
def status_bar( queue_stat, n_groups, n ):
bar = Bar('progress', max = n)
finished = 0
while finished < n_groups:
while queue_stat.empty():
time.sleep(0.01)
gotten = queue_stat.get()
if gotten == 'finished':
finished += 1
else:
bar.next()
bar.finish()
def process_data( queue_data, queue_stat, group):
for i in group:
... do stuff resulting in new_data
queue_stat.put(1)
queue_stat.put('finished')
queue_data.put(new_data)
def multiprocess():
new_data = []
groups = [[1,2,3],[4,5,6],[7,8,9]]
combined = sum(groups,[])
queue_data = multiprocessing.Queue()
queue_stat = multiprocessing.Queue()
for i, group in enumerate(groups):
if i == 0:
p = multiprocessing.Process(target = status_bar,
args=(queue_stat,len(groups),len(combined)))
processes.append(p)
p.start()
p = multiprocessing.Process(target = process_data,
args=(queue_data, queue_stat, group))
processes.append(p)
p.start()
for i in range(len(groups)):
data = queue_data.get()
new_data += data
for p in processes:
p.join()