所有由Python实用宝典发布的文章

什么是强化学习?预测股票的效果如何?

强化学习是机器学习的方式之一,它与监督学习、无监督学习并列,是三种机器学习训练方法之一。

在围棋上击败世界第一的李世石的 AlphaGo、在《星际争霸2》中以 10:1 击败了人类顶级职业玩家的AlphaStar,他们都是强化学习模型。诸如此类的模型还有 AlphaGo Zero 等。

强化学习的原理非常简单,它非常像心理学中新行为主义派的斯金纳发现的操作性条件反射。

操作性条件反射是什么?当年斯金纳做了一个箱子,进行了两次实验。

第一次实验,箱子里放了一只饥饿的老鼠,在箱子的一边有一个可供按压的杠杆,在杠杆旁边有一个放置食物的小盒子。动物在箱内按下杠杆,食盒就会释放食物进入箱内,动物可以取食。结果:小鼠自发学会了按按钮。这是积极强化。

另一次实验是,每次小白鼠不按下按钮,则给箱子通电,小白鼠因此学会了按按钮以防自己遭受电击。这是消极强化(负向强化)。

这就是斯金纳发现的操作性条件反射,当行为得到奖励或惩罚时出现刺激,反过来控制这种行为。

强化学习与操作性条件反射有异曲同工之妙,以人类玩游戏为例,如果在游戏中采取某种策略购买某种类型的装备可以取得较高的得分,那么就会进一步“强化”这种策略,以期继续取得较好的结果。

网上有不少强化学习的例子,鉴于读者中对股票感兴趣的同学比较多,我们以股票预测为例,实验一下 wangshubRL-Stock 项目。

使用强化学习预测股价,需要在决策的时候采取合适的行动 (Action) 使最后的奖励最大化。与监督学习预测未来的数值不同,强化学习根据输入的状态(如当日开盘价、收盘价等),输出系列动作(例如:买进、持有、卖出),并对好的动作结果不断进行奖励,对差的动作结果不断进行惩罚,使得最后的收益最大化,实现自动交易。

下面就试一下这个强化学习项目,前往GitHub下载 RL-Stock

如果你无法使用GitHub,也可以在Python实用宝典公众号后台回复:股票强化学习1 下载全文完整代码,包括第三部分的多进程优化逻辑。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

请注意,由于TensorFlow版本限制,这个强化学习项目只支持 Python3 以上,Python3.6 及以下的版本,因此我建议使用Anaconda创建一个新的虚拟环境运行这个项目:

conda create -n rlstock python=3.6

另外,实测依赖需要改动 requirements.txt 的tensorflow-gpu版本至1.14:

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),进入 RL-Stock 项目文件夹输入命令安装依赖:

pip install -r requirements.txt

2.小试强化学习

运行RL-Stock项目前,需要下载数据。进入刚创建的虚拟环境,运行get_stock_data.py代码会自动下载数据到 stockdata 目录中:

python get_stock_data.py

如果你使用的是在Github上下载的代码而不是Python实用宝典后台改好的代码,请注意 get_stock_data.py 的第46行,必须对 row[“code_name”] 去除 * 号,否则Windows系统下可能会存在问题:

df_code.to_csv(f'{self.output_dir}/{row["code"]}.{row["code_name"].strip("*")}.csv', index=False)

数据下载完成后就可以运行 main.py 执行强化学习训练和测试,不过在训练之前,我们先简单了解下整个项目的输入状态、动作、和奖励函数。

输入状态(观测 Observation)

策略网络观测的就是一只股票的各类数据,比如开盘价、收盘价、成交量等,它可以由许多因子组成。为了训练时网络收敛,观测状态数据输入时必须要进行归一化,变换到 [-1, 1] 的区间内。RL-Stock输入的观测数据字段如下:

参数名称参数描述说明
date交易所行情日期格式:YYYY-MM-DD
code证券代码格式:sh.600000。sh:上海,sz:深圳
open今开盘价格精度:小数点后4位;单位:人民币元
high最高价精度:小数点后4位;单位:人民币元
low最低价精度:小数点后4位;单位:人民币元
close今收盘价精度:小数点后4位;单位:人民币元
preclose昨日收盘价精度:小数点后4位;单位:人民币元
volume成交数量单位:股
amount成交金额精度:小数点后4位;单位:人民币元
adjustflag复权状态不复权、前复权、后复权
turn换手率精度:小数点后6位;单位:%
tradestatus交易状态1:正常交易 0:停牌
pctChg涨跌幅(百分比)精度:小数点后6位
peTTM滚动市盈率精度:小数点后6位
psTTM滚动市销率精度:小数点后6位
pcfNcfTTM滚动市现率精度:小数点后6位
pbMRQ市净率精度:小数点后6位

动作 Action

共有买入卖出持有 3 种动作,定义动作(action)为长度为 2 的数组

  • action[0] 为操作类型;
  • action[1] 表示买入或卖出百分比;
动作类型 action[0]说明
1买入 action[1]
2卖出 action[1]
3持有

注意,当动作类型 action[0] = 3 时,表示不买也不抛售股票,此时 action[1] 的值无实际意义,网络在训练过程中,Agent 会慢慢学习到这一信息。Agent,实称代理,在我们的上下文中,你可以视其为策略。

奖励 Reward

奖励函数的设计,对强化学习的目标至关重要。在股票交易的环境下,最应该关心的就是当前的盈利情况,故用当前的利润作为奖励函数。

# profits
reward = self.net_worth - INITIAL_ACCOUNT_BALANCE
reward = 1 if reward > 0 else -100

为了使网络更快学习到盈利的策略,当利润为负值时,给予网络一个较大的惩罚 (-100)。

梯度策略

作者采用了基于策略梯度的PPO 算法,OpenAI 和许多文献已把 PPO 作为强化学习研究中首选的算法。PPO 优化算法 Python 实现参考 stable-baselines

数据集及自定义

在数据集上,作者使用了1990年至2019年11月作为训练集,2019年12月作为测试集。

1990-01-01 ~ 2019-11-292019-12-01 ~ 2019-12-31
训练集测试集

如果你要调整这个训练集和测试集的时间,可以更改 get_stock_data.py 的以下部分:

if __name__ == '__main__':
    # 获取全部股票的日K线数据
    
    # 训练集
    mkdir('./stockdata/train')
    downloader = Downloader('./stockdata/train', date_start='1990-01-01', date_end='2019-11-29')
    downloader.run()
	# 测试集
    mkdir('./stockdata/test')
    downloader = Downloader('./stockdata/test', date_start='2019-12-01', date_end='2019-12-31')
    downloader.run()

训练并测试

首先,我们尝试一下单一代码的训练和测试,修改main.py里的股票代码,比如我这里修改为601919中远海控:

if __name__ == '__main__':
    # multi_stock_trade()
    test_a_stock_trade('sh.601919')
    # ret = find_file('./stockdata/train', '601919')
    # print(ret)

运行下面的命令,执行此深度学习模型的训练和测试。

python main.py

训练完成后,会自动进行模拟操作测试集这20个交易日,然后会输出这20个交易日的测试结果:

------------------------------
Step: 20
Balance: 0.713083354256014
Shares held: 2060 (Total sold: 2392)
Avg cost for held shares: 5.072161917927474 (Total sales value: 12195.091008936648)
Net worth: 10930.56492977963 (Max net worth: 10930.56492977963)
Profit: 930.5649297796299
------------------------------
Step: 21
Balance: 0.713083354256014
Shares held: 2060 (Total sold: 2392)
Avg cost for held shares: 5.072161917927474 (Total sales value: 12195.091008936648)
Net worth: 10815.713083354256 (Max net worth: 10930.56492977963)
Profit: 815.713083354256

利润图如下:

然后我们看一下中远海控2019年12月的走势:

可以看到这个月的中远海控是一个上升趋势,一共上涨了12%,而这个模型捕捉到其中8%左右的利润,还是相当不错的。当然,凡事不能只看个体,下面我们修改下作者的源代码,试一下其在市场里的整体表现。

3.强化学习模型整体表现

由于作者原有的模型是单进程的计算,为了测试全市场的表现,我进行了多进程改造。

我将作者的训练及测试任务集成到一个函数中,并使用celery做并行:

@app.task
def multi_stock_trade(code):
    stock_file = find_file('./stockdata/train', str(code))
    if stock_file:
        try:
            profits = stock_trade(stock_file)
            with open(f'result/code-{code}.pkl', 'wb') as f:
                pickle.dump(profits, f)
        except Exception as err:
            print(err)

将测试集的测试周期改为最近一个月:

1990-01-01 ~ 2021-11-252021-11-26 ~ 2021-12-25
训练集测试集

开启redis-server 及 Celery Worker:

# redis-server 独占一个进程,所以需要另开一个窗口
celery -A tasks worker -l info

遍历所有的股票代码做并发测试:

files = os.listdir("stockdata/train")
files_test = os.listdir("stockdata/test")
all_files_list = list(set(files) & set(files_test))
for i in all_files_list:
    # 使用celery做并发
    code = ".".join(i.split(".")[:2])
    # multi_stock_trade.apply_async(args=(code,))
    multi_stock_trade(code)

再对生成的结果进行统计,测试结果如下:

对这个模型在2021-11-26到2021-12-25的测试结果表明,有40.8%的股票进行了交易并且获利,有49.9%的股票没有进行操作,有9.4%的股票进行了交易并亏损。平均每次交易利润为445元,作为一个测试策略,这个结果已经很不错了。

由于只是一个测试策略,这里就不做详细的风险分析了,实际上我们还需要观察这个策略的最大回撤率、夏普率等指标才能更好地评判此策略的好坏。

我认为这个项目还有很大的改造空间,原逻辑中只观察了OHLC等基本数据,我们还可以增加很多指标,比如基于Ta-lib,算出MACD、RSI等技术指标,再将其加入Observation中,让模型观察学习这些数据的特征,可能会有不错的表现。有兴趣的同学可以试一下,本文源代码存放于:

https://github.com/Ckend/pythondict-quant

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pydantic — 强大的数据校验工具,比DRF快12倍

Pydantic 是一个使用Python类型注解进行数据验证和管理的模块。安装方法非常简单,打开终端输入:

pip install pydantic

它类似于 Django DRF 序列化器的数据校验功能,不同的是,Django里的序列化器的Field是有限制的,如果你想要使用自己的Field还需要继承并重写它的基类:

# Django 序列化器
class Book(models.Model):
    id = models.AutoField(primary_key=True)
    name = models.CharField(max_length=32)
    price = models.DecimalField(max_digits=5, decimal_places=2)
    author = models.CharField(max_length=32)
    publish = models.CharField(max_length=32)

而 Pydantic 基于Python3.7以上的类型注解特性,实现了可以对任何类做数据校验的功能:

# Pydantic 数据校验功能
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:22',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
print(user.id)
print(type(user.id))
#> 123
#> <class 'int'>
print(repr(user.signup_ts))
#> datetime.datetime(2019, 6, 1, 12, 22)
print(user.friends)
#> [1, 2, 3]
print(user.dict())
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""

从上面的基本使用可以看到,它甚至能自动帮你做数据类型的转换,比如代码中的 user.id, 在字典中是字符串,但经过Pydantic校验器后,它自动变成了int型,因为User类里的注解就是int型。

当我们的数据和定义的注解类型不一致时会报这样的Error:

from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: Optional[datetime] = None
    friends: List[int] = []


external_data = {
    'id': '123',
    'signup_ts': '2019-06-01 12:222',
    'friends': [1, 2, '3'],
}
user = User(**external_data)
"""
Traceback (most recent call last):
  File "1.py", line 18, in <module>
    user = User(**external_data)
  File "pydantic\main.py", line 331, in pydantic.main.BaseModel.__init__
pydantic.error_wrappers.ValidationError: 1 validation error for User
signup_ts
  invalid datetime format (type=value_error.datetime)
"""

即 “invalid datetime format”, 因为我传入的 signup_ts 不是标准的时间格式(多了个2)。

1.Pydantic 模型数据导出

通过Pydantic模型中自带的 json 属性方法,能让经过校验后的数据一行命令直接转成 json 字符串,如前文中的user对象:

print(user.dict())  # 转为字典
"""
{
    'id': 123,
    'signup_ts': datetime.datetime(2019, 6, 1, 12, 22),
    'friends': [1, 2, 3],
    'name': 'John Doe',
}
"""
print(user.json())  # 转为json
"""
{"id": 123, "signup_ts": "2019-06-01T12:22:00", "friends": [1, 2, 3], "name": "John Doe"}
"""

非常方便。它还支持将整个数据结构导出为 schema json,它能完整地描述整个对象的数据结构类型:

print(user.schema_json(indent=2))
"""
{
  "title": "User",
  "type": "object",
  "properties": {
    "id": {
      "title": "Id",
      "type": "integer"
    },
    "signup_ts": {
      "title": "Signup Ts",
      "type": "string",
      "format": "date-time"
    },
    "friends": {
      "title": "Friends",
      "default": [],
      "type": "array",
      "items": {
        "type": "integer"
      }
    },
    "name": {
      "title": "Name",
      "default": "John Doe",
      "type": "string"
    }
  },
  "required": [
    "id"
  ]
}
"""

2.数据导入

除了直接定义数据校验模型,它还能通过ORM、字符串、文件导入到数据校验模型:

比如字符串(raw):

from datetime import datetime
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: datetime = None
      
m = User.parse_raw('{"id": 123, "name": "James"}')
print(m)
#> id=123 signup_ts=None name='James'

此外,它能直接将ORM的对象输入,转为Pydantic的对象,比如从Sqlalchemy ORM:

from typing import List
from sqlalchemy import Column, Integer, String
from sqlalchemy.dialects.postgresql import ARRAY
from sqlalchemy.ext.declarative import declarative_base
from pydantic import BaseModel, constr

Base = declarative_base()


class CompanyOrm(Base):
    __tablename__ = 'companies'
    id = Column(Integer, primary_key=True, nullable=False)
    public_key = Column(String(20), index=True, nullable=False, unique=True)
    name = Column(String(63), unique=True)
    domains = Column(ARRAY(String(255)))


class CompanyModel(BaseModel):
    id: int
    public_key: constr(max_length=20)
    name: constr(max_length=63)
    domains: List[constr(max_length=255)]

    class Config:
        orm_mode = True


co_orm = CompanyOrm(
    id=123,
    public_key='foobar',
    name='Testing',
    domains=['example.com', 'foobar.com'],
)
print(co_orm)
#> <models_orm_mode.CompanyOrm object at 0x7f0bdac44850>
co_model = CompanyModel.from_orm(co_orm)
print(co_model)
#> id=123 public_key='foobar' name='Testing' domains=['example.com',
#> 'foobar.com']

从Json文件导入:

from datetime import datetime
from pathlib import Path
from pydantic import BaseModel


class User(BaseModel):
    id: int
    name = 'John Doe'
    signup_ts: datetime = None
      
path = Path('data.json')
path.write_text('{"id": 123, "name": "James"}')
m = User.parse_file(path)
print(m)

从pickle导入:

import pickle
from datetime import datetime
from pydantic import BaseModel

pickle_data = pickle.dumps({
    'id': 123,
    'name': 'James',
    'signup_ts': datetime(2017, 7, 14)
})
m = User.parse_raw(
    pickle_data, content_type='application/pickle', allow_pickle=True
)
print(m)
#> id=123 signup_ts=datetime.datetime(2017, 7, 14, 0, 0) name='James'

3.自定义数据校验

你还能给它增加 validator 装饰器,增加你需要的校验逻辑:

from pydantic import BaseModel, ValidationError, validator


class UserModel(BaseModel):
    name: str
    username: str
    password1: str
    password2: str

    @validator('name')
    def name_must_contain_space(cls, v):
        if ' ' not in v:
            raise ValueError('must contain a space')
        return v.title()

    @validator('password2')
    def passwords_match(cls, v, values, **kwargs):
        if 'password1' in values and v != values['password1']:
            raise ValueError('passwords do not match')
        return v

    @validator('username')
    def username_alphanumeric(cls, v):
        assert v.isalnum(), 'must be alphanumeric'
        return v

上面,我们增加了三种自定义校验逻辑:

1.name 必须带有空格

2.password2 必须和 password1 相同

3.username 必须为字母

让我们试试这三个校验是否成功实现:

user = UserModel(
    name='samuel colvin',
    username='scolvin',
    password1='zxcvbn',
    password2='zxcvbn',
)
print(user)
#> name='Samuel Colvin' username='scolvin' password1='zxcvbn' password2='zxcvbn'

try:
    UserModel(
        name='samuel',
        username='scolvin',
        password1='zxcvbn',
        password2='zxcvbn2',
    )
except ValidationError as e:
    print(e)
    """
    2 validation errors for UserModel
    name
      must contain a space (type=value_error)
    password2
      passwords do not match (type=value_error)
    """

可以看到,第一个UserModel里的数据完全没有问题,通过校验。

第二个UserModel里的数据,由于name存在空格,password2和password1不一致,无法通过校验。

4.性能表现

这是最令我惊讶的部分,Pydantic 比 Django-rest-framework 还快了12.3倍:

PackageVersionRelative PerformanceMean validation time
pydantic1.7.393.7μs
attrs + cattrs20.3.01.5x slower143.6μs
valideer0.4.21.9x slower175.9μs
marshmallow3.10.02.4x slower227.6μs
voluptuous0.12.12.7x slower257.5μs
trafaret2.1.03.2x slower296.7μs
schematics2.1.010.2x slower955.5μs
django-rest-framework3.12.212.3x slower1148.4μs
cerberus1.3.225.9x slower2427.6μs

而且他们的所有基准测试代码都是开源的,你可以在下面这个Github链接找到:

https://github.com/samuelcolvin/pydantic/tree/master/benchmarks

如果你的网络无法访问GitHub,请关注Python实用宝典公众号后台回复Pydantic获取。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 教你3分钟用Bert搭建问答搜索引擎

鼎鼎大名的 Bert 算法相信大部分同学都听说过,它是Google推出的NLP领域“王炸级”预训练模型,其在NLP任务中刷新了多项记录,并取得state of the art的成绩。

但是有很多深度学习的新手发现BERT模型并不好搭建,上手难度很高,普通人可能要研究几天才能勉强搭建出一个模型。

没关系,今天我们介绍的这个模块,能让你在3分钟内基于BERT算法搭建一个问答搜索引擎。它就是 bert-as-service 项目。这个开源项目,能够让你基于多GPU机器快速搭建BERT服务(支持微调模型),并且能够让多个客户端并发使用。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install bert-serving-server  # 服务端
pip install bert-serving-client  # 客户端

请注意,服务端的版本要求:Python >= 3.5Tensorflow >= 1.10 。

此外还要下载预训练好的BERT模型,在 https://github.com/hanxiao/bert-as-service#install 上可以下载。

也可在Python实用宝典后台回复 bert-as-service 下载这些预训练好的模型。

下载完成后,将 zip 文件解压到某个文件夹中,例如 /tmp/english_L-12_H-768_A-12/.

2.Bert-as-service 基本使用

安装完成后,输入以下命令启动BERT服务:

bert-serving-start -model_dir /tmp/english_L-12_H-768_A-12/ -num_worker=4 

-num_worker=4 代表这将启动一个有四个worker的服务,意味着它最多可以处理四个并发请求。超过4个其他并发请求将在负载均衡器中排队等待处理。

下面显示了正确启动时服务器的样子:

使用客户端获取语句的编码

现在你可以简单地对句子进行编码,如下所示:

from bert_serving.client import BertClient
bc = BertClient()
bc.encode(['First do it', 'then do it right', 'then do it better'])

作为 BERT 的一个特性,你可以通过将它们与 |||(前后有空格)连接来获得一对句子的编码,例如

bc.encode(['First do it ||| then do it right'])

远程使用 BERT 服务

你还可以在一台 (GPU) 机器上启动服务并从另一台 (CPU) 机器上调用它,如下所示:

# on another CPU machine
from bert_serving.client import BertClient
bc = BertClient(ip='xx.xx.xx.xx')  # ip address of the GPU machine
bc.encode(['First do it', 'then do it right', 'then do it better'])

3.搭建问答搜索引擎

我们将通过 bert-as-service 从FAQ 列表中找到与用户输入的问题最相似的问题,并返回相应的答案。

FAQ列表你也可以在 Python实用宝典后台回复 bert-as-service 下载。

首先,加载所有问题,并显示统计数据:

prefix_q = '##### **Q:** '
with open('README.md') as fp:
    questions = [v.replace(prefix_q, '').strip() for v in fp if v.strip() and v.startswith(prefix_q)]
    print('%d questions loaded, avg. len of %d' % (len(questions), np.mean([len(d.split()) for d in questions])))
    # 33 questions loaded, avg. len of 9

一共有33个问题被加载,平均长度是9.

然后使用预训练好的模型:uncased_L-12_H-768_A-12 启动一个Bert服务:

bert-serving-start -num_worker=1 -model_dir=/data/cips/data/lab/data/model/uncased_L-12_H-768_A-12

接下来,将我们的问题编码为向量:

bc = BertClient(port=4000, port_out=4001)
doc_vecs = bc.encode(questions)

最后,我们准备好接收用户的查询,并对现有问题执行简单的“模糊”搜索。

为此,每次有新查询到来时,我们将其编码为向量并计算其点积 doc_vecs;然后对结果进行降序排序,返回前N个类似的问题:

while True:
    query = input('your question: ')
    query_vec = bc.encode([query])[0]
    # compute normalized dot product as score
    score = np.sum(query_vec * doc_vecs, axis=1) / np.linalg.norm(doc_vecs, axis=1)
    topk_idx = np.argsort(score)[::-1][:topk]
    for idx in topk_idx:
        print('> %s\t%s' % (score[idx], questions[idx]))

完成!现在运行代码并输入你的查询,看看这个搜索引擎如何处理模糊匹配:

完整代码如下,一共23行代码(在后台回复关键词也能下载):

import numpy as np
from bert_serving.client import BertClient
from termcolor import colored

prefix_q = '##### **Q:** '
topk = 5

with open('README.md') as fp:
    questions = [v.replace(prefix_q, '').strip() for v in fp if v.strip() and v.startswith(prefix_q)]
    print('%d questions loaded, avg. len of %d' % (len(questions), np.mean([len(d.split()) for d in questions])))

with BertClient(port=4000, port_out=4001) as bc:
    doc_vecs = bc.encode(questions)

    while True:
        query = input(colored('your question: ', 'green'))
        query_vec = bc.encode([query])[0]
        # compute normalized dot product as score
        score = np.sum(query_vec * doc_vecs, axis=1) / np.linalg.norm(doc_vecs, axis=1)
        topk_idx = np.argsort(score)[::-1][:topk]
        print('top %d questions similar to "%s"' % (topk, colored(query, 'green')))
        for idx in topk_idx:
            print('> %s\t%s' % (colored('%.1f' % score[idx], 'cyan'), colored(questions[idx], 'yellow')))

够简单吧?当然,这是一个基于预训练的Bert模型制造的一个简单QA搜索模型。

你还可以微调模型,让这个模型整体表现地更完美,你可以将自己的数据放到某个目录下,然后执行 run_classifier.py 对模型进行微调,比如这个例子:

https://github.com/google-research/bert#sentence-and-sentence-pair-classification-tasks

它还有许多别的用法,我们这里就不一一介绍了,大家可以前往官方文档学习:

https://github.com/hanxiao/bert-as-service

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

新年新气象,超级文献下载工具更新了!一行命令下载全网任意文献

之前为了解决学生无力支付国内部分论文平台的付费阅读功能的问题,我们推出了超级文献下载工具:你不得不知道的python超级文献批量搜索下载工具

在最初的这几个版本中,我们必须通过编写代码才能选择不同的文献源去搜索和下载文献。很多同学在使用过程中会由于对Python不熟悉或者环境没有配置好而产生不少问题。

为了解决这些问题,我们给他增加了命令行调用的方式,并上传到了PyPi,你只需要一行命令,就能下载到你所需要的文献!(感谢 @hulei6188 的开源贡献)

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install scihub-cn

看到 Successfully installed … 就代表成功安装scihub-cn。

不过请注意,scihub-cn依赖 aiohttp 模块进行并发的下载,因此支持的最低Python版本为3.6.

项目源代码:https://github.com/Ckend/scihub-cn

2.Scihub-cn 使用方法

2.1 使用DOI号下载论文

首先让我们来试试根据DOI号下载文献:

scihub-cn -d 10.1038/s41524-017-0032-0

下载的论文会自动生成在当前文件夹下:

你也可以选择将其下载到任意目录下,只需要添加 -o 参数:

scihub-cn -d 10.1038/s41524-017-0032-0 -o D:\papers

这将会把这篇论文下载到D盘的papers文件夹中。

2.2 根据关键词下载论文

使用 -w 参数指定一个关键词,可以通过关键词下载论文:

scihub-cn -w reinforcement

同样滴,它也支持-o参数指定文件夹。此外,这里默认使用的搜索引擎是百度学术,你也可以使用Google学术、publons、science_direct等。通过指定 -e 参数即可:

scihub-cn -w reinforcement -e google_scholar

为了避免Google学术无法连接,你还可以增加代理 -p 参数:

scihub-cn -w reinforcement -e google_scholar -p http://127.0.0.1:10808

访问外网数据源的时候,增加代理能避免出现Connection closed等问题。

此外,你还能限定下载的篇目, 比如我希望下载100篇文章:

scihub-cn -w reinforcement -l 100

2.3 根据url下载论文

给定任意论文地址,可以让scihub-cn尝试去下载该论文:

scihub-cn -u https://ieeexplore.ieee.org/document/26502

使用 -u 参数指定论文链接即可,非常方便。

3.批量下载论文

当然,之前花了几篇文章优化的批量下载模块这个版本肯定少不了!

而且还增加了几种新的批量下载方式:

1. 根据给出所有论文名称的txt文本文件下载论文。

2. 根据给出所有论文url的txt文件下载论文。

3. 根据给出所有论文DOI号的txt文本文件下载论文。

4. 根据给出bibtex文件下载论文。

比如,根据给出所有论文URL的txt文件下载论文:

scihub-cn -i urls.txt --url

可以看到,文件内有4个论文链接,而他也成功地下载到了这4篇论文。

再试试放了DOI号的txt文件的批量下载:

scihub-cn -i dois.txt --doi

你可以输入 scihub-cn –help 看到更多的参数说明:

$scihub-cn --help
... ...
optional arguments:
  -h, --help            show this help message and exit
  -u URL                input the download url
  -d DOI                input the download doi
  --input INPUTFILE, -i INPUTFILE
                        input download file
  -w WORDS, --words WORDS
                        download from some key words,keywords are linked by
                        _,like machine_learning.
  --title               download from paper titles file
  -p PROXY, --proxy PROXY
                        use proxy to download papers
  --output OUTPUT, -o OUTPUT
                        setting output path
  --doi                 download paper from dois file
  --bib                 download papers from bibtex file
  --url                 download paper from url file
  -e SEARCH_ENGINE, --engine SEARCH_ENGINE
                        set the search engine
  -l LIMIT, --limit LIMIT
                        limit the number of search result

大家如果有更多的想法,可以往我们这个开源项目贡献代码:

https://github.com/Ckend/scihub-cn

本文仅限参考研究,下载的论文请在24小时内阅读后删除,请勿将此项目用于商业目的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

这个神奇的库,可以将数据平滑化并找到异常点

在处理数据的时候,我们经常会遇到一些非连续的散点时间序列数据:

有些时候,这样的散点数据是不利于我们进行数据的聚类和预测的。因此我们需要把它们平滑化,如下图所示:

将散点都去除,平滑后的效果如下:

​这样的时序数据是不是看起来舒服多了?​此外,使用平滑后的时序数据去做聚类或预测或许有令人惊艳的效果,因为它去除了一些偏差值并细化了数据的分布范围。

如果我们自己开发一个这样的平滑工具,会耗费不少的时间。​因为平滑的技术有很多种,你需要一个个地去研究,找到最合适的技术并编写代码,这是一个非常耗时的过程。平滑技术包括但不限于:

  • 指数平滑
  • 具有各种窗口类型(常数、汉宁、汉明、巴特利特、布莱克曼)的卷积平滑
  • 傅立叶变换的频谱平滑
  • 多项式平滑
  • 各种样条平滑(线性、三次、自然三次)
  • 高斯平滑
  • 二进制平滑

所幸,有大佬已经为我们实现好了时间序列的这些平滑技术,并在GitHub上开源了这份模块的代码——它就是 tsmoothie。

下面就让我们来试一下 tsmoothie.

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install tsmoothie

PS, Tsmoothie仅支持Python 3.6 及以上的版本。

2.Tsmoothie 基本使用

为了尝试Tsmoothie的效果,我们需要生成随机数据:

import numpy as np
import matplotlib.pyplot as plt
from tsmoothie.utils_func import sim_randomwalk
from tsmoothie.smoother import LowessSmoother

# 生成 3 个长度为200的随机数据组
np.random.seed(123)
data = sim_randomwalk(n_series=3, timesteps=200, 
                      process_noise=10, measure_noise=30)

然后使用Tsmoothie执行平滑化:

# 平滑
smoother = LowessSmoother(smooth_fraction=0.1, iterations=1)
smoother.smooth(data)

通过 smoother.smooth_data 你就可以获取平滑后的数据:

print(smoother.smooth_data)
# [[   5.21462928    3.07898076    0.93933646   -1.19847767   -3.32294934 
#     -5.40678762   -7.42425709   -9.36150892  -11.23591897  -13.05271523 
#      .......       .......       .......      .......       .......   ]]

绘制效果图:

# 生成范围区间
low, up = smoother.get_intervals('prediction_interval')

plt.figure(figsize=(18,5))

for i in range(3):
    
    plt.subplot(1,3,i+1)
    plt.plot(smoother.smooth_data[i], linewidth=3, color='blue')
    plt.plot(smoother.data[i], '.k')
    plt.title(f"timeseries {i+1}"); plt.xlabel('time')

    plt.fill_between(range(len(smoother.data[i])), low[i], up[i], alpha=0.3)

3.基于Tsmoothie的极端异常值检测

事实上,基于smoother生成的范围区域,我们可以进行异常值的检测:

可以看到,在蓝色范围以外的点,都属于异常值。我们可以轻易地将这些异常值标红或记录,以便后续的处理。

_low, _up = smoother.get_intervals('sigma_interval', n_sigma=2)
series['low'] = np.hstack([series['low'], _low[:,[-1]]])
series['up'] = np.hstack([series['up'], _up[:,[-1]]])
is_anomaly = np.logical_or(
    series['original'][:,-1] > series['up'][:,-1], 
    series['original'][:,-1] < series['low'][:,-1]
).reshape(-1,1)

假设蓝色范围interval的最大值为up、最小值为low,如果存在 data > up 或 data < low 则表明此数据是异常点。

这些异常点并非都是负面作用,在不同的应用场景下,它们可能代表了不同的意义。

比如在股票中,它或许可以代表着震荡行情中某种趋势反转的信号。

或者在家庭用电量分析中,它可能代表着某个时刻的用电峰值,根据这个峰值我们可以此时此刻开启了什么样的电器。

所以异常点的作用需要根据不同应用场景进行不同的分析,才能找到它真正的价值。

总而言之,Tsmoothie 不仅可以使用多种平滑技术平滑化我们的时序数据,还可以根据平滑结果找出数据中的离群点,是我们做数据分析和研究的一个好帮手,非常有价值。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Dynaconf 轻松实现 Python 动态配置管理

Dynaconf 是一个库,旨在成为在 Python 中管理配置的最佳选择。

它可以从各种来源读取设置,包括环境变量、文件、服务器配置等。

它适用于任何类型的 Python 程序,包括 Flask 和 Django 扩展。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install dynaconf

2.初步使用DynaConf

在你的项目的根目录中运行 dynaconf init 命令。

cd path/to/your/project/
dynaconf init -f toml

会有类似如下的输出,说明初始化完成:

⚙️  Configuring your Dynaconf environment
------------------------------------------
🐍 The file `config.py` was generated.

🎛️  settings.toml created to hold your settings.

🔑 .secrets.toml created to hold your secrets.

🙈 the .secrets.* is also included in `.gitignore`
beware to not push your secrets to a public repo.

🎉 Dynaconf is configured! read more on https://dynaconf.com

刚刚初始化的时候我们选择了 toml 格式。实际上你还可以选择 toml|yaml|json|ini|py,不过 toml 是默认的,也是最推荐的配置格式。

初始化完成后会创建以下文件:

.
├── config.py       # 需要被导入的配置脚本
├── .secrets.toml   # 像密码等敏感信息配置
└── settings.toml   # 应用配置

初始化完成后你就可以编写你的配置,编辑settings.toml:

key = "value"
a_boolean = false
number = 1234
a_float = 56.8
a_list = [1, 2, 3, 4]
a_dict = {hello="world"}

[a_dict.nested]
other_level = "nested value"

然后就可以在你的代码中导入并使用这些配置:

from config import settings

assert settings.key == "value"
assert settings.number == 789
assert settings.a_dict.nested.other_level == "nested value"
assert settings['a_boolean'] is False
assert settings.get("DONTEXIST", default=1) == 1

如果是密码等敏感信息,你可以配置在 .secrets.toml 中:

password = "s3cr3t"
token = "dfgrfg5d4g56ds4gsdf5g74984we5345-"
message = "This file doesn't go to your pub repo"

.secrets.toml 文件会被自动加入到 .gitignore 文件中,这些信息不会被上传到Git仓库上。

同时,DYNACONF还支持带前缀的环境变量:

export DYNACONF_NUMBER=789
export DYNACONF_FOO=false
export DYNACONF_DATA__CAN__BE__NESTED=value
export DYNACONF_FORMATTED_KEY="@format {this.FOO}/BAR"
export DYNACONF_TEMPLATED_KEY="@jinja {{ env['HOME'] | abspath }}"

3.高级使用

你还可以在Flask或Django中使用DynaConf,以Django为例,第一步要先确保已经设置 DJANGO_SETTINGS_MODULE 环境变量:

export DJANGO_SETTINGS_MODULE=yourproject.settings

然后在 manage.py 相同文件夹下运行初始化命令:

dynaconf init -f yaml

然后按照终端上的说明进行操作:

Django app detected
⚙️  Configuring your Dynaconf environment
------------------------------------------
🎛️  settings.yaml created to hold your settings.

🔑 .secrets.yaml created to hold your secrets.

🙈 the .secrets.yaml is also included in `.gitignore`
beware to not push your secrets to a public repo
or use dynaconf builtin support for Vault Servers.

⁉  path/to/yourproject/settings.py is found do you want to add dynaconf? [y/N]:

回答 y:

🎠  Now your Django settings are managed by Dynaconf
🎉  Dynaconf is configured! read more on https://dynaconf.com

在 Django 上,推荐的文件格式是yaml,因为它可以更轻松地保存复杂的数据结构,但是你依然可以选择使用 toml、json、ini 甚至将你的配置保存为 .py 格式。

初始化 dynaconf 后,在现有的settings.py底部包含以下内容:

# HERE STARTS DYNACONF EXTENSION LOAD
import dynaconf  # noqa
settings = dynaconf.DjangoDynaconf(__name__)  # noqa
# HERE ENDS DYNACONF EXTENSION LOAD (No more code below this line)

现在,在你的 Django 视图、模型和所有其他地方,你现在可以正常使用 django.conf.settings,因为它已被 Dynaconf 设置对象替换。

from django.conf import settings


def index(request):
    assert settings.DEBUG is True
    assert settings.NAME == "Bruno"
    assert settings.DATABASES.default.name == "db"
    assert settings.get("NONEXISTENT", 2) == 2

现在,通过修改 manage.py 相同文件夹下的配置文件,就能让配置全局生效了。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

什么格式是保存 Pandas 数据的最好格式?

在数据分析相关项目工作时,我通常使用Jupyter笔记本和pandas库来处理和移动我的数据。对于中等大小的数据集来说,这是一个非常直接的过程,你甚至可以将其存储为纯文本文件而没有太多的开销。

然而,当你的数据集中的观测数据数量较多时,保存和加载数据回内存的过程就会变慢,现在程序的重新启动都会迫使你等待数据重新加载。所以最终,CSV文件或任何其他纯文本格式都会失去吸引力。

我们可以做得更好。有很多二进制格式可以用来将数据存储到磁盘上,其中有很多格式pandas都支持。我们怎么能知道哪一种更适合我们的目的呢?

来吧,我们尝试其中的几个,然后进行对比!这就是我决定在这篇文章中要做的:通过几种方法将 pandas.DataFrame 保存到磁盘上,看看哪一种在I/O速度、内存消耗和磁盘空间方面做的更好。

在这篇文章中,我将展示我的测试结果。

1.要比较的格式

我们将考虑采用以下格式来存储我们的数据:

1. CSV — 数据科学家的一个好朋友
2. Pickle — 一种Python的方式来序列化事物
3. MessagePack — 它就像JSON,但又快又小
4. HDF5 — 一种设计用于存储和组织大量数据的文件格式
5. Feather — 一种快速、轻量级、易于使用的二进制文件格式,用于存储数据框架
6. Parquet — Apache Hadoop的柱状存储格式

所有这些格式都是被广泛使用的,而且(也许除了MessagePack)在你做一些数据分析的事情时非常经常遇到。

为了追求找到最好的缓冲格式来存储程序会话之间的数据,我选择了以下指标进行比较。

1. size_mb – 文件大小(Mb)。
2. save_time – 将数据帧保存到磁盘上所需的时间量。
3. load_time – 将之前转储的数据帧加载到内存中所需要的时间量。
4. save_ram_delta_mb – 数据帧保存过程中最大的内存消耗增长量。
5. load_ram_delta_mb – 数据帧加载过程中的最大内存消耗增长量。

请注意,当我们使用高效压缩的二进制数据格式,如 Parquet 时,最后两个指标变得非常重要。它们可以帮助我们估计加载序列化数据所需的内存量,此外还有数据大小本身。我们将在接下来的章节中更详细地讨论这个问题。

2.测试及结果

我决定使用一个合成数据集进行测试,以便更好地控制序列化的数据结构和属性。

另外,我在我的基准中使用了两种不同的方法:

(a) 将生成的分类变量保留为字符串。

(b) 在执行任何I/O之前将它们转换为 pandas.Categorical 数据类型。

函数generate_dataset显示了我在基准中是如何生成数据集的:

def generate_dataset(n_rows, num_count, cat_count, max_nan=0.1, max_cat_size=100):
    """
    随机生成具有数字和分类特征的数据集。
    
    数字特征取自正态分布X ~ N(0, 1)。
    分类特征则被生成为随机的uuid4字符串。
    
    此外,数字和分类特征的max_nan比例被替换为NaN值。
    """
    dataset, types = {}, {}
    
    def generate_categories():
        from uuid import uuid4
        category_size = np.random.randint(2, max_cat_size)
        return [str(uuid4()) for _ in range(category_size)]
    
    for col in range(num_count):
        name = f'n{col}'
        values = np.random.normal(0, 1, n_rows)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'float32'
        
    for col in range(cat_count):
        name = f'c{col}'
        cats = generate_categories()
        values = np.array(np.random.choice(cats, n_rows, replace=True), dtype=object)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'object'
    
    return pd.DataFrame(dataset), types

我们将CSV文件的保存和加载性能作为一个基准。

五个随机生成的具有一百万个观测值的数据集被转储到CSV中,并读回内存以获得平均指标。

每种二进制格式都针对20个随机生成的具有相同行数的数据集进行测试。

这些数据集包括15个数字特征和15个分类特征。你可以在这个资源库中找到带有基准测试功能和所需的完整源代码:

https://github.com/devforfu/pandas-formats-benchmark

或在Python实用宝典后台回复 Pandas IO对比 ,下载完整代码。

(a) 数据为字符串特征时的性能

下图显示了每种数据格式的平均I/O时间。一个有趣的观察是,hdf显示出比csv更慢的加载速度,而其他二进制格式的表现明显更好。其中最令人印象深刻的是feather和parquet。

在保存数据和从磁盘上读取数据时,内存开销如何?

下一张图片告诉我们,hdf 的表现就不是那么好了。可以肯定的是,csv在保存/加载纯文本字符串时不需要太多的额外内存,而Feather和parquet则相当接近:

最后,让我们看看文件的大小。这次parquet显示了一个令人印象深刻的结果,考虑到这种格式是为有效存储大量数据而开发的,这并不令人惊讶。

(b) 字符串特征转换为数字时的性能

在上一节中,我们没有尝试有效地存储我们的分类特征而是使用普通的字符串。让我们来弥补这个遗漏吧! 这一次我们使用一个专门的 pandas.Categorical 类型,转字符串特征为数字特征。

看看现在与纯文本的csv相比,它看起来如何!

现在所有的二进制格式都显示出它们的真正力量。Csv的基准结果已经远远落后了,所以让我们把它去掉,以便更清楚地看到各种二进制格式之间的差异:

Feather 和 Pickle 显示了最好的 I/O 速度,而 hdf 仍然显示了明显的性能开销。

现在是时候比较数据进程加载时的内存消耗了。下面的柱状图显示了我们之前提到的关于parquet格式的一个重要事实。

可以看到 parquet 读写时的内存空间差距有多大,你有可能你无法将比较大的 parquet 文件加载到内存中。

最后的图显示了各格式的文件大小。所有的格式都显示出良好的效果,除了hdf仍然需要比其他格式多得多的空间:

3.结论

正如我们的测试所显示的,似乎 feather 格式是存储Python会话数据的理想候选者。它显示了很快的I/O速度,在磁盘上不占用太多内存,并且在加载回RAM时不需要消耗太大的内存。

当然,这种比较并不意味着你应该在每个可能的情况下使用这种格式。例如,feather格式一般不会被用作长期文件存储的格式。

另外,某些特定情况下也无法使用 feather,这由你的整个程序架构决定。然而,就如本帖开头所述的目的,它在不被任何特殊事项限制的情况下是一个很好的选择。

本文译自 towardsdatascience
作者: Ilia Zaitsev
有部分修改。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

7行代码实现早上出门前自动收到分时天气预报

早上出门上班前,我总是忘记查看天气预报,以至于通勤路上下雨来了个措手不及。

回想起来,大部分人早上出门前的行为模式是固定的,那么有没有办法能在我出门前的那一分钟提醒我带伞或者是穿外套?

答案是肯定的,通过上回的钉钉机器人,我们就能实现这个目的。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

如果你没有阅读上一篇关于钉钉机器人的文章,请记得阅读, 有一些前置知识是你必须知道的:

10分钟教你用Python开发钉钉通知机器人

2.请求天气接口

有一个网站服务叫做:wttr.in 提供了非常方便的天气接口,比如:

https://wttr.in/Shenzhen?&lang=cn

效果如下:

我们可以通过这个API,获得全天的天气预报。

它支持很多形式,比如单行输出:

$ curl wttr.in/Nuremberg?format=3
Nuremberg: 🌦 +11⁰C

或者一次处理所有城市的这些查询:

$ curl -s 'wttr.in/{Nuremberg,Hamburg,Berlin}?format=3'
Nuremberg: 🌦 +11⁰C
Hamburg: 🌦 +8⁰C
Berlin: 🌦 +8⁰C

如果你希望让刚刚的未来三天天气预报输出成为图片格式,它也能实现:

curl 'https://wttr.in/Shenzhen.png'

不仅如此,它还支持分时天气预报:

这一张图就是我们要自动通知的天气预报,下面就告诉大家如何把这种图嵌入到钉钉通知中。

3.钉钉通知天气预报

使用我们上一回讲过的钉钉通知机器人,7行代码就能搞定这个需求:

https://github.com/Ckend/dd_notice

7行?没想到吧,基于markdown发送通知就是如此的简单:

import datetime
from notice import Messenger
m = Messenger(
    token="你的token",
    secret="你的secret"
)
m.send_md(f"天气预报-{datetime.datetime.today()}", "![weather](https://v2d.wttr.in/Shenzhen.png)")

将上回的源代码拉下来后,增加这7行代码,你只需要修改你的 token 和 secret 就能发送天气预报。

注意,请求的链接里拿的还是ShenZhen的天气预报,你可以改成自己所在的城市,也可以自定义任何自己喜欢的图表。效果如下:

所有的源代码都已经放在:

https://github.com/Ckend/dd_notice

如果你上不了Github,Python实用宝典公众号后台回复天气预报也能下载完整的通知源代码。

然后为了实现每天的定时发送,你只需要把代码放到服务器上,使用crontab配置定时任务即可:

# 输入 crontab -e 增加下面这一行,每天早上8:00运行通知脚本
0 8 * * * python /data/dd_notice/weather_notice.py

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pytorch+spark 进行大规模预测

虽然深度学习日益盛行,但目前spark还不支持深度学习算法。虽然也有相关库sparktorch能够将spark和pytorch结合起来,但是使用发现并非那么好用,而且此库目前活跃度较低,不方便debug。因此,本地训练深度学习模型并部署到spark中是一种有效的利用深度学习进行大规模预测的方法。

将pytorch模型嵌入部署到spark中进行大规模预测主要包括三步:

1.利用spark进行特征工程预处理,以保证训练集和测试集特征处理一致;

第一二步都比较简单,这里省去。主要对第三步进行说明。

模型分发(broadcast)分两种情况,第一种是简单可通过nn.Sequential定义的模型。对于这种情况可以,模型可以直接用。如下:

# 生成测试数据
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=50000, n_features=100, random_state=0)
df = pd.DataFrame(X)
df['label'] = np.random.randint(2,size=(50000))
df1 = spark.createDataFrame(df)
df1 = df1.withColumn('features', Func.array([col(f"{i}") for i in range(0, 100)])).repartition(1000)

# 创建模型并进行预测
%spark2_1.pyspark
import torch.nn as nn

network = nn.Sequential(
    nn.Linear(100, 2560),
    nn.ReLU(),
    nn.Linear(2560, 2560),
    nn.ReLU(),
    nn.Linear(2560, 2)
    #nn.Softmax(dim=1)
)

class network(nn.Module):
    def __init__(self):
        super(network, self).__init__()
        self.l1 = nn.Linear(100, 2560)
        self.l2 = nn.Linear(2560, 2560)
        self.l3 = nn.Linear(2560, 2)
        
    def forward(self, x):
        x = self.l1(x)
        x = self.l2(x)
        x = self.l3(x)
        return x

net = network()
bc_model_state = spark.sparkContext.broadcast(net.state_dict())

def get_model_for_eval():
  # Broadcast the model state_dict
  net.load_state_dict(bc_model_state.value)
  net.eval()
  return net
  
def one_row_predict(x):
    model = get_model_for_eval()
    t = torch.tensor(x, dtype=torch.float32)
    t = model(t).cpu().detach().numpy()
    #prediction = model(t).cpu().detach().item()
    # return prediction
    return list([float(i) for i in t])

one_row_udf = udf(one_row_predict, ArrayType(FloatType()))
df1 = df1.withColumn('pred_one_row', one_row_udf(col('features')))

在上面我们定义了一个简单模型,然后将其直接分发进行预测(这里省去了模型训练过程)。

但是当我们想使用一个比较复杂的模型来进行预测时(简单来讲就是不能使用 nn.Sequential 改写),使用上面的方法则会报错。

这时候需要将模型写入一个文件中,假设模型文件的路径为/export/models/item2vec.py, 使用pyspark中的addFile对其进行分发,然后import导入模型。

假设我们的模型文件/export/models/item2vec.py如下:

class Item2vec(nn.Module):
    def __init__(self, cv_dict, csr_cols):
        super(Item2vec, self).__init__()
        pass

    def forward(self, x):
        pass

    def predict(self, x):
        pass

假设模型已经训练好,现在要使用训练好的模型进行大规模预测:

from pyspark import SparkFiles
sc.addFile('/export/models/item2vec.py')
import sys
sys.path.append('/export/models/')

from item2vec import Item2vec

# model 表示训练好的模型
bc_model_state = sc.broadcast(model.state_dict())
net = Item2vec(cv_dict, csr_cols)

def get_model_for_eval_demo():
  # Broadcast the model state_dict
  net.load_state_dict(bc_model_state.value)
  net.eval()
  return net

上面的操作已经将模型分发(broadcast)出去,接下来就可以进行预测了。

预测这里介绍两种方式:一种是使用 udf + withColumn, 另一种则是使用 rdd + mapPartitions

由于这里使用的是 pyspark 2.1,还没有pandas udf,因此使用 udf + withColumn 时只能一行一行的预测,运行速度上来说是比不上 rdd + mapPartitions

对于pyspark 2.3以后的版本多了pandas udf后则可以使用batch predict了,具体可以参考

https://docs.databricks.com/static/notebooks/deep-learning/pytorch-images.html

udf + withColumn 的方式
# udf + withColumn 的方式
def one_row_predict_demo(x)
    x = torch.tensor(x, dtype=torch.float)
    _, prob = bc_model.predict(x)

    return round(float(prob[0]), 4)
    
one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())

one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())
df = demo.withColumn('demo_prob', one_row_predict_demo_udf('features'))
rdd + map 方式
def one_row_predict_map(rdds):
    bc_model = get_model_for_eval_demo()
    for row in rdds:
        x = torch.tensor(row.x, dtype=torch.float)
        _, prob = bc_model.predict(x)
    
        yield (row['id'], round(float(prob[0]), 4))

df = demo.rdd.mapPartitions(one_row_predict_map).toDF(['id', 'pred_prob'])

2. 效率优化(1)——mapPartition

上面的方法已经可以使得我们将训练好的深度学习模型部署到spark进行大规模预测了,但是其速度是非常慢的。通过在 mapPartitions 中进行一些处理,我们可以对预测进行加速:

# 代码源自 https://github.com/SaeedNajafi/infer-pytorch-pyspark

def basic_row_handler(row):
    return row

def predict_map(index, partition, ml_task,
                batch_size=16,
                row_preprocessor=basic_row_handler,
                row_postprocessor=basic_row_handler):

    # local model loading within each executor
    model = LocalPredictor(ml_task=ml_task, batch_size=batch_size,
                           partition_index=index)

    batch = []
    count = 0
    for row in partition:
        row_dict = row.asDict()
        # apply preprocessor on each row.
        row_dict_prep = row_preprocessor(row_dict)
        batch.append(row_dict_prep)
        count += 1
        if count == batch_size:
            # predict the ml and apply the postprocessor.
            for ret_row in model.predict(batch):  # ml prediction
                ret_row_post = row_postprocessor(ret_row)
                if ret_row_post is not None:
                    yield Row(**ret_row_post)

            batch = []
            count = 0

    # Flush remaining rows in the batches.
    if count != 0:
        for ret_row in model.predict(batch):  # ml prediction
            ret_row_post = row_postprocessor(ret_row)
            if ret_row_post is not None:
                yield Row(**ret_row_post)

        batch = []
        count = 0

上面的代码可以看作是在mapPartitions中进行了“延迟”预测——即先将一个partition中的多行数据进行处理然后合并为一个batch进行一起预测,这样能大大的提升运行效率。一个比较极端的情况是每个partition仅进行一次预测。

3. 效率优化(2)——pandas_udf

pandas_udf在udf的基础上进行了进一步的优化,利用pandas_udf程序运行效率更高。在这里我们可以借助于pandas_udf提升我们程序的运行效率:

# Enable Arrow support.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")

sc.addFile('get_model.py')
from get_model import get_model

model_path = '/path/to/model.pt'
data_path = '/path/to/data'

# model 表示训练好的模型
model = torch.load(model_path)
bc_model_state = sc.broadcast(model.state_dict())


def get_model_for_eval():
  # Broadcast the model state_dict  
  model = get_model()
  model.load_state_dict(bc_model_state.value)
  model.eval()
  return model

# model = torch.load(model_path)
# model = sc.broadcast(model)


@pandas_udf(FloatType())
def predict_batch_udf(arr: pd.Series) -> pd.Series:
    model = get_model_for_eval()
    # model.to(device)
    arr = np.vstack(arr.map(lambda x: eval(x)).values)
    arr = torch.tensor(arr).long()
    with torch.no_grad():
        predictions = list(model(arr).cpu().numpy())
            
    return pd.Series(predictions)

# 预测
data = data.withColumn('predictions', predict_batch_udf('features'))

作者:井底蛙蛙呱呱呱
链接:https://www.jianshu.com/p/fc60c967c8b8

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

量化投资单因子回测神器 — Alphalens

还记得我们在前面采用的回测工具Backtrader吗?Backtrader是一款非常灵活的回测工具,基于它你能回测任何你想要测试的idea.

但是针对单因子回测,Backtrader 开发回测代码以及生成报告上并不算很方便,我们需要自己编写买卖逻辑,在生成的报告上也没有IC、IR、回撤等的数据分析,而实际上,从单因子回测的技术实现角度上来说,这些都是可以自动化生成的。

Alphalens就是一个专门实现单因子自动回测的神器,我们只要给它输入因子值的列,还有每支股票收盘价的数据,它就能自动生成数据分析及报告,并带有十几张可视化的报告数据统计图:

下面就带大家入门使用一下Alphalens,如果对你有帮助的话,记得点一下赞/在看哦。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install alphalens
pip install tushare
pip install pandas

2.数据预处理

正如前面所说,我们只需要把因子值和收盘价放入Alphalens中,就能自动生成回测和报告结果。

所以,我们90%的工作都会在数据处理这一部分,回测和分析都是抽象封装好的,并不需要太多地去担心它。

为了测试,我们导入tushare的数据进行测试:

import pandas as pd
import tushare as ts
from alphalens.utils import get_clean_factor_and_forward_returns
from alphalens.tears import create_full_tear_sheet

pro = ts.pro_api()
# 此接口获取的数据为未复权数据,回测建议使用复权数据,这里为批量获取股票数据做了简化
df = pro.daily(ts_code='000001.SZ,600982.SH', start_date='20200101', end_date='20211122')
df.index = pd.to_datetime(df['trade_date'])
df.index.name = None
df.sort_index(inplace=True)

这里获取了000001.SZ,600982.SH两只股票在2020-01-01到2021-11-22的日线数据,将交易日期设为了索引并排序。效果如下:

然后需要设置多索引的因子列 assets,第一个索引为日期,第二个索引为股票代码:

# 多索引的因子列,第一个索引为日期,第二个索引为股票代码
assets = df.set_index([df.index, df['ts_code']], drop=True)

​效果如下,仔细观察的话能发现其与导入的数据只有索引的不同:

然后,设置收盘价的Dataframe,这个与因子数据的格式不同,索引是时间,每一列是每只股票对应的收盘价:

# column为股票代码,index为日期,值为收盘价
close = df.pivot_table(index='trade_date', columns='ts_code', values='close')
close.index = pd.to_datetime(close.index)

到这一步,我们的初始化工作就完成了,下面就放到 Alphalens 进行测试。

3.Alphalens回测及报告

使用Alphalens进行回测,是非常轻松而写意的,只需要导入包,给它传递因子数据和收盘价数据即可:

from alphalens.utils import get_clean_factor_and_forward_returns
from alphalens.tears import create_full_tear_sheet

ret = get_clean_factor_and_forward_returns(assets[['pct_chg']], close)
create_full_tear_sheet(ret, long_short=False)

get_clean_factor_and_forward_returns 接受的第一个参数就是因子的列,我们只需要从前面预处理好的 assets 中任取一列作为因子进行回测即可,第二列是收盘价。

值得注意的是,因子数据在回测的时候,注意不要使用到未来数据,因为我们是用前一天的数据预测下一天的收盘价,所以要对因子列进行移位处理,这点一定要注意。

运行程序,就能生成如下的报告:

还有一点需要提醒大家的是,开源Alphalens的Quantopian公司已经倒闭,所以项目暂时没人维护了,部分代码没有适配最新的依赖,所以可能会有问题,比如下面的:

原本是通过 .get_values() 获得 input_periods, 但是 get_values 在 pandas 0.25.0 中已经被弃用,最新的pandas版本这里需要改成 .to_numpy() 才能生效。

除了这个小缺点,Alphalens整体上是非常符合大家单因子测试的需求的。它的分析报告可能没有那么齐全,我们也可以考虑在Alphalens的基础上增加其他的分析内容,如果能开源出来则更好了。

考虑到后续Alphalens没人维护,我fork了Alphalens,并增加了自己的改动,希望有余力的同学也能来一起贡献代码:
https://github.com/Ckend/alphalens

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典