
本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。
1.Pandas 性能优化 40 倍 – DataFrame
1. 1 性能优化小试牛刀
大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。
比如下面这个简单例子,我们随机生成100万条数据,对val
这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform
函数的平均运行时间是284ms:
import pandas as pdimport numpy as npdef gen_data(size): d = dict() d["genre"] = np.random.choice(["A", "B", "C", "D"], size=size) d["val"] = np.random.randint(low=0, high=100, size=size) return pd.DataFrame(d)data = gen_data(1000000)data.head()def transform(data): data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)%timeit -n 1 transform(data)284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.2. 用Cython编写C扩展
试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1
这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。
%load_ext cython%%cythoncpdef int _transform(int x): if x % 2: return x + 1 return x - 1def transform(data): data.loc[:, "new_val"] = data.val.apply(_transform)%timeit -n 1 transform(data)202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.3. 减少类型转换
为了减少C和Python之间的类型转换,我们直接把val
这一列作为Numpy数组传递给Cython函数,注意区分cnp
和np
。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。
%%cythonimport numpy as npcimport numpy as cnpctypedef cnp.int_t DTYPE_tcpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr): cdef: int i = 0 int n = arr.shape[0] int x cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr) while i < n: x = arr[i] if x % 2: new_arr[i] = x + 1 else: new_arr[i] = x - 1 i += 1 return new_arrdef transform(data): data.loc[:, "new_val"] = _transform(data.val.values)%timeit -n 1 transform(data)10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.4. 使用不安全的数组
利用@cython.boundscheck(False)
,@cython.wraparound(False)
装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。
%%cythonimport cythonimport numpy as npcimport numpy as cnpctypedef cnp.int_t DTYPE_t@cython.boundscheck(False)@cython.wraparound(False)cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr): cdef: int i = 0 int n = arr.shape[0] int x cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr) while i < n: x = arr[i] if x % 2: new_arr[i] = x + 1 else: new_arr[i] = x - 1 i += 1 return new_arrdef transform(data): data.loc[:, "new_val"] = _transform(data.val.values)%timeit -n 1 transform(data)6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
来源:Python中文社区
作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu
2.Pandas性能优化:基础篇
Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。
2.1 按行迭代优化
很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。
我们测试一下速度:
import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
'b': np.random.randn(1000),
'N': np.random.randint(100, 1000, (1000)),
'x': np.random.randint(1, 10, (1000))})
%%timeit
a2=[]
for index,row in df.iterrows():
temp=row['a']
a2.append(temp*temp)
df['a2']=a2
67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
%%timeit
a2=[]
for row in df.itertuples():
temp=getattr(row, 'a')
a2.append(temp*temp)
df['a2']=a2
1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
可以看到直接循环,itertuples速度是iterrows的很多倍。
所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。
2.2 apply 优化
一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:
%%timeit
df['a2']=df.a.apply(lambda x: x*x)
360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)
可以看到,效率又有提升,apply的速度是itertuples的5倍左右。
注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。
%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)
15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)
这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。
我们可以简单的这样改写:
%%timeit
df['a3']=df['a']*df['b']
204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高
%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)
93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
2.3 聚合agg效率优化
有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。
还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:
%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())
1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
采用agg内置函数:
%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()
415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
刚才是单列聚合,我们看下多列聚合:
自定义函数:
%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})
2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
采用内置函数:
%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})
1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。
下面列出了一些内置函数:
内置函数 | 描述 |
---|---|
count | 计数 |
sum | 求和 |
mean | 平均值 |
median | 中位数 |
min,max | 最大值/最小值 |
std,var | 标准差/方差 |
prod | 求积 |
当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如
df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))
在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。
2.4 数据读取优化
如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。
这里有几种方法可以优化:
1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。
chunksize = 10 ** 6for chunk in pd.read_csv(filename, chunksize=chunksize):
process(chunk)
这里也可以使用多进程方式,具体我们在后面进阶篇介绍。
- 过滤掉不需要的列
如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。
df = pd.read_csv(filename, usecols=["a","b","c"]):
- 为列指定类型
panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
- 保存为其他格式
如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
import pandas as pd
#读取csv
df = pd.read_csv('filename.csv')
#pickle格式
df.to_pickle('filename.pkl')
df = pd.read_pickle('filename.pkl')
#hdf格式
df.to_hdf('filename.hdf','df')
df = pd.read_hdf('filename.hdf','df')
file type | time | speed |
---|---|---|
csv | 1.93 s | 1 |
pickle | 415 m | 4.6 |
hdf | 808 ms | 2.3 |
- 用第三方的包读取
如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。
2.5 优化数据处理逻辑
这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。
这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:
call_area | call_seconds |
---|---|
0 | 3364 |
2 | 3075 |
2 | 5847 |
1 | 2032 |
call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。
我们随机生成一批数据:
import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area': np.random.randint(0, 3, (row_number)),
'call_seconds': np.random.randint(0, 10000, (row_number))})
如果我们想计算每个电话的费用:
def get_cost(call_area,call_seconds):
if call_area==1:
rate=0.1
elif call_area==0:
rate=0.7
else:
return 0
return math.ceil(call_seconds/60)*rate
方法1,采用按行迭代循环
%%timeit
cost_list = []
for i,row in df.iterrows():
call_area=row['call_area']
call_seconds=row['call_seconds']
cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list
方法2,采用apply行的方式
%%timeit
df['cost']=df.apply(
lambda row: get_cost(
row['call_area'],
row['call_seconds']),
axis=1)
方法3,采用mask+loc,分组计算
%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0
方法4,使用numpy的方式,可以使用index的方式找到对应的费用。
%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins
测试结果:
方法 | 运行时间 | 运行速度 |
---|---|---|
iterrows | 513 ms | 1 |
apply | 181 ms | 2.8 |
loc | 6.44 ms | 79.6 |
numpy | 219 µs | 2342 |
方法4速度快是因为它采用了numpy向量化的数据处理方式。
总结
在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。
3.Pandas性能优化:进阶篇
在这里介绍一些更高级的pandas优化方法。
3.1 numpy
我们先来回顾一下上节说过的一个例子
import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
'b': np.random.randn(row_number),
'N': np.random.randint(100, 1000, (row_number)),
'x': np.random.randint(1, 10, (row_number))})
我们要计算a列与b列的乘积
方法1,采用apply
%timeit df.apply( lambda row: row['a']*row['b'],axis=1)
方法2,直接对series做乘法
%timeit df['a']*df['b']
方法3,使用numpy函数
%timeit np.multiply(df['a'].values,df['b'].values)
方法 | 运行时间 | 运行速度 |
---|---|---|
方法1 | 1.45s | 1 |
方法2 | 254µs | 5708 |
方法3 | 41.2 µs | 3536 |
这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。
3.2 cython
我们还继续使用上面的dataframe,现在定义一个函数:
def f(x):
return x * (x - 1)
def integrate_f(a, b, N):
s = 0
dx = (b - a) / N
for i in range(N):
s += f(a + i * dx)
return s * dx
我们要计算每一行integrate_f的值,
方法1,还是apply:
%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)
这个函数运行时间就较长了:
7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。
方法1,直接加头编译
%load_ext Cython
%%cython
def f_plain(x):
return x * (x - 1)
def integrate_f_plain(a, b, N):
s = 0
dx = (b - a) / N
for i in range(N):
s += f_plain(a + i * dx)
return s * dx
6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
看来直接加头,效率提升不大。
方法2,使用c type
%%cython
cdef double f_typed(double x) except? -2:
return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
cdef int i
cdef double s, dx
s = 0
dx = (b - a) / N
for i in range(N):
s += f_typed(a + i * dx)
return s * dx
345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
可以看到,使用cython的特定编程方法,效率提升较大。
3.3 numba
numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。
import numba
jitdef f_plain(x): .
return x * (x - 1)
jitdef integrate_f_numba(a, b, N): .
s = 0
dx = (b - a) / N
for i in range(N):
s += f_plain(a + i * dx)
return s * dx
jitdef apply_integrate_f_numba(col_a, col_b, col_N): .
n = len(col_N)
result = np.empty(n, dtype='float64')
assert len(col_a) == len(col_b) == n
for i in range(n):
result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
return result
def compute_numba(df):
result = apply_integrate_f_numba(df['a'].to_numpy(),
df['b'].to_numpy(),
df['N'].to_numpy())
return pd.Series(result, index=df.index, name='result')
%timeit compute_numba(df)
6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!
3.4 进阶 并行化处理
并行化读取数据
在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。
第一种思路,分块读取,多进程处理。
import pandas as pd
from multiprocessing import Pool
def process(df):
"""
数据处理
"""
pass
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
# 异步处理每个分块
f = pool.apply_async(process, [df])
f_list.append(f)
if len(f_list) >= max_processors:
for f in f_list:
f.get()
del f_list[:]
第二种思路,把大文件拆分成多份,多进程读取。
利用linux中的split命令,将csv切分成p个文件。
!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字
代码部分
from multiprocessing import Pool
import pandas as pd
import os
def read_func(file_path):
df = pd.read_csv(file_path, header=None)
return df
def read_file():
file_list=["train_split%02d"%i for i in range(66)]
p = Pool(4)
res = p.map(read_func, file_list)
p.close()
p.join()
df = pd.concat(res, axis=0, ignore_index=True)
return df
df = read_file()
并行化apply
apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:
import multiprocessing as mp
import time
def slow_func(s):
time.sleep(1)
return "done"with mp.Pool(mp.cpu_count()) as pool:
df['newcol'] = pool.map(slow_func, df['qid'])
3.5 进阶 第三方pandas库
由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。
dask库
pip install dask
类pandas库,可以并行读取、运行。
import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)
def some_function(x,y,z):
return x+y+z
res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)
swifter
pip install swifter
pandas的插件,可以直接在pandas上操作:
import swifter
def some_function(data):
return data * 10
data['out'] = data['in'].swifter.apply(some_function)
Modin库
Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。
https://modin.readthedocs.io/en/latest/using_modin.html
我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。
有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。
原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!
Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典
