To make my code more "pythonic" and faster, I use multiprocessing and a map function to send it a) the function and b) the range of iterations.
The naive solution (i.e., calling tqdm directly on the range: tqdm.tqdm(range(0, 30))) does not work with multiprocessing (as formulated in the code below).
The progress bar is displayed from 0 to 100% (when Python reads the code?), but it does not indicate the actual progress of the map function.
How can I display a progress bar that indicates at which step the map function currently is?
from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    p = Pool(2)
    r = p.map(_foo, tqdm.tqdm(range(0, 30)))
    p.close()
    p.join()
Any help or suggestions are welcome…
Answer 0
Use imap instead of map, which returns an iterator of processed values.
from multiprocessing import Pool
import tqdm
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(2) as p:
        r = list(tqdm.tqdm(p.imap(_foo, range(30)), total=30))
Solution found: be careful! Due to multiprocessing, the estimated time (iterations per loop, total time, etc.) can be unstable, but the progress bar works perfectly.
Note: the context manager for Pool is only available from Python 3.3 onward.
from multiprocessing import Pool
import time
from tqdm import *

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    with Pool(processes=2) as p:
        max_ = 30
        with tqdm(total=max_) as pbar:
            for i, _ in enumerate(p.imap_unordered(_foo, range(0, max_))):
                pbar.update()
from tqdm.contrib.concurrent import process_map  # or thread_map
import time

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    r = process_map(_foo, range(0, 30), max_workers=2)
Based on the answer of Xavi Martínez, I wrote the function imap_unordered_bar. It can be used in the same way as imap_unordered, with the only difference that a progress bar is shown.
from multiprocessing import Pool
import time
from tqdm import *

def imap_unordered_bar(func, args, n_processes=2):
    p = Pool(n_processes)
    res_list = []
    # One bar, updated once per result; imap_unordered yields results
    # as soon as any worker finishes, regardless of input order.
    with tqdm(total=len(args)) as pbar:
        for i, res in enumerate(p.imap_unordered(func, args)):
            pbar.update()
            res_list.append(res)
    p.close()
    p.join()
    return res_list

def _foo(my_number):
    square = my_number * my_number
    time.sleep(1)
    return square

if __name__ == '__main__':
    result = imap_unordered_bar(_foo, range(5))
Answer 5
import multiprocessing as mp
import tqdm

some_iterable = ...

def some_func(item):
    # your logic
    ...

if __name__ == '__main__':
    with mp.Pool(mp.cpu_count() - 2) as p:
        list(tqdm.tqdm(p.imap(some_func, some_iterable), total=len(some_iterable)))
Here is my take for when you need to get results back from your parallel executing functions. This function does a few things (there is another post of mine that explains it further), but the key point is that there is a tasks-pending queue and a tasks-completed queue. As workers finish each task from the pending queue, they add the results to the completed queue. You can wrap the check on the completed queue with the tqdm progress bar. I am not putting the implementation of the do_work() function here; it is not relevant, as the message here is to monitor the completed-tasks queue and update the progress bar every time a result comes in.
import multiprocessing as mp
import pickle

import psutil
from tqdm import tqdm

SENTINEL = None  # assumed sentinel value; the original defines this elsewhere

def par_proc(job_list, num_cpus=None, verbose=False):
    # Get the number of cores
    if not num_cpus:
        num_cpus = psutil.cpu_count(logical=False)

    print('* Parallel processing')
    print('* Running on {} cores'.format(num_cpus))

    # Set up the queues for sending and receiving data to/from the workers
    tasks_pending = mp.Queue()
    tasks_completed = mp.Queue()

    # Gather processes and results here
    processes = []
    results = []

    # Count tasks
    num_tasks = 0

    # Add the tasks to the queue
    for job in job_list:
        for task in job['tasks']:
            expanded_job = {}
            num_tasks = num_tasks + 1
            expanded_job.update({'func': pickle.dumps(job['func'])})
            expanded_job.update({'task': task})
            tasks_pending.put(expanded_job)

    # Set the number of workers here
    num_workers = min(num_cpus, num_tasks)

    # We need as many sentinels as there are worker processes so that
    # ALL processes exit when there is no more work left to be done.
    for c in range(num_workers):
        tasks_pending.put(SENTINEL)

    print('* Number of tasks: {}'.format(num_tasks))

    # Set up and start the workers
    for c in range(num_workers):
        p = mp.Process(target=do_work, args=(tasks_pending, tasks_completed, verbose))
        p.name = 'worker' + str(c)
        processes.append(p)
        p.start()

    # Gather the results, updating the bar once per completed task
    completed_tasks_counter = 0
    with tqdm(total=num_tasks) as bar:
        while completed_tasks_counter < num_tasks:
            results.append(tasks_completed.get())
            completed_tasks_counter = completed_tasks_counter + 1
            bar.update()  # update() takes an *increment*, not the running total

    for p in processes:
        p.join()

    return results
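The do_work() function is deliberately left out above. For completeness, here is a minimal sketch of what such a worker could look like, inferred purely from the queue protocol described in this answer (pull a job, unpickle its function, run it on the task, report the result, exit on the sentinel); it is an assumption, not the author's actual implementation:

import pickle

def do_work(tasks_pending, tasks_completed, verbose=False):
    # Hypothetical worker loop; SENTINEL is the constant defined above.
    while True:
        job = tasks_pending.get()
        if job is SENTINEL:  # one sentinel per worker signals shutdown
            break
        func = pickle.loads(job['func'])        # restore the pickled function
        tasks_completed.put(func(job['task']))  # run it and report the result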
Answer 7
This approach is simple and effective.
from multiprocessing.pool import ThreadPool
import time
from tqdm import tqdm

def job():
    time.sleep(1)
    pbar.update()  # threads share memory, so the global pbar is visible here

pool = ThreadPool(5)
with tqdm(total=100) as pbar:
    for i in range(100):
        pool.apply_async(job)
    pool.close()
    pool.join()
The end= keyword is what does the work here: by default, print() ends with a newline (\n) character, but this can be replaced with a different string. In this case, ending the line with a carriage return instead returns the cursor to the start of the current line, so there is no need to import the sys module for this sort of simple usage. print() actually has a number of keyword arguments which can be used to greatly simplify code.
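For instance, a minimal sketch of the idea (the counter and message are illustrative):

import time

for i in range(10):
    # '\r' returns the cursor to the start of the line, so each print
    # overwrites the previous one; flush=True defeats output buffering.
    print('progress: {}/10'.format(i + 1), end='\r', flush=True)
    time.sleep(0.2)
print()  # final newline so the shell prompt starts on a fresh line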
To use the same code on Python 2.6+, put the following line at the top of the file:
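from __future__ import print_function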
If all you want to do is change a single line, use \r. \r means carriage return. Its effect is solely to put the caret back at the start of the current line. It does not erase anything. Similarly, \b can be used to go one character backward. (Some terminals may not support all of these features.)
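A quick illustration of both escape characters (assuming a terminal that honors them; the inline comments show what a typical terminal would display):

import sys

sys.stdout.write('abcdef')
sys.stdout.write('\rXY')  # caret went home first; line now reads 'XYcdef'
sys.stdout.write('\b')    # caret moves one character back, erasing nothing
sys.stdout.flush()
print()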
Here’s my little class that can reprint blocks of text. It properly clears the previous text so you can overwrite your old text with shorter new text without creating a mess.
import re, sys

class Reprinter:
    def __init__(self):
        self.text = ''

    def moveup(self, lines):
        for _ in range(lines):
            sys.stdout.write("\x1b[A")

    def reprint(self, text):
        # Clear previous text by overwriting non-spaces with spaces
        self.moveup(self.text.count("\n"))
        sys.stdout.write(re.sub(r"[^\s]", " ", self.text))

        # Print new text
        lines = min(self.text.count("\n"), text.count("\n"))
        self.moveup(lines)
        sys.stdout.write(text)
        self.text = text

reprinter = Reprinter()
reprinter.reprint("Foobar\nBazbar")
reprinter.reprint("Foo\nbar")
# works in spyder ipython console - \r at start of string, end=""
import time
import sys

for i in range(20):
    time.sleep(0.5)
    print(f"\rnumber{i}", end="")
    sys.stdout.flush()
Answer 7
To overwrite the previous line in Python, all you need to do is add end='\r' to the print call. Test the following example:
import time

for j in range(1, 5):
    print('waiting : ' + str(j), end='\r')  # str() needed: j is an int
    time.sleep(1)
class tqdm():
    """
    Decorate an iterable object, returning an iterator which acts exactly
    like the original iterable, but prints a dynamically updating
    progressbar every time a value is requested.
    """

    def __init__(self, iterable=None, desc=None, total=None, leave=True,
                 file=None, ncols=None, mininterval=0.1,
                 maxinterval=10.0, miniters=None, ascii=None, disable=False,
                 unit='it', unit_scale=False, dynamic_ncols=False,
                 smoothing=0.3, bar_format=None, initial=0, position=None,
                 postfix=None, unit_divisor=1000):

class tqdm():
    def update(self, n=1):
        """
        Manually update the progress bar, useful for streams
        such as reading files.
        E.g.:
        >>> t = tqdm(total=filesize)  # Initialise
        >>> for current_buffer in stream:
        ...     ...
        ...     t.update(len(current_buffer))
        >>> t.close()
        The last line is highly recommended, but possibly not necessary if
        ``t.update()`` will be called in such a way that ``filesize`` will be
        exactly reached and printed.

        Parameters
        ----------
        n  : int or float, optional
            Increment to add to the internal counter of iterations
            [default: 1]. If using float, consider specifying ``{n:.3f}``
            or similar in ``bar_format``, or specifying ``unit_scale``.

        Returns
        -------
        out  : bool or None
            True if a ``display()`` was triggered.
        """

    def close(self):
        """Cleanup and (if leave=False) close the progressbar."""

    def clear(self, nomove=False):
        """Clear current bar display."""

    def refresh(self):
        """
        Force refresh the display of this bar.

        Parameters
        ----------
        nolock  : bool, optional
            If ``True``, does not lock.
            If [default: ``False``]: calls ``acquire()`` on internal lock.
        lock_args  : tuple, optional
            Passed to internal lock's ``acquire()``.
            If specified, will only ``display()`` if ``acquire()`` returns ``True``.
        """

    def unpause(self):
        """Restart tqdm timer from last print time."""

    def reset(self, total=None):
        """
        Resets to 0 iterations for repeated use.
        Consider combining with ``leave=True``.

        Parameters
        ----------
        total  : int or float, optional. Total to use for the new bar.
        """

    def set_description(self, desc=None, refresh=True):
        """
        Set/modify description of the progress bar.

        Parameters
        ----------
        desc  : str, optional
        refresh  : bool, optional
            Forces refresh [default: True].
        """

    def set_postfix(self, ordered_dict=None, refresh=True, **tqdm_kwargs):
        """
        Set/modify postfix (additional stats)
        with automatic formatting based on datatype.

        Parameters
        ----------
        ordered_dict  : dict or OrderedDict, optional
        refresh  : bool, optional
            Forces refresh [default: True].
        kwargs  : dict, optional
        """

    @classmethod
    def write(cls, s, file=sys.stdout, end="\n"):
        """Print a message via tqdm (without overlap with bars)."""

    @property
    def format_dict(self):
        """Public API for read-only member access."""

    def display(self, msg=None, pos=None):
        """
        Use ``self.sp`` to display ``msg`` in the specified ``pos``.

        Consider overloading this function when inheriting to use e.g.:
        ``self.some_frontend(**self.format_dict)`` instead of ``self.sp``.

        Parameters
        ----------
        msg  : str, optional. What to display (default: ``repr(self)``).
        pos  : int, optional. Position to ``moveto``
            (default: ``abs(self.pos)``).
        """

    @classmethod
    @contextmanager
    def wrapattr(cls, stream, method, total=None, bytes=True, **tqdm_kwargs):
        """
        stream  : file-like object.
        method  : str, "read" or "write". The result of ``read()`` and
            the first argument of ``write()`` should have a ``len()``.

        >>> with tqdm.wrapattr(file_obj, "read", total=file_obj.size) as fobj:
        ...     while True:
        ...         chunk = fobj.read(chunk_size)
        ...         if not chunk:
        ...             break
        """

    @classmethod
    def pandas(cls, *targs, **tqdm_kwargs):
        """Registers the current `tqdm` class with `pandas`."""

def trange(*args, **tqdm_kwargs):
    """
    A shortcut for `tqdm(xrange(*args), **tqdm_kwargs)`.
    On Python3+, `range` is used instead of `xrange`.
    """
from tqdm import tqdm, trange
from random import random, randint
from time import sleep

with trange(10) as t:
    for i in t:
        # Description will be displayed on the left
        t.set_description('GEN %i' % i)
        # Postfix will be displayed on the right,
        # formatted automatically based on argument's datatype
        t.set_postfix(loss=random(), gen=randint(1, 999), str='h',
                      lst=[1, 2])
        sleep(0.1)
with tqdm(total=10, bar_format="{postfix[0]} {postfix[1][value]:>8.2g}",
          postfix=["Batch", dict(value=0)]) as t:
    for i in range(10):
        sleep(0.1)
        t.postfix[1]["value"] = i / 2
        t.update()
import pandas as pd
import numpy as np
from tqdm import tqdm

df = pd.DataFrame(np.random.randint(0, 100, (100000, 6)))

# Register `pandas.progress_apply` and `pandas.Series.map_apply` with `tqdm`
# (can use `tqdm.gui.tqdm`, `tqdm.notebook.tqdm`, optional kwargs, etc.)
tqdm.pandas(desc="my bar!")

# Now you can use `progress_apply` instead of `apply`
# and `progress_map` instead of `map`
df.progress_apply(lambda x: x**2)

# can also groupby:
# df.groupby(0).progress_apply(lambda x: x**2)
from tqdm.dask import TqdmCallback

with TqdmCallback(desc="compute"):
    ...
    arr.compute()

# or use callback globally
cb = TqdmCallback(desc="global")
cb.register()
arr.compute()
# different cell (assumes a bar was created in an earlier cell, e.g. `pbar = tqdm()`)
iterable = range(100)
pbar.reset(total=len(iterable))  # initialise with new `total`
for i in iterable:
    pbar.update()
pbar.refresh()  # force print final status but don't `close()`
from tqdm import tqdm
import os.path

def find_files_recursively(path, show_progress=True):
    files = []
    # total=1 assumes `path` is a file
    t = tqdm(total=1, unit="file", disable=not show_progress)
    if not os.path.exists(path):
        raise IOError("Cannot find:" + path)

    def append_found_file(f):
        files.append(f)
        t.update()

    def list_found_dir(path):
        """returns os.listdir(path) assuming os.path.isdir(path)"""
        listing = os.listdir(path)
        # subtract 1 since a "file" we found was actually this directory
        t.total += len(listing) - 1
        # fancy way to give info without forcing a refresh
        t.set_postfix(dir=path[-10:], refresh=False)
        t.update(0)  # may trigger a refresh
        return listing

    def recursively_search(path):
        if os.path.isdir(path):
            for f in list_found_dir(path):
                recursively_search(os.path.join(path, f))
        else:
            append_found_file(path)

    recursively_search(path)
    t.set_postfix(dir=path)
    t.close()
    return files
from tqdm.auto import tqdm, trange
from time import sleep

bar = trange(10)
for i in bar:
    # Print using tqdm class method .write()
    sleep(0.1)
    if not (i % 3):
        tqdm.write("Done task %i" % i)

# Can also use bar.write()
from rich.console import Console
from rich.syntax import Syntax

my_code = '''
def iter_first_last(values: Iterable[T]) -> Iterable[Tuple[bool, bool, T]]:
    """Iterate and generate a tuple with a flag for first and last value."""
    iter_values = iter(values)
    try:
        previous_value = next(iter_values)
    except StopIteration:
        return
    first = True
    for value in iter_values:
        yield first, False, previous_value
        first = False
        previous_value = value
    yield first, True, previous_value
'''
syntax = Syntax(my_code, "python", theme="monokai", line_numbers=True)
console = Console()
console.print(syntax)