本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。
1.Pandas 性能优化 40 倍 – DataFrame
1. 1 性能优化小试牛刀
大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。
比如下面这个简单例子,我们随机生成100万条数据,对val
这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform
函数的平均运行时间是284ms:
import pandas as pd
import numpy as np
def gen_data(size):
d = dict()
d["genre"] = np.random.choice(["A", "B", "C", "D"], size=size)
d["val"] = np.random.randint(low=0, high=100, size=size)
return pd.DataFrame(d)
data = gen_data(1000000)
data.head()
def transform(data):
data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)
%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.2. 用Cython编写C扩展
试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1
这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。
%load_ext cython
%%cython
cpdef int _transform(int x):
if x % 2:
return x + 1
return x - 1
def transform(data):
data.loc[:, "new_val"] = data.val.apply(_transform)
%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.3. 减少类型转换
为了减少C和Python之间的类型转换,我们直接把val
这一列作为Numpy数组传递给Cython函数,注意区分cnp
和np
。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。
%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
cdef:
int i = 0
int n = arr.shape[0]
int x
cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)
while i < n:
x = arr[i]
if x % 2:
new_arr[i] = x + 1
else:
new_arr[i] = x - 1
i += 1
return new_arr
def transform(data):
data.loc[:, "new_val"] = _transform(data.val.values)
%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.4. 使用不安全的数组
利用@cython.boundscheck(False)
,@cython.wraparound(False)
装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。
%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t
@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
cdef:
int i = 0
int n = arr.shape[0]
int x
cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)
while i < n:
x = arr[i]
if x % 2:
new_arr[i] = x + 1
else:
new_arr[i] = x - 1
i += 1
return new_arr
def transform(data):
data.loc[:, "new_val"] = _transform(data.val.values)
%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
来源:Python中文社区
作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu
2.Pandas性能优化:基础篇
Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。
2.1 按行迭代优化
很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。
我们测试一下速度:
import pandas as pd import numpy as np import time df = pd.DataFrame({'a': np.random.randn(1000), 'b': np.random.randn(1000), 'N': np.random.randint(100, 1000, (1000)), 'x': np.random.randint(1, 10, (1000))})
%%timeit a2=[] for index,row in df.iterrows(): temp=row['a'] a2.append(temp*temp) df['a2']=a2
67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit a2=[] for row in df.itertuples(): temp=getattr(row, 'a') a2.append(temp*temp) df['a2']=a2
1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
可以看到直接循环,itertuples速度是iterrows的很多倍。
所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。
2.2 apply 优化
一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:
%%timeit df['a2']=df.a.apply(lambda x: x*x)
360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
可以看到,效率又有提升,apply的速度是itertuples的5倍左右。
注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。
%%timeit df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)
15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。
我们可以简单的这样改写:
%%timeit df['a3']=df['a']*df['b']
204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高
%%timeit df['a3']=np.multiply(df['a'].values,df['b'].values)
93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
2.3 聚合agg效率优化
有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。
还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:
%%timeit df.groupby("x")['a'].agg(lambda x:x.sum())
1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
采用agg内置函数:
%%timeit df.groupby("x")['a'].agg(sum) #等价df.groupby("x")['a'].sum()
415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
刚才是单列聚合,我们看下多列聚合:
自定义函数:
%%timeit df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})
2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
采用内置函数:
%%timeit df.groupby("x").agg({"a":"sum","b":"count"})
1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。
下面列出了一些内置函数:
内置函数 | 描述 |
---|---|
count | 计数 |
sum | 求和 |
mean | 平均值 |
median | 中位数 |
min,max | 最大值/最小值 |
std,var | 标准差/方差 |
prod | 求积 |
当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如
df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))
在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。
2.4 数据读取优化
如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。
这里有几种方法可以优化:
1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。
chunksize = 10 ** 6for chunk in pd.read_csv(filename, chunksize=chunksize): process(chunk)
这里也可以使用多进程方式,具体我们在后面进阶篇介绍。
- 过滤掉不需要的列
如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。
df = pd.read_csv(filename, usecols=["a","b","c"]):
- 为列指定类型
panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
- 保存为其他格式
如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
import pandas as pd #读取csv df = pd.read_csv('filename.csv') #pickle格式 df.to_pickle('filename.pkl') df = pd.read_pickle('filename.pkl') #hdf格式 df.to_hdf('filename.hdf','df') df = pd.read_hdf('filename.hdf','df')
file type | time | speed |
---|---|---|
csv | 1.93 s | 1 |
pickle | 415 m | 4.6 |
hdf | 808 ms | 2.3 |
- 用第三方的包读取
如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。
2.5 优化数据处理逻辑
这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。
这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:
call_area | call_seconds |
---|---|
0 | 3364 |
2 | 3075 |
2 | 5847 |
1 | 2032 |
call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。
我们随机生成一批数据:
import pandas as pd import numpy as np import time import math row_number=10000 df = pd.DataFrame({'call_area': np.random.randint(0, 3, (row_number)), 'call_seconds': np.random.randint(0, 10000, (row_number))})
如果我们想计算每个电话的费用:
def get_cost(call_area,call_seconds): if call_area==1: rate=0.1 elif call_area==0: rate=0.7 else: return 0 return math.ceil(call_seconds/60)*rate
方法1,采用按行迭代循环
%%timeit cost_list = [] for i,row in df.iterrows(): call_area=row['call_area'] call_seconds=row['call_seconds'] cost_list.append(get_cost(call_area,call_seconds)) df['cost'] = cost_list
方法2,采用apply行的方式
%%timeit df['cost']=df.apply( lambda row: get_cost( row['call_area'], row['call_seconds']), axis=1)
方法3,采用mask+loc,分组计算
%%timeit mask1=df.call_area==1 mask2=df.call_area==0 mask3=df.call_area==2 df['call_mins']=np.ceil(df['call_seconds']/60) df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1 df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7 df.loc[mask3,'cost']=0
方法4,使用numpy的方式,可以使用index的方式找到对应的费用。
%%timeit prices = np.array([0.7, 0.1, 0]) mins=np.ceil(df['call_seconds'].values/60) df['cost']=prices[df.call_area]*mins
测试结果:
方法 | 运行时间 | 运行速度 |
---|---|---|
iterrows | 513 ms | 1 |
apply | 181 ms | 2.8 |
loc | 6.44 ms | 79.6 |
numpy | 219 µs | 2342 |
方法4速度快是因为它采用了numpy向量化的数据处理方式。
总结
在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。
3.Pandas性能优化:进阶篇
在这里介绍一些更高级的pandas优化方法。
3.1 numpy
我们先来回顾一下上节说过的一个例子
import pandas as pd import numpy as np import time row_number=100000 df = pd.DataFrame({'a': np.random.randn(row_number), 'b': np.random.randn(row_number), 'N': np.random.randint(100, 1000, (row_number)), 'x': np.random.randint(1, 10, (row_number))})
我们要计算a列与b列的乘积
方法1,采用apply
%timeit df.apply( lambda row: row['a']*row['b'],axis=1)
方法2,直接对series做乘法
%timeit df['a']*df['b']
方法3,使用numpy函数
%timeit np.multiply(df['a'].values,df['b'].values)
方法 | 运行时间 | 运行速度 |
---|---|---|
方法1 | 1.45s | 1 |
方法2 | 254µs | 5708 |
方法3 | 41.2 µs | 3536 |
这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。
3.2 cython
我们还继续使用上面的dataframe,现在定义一个函数:
def f(x): return x * (x - 1) def integrate_f(a, b, N): s = 0 dx = (b - a) / N for i in range(N): s += f(a + i * dx) return s * dx
我们要计算每一行integrate_f的值,
方法1,还是apply:
%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)
这个函数运行时间就较长了:
7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。
方法1,直接加头编译
%load_ext Cython %%cython def f_plain(x): return x * (x - 1) def integrate_f_plain(a, b, N): s = 0 dx = (b - a) / N for i in range(N): s += f_plain(a + i * dx) return s * dx
6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
看来直接加头,效率提升不大。
方法2,使用c type
%%cython cdef double f_typed(double x) except? -2: return x * (x - 1) cpdef double integrate_f_typed(double a, double b, int N): cdef int i cdef double s, dx s = 0 dx = (b - a) / N for i in range(N): s += f_typed(a + i * dx) return s * dx
345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
可以看到,使用cython的特定编程方法,效率提升较大。
3.3 numba
numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。
import numba @numba.jitdef f_plain(x): return x * (x - 1) @numba.jitdef integrate_f_numba(a, b, N): s = 0 dx = (b - a) / N for i in range(N): s += f_plain(a + i * dx) return s * dx @numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N): n = len(col_N) result = np.empty(n, dtype='float64') assert len(col_a) == len(col_b) == n for i in range(n): result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i]) return result def compute_numba(df): result = apply_integrate_f_numba(df['a'].to_numpy(), df['b'].to_numpy(), df['N'].to_numpy()) return pd.Series(result, index=df.index, name='result') %timeit compute_numba(df)
6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!
3.4 进阶 并行化处理
并行化读取数据
在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。
第一种思路,分块读取,多进程处理。
import pandas as pd from multiprocessing import Pool def process(df): """ 数据处理 """ pass # initialise the iterator object iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip', skipinitialspace=True, encoding='utf-8') # depends on how many cores you want to utilise max_processors = 4# Reserve 4 cores for our script pool = Pool(processes=max_processors) f_list = [] for df in iterator: # 异步处理每个分块 f = pool.apply_async(process, [df]) f_list.append(f) if len(f_list) >= max_processors: for f in f_list: f.get() del f_list[:]
第二种思路,把大文件拆分成多份,多进程读取。
利用linux中的split命令,将csv切分成p个文件。
!split -l 200000 -d train.csv train_split #将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字
代码部分
from multiprocessing import Pool import pandas as pd import os def read_func(file_path): df = pd.read_csv(file_path, header=None) return df def read_file(): file_list=["train_split%02d"%i for i in range(66)] p = Pool(4) res = p.map(read_func, file_list) p.close() p.join() df = pd.concat(res, axis=0, ignore_index=True) return df df = read_file()
并行化apply
apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:
import multiprocessing as mp import time def slow_func(s): time.sleep(1) return "done"with mp.Pool(mp.cpu_count()) as pool: df['newcol'] = pool.map(slow_func, df['qid'])
3.5 进阶 第三方pandas库
由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。
dask库
pip install dask
类pandas库,可以并行读取、运行。
import pandas as pd import dask.dataframe as dd from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe> ddata = dd.from_pandas(data, npartitions=30) def some_function(x,y,z): return x+y+z res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1)) .compute(get=get)
swifter
pip install swifter
pandas的插件,可以直接在pandas上操作:
import swifter def some_function(data): return data * 10 data['out'] = data['in'].swifter.apply(some_function)
Modin库
Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。
https://modin.readthedocs.io/en/latest/using_modin.html
我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。
有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。
原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!
Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典