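Question: Multiprocessing: show a progress bar with tqdm
To make my code more "Pythonic" and faster, I use multiprocessing and a map function, passing it a) a function and b) a range of iterations.
The straightforward solution (i.e. calling tqdm directly on the range, tqdm.tqdm(range(0, 30))) does not work with multiprocessing (as written in the code below).
The progress bar goes from 0 to 100% (when Python reads the code?), but it does not reflect the actual progress of the map function.
How can I display a progress bar that shows which step the map function is at?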
from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    p = Pool(2)
    r = p.map(_foo, tqdm.tqdm(range(0, 30)))
    p.close()
    p.join()
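Any help or suggestions are welcome...
Answer 0
Use imap instead of map. It returns an iterator that yields each result as soon as it (and every result before it) is ready, so wrapping that iterator in tqdm makes the bar advance as work actually completes. Wrapping the input range instead only measures how quickly the pool reads the inputs to hand them to the workers, which for a small input like this happens almost immediately, while map blocks until every result is in. The imap iterator has no length, so pass total explicitly.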
from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
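Because the iterator returned by imap has no len(), the answer has to hard-code total=30. A small reusable helper, sketched below under the assumption that the inputs fit in memory, takes the total from the input instead and forwards imap's existing chunksize parameter, which reduces inter-process overhead when there are many small tasks (results then tend to arrive in bursts, but the bar still counts one step per result). The names square and imap_with_bar are hypothetical; ThreadPool is imported only to show that the helper works with any pool exposing the same API.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool  # same API, runs tasks in threads
from tqdm import tqdm


def square(x):
    # Hypothetical task; defined at module level so Pool can pickle it
    return x * x


def imap_with_bar(pool, func, items, chunksize=1):
    """Apply func to items via pool.imap while showing a tqdm bar.

    Results come back in input order. pool.imap's iterator has no len(),
    so the bar's total is taken from the materialized input list.
    """
    items = list(items)
    results_iter = pool.imap(func, items, chunksize=chunksize)
    return list(tqdm(results_iter, total=len(items)))


if __name__ == '__main__':
    with Pool(2) as p:
        print(imap_with_bar(p, square, range(100), chunksize=10))
```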
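Answer 1
Found a solution. Caution: because of multiprocessing, the time estimates (iteration rate, total time, etc.) may be unstable, but the progress bar itself works correctly.
Note: using Pool as a context manager is only supported from Python 3.3 onward. Leaving the with block calls terminate(), so consume all results inside it, as done here.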
from multiprocessing import Pool
import time
from tqdm import tqdm

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for _ in p.imap_unordered(_foo, range(0, max_)):
                pbar.update()
Answer 2
You can use p_tqdm instead:
https://github.com/swansonk14/p_tqdm
from p_tqdm import p_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = p_map(_foo, list(range(0, 30)))
Answer 3
Sorry for being late, but if all you need is a concurrent map, recent versions of tqdm (tqdm>=4.42.0) have this built in:
from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = process_map(_foo, range(0, 30), max_workers=2)
References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py
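Answer 4
Based on Xavi Martínez's answer, I wrote the function imap_unordered_bar. It is used the same way as imap_unordered; the only difference is that it shows a progress bar. Like imap_unordered, it returns results in completion order, and args must support len().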
from multiprocessing import Pool
import time
from tqdm import tqdm

def imap_unordered_bar(func, args, n_processes=2):
    p = Pool(n_processes)
    res_list = []
    # One bar, advanced once per finished task (the with block closes it)
    with tqdm(total=len(args)) as pbar:
        for res in p.imap_unordered(func, args):
            pbar.update()
            res_list.append(res)
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
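Answer 5
import multiprocessing as mp
import tqdm

some_iterable = ...  # your input data; it must support len() for total=

def some_func(item):
    # your logic, applied to one item at a time
    ...

if __name__ == '__main__':
    # Keep two cores free, but always use at least one worker
    with mp.Pool(max(1, mp.cpu_count() - 2)) as p:
        results = list(tqdm.tqdm(p.imap(some_func, some_iterable), total=len(some_iterable)))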
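imap calls the worker with exactly one argument per input item. When the worker also needs fixed extra parameters, functools.partial can bind them up front; a partial of a module-level function can be pickled and sent to the worker processes. The names scale and run_scaled are hypothetical, used only for illustration.

```python
from functools import partial
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool  # same API, handy for quick checks
import os
from tqdm import tqdm


def scale(item, factor):
    # Hypothetical worker that needs one extra, fixed argument
    return item * factor


def run_scaled(pool, items, factor):
    items = list(items)
    func = partial(scale, factor=factor)  # imap supplies only `item`
    return list(tqdm(pool.imap(func, items), total=len(items)))


if __name__ == '__main__':
    # Leave two cores free, but never ask for fewer than one worker
    n_workers = max(1, (os.cpu_count() or 1) - 2)
    with Pool(n_workers) as p:
        print(run_scaled(p, range(10), factor=3))
```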
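Answer 6
Here is my take for when you need to collect the results of a function executed in parallel. This function does a few things (explained further in another post of mine), but the key idea is that there is a queue of pending tasks and a queue of completed tasks. As the workers finish each task from the pending queue, they put the result on the completed-tasks queue. You can wrap the loop that reads the completed-tasks queue with a tqdm progress bar. I haven't included the implementation of do_work() here; it doesn't matter, because the point is to monitor the completed-tasks queue and update the progress bar every time a result comes in.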
import multiprocessing as mp
import pickle

import psutil
from tqdm import tqdm

# do_work() and SENTINEL are defined elsewhere (not shown in this answer).


def par_proc(job_list, num_cpus=None, verbose=False):
    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            num_tasks = num_tasks + 1
            expanded_job = {'func': pickle.dumps(job['func']), 'task': task}
            tasks_pending.put(expanded_job)

    # Set the number of workers here
    num_workers = min(num_cpus, num_tasks)

    # We need as many sentinels as there are worker processes so that ALL
    # processes exit when there is no more work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results
    completed_tasks_counter = 0
    with tqdm(total=num_tasks) as bar:
        while completed_tasks_counter < num_tasks:
            results.append(tasks_completed.get())
            completed_tasks_counter = completed_tasks_counter + 1
            # update() takes an increment, not the running total
            bar.update(1)

    for p in processes:
        p.join()

    return results
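Answer 7
This approach is simple and effective. Note that it relies on ThreadPool: the jobs run as threads in the same process, so they can call pbar.update() on the shared bar directly. That suits I/O-bound work; for CPU-bound work the GIL limits threads, and with a process Pool a worker could not update the parent's bar this way.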
from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    # Wait for all jobs before the with block closes the bar
    pool.close()
    pool.join()
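For a process Pool, the usual way to keep one bar in the parent is the callback argument of apply_async: the pool calls it in the parent process, from its result-handler thread, each time a result arrives. A sketch with the hypothetical names _work and run_apply_async follows; callbacks should return quickly, since they block that result-handler thread while they run.

```python
from multiprocessing import Pool
from multiprocessing.pool import ThreadPool  # same API, handy for quick checks
import time
from tqdm import tqdm


def _work(x):
    # Hypothetical task run in a worker
    time.sleep(0.01)
    return x * x


def run_apply_async(pool, items):
    items = list(items)
    results = []
    with tqdm(total=len(items)) as pbar:
        def on_done(result):
            # Called in the parent process by the pool's result-handler thread
            results.append(result)
            pbar.update()

        pending = [pool.apply_async(_work, (x,), callback=on_done) for x in items]
        for r in pending:
            r.get()  # wait for each task; re-raises any worker exception
    return results  # completion order


if __name__ == '__main__':
    with Pool(5) as pool:
        print(sorted(run_apply_async(pool, range(20))))
```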