Pandas 性能优化

本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。

1.Pandas 性能优化 40 倍 – DataFrame

1. 1 性能优化小试牛刀

大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。

比如下面这个简单例子,我们随机生成100万条数据,对val这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform函数的平均运行时间是284ms:

import pandas as pd
import numpy as np

def gen_data(size):
    d = dict()
    d["genre"] = np.random.choice(["A""B""C""D"], size=size)
    d["val"] = np.random.randint(low=0, high=100, size=size)
    return pd.DataFrame(d)

data = gen_data(1000000)
data.head()
def transform(data):
    data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)

%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.2. 用Cython编写C扩展

试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1 这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。

%load_ext cython
%%cython
cpdef int _transform(int x):
    if x % 2:
        return x + 1
    return x - 1

def transform(data):
    data.loc[:, "new_val"] = data.val.apply(_transform)

%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.3. 减少类型转换

为了减少C和Python之间的类型转换,我们直接把val这一列作为Numpy数组传递给Cython函数,注意区分cnpnp。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。

%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.4. 使用不安全的数组

利用@cython.boundscheck(False)@cython.wraparound(False)装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。

%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

来源:Python中文社区

作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu

2.Pandas性能优化:基础篇

Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。

2.1 按行迭代优化

很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。

我们测试一下速度:

import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
                     'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                   'x':  np.random.randint(1, 10, (1000))})

%%timeit
a2=[]
for index,row in df.iterrows():
    temp=row['a']
    a2.append(temp*temp)
df['a2']=a2    

67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
a2=[]
for row in df.itertuples():
    temp=getattr(row, 'a')
    a2.append(temp*temp)
df['a2']=a2

1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到直接循环,itertuples速度是iterrows的很多倍。

所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。

2.2 apply 优化

一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:

%%timeit
df['a2']=df.a.apply(lambda x: x*x)

360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到,效率又有提升,apply的速度是itertuples的5倍左右。

注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。

%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)

15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。

我们可以简单的这样改写:

%%timeit
df['a3']=df['a']*df['b']

204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高

%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)

93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

2.3 聚合agg效率优化

有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。

还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:

%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())

1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

采用agg内置函数:

%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()

415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

刚才是单列聚合,我们看下多列聚合:
自定义函数:

%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})

2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

采用内置函数:

%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})

1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。

下面列出了一些内置函数:

内置函数描述
count计数
sum求和
mean平均值
median中位数
min,max最大值/最小值
std,var标准差/方差
prod求积

当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如

df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))

在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。

2.4 数据读取优化

如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。

这里有几种方法可以优化:

1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。

chunksize = 10 ** 6for chunk in pd.read_csv(filename,  chunksize=chunksize):
     process(chunk)

这里也可以使用多进程方式,具体我们在后面进阶篇介绍。

  1. 过滤掉不需要的列

如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。

df = pd.read_csv(filename,  usecols=["a","b","c"]):
  1. 为列指定类型
    panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
  1. 保存为其他格式
    如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
 import pandas as pd
 #读取csv
 df = pd.read_csv('filename.csv')
 
 #pickle格式
 df.to_pickle('filename.pkl') 
 df = pd.read_pickle('filename.pkl') 
 
 #hdf格式
 df.to_hdf('filename.hdf','df') 
 df = pd.read_hdf('filename.hdf','df') 

file typetimespeed
csv1.93 s1
pickle415 m4.6
hdf808 ms2.3
  1. 用第三方的包读取
    如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。

2.5 优化数据处理逻辑

这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。

这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:

call_areacall_seconds
03364
23075
25847
12032

call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。

我们随机生成一批数据:

import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area':  np.random.randint(0, 3, (row_number)),
                   'call_seconds':  np.random.randint(0, 10000, (row_number))})

如果我们想计算每个电话的费用:

def get_cost(call_area,call_seconds):
    if call_area==1:
        rate=0.1
    elif call_area==0:
        rate=0.7
    else:
        return 0
    return math.ceil(call_seconds/60)*rate

方法1,采用按行迭代循环

%%timeit
cost_list = []
for i,row in df.iterrows():
    call_area=row['call_area']
    call_seconds=row['call_seconds']
    cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list

方法2,采用apply行的方式

%%timeit
df['cost']=df.apply(
         lambda row: get_cost(
             row['call_area'],
             row['call_seconds']),
         axis=1)

方法3,采用mask+loc,分组计算

%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0

方法4,使用numpy的方式,可以使用index的方式找到对应的费用。

%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins

测试结果:

方法运行时间运行速度
iterrows513 ms1
apply181 ms2.8
loc6.44 ms79.6
numpy219 µs2342

方法4速度快是因为它采用了numpy向量化的数据处理方式。

总结

在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。

3.Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

3.1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

3.2 cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

3.3 numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

3.4 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

3.5 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理。