分类目录归档:性能优化

教你纯原生实现Redis简易客户端(绕过Qmt白名单)

Redis 是我们在开发过程中经常会用到的内存数据库,尤其是在Python的第三方模块Redis-py的支持下,在Python中使用Redis及其方便。

但是在有些情况下,我们无法使用像Redis-py这样的第三方模块(比如QMT),这时候就需要自己实现一个简易版的Redis-py了。

本文将教大家如何用20行代码,制作一个简易版的Redis客户端,不过仅以GET命令为例,其他命令的用法也差不多。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

2.原理剖析

其实通过Redis GET返回的数据就是一些字符串,这些字符串的格式如下:

b'$466\r\n\x80\x04\x95\xc7\x01\x00\x00\x00\x00\x00\x00]\x94(\x8c\x06000957\x94\x8c\x06002031\x94\x8c\x06000899\x94\x8c\x06300339\x94\x8c\x06002090\x94\x8c\x06601016\x94\x8c\x06002547\x94\x8c\x06002863\x94\x8c\x06002591\x94\x8c\x06002514\x94\x8c\x06000629\x94\x8c\x06002204\x94\x8c\x06000544\x94\x8c\x06002374\x94\x8c\x06000821\x94\x8c\x06000625\x94\x8c\x06000158\x94\x8c\x06002703\x94\x8c\x06002866\x94\x8c\x06600686\x94\x8c\x06002796\x94\x8c\x06300598\x94\x8c\x06002101\x94\x8c\x06002454\x94\x8c\x06000970\x94\x8c\x06000631\x94\x8c\x06002121\x94\x8c\x06600348\x94\x8c\x06600996\x94\x8c\x06002080\x94\x8c\x06002194\x94\x8c\x06002466\x94\x8c\x06300663\x94\x8c\x06002616\x94\x8c\x06000665\x94\x8c\x06600992\x94\x8c\x06300750\x94\x8c\x06300059\x94\x8c\x06002047\x94\x8c\x06002997\x94\x8c\x06000521\x94\x8c\x06002594\x94\x8c\x06002261\x94\x8c\x06002125\x94\x8c\x06002085\x94\x8c\x06002168\x94\x8c\x06002665\x94\x8c\x06002523\x94\x8c\x06603067\x94\x8c\x06002432\x94e.\r\n'

可见其是一个bytes字符串,开头$xxx是此数据的长度,\r\n作为分割符,后面紧跟着的就是你的原始数据内容,最后才是\r\n作为结尾。

根据这个返回内容,我们就可以制作一个简易的客户端用于在无法引用第三方模块的环境中接收Redis信息。

3.编写简易Redis客户端

与Redis通信,我们只需要用Python原生的socket模块即可。

import socket
import pickle

REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
# 创建 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接服务,指定主机和端口
s.connect((REDIS_HOST, REDIS_PORT))
s.close()

这样就与你的Redis服务器连接上了,接下来只需要向socket发送你的命令并receive即可获取对应的内容:

import socket
import pickle

REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
# 创建 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接服务,指定主机和端口
s.connect((REDIS_HOST, REDIS_PORT))
# GET 某个 KEY 的内容
s.send("GET RQB_keys_20220719 \r\n".encode("utf-8"))
# 接收小于 1M 的数据
msg = s.recv(1024 * 1024) 
s.close()
print(msg)
# b'$466\r\n\x80\x04\x95\xc7\x01\x00\x00\x00\x00\x00\x00]\x94(\x8c\x06000957\x94\x8c\x06002031\x94\x8c\x06000899\x94\x8c\x06300339\x94\x8c\x06002090\x94\x8c\x06601016\x94\x8c\x06002547\x94\x8c\x06002863\x94\x8c\x06002591\x94\x8c\x06002514\x94\x8c\x06000629\x94\x8c\x06002204\x94\x8c\x06000544\x94\x8c\x06002374\x94\x8c\x06000821\x94\x8c\x06000625\x94\x8c\x06000158\x94\x8c\x06002703\x94\x8c\x06002866\x94\x8c\x06600686\x94\x8c\x06002796\x94\x8c\x06300598\x94\x8c\x06002101\x94\x8c\x06002454\x94\x8c\x06000970\x94\x8c\x06000631\x94\x8c\x06002121\x94\x8c\x06600348\x94\x8c\x06600996\x94\x8c\x06002080\x94\x8c\x06002194\x94\x8c\x06002466\x94\x8c\x06300663\x94\x8c\x06002616\x94\x8c\x06000665\x94\x8c\x06600992\x94\x8c\x06300750\x94\x8c\x06300059\x94\x8c\x06002047\x94\x8c\x06002997\x94\x8c\x06000521\x94\x8c\x06002594\x94\x8c\x06002261\x94\x8c\x06002125\x94\x8c\x06002085\x94\x8c\x06002168\x94\x8c\x06002665\x94\x8c\x06002523\x94\x8c\x06603067\x94\x8c\x06002432\x94e.\r\n'

请注意,recv里你设定的大小会直接占用内存,所以请设定一个适宜的数目,或者从返回值中的美元符后的数字判断你需要接收的数据大小。

比如第一次请求,你只接收1024个字节,拿到 $xxx 这个长度后,重新send一次命令,再 s.recv(xxx) 长度。

上述例子中得到的内容是redis的格式,我们需要把\r\n给去除掉,并只取中间的数据便是我们存入redis的原始数据。

import pickle
def get_msg(msg):
    msg_new = msg.split(b"\r\n")[1]
    msg = pickle.loads(msg_new)
    return msg

因为我的原始内容是pickle格式,因此我在取出原始数据后使用pickle.loads便能拿到我想要的内容,完整代码如下:

import socket
import pickle

REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
# 创建 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 连接服务,指定主机和端口
s.connect((REDIS_HOST, REDIS_PORT))
# GET 某个 KEY 的内容
s.send("GET RQB_keys_20220719 \r\n".encode("utf-8"))
# 接收小于 1M 的数据
msg = s.recv(1024 * 1024) 
s.close()

def get_msg(msg):
    msg_new = msg.split(b"\r\n")[1]
    msg = pickle.loads(msg_new)
    return msg

print(get_msg(msg))

效果如下:

['000957', '002031', '000899', '300339', '002090', '601016', '002547', '002863', '002591', '002514', '000629', '002204', '000544', '002374', '000821', '000625', '000158', '002703', '002866', '600686', '002796', '300598', '002101', '002454', '000970', '000631', '002121', '600348', '600996', '002080', '002194', '002466', '300663', '002616', '000665', '600992', '300750', '300059', '002047', '002997', '000521', '002594', '002261', '002125', '002085', '002168', '002665', '002523', '603067', '002432']

在QMT等会限制第三方模块的软件中,使用这样的方式访问Redis,就不会再遇到白名单的限制了。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Annoy 1个优秀的”邻近搜索”解决方案

Annoy是由 spotify 开源的一个Python第三方模块,它能用于搜索空间中给定查询点的近邻点。

此外,众所周知,Python由于GIL的存在,它的多线程最多只能用上一个CPU核的性能。如果你想要做性能优化,就必须用上多进程。

但是多进程存在一个问题,就是所有进程的变量都是独立的,B进程访问不到A进程的变量,因此Annoy为了解决这个问题,增加了一个静态索引保存功能,你可以在A进程中保存Annoy变量,在B进程中通过文件的形式访问这个变量。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install annoy

2.基本使用

Annoy使用起来非常简单,学习成本极低。比如我们随意生成1000个0,1之间的高斯分布点,将其加入到Annoy的索引,并保存为文件:

# 公众号:Python 实用宝典
from annoy import AnnoyIndex
import random

f = 40
t = AnnoyIndex(f, 'angular')  # 用于存储f维度向量
for i in range(1000):
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)

t.build(10) # 10 棵树,查询时,树越多,精度越高。
t.save('test.ann')

这样,我们就完成了索引的创建及落地。Annoy 支持4种距离计算方式:

"angular""euclidean""manhattan""hamming",或"dot",即余弦距离、欧几里得距离、曼哈顿距离、汉明距离及点乘距离。

接下来我们可以新建一个进程访问这个索引:

from annoy import AnnoyIndex

f = 40
u = AnnoyIndex(f, 'angular')
u.load('test.ann') 
print(u.get_nns_by_item(1, 5))
# [1, 607, 672, 780, 625]

其中,u.get_nns_by_item(i, n, search_k=-1, include_distances=False)返回第 i 个item的n个最近邻的item。在查询期间,它将检索多达search_k(默认n_trees * n)个点。如果设置include_distancesTrue,它将返回一个包含两个列表的元组:第二个列表中包含所有对应的距离。

3.算法原理

构建索引:在数据集中随机选择两个点,用它们的中垂线来切分整个数据集。再随机从两个平面中各选出一个顶点,再用中垂线进行切分,于是两个平面变成了四个平面。以此类推形成一颗二叉树。当我们设定树的数量时,这个数量指的就是这样随机生成的二叉树的数量。所以每颗二叉树都是随机切分的。

查询方法
1. 将每一颗树的根节点插入优先队列;
2. 搜索优先队列中的每一颗二叉树,每一颗二叉树都可以得到最多 Top K 的候选集;
3. 删除重复的候选集;
4. 计算候选集与查询点的相似度或者距离;
5. 返回 Top K 的集合。

4.附录

下面是Annoy的所有函数方法:

  • AnnoyIndex(f, metric) 返回可读写的新索引,用于存储f维度向量。metric 可以是 "angular""euclidean""manhattan""hamming",或"dot"
  • a.add_item(i, v)用于给索引添加向量v,i 是指第 i 个向量。
  • a.build(n_trees)用于构建 n_trees 的森林。查询时,树越多,精度越高。在调用build后,无法再添加任何向量。
  • a.save(fn, prefault=False)将索引保存到磁盘。保存后,不能再添加任何向量。
  • a.load(fn, prefault=False)从磁盘加载索引。如果prefault设置为True,它将把整个文件预读到内存中。默认值为False。
  • a.unload() 释放索引。
  • a.get_nns_by_item(i, n, search_k=-1, include_distances=False)返回第 i 个item的 n 个最近邻的item。
  • a.get_nns_by_vector(v, n, search_k=-1, include_distances=False)与上面的相同,但按向量v查询。
  • a.get_item_vector(i)返回第i个向量。
  • a.get_distance(i, j)返回向量i和向量j之间的距离。
  • a.get_n_items() 返回索引中的向量数。
  • a.get_n_trees() 返回索引中的树的数量。
  • a.on_disk_build(fn) 用以在指定文件而不是RAM中建立索引(在添加向量之前执行,在建立之后无需保存)。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

TinyDB 一个纯Python编写的轻量级数据库

TinyDB 是一个纯 Python 编写的轻量级数据库,一共只有1800行代码,没有外部依赖项。目标是降低小型 Python 应用程序使用数据库的难度,对于一些简单程序而言与其用 SQL 数据库,不如就用TinyDB.

TinyDB的特点是:

  • 轻便:当前源代码有 1800 行代码(大约 40% 的文档)和 1600 行测试代码。
  • 可随意迁移:在当前文件夹下生成数据库文件,不需要任何服务,可以随意迁移。
  • 简单: TinyDB 通过提供简单干净的 API 使得用户易于使用。
  • 用纯 Python 编写: TinyDB 既不需要外部服务器,也不需要任何来自 PyPI 的依赖项。
  • 适用于 Python 3.6+ 和 PyPy3: TinyDB 适用于所有现代版本的 Python 和 PyPy。
  • 强大的可扩展性:您可以通过编写中间件修改存储的行为来轻松扩展 TinyDB。
  • 100% 测试覆盖率:无需解释。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install tinydb

2.TinyDB 增删改查示例

初始化一个DB文件:

from tinydb import TinyDB
db = TinyDB('db.json')

这样就在当前文件夹下生成了一个名为 `db.json` 的数据库文件。

往里面插入数据

from tinydb import TinyDB
db = TinyDB('db.json')
db.insert({'type': 'apple', 'count': 7})
db.insert({'type': 'peach', 'count': 3})

可以看到,我们可以直接往数据库里插入字典数据,不需要任何处理。下面是批量插入的方法:

db.insert_multiple([
    {'name': 'John', 'age': 22},
    {'name': 'John', 'age': 37}])
db.insert_multiple({'int': 1, 'value': i} for i in range(2))

查询所有数据

from tinydb import TinyDB
db = TinyDB('db.json')
db.all()
# [{'count': 7, 'type': 'apple'}, {'count': 3, 'type': 'peach'}]

除了 .all() 我们还可以使用for循环遍历db:

from tinydb import TinyDB
db = TinyDB('db.json')
for item in db:
    print(item)
# {'count': 7, 'type': 'apple'}
# {'count': 3, 'type': 'peach'}

如果你需要搜索特定数据,可以使用Query():

from tinydb import TinyDB
db = TinyDB('db.json')
Fruit = Query()
db.search(Fruit.type == 'peach')
# [{'count': 3, 'type': 'peach'}]
db.search(Fruit.count > 5)
# [{'count': 7, 'type': 'apple'}]

更新数据:

from tinydb import TinyDB
db = TinyDB('db.json')
db.update({'foo': 'bar'})

# 删除某个Key
from tinydb.operations import delete
db.update(delete('key1'), User.name == 'John')

删除数据

删除数据也可以使用类似的条件语句:

from tinydb import TinyDB
db = TinyDB('db.json')
db.remove(Fruit.count < 5)
db.all()
# [{'count': 10, 'type': 'apple'}]

清空整个数据库:

from tinydb import TinyDB
db = TinyDB('db.json')
db.truncate()
db.all()
# []

3.高级查询

除了点操作符访问数据,你还可以用原生的dict访问表示法:

# 写法1
db.search(User.country-code == 'foo')
# 写法2
db.search(User['country-code'] == 'foo')

这两种写法是等效的。

另外在常见的查询运算符(==, <, >, …)之外,TinyDB还支持where语句:

from tinydb import where
db.search(where('field') == 'value')

这等同于:

db.search(Query()['field'] == 'value')

这种语法还能访问嵌套字段:

db.search(where('birthday').year == 1900)
# 或者
db.search(where('birthday')['year'] == 1900)

Any 查询方法:

db.search(Group.permissions.any(Permission.type == 'read'))
# [{'name': 'user', 'permissions': [{'type': 'read'}]},
# {'name': 'sudo', 'permissions': [{'type': 'read'}, {'type': 'sudo'}]},
# {'name': 'admin', 'permissions':
#        [{'type': 'read'}, {'type': 'write'}, {'type': 'sudo'}]}]

检查单个项目是否包含在列表中:

db.search(User.name.one_of(['jane', 'john']))

TinyDB还支持和Pandas类似的逻辑操作:

# Negate a query:
db.search(~ (User.name == 'John'))
# Logical AND:
db.search((User.name == 'John') & (User.age <= 30))
# Logical OR:
db.search((User.name == 'John') | (User.name == 'Bob'))

TinyDB的介绍就到这里,你还可以访问他们的官方文档,查看更多的使用方法:

https://tinydb.readthedocs.io/en/latest/usage.html

尤其是想基于TinyDB做些存储优化的同学,你们可以详细阅读 Storage & Middleware 章节。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python Heartrate 像观察心率一样观察代码性能表现

Python Heartrate 这个神奇的模块能让你实时可视化地观察Python程序执行时,每一行代码的性能表现。

左边的数字是每行被击中的次数。条形显示最近被击中的次数,较长的条意味着其被击中的次数更多。

颜色的深浅代表着命中的时间与当前时间的距离,颜色越浅代表离当前时间越近。

下面就来教大家怎么用这个模块来观察你的代码性能表现。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install --user heartrate

支持Python3.5及以上版本。

2.Python Heartrate 基本使用

这个模块用起来超级简单,你只需要在代码里添加下面这两行语句即可:

import heartrate
heartrate.trace(browser=True)

然后打开浏览器窗口,访问:127.0.0.1:9999

就能看到相关的代码性能表现:

通过Heartrate,我可以很清楚地知道我的代码的瓶颈在哪:

左边柱子越长,说明命中次数越多。

白色柱子越频繁出现,说明该行语句存在非常影响性能的问题。

3.高级用法

除了追踪启动Heartrate程序的文件代码之外,Heartrate还能追踪其他文件的运行情况,如果你的文件引入了其他文件下的函数,它也能一起追踪:

from heartrate import trace, files
trace(files=files.path_contains('my_app', 'my_library'))

你只需要这么调用即可追踪其他路径下的文件的代码执行情况。

如果你想追踪全部文件:

from heartrate import trace, files
trace(files=files.all)

这么写即可一劳永逸,不过不建议在生产环境这么用,最好是只用于性能测试。

如果你的代码有性能瓶颈,而你又找不到问题出在哪。那么就快去试一下heartrate,检测到底是哪一行代码出了问题并优化你的代码吧!

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python超好用的命令行参数工具—Click

Click 是一个简洁好用的Python模块,它能用尽量少的代码实现漂亮的命令行界面。它不仅开箱即用、还能支持高度自定义的配置。

一个简单的示例如下:

import click

@click.command()
@click.option('--count', default=1, help='Number of greetings.')
@click.option('--name', prompt='Your name',
              help='The person to greet.')
def hello(count, name):
    """Simple program that greets NAME for a total of COUNT times."""
    for x in range(count):
        click.echo(f"Hello {name}!")

if __name__ == '__main__':
    hello()

效果如下:

可见这个模块的强大之处,你只需要在对应的函数上加几个装饰器,就能实现带提示符的命令行界面的创建,相当方便。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install click

2.基本使用

如文首所示的例子一样,@click.option 是最基本的选项,它既可以设定参数默认值,也可以设定必须传入参数:

@click.command()
@click.option('--n', default=1) # 设定了默认值
def dots(n):
    click.echo('.' * n)
    

@click.command()
@click.option('--n', required=True, type=int) # 设定必须传入参数值
def dots(n):
    click.echo('.' * n)

如果你设置了必须传入相关参数,那么在没传入参数的情况下,效果是这样的:

当然,它还支持设定多种参数别名,比如下面的 –from 和 -f 是等效的:

@click.command()
@click.option('--from', '-f', 'from_')
@click.option('--to', '-t')
def reserved_param_name(from_, to):
    click.echo(f"from {from_} to {to}")

3.多值参数

如果你的选项需要多个参数,Click也能帮你实现这个需求。

@click.command()
@click.option('--pos', nargs=2, type=float)
def findme(pos):
    a, b = pos
    click.echo(f"{a} / {b}")

可见,通过配置nargs参数,你可以将用户传递的值存入元组,并在代码中解包这个元组拿到所有的值。

效果如下:

你还可以配置一个参数叫 multiple,这个参数可以让你接受N个值:

@click.command()
@click.option('--message', '-m', multiple=True)
def commit(message):
    click.echo(' '.join(message))

效果如下:

4.其他功能

你还可以使用Click来计数,这个使用非常罕见:

@click.command()
@click.option('-v', '--verbose', count=True)
def log(verbose):
    click.echo(f"Verbosity: {verbose}")

效果如下:

布尔标志

此外,Click还带有布尔标志功能,你可以直接使用 “/” 来标志参数为二选一参数,函数中直接就会拿到布尔型的变量:

import sys

@click.command()
@click.option('--shout/--no-shout', default=False)
def info(shout):
    rv = sys.platform
    if shout:
        rv = rv.upper() + '!!!!111'
    click.echo(rv)

效果如下:

选择选项

你可以直接限定用户的输入范围:

@click.command()
@click.option('--hash-type',
              type=click.Choice(['MD5', 'SHA1'], case_sensitive=False))
def digest(hash_type):
    click.echo(hash_type)

提示文本

在文首提到的例子中,输出了个 “You name:” 的提示,其实是 option 中的prompt参数控制的:

@click.command()
@click.option('--name', prompt='Your name please')
def hello(name):
    click.echo(f"Hello {name}!")

好了,Click的功能就介绍到这里,他还有许多高级的用法,比如动态默认值、回调函数等等,大家可以通过官方文档了解这些高级功能的使用方法:

https://click.palletsprojects.com/en/8.0.x/options/#name-your-options

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pydantic — 强大的数据校验工具,比DRF快12倍

Pydantic 是一个使用Python类型注解进行数据验证和管理的模块。安装方法非常简单,打开终端输入:

pip install pydantic

它类似于 Django DRF 序列化器的数据校验功能,不同的是,Django里的序列化器的Field是有限制的,如果你想要使用自己的Field还需要继承并重写它的基类:

# Django 序列化器
class Book(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=32)
    price = models.DecimalField(max_digits=5, decimal_places=2)
    author = models.CharField(max_length=32)
    publish = models.CharField(max_length=32)

而 Pydantic 基于Python3.7以上的类型注解特性,实现了可以对任何类做数据校验的功能:

# Pydantic 数据校验功能
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:22',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
print(user.id)
print(type(user.id))
#> 123
#> <class 'int'>
print(repr(user.signup_ts))
#> datetime.datetime(2019, 6, 1, 12, 22)
print(user.friends)
#> [1, 2, 3]
print(user.dict())
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""

从上面的基本使用可以看到,它甚至能自动帮你做数据类型的转换,比如代码中的 user.id, 在字典中是字符串,但经过Pydantic校验器后,它自动变成了int型,因为User类里的注解就是int型。

当我们的数据和定义的注解类型不一致时会报这样的Error:

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:222',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
"""
Traceback (most recent call last):
  File "1.py", line 18, in <module>
    user = User(**external_data)
  File "pydantic\main.py", line 331, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for User
signup_ts
  invalid datetime format (type=value_error.datetime)
"""

即 “invalid datetime format”, 因为我传入的 signup_ts 不是标准的时间格式(多了个2)。

1.Pydantic 模型数据导出

通过Pydantic模型中自带的 json 属性方法,能让经过校验后的数据一行命令直接转成 json 字符串,如前文中的user对象:

print(user.dict())  # 转为字典
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""
print(user.json())  # 转为json
"""
{"id": 123, "signup_ts": "2019-06-01T12:22:00", "friends": [1, 2, 3], "name": "John Doe"}
"""

非常方便。它还支持将整个数据结构导出为 schema json,它能完整地描述整个对象的数据结构类型:

print(user.schema_json(indent=2))
"""
{
  "title": "User",
  "type": "object",
  "properties": {
    "id": {
      "title": "Id",
      "type": "integer"
    },
    "signup_ts": {
      "title": "Signup Ts",
      "type": "string",
      "format": "date-time"
    },
    "friends": {
      "title": "Friends",
      "default": [],
      "type": "array",
      "items": {
        "type": "integer"
      }
    },
    "name": {
      "title": "Name",
      "default": "John Doe",
      "type": "string"
    }
  },
  "required": [
    "id"
  ]
}
"""

2.数据导入

除了直接定义数据校验模型,它还能通过ORM、字符串、文件导入到数据校验模型:

比如字符串(raw):

from datetime import datetime
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: datetime = None
      
m = User.parse_raw('{"id": 123, "name": "James"}')
print(m)
#> id=123 signup_ts=None name='James'

此外,它能直接将ORM的对象输入,转为Pydantic的对象,比如从Sqlalchemy ORM:

from typing import List
from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.ext.declarative import declarative_base
from pydantic import BaseModel, constr

Base = declarative_base()


class CompanyOrm(Base):
    __tablename__ = 'companies'
    id = Column(Integer, primary_key=True, nullable=False)
    public_key = Column(String(20), index=True, nullable=False, unique=True)
    name = Column(String(63), unique=True)
    domains = Column(ARRAY(String(255)))


class CompanyModel(BaseModel):
    id: int
    public_key: constr(max_length=20)
    name: constr(max_length=63)
    domains: List[constr(max_length=255)]

    class Config:
        orm_mode = True


co_orm = CompanyOrm(
    id=123,
    public_key='foobar',
    name='Testing',
    domains=['example.com', 'foobar.com'],
)
print(co_orm)
#> <models_orm_mode.CompanyOrm object at 0x7f0bdac44850>
co_model = CompanyModel.from_orm(co_orm)
print(co_model)
#> id=123 public_key='foobar' name='Testing' domains=['example.com',
#> 'foobar.com']

从Json文件导入:

from datetime import datetime
from pathlib import Path
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: datetime = None
      
path = Path('data.json')
path.write_text('{"id": 123, "name": "James"}')
m = User.parse_file(path)
print(m)

从pickle导入:

import pickle
from datetime import datetime
from pydantic import BaseModel

pickle_data = pickle.dumps({
    'id': 123,
    'name': 'James',
    'signup_ts': datetime(2017, 7, 14)
})
m = User.parse_raw(
    pickle_data, content_type='application/pickle', allow_pickle=True
)
print(m)
#> id=123 signup_ts=datetime.datetime(2017, 7, 14, 0, 0) name='James'

3.自定义数据校验

你还能给它增加 validator 装饰器,增加你需要的校验逻辑:

from pydantic import BaseModel, ValidationError, validator


class UserModel(BaseModel):
    name: str
    username: str
    password1: str
    password2: str

    @validator('name')
    def name_must_contain_space(cls, v):
        if ' ' not in v:
            raise ValueError('must contain a space')
        return v.title()

    @validator('password2')
    def passwords_match(cls, v, values, **kwargs):
        if 'password1' in values and v != values['password1']:
            raise ValueError('passwords do not match')
        return v

    @validator('username')
    def username_alphanumeric(cls, v):
        assert v.isalnum(), 'must be alphanumeric'
        return v

上面,我们增加了三种自定义校验逻辑:

1.name 必须带有空格

2.password2 必须和 password1 相同

3.username 必须为字母

让我们试试这三个校验是否成功实现:

user = UserModel(
    name='samuel colvin',
    username='scolvin',
    password1='zxcvbn',
    password2='zxcvbn',
)
print(user)
#> name='Samuel Colvin' username='scolvin' password1='zxcvbn' password2='zxcvbn'

try:
    UserModel(
        name='samuel',
        username='scolvin',
        password1='zxcvbn',
        password2='zxcvbn2',
    )
except ValidationError as e:
    print(e)
    """
    2 validation errors for UserModel
    name
      must contain a space (type=value_error)
    password2
      passwords do not match (type=value_error)
    """

可以看到,第一个UserModel里的数据完全没有问题,通过校验。

第二个UserModel里的数据,由于name存在空格,password2和password1不一致,无法通过校验。

4.性能表现

这是最令我惊讶的部分,Pydantic 比 Django-rest-framework 还快了12.3倍:

PackageVersionRelative PerformanceMean validation time
pydantic1.7.393.7μs
attrs + cattrs20.3.01.5x slower143.6μs
valideer0.4.21.9x slower175.9μs
marshmallow3.10.02.4x slower227.6μs
voluptuous0.12.12.7x slower257.5μs
trafaret2.1.03.2x slower296.7μs
schematics2.1.010.2x slower955.5μs
django-rest-framework3.12.212.3x slower1148.4μs
cerberus1.3.225.9x slower2427.6μs

而且他们的所有基准测试代码都是开源的,你可以在下面这个Github链接找到:

https://github.com/samuelcolvin/pydantic/tree/master/benchmarks

如果你的网络无法访问GitHub,请关注Python实用宝典公众号后台回复Pydantic获取。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

什么格式是保存 Pandas 数据的最好格式?

在数据分析相关项目工作时,我通常使用Jupyter笔记本和pandas库来处理和移动我的数据。对于中等大小的数据集来说,这是一个非常直接的过程,你甚至可以将其存储为纯文本文件而没有太多的开销。

然而,当你的数据集中的观测数据数量较多时,保存和加载数据回内存的过程就会变慢,现在程序的重新启动都会迫使你等待数据重新加载。所以最终,CSV文件或任何其他纯文本格式都会失去吸引力。

我们可以做得更好。有很多二进制格式可以用来将数据存储到磁盘上,其中有很多格式pandas都支持。我们怎么能知道哪一种更适合我们的目的呢?

来吧,我们尝试其中的几个,然后进行对比!这就是我决定在这篇文章中要做的:通过几种方法将 pandas.DataFrame 保存到磁盘上,看看哪一种在I/O速度、内存消耗和磁盘空间方面做的更好。

在这篇文章中,我将展示我的测试结果。

1.要比较的格式

我们将考虑采用以下格式来存储我们的数据:

1. CSV — 数据科学家的一个好朋友
2. Pickle — 一种Python的方式来序列化事物
3. MessagePack — 它就像JSON,但又快又小
4. HDF5 — 一种设计用于存储和组织大量数据的文件格式
5. Feather — 一种快速、轻量级、易于使用的二进制文件格式,用于存储数据框架
6. Parquet — Apache Hadoop的柱状存储格式

所有这些格式都是被广泛使用的,而且(也许除了MessagePack)在你做一些数据分析的事情时非常经常遇到。

为了追求找到最好的缓冲格式来存储程序会话之间的数据,我选择了以下指标进行比较。

1. size_mb – 文件大小(Mb)。
2. save_time – 将数据帧保存到磁盘上所需的时间量。
3. load_time – 将之前转储的数据帧加载到内存中所需要的时间量。
4. save_ram_delta_mb – 数据帧保存过程中最大的内存消耗增长量。
5. load_ram_delta_mb – 数据帧加载过程中的最大内存消耗增长量。

请注意,当我们使用高效压缩的二进制数据格式,如 Parquet 时,最后两个指标变得非常重要。它们可以帮助我们估计加载序列化数据所需的内存量,此外还有数据大小本身。我们将在接下来的章节中更详细地讨论这个问题。

2.测试及结果

我决定使用一个合成数据集进行测试,以便更好地控制序列化的数据结构和属性。

另外,我在我的基准中使用了两种不同的方法:

(a) 将生成的分类变量保留为字符串。

(b) 在执行任何I/O之前将它们转换为 pandas.Categorical 数据类型。

函数generate_dataset显示了我在基准中是如何生成数据集的:

def generate_dataset(n_rows, num_count, cat_count, max_nan=0.1, max_cat_size=100):
    """
    随机生成具有数字和分类特征的数据集。
    
    数字特征取自正态分布X ~ N(0, 1)。
    分类特征则被生成为随机的uuid4字符串。
    
    此外,数字和分类特征的max_nan比例被替换为NaN值。
    """
    dataset, types = {}, {}
    
    def generate_categories():
        from uuid import uuid4
        category_size = np.random.randint(2, max_cat_size)
        return [str(uuid4()) for _ in range(category_size)]
    
    for col in range(num_count):
        name = f'n{col}'
        values = np.random.normal(0, 1, n_rows)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'float32'
        
    for col in range(cat_count):
        name = f'c{col}'
        cats = generate_categories()
        values = np.array(np.random.choice(cats, n_rows, replace=True), dtype=object)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'object'
    
    return pd.DataFrame(dataset), types

我们将CSV文件的保存和加载性能作为一个基准。

五个随机生成的具有一百万个观测值的数据集被转储到CSV中,并读回内存以获得平均指标。

每种二进制格式都针对20个随机生成的具有相同行数的数据集进行测试。

这些数据集包括15个数字特征和15个分类特征。你可以在这个资源库中找到带有基准测试功能和所需的完整源代码:

https://github.com/devforfu/pandas-formats-benchmark

或在Python实用宝典后台回复 Pandas IO对比 ,下载完整代码。

(a) 数据为字符串特征时的性能

下图显示了每种数据格式的平均I/O时间。一个有趣的观察是,hdf显示出比csv更慢的加载速度,而其他二进制格式的表现明显更好。其中最令人印象深刻的是feather和parquet。

在保存数据和从磁盘上读取数据时,内存开销如何?

下一张图片告诉我们,hdf 的表现就不是那么好了。可以肯定的是,csv在保存/加载纯文本字符串时不需要太多的额外内存,而Feather和parquet则相当接近:

最后,让我们看看文件的大小。这次parquet显示了一个令人印象深刻的结果,考虑到这种格式是为有效存储大量数据而开发的,这并不令人惊讶。

(b) 字符串特征转换为数字时的性能

在上一节中,我们没有尝试有效地存储我们的分类特征而是使用普通的字符串。让我们来弥补这个遗漏吧! 这一次我们使用一个专门的 pandas.Categorical 类型,转字符串特征为数字特征。

看看现在与纯文本的csv相比,它看起来如何!

现在所有的二进制格式都显示出它们的真正力量。Csv的基准结果已经远远落后了,所以让我们把它去掉,以便更清楚地看到各种二进制格式之间的差异:

Feather 和 Pickle 显示了最好的 I/O 速度,而 hdf 仍然显示了明显的性能开销。

现在是时候比较数据进程加载时的内存消耗了。下面的柱状图显示了我们之前提到的关于parquet格式的一个重要事实。

可以看到 parquet 读写时的内存空间差距有多大,你有可能你无法将比较大的 parquet 文件加载到内存中。

最后的图显示了各格式的文件大小。所有的格式都显示出良好的效果,除了hdf仍然需要比其他格式多得多的空间:

3.结论

正如我们的测试所显示的,似乎 feather 格式是存储Python会话数据的理想候选者。它显示了很快的I/O速度,在磁盘上不占用太多内存,并且在加载回RAM时不需要消耗太大的内存。

当然,这种比较并不意味着你应该在每个可能的情况下使用这种格式。例如,feather格式一般不会被用作长期文件存储的格式。

另外,某些特定情况下也无法使用 feather,这由你的整个程序架构决定。然而,就如本帖开头所述的目的,它在不被任何特殊事项限制的情况下是一个很好的选择。

本文译自 towardsdatascience
作者: Ilia Zaitsev
有部分修改。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python怎样存储变量性能最优?这篇文章告诉你答案

工作时我们经常会遇到需要临时保存结果变量的场景,尤其是一些数据处理、模型开发的场景,加载处理速度是个很漫长的过程,于是经常会把这些变量储存起来。

而储存变量最常见、最普遍的方法是用pickle,保存为pkl文件。但是如果从写入和读取的性能角度考虑,pkl可能真的不是最优选。

Pickle有其独特的好处,大部分变量不需要进行处理,都能直接存到pkl文件里,但这样的方便其实是牺牲了部分性能取得的。与之相比,numpy的.npy格式就比pickle性能上快不少。

当然,我们需要有证据支撑这个观点。所以今天我们就来做个实验,分别在Python2和Python3中对比 numpy 和 pickle 两种存储格式(.npy, .pkl) 对数据的存储和读取的性能对比。

部分内容参考分析自: https://applenob.github.io/python/save/

1. Python2中, npy与pkl的性能对比

首先初始化数据:

import numpy as np
import time
import cPickle as pkl
import os

all_batches = []
for i in range(20):
    a1 = np.random.normal(size=[25600, 40])
    label = np.random.normal(size=[25600, 1])
    all_batch = np.concatenate([a1, label], 1)
    all_batches.append(all_batch)
all_batches = np.array(all_batches)
print(all_batches.shape)
# (20, 25600, 41)

然后测试使用pickle保存和读取时间的耗时,以及整个文件的大小:

s_t1 = time.time()
pkl_name = "a.pkl"
with open(pkl_name, "wb") as f:
    pkl.dump(all_batches, f)
pkl_in_time = time.time() - s_t1
print("pkl dump costs {} sec".format(pkl_in_time))

s_t2 = time.time()
with open(pkl_name, "rb") as f:
    new_a = pkl.load(f)
pkl_out_time = time.time() - s_t2
print("pkl load costs {} sec".format(pkl_out_time))

pkl_size = os.path.getsize(pkl_name)
print("pkl file size: {} byte, {} mb".format(pkl_size, float(pkl_size)/(1024*1024)))

结果如下:

pkl dump costs 67.7483091354 sec
pkl load costs 52.1168899536 sec
pkl file size: 497437110 byte, 474.392995834 mb

然后再试一下npy的写入和读取:

s_t3 = time.time()
npy_name = "a.npy"
with open(npy_name, "wb") as f:
    np.save(f, arr=all_batches)
npy_in_time = time.time() - s_t3
print("npy save costs {} sec".format(npy_in_time))
s_t4 = time.time()
with open(npy_name, "rb") as f:
    new_a = np.load(f)
npy_out_time = time.time() - s_t4
print("npy load costs {} sec".format(npy_out_time))
npy_size = os.path.getsize(npy_name)
print("npy file size: {} byte, {} mb".format(npy_size, float(npy_size) / (1024 * 1024)))

结果如下:

npy save costs 20.718367815 sec
npy load costs 0.62314915657 sec
npy file size: 167936128 byte, 160.15637207 mb

结果发现,npy性能明显优于pkl格式。

通过多次测试发现,在Python2中,npy格式的性能优势全面碾压pkl,工程允许的情况下,在Python2中,我们应该在这二者中毫不犹豫地选择npy.

2.Python3中, npy与pkl的性能对比

Python2已经是过去式,重点还要看Python3.

在Python3中,与Python2的代码唯一一句不一样的是pickle的引入:

# Python2:
import cPickle as pkl

# Python3:
import pickle as pkl

其他代码基本一样,替换代码后,重新运行程序,让我们看看在Python3上,npy格式和pkl格式性能上的区别:

首先是pkl格式的表现:

ckenddeMacBook-Pro:Documents ckend$ python 1.py 
(20, 25600, 41)
pkl dump costs 24.32167887687683 sec
pkl load costs 4.480823040008545 sec
pkl file size: 167936163 byte, 160.15640544891357 mb

然后是npy格式的表现:

npy save costs 22.471696853637695 sec
npy load costs 0.3791017532348633 sec
npy file size: 167936080 byte, 160.1563262939453 mb

可以看到在Python3中pkl格式和npy格式的存储大小是基本相同的,在存储耗时上也相差无几。但是在读取数据的时候,npy相对于pkl还是有一定的优势的。

因此,如果你的程序非常注重读取效率,那么我觉得npy格式会比pkl格式更适合你。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandarallel 一个能让你的Pandas计算火力拉满的工具

没有使用Pandarallel
使用了Pandarallel

众所周知,由于GIL的存在,Python单进程中的所有操作都是在一个CPU核上进行的,所以为了提高运行速度,我们一般会采用多进程的方式。而多进程无非就是以下几种方案:

  • 1、multiprocessing
  • 2、concurrent.futures.ProcessPoolExecutor()
  • 3、joblib
  • 4、ppserver
  • 5、celery

这些方案对于普通Pandas玩家来说都不是特别友好,怎样才能算作一个友好的并行处理方案?就是原来的逻辑我基本不用变,仅修改需要计算的那行就能完成我们目标的方案,而 pandarallel 就是一个这样友好的工具。

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

可以看到,在 pandarallel 的世界里,你只需要替换原有的 pandas 处理语句就能实现多CPU并行计算。非常方便、非常nice.

在4核CPU的性能测试上,它比原始语句快了接近4倍。测试条件(OS: Linux Ubuntu 16.04,Hardware: Intel Core i7 @ 3.40 GHz – 4 cores),这就是我所说的,它把CPU充分利用了起来。

下面就给大家介绍这个模块怎么用,其实非常简单,任何代码只需要加几行代码就能实现质的飞跃。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install pandarallel

对于windows用户,有一个不好的消息是,它只能在Windows的linux子系统上运行(WSL),你可以在微软官网上找到安装教程:

https://docs.microsoft.com/zh-cn/windows/wsl/about

2.使用Pandarallel

使用前,需要对Pandarallel进行初始化:

from pandarallel import pandarallel
pandarallel.initialize()

这样才能调用并行计算的API,不过 initialize 中有一个重要参数需要说明,那就是 nb_workers ,它将指定并行计算的Worker数,如果没有设置,所有CPU的核都会用上。

Pandarallel一共支持8种Pandas操作,下面是一个apply方法的例子。

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(5e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return math.sin(x.a**2) + math.sin(x.b**2)

# 正常处理
res = df.apply(func, axis=1)

# 并行处理
res_parallel = df.parallel_apply(func, axis=1)

# 查看结果是否相同
res.equals(res_parallel)

其他方法使用上也是类似的,在原始的函数名称前加上 parallel_。比如DataFrame.groupby.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(3e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),
                       b=np.random.rand(df_size)))
def func(df):
    dum = 0
    for item in df.b:
        dum += math.log10(math.sqrt(math.exp(item**2)))
        
    return dum / len(df.b)

# 正常处理
res = df.groupby("a").apply(func)
# 并行处理
res_parallel = df.groupby("a").parallel_apply(func)
res.equals(res_parallel)

又比如 DataFrame.groupby.rolling.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4

# 正常处理
res = df.groupby('a').b.rolling(4).apply(func, raw=False)
# 并行处理
res_parallel = df.groupby('a').b.rolling(4).parallel_apply(func, raw=False)
res.equals(res_parallel)

案例都是类似的,这里就直接列出表格,不浪费大家宝贵的时间去阅读一些重复的例子了:

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

3.注意事项

1. 我有 8 个 CPU,但 parallel_apply 只能加快大约4倍的计算速度。为什么?

答:正如我前面所言,Python中每个进程占用一个核,Pandarallel 最多只能加快到你所拥有的核心的总数,一个 4 核的超线程 CPU 将向操作系统显示 8 个 CPU,但实际上只有 4 个核心,因此最多加快4倍。

2. 并行化是有成本的(实例化新进程,通过共享内存发送数据,…),所以只有当并行化的计算量足够大时,并行化才是有意义的。对于很少量的数据,使用 Pandarallel 并不总是值得的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

超级方便的轻量级Python流水线工具,拥有漂亮的可视化界面!

Mara-pipelines 是一个轻量级的数据转换框架,具有透明和低复杂性的特点。其他特点如下:

  • 基于非常简单的Python代码就能完成流水线开发。
  • 使用 PostgreSQL 作为数据处理引擎。
  • 有Web界面可视化分析流水线执行过程。
  • 基于 Python 的 multiprocessing 单机流水线执行。不需要分布式任务队列。轻松调试和输出日志。
  • 基于成本的优先队列:首先运行具有较高成本(基于记录的运行时间)的节点。

此外,在Mara-pipelines的Web界面中,你不仅可以查看和管理流水线及其任务节点,你还可以直接触发这些流水线和节点,非常好用:

1.安装

由于使用了大量的依赖,Mara-pipelines 并不适用于Windows,如果你需要在Windows上使用Mara-pipelines,请使用docker或者windows下的linux子系统

使用pipe安装Mara-pipelines:

pip install mara-pipelines

或者:

pip install git+https://github.com/mara/mara-pipelines.git

2.使用示例

这是一个基础的流水线演示,由三个相互依赖的节点组成,包括 任务1(ping_localhost), 子流水线(sub_pipeline), 任务2(sleep):

# 注意,这个示例中使用了部分国外的网站,如果无法访问,请变更为国内网站。
from mara_pipelines.commands.bash import RunBash
from mara_pipelines.pipelines import Pipeline, Task
from mara_pipelines.ui.cli import run_pipeline, run_interactively

pipeline = Pipeline(
    id='demo',
    description='A small pipeline that demonstrates the interplay between pipelines, tasks and commands')

pipeline.add(Task(id='ping_localhost', description='Pings localhost',
                  commands=[RunBash('ping -c 3 localhost')]))

sub_pipeline = Pipeline(id='sub_pipeline', description='Pings a number of hosts')

for host in ['google', 'amazon', 'facebook']:
    sub_pipeline.add(Task(id=f'ping_{host}', description=f'Pings {host}',
                          commands=[RunBash(f'ping -c 3 {host}.com')]))

sub_pipeline.add_dependency('ping_amazon', 'ping_facebook')
sub_pipeline.add(Task(id='ping_foo', description='Pings foo',
                      commands=[RunBash('ping foo')]), ['ping_amazon'])

pipeline.add(sub_pipeline, ['ping_localhost'])

pipeline.add(Task(id='sleep', description='Sleeps for 2 seconds',
                  commands=[RunBash('sleep 2')]), ['sub_pipeline'])

可以看到,Task包含了多个commands,这些commands会用于真正地执行动作。而 pipeline.add 的参数中,第一个参数是其节点,第二个参数是此节点的上游。如:

pipeline.add(sub_pipeline, ['ping_localhost'])

则表明必须执行完 ping_localhost 才会执行 sub_pipeline.

为了运行这个流水线,需要配置一个 PostgreSQL 数据库来存储运行时信息、运行输出和增量处理状态:

import mara_db.auto_migration
import mara_db.config
import mara_db.dbs

mara_db.config.databases \
    = lambda: {'mara': mara_db.dbs.PostgreSQLDB(host='localhost', user='root', database='example_etl_mara')}

mara_db.auto_migration.auto_discover_models_and_migrate()

如果 PostgresSQL 正在运行并且账号密码正确,输出如下所示(创建了一个包含多个表的数据库):

Created database "postgresql+psycopg2://root@localhost/example_etl_mara"

CREATE TABLE data_integration_file_dependency (
    node_path TEXT[] NOT NULL, 
    dependency_type VARCHAR NOT NULL, 
    hash VARCHAR, 
    timestamp TIMESTAMP WITHOUT TIME ZONE, 
    PRIMARY KEY (node_path, dependency_type)
);

.. more tables

为了运行这个流水线,你需要:

from mara_pipelines.ui.cli import run_pipeline

run_pipeline(pipeline)

这将运行单个流水线节点及其 (sub_pipeline) 所依赖的所有节点:

run_pipeline(sub_pipeline, nodes=[sub_pipeline.nodes['ping_amazon']], with_upstreams=True)

3.Web 界面

我认为 mara-pipelines 最有用的是他们提供了基于Flask管控流水线的Web界面。

对于每条流水线,他们都有一个页面显示:

  • 所有子节点的图以及它们之间的依赖关系
  • 流水线的总体运行时间图表以及过去 30 天内最昂贵的节点(可配置)
  • 所有流水线节点及其平均运行时间和由此产生的排队优先级的表
  • 流水线最后一次运行的输出和时间线

对于每个任务,都有一个页面显示

  • 流水线中任务的上游和下游
  • 最近 30 天内任务的运行时间
  • 任务的所有命令
  • 任务最后运行的输出

此外,流水线和任务可以直接从网页端调用运行,这是非常棒的特点:

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典