新年新气象,超级文献下载工具更新了!一行命令下载全网任意文献

之前为了解决学生无力支付国内部分论文平台的付费阅读功能的问题,我们推出了超级文献下载工具:你不得不知道的python超级文献批量搜索下载工具

在最初的这几个版本中,我们必须通过编写代码才能选择不同的文献源去搜索和下载文献。很多同学在使用过程中会由于对Python不熟悉或者环境没有配置好而产生不少问题。

为了解决这些问题,我们给他增加了命令行调用的方式,并上传到了PyPi,你只需要一行命令,就能下载到你所需要的文献!(感谢 @hulei6188 的开源贡献)

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install scihub-cn

看到 Successfully installed … 就代表成功安装scihub-cn。

不过请注意,scihub-cn依赖 aiohttp 模块进行并发的下载,因此支持的最低Python版本为3.6.

项目源代码:https://github.com/Ckend/scihub-cn

2.Scihub-cn 使用方法

2.1 使用DOI号下载论文

首先让我们来试试根据DOI号下载文献:

scihub-cn -d 10.1038/s41524-017-0032-0

下载的论文会自动生成在当前文件夹下:

你也可以选择将其下载到任意目录下,只需要添加 -o 参数:

scihub-cn -d 10.1038/s41524-017-0032-0 -o D:\papers

这将会把这篇论文下载到D盘的papers文件夹中。

2.2 根据关键词下载论文

使用 -w 参数指定一个关键词,可以通过关键词下载论文:

scihub-cn -w reinforcement

同样滴,它也支持-o参数指定文件夹。此外,这里默认使用的搜索引擎是百度学术,你也可以使用Google学术、publons、science_direct等。通过指定 -e 参数即可:

scihub-cn -w reinforcement -e google_scholar

为了避免Google学术无法连接,你还可以增加代理 -p 参数:

scihub-cn -w reinforcement -e google_scholar -p http://127.0.0.1:10808

访问外网数据源的时候,增加代理能避免出现Connection closed等问题。

此外,你还能限定下载的篇目, 比如我希望下载100篇文章:

scihub-cn -w reinforcement -l 100

2.3 根据url下载论文

给定任意论文地址,可以让scihub-cn尝试去下载该论文:

scihub-cn -u https://ieeexplore.ieee.org/document/26502

使用 -u 参数指定论文链接即可,非常方便。

3.批量下载论文

当然,之前花了几篇文章优化的批量下载模块这个版本肯定少不了!

而且还增加了几种新的批量下载方式:

1. 根据给出所有论文名称的txt文本文件下载论文。

2. 根据给出所有论文url的txt文件下载论文。

3. 根据给出所有论文DOI号的txt文本文件下载论文。

4. 根据给出bibtex文件下载论文。

比如,根据给出所有论文URL的txt文件下载论文:

scihub-cn -i urls.txt --url

可以看到,文件内有4个论文链接,而他也成功地下载到了这4篇论文。

再试试放了DOI号的txt文件的批量下载:

scihub-cn -i dois.txt --doi

你可以输入 scihub-cn –help 看到更多的参数说明:

$scihub-cn --help
... ...
optional arguments:
  -h, --help            show this help message and exit
  -u URL                input the download url
  -d DOI                input the download doi
  --input INPUTFILE, -i INPUTFILE
                        input download file
  -w WORDS, --words WORDS
                        download from some key words,keywords are linked by
                        _,like machine_learning.
  --title               download from paper titles file
  -p PROXY, --proxy PROXY
                        use proxy to download papers
  --output OUTPUT, -o OUTPUT
                        setting output path
  --doi                 download paper from dois file
  --bib                 download papers from bibtex file
  --url                 download paper from url file
  -e SEARCH_ENGINE, --engine SEARCH_ENGINE
                        set the search engine
  -l LIMIT, --limit LIMIT
                        limit the number of search result

大家如果有更多的想法,可以往我们这个开源项目贡献代码:

https://github.com/Ckend/scihub-cn

本文仅限参考研究,下载的论文请在24小时内阅读后删除,请勿将此项目用于商业目的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Dynaconf 轻松实现 Python 动态配置管理

Dynaconf 是一个库,旨在成为在 Python 中管理配置的最佳选择。

它可以从各种来源读取设置,包括环境变量、文件、服务器配置等。

它适用于任何类型的 Python 程序,包括 Flask 和 Django 扩展。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install dynaconf

2.初步使用DynaConf

在你的项目的根目录中运行 dynaconf init 命令。

cd path/to/your/project/
dynaconf init -f toml

会有类似如下的输出,说明初始化完成:

⚙️  Configuring your Dynaconf environment
------------------------------------------
🐍 The file `config.py` was generated.

🎛️  settings.toml created to hold your settings.

🔑 .secrets.toml created to hold your secrets.

🙈 the .secrets.* is also included in `.gitignore`
beware to not push your secrets to a public repo.

🎉 Dynaconf is configured! read more on https://dynaconf.com

刚刚初始化的时候我们选择了 toml 格式。实际上你还可以选择 toml|yaml|json|ini|py,不过 toml 是默认的,也是最推荐的配置格式。

初始化完成后会创建以下文件:

.
├── config.py       # 需要被导入的配置脚本
├── .secrets.toml   # 像密码等敏感信息配置
└── settings.toml   # 应用配置

初始化完成后你就可以编写你的配置,编辑settings.toml:

key = "value"
a_boolean = false
number = 1234
a_float = 56.8
a_list = [1, 2, 3, 4]
a_dict = {hello="world"}

[a_dict.nested]
other_level = "nested value"

然后就可以在你的代码中导入并使用这些配置:

from config import settings

assert settings.key == "value"
assert settings.number == 789
assert settings.a_dict.nested.other_level == "nested value"
assert settings['a_boolean'] is False
assert settings.get("DONTEXIST", default=1) == 1

如果是密码等敏感信息,你可以配置在 .secrets.toml 中:

password = "s3cr3t"
token = "dfgrfg5d4g56ds4gsdf5g74984we5345-"
message = "This file doesn't go to your pub repo"

.secrets.toml 文件会被自动加入到 .gitignore 文件中,这些信息不会被上传到Git仓库上。

同时,DYNACONF还支持带前缀的环境变量:

export DYNACONF_NUMBER=789
export DYNACONF_FOO=false
export DYNACONF_DATA__CAN__BE__NESTED=value
export DYNACONF_FORMATTED_KEY="@format {this.FOO}/BAR"
export DYNACONF_TEMPLATED_KEY="@jinja {{ env['HOME'] | abspath }}"

3.高级使用

你还可以在Flask或Django中使用DynaConf,以Django为例,第一步要先确保已经设置 DJANGO_SETTINGS_MODULE 环境变量:

export DJANGO_SETTINGS_MODULE=yourproject.settings

然后在 manage.py 相同文件夹下运行初始化命令:

dynaconf init -f yaml

然后按照终端上的说明进行操作:

Django app detected
⚙️  Configuring your Dynaconf environment
------------------------------------------
🎛️  settings.yaml created to hold your settings.

🔑 .secrets.yaml created to hold your secrets.

🙈 the .secrets.yaml is also included in `.gitignore`
beware to not push your secrets to a public repo
or use dynaconf builtin support for Vault Servers.

⁉  path/to/yourproject/settings.py is found do you want to add dynaconf? [y/N]:

回答 y:

🎠  Now your Django settings are managed by Dynaconf
🎉  Dynaconf is configured! read more on https://dynaconf.com

在 Django 上,推荐的文件格式是yaml,因为它可以更轻松地保存复杂的数据结构,但是你依然可以选择使用 toml、json、ini 甚至将你的配置保存为 .py 格式。

初始化 dynaconf 后,在现有的settings.py底部包含以下内容:

# HERE STARTS DYNACONF EXTENSION LOAD
import dynaconf  # noqa
settings = dynaconf.DjangoDynaconf(__name__)  # noqa
# HERE ENDS DYNACONF EXTENSION LOAD (No more code below this line)

现在,在你的 Django 视图、模型和所有其他地方,你现在可以正常使用 django.conf.settings,因为它已被 Dynaconf 设置对象替换。

from django.conf import settings


def index(request):
    assert settings.DEBUG is True
    assert settings.NAME == "Bruno"
    assert settings.DATABASES.default.name == "db"
    assert settings.get("NONEXISTENT", 2) == 2

现在,通过修改 manage.py 相同文件夹下的配置文件,就能让配置全局生效了。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

什么格式是保存 Pandas 数据的最好格式?

在数据分析相关项目工作时,我通常使用Jupyter笔记本和pandas库来处理和移动我的数据。对于中等大小的数据集来说,这是一个非常直接的过程,你甚至可以将其存储为纯文本文件而没有太多的开销。

然而,当你的数据集中的观测数据数量较多时,保存和加载数据回内存的过程就会变慢,现在程序的重新启动都会迫使你等待数据重新加载。所以最终,CSV文件或任何其他纯文本格式都会失去吸引力。

我们可以做得更好。有很多二进制格式可以用来将数据存储到磁盘上,其中有很多格式pandas都支持。我们怎么能知道哪一种更适合我们的目的呢?

来吧,我们尝试其中的几个,然后进行对比!这就是我决定在这篇文章中要做的:通过几种方法将 pandas.DataFrame 保存到磁盘上,看看哪一种在I/O速度、内存消耗和磁盘空间方面做的更好。

在这篇文章中,我将展示我的测试结果。

1.要比较的格式

我们将考虑采用以下格式来存储我们的数据:

1. CSV — 数据科学家的一个好朋友
2. Pickle — 一种Python的方式来序列化事物
3. MessagePack — 它就像JSON,但又快又小
4. HDF5 — 一种设计用于存储和组织大量数据的文件格式
5. Feather — 一种快速、轻量级、易于使用的二进制文件格式,用于存储数据框架
6. Parquet — Apache Hadoop的柱状存储格式

所有这些格式都是被广泛使用的,而且(也许除了MessagePack)在你做一些数据分析的事情时非常经常遇到。

为了追求找到最好的缓冲格式来存储程序会话之间的数据,我选择了以下指标进行比较。

1. size_mb – 文件大小(Mb)。
2. save_time – 将数据帧保存到磁盘上所需的时间量。
3. load_time – 将之前转储的数据帧加载到内存中所需要的时间量。
4. save_ram_delta_mb – 数据帧保存过程中最大的内存消耗增长量。
5. load_ram_delta_mb – 数据帧加载过程中的最大内存消耗增长量。

请注意,当我们使用高效压缩的二进制数据格式,如 Parquet 时,最后两个指标变得非常重要。它们可以帮助我们估计加载序列化数据所需的内存量,此外还有数据大小本身。我们将在接下来的章节中更详细地讨论这个问题。

2.测试及结果

我决定使用一个合成数据集进行测试,以便更好地控制序列化的数据结构和属性。

另外,我在我的基准中使用了两种不同的方法:

(a) 将生成的分类变量保留为字符串。

(b) 在执行任何I/O之前将它们转换为 pandas.Categorical 数据类型。

函数generate_dataset显示了我在基准中是如何生成数据集的:

def generate_dataset(n_rows, num_count, cat_count, max_nan=0.1, max_cat_size=100):
    """
    随机生成具有数字和分类特征的数据集。
    
    数字特征取自正态分布X ~ N(0, 1)。
    分类特征则被生成为随机的uuid4字符串。
    
    此外,数字和分类特征的max_nan比例被替换为NaN值。
    """
    dataset, types = {}, {}
    
    def generate_categories():
        from uuid import uuid4
        category_size = np.random.randint(2, max_cat_size)
        return [str(uuid4()) for _ in range(category_size)]
    
    for col in range(num_count):
        name = f'n{col}'
        values = np.random.normal(0, 1, n_rows)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'float32'
        
    for col in range(cat_count):
        name = f'c{col}'
        cats = generate_categories()
        values = np.array(np.random.choice(cats, n_rows, replace=True), dtype=object)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'object'
    
    return pd.DataFrame(dataset), types

我们将CSV文件的保存和加载性能作为一个基准。

五个随机生成的具有一百万个观测值的数据集被转储到CSV中,并读回内存以获得平均指标。

每种二进制格式都针对20个随机生成的具有相同行数的数据集进行测试。

这些数据集包括15个数字特征和15个分类特征。你可以在这个资源库中找到带有基准测试功能和所需的完整源代码:

https://github.com/devforfu/pandas-formats-benchmark

或在Python实用宝典后台回复 Pandas IO对比 ,下载完整代码。

(a) 数据为字符串特征时的性能

下图显示了每种数据格式的平均I/O时间。一个有趣的观察是,hdf显示出比csv更慢的加载速度,而其他二进制格式的表现明显更好。其中最令人印象深刻的是feather和parquet。

在保存数据和从磁盘上读取数据时,内存开销如何?

下一张图片告诉我们,hdf 的表现就不是那么好了。可以肯定的是,csv在保存/加载纯文本字符串时不需要太多的额外内存,而Feather和parquet则相当接近:

最后,让我们看看文件的大小。这次parquet显示了一个令人印象深刻的结果,考虑到这种格式是为有效存储大量数据而开发的,这并不令人惊讶。

(b) 字符串特征转换为数字时的性能

在上一节中,我们没有尝试有效地存储我们的分类特征而是使用普通的字符串。让我们来弥补这个遗漏吧! 这一次我们使用一个专门的 pandas.Categorical 类型,转字符串特征为数字特征。

看看现在与纯文本的csv相比,它看起来如何!

现在所有的二进制格式都显示出它们的真正力量。Csv的基准结果已经远远落后了,所以让我们把它去掉,以便更清楚地看到各种二进制格式之间的差异:

Feather 和 Pickle 显示了最好的 I/O 速度,而 hdf 仍然显示了明显的性能开销。

现在是时候比较数据进程加载时的内存消耗了。下面的柱状图显示了我们之前提到的关于parquet格式的一个重要事实。

可以看到 parquet 读写时的内存空间差距有多大,你有可能你无法将比较大的 parquet 文件加载到内存中。

最后的图显示了各格式的文件大小。所有的格式都显示出良好的效果,除了hdf仍然需要比其他格式多得多的空间:

3.结论

正如我们的测试所显示的,似乎 feather 格式是存储Python会话数据的理想候选者。它显示了很快的I/O速度,在磁盘上不占用太多内存,并且在加载回RAM时不需要消耗太大的内存。

当然,这种比较并不意味着你应该在每个可能的情况下使用这种格式。例如,feather格式一般不会被用作长期文件存储的格式。

另外,某些特定情况下也无法使用 feather,这由你的整个程序架构决定。然而,就如本帖开头所述的目的,它在不被任何特殊事项限制的情况下是一个很好的选择。

本文译自 towardsdatascience
作者: Ilia Zaitsev
有部分修改。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

7行代码实现早上出门前自动收到分时天气预报

早上出门上班前,我总是忘记查看天气预报,以至于通勤路上下雨来了个措手不及。

回想起来,大部分人早上出门前的行为模式是固定的,那么有没有办法能在我出门前的那一分钟提醒我带伞或者是穿外套?

答案是肯定的,通过上回的钉钉机器人,我们就能实现这个目的。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

如果你没有阅读上一篇关于钉钉机器人的文章,请记得阅读, 有一些前置知识是你必须知道的:

10分钟教你用Python开发钉钉通知机器人

2.请求天气接口

有一个网站服务叫做:wttr.in 提供了非常方便的天气接口,比如:

https://wttr.in/Shenzhen?&lang=cn

效果如下:

我们可以通过这个API,获得全天的天气预报。

它支持很多形式,比如单行输出:

$ curl wttr.in/Nuremberg?format=3
Nuremberg: 🌦 +11⁰C

或者一次处理所有城市的这些查询:

$ curl -s 'wttr.in/{Nuremberg,Hamburg,Berlin}?format=3'
Nuremberg: 🌦 +11⁰C
Hamburg: 🌦 +8⁰C
Berlin: 🌦 +8⁰C

如果你希望让刚刚的未来三天天气预报输出成为图片格式,它也能实现:

curl 'https://wttr.in/Shenzhen.png'

不仅如此,它还支持分时天气预报:

这一张图就是我们要自动通知的天气预报,下面就告诉大家如何把这种图嵌入到钉钉通知中。

3.钉钉通知天气预报

使用我们上一回讲过的钉钉通知机器人,7行代码就能搞定这个需求:

https://github.com/Ckend/dd_notice

7行?没想到吧,基于markdown发送通知就是如此的简单:

import datetime
from notice import Messenger
m = Messenger(
    token="你的token",
    secret="你的secret"
)
m.send_md(f"天气预报-{datetime.datetime.today()}", "![weather](https://v2d.wttr.in/Shenzhen.png)")

将上回的源代码拉下来后,增加这7行代码,你只需要修改你的 token 和 secret 就能发送天气预报。

注意,请求的链接里拿的还是ShenZhen的天气预报,你可以改成自己所在的城市,也可以自定义任何自己喜欢的图表。效果如下:

所有的源代码都已经放在:

https://github.com/Ckend/dd_notice

如果你上不了Github,Python实用宝典公众号后台回复天气预报也能下载完整的通知源代码。

然后为了实现每天的定时发送,你只需要把代码放到服务器上,使用crontab配置定时任务即可:

# 输入 crontab -e 增加下面这一行,每天早上8:00运行通知脚本
0 8 * * * python /data/dd_notice/weather_notice.py

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pytorch+spark 进行大规模预测

虽然深度学习日益盛行,但目前spark还不支持深度学习算法。虽然也有相关库sparktorch能够将spark和pytorch结合起来,但是使用发现并非那么好用,而且此库目前活跃度较低,不方便debug。因此,本地训练深度学习模型并部署到spark中是一种有效的利用深度学习进行大规模预测的方法。

将pytorch模型嵌入部署到spark中进行大规模预测主要包括三步:

1.利用spark进行特征工程预处理,以保证训练集和测试集特征处理一致;

第一二步都比较简单,这里省去。主要对第三步进行说明。

模型分发(broadcast)分两种情况,第一种是简单可通过nn.Sequential定义的模型。对于这种情况可以,模型可以直接用。如下:

# 生成测试数据
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=50000, n_features=100, random_state=0)
df = pd.DataFrame(X)
df['label'] = np.random.randint(2,size=(50000))
df1 = spark.createDataFrame(df)
df1 = df1.withColumn('features', Func.array([col(f"{i}") for i in range(0, 100)])).repartition(1000)

# 创建模型并进行预测
%spark2_1.pyspark
import torch.nn as nn

network = nn.Sequential(
    nn.Linear(100, 2560),
    nn.ReLU(),
    nn.Linear(2560, 2560),
    nn.ReLU(),
    nn.Linear(2560, 2)
    #nn.Softmax(dim=1)
)

class network(nn.Module):
    def __init__(self):
        super(network, self).__init__()
        self.l1 = nn.Linear(100, 2560)
        self.l2 = nn.Linear(2560, 2560)
        self.l3 = nn.Linear(2560, 2)
        
    def forward(self, x):
        x = self.l1(x)
        x = self.l2(x)
        x = self.l3(x)
        return x

net = network()
bc_model_state = spark.sparkContext.broadcast(net.state_dict())

def get_model_for_eval():
  # Broadcast the model state_dict
  net.load_state_dict(bc_model_state.value)
  net.eval()
  return net
  
def one_row_predict(x):
    model = get_model_for_eval()
    t = torch.tensor(x, dtype=torch.float32)
    t = model(t).cpu().detach().numpy()
    #prediction = model(t).cpu().detach().item()
    # return prediction
    return list([float(i) for i in t])

one_row_udf = udf(one_row_predict, ArrayType(FloatType()))
df1 = df1.withColumn('pred_one_row', one_row_udf(col('features')))

在上面我们定义了一个简单模型,然后将其直接分发进行预测(这里省去了模型训练过程)。

但是当我们想使用一个比较复杂的模型来进行预测时(简单来讲就是不能使用 nn.Sequential 改写),使用上面的方法则会报错。

这时候需要将模型写入一个文件中,假设模型文件的路径为/export/models/item2vec.py, 使用pyspark中的addFile对其进行分发,然后import导入模型。

假设我们的模型文件/export/models/item2vec.py如下:

class Item2vec(nn.Module):
    def __init__(self, cv_dict, csr_cols):
        super(Item2vec, self).__init__()
        pass

    def forward(self, x):
        pass

    def predict(self, x):
        pass

假设模型已经训练好,现在要使用训练好的模型进行大规模预测:

from pyspark import SparkFiles
sc.addFile('/export/models/item2vec.py')
import sys
sys.path.append('/export/models/')

from item2vec import Item2vec

# model 表示训练好的模型
bc_model_state = sc.broadcast(model.state_dict())
net = Item2vec(cv_dict, csr_cols)

def get_model_for_eval_demo():
  # Broadcast the model state_dict
  net.load_state_dict(bc_model_state.value)
  net.eval()
  return net

上面的操作已经将模型分发(broadcast)出去,接下来就可以进行预测了。

预测这里介绍两种方式:一种是使用 udf + withColumn, 另一种则是使用 rdd + mapPartitions

由于这里使用的是 pyspark 2.1,还没有pandas udf,因此使用 udf + withColumn 时只能一行一行的预测,运行速度上来说是比不上 rdd + mapPartitions

对于pyspark 2.3以后的版本多了pandas udf后则可以使用batch predict了,具体可以参考

https://docs.databricks.com/static/notebooks/deep-learning/pytorch-images.html

udf + withColumn 的方式
# udf + withColumn 的方式
def one_row_predict_demo(x)
    x = torch.tensor(x, dtype=torch.float)
    _, prob = bc_model.predict(x)

    return round(float(prob[0]), 4)
    
one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())

one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())
df = demo.withColumn('demo_prob', one_row_predict_demo_udf('features'))
rdd + map 方式
def one_row_predict_map(rdds):
    bc_model = get_model_for_eval_demo()
    for row in rdds:
        x = torch.tensor(row.x, dtype=torch.float)
        _, prob = bc_model.predict(x)
    
        yield (row['id'], round(float(prob[0]), 4))

df = demo.rdd.mapPartitions(one_row_predict_map).toDF(['id', 'pred_prob'])

2. 效率优化(1)——mapPartition

上面的方法已经可以使得我们将训练好的深度学习模型部署到spark进行大规模预测了,但是其速度是非常慢的。通过在 mapPartitions 中进行一些处理,我们可以对预测进行加速:

# 代码源自 https://github.com/SaeedNajafi/infer-pytorch-pyspark

def basic_row_handler(row):
    return row

def predict_map(index, partition, ml_task,
                batch_size=16,
                row_preprocessor=basic_row_handler,
                row_postprocessor=basic_row_handler):

    # local model loading within each executor
    model = LocalPredictor(ml_task=ml_task, batch_size=batch_size,
                           partition_index=index)

    batch = []
    count = 0
    for row in partition:
        row_dict = row.asDict()
        # apply preprocessor on each row.
        row_dict_prep = row_preprocessor(row_dict)
        batch.append(row_dict_prep)
        count += 1
        if count == batch_size:
            # predict the ml and apply the postprocessor.
            for ret_row in model.predict(batch):  # ml prediction
                ret_row_post = row_postprocessor(ret_row)
                if ret_row_post is not None:
                    yield Row(**ret_row_post)

            batch = []
            count = 0

    # Flush remaining rows in the batches.
    if count != 0:
        for ret_row in model.predict(batch):  # ml prediction
            ret_row_post = row_postprocessor(ret_row)
            if ret_row_post is not None:
                yield Row(**ret_row_post)

        batch = []
        count = 0

上面的代码可以看作是在mapPartitions中进行了“延迟”预测——即先将一个partition中的多行数据进行处理然后合并为一个batch进行一起预测,这样能大大的提升运行效率。一个比较极端的情况是每个partition仅进行一次预测。

3. 效率优化(2)——pandas_udf

pandas_udf在udf的基础上进行了进一步的优化,利用pandas_udf程序运行效率更高。在这里我们可以借助于pandas_udf提升我们程序的运行效率:

# Enable Arrow support.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")

sc.addFile('get_model.py')
from get_model import get_model

model_path = '/path/to/model.pt'
data_path = '/path/to/data'

# model 表示训练好的模型
model = torch.load(model_path)
bc_model_state = sc.broadcast(model.state_dict())


def get_model_for_eval():
  # Broadcast the model state_dict  
  model = get_model()
  model.load_state_dict(bc_model_state.value)
  model.eval()
  return model

# model = torch.load(model_path)
# model = sc.broadcast(model)


@pandas_udf(FloatType())
def predict_batch_udf(arr: pd.Series) -> pd.Series:
    model = get_model_for_eval()
    # model.to(device)
    arr = np.vstack(arr.map(lambda x: eval(x)).values)
    arr = torch.tensor(arr).long()
    with torch.no_grad():
        predictions = list(model(arr).cpu().numpy())
            
    return pd.Series(predictions)

# 预测
data = data.withColumn('predictions', predict_batch_udf('features'))

作者:井底蛙蛙呱呱呱
链接:https://www.jianshu.com/p/fc60c967c8b8

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

量化投资单因子回测神器 — Alphalens

还记得我们在前面采用的回测工具Backtrader吗?Backtrader是一款非常灵活的回测工具,基于它你能回测任何你想要测试的idea.

但是针对单因子回测,Backtrader 开发回测代码以及生成报告上并不算很方便,我们需要自己编写买卖逻辑,在生成的报告上也没有IC、IR、回撤等的数据分析,而实际上,从单因子回测的技术实现角度上来说,这些都是可以自动化生成的。

Alphalens就是一个专门实现单因子自动回测的神器,我们只要给它输入因子值的列,还有每支股票收盘价的数据,它就能自动生成数据分析及报告,并带有十几张可视化的报告数据统计图:

下面就带大家入门使用一下Alphalens,如果对你有帮助的话,记得点一下赞/在看哦。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install alphalens
pip install tushare
pip install pandas

2.数据预处理

正如前面所说,我们只需要把因子值和收盘价放入Alphalens中,就能自动生成回测和报告结果。

所以,我们90%的工作都会在数据处理这一部分,回测和分析都是抽象封装好的,并不需要太多地去担心它。

为了测试,我们导入tushare的数据进行测试:

import pandas as pd
import tushare as ts
from alphalens.utils import get_clean_factor_and_forward_returns
from alphalens.tears import create_full_tear_sheet

pro = ts.pro_api()
# 此接口获取的数据为未复权数据,回测建议使用复权数据,这里为批量获取股票数据做了简化
df = pro.daily(ts_code='000001.SZ,600982.SH', start_date='20200101', end_date='20211122')
df.index = pd.to_datetime(df['trade_date'])
df.index.name = None
df.sort_index(inplace=True)

这里获取了000001.SZ,600982.SH两只股票在2020-01-01到2021-11-22的日线数据,将交易日期设为了索引并排序。效果如下:

然后需要设置多索引的因子列 assets,第一个索引为日期,第二个索引为股票代码:

# 多索引的因子列,第一个索引为日期,第二个索引为股票代码
assets = df.set_index([df.index, df['ts_code']], drop=True)

​效果如下,仔细观察的话能发现其与导入的数据只有索引的不同:

然后,设置收盘价的Dataframe,这个与因子数据的格式不同,索引是时间,每一列是每只股票对应的收盘价:

# column为股票代码,index为日期,值为收盘价
close = df.pivot_table(index='trade_date', columns='ts_code', values='close')
close.index = pd.to_datetime(close.index)

到这一步,我们的初始化工作就完成了,下面就放到 Alphalens 进行测试。

3.Alphalens回测及报告

使用Alphalens进行回测,是非常轻松而写意的,只需要导入包,给它传递因子数据和收盘价数据即可:

from alphalens.utils import get_clean_factor_and_forward_returns
from alphalens.tears import create_full_tear_sheet

ret = get_clean_factor_and_forward_returns(assets[['pct_chg']], close)
create_full_tear_sheet(ret, long_short=False)

get_clean_factor_and_forward_returns 接受的第一个参数就是因子的列,我们只需要从前面预处理好的 assets 中任取一列作为因子进行回测即可,第二列是收盘价。

值得注意的是,因子数据在回测的时候,注意不要使用到未来数据,因为我们是用前一天的数据预测下一天的收盘价,所以要对因子列进行移位处理,这点一定要注意。

运行程序,就能生成如下的报告:

还有一点需要提醒大家的是,开源Alphalens的Quantopian公司已经倒闭,所以项目暂时没人维护了,部分代码没有适配最新的依赖,所以可能会有问题,比如下面的:

原本是通过 .get_values() 获得 input_periods, 但是 get_values 在 pandas 0.25.0 中已经被弃用,最新的pandas版本这里需要改成 .to_numpy() 才能生效。

除了这个小缺点,Alphalens整体上是非常符合大家单因子测试的需求的。它的分析报告可能没有那么齐全,我们也可以考虑在Alphalens的基础上增加其他的分析内容,如果能开源出来则更好了。

考虑到后续Alphalens没人维护,我fork了Alphalens,并增加了自己的改动,希望有余力的同学也能来一起贡献代码:
https://github.com/Ckend/alphalens

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python怎样存储变量性能最优?这篇文章告诉你答案

工作时我们经常会遇到需要临时保存结果变量的场景,尤其是一些数据处理、模型开发的场景,加载处理速度是个很漫长的过程,于是经常会把这些变量储存起来。

而储存变量最常见、最普遍的方法是用pickle,保存为pkl文件。但是如果从写入和读取的性能角度考虑,pkl可能真的不是最优选。

Pickle有其独特的好处,大部分变量不需要进行处理,都能直接存到pkl文件里,但这样的方便其实是牺牲了部分性能取得的。与之相比,numpy的.npy格式就比pickle性能上快不少。

当然,我们需要有证据支撑这个观点。所以今天我们就来做个实验,分别在Python2和Python3中对比 numpy 和 pickle 两种存储格式(.npy, .pkl) 对数据的存储和读取的性能对比。

部分内容参考分析自: https://applenob.github.io/python/save/

1. Python2中, npy与pkl的性能对比

首先初始化数据:

import numpy as np
import time
import cPickle as pkl
import os

all_batches = []
for i in range(20):
    a1 = np.random.normal(size=[25600, 40])
    label = np.random.normal(size=[25600, 1])
    all_batch = np.concatenate([a1, label], 1)
    all_batches.append(all_batch)
all_batches = np.array(all_batches)
print(all_batches.shape)
# (20, 25600, 41)

然后测试使用pickle保存和读取时间的耗时,以及整个文件的大小:

s_t1 = time.time()
pkl_name = "a.pkl"
with open(pkl_name, "wb") as f:
    pkl.dump(all_batches, f)
pkl_in_time = time.time() - s_t1
print("pkl dump costs {} sec".format(pkl_in_time))

s_t2 = time.time()
with open(pkl_name, "rb") as f:
    new_a = pkl.load(f)
pkl_out_time = time.time() - s_t2
print("pkl load costs {} sec".format(pkl_out_time))

pkl_size = os.path.getsize(pkl_name)
print("pkl file size: {} byte, {} mb".format(pkl_size, float(pkl_size)/(1024*1024)))

结果如下:

pkl dump costs 67.7483091354 sec
pkl load costs 52.1168899536 sec
pkl file size: 497437110 byte, 474.392995834 mb

然后再试一下npy的写入和读取:

s_t3 = time.time()
npy_name = "a.npy"
with open(npy_name, "wb") as f:
    np.save(f, arr=all_batches)
npy_in_time = time.time() - s_t3
print("npy save costs {} sec".format(npy_in_time))
s_t4 = time.time()
with open(npy_name, "rb") as f:
    new_a = np.load(f)
npy_out_time = time.time() - s_t4
print("npy load costs {} sec".format(npy_out_time))
npy_size = os.path.getsize(npy_name)
print("npy file size: {} byte, {} mb".format(npy_size, float(npy_size) / (1024 * 1024)))

结果如下:

npy save costs 20.718367815 sec
npy load costs 0.62314915657 sec
npy file size: 167936128 byte, 160.15637207 mb

结果发现,npy性能明显优于pkl格式。

通过多次测试发现,在Python2中,npy格式的性能优势全面碾压pkl,工程允许的情况下,在Python2中,我们应该在这二者中毫不犹豫地选择npy.

2.Python3中, npy与pkl的性能对比

Python2已经是过去式,重点还要看Python3.

在Python3中,与Python2的代码唯一一句不一样的是pickle的引入:

# Python2:
import cPickle as pkl

# Python3:
import pickle as pkl

其他代码基本一样,替换代码后,重新运行程序,让我们看看在Python3上,npy格式和pkl格式性能上的区别:

首先是pkl格式的表现:

ckenddeMacBook-Pro:Documents ckend$ python 1.py 
(20, 25600, 41)
pkl dump costs 24.32167887687683 sec
pkl load costs 4.480823040008545 sec
pkl file size: 167936163 byte, 160.15640544891357 mb

然后是npy格式的表现:

npy save costs 22.471696853637695 sec
npy load costs 0.3791017532348633 sec
npy file size: 167936080 byte, 160.1563262939453 mb

可以看到在Python3中pkl格式和npy格式的存储大小是基本相同的,在存储耗时上也相差无几。但是在读取数据的时候,npy相对于pkl还是有一定的优势的。

因此,如果你的程序非常注重读取效率,那么我觉得npy格式会比pkl格式更适合你。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandarallel 一个能让你的Pandas计算火力拉满的工具

没有使用Pandarallel
使用了Pandarallel

众所周知,由于GIL的存在,Python单进程中的所有操作都是在一个CPU核上进行的,所以为了提高运行速度,我们一般会采用多进程的方式。而多进程无非就是以下几种方案:

  • 1、multiprocessing
  • 2、concurrent.futures.ProcessPoolExecutor()
  • 3、joblib
  • 4、ppserver
  • 5、celery

这些方案对于普通Pandas玩家来说都不是特别友好,怎样才能算作一个友好的并行处理方案?就是原来的逻辑我基本不用变,仅修改需要计算的那行就能完成我们目标的方案,而 pandarallel 就是一个这样友好的工具。

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

可以看到,在 pandarallel 的世界里,你只需要替换原有的 pandas 处理语句就能实现多CPU并行计算。非常方便、非常nice.

在4核CPU的性能测试上,它比原始语句快了接近4倍。测试条件(OS: Linux Ubuntu 16.04,Hardware: Intel Core i7 @ 3.40 GHz – 4 cores),这就是我所说的,它把CPU充分利用了起来。

下面就给大家介绍这个模块怎么用,其实非常简单,任何代码只需要加几行代码就能实现质的飞跃。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install pandarallel

对于windows用户,有一个不好的消息是,它只能在Windows的linux子系统上运行(WSL),你可以在微软官网上找到安装教程:

https://docs.microsoft.com/zh-cn/windows/wsl/about

2.使用Pandarallel

使用前,需要对Pandarallel进行初始化:

from pandarallel import pandarallel
pandarallel.initialize()

这样才能调用并行计算的API,不过 initialize 中有一个重要参数需要说明,那就是 nb_workers ,它将指定并行计算的Worker数,如果没有设置,所有CPU的核都会用上。

Pandarallel一共支持8种Pandas操作,下面是一个apply方法的例子。

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(5e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return math.sin(x.a**2) + math.sin(x.b**2)

# 正常处理
res = df.apply(func, axis=1)

# 并行处理
res_parallel = df.parallel_apply(func, axis=1)

# 查看结果是否相同
res.equals(res_parallel)

其他方法使用上也是类似的,在原始的函数名称前加上 parallel_。比如DataFrame.groupby.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(3e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),
                       b=np.random.rand(df_size)))
def func(df):
    dum = 0
    for item in df.b:
        dum += math.log10(math.sqrt(math.exp(item**2)))
        
    return dum / len(df.b)

# 正常处理
res = df.groupby("a").apply(func)
# 并行处理
res_parallel = df.groupby("a").parallel_apply(func)
res.equals(res_parallel)

又比如 DataFrame.groupby.rolling.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4

# 正常处理
res = df.groupby('a').b.rolling(4).apply(func, raw=False)
# 并行处理
res_parallel = df.groupby('a').b.rolling(4).parallel_apply(func, raw=False)
res.equals(res_parallel)

案例都是类似的,这里就直接列出表格,不浪费大家宝贵的时间去阅读一些重复的例子了:

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

3.注意事项

1. 我有 8 个 CPU,但 parallel_apply 只能加快大约4倍的计算速度。为什么?

答:正如我前面所言,Python中每个进程占用一个核,Pandarallel 最多只能加快到你所拥有的核心的总数,一个 4 核的超线程 CPU 将向操作系统显示 8 个 CPU,但实际上只有 4 个核心,因此最多加快4倍。

2. 并行化是有成本的(实例化新进程,通过共享内存发送数据,…),所以只有当并行化的计算量足够大时,并行化才是有意义的。对于很少量的数据,使用 Pandarallel 并不总是值得的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

10分钟教你用Python开发钉钉通知机器人

在项目协同工作或自动化流程完成时,我们需要用一定的手段通知自己或他人。比如说,当服务器CPU使用率达到90%,发送告警信息给多名项目成员、或是股票自动化交易成交时发送通知给自己等应用场景。通知的手段有很多,使用邮件、Telegram都可以实现,但是它们都有各自的缺点。

邮件通知的方式存在滞后性,而且容易覆盖掉一些重要的邮件,整理起来非常繁琐。Telegram 非常好用,几个步骤就能创建一个机器人,可惜在国内无法使用,需要添加代理才能使用。

不过,前几天发现钉钉的机器人其实和Telegram的相差无几,用起来也相当舒服,因此今天给大家带来一个开发钉钉通知机器人的教程,非常简单,门槛极低,任何人都能用,每个人都能学会。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Python 环境准备完成后,我们就可以来创建一个钉钉机器人了。

1.1 打开钉钉软件,选择 ““, 再点击右上角+号,选择建场景群

1.2 这里可以选择任意一种群,我选择了培训群

1.3 群新建好后,点击右上角的齿轮—群设置,点击智能群助手。这里你也可以修改群的名字,点击名字右边的铅笔就能修改群名。

1.4 点击添加机器人

1.5 点击右上角的+号

1.6 选择自定义机器人,它能让我们通过Webhook接入自定义服务

1.7 然后输入机器人名字,安全设置选择加签,这一字符串你需要拷贝下来,发通知的时候就是我们的SECRET KEY.

1.8 点击完成后,会弹出创建成功的框框,请把这串webhook的链接拷贝下来,并将access_token参数复制下来,这一串 access_token 我们发送消息的时候也需要用到。

机器人创建完毕后,会在群聊中出现,然后我们就可以开始编写通知代码了。

2.Python 钉钉机器人通知代码

我们通过往 https://oapi.dingtalk.com/robot/send 地址发送 POST 请求的方式就能够利用钉钉自定义机器人发送消息。钉钉机器人支持两种消息内容:

  1. 纯文本信息
  2. Markdown信息

简单来讲,如果你的消息只有文本内容,就用第一种。如果你的消息内含图片和自定义格式,就用第二种。

纯文本消息,你的内容需要包含以下3种参数,并带2个内容体:

参数列表:

  1. access_token: 创建成功后返回的webhook链接里就有这个参数。
  2. sign: 就是我们选择加签安全设置中返回的SECRET.
  3. timestamp: 当前时间戳。

内容体包含:

  1. msgtype: 消息内容 text/markdown
  2. text: 文本内容

代码如下,非常简单:

# Python实用宝典
# 2021/11/13
import json
import hashlib
import base64
import hmac
import os
import time
import requests
from urllib.parse import quote_plus


class Messenger:
    def __init__(self, token=os.getenv("DD_ACCESS_TOKEN"), secret=os.getenv("DD_SECRET")):
        self.timestamp = str(round(time.time() * 1000))
        self.URL = "https://oapi.dingtalk.com/robot/send"
        self.headers = {'Content-Type': 'application/json'}
        secret = secret
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(self.timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        self.sign = quote_plus(base64.b64encode(hmac_code))
        self.params = {'access_token': token, "sign": self.sign}

    def send_text(self, content):
        """
        发送文本
        @param content: str, 文本内容
        """
        data = {"msgtype": "text", "text": {"content": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )

使用的时候,请注意token和secret你既可以通过环境变量配置(DD_ACCESS_TOKEN和DD_SECRET),也可以直接传入给Messenger:

if __name__ == "__main__":
    m = Messenger(
        token="你的token",
        secret="你的secret"
    )
    m.send_text("测试一下,今天天气不错")

然后运行这个脚本,就能获取消息通知:

如果你只需要文本通知,那么到这里就已经实现了,如果你还需要发送图文消息或更多自定义内容体,请看下一节内容。

3.钉钉机器人支持Markdown

为了支持发送图片消息和自定义的文字格式,我们需要配置更多的参数:

    def send_md(self, title, content):
        """
        发送Markdown文本
        @param title: str, 标题
        @param content: str, 文本内容
        """
        data = {"msgtype": "markdown", "markdown": {"title": title, "text": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )

msgtype改为markdown,并配置markdown的参数,包括:

  1. title: 标题
  2. content: markdown内容

这样,就能支持发送markdown消息了,我们试一下:

# Python实用宝典
# 2021/11/13
import json
import hashlib
import base64
import hmac
import os
import time
import requests
from urllib.parse import quote_plus


class Messenger:
    def __init__(self, token=os.getenv("DD_ACCESS_TOKEN"), secret=os.getenv("DD_SECRET")):
        self.timestamp = str(round(time.time() * 1000))
        self.URL = "https://oapi.dingtalk.com/robot/send"
        self.headers = {'Content-Type': 'application/json'}
        secret = secret
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(self.timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        self.sign = quote_plus(base64.b64encode(hmac_code))
        self.params = {'access_token': token, "sign": self.sign}

    def send_text(self, content):
        """
        发送文本
        @param content: str, 文本内容
        """
        data = {"msgtype": "text", "text": {"content": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )

    def send_md(self, title, content):
        """
        发送Markdown文本
        @param title: str, 标题
        @param content: str, 文本内容
        """
        data = {"msgtype": "markdown", "markdown": {"title": title, "text": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )


if __name__ == "__main__":
    markdown_text = "\n".join(open("md_test.md", encoding="utf-8").readlines())
    m = Messenger(
        token="你的token",
        secret="你的secret"
    )
    m.send_text("测试一下,今天天气不错")
    m.send_md("测试Markdown", markdown_text)

效果如下:

效果还是不错的,速度也非常快,一运行脚本,马上就能收到通知消息。大家可以在Python实用宝典公众号后台回复 钉钉 下载本文源代码,也可以在 https://github.com/Ckend/dd_notice 中找到源代码。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

微软开源!世界首个AI 量化投资平台 Qlib 使用教程

2020年9月,微软开源了AI量化投资平台Qlib的源代码,随后得到了不少的关注,Qlib的主要优势在于:

1.Python覆盖量化投资全过程,用户无需切换语言;​内置许多深度学习算法模型,降低AI算法使用的门槛。

2.内置A股、美股数据接入通道,基于qrun能够自动运行整个工作流程,大大提高开发效率。

3.每个组件都是松耦合可以独立使用,用户能够自行选用某些组件。

Qlib相比于我们之前介绍的backtrader,那功能完善太多。backtrader相当于给你提供一个基本的量化框架,数据、策略、算法,你全部自己搞定。而Qlib则从数据、到策略、到算法都给了你全套的解决方案,你只需要加一点自己的想法,不需要管其他细枝末节的东西就能完成AI量化研究,非常方便。

下面我们就来试一下 Qlib 的安装和运行内置算法策略。

1. 安装

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 (传统) 或 Python数据分析与挖掘好帮手—Anaconda 进行安装,本文建议使用Anaconda。

由于qlib有许多许多依赖,如果你不想安装过程中出现问题,或者引起其他程序的运行问题,建议使用Conda创建一个你的量化投资虚拟环境:

conda create -n my_quant python=3.8

Qlib 仅支持 Python3.7以上的版本且暂不支持 Python3.10. 另外Python 3.9 版本不支持模型性能绘制,因此我选择创建Python3.8版本的虚拟环境。

(安装方式一)pip 安装:

pip install pyqlib

在pip安装的过程中如果遇到任何问题,请搜索引擎解决,如果无法解决,可以尝试下面的源码安装:

(安装方式二)源码安装:

# 提前安装一些依赖
pip install numpy
pip install --upgrade  cython

# clone and install qlib
git clone https://github.com/microsoft/qlib.git && cd qlib
python setup.py install

Windows 机器在安装的时候可能会遇到下面这个问题:

这是因为安装 qlib 的依赖 — tables 时出现了编译错误,原因很多,我选择逃学,因此建议使用 tables 的 wheel 文件进行安装,这样就不需要编译了:

https://www.lfd.uci.edu/~gohlke/pythonlibs/#pytables

在上述网站下载适合你系统的 wheel 文件:

下载完毕后,输入以下命令:

pip install 你的文件路径/tables-3.6.1-cp39-cp39-win_amd64.whl

即可完成 tables 的安装,然后再执行一遍 python setup.py install 即可。

2. 数据准备

由于这套量化开源平台的作者是中国人,所以非常贴心地准备好了A股数据,大家可以输入命令直接下载:

# 1天级别数据
python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --region cn

# 1分钟级别数据
python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data_1min --region cn --interval 1min

如果你需要其他分钟级的数据,修改interval即可。

你可以使用crontab定时自动更新数据(来自雅虎财经):

* * * * 1-5 python <script path> update_data_to_bin --qlib_data_1d_dir <user data dir>

手动更新数据:

python scripts/data_collector/yahoo/collector.py update_data_to_bin --qlib_data_1d_dir <user data dir> --trading_date <start date> --end_date <end date>

3. 运行量化回测流程示例

Qlib 提供了一个名为 qrun 自动运行整个工作流程的工具(包括构建数据集、训练模型、回测和评估)。

你可以按照以下步骤启动自动量化研究工作流程并进行图形报告分析,Quant Research 工作流程:

Qrun 运行 lightgbm 工作流程的配置 workflow_config_lightgbm_Alpha158.yaml 如下所示:

cd examples  # Avoid running program under the directory contains `qlib`
qrun benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml

结果如下:

可以看到这里包括三个统计分析: benchmark return (基准收益) / excess return without cost(除去手续费的超额收益)) / excess return with cost(包含手续费的超额收益)。每个统计分析中都有如下5个参数:

  • mean: 异常收益的平均值
  • std: 异常收益的标准差
  • annualized_return: 年化回报
  • information_ratio: 信息比率
  • max_drawdown: 最大回撤

​如果你想要自定义这个策略和算法的回测参数,你可以查看 workflow_config_lightgbm_Alpha158.yaml 的内容:

qlib_init:
    provider_uri: "~/.qlib/qlib_data/cn_data"
    region: cn
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
    start_time: 2008-01-01
    end_time: 2020-08-01
    fit_start_time: 2008-01-01
    fit_end_time: 2014-12-31
    instruments: *market
port_analysis_config: &port_analysis_config
    strategy:
        class: TopkDropoutStrategy
        module_path: qlib.contrib.strategy
        kwargs:
            model: <MODEL>
            dataset: <DATASET>
            topk: 50
            n_drop: 5
    backtest:
        start_time: 2017-01-01
        end_time: 2020-08-01
        account: 100000000
        benchmark: *benchmark
        exchange_kwargs:
            limit_threshold: 0.095
            deal_price: close
            open_cost: 0.0005
            close_cost: 0.0015
            min_cost: 5
task:
    model:
        class: LGBModel
        module_path: qlib.contrib.model.gbdt
        kwargs:
            loss: mse
            colsample_bytree: 0.8879
            learning_rate: 0.2
            subsample: 0.8789
            lambda_l1: 205.6999
            lambda_l2: 580.9768
            max_depth: 8
            num_leaves: 210
            num_threads: 20
    dataset:
        class: DatasetH
        module_path: qlib.data.dataset
        kwargs:
            handler:
                class: Alpha158
                module_path: qlib.contrib.data.handler
                kwargs: *data_handler_config
            segments:
                train: [2008-01-01, 2014-12-31]
                valid: [2015-01-01, 2016-12-31]
                test: [2017-01-01, 2020-08-01]
    record: 
        - class: SignalRecord
          module_path: qlib.workflow.record_temp
          kwargs: 
            model: <MODEL>
            dataset: <DATASET>
        - class: SigAnaRecord
          module_path: qlib.workflow.record_temp
          kwargs: 
            ana_long_short: False
            ann_scaler: 252
        - class: PortAnaRecord
          module_path: qlib.workflow.record_temp
          kwargs: 
            config: *port_analysis_config

参数比较多,大家翻译一下应该都能看懂。这里摘取华泰的一个研究报告,里面对参数做了具体的翻译:

为了方便用户的使用,微软内置了许多模型,如上文我们用到的 gbdt 位于你克隆的文件夹下的 qlib/contrib/model/gbdt.py:

注意:pytorch 开头的模型需要预先安装pytorch.

Qlib里,策略和算法的区别是什么?

大家注意到,Qlib这里,必须定义策略和算法两个配置,而在backtrader里面,我们更加重视策略,而非“算法”这个概念。那么这两者在Qlib中的区别是什么?我们看默认TOPK策略的源代码:

可以看到,默认的这个策略,选择了算法预测分数结果中排名 TOP K 的股票,也就是策略从算法得到的结果中去做筛选需要交易的股票。算法相当于生成一个新的可用于判断买入卖出的评判标准。这就是策略和AI算法这两者的最重要区别。

最后,得益于松耦合的代码设计,我认为 Qlib 是一个能够让不同层次的研究者各取所需的开源项目,是一个不可多得的量化开源平台,特别适合重度Python使用者,有兴趣的朋友可以试一下,我未来也会考虑出 Qlib 相关的使用教程,敬请期待。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

有趣好用的Python教程

退出移动版
微信支付
请使用 微信 扫码支付