标签归档:性能优化

Annoy 1个优秀的”邻近搜索”解决方案

Annoy是由 spotify 开源的一个Python第三方模块,它能用于搜索空间中给定查询点的近邻点。

此外,众所周知,Python由于GIL的存在,它的多线程最多只能用上一个CPU核的性能。如果你想要做性能优化,就必须用上多进程。

但是多进程存在一个问题,就是所有进程的变量都是独立的,B进程访问不到A进程的变量,因此Annoy为了解决这个问题,增加了一个静态索引保存功能,你可以在A进程中保存Annoy变量,在B进程中通过文件的形式访问这个变量。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install annoy

2.基本使用

Annoy使用起来非常简单,学习成本极低。比如我们随意生成1000个0,1之间的高斯分布点,将其加入到Annoy的索引,并保存为文件:

# 公众号:Python 实用宝典
from annoy import AnnoyIndex
import random

f = 40
t = AnnoyIndex(f, 'angular')  # 用于存储f维度向量
for i in range(1000):
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)

t.build(10) # 10 棵树,查询时,树越多,精度越高。
t.save('test.ann')

这样,我们就完成了索引的创建及落地。Annoy 支持4种距离计算方式:

"angular""euclidean""manhattan""hamming",或"dot",即余弦距离、欧几里得距离、曼哈顿距离、汉明距离及点乘距离。

接下来我们可以新建一个进程访问这个索引:

from annoy import AnnoyIndex

f = 40
u = AnnoyIndex(f, 'angular')
u.load('test.ann') 
print(u.get_nns_by_item(1, 5))
# [1, 607, 672, 780, 625]

其中,u.get_nns_by_item(i, n, search_k=-1, include_distances=False)返回第 i 个item的n个最近邻的item。在查询期间,它将检索多达search_k(默认n_trees * n)个点。如果设置include_distancesTrue,它将返回一个包含两个列表的元组:第二个列表中包含所有对应的距离。

3.算法原理

构建索引:在数据集中随机选择两个点,用它们的中垂线来切分整个数据集。再随机从两个平面中各选出一个顶点,再用中垂线进行切分,于是两个平面变成了四个平面。以此类推形成一颗二叉树。当我们设定树的数量时,这个数量指的就是这样随机生成的二叉树的数量。所以每颗二叉树都是随机切分的。

查询方法
1. 将每一颗树的根节点插入优先队列;
2. 搜索优先队列中的每一颗二叉树,每一颗二叉树都可以得到最多 Top K 的候选集;
3. 删除重复的候选集;
4. 计算候选集与查询点的相似度或者距离;
5. 返回 Top K 的集合。

4.附录

下面是Annoy的所有函数方法:

  • AnnoyIndex(f, metric) 返回可读写的新索引,用于存储f维度向量。metric 可以是 "angular""euclidean""manhattan""hamming",或"dot"
  • a.add_item(i, v)用于给索引添加向量v,i 是指第 i 个向量。
  • a.build(n_trees)用于构建 n_trees 的森林。查询时,树越多,精度越高。在调用build后,无法再添加任何向量。
  • a.save(fn, prefault=False)将索引保存到磁盘。保存后,不能再添加任何向量。
  • a.load(fn, prefault=False)从磁盘加载索引。如果prefault设置为True,它将把整个文件预读到内存中。默认值为False。
  • a.unload() 释放索引。
  • a.get_nns_by_item(i, n, search_k=-1, include_distances=False)返回第 i 个item的 n 个最近邻的item。
  • a.get_nns_by_vector(v, n, search_k=-1, include_distances=False)与上面的相同,但按向量v查询。
  • a.get_item_vector(i)返回第i个向量。
  • a.get_distance(i, j)返回向量i和向量j之间的距离。
  • a.get_n_items() 返回索引中的向量数。
  • a.get_n_trees() 返回索引中的树的数量。
  • a.on_disk_build(fn) 用以在指定文件而不是RAM中建立索引(在添加向量之前执行,在建立之后无需保存)。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

什么格式是保存 Pandas 数据的最好格式?

在数据分析相关项目工作时,我通常使用Jupyter笔记本和pandas库来处理和移动我的数据。对于中等大小的数据集来说,这是一个非常直接的过程,你甚至可以将其存储为纯文本文件而没有太多的开销。

然而,当你的数据集中的观测数据数量较多时,保存和加载数据回内存的过程就会变慢,现在程序的重新启动都会迫使你等待数据重新加载。所以最终,CSV文件或任何其他纯文本格式都会失去吸引力。

我们可以做得更好。有很多二进制格式可以用来将数据存储到磁盘上,其中有很多格式pandas都支持。我们怎么能知道哪一种更适合我们的目的呢?

来吧,我们尝试其中的几个,然后进行对比!这就是我决定在这篇文章中要做的:通过几种方法将 pandas.DataFrame 保存到磁盘上,看看哪一种在I/O速度、内存消耗和磁盘空间方面做的更好。

在这篇文章中,我将展示我的测试结果。

1.要比较的格式

我们将考虑采用以下格式来存储我们的数据:

1. CSV — 数据科学家的一个好朋友
2. Pickle — 一种Python的方式来序列化事物
3. MessagePack — 它就像JSON,但又快又小
4. HDF5 — 一种设计用于存储和组织大量数据的文件格式
5. Feather — 一种快速、轻量级、易于使用的二进制文件格式,用于存储数据框架
6. Parquet — Apache Hadoop的柱状存储格式

所有这些格式都是被广泛使用的,而且(也许除了MessagePack)在你做一些数据分析的事情时非常经常遇到。

为了追求找到最好的缓冲格式来存储程序会话之间的数据,我选择了以下指标进行比较。

1. size_mb – 文件大小(Mb)。
2. save_time – 将数据帧保存到磁盘上所需的时间量。
3. load_time – 将之前转储的数据帧加载到内存中所需要的时间量。
4. save_ram_delta_mb – 数据帧保存过程中最大的内存消耗增长量。
5. load_ram_delta_mb – 数据帧加载过程中的最大内存消耗增长量。

请注意,当我们使用高效压缩的二进制数据格式,如 Parquet 时,最后两个指标变得非常重要。它们可以帮助我们估计加载序列化数据所需的内存量,此外还有数据大小本身。我们将在接下来的章节中更详细地讨论这个问题。

2.测试及结果

我决定使用一个合成数据集进行测试,以便更好地控制序列化的数据结构和属性。

另外,我在我的基准中使用了两种不同的方法:

(a) 将生成的分类变量保留为字符串。

(b) 在执行任何I/O之前将它们转换为 pandas.Categorical 数据类型。

函数generate_dataset显示了我在基准中是如何生成数据集的:

def generate_dataset(n_rows, num_count, cat_count, max_nan=0.1, max_cat_size=100):
    """
    随机生成具有数字和分类特征的数据集。
    
    数字特征取自正态分布X ~ N(0, 1)。
    分类特征则被生成为随机的uuid4字符串。
    
    此外,数字和分类特征的max_nan比例被替换为NaN值。
    """
    dataset, types = {}, {}
    
    def generate_categories():
        from uuid import uuid4
        category_size = np.random.randint(2, max_cat_size)
        return [str(uuid4()) for _ in range(category_size)]
    
    for col in range(num_count):
        name = f'n{col}'
        values = np.random.normal(0, 1, n_rows)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'float32'
        
    for col in range(cat_count):
        name = f'c{col}'
        cats = generate_categories()
        values = np.array(np.random.choice(cats, n_rows, replace=True), dtype=object)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'object'
    
    return pd.DataFrame(dataset), types

我们将CSV文件的保存和加载性能作为一个基准。

五个随机生成的具有一百万个观测值的数据集被转储到CSV中,并读回内存以获得平均指标。

每种二进制格式都针对20个随机生成的具有相同行数的数据集进行测试。

这些数据集包括15个数字特征和15个分类特征。你可以在这个资源库中找到带有基准测试功能和所需的完整源代码:

https://github.com/devforfu/pandas-formats-benchmark

或在Python实用宝典后台回复 Pandas IO对比 ,下载完整代码。

(a) 数据为字符串特征时的性能

下图显示了每种数据格式的平均I/O时间。一个有趣的观察是,hdf显示出比csv更慢的加载速度,而其他二进制格式的表现明显更好。其中最令人印象深刻的是feather和parquet。

在保存数据和从磁盘上读取数据时,内存开销如何?

下一张图片告诉我们,hdf 的表现就不是那么好了。可以肯定的是,csv在保存/加载纯文本字符串时不需要太多的额外内存,而Feather和parquet则相当接近:

最后,让我们看看文件的大小。这次parquet显示了一个令人印象深刻的结果,考虑到这种格式是为有效存储大量数据而开发的,这并不令人惊讶。

(b) 字符串特征转换为数字时的性能

在上一节中,我们没有尝试有效地存储我们的分类特征而是使用普通的字符串。让我们来弥补这个遗漏吧! 这一次我们使用一个专门的 pandas.Categorical 类型,转字符串特征为数字特征。

看看现在与纯文本的csv相比,它看起来如何!

现在所有的二进制格式都显示出它们的真正力量。Csv的基准结果已经远远落后了,所以让我们把它去掉,以便更清楚地看到各种二进制格式之间的差异:

Feather 和 Pickle 显示了最好的 I/O 速度,而 hdf 仍然显示了明显的性能开销。

现在是时候比较数据进程加载时的内存消耗了。下面的柱状图显示了我们之前提到的关于parquet格式的一个重要事实。

可以看到 parquet 读写时的内存空间差距有多大,你有可能你无法将比较大的 parquet 文件加载到内存中。

最后的图显示了各格式的文件大小。所有的格式都显示出良好的效果,除了hdf仍然需要比其他格式多得多的空间:

3.结论

正如我们的测试所显示的,似乎 feather 格式是存储Python会话数据的理想候选者。它显示了很快的I/O速度,在磁盘上不占用太多内存,并且在加载回RAM时不需要消耗太大的内存。

当然,这种比较并不意味着你应该在每个可能的情况下使用这种格式。例如,feather格式一般不会被用作长期文件存储的格式。

另外,某些特定情况下也无法使用 feather,这由你的整个程序架构决定。然而,就如本帖开头所述的目的,它在不被任何特殊事项限制的情况下是一个很好的选择。

本文译自 towardsdatascience
作者: Ilia Zaitsev
有部分修改。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python怎样存储变量性能最优?这篇文章告诉你答案

工作时我们经常会遇到需要临时保存结果变量的场景,尤其是一些数据处理、模型开发的场景,加载处理速度是个很漫长的过程,于是经常会把这些变量储存起来。

而储存变量最常见、最普遍的方法是用pickle,保存为pkl文件。但是如果从写入和读取的性能角度考虑,pkl可能真的不是最优选。

Pickle有其独特的好处,大部分变量不需要进行处理,都能直接存到pkl文件里,但这样的方便其实是牺牲了部分性能取得的。与之相比,numpy的.npy格式就比pickle性能上快不少。

当然,我们需要有证据支撑这个观点。所以今天我们就来做个实验,分别在Python2和Python3中对比 numpy 和 pickle 两种存储格式(.npy, .pkl) 对数据的存储和读取的性能对比。

部分内容参考分析自: https://applenob.github.io/python/save/

1. Python2中, npy与pkl的性能对比

首先初始化数据:

import numpy as np
import time
import cPickle as pkl
import os

all_batches = []
for i in range(20):
    a1 = np.random.normal(size=[25600, 40])
    label = np.random.normal(size=[25600, 1])
    all_batch = np.concatenate([a1, label], 1)
    all_batches.append(all_batch)
all_batches = np.array(all_batches)
print(all_batches.shape)
# (20, 25600, 41)

然后测试使用pickle保存和读取时间的耗时,以及整个文件的大小:

s_t1 = time.time()
pkl_name = "a.pkl"
with open(pkl_name, "wb") as f:
    pkl.dump(all_batches, f)
pkl_in_time = time.time() - s_t1
print("pkl dump costs {} sec".format(pkl_in_time))

s_t2 = time.time()
with open(pkl_name, "rb") as f:
    new_a = pkl.load(f)
pkl_out_time = time.time() - s_t2
print("pkl load costs {} sec".format(pkl_out_time))

pkl_size = os.path.getsize(pkl_name)
print("pkl file size: {} byte, {} mb".format(pkl_size, float(pkl_size)/(1024*1024)))

结果如下:

pkl dump costs 67.7483091354 sec
pkl load costs 52.1168899536 sec
pkl file size: 497437110 byte, 474.392995834 mb

然后再试一下npy的写入和读取:

s_t3 = time.time()
npy_name = "a.npy"
with open(npy_name, "wb") as f:
    np.save(f, arr=all_batches)
npy_in_time = time.time() - s_t3
print("npy save costs {} sec".format(npy_in_time))
s_t4 = time.time()
with open(npy_name, "rb") as f:
    new_a = np.load(f)
npy_out_time = time.time() - s_t4
print("npy load costs {} sec".format(npy_out_time))
npy_size = os.path.getsize(npy_name)
print("npy file size: {} byte, {} mb".format(npy_size, float(npy_size) / (1024 * 1024)))

结果如下:

npy save costs 20.718367815 sec
npy load costs 0.62314915657 sec
npy file size: 167936128 byte, 160.15637207 mb

结果发现,npy性能明显优于pkl格式。

通过多次测试发现,在Python2中,npy格式的性能优势全面碾压pkl,工程允许的情况下,在Python2中,我们应该在这二者中毫不犹豫地选择npy.

2.Python3中, npy与pkl的性能对比

Python2已经是过去式,重点还要看Python3.

在Python3中,与Python2的代码唯一一句不一样的是pickle的引入:

# Python2:
import cPickle as pkl

# Python3:
import pickle as pkl

其他代码基本一样,替换代码后,重新运行程序,让我们看看在Python3上,npy格式和pkl格式性能上的区别:

首先是pkl格式的表现:

ckenddeMacBook-Pro:Documents ckend$ python 1.py 
(20, 25600, 41)
pkl dump costs 24.32167887687683 sec
pkl load costs 4.480823040008545 sec
pkl file size: 167936163 byte, 160.15640544891357 mb

然后是npy格式的表现:

npy save costs 22.471696853637695 sec
npy load costs 0.3791017532348633 sec
npy file size: 167936080 byte, 160.1563262939453 mb

可以看到在Python3中pkl格式和npy格式的存储大小是基本相同的,在存储耗时上也相差无几。但是在读取数据的时候,npy相对于pkl还是有一定的优势的。

因此,如果你的程序非常注重读取效率,那么我觉得npy格式会比pkl格式更适合你。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化全方位实战教程

本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。

1.Pandas 性能优化 40 倍 – DataFrame

1. 1 性能优化小试牛刀

大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。

比如下面这个简单例子,我们随机生成100万条数据,对val这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform函数的平均运行时间是284ms:

import pandas as pd
import numpy as np

def gen_data(size):
    d = dict()
    d["genre"] = np.random.choice(["A""B""C""D"], size=size)
    d["val"] = np.random.randint(low=0, high=100, size=size)
    return pd.DataFrame(d)

data = gen_data(1000000)
data.head()
def transform(data):
    data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)

%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.2. 用Cython编写C扩展

试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1 这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。

%load_ext cython
%%cython
cpdef int _transform(int x):
    if x % 2:
        return x + 1
    return x - 1

def transform(data):
    data.loc[:, "new_val"] = data.val.apply(_transform)

%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.3. 减少类型转换

为了减少C和Python之间的类型转换,我们直接把val这一列作为Numpy数组传递给Cython函数,注意区分cnpnp。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。

%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.4. 使用不安全的数组

利用@cython.boundscheck(False)@cython.wraparound(False)装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。

%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

来源:Python中文社区

作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu

2.Pandas性能优化:基础篇

Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。

2.1 按行迭代优化

很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。

我们测试一下速度:

import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
                     'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                   'x':  np.random.randint(1, 10, (1000))})

%%timeit
a2=[]
for index,row in df.iterrows():
    temp=row['a']
    a2.append(temp*temp)
df['a2']=a2    

67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
a2=[]
for row in df.itertuples():
    temp=getattr(row, 'a')
    a2.append(temp*temp)
df['a2']=a2

1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到直接循环,itertuples速度是iterrows的很多倍。

所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。

2.2 apply 优化

一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:

%%timeit
df['a2']=df.a.apply(lambda x: x*x)

360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到,效率又有提升,apply的速度是itertuples的5倍左右。

注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。

%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)

15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。

我们可以简单的这样改写:

%%timeit
df['a3']=df['a']*df['b']

204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高

%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)

93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

2.3 聚合agg效率优化

有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。

还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:

%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())

1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

采用agg内置函数:

%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()

415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

刚才是单列聚合,我们看下多列聚合:
自定义函数:

%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})

2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

采用内置函数:

%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})

1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。

下面列出了一些内置函数:

内置函数描述
count计数
sum求和
mean平均值
median中位数
min,max最大值/最小值
std,var标准差/方差
prod求积

当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如

df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))

在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。

2.4 数据读取优化

如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。

这里有几种方法可以优化:

1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。

chunksize = 10 ** 6for chunk in pd.read_csv(filename,  chunksize=chunksize):
     process(chunk)

这里也可以使用多进程方式,具体我们在后面进阶篇介绍。

  1. 过滤掉不需要的列

如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。

df = pd.read_csv(filename,  usecols=["a","b","c"]):
  1. 为列指定类型
    panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
  1. 保存为其他格式
    如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
 import pandas as pd
 #读取csv
 df = pd.read_csv('filename.csv')
 
 #pickle格式
 df.to_pickle('filename.pkl') 
 df = pd.read_pickle('filename.pkl') 
 
 #hdf格式
 df.to_hdf('filename.hdf','df') 
 df = pd.read_hdf('filename.hdf','df') 

file typetimespeed
csv1.93 s1
pickle415 m4.6
hdf808 ms2.3
  1. 用第三方的包读取
    如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。

2.5 优化数据处理逻辑

这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。

这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:

call_areacall_seconds
03364
23075
25847
12032

call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。

我们随机生成一批数据:

import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area':  np.random.randint(0, 3, (row_number)),
                   'call_seconds':  np.random.randint(0, 10000, (row_number))})

如果我们想计算每个电话的费用:

def get_cost(call_area,call_seconds):
    if call_area==1:
        rate=0.1
    elif call_area==0:
        rate=0.7
    else:
        return 0
    return math.ceil(call_seconds/60)*rate

方法1,采用按行迭代循环

%%timeit
cost_list = []
for i,row in df.iterrows():
    call_area=row['call_area']
    call_seconds=row['call_seconds']
    cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list

方法2,采用apply行的方式

%%timeit
df['cost']=df.apply(
         lambda row: get_cost(
             row['call_area'],
             row['call_seconds']),
         axis=1)

方法3,采用mask+loc,分组计算

%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0

方法4,使用numpy的方式,可以使用index的方式找到对应的费用。

%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins

测试结果:

方法运行时间运行速度
iterrows513 ms1
apply181 ms2.8
loc6.44 ms79.6
numpy219 µs2342

方法4速度快是因为它采用了numpy向量化的数据处理方式。

总结

在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。

3.Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

3.1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

3.2 cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

3.3 numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

3.4 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

3.5 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Redis适合做队列吗?详解三种Redis队列的使用方式

作者:Magic Kaito

来源:水滴与银弹

我经常听到很多人讨论,关于「把 Redis 当作队列来用是否合适」的问题。

有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便。

也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。

究竟哪种方案更好呢?

这篇文章,我就和你聊一聊把 Redis 当作队列,究竟是否合适这个问题。

我会从简单到复杂,一步步带你梳理其中的细节,把这个问题真正的讲清楚。

看完这篇文章后,我希望你对这个问题你会有全新的认识。

在文章的最后,我还会告诉你关于「技术选型」的思路,文章有点长,希望你可以耐心读完。

从最简单的开始:List 队列

首先,我们先从最简单的场景开始讲起。

如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型。

因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。

如果把 List 当作队列,你可以这么来用。

生产者使用 LPUSH 发布消息:

127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2

消费者这一侧,使用 RPOP 拉取消息:

127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"

这个模型非常简单,也很容易理解。

但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。

127.0.0.1:6379> RPOP queue
(nil)   // 没消息了

而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:

while true:
    msg = redis.rpop("queue")
    // 没有消息,继续循环
    if msg == null:
        continue
    // 处理消息
    handle(msg)

如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。

怎么解决这个问题呢?

也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:

while true:
    msg = redis.rpop("queue")
    // 没有消息,休眠2s
    if msg == null:
        sleep(2)
        continue
    // 处理消息        
    handle(msg)

这就解决了 CPU 空转问题。

这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。

假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。

要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。

鱼和熊掌不可兼得。

那如何做,既能及时处理新消息,还能避免 CPU 空转呢?

Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?

幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。

现在,你可以这样来拉取消息了:

while true:
    // 没消息阻塞等待,0表示不设置超时时间
    msg = redis.brpop("queue"0)
    if msg == null:
        continue
    // 处理消息
    handle(msg)

使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。

这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。

注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?

我们一起来分析一下:

  1. 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
  2. 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了

第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景。

第二个问题就比较棘手了,因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。

这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。

针对这 2 个问题怎么解决呢?我们一个个来看。

发布/订阅模型:Pub/Sub

从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。

它正好可以解决前面提到的第一个问题:重复消费。

即多组生产者、消费者的场景,我们来看它是如何做的。

Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。

假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。

首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。

// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1

此时,2 个消费者都会被阻塞住,等待新消息的到来。

之后,再启动一个生产者,发布一条消息。

127.0.0.1:6379> PUBLISH queue msg1
(integer) 1

这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"

看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。

除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。

// 订阅符合规则的队列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1

这里的消费者,订阅了 queue.* 相关的队列消息。

之后,生产者分别向 queue.p1 和 queue.p2 发布消息。

127.0.0.1:6379> PUBLISH queue.p1 msg1
(integer) 1
127.0.0.1:6379> PUBLISH queue.p2 msg2
(integer) 1

这时再看消费者,它就可以接收到这 2 个生产者的消息了。

127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
...
// 来自queue.p1的消息
1) "pmessage"
2) "queue.*"
3) "queue.p1"
4) "msg1"

// 来自queue.p2的消息
1) "pmessage"
2) "queue.*"
3) "queue.p2"
4) "msg2"

我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。

讲完了它的优点,那它有什么缺点呢?

其实,Pub/Sub 最大问题是:丢数据

如果发生以下场景,就有可能导致数据丢失:

  1. 消费者下线
  2. Redis 宕机
  3. 消息堆积

究竟是怎么回事?

这其实与 Pub/Sub 的实现方式有很大关系。

Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。

一个完整的发布、订阅消息处理流程是这样的:

  1. 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
  2. 生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它

看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。

这种设计方案,就导致了上面提到的那些问题。

例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。

如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。

所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。

另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。

也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?

当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。

如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。

但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失

这是怎么回事?

还是回到 Pub/Sub 的实现细节上来说。

每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。

当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。

之后,消费者不断地从缓冲区读取消息,处理消息。

但是,问题就出在这个缓冲区上。

因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。

如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。

这时消费者就会消费失败,也会丢失数据。

如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60。

它的参数含义如下:

  • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
  • 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线

Pub/Sub 的这一点特点,是与 List 作队列差异比较大的。

从这里你应该可以看出,List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型

List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。

但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。

当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。

好了,现在我们总结一下 Pub/Sub 的优缺点:

  1. 支持发布 / 订阅,支持多组生产者、消费者处理消息
  2. 消费者下线,数据会丢失
  3. 不支持数据持久化,Redis 宕机,数据也会丢失
  4. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

有没有发现,除了第一个是优点之外,剩下的都是缺点。

所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很「鸡肋」。

也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。

目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。

我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?

其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。

好,现在我们重新梳理一下,我们在使用消息队列时的需求。

当我们在使用一个消息队列时,希望它的功能如下:

  • 支持阻塞等待拉取消息
  • 支持发布 / 订阅模式
  • 消费失败,可重新消费,消息不丢失
  • 实例宕机,消息不丢失,数据可持久化
  • 消息可堆积

Redis 除了 List 和 Pub/Sub 之外,还有符合这些要求的数据类型吗?

其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。

Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。

这个项目的定位,就是一个基于内存的分布式消息队列中间件。

但由于种种原因,这个项目一直不温不火。

终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream

下面我们就来看看,它能符合上面提到的这些要求吗?

趋于成熟的队列:Stream

我们来看 Stream 是如何解决上面这些问题的。

我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?

首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

  • XADD:发布消息
  • XREAD:读取消息

生产者发布 2 条消息:

// *表示让Redis自动生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"

使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。

这个消息 ID 的格式是「时间戳-自增序号」。

消费者拉取消息:

// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"

如果想继续拉取消息,需要传入上一条消息的 ID:

127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)

没有消息,Redis 会返回 NULL。

以上就是 Stream 最简单的生产、消费。

这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。

下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

1) Stream 是否支持「阻塞式」拉取消息?

可以的,在读取消息时,只需要增加 BLOCK 参数即可。

// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0

这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

2) Stream 是否支持发布 / 订阅模式?

也没问题,Stream 通过以下命令完成发布订阅:

  • XGROUP:创建消费者组
  • XREADGROUP:在指定消费组下,开启消费者拉取消息

下面我们来看具体如何做?

首先,生产者依旧发布 2 条消息:

127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"

之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

第一个消费组开始消费:

// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

同样地,第二个消费组开始消费:

// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

这样一来,就达到了多组消费者「订阅」消费的目的。

3) 消息处理时异常,Stream 能否保证消息不丢失,重新消费?

除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。

当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。

待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
   2) 1) 1) "1618472043089-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618472045158-0"
         2) 1) "name"
            2) "lisi"

4) Stream 数据会写入到 RDB 和 AOF 做持久化吗?

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

5) 消息堆积时,Stream 是怎么处理的?

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

  1. 生产者限流:避免消费者处理不及时,导致持续积压
  2. 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案。

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

除了以上介绍到的命令,Stream 还支持查看消息长度(XLEN)、查看消费者状态(XINFO)等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。

好了,通过以上介绍,我们可以看到,Redis 的 Stream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?

既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?

但是还「差一点」,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。

原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。

到这里,就不得不把 Redis 与专业的队列中间件做对比了。

下面我们就来看一下,Redis 在作队列时,到底还有哪些欠缺?

与专业的消息队列对比

其实,一个专业的消息队列,必须要做到两大块:

  1. 消息不丢
  2. 消息可堆积

前面我们讨论的重点,很大篇幅围绕的是第一点展开的。

这里我们换个角度,从一个消息队列的「使用模型」来分析一下,怎么做,才能保证数据不丢?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者

消息是否会发生丢失,其重点也就在于以下 3 个环节:

  1. 生产者会不会丢消息?
  2. 消费者会不会丢消息?
  3. 队列中间件会不会丢消息?

1) 生产者会不会丢消息?

当生产者在发布消息时,可能发生以下异常情况:

  1. 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败
  2. 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了

如果是情况 1,消息根本没发出去,那么重新发一次就好了。

如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。

生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。

也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。

但发现没有?这也意味着消息可能会重复发送。

是的,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃。

那消费者这边,就需要多做一些逻辑了。

对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。

所以,无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。

2) 消费者会不会丢消息?

这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?

要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。

无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。

所以,从这个角度来看,Redis 也是合格的。

3) 队列中间件会不会丢消息?

前面 2 个问题都比较好处理,只要客户端和服务端配合好,就能保证生产端、消费端都不丢消息。

但是,如果队列中间件本身就不可靠呢?

毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢。

在这个方面,Redis 其实没有达到要求。

Redis 在以下 2 个场景下,都会导致数据丢失。

  1. AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
  2. 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性

所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

再来看那些专业的消息队列中间件是如何解决这个问题的?

像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。

但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。

最后,我们来看消息积压怎么办?

4) 消息积压怎么办?

因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。

综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

  1. Redis 本身可能会丢数据
  2. 面对消息积压,Redis 内存资源紧张

到这里,Redis 是否可以用作队列,我想这个答案你应该会比较清晰了。

如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。

而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。

如果你的业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么我建议你使用专业的消息队列中间件。

总结

好了,总结一下。这篇文章我们从「Redis 能否用作队列」这个角度出发,介绍了 List、Pub/Sub、Stream 在做队列的使用方式,以及它们各自的优劣。

之后又把 Redis 和专业的消息队列中间件做对比,发现 Redis 的不足之处。

最后,我们得出 Redis 做队列的合适场景。

这里我也列了一个表格,总结了它们各自的优缺点。

后记

最后,我想和你再聊一聊关于「技术方案选型」的问题。

你应该也看到了,这篇文章虽然始于 Redis,但并不止于 Redis。

我们在分析 Redis 细节时,一直在提出问题,然后寻找更好的解决方案,在文章最后,又聊到一个专业的消息队列应该怎么做。

其实,我们在讨论技术选型时,就是一个关于如何取舍的问题。

而这里我想传达给你的信息是,在面对技术选型时,不要不经过思考就觉得哪个方案好,哪个方案不好

你需要根据具体场景具体分析,这里我把这个分析过程分为 2 个层面:

  1. 业务功能角度
  2. 技术资源角度

这篇文章所讲到的内容,都是以业务功能角度出发做决策的。

但这里的第二点,从技术资源角度出发,其实也很重要。

技术资源的角度是说,你所处的公司环境、技术资源能否匹配这些技术方案

这个怎么解释呢?

简单来讲,就是你所在的公司、团队,是否有匹配的资源能 hold 住这些技术方案。

我们都知道 Kafka、RabbitMQ 是非常专业的消息中间件,但它们的部署和运维,相比于 Redis 来说,也会更复杂一些。

如果你在一个大公司,公司本身就有优秀的运维团队,那么使用这些中间件肯定没问题,因为有足够优秀的人能 hold 住这些中间件,公司也会投入人力和时间在这个方向上。

但如果你是在一个初创公司,业务正处在快速发展期,暂时没有能 hold 住这些中间件的团队和人,如果贸然使用这些组件,当发生故障时,排查问题也会变得很困难,甚至会阻碍业务的发展。

而这种情形下,如果公司的技术人员对于 Redis 都很熟,综合评估来看,Redis 也基本可以满足业务 90% 的需求,那当下选择 Redis 未必不是一个好的决策。

所以,做技术选型不只是技术问题,还与人、团队、管理、组织结构有关

也正是因为这些原因,当你在和别人讨论技术选型问题时,你会发现每个公司的做法都不相同。

毕竟每个公司所处的环境和文化不一样,做出的决策当然就会各有差异。

如果你不了解这其中的逻辑,那在做技术选型时,只会趋于表面现象,无法深入到问题根源。

而一旦你理解了这个逻辑,那么你在看待这个问题时,不仅对于技术会有更加深刻认识,对技术资源和人的把握,也会更加清晰。

希望你以后在做技术选型时,能够把这些因素也考虑在内,这对你的技术成长之路也是非常有帮助的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

数据分析实战教程|Pandas处理数据太慢,来试试Polars吧!

很多人在学习数据分析的时候,肯定都会用到Pandas这个库,非常的实用!

从创建数据到读取各种格式的文件(text、csv、json),或者对数据进行切片和分割组合多个数据源,Pandas都能够很好的满足。

Pandas最初发布于2008年,使用Python、Cython和C编写的。是一个超级强大、快速易于使用的Python库,用于数据分析和处理。

当然Pandas也是有不足之处的,比如不具备多处理器,处理较大的数据集速度很慢。

今天,小F就给大家介绍一个新兴的Python库——Polars。

使用语法和Pandas差不多,处理数据的速度却比Pandas快了不少。

一个是大熊猫,一个是北极熊~

GitHub地址:https://github.com/ritchie46/polars

使用文档:https://ritchie46.github.io/polars-book/

Polars是通过Rust编写的一个库,Polars的内存模型是基于Apache Arrow。

Polars存在两种API,一种是Eager API,另一种则是Lazy API。

其中Eager API和Pandas的使用类似,语法差不太多,立即执行就能产生结果。

而Lazy API就像Spark,首先将查询转换为逻辑计划,然后对计划进行重组优化,以减少执行时间和内存使用。

安装Polars,使用百度pip源。

# 安装polars
pip install polars -i https://mirror.baidu.com/pypi/simple/

安装成功后,开始测试,比较Pandas和Polars处理数据的情况。

使用某网站注册用户的用户名数据进行分析,包含约2600万个用户名的CSV文件。

import pandas as pd

df = pd.read_csv(‘users.csv’)
print(df)

数据情况如下。

此外还使用了一个自己创建的CSV文件,用以数据整合测试。

import pandas as pd

df = pd.read_csv(‘fake_user.csv’)
print(df)

得到结果如下。

首先比较一下两个库的排序算法耗时。

import timeit
import pandas as pd

start = timeit.default_timer()

df = pd.read_csv(‘users.csv’)
df.sort_values(‘n’, ascending=False)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

————————-
Time:  27.555776743218303

可以看到使用Pandas对数据进行排序,花费了大约28s。

import timeit
import polars as pl

start = timeit.default_timer()

df = pl.read_csv(‘users.csv’)
df.sort(by_column=‘n’, reverse=True)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

———————–
Time:  9.924110282212496

Polars只花费了约10s,这意味着Polars比Pandas快了2.7倍。

下面,我们来试试数据整合的效果,纵向连接。

import timeit
import pandas as pd

start = timeit.default_timer()

df_users = pd.read_csv(‘users.csv’)
df_fake = pd.read_csv(‘fake_user.csv’)
df_users.append(df_fake, ignore_index=True)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

————————
Time:  15.556222308427095

使用Pandas耗时15s。

import timeit
import polars as pl

start = timeit.default_timer()

df_users = pl.read_csv(‘users.csv’)
df_fake = pl.read_csv(‘fake_user.csv’)
df_users.vstack(df_fake)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

———————–
Time:  3.475433263927698

Polars居然最使用了约3.5s,这里Polars比Pandas快了4.5倍。

通过上面的比较,Polars在处理速度上表现得相当不错。

可以是大家在未来处理数据时,另一种选择~

当然,Pandas目前历时12年,已经形成了很成熟的生态,支持很多其它的数据分析库。

Polars则是一个较新的库,不足的地方还有很多。

如果你的数据集对于Pandas来说太大,对于Spark来说太小,那么Polars便是你可以考虑的一个选择。

本文转载自公众号【法纳斯特】

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 优化提速的 8 个小技巧

 

   作者:张皓

   来源:https://zhuanlan.zhihu.com/p/143052860

Python 是一种脚本语言,相比 C/C++ 这样的编译语言,在效率和性能方面存在一些不足。但是,有很多时候,Python 的效率并没有想象中的那么夸张。本文对一些 Python 代码加速运行的技巧进行整理。

0. 代码优化原则

本文会介绍不少的 Python 代码加速运行的技巧。在深入代码优化细节之前,需要了解一些代码优化基本原则。

第一个基本原则是不要过早优化。很多人一开始写代码就奔着性能优化的目标,“让正确的程序更快要比让快速的程序正确容易得多”。因此,优化的前提是代码能正常工作。过早地进行优化可能会忽视对总体性能指标的把握,在得到全局结果前不要主次颠倒。

第二个基本原则是权衡优化的代价。优化是有代价的,想解决所有性能的问题是几乎不可能的。通常面临的选择是时间换空间或空间换时间。另外,开发代价也需要考虑。

第三个原则是不要优化那些无关紧要的部分。如果对代码的每一部分都去优化,这些修改会使代码难以阅读和理解。如果你的代码运行速度很慢,首先要找到代码运行慢的位置,通常是内部循环,专注于运行慢的地方进行优化。在其他地方,一点时间上的损失没有什么影响。

1. 避免全局变量

# 不推荐写法。代码耗时:26.8秒
import math

size = 10000
for x in range(size):
    for y in range(size):
        z = math.sqrt(x) + math.sqrt(y)

许多程序员刚开始会用 Python 语言写一些简单的脚本,当编写脚本时,通常习惯了直接将其写为全局变量,例如上面的代码。但是,由于全局变量和局部变量实现方式不同,定义在全局范围内的代码运行速度会比定义在函数中的慢不少。通过将脚本语句放入到函数中,通常可带来 15% – 30% 的速度提升。

# 推荐写法。代码耗时:20.6秒
import math

def main():  # 定义到函数中,以减少全部变量使用
    size = 10000
    for x in range(size):
        for y in range(size):
            z = math.sqrt(x) + math.sqrt(y)

main()

2. 避免.

2.1 避免模块和函数属性访问

# 不推荐写法。代码耗时:14.5秒
import math

def computeSqrt(size: int):
    result = []
    for i in range(size):
        result.append(math.sqrt(i))
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

每次使用.(属性访问操作符时)会触发特定的方法,如__getattribute__()__getattr__(),这些方法会进行字典操作,因此会带来额外的时间开销。通过from import语句,可以消除属性访问。

# 第一次优化写法。代码耗时:10.9秒
from math import sqrt

def computeSqrt(size: int):
    result = []
    for i in range(size):
        result.append(sqrt(i))  # 避免math.sqrt的使用
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

在第 1 节中我们讲到,局部变量的查找会比全局变量更快,因此对于频繁访问的变量sqrt,通过将其改为局部变量可以加速运行。

# 第二次优化写法。代码耗时:9.9秒
import math

def computeSqrt(size: int):
    result = []
    sqrt = math.sqrt  # 赋值给局部变量
    for i in range(size):
        result.append(sqrt(i))  # 避免math.sqrt的使用
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

除了math.sqrt外,computeSqrt函数中还有.的存在,那就是调用listappend方法。通过将该方法赋值给一个局部变量,可以彻底消除computeSqrt函数中for循环内部的.使用。

# 推荐写法。代码耗时:7.9秒
import math

def computeSqrt(size: int):
    result = []
    append = result.append
    sqrt = math.sqrt    # 赋值给局部变量
    for i in range(size):
        append(sqrt(i))  # 避免 result.append 和 math.sqrt 的使用
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

2.2 避免类内属性访问

# 不推荐写法。代码耗时:10.4秒
import math
from typing import List

class DemoClass:
    def __init__(self, value: int):
        self._value = value
    
    def computeSqrt(self, size: int) -> List[float]:
        result = []
        append = result.append
        sqrt = math.sqrt
        for _ in range(size):
            append(sqrt(self._value))
        return result

def main():
    size = 10000
    for _ in range(size):
        demo_instance = DemoClass(size)
        result = demo_instance.computeSqrt(size)

main()

避免.的原则也适用于类内属性,访问self._value的速度会比访问一个局部变量更慢一些。通过将需要频繁访问的类内属性赋值给一个局部变量,可以提升代码运行速度。

# 推荐写法。代码耗时:8.0秒
import math
from typing import List

class DemoClass:
    def __init__(self, value: int):
        self._value = value
    
    def computeSqrt(self, size: int) -> List[float]:
        result = []
        append = result.append
        sqrt = math.sqrt
        value = self._value
        for _ in range(size):
            append(sqrt(value))  # 避免 self._value 的使用
        return result

def main():
    size = 10000
    for _ in range(size):
        demo_instance = DemoClass(size)
        demo_instance.computeSqrt(size)

main()

3. 避免不必要的抽象

# 不推荐写法,代码耗时:0.55秒
class DemoClass:
    def __init__(self, value: int):
        self.value = value

    @property
    def value(self) -> int:
        return self._value

    @value.setter
    def value(self, x: int):
        self._value = x

def main():
    size = 1000000
    for i in range(size):
        demo_instance = DemoClass(size)
        value = demo_instance.value
        demo_instance.value = i

main()

任何时候当你使用额外的处理层(比如装饰器、属性访问、描述器)去包装代码时,都会让代码变慢。大部分情况下,需要重新进行审视使用属性访问器的定义是否有必要,使用getter/setter函数对属性进行访问通常是 C/C++ 程序员遗留下来的代码风格。如果真的没有必要,就使用简单属性。

# 推荐写法,代码耗时:0.33秒
class DemoClass:
    def __init__(self, value: int):
        self.value = value  # 避免不必要的属性访问器

def main():
    size = 1000000
    for i in range(size):
        demo_instance = DemoClass(size)
        value = demo_instance.value
        demo_instance.value = i

main()

4. 避免数据复制

4.1 避免无意义的数据复制

# 不推荐写法,代码耗时:6.5秒
def main():
    size = 10000
    for _ in range(size):
        value = range(size)
        value_list = [x for x in value]
        square_list = [x * x for x in value_list]

main()

上面的代码中value_list完全没有必要,这会创建不必要的数据结构或复制。

# 推荐写法,代码耗时:4.8秒
def main():
    size = 10000
    for _ in range(size):
        value = range(size)
        square_list = [x * x for x in value]  # 避免无意义的复制

main()

另外一种情况是对 Python 的数据共享机制过于偏执,并没有很好地理解或信任 Python 的内存模型,滥用 copy.deepcopy()之类的函数。通常在这些代码中是可以去掉复制操作的。

4.2 交换值时不使用中间变量

# 不推荐写法,代码耗时:0.07秒
def main():
    size = 1000000
    for _ in range(size):
        a = 3
        b = 5
        temp = a
        a = b
        b = temp

main()

上面的代码在交换值时创建了一个临时变量temp,如果不借助中间变量,代码更为简洁、且运行速度更快。

# 推荐写法,代码耗时:0.06秒
def main():
    size = 1000000
    for _ in range(size):
        a = 3
        b = 5
        a, b = b, a  # 不借助中间变量

main()

4.3 字符串拼接用join而不是+

# 不推荐写法,代码耗时:2.6秒
import string
from typing import List

def concatString(string_list: List[str]) -> str:
    result = ''
    for str_i in string_list:
        result += str_i
    return result

def main():
    string_list = list(string.ascii_letters * 100)
    for _ in range(10000):
        result = concatString(string_list)

main()

当使用a + b拼接字符串时,由于 Python 中字符串是不可变对象,其会申请一块内存空间,将ab分别复制到该新申请的内存空间中。因此,如果要拼接 n 个字符串,会产生 n-1 个中间结果,每产生一个中间结果都需要申请和复制一次内存,严重影响运行效率。而使用join()拼接字符串时,会首先计算出需要申请的总的内存空间,然后一次性地申请所需内存,并将每个字符串元素复制到该内存中去。

# 推荐写法,代码耗时:0.3秒
import string
from typing import List

def concatString(string_list: List[str]) -> str:
    return ''.join(string_list)  # 使用 join 而不是 +

def main():
    string_list = list(string.ascii_letters * 100)
    for _ in range(10000):
        result = concatString(string_list)

main()

5. 利用if条件的短路特性

# 不推荐写法,代码耗时:0.05秒
from typing import List

def concatString(string_list: List[str]) -> str:
    abbreviations = {'cf.''e.g.''ex.''etc.''flg.''i.e.''Mr.''vs.'}
    abbr_count = 0
    result = ''
    for str_i in string_list:
        if str_i in abbreviations:
            result += str_i
    return result

def main():
    for _ in range(10000):
        string_list = ['Mr.''Hat''is''Chasing''the''black''cat''.']
        result = concatString(string_list)

main()

if 条件的短路特性是指对if a and b这样的语句, 当aFalse时将直接返回,不再计算b;对于if a or b这样的语句,当aTrue时将直接返回,不再计算b。因此, 为了节约运行时间,对于or语句,应该将值为True可能性比较高的变量写在or前,而and应该推后。

# 推荐写法,代码耗时:0.03秒
from typing import List

def concatString(string_list: List[str]) -> str:
    abbreviations = {'cf.''e.g.''ex.''etc.''flg.''i.e.''Mr.''vs.'}
    abbr_count = 0
    result = ''
    for str_i in string_list:
        if str_i[-1] == '.' and str_i in abbreviations:  # 利用 if 条件的短路特性
            result += str_i
    return result

def main():
    for _ in range(10000):
        string_list = ['Mr.''Hat''is''Chasing''the''black''cat''.']
        result = concatString(string_list)

main()

6. 循环优化

6.1 用for循环代替while循环

# 不推荐写法。代码耗时:6.7秒
def computeSum(size: int) -> int:
    sum_ = 0
    i = 0
    while i < size:
        sum_ += i
        i += 1
    return sum_

def main():
    size = 10000
    for _ in range(size):
        sum_ = computeSum(size)

main()

Python 的for循环比while循环快不少。

# 推荐写法。代码耗时:4.3秒
def computeSum(size: int) -> int:
    sum_ = 0
    for i in range(size):  # for 循环代替 while 循环
        sum_ += i
    return sum_

def main():
    size = 10000
    for _ in range(size):
        sum_ = computeSum(size)

main()

6.2 使用隐式for循环代替显式for循环

针对上面的例子,更进一步可以用隐式for循环来替代显式for循环

# 推荐写法。代码耗时:1.7秒
def computeSum(size: int) -> int:
    return sum(range(size))  # 隐式 for 循环代替显式 for 循环

def main():
    size = 10000
    for _ in range(size):
        sum = computeSum(size)

main()

6.3 减少内层for循环的计算

# 不推荐写法。代码耗时:12.8秒
import math

def main():
    size = 10000
    sqrt = math.sqrt
    for x in range(size):
        for y in range(size):
            z = sqrt(x) + sqrt(y)

main() 

上面的代码中sqrt(x)位于内侧for循环, 每次训练过程中都会重新计算一次,增加了时间开销。

# 推荐写法。代码耗时:7.0秒
import math

def main():
    size = 10000
    sqrt = math.sqrt
    for x in range(size):
        sqrt_x = sqrt(x)  # 减少内层 for 循环的计算
        for y in range(size):
            z = sqrt_x + sqrt(y)

main() 

7. 使用numba.jit

我们沿用上面介绍过的例子,在此基础上使用numba.jitnumba可以将 Python 函数 JIT 编译为机器码执行,大大提高代码运行速度。关于numba的更多信息见下面的主页:http://numba.pydata.org/numba.pydata.org

# 推荐写法。代码耗时:0.62秒
import numba

@numba.jit
def computeSum(size: float) -> int:
    sum = 0
    for i in range(size):
        sum += i
    return sum

def main():
    size = 10000
    for _ in range(size):
        sum = computeSum(size)

main()

8. 选择合适的数据结构

Python 内置的数据结构如str, tuple, list, set, dict底层都是 C 实现的,速度非常快,自己实现新的数据结构想在性能上达到内置的速度几乎是不可能的。

list类似于 C++ 中的std::vector,是一种动态数组。其会预分配一定内存空间,当预分配的内存空间用完,又继续向其中添加元素时,会申请一块更大的内存空间,然后将原有的所有元素都复制过去,之后销毁之前的内存空间,再插入新元素。

删除元素时操作类似,当已使用内存空间比预分配内存空间的一半还少时,会另外申请一块小内存,做一次元素复制,之后销毁原有大内存空间。

因此,如果有频繁的新增、删除操作,新增、删除的元素数量又很多时,list的效率不高。此时,应该考虑使用collections.dequecollections.deque是双端队列,同时具备栈和队列的特性,能够在两端进行 O(1) 复杂度的插入和删除操作。

list的查找操作也非常耗时。当需要在list频繁查找某些元素,或频繁有序访问这些元素时,可以使用bisect维护list对象有序并在其中进行二分查找,提升查找的效率。

另外一个常见需求是查找极小值或极大值,此时可以使用heapq模块将list转化为一个堆,使得获取最小值的时间复杂度是 O(1)

下面的网页给出了常用的 Python 数据结构的各项操作的时间复杂度:https://wiki.python.org/moin/TimeComplexity

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典