分类目录归档:性能优化

Pydantic — 强大的数据校验工具,比DRF快12倍

Pydantic 是一个使用Python类型注解进行数据验证和管理的模块。安装方法非常简单,打开终端输入:

pip install pydantic

它类似于 Django DRF 序列化器的数据校验功能,不同的是,Django里的序列化器的Field是有限制的,如果你想要使用自己的Field还需要继承并重写它的基类:

# Django 序列化器
class Book(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=32)
    price = models.DecimalField(max_digits=5, decimal_places=2)
    author = models.CharField(max_length=32)
    publish = models.CharField(max_length=32)

而 Pydantic 基于Python3.7以上的类型注解特性,实现了可以对任何类做数据校验的功能:

# Pydantic 数据校验功能
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:22',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
print(user.id)
print(type(user.id))
#> 123
#> <class 'int'>
print(repr(user.signup_ts))
#> datetime.datetime(2019, 6, 1, 12, 22)
print(user.friends)
#> [1, 2, 3]
print(user.dict())
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""

从上面的基本使用可以看到,它甚至能自动帮你做数据类型的转换,比如代码中的 user.id, 在字典中是字符串,但经过Pydantic校验器后,它自动变成了int型,因为User类里的注解就是int型。

当我们的数据和定义的注解类型不一致时会报这样的Error:

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:222',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
"""
Traceback (most recent call last):
  File "1.py", line 18, in <module>
    user = User(**external_data)
  File "pydantic\main.py", line 331, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for User
signup_ts
  invalid datetime format (type=value_error.datetime)
"""

即 “invalid datetime format”, 因为我传入的 signup_ts 不是标准的时间格式(多了个2)。

1.Pydantic 模型数据导出

通过Pydantic模型中自带的 json 属性方法,能让经过校验后的数据一行命令直接转成 json 字符串,如前文中的user对象:

print(user.dict())  # 转为字典
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""
print(user.json())  # 转为json
"""
{"id": 123, "signup_ts": "2019-06-01T12:22:00", "friends": [1, 2, 3], "name": "John Doe"}
"""

非常方便。它还支持将整个数据结构导出为 schema json,它能完整地描述整个对象的数据结构类型:

print(user.schema_json(indent=2))
"""
{
  "title": "User",
  "type": "object",
  "properties": {
    "id": {
      "title": "Id",
      "type": "integer"
    },
    "signup_ts": {
      "title": "Signup Ts",
      "type": "string",
      "format": "date-time"
    },
    "friends": {
      "title": "Friends",
      "default": [],
      "type": "array",
      "items": {
        "type": "integer"
      }
    },
    "name": {
      "title": "Name",
      "default": "John Doe",
      "type": "string"
    }
  },
  "required": [
    "id"
  ]
}
"""

2.数据导入

除了直接定义数据校验模型,它还能通过ORM、字符串、文件导入到数据校验模型:

比如字符串(raw):

from datetime import datetime
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: datetime = None
      
m = User.parse_raw('{"id": 123, "name": "James"}')
print(m)
#> id=123 signup_ts=None name='James'

此外,它能直接将ORM的对象输入,转为Pydantic的对象,比如从Sqlalchemy ORM:

from typing import List
from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.ext.declarative import declarative_base
from pydantic import BaseModel, constr

Base = declarative_base()


class CompanyOrm(Base):
    __tablename__ = 'companies'
    id = Column(Integer, primary_key=True, nullable=False)
    public_key = Column(String(20), index=True, nullable=False, unique=True)
    name = Column(String(63), unique=True)
    domains = Column(ARRAY(String(255)))


class CompanyModel(BaseModel):
    id: int
    public_key: constr(max_length=20)
    name: constr(max_length=63)
    domains: List[constr(max_length=255)]

    class Config:
        orm_mode = True


co_orm = CompanyOrm(
    id=123,
    public_key='foobar',
    name='Testing',
    domains=['example.com', 'foobar.com'],
)
print(co_orm)
#> <models_orm_mode.CompanyOrm object at 0x7f0bdac44850>
co_model = CompanyModel.from_orm(co_orm)
print(co_model)
#> id=123 public_key='foobar' name='Testing' domains=['example.com',
#> 'foobar.com']

从Json文件导入:

from datetime import datetime
from pathlib import Path
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: datetime = None
      
path = Path('data.json')
path.write_text('{"id": 123, "name": "James"}')
m = User.parse_file(path)
print(m)

从pickle导入:

import pickle
from datetime import datetime
from pydantic import BaseModel

pickle_data = pickle.dumps({
    'id': 123,
    'name': 'James',
    'signup_ts': datetime(2017, 7, 14)
})
m = User.parse_raw(
    pickle_data, content_type='application/pickle', allow_pickle=True
)
print(m)
#> id=123 signup_ts=datetime.datetime(2017, 7, 14, 0, 0) name='James'

3.自定义数据校验

你还能给它增加 validator 装饰器,增加你需要的校验逻辑:

from pydantic import BaseModel, ValidationError, validator


class UserModel(BaseModel):
    name: str
    username: str
    password1: str
    password2: str

    @validator('name')
    def name_must_contain_space(cls, v):
        if ' ' not in v:
            raise ValueError('must contain a space')
        return v.title()

    @validator('password2')
    def passwords_match(cls, v, values, **kwargs):
        if 'password1' in values and v != values['password1']:
            raise ValueError('passwords do not match')
        return v

    @validator('username')
    def username_alphanumeric(cls, v):
        assert v.isalnum(), 'must be alphanumeric'
        return v

上面,我们增加了三种自定义校验逻辑:

1.name 必须带有空格

2.password2 必须和 password1 相同

3.username 必须为字母

让我们试试这三个校验是否成功实现:

user = UserModel(
    name='samuel colvin',
    username='scolvin',
    password1='zxcvbn',
    password2='zxcvbn',
)
print(user)
#> name='Samuel Colvin' username='scolvin' password1='zxcvbn' password2='zxcvbn'

try:
    UserModel(
        name='samuel',
        username='scolvin',
        password1='zxcvbn',
        password2='zxcvbn2',
    )
except ValidationError as e:
    print(e)
    """
    2 validation errors for UserModel
    name
      must contain a space (type=value_error)
    password2
      passwords do not match (type=value_error)
    """

可以看到,第一个UserModel里的数据完全没有问题,通过校验。

第二个UserModel里的数据,由于name存在空格,password2和password1不一致,无法通过校验。

4.性能表现

这是最令我惊讶的部分,Pydantic 比 Django-rest-framework 还快了12.3倍:

PackageVersionRelative PerformanceMean validation time
pydantic1.7.393.7μs
attrs + cattrs20.3.01.5x slower143.6μs
valideer0.4.21.9x slower175.9μs
marshmallow3.10.02.4x slower227.6μs
voluptuous0.12.12.7x slower257.5μs
trafaret2.1.03.2x slower296.7μs
schematics2.1.010.2x slower955.5μs
django-rest-framework3.12.212.3x slower1148.4μs
cerberus1.3.225.9x slower2427.6μs

而且他们的所有基准测试代码都是开源的,你可以在下面这个Github链接找到:

https://github.com/samuelcolvin/pydantic/tree/master/benchmarks

如果你的网络无法访问GitHub,请关注Python实用宝典公众号后台回复Pydantic获取。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

什么格式是保存 Pandas 数据的最好格式?

在数据分析相关项目工作时,我通常使用Jupyter笔记本和pandas库来处理和移动我的数据。对于中等大小的数据集来说,这是一个非常直接的过程,你甚至可以将其存储为纯文本文件而没有太多的开销。

然而,当你的数据集中的观测数据数量较多时,保存和加载数据回内存的过程就会变慢,现在程序的重新启动都会迫使你等待数据重新加载。所以最终,CSV文件或任何其他纯文本格式都会失去吸引力。

我们可以做得更好。有很多二进制格式可以用来将数据存储到磁盘上,其中有很多格式pandas都支持。我们怎么能知道哪一种更适合我们的目的呢?

来吧,我们尝试其中的几个,然后进行对比!这就是我决定在这篇文章中要做的:通过几种方法将 pandas.DataFrame 保存到磁盘上,看看哪一种在I/O速度、内存消耗和磁盘空间方面做的更好。

在这篇文章中,我将展示我的测试结果。

1.要比较的格式

我们将考虑采用以下格式来存储我们的数据:

1. CSV — 数据科学家的一个好朋友
2. Pickle — 一种Python的方式来序列化事物
3. MessagePack — 它就像JSON,但又快又小
4. HDF5 — 一种设计用于存储和组织大量数据的文件格式
5. Feather — 一种快速、轻量级、易于使用的二进制文件格式,用于存储数据框架
6. Parquet — Apache Hadoop的柱状存储格式

所有这些格式都是被广泛使用的,而且(也许除了MessagePack)在你做一些数据分析的事情时非常经常遇到。

为了追求找到最好的缓冲格式来存储程序会话之间的数据,我选择了以下指标进行比较。

1. size_mb – 文件大小(Mb)。
2. save_time – 将数据帧保存到磁盘上所需的时间量。
3. load_time – 将之前转储的数据帧加载到内存中所需要的时间量。
4. save_ram_delta_mb – 数据帧保存过程中最大的内存消耗增长量。
5. load_ram_delta_mb – 数据帧加载过程中的最大内存消耗增长量。

请注意,当我们使用高效压缩的二进制数据格式,如 Parquet 时,最后两个指标变得非常重要。它们可以帮助我们估计加载序列化数据所需的内存量,此外还有数据大小本身。我们将在接下来的章节中更详细地讨论这个问题。

2.测试及结果

我决定使用一个合成数据集进行测试,以便更好地控制序列化的数据结构和属性。

另外,我在我的基准中使用了两种不同的方法:

(a) 将生成的分类变量保留为字符串。

(b) 在执行任何I/O之前将它们转换为 pandas.Categorical 数据类型。

函数generate_dataset显示了我在基准中是如何生成数据集的:

def generate_dataset(n_rows, num_count, cat_count, max_nan=0.1, max_cat_size=100):
    """
    随机生成具有数字和分类特征的数据集。
    
    数字特征取自正态分布X ~ N(0, 1)。
    分类特征则被生成为随机的uuid4字符串。
    
    此外,数字和分类特征的max_nan比例被替换为NaN值。
    """
    dataset, types = {}, {}
    
    def generate_categories():
        from uuid import uuid4
        category_size = np.random.randint(2, max_cat_size)
        return [str(uuid4()) for _ in range(category_size)]
    
    for col in range(num_count):
        name = f'n{col}'
        values = np.random.normal(0, 1, n_rows)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'float32'
        
    for col in range(cat_count):
        name = f'c{col}'
        cats = generate_categories()
        values = np.array(np.random.choice(cats, n_rows, replace=True), dtype=object)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'object'
    
    return pd.DataFrame(dataset), types

我们将CSV文件的保存和加载性能作为一个基准。

五个随机生成的具有一百万个观测值的数据集被转储到CSV中,并读回内存以获得平均指标。

每种二进制格式都针对20个随机生成的具有相同行数的数据集进行测试。

这些数据集包括15个数字特征和15个分类特征。你可以在这个资源库中找到带有基准测试功能和所需的完整源代码:

https://github.com/devforfu/pandas-formats-benchmark

或在Python实用宝典后台回复 Pandas IO对比 ,下载完整代码。

(a) 数据为字符串特征时的性能

下图显示了每种数据格式的平均I/O时间。一个有趣的观察是,hdf显示出比csv更慢的加载速度,而其他二进制格式的表现明显更好。其中最令人印象深刻的是feather和parquet。

在保存数据和从磁盘上读取数据时,内存开销如何?

下一张图片告诉我们,hdf 的表现就不是那么好了。可以肯定的是,csv在保存/加载纯文本字符串时不需要太多的额外内存,而Feather和parquet则相当接近:

最后,让我们看看文件的大小。这次parquet显示了一个令人印象深刻的结果,考虑到这种格式是为有效存储大量数据而开发的,这并不令人惊讶。

(b) 字符串特征转换为数字时的性能

在上一节中,我们没有尝试有效地存储我们的分类特征而是使用普通的字符串。让我们来弥补这个遗漏吧! 这一次我们使用一个专门的 pandas.Categorical 类型,转字符串特征为数字特征。

看看现在与纯文本的csv相比,它看起来如何!

现在所有的二进制格式都显示出它们的真正力量。Csv的基准结果已经远远落后了,所以让我们把它去掉,以便更清楚地看到各种二进制格式之间的差异:

Feather 和 Pickle 显示了最好的 I/O 速度,而 hdf 仍然显示了明显的性能开销。

现在是时候比较数据进程加载时的内存消耗了。下面的柱状图显示了我们之前提到的关于parquet格式的一个重要事实。

可以看到 parquet 读写时的内存空间差距有多大,你有可能你无法将比较大的 parquet 文件加载到内存中。

最后的图显示了各格式的文件大小。所有的格式都显示出良好的效果,除了hdf仍然需要比其他格式多得多的空间:

3.结论

正如我们的测试所显示的,似乎 feather 格式是存储Python会话数据的理想候选者。它显示了很快的I/O速度,在磁盘上不占用太多内存,并且在加载回RAM时不需要消耗太大的内存。

当然,这种比较并不意味着你应该在每个可能的情况下使用这种格式。例如,feather格式一般不会被用作长期文件存储的格式。

另外,某些特定情况下也无法使用 feather,这由你的整个程序架构决定。然而,就如本帖开头所述的目的,它在不被任何特殊事项限制的情况下是一个很好的选择。

本文译自 towardsdatascience
作者: Ilia Zaitsev
有部分修改。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python怎样存储变量性能最优?这篇文章告诉你答案

工作时我们经常会遇到需要临时保存结果变量的场景,尤其是一些数据处理、模型开发的场景,加载处理速度是个很漫长的过程,于是经常会把这些变量储存起来。

而储存变量最常见、最普遍的方法是用pickle,保存为pkl文件。但是如果从写入和读取的性能角度考虑,pkl可能真的不是最优选。

Pickle有其独特的好处,大部分变量不需要进行处理,都能直接存到pkl文件里,但这样的方便其实是牺牲了部分性能取得的。与之相比,numpy的.npy格式就比pickle性能上快不少。

当然,我们需要有证据支撑这个观点。所以今天我们就来做个实验,分别在Python2和Python3中对比 numpy 和 pickle 两种存储格式(.npy, .pkl) 对数据的存储和读取的性能对比。

部分内容参考分析自: https://applenob.github.io/python/save/

1. Python2中, npy与pkl的性能对比

首先初始化数据:

import numpy as np
import time
import cPickle as pkl
import os

all_batches = []
for i in range(20):
    a1 = np.random.normal(size=[25600, 40])
    label = np.random.normal(size=[25600, 1])
    all_batch = np.concatenate([a1, label], 1)
    all_batches.append(all_batch)
all_batches = np.array(all_batches)
print(all_batches.shape)
# (20, 25600, 41)

然后测试使用pickle保存和读取时间的耗时,以及整个文件的大小:

s_t1 = time.time()
pkl_name = "a.pkl"
with open(pkl_name, "wb") as f:
    pkl.dump(all_batches, f)
pkl_in_time = time.time() - s_t1
print("pkl dump costs {} sec".format(pkl_in_time))

s_t2 = time.time()
with open(pkl_name, "rb") as f:
    new_a = pkl.load(f)
pkl_out_time = time.time() - s_t2
print("pkl load costs {} sec".format(pkl_out_time))

pkl_size = os.path.getsize(pkl_name)
print("pkl file size: {} byte, {} mb".format(pkl_size, float(pkl_size)/(1024*1024)))

结果如下:

pkl dump costs 67.7483091354 sec
pkl load costs 52.1168899536 sec
pkl file size: 497437110 byte, 474.392995834 mb

然后再试一下npy的写入和读取:

s_t3 = time.time()
npy_name = "a.npy"
with open(npy_name, "wb") as f:
    np.save(f, arr=all_batches)
npy_in_time = time.time() - s_t3
print("npy save costs {} sec".format(npy_in_time))
s_t4 = time.time()
with open(npy_name, "rb") as f:
    new_a = np.load(f)
npy_out_time = time.time() - s_t4
print("npy load costs {} sec".format(npy_out_time))
npy_size = os.path.getsize(npy_name)
print("npy file size: {} byte, {} mb".format(npy_size, float(npy_size) / (1024 * 1024)))

结果如下:

npy save costs 20.718367815 sec
npy load costs 0.62314915657 sec
npy file size: 167936128 byte, 160.15637207 mb

结果发现,npy性能明显优于pkl格式。

通过多次测试发现,在Python2中,npy格式的性能优势全面碾压pkl,工程允许的情况下,在Python2中,我们应该在这二者中毫不犹豫地选择npy.

2.Python3中, npy与pkl的性能对比

Python2已经是过去式,重点还要看Python3.

在Python3中,与Python2的代码唯一一句不一样的是pickle的引入:

# Python2:
import cPickle as pkl

# Python3:
import pickle as pkl

其他代码基本一样,替换代码后,重新运行程序,让我们看看在Python3上,npy格式和pkl格式性能上的区别:

首先是pkl格式的表现:

ckenddeMacBook-Pro:Documents ckend$ python 1.py 
(20, 25600, 41)
pkl dump costs 24.32167887687683 sec
pkl load costs 4.480823040008545 sec
pkl file size: 167936163 byte, 160.15640544891357 mb

然后是npy格式的表现:

npy save costs 22.471696853637695 sec
npy load costs 0.3791017532348633 sec
npy file size: 167936080 byte, 160.1563262939453 mb

可以看到在Python3中pkl格式和npy格式的存储大小是基本相同的,在存储耗时上也相差无几。但是在读取数据的时候,npy相对于pkl还是有一定的优势的。

因此,如果你的程序非常注重读取效率,那么我觉得npy格式会比pkl格式更适合你。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandarallel 一个能让你的Pandas计算火力拉满的工具

没有使用Pandarallel
使用了Pandarallel

众所周知,由于GIL的存在,Python单进程中的所有操作都是在一个CPU核上进行的,所以为了提高运行速度,我们一般会采用多进程的方式。而多进程无非就是以下几种方案:

  • 1、multiprocessing
  • 2、concurrent.futures.ProcessPoolExecutor()
  • 3、joblib
  • 4、ppserver
  • 5、celery

这些方案对于普通Pandas玩家来说都不是特别友好,怎样才能算作一个友好的并行处理方案?就是原来的逻辑我基本不用变,仅修改需要计算的那行就能完成我们目标的方案,而 pandarallel 就是一个这样友好的工具。

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

可以看到,在 pandarallel 的世界里,你只需要替换原有的 pandas 处理语句就能实现多CPU并行计算。非常方便、非常nice.

在4核CPU的性能测试上,它比原始语句快了接近4倍。测试条件(OS: Linux Ubuntu 16.04,Hardware: Intel Core i7 @ 3.40 GHz – 4 cores),这就是我所说的,它把CPU充分利用了起来。

下面就给大家介绍这个模块怎么用,其实非常简单,任何代码只需要加几行代码就能实现质的飞跃。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install pandarallel

对于windows用户,有一个不好的消息是,它只能在Windows的linux子系统上运行(WSL),你可以在微软官网上找到安装教程:

https://docs.microsoft.com/zh-cn/windows/wsl/about

2.使用Pandarallel

使用前,需要对Pandarallel进行初始化:

from pandarallel import pandarallel
pandarallel.initialize()

这样才能调用并行计算的API,不过 initialize 中有一个重要参数需要说明,那就是 nb_workers ,它将指定并行计算的Worker数,如果没有设置,所有CPU的核都会用上。

Pandarallel一共支持8种Pandas操作,下面是一个apply方法的例子。

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(5e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return math.sin(x.a**2) + math.sin(x.b**2)

# 正常处理
res = df.apply(func, axis=1)

# 并行处理
res_parallel = df.parallel_apply(func, axis=1)

# 查看结果是否相同
res.equals(res_parallel)

其他方法使用上也是类似的,在原始的函数名称前加上 parallel_。比如DataFrame.groupby.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(3e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),
                       b=np.random.rand(df_size)))
def func(df):
    dum = 0
    for item in df.b:
        dum += math.log10(math.sqrt(math.exp(item**2)))
        
    return dum / len(df.b)

# 正常处理
res = df.groupby("a").apply(func)
# 并行处理
res_parallel = df.groupby("a").parallel_apply(func)
res.equals(res_parallel)

又比如 DataFrame.groupby.rolling.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4

# 正常处理
res = df.groupby('a').b.rolling(4).apply(func, raw=False)
# 并行处理
res_parallel = df.groupby('a').b.rolling(4).parallel_apply(func, raw=False)
res.equals(res_parallel)

案例都是类似的,这里就直接列出表格,不浪费大家宝贵的时间去阅读一些重复的例子了:

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

3.注意事项

1. 我有 8 个 CPU,但 parallel_apply 只能加快大约4倍的计算速度。为什么?

答:正如我前面所言,Python中每个进程占用一个核,Pandarallel 最多只能加快到你所拥有的核心的总数,一个 4 核的超线程 CPU 将向操作系统显示 8 个 CPU,但实际上只有 4 个核心,因此最多加快4倍。

2. 并行化是有成本的(实例化新进程,通过共享内存发送数据,…),所以只有当并行化的计算量足够大时,并行化才是有意义的。对于很少量的数据,使用 Pandarallel 并不总是值得的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

超级方便的轻量级Python流水线工具,拥有漂亮的可视化界面!

Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下:

  • 基于非常简单的Python代码就能完成流水线开发。
  • 使用 PostgreSQL 作为数据处理引擎。
  • 有Web界面可视化分析流水线执行过程。
  • 基于 Python 的 multiprocessing 单机流水线执行。不需要分布式任务队列。轻松调试和输出日志。
  • 基于成本的优先队列:首先运行具有较高成本(基于记录的运行时间)的节点。

此外,在Mara-pipelines的Web界面中,你不仅可以查看和管理流水线及其任务节点,你还可以直接触发这些流水线和节点,非常好用:

1.安装

由于使用了大量的依赖,Mara-pipelines 并不适用于Windows,如果你需要在Windows上使用Mara-pipelines,请使用docker或者windows下的linux子系统

使用pipe安装Mara-pipelines:

pip install mara-pipelines

或者:

pip install git+https://github.com/mara/mara-pipelines.git

2.使用示例

这是一个基础的流水线演示,由三个相互依赖的节点组成,包括 任务1(ping_localhost), 子流水线(sub_pipeline), 任务2(sleep):

# 注意,这个示例中使用了部分国外的网站,如果无法访问,请变更为国内网站。
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively

pipeline = Pipeline(
    id='demo',
    description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')

pipeline.add(Task(id='ping_localhost', description='Pings localhost',
                  commands=[RunBash('ping -c 3 localhost')]))

sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')

for host in ['google', 'amazon', 'facebook']:
    sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
                          commands=[RunBash(f'ping -c 3 {host}.com')]))

sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
                      commands=[RunBash('ping foo')]), ['ping_amazon'])

pipeline.add(sub_pipeline, ['ping_localhost'])

pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
                  commands=[RunBash('sleep 2')]), ['sub_pipeline'])

可以看到,Task包含了多个commands,这些commands会用于真正地执行动作。而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:

pipeline.add(sub_pipeline, ['ping_localhost'])

则表明必须执行完 ping_localhost 才会执行 sub_pipeline.

为了运行这个流水线,需要配置一个 PostgreSQL 数据库来存储运行时信息、运行输出和增量处理状态:

import mara_db.auto_migration
import mara_db.config
import mara_db.dbs

mara_db.config.databases \
    = lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}

mara_db.auto_migration.auto_discover_models_and_migrate()

如果 PostgresSQL 正在运行并且账号密码正确,输出如下所示(创建了一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATE TABLE data_integration_file_dependency (
    node_path TEXT[] NOT NULL, 
    dependency_type VARCHAR NOT NULL, 
    hash VARCHAR, 
    timestamp TIMESTAMP WITHOUT TIME ZONE, 
    PRIMARY KEY (node_path, dependency_type)
);

.. more tables

为了运行这个流水线,你需要:

from mara_pipelines.ui.cli import run_pipeline

run_pipeline(pipeline)

这将运行单个流水线节点及其 (sub_pipeline) 所依赖的所有节点:

run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)

3.Web 界面

我认为 mara-pipelines 最有用的是他们提供了基于Flask管控流水线的Web界面。

对于每条流水线,他们都有一个页面显示:

  • 所有子节点的图以及它们之间的依赖关系
  • 流水线的总体运行时间图表以及过去 30 天内最昂贵的节点(可配置)
  • 所有流水线节点及其平均运行时间和由此产生的排队优先级的表
  • 流水线最后一次运行的输出和时间线

对于每个任务,都有一个页面显示

  • 流水线中任务的上游和下游
  • 最近 30 天内任务的运行时间
  • 任务的所有命令
  • 任务最后运行的输出

此外,流水线和任务可以直接从网页端调用运行,这是非常棒的特点:

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Isort 自动整理”import”的超实用工具详细教程

isort 是一个Python的实用程序/库,它会按字母顺序对导入(import)的库进行排序,并自动分组。它提供多种使用方式,包括命令行、Python调用等。

它基于Python 3.6+实现,但也支持格式化Python 2代码。

在使用 isort 格式化你的 import 之前,你的代码可能是长这样的:

from my_lib import Object
import os
from my_lib import Object3
from my_lib import Object2
import sys
from third_party import lib15, lib1, lib2, lib3, lib4, lib5, lib6, lib7, lib8, lib9, lib10, lib11, lib12, lib13, lib14
import sys
from __future__ import absolute_import
from third_party import lib3
print("Hey")
print("yo")

使用 isort 格式化后的代码是这样的:

from __future__ import absolute_import import os
import sys from third_party import (lib1, lib2, lib3, lib4, lib5, lib6, lib7, lib8,
                        lib9, lib10, lib11, lib12, lib13, lib14, lib15)

from my_lib import Object, Object2, Object3 
print("Hey")
print("yo")

​杂乱无章的格式瞬间变得井然有序,可见这是一款多么优秀的整理工具,下面就来介绍这个工具的安装及使用过程,及进阶用法。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install isort

如果你需要让他支持对 requirements.txt 的整理,请这样安装:

pip install isort[requirements_deprecated_finder]

2.使用 isort 整理你的python引用

isort 有2种使用方法,一种是从命令行直接针对py文件进行整理、另一种是在Python内导入 isort 进行整理。

命令行整理

要在特定文件上运行 isort,请在命令行执行以下操作:

isort mypythonfile.py mypythonfile2.py
# 或
python -m isort mypythonfile.py mypythonfile2.py

要对本文件夹递归进行isort整理,请执行以下操作:

isort .
# 或
python -m isort .

要查看更改建议的而不直接应用它们,请执行以下操作:

isort mypythonfile.py --diff

如果你要对项目自动运行isort,但是希望仅在未引入语法错误的情况下应用更改:

isort --atomic .

(注意:这在默认情况下是禁用的,因为它阻止了 isort 去整理不同版本的Python代码。)

从Python内部

import isort
isort.file("pythonfile.py")

或者:

import isort
sorted_code = isort.code("import b\nimport a\n")

3. 智能平衡格式化

从 isort 3.1.0 开始,添加了对平衡多行导入的支持。启用此选项后,isort 将动态地将导入长度更改为生成最平衡网格的长度,同时保持低于定义的最大导入长度。

开启了平衡导入的格式化:

from __future__ import (absolute_import, division,
                        print_function, unicode_literals)

未开启平衡的格式化:

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

要启用此设置, 在你的配置设置 balanced_wrapping=True 或 通过命令行添加 -e 参数执行整理。

4.跳过某个import

要使 isort 忽略单个 import,只需在包含文本的导入行的末尾添加注释 isort:skip,如下:

import module  # isort:skip

或者:

from xyz import (abc,  # isort:skip
                 yo,
                 hey)

要使 isort 跳过整个文件,只需添加 isort:skip_file 到文件的开头注释中:

""" 
my_module.py
Best module ever

isort:skip_file
"""

import b
import a

这个工具还是相当方便的,尤其是针对一些杂乱无章、多年沉淀下来的项目代码的 import 进行整理的时候,它会变得非常香。有需要的小伙伴可以赶快试一下。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

干货源码剖析!详解 Celery Beat 实现原理

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列,同时也支持任务调度。

为了讲解 Celery Beat 的周期调度机制及实现原理,我们会基于Django从制作一个简单的周期任务开始,然后一步一步拆解 Celery Beat 的源代码。

相关前置应用知识,可以阅读以下文章:

1.Django Celery 异步与定时任务实战教程
2.Python Celery 异步快速下载股票数据

1.Celery 简单周期任务示例

在 celery_app.tasks.py 中添加如下任务:

@shared_task
def pythondict_task():
    print("pythondict_task")

在 django.celery.py 文件中添加如下配置,

from celery_django import settings
from datetime import timedelta


app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)

CELERYBEAT_SCHEDULE = {
    'pythondict_task': {
        'task': 'celery_app.tasks.pythondict_task',
        'schedule': timedelta(seconds=3),
    },
}

app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)

至此,配置完成,此时,先启动 Celery Beat 定时任务命令:

celery beat -A celery_django -S django

然后打开第二个终端进程启动消费者:

celery -A celery_django worker 

此时在worker的终端上就会输出类似如下的信息:

    [2021-07-11 16:34:11,546: WARNING/PoolWorker-3] pythondict_task
    [2021-07-11 16:34:11,550: WARNING/PoolWorker-4] pythondict_task
    [2021-07-11 16:34:11,551: WARNING/PoolWorker-2] pythondict_task
    [2021-07-11 16:34:11,560: WARNING/PoolWorker-1] pythondict_task

看到结果正常输出,说明任务成功定时执行。

2.源码剖析

为了明白 Celery Beat 是如何实现周期任务调度的,我们需要从 Celery 源码入手。

当你执行 Celery Beat 启动命令的时候,到底发生了什么?

celery beat -A celery_django -S django

当你执行这个命令的时候,Celery/bin/celery.py 中的 CeleryCommand 类接收到命令后,会选择 beat 对应的类执行如下代码:

# Python 实用宝典
# https://pythondict.com

from celery.bin.beat import beat

class CeleryCommand(Command):
    commands = {
        # ...
        'beat': beat,
        # ...
    }
    # ...
    def execute(self, command, argv=None):
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        cls = self.commands.get(command) or self.commands['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

此时cls对应的是beat类,通过查看位于bin/beat.py中的 beat 类可知,该类只重写了run方法和add_arguments方法。

所以此时执行的 run_from_argv 方法是 beat 继承的 Command 的 run_from_argv 方法:

# Python 实用宝典
# https://pythondict.com

def run_from_argv(self, prog_name, argv=None, command=None):
    return self.handle_argv(prog_name, sys.argv if argv is None else argv, command)

该方法中会调用 Command 的 handle_argv 方法,而该方法在经过相关参数处理后会调用 self(*args, **options) 到 __call__ 函数:

    # Python 实用宝典
    # https://pythondict.com
    
    def handle_argv(self, prog_name, argv, command=None):
        """Parse command-line arguments from ``argv`` and dispatch
        to :meth:`run`.

        :param prog_name: The program name (``argv[0]``).
        :param argv: Command arguments.

        Exits with an error message if :attr:`supports_args` is disabled
        and ``argv`` contains positional arguments.

        """
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        return self(*args, **options)

Command 类的 __call__函数:

    # Python 实用宝典
    # https://pythondict.com
    
    def __call__(self, *args, **kwargs):
        random.seed()  # maybe we were forked.
        self.verify_args(args)
        try:
            ret = self.run(*args, **kwargs)
            return ret if ret is not None else EX_OK
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

可见,在该函数中会调用到run方法,此时调用的run方法就是beat类中重写的run方法,查看该方法:

# Python 实用宝典
# https://pythondict.com
    
class beat(Command):
    """Start the beat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celery/beat-schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler

    """
    doc = __doc__
    enable_config_from_cmdline = True
    supports_args = False

    def run(self, detach=False, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, working_directory=None, **kwargs):
        # 是否开启后台运行
        if not detach:
            maybe_drop_privileges(uid=uid, gid=gid)
        workdir = working_directory
        kwargs.pop('app', None)
        # 设定偏函数
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)

        if detach:
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run() # 后台运行
        else:
            return beat().run() # 立即运行

这里引用了偏函数的知识,偏函数就是从基函数创建一个新的带默认参数的函数,详细可见廖雪峰老师的介绍:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017454145929440

可见,此时创建了app的Beat方法的偏函数,并通过 .run 函数执行启动 beat 进程,首先看看这个 beat 方法:

    # Python 实用宝典
    # https://pythondict.com
    @cached_property
    def Beat(self, **kwargs):
        # 导入celery.apps.beat:Beat类
        return self.subclass_with_self('celery.apps.beat:Beat')

可以看到此时就实例化了 celery.apps.beat 中的 Beat 类,并调用了该实例的 run 方法:

    # Python 实用宝典
    # https://pythondict.com
    def run(self):
        print(str(self.colored.cyan(
            'celery beat v{0} is starting.'.format(VERSION_BANNER))))
        # 初始化loader
        self.init_loader()
        # 设置进程
        self.set_process_title()
        # 开启任务调度
        self.start_scheduler()

init_loader 中,会导入默认的modules,此时会引入相关的定时任务,这些不是本文重点。我们重点看 start_scheduler 是如何开启任务调度的:

    # Python 实用宝典
    # https://pythondict.com
    def start_scheduler(self):
        c = self.colored
        if self.pidfile: # 是否设定了pid文件
            platforms.create_pidlock(self.pidfile)  # 创建pid文件
        # 初始化service
        beat = self.Service(app=self.app,
                            max_interval=self.max_interval,
                            scheduler_cls=self.scheduler_cls,
                            schedule_filename=self.schedule)
        
        # 打印启动信息
        print(str(c.blue('__    ', c.magenta('-'),
                  c.blue('    ... __   '), c.magenta('-'),
                  c.blue('        _\n'),
                  c.reset(self.startup_info(beat)))))
        # 开启日志
        self.setup_logging()
        if self.socket_timeout:
            logger.debug('Setting default socket timeout to %r',
                         self.socket_timeout)
            # 设置超时
            socket.setdefaulttimeout(self.socket_timeout)
        try:
            # 注册handler
            self.install_sync_handler(beat)
            # 开启beat
            beat.start()
        except Exception as exc:
            logger.critical('beat raised exception %s: %r',
                            exc.__class__, exc,
                            exc_info=True)

我们看下beat是如何开启的:

    # Python 实用宝典
    # https://pythondict.com
    def start(self, embedded_process=False, drift=-0.010):
        info('beat: Starting...')
        # 打印最大间隔时间
        debug('beat: Ticking with max interval->%s',
              humanize_seconds(self.scheduler.max_interval))
        
        # 通知注册该signal的函数
        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title('celery beat')

        try:
            while not self._is_shutdown.is_set():
                # 调用scheduler.tick()函数检查还剩多余时间
                interval = self.scheduler.tick()
                interval = interval + drift if interval else interval
                # 如果大于0
                if interval and interval > 0:
                    debug('beat: Waking up %s.',
                          humanize_seconds(interval, prefix='in '))
                    # 休眠
                    time.sleep(interval)
                    if self.scheduler.should_sync():
                        self.scheduler._do_sync()
        except (KeyboardInterrupt, SystemExit):
            self._is_shutdown.set()
        finally:
            self.sync()

这里重点看 self.scheduler.tick() 方法:

    # Python 实用宝典
    # https://pythondict.com
    def tick(self):
        """Run a tick, that is one iteration of the scheduler.

        Executes all due tasks.

        """
        remaining_times = []
        try:
            # 遍历每个周期任务设定
            for entry in values(self.schedule):
                # 下次运行时间
                next_time_to_run = self.maybe_due(entry, self.publisher)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            pass

        return min(remaining_times + [self.max_interval])

这里通过 self.schedule 拿到了所有存放在用 shelve 写入的 celerybeat-schedule 文件的定时任务,遍历所有定时任务,调用 self.maybe_due 方法:

    # Python 实用宝典
    # https://pythondict.com
    def maybe_due(self, entry, publisher=None):
        # 是否到达运行时间
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            # 打印任务发送日志
            info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
            try:
                # 执行任务
                result = self.apply_async(entry, publisher=publisher)
            except Exception as exc:
                error('Message Error: %s\n%s',
                      exc, traceback.format_stack(), exc_info=True)
            else:
                debug('%s sent. id->%s', entry.task, result.id)
        return next_time_to_run

可以看到,此处会判断任务是否到达定时时间,如果是的话,会调用 apply_async 调用Worker执行任务。如果不是,则返回下次运行时间,让 Beat 进程进行 Sleep,减少进程资源消耗。

到此,我们就讲解完了 Celery Beat 在周期定时任务的检测调度机制,怎么样,小伙伴们有没有什么疑惑?可以在下方留言区留言一起讨论哦。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 高效流程编排引擎 bamboo-pipeline 实战教程

Bamboo-pipeline 是蓝鲸智云旗下SaaS标准运维的流程编排引擎。其具备以下特点:

  • 1. 多种流程模式:支持串行、并行,支持子流程,可以根据全局参数自动选择分支执行,节点失败处理机制可配置。
  • 2. 参数引擎:支持参数共享,支持参数替换。
  • 3. 可交互的任务执行:任务执行中可以随时暂停、继续、撤销,节点失败后可以重试、跳过。

本文需要涉及到 Django, Celery 的前置知识,如果你还不了解这两者,建议谷歌或百度搜索了解一下,或者阅读我之前的文章:

Django:

Python Django快速开发音乐高潮提取网(1)

Python Django快速开发音乐高潮提取网(2)

Python Django快速开发音乐高潮提取网(3)

Pycharm+Django 安装及配置指南

手把手Django+Vue前后端分离入门实战教程

Celery:

Python celery异步快速下载股票数据

Django Celery 异步与定时任务实战教程

网络IO谁更快?Python与Go请求速度对比

什么?Python Celery 也能调度Go worker?

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install bamboo-engine
pip install bamboo-pipeline
pip install django
pip install celery

2. 项目初始化

(选项一:无Django项目)如果你没有任何的现成Django项目,请按下面的流程初始化

由于 bamboo-pipeline 运行时基于 Django 实现,所以需要新建一个 Django 项目:

django-admin startproject easy_pipeline
cd easy_pipeline

在 easy_pipeline.settings.py 下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

在 easy_pipeline目录下初始化数据库:

python manage.py migrate

(选项二:有Django项目需要使用流程引擎)如果你有现成的PipeLine项目需要使用此流程引擎,请在项目的 settings.py 下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

然后重新执行migrate,生成pipeline相关的流程模型:

python manage.py migrate

migrate 执行完毕后会如下图所示:

由于是在原有项目上使用流程引擎,可能会遇到一些版本不匹配的问题,如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

3. 简单的流程编排实战

首先在项目目录下启动 celery worker:

python manage.py celery worker -Q er_execute,er_schedule --pool=solo -l info

启动成功类似下图所示:

(注意)如果你是在你的原有Django项目上做改造,它并不一定能够顺利地启动成功,这是因为Pipeline使用了 Django 2.2.24,会存在许多版本不兼容的情况。如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

在下面的例子中,我们将会创建并执行一个简单的流程:

3.1 创建流程APP

在 bamboo_pipeline 中,一个流程由多个组件组成,官方推荐使用APP统一管控组件:

python manage.py create_plugins_app big_calculator

该命令会在 Django 工程根目录下生成拥有以下目录结构的 APP:

big_calculator
├── __init__.py
├── components
│   ├── __init__.py
│   └── collections
│       ├── __init__.py
│       └── plugins.py
├── migrations
│   └── __init__.py
└── static
    └── big_calculator
        └── plugins.js

别忘了把新创建的这个插件添加到 Django 配置的 INSTALLED_APPS 中:

INSTALLED_APPS = (
    ...
    'big_calculator',
    ...
)

3.2 编写流程的Service原子

组件服务 Service 是组件的核心,Service 定义了组件被调用时执行的逻辑,下面让我们实现一个计算传入的参数 n 的阶乘,并把结果写到输出中的 Service,在 big_calculator/components/collections/plugins.py 中输入以下代码:

import math
from pipeline.core.flow.activity import Service


class FactorialCalculateService(Service):

    def execute(self, data, parent_data):
        """
        组件被调用时的执行逻辑
        :param data: 当前节点的数据对象
        :param parent_data: 该节点所属流程的数据对象
        :return:
        """
        n = data.get_one_of_inputs('n')
        if not isinstance(n, int):
            data.outputs.ex_data = 'n must be a integer!'
            return False

        data.outputs.factorial_of_n = math.factorial(n)
        return True

    def inputs_format(self):
        """
        组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。
        :return:必须返回一个 InputItem 的数组,返回的这些信息能够用于确认该组件需要获取什么样的输入数据。
        """
        return [
            Service.InputItem(name='integer n', key='n', type='int', required=True)
        ]

    def outputs_format(self):
        """
        组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明
        :return: 必须返回一个 OutputItem 的数组, 便于在流程上下文或后续节点中进行引用
        """
        return [
            Service.OutputItem(name='factorial of n', key='factorial_of_n', type='int')
        ]

首先我们继承了 Service 基类,并实现了 execute() 和 outputs_format() 这两个方法,他们的作用如下:

  • execute:组件被调用时执行的逻辑。接收 data 和 parent_data 两个参数。
    其中,data 是当前节点的数据对象,这个数据对象存储了用户传递给当前节点的参数的值以及当前节点输出的值。parent_data 则是该节点所属流程的数据对象,通常会将一些全局使用的常量存储在该对象中,如当前流程的执行者、流程的开始时间等。
  • outputs_format:组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明。这个方法必须返回一个 OutputItem 的数组,返回的这些信息能够用于确认某个组件在执行成功时输出的数据,便于在流程上下文或后续节点中进行引用。
  • inputs_format:组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。这个方法必须返回一个 InputItem 的数组,返回的这些信息能够用于确认某个组件需要获取什么样的输入数据。

下面我们来看一下 execute() 方法内部执行的逻辑,首先我们尝试从当前节点数据对象的输出中获取输入参数 n,如果获取到的参数不是一个 int 实例,那么我们会将异常信息写入到当前节点输出的 ex_data 字段中,这个字段是引擎内部的保留字段,节点执行失败时产生的异常信息都应该写入到该字段中。随后我们返回 False 代表组件本次执行失败,随后节点会进入失败状态:

n = data.get_one_of_inputs('n')
if not isinstance(n, int):
    data.outputs.ex_data = 'n must be a integer!'
    return False

若获取到的 n 是一个正常的 int,我们就调用 math.factorial() 函数来计算 n 的阶乘,计算完成后,我们会将结果写入到输出的 factorial_of_n 字段中,以供流程中的其他节点使用:

data.outputs.factorial_of_n = math.factorial(n)
return True

3.3 编写流程组件,绑定Service原子

完成 Service 的编写后,我们需要将其与一个 Component 绑定起来,才能够注册到组件库中,在 big_calculator\components\__init__.py 文件下添加如下的代码:

import logging
from pipeline.component_framework.component import Component
from big_calculator.components.collections.plugins import FactorialCalculateService

logger = logging.getLogger('celery')


class FactorialCalculateComponent(Component):
    name = 'FactorialCalculateComponent'
    code = 'fac_cal_comp'
    bound_service = FactorialCalculateService

我们定义了一个继承自基类 Component 的类 FactorialCalculateComponent,他拥有以下属性:

  • name:组件名。
  • code:组件代码,这个代码必须是全局唯一的。
  • bound_service:与该组件绑定的 Service

这样一来,我们就完成了一个流程原子的开发。

3.4 生成流程,测试刚编写的组件

在 big_calculator\test.py 写入以下内容,生成一个流程,测试刚刚编写的组件:

# Python 实用宝典
# 2021/06/20

import time

from bamboo_engine.builder import *
from big_calculator.components import FactorialCalculateComponent
from pipeline.eri.runtime import BambooDjangoRuntime
from bamboo_engine import api
from bamboo_engine import builder


def bamboo_playground():
    """
    测试流程引擎
    """
    # 使用 builder 构造出流程描述结构
    start = EmptyStartEvent()
    # 这里使用 我们刚创建好的n阶乘组件
    act = ServiceActivity(component_code=FactorialCalculateComponent.code)
    # 传入参数
    act.component.inputs.n = Var(type=Var.PLAIN, value=4)
    end = EmptyEndEvent()

    start.extend(act).extend(end)

    pipeline = builder.build_tree(start)
    api.run_pipeline(runtime=BambooDjangoRuntime(), pipeline=pipeline)

    # 等待 1s 后获取流程执行结果
    time.sleep(1)

    result = api.get_execution_data_outputs(BambooDjangoRuntime(), act.id).data

    print(result)

随后,在命令行输入

python manage.py shell

打开 django console, 输入以下命令,执行此流程:

from big_calculator.test import bamboo_playground
bamboo_playground()

流程运行完后,获取节点的执行结果,可以看到,该节点输出了 factorial_of_n,并且值为 24(4 * 3 * 2 *1),这正是我们需要的效果:

{'_loop': 0, '_result': True, 'factorial_of_n': 24}

恭喜你,你已经成功的创建了一个流程并把它运行起来了!在这期间你可能会遇到不少的坑,建议尝试先自行解决,如果实在无法解决,可以前往 标准运维 仓库提 issues,或者前往蓝鲸智云官网提问。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化全方位实战教程

本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。

1.Pandas 性能优化 40 倍 – DataFrame

1. 1 性能优化小试牛刀

大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。

比如下面这个简单例子,我们随机生成100万条数据,对val这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform函数的平均运行时间是284ms:

import pandas as pd
import numpy as np

def gen_data(size):
    d = dict()
    d["genre"] = np.random.choice(["A""B""C""D"], size=size)
    d["val"] = np.random.randint(low=0, high=100, size=size)
    return pd.DataFrame(d)

data = gen_data(1000000)
data.head()
def transform(data):
    data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)

%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.2. 用Cython编写C扩展

试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1 这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。

%load_ext cython
%%cython
cpdef int _transform(int x):
    if x % 2:
        return x + 1
    return x - 1

def transform(data):
    data.loc[:, "new_val"] = data.val.apply(_transform)

%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.3. 减少类型转换

为了减少C和Python之间的类型转换,我们直接把val这一列作为Numpy数组传递给Cython函数,注意区分cnpnp。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。

%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.4. 使用不安全的数组

利用@cython.boundscheck(False)@cython.wraparound(False)装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。

%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

来源:Python中文社区

作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu

2.Pandas性能优化:基础篇

Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。

2.1 按行迭代优化

很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。

我们测试一下速度:

import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
                     'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                   'x':  np.random.randint(1, 10, (1000))})

%%timeit
a2=[]
for index,row in df.iterrows():
    temp=row['a']
    a2.append(temp*temp)
df['a2']=a2    

67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
a2=[]
for row in df.itertuples():
    temp=getattr(row, 'a')
    a2.append(temp*temp)
df['a2']=a2

1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到直接循环,itertuples速度是iterrows的很多倍。

所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。

2.2 apply 优化

一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:

%%timeit
df['a2']=df.a.apply(lambda x: x*x)

360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到,效率又有提升,apply的速度是itertuples的5倍左右。

注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。

%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)

15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。

我们可以简单的这样改写:

%%timeit
df['a3']=df['a']*df['b']

204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高

%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)

93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

2.3 聚合agg效率优化

有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。

还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:

%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())

1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

采用agg内置函数:

%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()

415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

刚才是单列聚合,我们看下多列聚合:
自定义函数:

%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})

2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

采用内置函数:

%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})

1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。

下面列出了一些内置函数:

内置函数描述
count计数
sum求和
mean平均值
median中位数
min,max最大值/最小值
std,var标准差/方差
prod求积

当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如

df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))

在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。

2.4 数据读取优化

如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。

这里有几种方法可以优化:

1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。

chunksize = 10 ** 6for chunk in pd.read_csv(filename,  chunksize=chunksize):
     process(chunk)

这里也可以使用多进程方式,具体我们在后面进阶篇介绍。

  1. 过滤掉不需要的列

如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。

df = pd.read_csv(filename,  usecols=["a","b","c"]):
  1. 为列指定类型
    panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
  1. 保存为其他格式
    如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
 import pandas as pd
 #读取csv
 df = pd.read_csv('filename.csv')
 
 #pickle格式
 df.to_pickle('filename.pkl') 
 df = pd.read_pickle('filename.pkl') 
 
 #hdf格式
 df.to_hdf('filename.hdf','df') 
 df = pd.read_hdf('filename.hdf','df') 

file typetimespeed
csv1.93 s1
pickle415 m4.6
hdf808 ms2.3
  1. 用第三方的包读取
    如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。

2.5 优化数据处理逻辑

这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。

这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:

call_areacall_seconds
03364
23075
25847
12032

call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。

我们随机生成一批数据:

import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area':  np.random.randint(0, 3, (row_number)),
                   'call_seconds':  np.random.randint(0, 10000, (row_number))})

如果我们想计算每个电话的费用:

def get_cost(call_area,call_seconds):
    if call_area==1:
        rate=0.1
    elif call_area==0:
        rate=0.7
    else:
        return 0
    return math.ceil(call_seconds/60)*rate

方法1,采用按行迭代循环

%%timeit
cost_list = []
for i,row in df.iterrows():
    call_area=row['call_area']
    call_seconds=row['call_seconds']
    cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list

方法2,采用apply行的方式

%%timeit
df['cost']=df.apply(
         lambda row: get_cost(
             row['call_area'],
             row['call_seconds']),
         axis=1)

方法3,采用mask+loc,分组计算

%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0

方法4,使用numpy的方式,可以使用index的方式找到对应的费用。

%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins

测试结果:

方法运行时间运行速度
iterrows513 ms1
apply181 ms2.8
loc6.44 ms79.6
numpy219 µs2342

方法4速度快是因为它采用了numpy向量化的数据处理方式。

总结

在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。

3.Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

3.1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

3.2 cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

3.3 numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

3.4 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

3.5 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Prometheus + Granafa 40分钟构建MySQL监控平台实战教程

Prometheus + Granafa 概述

对于MySQL的监控平台,相信大家实现起来有很多了:基于天兔的监控,还有基于zabbix相关的二次开发。相信很多同行都应该已经开始玩起来了。我这边的选型是Prometheus + Granafa的实现方式。简而言之就是我现在的生产环境使用的是prometheus,还有就是granafa满足的我的日常工作需要。在入门的简介和安装,大家可以参考这里:

https://blog.51cto.com/cloumn/detail/77

1、首先看下我们的监控效果、mysql主从

构建高大上的MySQL监控平台

2、mysql状态:

构建高大上的MySQL监控平台

构建高大上的MySQL监控平台

3、缓冲池状态:

构建高大上的MySQL监控平台

exporter 相关部署实战教程

1、安装exporter

    [root@controller2 opt]# https://github.com/prometheus/mysqld_exporter/releases/download/v0.10.0/mysqld_exporter-0.10.0.linux-amd64.tar.gz
    [root@controller2 opt]# tar -xf mysqld_exporter-0.10.0.linux-amd64.tar.gz 

2、添加mysql 账户:

    GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO 'exporter'@'%' IDENTIFIED BY 'localhost';
    flush privileges;

3、编辑配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf 
    [client]
    user=exporter
    password=123456

4、设置配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /etc/systemd/system/mysql_exporter.service 
    [Unit]
    Description=mysql Monitoring System
    Documentation=mysql Monitoring System

    [Service]
    ExecStart=/opt/mysqld_exporter-0.10.0.linux-amd64/mysqld_exporter \
             -collect.info_schema.processlist \
             -collect.info_schema.innodb_tablespaces \
             -collect.info_schema.innodb_metrics  \
             -collect.perf_schema.tableiowaits \
             -collect.perf_schema.indexiowaits \
             -collect.perf_schema.tablelocks \
             -collect.engine_innodb_status \
             -collect.perf_schema.file_events \
             -collect.info_schema.processlist \
             -collect.binlog_size \
             -collect.info_schema.clientstats \
             -collect.perf_schema.eventswaits \
             -config.my-cnf=/opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf

    [Install]
    WantedBy=multi-user.target

5、添加配置到prometheus server

      - job_name: 'mysql'
        static_configs:
         - targets: ['192.168.1.11:9104','192.168.1.12:9104']

6、测试看有没有返回数值:

http://192.168.1.12:9104/metrics

正常我们通过mysql_up可以查询倒mysql监控是否已经生效,是否起起来

    #HELP mysql_up Whether the MySQL server is up.
    #TYPE mysql_up gauge
    mysql_up 1

监控相关指标

在做任何一个东西监控的时候,我们要时刻明白我们要监控的是什么,指标是啥才能更好的去监控我们的服务,在mysql里面我们通常可以通过一下指标去衡量mysql的运行情况:mysql主从运行情况、查询吞吐量、慢查询情况、连接数情况、缓冲池使用情况以及查询执行性能等。

主从复制运行指标:

1、主从复制线程监控:

大部分情况下,很多企业使用的都是主从复制的环境,监控两个线程是非常重要的,在mysql里面我们通常是通过命令:

    MariaDB [(none)]> show slave status\G;
    *************************** 1. row ***************************
                   Slave_IO_State: Waiting for master to send event
                      Master_Host: 172.16.1.1
                      Master_User: repl
                      Master_Port: 3306
                    Connect_Retry: 60
                  Master_Log_File: mysql-bin.000045
              Read_Master_Log_Pos: 72904854
                   Relay_Log_File: mariadb-relay-bin.000127
                    Relay_Log_Pos: 72905142
            Relay_Master_Log_File: mysql-bin.000045
                 Slave_IO_Running: Yes
                Slave_SQL_Running: Yes

Slave_IO_Running、Slave_SQL_Running两个线程正常那么说明我们的复制集群是健康状态的。

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_slave_sql_running来获取主从集群的健康状况。

    # HELP mysql_slave_status_slave_sql_running Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_slave_sql_running untyped
    mysql_slave_status_slave_sql_running{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 1

2、主从复制落后时间:

在使用show slave status
里面还有一个关键的参数Seconds_Behind_Master。Seconds_Behind_Master表示slave上SQL thread与IO thread之间的延迟,我们都知道在MySQL的复制环境中,slave先从master上将binlog拉取到本地(通过IO thread),然后通过SQL
thread将binlog重放,而Seconds_Behind_Master表示本地relaylog中未被执行完的那部分的差值。所以如果slave拉取到本地的relaylog(实际上就是binlog,只是在slave上习惯称呼relaylog而已)都执行完,此时通过show slave status看到的会是0

Seconds_Behind_Master: 0

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_seconds_behind_master 来获取相关状态。

    # HELP mysql_slave_status_seconds_behind_master Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_seconds_behind_master untyped
    mysql_slave_status_seconds_behind_master{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 0

查询吞吐量:

说到吞吐量,那么我们如何从那方面来衡量呢? 
通常来说我们可以根据mysql 的插入、查询、删除、更新等操作来

为了获取吞吐量,MySQL 有一个名为 Questions 的内部计数器(根据 MySQL
用语,这是一个服务器状态变量),客户端每发送一个查询语句,其值就会加一。由 Questions 指标带来的以客户端为中心的视角常常比相关的Queries
计数器更容易解释。作为存储程序的一部分,后者也会计算已执行语句的数量,以及诸如PREPARE 和 DEALLOCATE PREPARE
指令运行的次数,作为服务器端预处理语句的一部分。可以通过命令来查询:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Questions";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Questions     | 15071 |
    +---------------+-------+

MySQLD Exporter中返回的样本数据中通过mysql_global_status_questions反映当前Questions计数器的大小:

    # HELP mysql_global_status_questions Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_questions untyped
    mysql_global_status_questions 13253

当然由于prometheus
具有非常丰富的查询语言,我们可以通过这个累加的计数器来查询某一短时间内的查询增长率情况,可以做相关的阈值告警处理、例如一下查询2分钟时间内的查询情况:

rate(mysql_global_status_questions[2m])

当然上面是总量,我们可以分别从监控读、写指令的分解情况,从而更好地理解数据库的工作负载、找到可能的瓶颈。通常,通常,读取查询会由 Com_select
指标抓取,而写入查询则可能增加三个状态变量中某一个的值,这取决于具体的指令:

Writes = Com_insert + Com_update + Com_delete

下面我们通过命令获取插入的情况:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Com_insert";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Com_insert    | 10578 |
    +---------------+-------+

从MySQLD
Exporter的/metrics返回的监控样本中,可以通过global_status_commands_total获取当前实例各类指令执行的次数:

    # HELP mysql_global_status_commands_total Total number of executed MySQL commands.
    # TYPE mysql_global_status_commands_total counter
    mysql_global_status_commands_total{command="create_trigger"} 0
    mysql_global_status_commands_total{command="create_udf"} 0
    mysql_global_status_commands_total{command="create_user"} 1
    mysql_global_status_commands_total{command="create_view"} 0
    mysql_global_status_commands_total{command="dealloc_sql"} 0
    mysql_global_status_commands_total{command="delete"} 3369
    mysql_global_status_commands_total{command="delete_multi"} 0

慢查询性能

查询性能方面,慢查询也是查询告警的一个重要的指标。MySQL还提供了一个Slow_queries的计数器,当查询的执行时间超过long_query_time的值后,计数器就会+1,其默认值为10秒,可以通过以下指令在MySQL中查询当前long_query_time的设置:

    MariaDB [(none)]> SHOW VARIABLES LIKE 'long_query_time';
    +-----------------+-----------+
    | Variable_name   | Value     |
    +-----------------+-----------+
    |
 long_query_time | 10.000000 |
    +-----------------+-----------+
    1 row in set (0.00 sec)

当然我们也可以修改时间

    MariaDB [(none)]> SET GLOBAL long_query_time = 5;
    Query OK, 0 rows affected (0.00 sec)

然后我们而已通过sql语言查询MySQL实例中Slow_queries的数量:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Slow_queries";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Slow_queries  | 0     |
    +---------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,通过mysql_global_status_slow_queries指标展示当前的Slow_queries的值:

    # HELP mysql_global_status_slow_queries Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_slow_queries untyped
    mysql_global_status_slow_queries 0

同样的,更具根据Prometheus 慢查询语句我们也可以查询倒他某段时间内的增长率:

rate(mysql_global_status_slow_queries[5m])

连接数监控

监控客户端连接情况相当重要,因为一旦可用连接耗尽,新的客户端连接就会遭到拒绝。MySQL 默认的连接数限制为 151。

    MariaDB [(none)]> SHOW VARIABLES LIKE 'max_connections';
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 max_connections | 151   |
    +-----------------+-------+

当然我们可以修改配置文件的形式来增加这个数值。与之对应的就是当前连接数量,当我们当前连接出来超过系统设置的最大值之后常会出现我们看到的Too many
connections(连接数过多),下面我查找一下当前连接数:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_connected";
    +-------------------+-------+
    | Variable_name     | Value |
    +-------------------+-------+
    |
 Threads_connected | 41     |
    +-------------------+-------

当然mysql 还提供Threads_running 这个指标,帮助你分隔在任意时间正在积极处理查询的线程与那些虽然可用但是闲置的连接。

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_running";
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 Threads_running | 10     |
    +-----------------+-------+

如果服务器真的达到 max_connections
限制,它就会开始拒绝新的连接。在这种情况下,Connection_errors_max_connections
指标就会开始增加,同时,追踪所有失败连接尝试的Aborted_connects 指标也会开始增加。

MySQLD Exporter返回的样本数据中:

    # HELP mysql_global_variables_max_connections Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_max_connections gauge
    mysql_global_variables_max_connections 151         

表示最大连接数

    # HELP mysql_global_status_threads_connected Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_connected untyped
    mysql_global_status_threads_connected 41

表示当前的连接数

    # HELP mysql_global_status_threads_running Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_running untyped
    mysql_global_status_threads_running 1

表示当前活跃的连接数

    # HELP mysql_global_status_aborted_connects Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_aborted_connects untyped
    mysql_global_status_aborted_connects 31

累计所有的连接数

    # HELP mysql_global_status_connection_errors_total Total number of MySQL connection errors.
    # TYPE mysql_global_status_connection_errors_total counter
    mysql_global_status_connection_errors_total{error="internal"} 0
    #服务器内部引起的错误、如内存硬盘等
    mysql_global_status_connection_errors_total{error="max_connections"} 0
    #超出连接处引起的错误

当然根据prom表达式,我们可以查询当前剩余可用的连接数:

mysql_global_variables_max_connections - mysql_global_status_threads_connected

查询mysq拒绝连接数

mysql_global_status_aborted_connects

缓冲池情况:

MySQL 默认的存储引擎 InnoDB
使用了一片称为缓冲池的内存区域,用于缓存数据表与索引的数据。缓冲池指标属于资源指标,而非工作指标,前者更多地用于调查(而非检测)性能问题。如果数据库性能开始下滑,而磁盘
I/O 在不断攀升,扩大缓冲池往往能带来性能回升。 
默认设置下,缓冲池的大小通常相对较小,为 128MiB。不过,MySQL 建议可将其扩大至专用数据库服务器物理内存的 80% 大小。我们可以查看一下:

    MariaDB [(none)]> show global variables like 'innodb_buffer_pool_size';
    +-------------------------+-----------+
    | Variable_name           | Value     |
    +-------------------------+-----------+
    |
 innodb_buffer_pool_size | 134217728 |
    +-------------------------+-----------+

MySQLD Exporter返回的样本数据中,使用mysql_global_variables_innodb_buffer_pool_size来表示。

    # HELP mysql_global_variables_innodb_buffer_pool_size Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_innodb_buffer_pool_size gauge
    mysql_global_variables_innodb_buffer_pool_size 1.34217728e+08

    Innodb_buffer_pool_read_requests记录了正常从缓冲池读取数据的请求数量。可以通过以下指令查看

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_read_requests";
    +----------------------------------+-------------+
    | Variable_name                    | Value       |
    +----------------------------------+-------------+
    |
 Innodb_buffer_pool_read_requests | 38465 |
    +----------------------------------+-------------+

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_read_requests Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_read_requests untyped
    mysql_global_status_innodb_buffer_pool_read_requests 2.7711547168e+10

当缓冲池无法满足时,MySQL只能从磁盘中读取数据。Innodb_buffer_pool_reads即记录了从磁盘读取数据的请求数量。通常来说从内存中读取数据的速度要比从磁盘中读取快很多,因此,如果Innodb_buffer_pool_reads的值开始增加,可能意味着数据库的性能有问题。
可以通过以下只能查看Innodb_buffer_pool_reads的数量

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_reads";
    +--------------------------+-------+
    | Variable_name            | Value |
    +--------------------------+-------+
    |
 Innodb_buffer_pool_reads | 138  |
    +--------------------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_reads Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_reads untyped
    mysql_global_status_innodb_buffer_pool_reads 138

通过以上监控指标,以及实际监控的场景,我们可以利用PromQL快速建立多个监控项。可以查看两分钟内读取磁盘的增长率的增长率:

rate(mysql_global_status_innodb_buffer_pool_reads[2m])

官方模板ID

上面是我们简单列举的一些指标,下面我们使用granafa给 MySQLD_Exporter添加监控图表:

  • 主从主群监控(模板7371):

  • 相关mysql 状态监控7362:

  • 缓冲池状态7365:

  • 简单的告警规则

除了相关模板之外,没有告警规则那么我们的监控就是不完美的,下面列一下我们的监控告警规则

    groups:
    - name: MySQL-rules
      rules:
      - alert: MySQL Status 
        expr: up == 0
        for: 5s 
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: MySQL has stop !!!"
          description: "检测MySQL数据库运行状态"

      - alert: MySQL Slave IO Thread Status
        expr: mysql_slave_status_slave_io_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave IO Thread has stop !!!"
          description: "检测MySQL主从IO线程运行状态"

      - alert: MySQL Slave SQL Thread Status 
        expr: mysql_slave_status_slave_sql_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave SQL Thread has stop !!!"
          description: "检测MySQL主从SQL线程运行状态"

      - alert: MySQL Slave Delay Status 
        expr: mysql_slave_status_sql_delay == 30
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave Delay has more than 30s !!!"
          description: "检测MySQL主从延时状态"

      - alert: Mysql_Too_Many_Connections
        expr: rate(mysql_global_status_threads_connected[5m]) > 200
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 连接数过多"
          description: "{{$labels.instance}}: 连接数过多,请处理 ,(current value is: {{ $value }})"  

      - alert: Mysql_Too_Many_slow_queries
        expr: rate(mysql_global_status_slow_queries[5m]) > 3
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 慢查询有点多,请检查处理"
          description: "{{$labels.instance}}: Mysql slow_queries is more than 3 per second ,(current value is: {{ $value }})"

2、添加规则到prometheus:

    rule_files:
      - "rules/*.yml" 

3、打开web ui我们可以看到规则生效了:

构建高大上的MySQL监控平台

总结

到处监控mysql的相关状态已经完成,大家可以根据mysql更多的监控指标去完善自己的监控,当然这一套就是我用在线上环境的,可以参考参考。

来源:https://blog.51cto.com/xiaoluoge/2476375
作者:小罗ge11

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典