多重处理:使用tqdm显示进度条

问题:多重处理:使用tqdm显示进度条

为了使我的代码更“ Pythonic”和更快,我使用“ multiprocessing”和一个map函数向其发送a)函数和b)迭代范围。

植入的解决方案(即直接在范围tqdm.tqdm(range(0,30))上调用tqdm不适用于多重处理(如以下代码中所述)。

进度条显示为0到100%(当python读取代码时?),但是它并不表示map函数的实际进度。

如何显示进度条以指示“地图”功能在哪一步?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

欢迎任何帮助或建议…

To make my code more “pythonic” and faster, I use “multiprocessing” and a map function to send it a) the function and b) the range of iterations.

The implanted solution (i.e., call tqdm directly on the range tqdm.tqdm(range(0, 30)) does not work with multiprocessing (as formulated in the code below).

The progress bar is displayed from 0 to 100% (when python reads the code?) but it does not indicate the actual progress of the map function.

How to display a progress bar that indicates at which step the ‘map’ function is ?

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   p = Pool(2)
   r = p.map(_foo, tqdm.tqdm(range(0, 30)))
   p.close()
   p.join()

Any help or suggestions are welcome…


回答 0

使用imap代替map,它返回已处理值的迭代器。

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

Use imap instead of map, which returns an iterator of processed values.

from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   with Pool(2) as p:
      r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))

回答 1

找到的解决方案:注意!由于进行了多处理,估计时间(每个循环的迭代次数,总时间等)可能不稳定,但是进度条可以正常工作。

注意:Pool的上下文管理器仅在Python 3.3版中可用

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()

Solution Found : Be careful! Due to multiprocessing, estimation time (iteration per loop, total time, etc.) could be unstable, but the progress bar works perfectly.

Note: Context manager for Pool is only available from Python version 3.3

from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()

回答 2

您可以p_tqdm改用。

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

You can use p_tqdm instead.

https://github.com/swansonk14/p_tqdm

from p_tqdm import p_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = p_map(_foo, list(range(0, 30)))

回答 3

很抱歉,迟到了,但是如果您需要的是并发映射,那么最新版本(tqdm>=4.42.0)现在具有以下内置功能:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

参考:https : //tqdm.github.io/docs/contrib.concurrent/https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

Sorry for being late but if all you need is a concurrent map, I added this functionality in tqdm>=4.42.0:

from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
   square = my_number * my_number
   time.sleep(1)
   return square 

if __name__ == '__main__':
   r = process_map(_foo, range(0, 30), max_workers=2)

References: https://tqdm.github.io/docs/contrib.concurrent/ and https://github.com/tqdm/tqdm/blob/master/examples/parallel_bars.py

It supports max_workers and chunksize and you can also easily switch from process_map to thread_map.


回答 4

根据XaviMartínez的回答,我编写了该函数imap_unordered_bar。可以imap_unordered与显示处理栏的唯一区别相同的方式使用它。

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

based on the answer of Xavi Martínez I wrote the function imap_unordered_bar. It can be used in the same way as imap_unordered with the only difference that a processing bar is shown.

from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes = 2):
    p = Pool(n_processes)
    res_list = []
    with tqdm(total = len(args)) as pbar:
        for i, res in tqdm(enumerate(p.imap_unordered(func, args))):
            pbar.update()
            res_list.append(res)
    pbar.close()
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square 

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))

回答 5

import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))
import multiprocessing as mp
import tqdm


some_iterable = ...

def some_func():
    # your logic
    ...


if __name__ == '__main__':
    with mp.Pool(mp.cpu_count()-2) as p:
        list(tqdm.tqdm(p.imap(some_func, iterable), total=len(iterable)))

回答 6

当需要从并行执行函数中获取结果时,这是我的看法。这个函数可以做一些事情(我的另一篇文章对此做了进一步解释),但是关键是有一个任务待处理队列和一个任务完成队列。当工作人员完成挂起队列中的每个任务时,他们会将结果添加到任务完成队列中。您可以使用tqdm进度条将检查包装到任务完成队列中。我没有在这里放置do_work()函数的实现,这无关紧要,因为这里的消息是监视已完成任务的队列并在每次输入结果时更新进度条。

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

Here is my take for when you need to get results back from your parallel executing functions. This function does a few things (there is another post of mine that explains it further) but the key point is that there is a tasks pending queue and a tasks completed queue. As workers are done with each task in the pending queue they add the results in the tasks completed queue. You can wrap the check to the tasks completed queue with the tqdm progress bar. I am not putting the implementation of the do_work() function here, it is not relevant, as the message here is to monitor the tasks completed queue and update the progress bar every time a result is in.

def par_proc(job_list, num_cpus=None, verbose=False):

# Get the number of cores
if not num_cpus:
    num_cpus = psutil.cpu_count(logical=False)

print('* Parallel processing')
print('* Running on {} cores'.format(num_cpus))

# Set-up the queues for sending and receiving data to/from the workers
tasks_pending = mp.Queue()
tasks_completed = mp.Queue()

# Gather processes and results here
processes = []
results = []

# Count tasks
num_tasks = 0

# Add the tasks to the queue
for job in job_list:
    for task in job['tasks']:
        expanded_job = {}
        num_tasks = num_tasks + 1
        expanded_job.update({'func': pickle.dumps(job['func'])})
        expanded_job.update({'task': task})
        tasks_pending.put(expanded_job)

# Set the number of workers here
num_workers = min(num_cpus, num_tasks)

# We need as many sentinels as there are worker processes so that ALL processes exit when there is no more
# work left to be done.
for c in range(num_workers):
    tasks_pending.put(SENTINEL)

print('* Number of tasks: {}'.format(num_tasks))

# Set-up and start the workers
for c in range(num_workers):
    p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
    p.name = 'worker' + str(c)
    processes.append(p)
    p.start()

# Gather the results
completed_tasks_counter = 0

with tqdm(total=num_tasks) as bar:
    while completed_tasks_counter < num_tasks:
        results.append(tasks_completed.get())
        completed_tasks_counter = completed_tasks_counter + 1
        bar.update(completed_tasks_counter)

for p in processes:
    p.join()

return results

回答 7

这种方法简单有效。

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()

This approach simple and it works.

from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()