熊猫行动中的进度指示器

问题:熊猫行动中的进度指示器

我定期对超过1500万行的数据帧执行熊猫操作,我很乐意能够访问特定操作的进度指示器。

是否存在基于文本的熊猫拆分应用合并操作进度指示器?

例如,类似:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

其中feature_rollup包含一些DF列并通过各种方法创建新用户列的函数。对于大型数据帧,这些操作可能需要一段时间,因此我想知道是否有可能在iPython笔记本中提供基于文本的输出,从而使我了解进度。

到目前为止,我已经尝试了Python的规范循环进度指示器,但是它们并未以任何有意义的方式与熊猫互动。

我希望pandas库/文档中有一些被我忽略的东西,它使人们知道了split-apply-combine的进度。一个简单的实现方法可能是查看apply功能在其上起作用的数据帧子集的总数,并将进度报告为这些子集的完成部分。

这是否可能需要添加到库中?

I regularly perform pandas operations on data frames in excess of 15 million or so rows and I’d love to have access to a progress indicator for particular operations.

Does a text based progress indicator for pandas split-apply-combine operations exist?

For example, in something like:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

where feature_rollup is a somewhat involved function that take many DF columns and creates new user columns through various methods. These operations can take a while for large data frames so I’d like to know if it is possible to have text based output in an iPython notebook that updates me on the progress.

So far, I’ve tried canonical loop progress indicators for Python but they don’t interact with pandas in any meaningful way.

I’m hoping there’s something I’ve overlooked in the pandas library/documentation that allows one to know the progress of a split-apply-combine. A simple implementation would maybe look at the total number of data frame subsets upon which the apply function is working and report progress as the completed fraction of those subsets.

Is this perhaps something that needs to be added to the library?


回答 0

由于需求旺盛,tqdm已增加了对的支持pandas。与其他答案不同,这不会明显降低熊猫的速度 -这是以下示例DataFrameGroupBy.progress_apply

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

如果您对它的工作方式(以及如何为自己的回调进行修改)感兴趣,请参阅github上示例pypi 完整文档或导入模块并运行help(tqdm)

编辑


要直接回答原始问题,请替换为:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

与:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

注意:tqdm <= v4.8:对于低于4.8的tqdm版本,tqdm.pandas()您不必执行以下操作:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

Due to popular demand, tqdm has added support for pandas. Unlike the other answers, this will not noticeably slow pandas down — here’s an example for DataFrameGroupBy.progress_apply:

import pandas as pd
import numpy as np
from tqdm import tqdm
# from tqdm.auto import tqdm  # for notebooks

df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))

# Create and register a new `tqdm` instance with `pandas`
# (can use tqdm_gui, optional kwargs, etc.)
tqdm.pandas()

# Now you can use `progress_apply` instead of `apply`
df.groupby(0).progress_apply(lambda x: x**2)

In case you’re interested in how this works (and how to modify it for your own callbacks), see the examples on github, the full documentation on pypi, or import the module and run help(tqdm). Other supported functions include map, applymap, aggregate, and transform.

EDIT


To directly answer the original question, replace:

df_users.groupby(['userID', 'requestDate']).apply(feature_rollup)

with:

from tqdm import tqdm
tqdm.pandas()
df_users.groupby(['userID', 'requestDate']).progress_apply(feature_rollup)

Note: tqdm <= v4.8: For versions of tqdm below 4.8, instead of tqdm.pandas() you had to do:

from tqdm import tqdm, tqdm_pandas
tqdm_pandas(tqdm())

回答 1

调整Jeff的答案(并将其作为可重用函数)。

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

注意:应用进度百分比会内联更新。如果您的函数标准输出,则将无法正常工作。

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

像往常一样,您可以将其作为方法添加到groupby对象中:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

正如评论中提到的那样,这不是熊猫要实现的功能。但是python允许您为许多熊猫对象/方法创建这些(这样做将需要很多工作…尽管您应该能够概括这种方法)。

To tweak Jeff’s answer (and have this as a reuseable function).

def logged_apply(g, func, *args, **kwargs):
    step_percentage = 100. / len(g)
    import sys
    sys.stdout.write('apply progress:   0%')
    sys.stdout.flush()

    def logging_decorator(func):
        def wrapper(*args, **kwargs):
            progress = wrapper.count * step_percentage
            sys.stdout.write('\033[D \033[D' * 4 + format(progress, '3.0f') + '%')
            sys.stdout.flush()
            wrapper.count += 1
            return func(*args, **kwargs)
        wrapper.count = 0
        return wrapper

    logged_func = logging_decorator(func)
    res = g.apply(logged_func, *args, **kwargs)
    sys.stdout.write('\033[D \033[D' * 4 + format(100., '3.0f') + '%' + '\n')
    sys.stdout.flush()
    return res

Note: the apply progress percentage updates inline. If your function stdouts then this won’t work.

In [11]: g = df_users.groupby(['userID', 'requestDate'])

In [12]: f = feature_rollup

In [13]: logged_apply(g, f)
apply progress: 100%
Out[13]: 
...

As usual you can add this to your groupby objects as a method:

from pandas.core.groupby import DataFrameGroupBy
DataFrameGroupBy.logged_apply = logged_apply

In [21]: g.logged_apply(f)
apply progress: 100%
Out[21]: 
...

As mentioned in the comments, this isn’t a feature that core pandas would be interested in implementing. But python allows you to create these for many pandas objects/methods (doing so would be quite a bit of work… although you should be able to generalise this approach).


回答 2

如果您需要了解如何在Jupyter / IPython的笔记本使用此支持,像我一样,这里是一个有益的指导和源相关文章

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

请注意的import语句中的下划线_tqdm_notebook。正如所引用的文章所提到的,开发处于beta后期。

In case you need support for how to use this in a Jupyter/ipython notebook, as I did, here’s a helpful guide and source to relevant article:

from tqdm._tqdm_notebook import tqdm_notebook
import pandas as pd
tqdm_notebook.pandas()
df = pd.DataFrame(np.random.randint(0, int(1e8), (10000, 1000)))
df.groupby(0).progress_apply(lambda x: x**2)

Note the underscore in the import statement for _tqdm_notebook. As referenced article mentions, development is in late beta stage.


回答 3

对于希望在自己的自定义并行熊猫应用代码上应用tqdm的任何人。

(多年来,我尝试了一些用于并行化的库,但是我从来没有找到一个100%并行化解决方案,主要是针对apply函数,而且我总是不得不返回自己的“手动”代码。)

df_multi_core-这是您要呼叫的那个。它接受:

  1. 您的df对象
  2. 您要调用的函数名称
  3. 可以执行该功能的列的子集(有助于减少时间/内存)
  4. 并行运行的作业数(所有内核为-1或忽略)
  5. df函数接受的其他任何变形(例如“轴”)

_df_split-这是一个内部帮助器函数,必须全局定位到正在运行的模块(Pool.map是“与位置相关的”),否则我将在内部对其进行定位。

这是我的要旨中的代码(我将在其中添加更多的pandas功能测试):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

波纹管是与tqdm“ progress_apply” 并行应用的测试代码。

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

在输出中,您可以看到1个进度条,用于在没有并行化的情况下运行,以及每核进度条,在具有并行化的情况下运行。会有一些小小的变化,有时其他核心会同时出现,但是即使如此,我仍然认为这很有用,因为您可以获得每个核心的进度统计信息(例如,每秒/秒和总记录)

谢谢@abcdaa提供的这个出色的库!

For anyone who’s looking to apply tqdm on their custom parallel pandas-apply code.

(I tried some of the libraries for parallelization over the years, but I never found a 100% parallelization solution, mainly for the apply function, and I always had to come back for my “manual” code.)

df_multi_core – this is the one you call. It accepts:

  1. Your df object
  2. The function name you’d like to call
  3. The subset of columns the function can be performed upon (helps reducing time / memory)
  4. The number of jobs to run in parallel (-1 or omit for all cores)
  5. Any other kwargs the df’s function accepts (like “axis”)

_df_split – this is an internal helper function that has to be positioned globally to the running module (Pool.map is “placement dependent”), otherwise I’d locate it internally..

here’s the code from my gist (I’ll add more pandas function tests there):

import pandas as pd
import numpy as np
import multiprocessing
from functools import partial

def _df_split(tup_arg, **kwargs):
    split_ind, df_split, df_f_name = tup_arg
    return (split_ind, getattr(df_split, df_f_name)(**kwargs))

def df_multi_core(df, df_f_name, subset=None, njobs=-1, **kwargs):
    if njobs == -1:
        njobs = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=njobs)

    try:
        splits = np.array_split(df[subset], njobs)
    except ValueError:
        splits = np.array_split(df, njobs)

    pool_data = [(split_ind, df_split, df_f_name) for split_ind, df_split in enumerate(splits)]
    results = pool.map(partial(_df_split, **kwargs), pool_data)
    pool.close()
    pool.join()
    results = sorted(results, key=lambda x:x[0])
    results = pd.concat([split[1] for split in results])
    return results

Bellow is a test code for a parallelized apply with tqdm “progress_apply”.

from time import time
from tqdm import tqdm
tqdm.pandas()

if __name__ == '__main__': 
    sep = '-' * 50

    # tqdm progress_apply test      
    def apply_f(row):
        return row['c1'] + 0.1
    N = 1000000
    np.random.seed(0)
    df = pd.DataFrame({'c1': np.arange(N), 'c2': np.arange(N)})

    print('testing pandas apply on {}\n{}'.format(df.shape, sep))
    t1 = time()
    res = df.progress_apply(apply_f, axis=1)
    t2 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for native implementation {}\n{}'.format(round(t2 - t1, 2), sep))

    t3 = time()
    # res = df_multi_core(df=df, df_f_name='apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    res = df_multi_core(df=df, df_f_name='progress_apply', subset=['c1'], njobs=-1, func=apply_f, axis=1)
    t4 = time()
    print('result random sample\n{}'.format(res.sample(n=3, random_state=0)))
    print('time for multi core implementation {}\n{}'.format(round(t4 - t3, 2), sep))

In the output you can see 1 progress bar for running without parallelization, and per-core progress bars when running with parallelization. There is a slight hickup and sometimes the rest of the cores appear at once, but even then I think its usefull since you get the progress stats per core (it/sec and total records, for ex)

Thank you @abcdaa for this great library!


回答 4

您可以使用装饰器轻松完成此操作

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

然后只需使用modified_function(并在您希望打印时更改)

You can easily do this with a decorator

from functools import wraps 

def logging_decorator(func):

    @wraps
    def wrapper(*args, **kwargs):
        wrapper.count += 1
        print "The function I modify has been called {0} times(s).".format(
              wrapper.count)
        func(*args, **kwargs)
    wrapper.count = 0
    return wrapper

modified_function = logging_decorator(feature_rollup)

then just use the modified_function (and change when you want it to print)


回答 5

我更改了Jeff的answer,使其包含总数,以便您可以跟踪进度和一个变量以仅打印每X次迭代(如果“ print_at”相当高,则实际上可以大大提高性能)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

clear_output()函数来自

from IPython.core.display import clear_output

如果不在IPython上,那么Andy Hayden的答案就是没有它

I’ve changed Jeff’s answer, to include a total, so that you can track progress and a variable to just print every X iterations (this actually improves the performance by a lot, if the “print_at” is reasonably high)

def count_wrapper(func,total, print_at):

    def wrapper(*args):
        wrapper.count += 1
        if wrapper.count % wrapper.print_at == 0:
            clear_output()
            sys.stdout.write( "%d / %d"%(calc_time.count,calc_time.total) )
            sys.stdout.flush()
        return func(*args)
    wrapper.count = 0
    wrapper.total = total
    wrapper.print_at = print_at

    return wrapper

the clear_output() function is from

from IPython.core.display import clear_output

if not on IPython Andy Hayden’s answer does that without it