Pytorch+spark 进行大规模预测

虽然深度学习日益盛行,但目前spark还不支持深度学习算法。虽然也有相关库sparktorch能够将spark和pytorch结合起来,但是使用发现并非那么好用,而且此库目前活跃度较低,不方便debug。因此,本地训练深度学习模型并部署到spark中是一种有效的利用深度学习进行大规模预测的方法。

将pytorch模型嵌入部署到spark中进行大规模预测主要包括三步:

1.利用spark进行特征工程预处理,以保证训练集和测试集特征处理一致;

第一二步都比较简单,这里省去。主要对第三步进行说明。

模型分发(broadcast)分两种情况,第一种是简单可通过nn.Sequential定义的模型。对于这种情况可以,模型可以直接用。如下:

# 生成测试数据
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=50000, n_features=100, random_state=0)
df = pd.DataFrame(X)
df['label'] = np.random.randint(2,size=(50000))
df1 = spark.createDataFrame(df)
df1 = df1.withColumn('features', Func.array([col(f"{i}") for i in range(0, 100)])).repartition(1000)

# 创建模型并进行预测
%spark2_1.pyspark
import torch.nn as nn

network = nn.Sequential(
    nn.Linear(100, 2560),
    nn.ReLU(),
    nn.Linear(2560, 2560),
    nn.ReLU(),
    nn.Linear(2560, 2)
    #nn.Softmax(dim=1)
)

class network(nn.Module):
    def __init__(self):
        super(network, self).__init__()
        self.l1 = nn.Linear(100, 2560)
        self.l2 = nn.Linear(2560, 2560)
        self.l3 = nn.Linear(2560, 2)
        
    def forward(self, x):
        x = self.l1(x)
        x = self.l2(x)
        x = self.l3(x)
        return x

net = network()
bc_model_state = spark.sparkContext.broadcast(net.state_dict())

def get_model_for_eval():
  # Broadcast the model state_dict
  net.load_state_dict(bc_model_state.value)
  net.eval()
  return net
  
def one_row_predict(x):
    model = get_model_for_eval()
    t = torch.tensor(x, dtype=torch.float32)
    t = model(t).cpu().detach().numpy()
    #prediction = model(t).cpu().detach().item()
    # return prediction
    return list([float(i) for i in t])

one_row_udf = udf(one_row_predict, ArrayType(FloatType()))
df1 = df1.withColumn('pred_one_row', one_row_udf(col('features')))

在上面我们定义了一个简单模型,然后将其直接分发进行预测(这里省去了模型训练过程)。

但是当我们想使用一个比较复杂的模型来进行预测时(简单来讲就是不能使用 nn.Sequential 改写),使用上面的方法则会报错。

这时候需要将模型写入一个文件中,假设模型文件的路径为/export/models/item2vec.py, 使用pyspark中的addFile对其进行分发,然后import导入模型。

假设我们的模型文件/export/models/item2vec.py如下:

class Item2vec(nn.Module):
    def __init__(self, cv_dict, csr_cols):
        super(Item2vec, self).__init__()
        pass

    def forward(self, x):
        pass

    def predict(self, x):
        pass

假设模型已经训练好,现在要使用训练好的模型进行大规模预测:

from pyspark import SparkFiles
sc.addFile('/export/models/item2vec.py')
import sys
sys.path.append('/export/models/')

from item2vec import Item2vec

# model 表示训练好的模型
bc_model_state = sc.broadcast(model.state_dict())
net = Item2vec(cv_dict, csr_cols)

def get_model_for_eval_demo():
  # Broadcast the model state_dict
  net.load_state_dict(bc_model_state.value)
  net.eval()
  return net

上面的操作已经将模型分发(broadcast)出去,接下来就可以进行预测了。

预测这里介绍两种方式:一种是使用 udf + withColumn, 另一种则是使用 rdd + mapPartitions

由于这里使用的是 pyspark 2.1,还没有pandas udf,因此使用 udf + withColumn 时只能一行一行的预测,运行速度上来说是比不上 rdd + mapPartitions

对于pyspark 2.3以后的版本多了pandas udf后则可以使用batch predict了,具体可以参考

https://docs.databricks.com/static/notebooks/deep-learning/pytorch-images.html

udf + withColumn 的方式
# udf + withColumn 的方式
def one_row_predict_demo(x)
    x = torch.tensor(x, dtype=torch.float)
    _, prob = bc_model.predict(x)

    return round(float(prob[0]), 4)
    
one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())

one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType())
df = demo.withColumn('demo_prob', one_row_predict_demo_udf('features'))
rdd + map 方式
def one_row_predict_map(rdds):
    bc_model = get_model_for_eval_demo()
    for row in rdds:
        x = torch.tensor(row.x, dtype=torch.float)
        _, prob = bc_model.predict(x)
    
        yield (row['id'], round(float(prob[0]), 4))

df = demo.rdd.mapPartitions(one_row_predict_map).toDF(['id', 'pred_prob'])

2. 效率优化(1)——mapPartition

上面的方法已经可以使得我们将训练好的深度学习模型部署到spark进行大规模预测了,但是其速度是非常慢的。通过在 mapPartitions 中进行一些处理,我们可以对预测进行加速:

# 代码源自 https://github.com/SaeedNajafi/infer-pytorch-pyspark

def basic_row_handler(row):
    return row

def predict_map(index, partition, ml_task,
                batch_size=16,
                row_preprocessor=basic_row_handler,
                row_postprocessor=basic_row_handler):

    # local model loading within each executor
    model = LocalPredictor(ml_task=ml_task, batch_size=batch_size,
                           partition_index=index)

    batch = []
    count = 0
    for row in partition:
        row_dict = row.asDict()
        # apply preprocessor on each row.
        row_dict_prep = row_preprocessor(row_dict)
        batch.append(row_dict_prep)
        count += 1
        if count == batch_size:
            # predict the ml and apply the postprocessor.
            for ret_row in model.predict(batch):  # ml prediction
                ret_row_post = row_postprocessor(ret_row)
                if ret_row_post is not None:
                    yield Row(**ret_row_post)

            batch = []
            count = 0

    # Flush remaining rows in the batches.
    if count != 0:
        for ret_row in model.predict(batch):  # ml prediction
            ret_row_post = row_postprocessor(ret_row)
            if ret_row_post is not None:
                yield Row(**ret_row_post)

        batch = []
        count = 0

上面的代码可以看作是在mapPartitions中进行了“延迟”预测——即先将一个partition中的多行数据进行处理然后合并为一个batch进行一起预测,这样能大大的提升运行效率。一个比较极端的情况是每个partition仅进行一次预测。

3. 效率优化(2)——pandas_udf

pandas_udf在udf的基础上进行了进一步的优化,利用pandas_udf程序运行效率更高。在这里我们可以借助于pandas_udf提升我们程序的运行效率:

# Enable Arrow support.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64")

sc.addFile('get_model.py')
from get_model import get_model

model_path = '/path/to/model.pt'
data_path = '/path/to/data'

# model 表示训练好的模型
model = torch.load(model_path)
bc_model_state = sc.broadcast(model.state_dict())


def get_model_for_eval():
  # Broadcast the model state_dict  
  model = get_model()
  model.load_state_dict(bc_model_state.value)
  model.eval()
  return model

# model = torch.load(model_path)
# model = sc.broadcast(model)


@pandas_udf(FloatType())
def predict_batch_udf(arr: pd.Series) -> pd.Series:
    model = get_model_for_eval()
    # model.to(device)
    arr = np.vstack(arr.map(lambda x: eval(x)).values)
    arr = torch.tensor(arr).long()
    with torch.no_grad():
        predictions = list(model(arr).cpu().numpy())
            
    return pd.Series(predictions)

# 预测
data = data.withColumn('predictions', predict_batch_udf('features'))

作者:井底蛙蛙呱呱呱
链接:https://www.jianshu.com/p/fc60c967c8b8

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

量化投资单因子回测神器 — Alphalens

还记得我们在前面采用的回测工具Backtrader吗?Backtrader是一款非常灵活的回测工具,基于它你能回测任何你想要测试的idea.

但是针对单因子回测,Backtrader 开发回测代码以及生成报告上并不算很方便,我们需要自己编写买卖逻辑,在生成的报告上也没有IC、IR、回撤等的数据分析,而实际上,从单因子回测的技术实现角度上来说,这些都是可以自动化生成的。

Alphalens就是一个专门实现单因子自动回测的神器,我们只要给它输入因子值的列,还有每支股票收盘价的数据,它就能自动生成数据分析及报告,并带有十几张可视化的报告数据统计图:

下面就带大家入门使用一下Alphalens,如果对你有帮助的话,记得点一下赞/在看哦。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install alphalens
pip install tushare
pip install pandas

2.数据预处理

正如前面所说,我们只需要把因子值和收盘价放入Alphalens中,就能自动生成回测和报告结果。

所以,我们90%的工作都会在数据处理这一部分,回测和分析都是抽象封装好的,并不需要太多地去担心它。

为了测试,我们导入tushare的数据进行测试:

import pandas as pd
import tushare as ts
from alphalens.utils import get_clean_factor_and_forward_returns
from alphalens.tears import create_full_tear_sheet

pro = ts.pro_api()
# 此接口获取的数据为未复权数据,回测建议使用复权数据,这里为批量获取股票数据做了简化
df = pro.daily(ts_code='000001.SZ,600982.SH', start_date='20200101', end_date='20211122')
df.index = pd.to_datetime(df['trade_date'])
df.index.name = None
df.sort_index(inplace=True)

这里获取了000001.SZ,600982.SH两只股票在2020-01-01到2021-11-22的日线数据,将交易日期设为了索引并排序。效果如下:

然后需要设置多索引的因子列 assets,第一个索引为日期,第二个索引为股票代码:

# 多索引的因子列,第一个索引为日期,第二个索引为股票代码
assets = df.set_index([df.index, df['ts_code']], drop=True)

​效果如下,仔细观察的话能发现其与导入的数据只有索引的不同:

然后,设置收盘价的Dataframe,这个与因子数据的格式不同,索引是时间,每一列是每只股票对应的收盘价:

# column为股票代码,index为日期,值为收盘价
close = df.pivot_table(index='trade_date', columns='ts_code', values='close')
close.index = pd.to_datetime(close.index)

到这一步,我们的初始化工作就完成了,下面就放到 Alphalens 进行测试。

3.Alphalens回测及报告

使用Alphalens进行回测,是非常轻松而写意的,只需要导入包,给它传递因子数据和收盘价数据即可:

from alphalens.utils import get_clean_factor_and_forward_returns
from alphalens.tears import create_full_tear_sheet

# 我们是使用pct_chg因子数据预测收盘价,因此需要偏移1天,但是这里有2只股票,所以是shift(2)
ret = get_clean_factor_and_forward_returns(assets[['pct_chg']].shift(2), close)
create_full_tear_sheet(ret, long_short=False)

get_clean_factor_and_forward_returns 接受的第一个参数就是因子的列,我们只需要从前面预处理好的 assets 中任取一列作为因子进行回测即可,第二列是收盘价。

值得注意的是,因子数据在回测的时候,注意不要使用到未来数据,因为我们是用前一天的数据预测下一天的收盘价,所以要对因子列进行移位处理,这点一定要注意。

运行程序,就能生成如下的报告:

还有一点需要提醒大家的是,开源Alphalens的Quantopian公司已经倒闭,所以项目暂时没人维护了,部分代码没有适配最新的依赖,所以可能会有问题,比如下面的:

原本是通过 .get_values() 获得 input_periods, 但是 get_values 在 pandas 0.25.0 中已经被弃用,最新的pandas版本这里需要改成 .to_numpy() 才能生效。

除了这个小缺点,Alphalens整体上是非常符合大家单因子测试的需求的。它的分析报告可能没有那么齐全,我们也可以考虑在Alphalens的基础上增加其他的分析内容,如果能开源出来则更好了。

考虑到后续Alphalens没人维护,我fork了Alphalens,并增加了自己的改动,希望有余力的同学也能来一起贡献代码:
https://github.com/Ckend/alphalens

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python怎样存储变量性能最优?这篇文章告诉你答案

工作时我们经常会遇到需要临时保存结果变量的场景,尤其是一些数据处理、模型开发的场景,加载处理速度是个很漫长的过程,于是经常会把这些变量储存起来。

而储存变量最常见、最普遍的方法是用pickle,保存为pkl文件。但是如果从写入和读取的性能角度考虑,pkl可能真的不是最优选。

Pickle有其独特的好处,大部分变量不需要进行处理,都能直接存到pkl文件里,但这样的方便其实是牺牲了部分性能取得的。与之相比,numpy的.npy格式就比pickle性能上快不少。

当然,我们需要有证据支撑这个观点。所以今天我们就来做个实验,分别在Python2和Python3中对比 numpy 和 pickle 两种存储格式(.npy, .pkl) 对数据的存储和读取的性能对比。

部分内容参考分析自: https://applenob.github.io/python/save/

1. Python2中, npy与pkl的性能对比

首先初始化数据:

import numpy as np
import time
import cPickle as pkl
import os

all_batches = []
for i in range(20):
    a1 = np.random.normal(size=[25600, 40])
    label = np.random.normal(size=[25600, 1])
    all_batch = np.concatenate([a1, label], 1)
    all_batches.append(all_batch)
all_batches = np.array(all_batches)
print(all_batches.shape)
# (20, 25600, 41)

然后测试使用pickle保存和读取时间的耗时,以及整个文件的大小:

s_t1 = time.time()
pkl_name = "a.pkl"
with open(pkl_name, "wb") as f:
    pkl.dump(all_batches, f)
pkl_in_time = time.time() - s_t1
print("pkl dump costs {} sec".format(pkl_in_time))

s_t2 = time.time()
with open(pkl_name, "rb") as f:
    new_a = pkl.load(f)
pkl_out_time = time.time() - s_t2
print("pkl load costs {} sec".format(pkl_out_time))

pkl_size = os.path.getsize(pkl_name)
print("pkl file size: {} byte, {} mb".format(pkl_size, float(pkl_size)/(1024*1024)))

结果如下:

pkl dump costs 67.7483091354 sec
pkl load costs 52.1168899536 sec
pkl file size: 497437110 byte, 474.392995834 mb

然后再试一下npy的写入和读取:

s_t3 = time.time()
npy_name = "a.npy"
with open(npy_name, "wb") as f:
    np.save(f, arr=all_batches)
npy_in_time = time.time() - s_t3
print("npy save costs {} sec".format(npy_in_time))
s_t4 = time.time()
with open(npy_name, "rb") as f:
    new_a = np.load(f)
npy_out_time = time.time() - s_t4
print("npy load costs {} sec".format(npy_out_time))
npy_size = os.path.getsize(npy_name)
print("npy file size: {} byte, {} mb".format(npy_size, float(npy_size) / (1024 * 1024)))

结果如下:

npy save costs 20.718367815 sec
npy load costs 0.62314915657 sec
npy file size: 167936128 byte, 160.15637207 mb

结果发现,npy性能明显优于pkl格式。

通过多次测试发现,在Python2中,npy格式的性能优势全面碾压pkl,工程允许的情况下,在Python2中,我们应该在这二者中毫不犹豫地选择npy.

2.Python3中, npy与pkl的性能对比

Python2已经是过去式,重点还要看Python3.

在Python3中,与Python2的代码唯一一句不一样的是pickle的引入:

# Python2:
import cPickle as pkl

# Python3:
import pickle as pkl

其他代码基本一样,替换代码后,重新运行程序,让我们看看在Python3上,npy格式和pkl格式性能上的区别:

首先是pkl格式的表现:

ckenddeMacBook-Pro:Documents ckend$ python 1.py 
(20, 25600, 41)
pkl dump costs 24.32167887687683 sec
pkl load costs 4.480823040008545 sec
pkl file size: 167936163 byte, 160.15640544891357 mb

然后是npy格式的表现:

npy save costs 22.471696853637695 sec
npy load costs 0.3791017532348633 sec
npy file size: 167936080 byte, 160.1563262939453 mb

可以看到在Python3中pkl格式和npy格式的存储大小是基本相同的,在存储耗时上也相差无几。但是在读取数据的时候,npy相对于pkl还是有一定的优势的。

因此,如果你的程序非常注重读取效率,那么我觉得npy格式会比pkl格式更适合你。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandarallel 一个能让你的Pandas计算火力拉满的工具

没有使用Pandarallel
使用了Pandarallel

众所周知,由于GIL的存在,Python单进程中的所有操作都是在一个CPU核上进行的,所以为了提高运行速度,我们一般会采用多进程的方式。而多进程无非就是以下几种方案:

  • 1、multiprocessing
  • 2、concurrent.futures.ProcessPoolExecutor()
  • 3、joblib
  • 4、ppserver
  • 5、celery

这些方案对于普通Pandas玩家来说都不是特别友好,怎样才能算作一个友好的并行处理方案?就是原来的逻辑我基本不用变,仅修改需要计算的那行就能完成我们目标的方案,而 pandarallel 就是一个这样友好的工具。

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

可以看到,在 pandarallel 的世界里,你只需要替换原有的 pandas 处理语句就能实现多CPU并行计算。非常方便、非常nice.

在4核CPU的性能测试上,它比原始语句快了接近4倍。测试条件(OS: Linux Ubuntu 16.04,Hardware: Intel Core i7 @ 3.40 GHz – 4 cores),这就是我所说的,它把CPU充分利用了起来。

下面就给大家介绍这个模块怎么用,其实非常简单,任何代码只需要加几行代码就能实现质的飞跃。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install pandarallel

对于windows用户,有一个不好的消息是,它只能在Windows的linux子系统上运行(WSL),你可以在微软官网上找到安装教程:

https://docs.microsoft.com/zh-cn/windows/wsl/about

2.使用Pandarallel

使用前,需要对Pandarallel进行初始化:

from pandarallel import pandarallel
pandarallel.initialize()

这样才能调用并行计算的API,不过 initialize 中有一个重要参数需要说明,那就是 nb_workers ,它将指定并行计算的Worker数,如果没有设置,所有CPU的核都会用上。

Pandarallel一共支持8种Pandas操作,下面是一个apply方法的例子。

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(5e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 8, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return math.sin(x.a**2) + math.sin(x.b**2)

# 正常处理
res = df.apply(func, axis=1)

# 并行处理
res_parallel = df.parallel_apply(func, axis=1)

# 查看结果是否相同
res.equals(res_parallel)

其他方法使用上也是类似的,在原始的函数名称前加上 parallel_。比如DataFrame.groupby.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(3e7)
df = pd.DataFrame(dict(a=np.random.randint(1, 1000, df_size),
                       b=np.random.rand(df_size)))
def func(df):
    dum = 0
    for item in df.b:
        dum += math.log10(math.sqrt(math.exp(item**2)))
        
    return dum / len(df.b)

# 正常处理
res = df.groupby("a").apply(func)
# 并行处理
res_parallel = df.groupby("a").parallel_apply(func)
res.equals(res_parallel)

又比如 DataFrame.groupby.rolling.apply:

import pandas as pd
import time
import math
import numpy as np
from pandarallel import pandarallel

# 初始化
pandarallel.initialize()
df_size = int(1e6)
df = pd.DataFrame(dict(a=np.random.randint(1, 300, df_size),
                       b=np.random.rand(df_size)))
def func(x):
    return x.iloc[0] + x.iloc[1] ** 2 + x.iloc[2] ** 3 + x.iloc[3] ** 4

# 正常处理
res = df.groupby('a').b.rolling(4).apply(func, raw=False)
# 并行处理
res_parallel = df.groupby('a').b.rolling(4).parallel_apply(func, raw=False)
res.equals(res_parallel)

案例都是类似的,这里就直接列出表格,不浪费大家宝贵的时间去阅读一些重复的例子了:

没有并行计算(原始pandas) pandarallel
df.apply(func)df.parallel_apply(func)
df.applymap(func)df.parallel_applymap(func)
df.groupby(args).apply(func)df.groupby(args).parallel_apply(func)
df.groupby(args1).col_name.rolling(args2).apply(func)df.groupby(args1).col_name.rolling(args2).parallel_apply(func)
df.groupby(args1).col_name.expanding(args2).apply(func)df.groupby(args1).col_name.expanding(args2).parallel_apply(func)
series.map(func)series.parallel_map(func)
series.apply(func)series.parallel_apply(func)
series.rolling(args).apply(func)series.rolling(args).parallel_apply(func)

3.注意事项

1. 我有 8 个 CPU,但 parallel_apply 只能加快大约4倍的计算速度。为什么?

答:正如我前面所言,Python中每个进程占用一个核,Pandarallel 最多只能加快到你所拥有的核心的总数,一个 4 核的超线程 CPU 将向操作系统显示 8 个 CPU,但实际上只有 4 个核心,因此最多加快4倍。

2. 并行化是有成本的(实例化新进程,通过共享内存发送数据,…),所以只有当并行化的计算量足够大时,并行化才是有意义的。对于很少量的数据,使用 Pandarallel 并不总是值得的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

10分钟教你用Python开发钉钉通知机器人

在项目协同工作或自动化流程完成时,我们需要用一定的手段通知自己或他人。比如说,当服务器CPU使用率达到90%,发送告警信息给多名项目成员、或是股票自动化交易成交时发送通知给自己等应用场景。通知的手段有很多,使用邮件、Telegram都可以实现,但是它们都有各自的缺点。

邮件通知的方式存在滞后性,而且容易覆盖掉一些重要的邮件,整理起来非常繁琐。Telegram 非常好用,几个步骤就能创建一个机器人,可惜在国内无法使用,需要添加代理才能使用。

不过,前几天发现钉钉的机器人其实和Telegram的相差无几,用起来也相当舒服,因此今天给大家带来一个开发钉钉通知机器人的教程,非常简单,门槛极低,任何人都能用,每个人都能学会。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Python 环境准备完成后,我们就可以来创建一个钉钉机器人了。

1.1 打开钉钉软件,选择 ““, 再点击右上角+号,选择建场景群

1.2 这里可以选择任意一种群,我选择了培训群

1.3 群新建好后,点击右上角的齿轮—群设置,点击智能群助手。这里你也可以修改群的名字,点击名字右边的铅笔就能修改群名。

1.4 点击添加机器人

1.5 点击右上角的+号

1.6 选择自定义机器人,它能让我们通过Webhook接入自定义服务

1.7 然后输入机器人名字,安全设置选择加签,这一字符串你需要拷贝下来,发通知的时候就是我们的SECRET KEY.

1.8 点击完成后,会弹出创建成功的框框,请把这串webhook的链接拷贝下来,并将access_token参数复制下来,这一串 access_token 我们发送消息的时候也需要用到。

机器人创建完毕后,会在群聊中出现,然后我们就可以开始编写通知代码了。

2.Python 钉钉机器人通知代码

我们通过往 https://oapi.dingtalk.com/robot/send 地址发送 POST 请求的方式就能够利用钉钉自定义机器人发送消息。钉钉机器人支持两种消息内容:

  1. 纯文本信息
  2. Markdown信息

简单来讲,如果你的消息只有文本内容,就用第一种。如果你的消息内含图片和自定义格式,就用第二种。

纯文本消息,你的内容需要包含以下3种参数,并带2个内容体:

参数列表:

  1. access_token: 创建成功后返回的webhook链接里就有这个参数。
  2. sign: 就是我们选择加签安全设置中返回的SECRET.
  3. timestamp: 当前时间戳。

内容体包含:

  1. msgtype: 消息内容 text/markdown
  2. text: 文本内容

代码如下,非常简单:

# Python实用宝典
# 2021/11/13
import json
import hashlib
import base64
import hmac
import os
import time
import requests
from urllib.parse import quote_plus


class Messenger:
    def __init__(self, token=os.getenv("DD_ACCESS_TOKEN"), secret=os.getenv("DD_SECRET")):
        self.timestamp = str(round(time.time() * 1000))
        self.URL = "https://oapi.dingtalk.com/robot/send"
        self.headers = {'Content-Type': 'application/json'}
        secret = secret
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(self.timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        self.sign = quote_plus(base64.b64encode(hmac_code))
        self.params = {'access_token': token, "sign": self.sign}

    def send_text(self, content):
        """
        发送文本
        @param content: str, 文本内容
        """
        data = {"msgtype": "text", "text": {"content": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )

使用的时候,请注意token和secret你既可以通过环境变量配置(DD_ACCESS_TOKEN和DD_SECRET),也可以直接传入给Messenger:

if __name__ == "__main__":
    m = Messenger(
        token="你的token",
        secret="你的secret"
    )
    m.send_text("测试一下,今天天气不错")

然后运行这个脚本,就能获取消息通知:

如果你只需要文本通知,那么到这里就已经实现了,如果你还需要发送图文消息或更多自定义内容体,请看下一节内容。

3.钉钉机器人支持Markdown

为了支持发送图片消息和自定义的文字格式,我们需要配置更多的参数:

    def send_md(self, title, content):
        """
        发送Markdown文本
        @param title: str, 标题
        @param content: str, 文本内容
        """
        data = {"msgtype": "markdown", "markdown": {"title": title, "text": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )

msgtype改为markdown,并配置markdown的参数,包括:

  1. title: 标题
  2. content: markdown内容

这样,就能支持发送markdown消息了,我们试一下:

# Python实用宝典
# 2021/11/13
import json
import hashlib
import base64
import hmac
import os
import time
import requests
from urllib.parse import quote_plus


class Messenger:
    def __init__(self, token=os.getenv("DD_ACCESS_TOKEN"), secret=os.getenv("DD_SECRET")):
        self.timestamp = str(round(time.time() * 1000))
        self.URL = "https://oapi.dingtalk.com/robot/send"
        self.headers = {'Content-Type': 'application/json'}
        secret = secret
        secret_enc = secret.encode('utf-8')
        string_to_sign = '{}\n{}'.format(self.timestamp, secret)
        string_to_sign_enc = string_to_sign.encode('utf-8')
        hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
        self.sign = quote_plus(base64.b64encode(hmac_code))
        self.params = {'access_token': token, "sign": self.sign}

    def send_text(self, content):
        """
        发送文本
        @param content: str, 文本内容
        """
        data = {"msgtype": "text", "text": {"content": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )

    def send_md(self, title, content):
        """
        发送Markdown文本
        @param title: str, 标题
        @param content: str, 文本内容
        """
        data = {"msgtype": "markdown", "markdown": {"title": title, "text": content}}
        self.params["timestamp"] = self.timestamp
        return requests.post(
            url=self.URL,
            data=json.dumps(data),
            params=self.params,
            headers=self.headers
        )


if __name__ == "__main__":
    markdown_text = "\n".join(open("md_test.md", encoding="utf-8").readlines())
    m = Messenger(
        token="你的token",
        secret="你的secret"
    )
    m.send_text("测试一下,今天天气不错")
    m.send_md("测试Markdown", markdown_text)

效果如下:

效果还是不错的,速度也非常快,一运行脚本,马上就能收到通知消息。大家可以在Python实用宝典公众号后台回复 钉钉 下载本文源代码,也可以在 https://github.com/Ckend/dd_notice 中找到源代码。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

微软开源!世界首个AI 量化投资平台 Qlib 使用教程

2020年9月,微软开源了AI量化投资平台Qlib的源代码,随后得到了不少的关注,Qlib的主要优势在于:

1.Python覆盖量化投资全过程,用户无需切换语言;​内置许多深度学习算法模型,降低AI算法使用的门槛。

2.内置A股、美股数据接入通道,基于qrun能够自动运行整个工作流程,大大提高开发效率。

3.每个组件都是松耦合可以独立使用,用户能够自行选用某些组件。

Qlib相比于我们之前介绍的backtrader,那功能完善太多。backtrader相当于给你提供一个基本的量化框架,数据、策略、算法,你全部自己搞定。而Qlib则从数据、到策略、到算法都给了你全套的解决方案,你只需要加一点自己的想法,不需要管其他细枝末节的东西就能完成AI量化研究,非常方便。

下面我们就来试一下 Qlib 的安装和运行内置算法策略。

1. 安装

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 (传统) 或 Python数据分析与挖掘好帮手—Anaconda 进行安装,本文建议使用Anaconda。

由于qlib有许多许多依赖,如果你不想安装过程中出现问题,或者引起其他程序的运行问题,建议使用Conda创建一个你的量化投资虚拟环境:

conda create -n my_quant python=3.8

Qlib 仅支持 Python3.7以上的版本且暂不支持 Python3.10. 另外Python 3.9 版本不支持模型性能绘制,因此我选择创建Python3.8版本的虚拟环境。

(安装方式一)pip 安装:

pip install pyqlib

在pip安装的过程中如果遇到任何问题,请搜索引擎解决,如果无法解决,可以尝试下面的源码安装:

(安装方式二)源码安装:

# 提前安装一些依赖
pip install numpy
pip install --upgrade  cython

# clone and install qlib
git clone https://github.com/microsoft/qlib.git && cd qlib
python setup.py install

Windows 机器在安装的时候可能会遇到下面这个问题:

这是因为安装 qlib 的依赖 — tables 时出现了编译错误,原因很多,我选择逃学,因此建议使用 tables 的 wheel 文件进行安装,这样就不需要编译了:

https://www.lfd.uci.edu/~gohlke/pythonlibs/#pytables

在上述网站下载适合你系统的 wheel 文件:

下载完毕后,输入以下命令:

pip install 你的文件路径/tables-3.6.1-cp39-cp39-win_amd64.whl

即可完成 tables 的安装,然后再执行一遍 python setup.py install 即可。

2. 数据准备

由于这套量化开源平台的作者是中国人,所以非常贴心地准备好了A股数据,大家可以输入命令直接下载:

# 1天级别数据
python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data --region cn

# 1分钟级别数据
python scripts/get_data.py qlib_data --target_dir ~/.qlib/qlib_data/cn_data_1min --region cn --interval 1min

如果你需要其他分钟级的数据,修改interval即可。

你可以使用crontab定时自动更新数据(来自雅虎财经):

* * * * 1-5 python <script path> update_data_to_bin --qlib_data_1d_dir <user data dir>

手动更新数据:

python scripts/data_collector/yahoo/collector.py update_data_to_bin --qlib_data_1d_dir <user data dir> --trading_date <start date> --end_date <end date>

3. 运行量化回测流程示例

Qlib 提供了一个名为 qrun 自动运行整个工作流程的工具(包括构建数据集、训练模型、回测和评估)。

你可以按照以下步骤启动自动量化研究工作流程并进行图形报告分析,Quant Research 工作流程:

Qrun 运行 lightgbm 工作流程的配置 workflow_config_lightgbm_Alpha158.yaml 如下所示:

cd examples  # Avoid running program under the directory contains `qlib`
qrun benchmarks/LightGBM/workflow_config_lightgbm_Alpha158.yaml

结果如下:

可以看到这里包括三个统计分析: benchmark return (基准收益) / excess return without cost(除去手续费的超额收益)) / excess return with cost(包含手续费的超额收益)。每个统计分析中都有如下5个参数:

  • mean: 异常收益的平均值
  • std: 异常收益的标准差
  • annualized_return: 年化回报
  • information_ratio: 信息比率
  • max_drawdown: 最大回撤

​如果你想要自定义这个策略和算法的回测参数,你可以查看 workflow_config_lightgbm_Alpha158.yaml 的内容:

qlib_init:
    provider_uri: "~/.qlib/qlib_data/cn_data"
    region: cn
market: &market csi300
benchmark: &benchmark SH000300
data_handler_config: &data_handler_config
    start_time: 2008-01-01
    end_time: 2020-08-01
    fit_start_time: 2008-01-01
    fit_end_time: 2014-12-31
    instruments: *market
port_analysis_config: &port_analysis_config
    strategy:
        class: TopkDropoutStrategy
        module_path: qlib.contrib.strategy
        kwargs:
            model: <MODEL>
            dataset: <DATASET>
            topk: 50
            n_drop: 5
    backtest:
        start_time: 2017-01-01
        end_time: 2020-08-01
        account: 100000000
        benchmark: *benchmark
        exchange_kwargs:
            limit_threshold: 0.095
            deal_price: close
            open_cost: 0.0005
            close_cost: 0.0015
            min_cost: 5
task:
    model:
        class: LGBModel
        module_path: qlib.contrib.model.gbdt
        kwargs:
            loss: mse
            colsample_bytree: 0.8879
            learning_rate: 0.2
            subsample: 0.8789
            lambda_l1: 205.6999
            lambda_l2: 580.9768
            max_depth: 8
            num_leaves: 210
            num_threads: 20
    dataset:
        class: DatasetH
        module_path: qlib.data.dataset
        kwargs:
            handler:
                class: Alpha158
                module_path: qlib.contrib.data.handler
                kwargs: *data_handler_config
            segments:
                train: [2008-01-01, 2014-12-31]
                valid: [2015-01-01, 2016-12-31]
                test: [2017-01-01, 2020-08-01]
    record: 
        - class: SignalRecord
          module_path: qlib.workflow.record_temp
          kwargs: 
            model: <MODEL>
            dataset: <DATASET>
        - class: SigAnaRecord
          module_path: qlib.workflow.record_temp
          kwargs: 
            ana_long_short: False
            ann_scaler: 252
        - class: PortAnaRecord
          module_path: qlib.workflow.record_temp
          kwargs: 
            config: *port_analysis_config

参数比较多,大家翻译一下应该都能看懂。这里摘取华泰的一个研究报告,里面对参数做了具体的翻译:

为了方便用户的使用,微软内置了许多模型,如上文我们用到的 gbdt 位于你克隆的文件夹下的 qlib/contrib/model/gbdt.py:

注意:pytorch 开头的模型需要预先安装pytorch.

Qlib里,策略和算法的区别是什么?

大家注意到,Qlib这里,必须定义策略和算法两个配置,而在backtrader里面,我们更加重视策略,而非“算法”这个概念。那么这两者在Qlib中的区别是什么?我们看默认TOPK策略的源代码:

可以看到,默认的这个策略,选择了算法预测分数结果中排名 TOP K 的股票,也就是策略从算法得到的结果中去做筛选需要交易的股票。算法相当于生成一个新的可用于判断买入卖出的评判标准。这就是策略和AI算法这两者的最重要区别。

最后,得益于松耦合的代码设计,我认为 Qlib 是一个能够让不同层次的研究者各取所需的开源项目,是一个不可多得的量化开源平台,特别适合重度Python使用者,有兴趣的朋友可以试一下,我未来也会考虑出 Qlib 相关的使用教程,敬请期待。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Sweetviz 让你三行代码实现数据分析

Sweetviz是一个开源Python库,它只需三行代码就可以生成漂亮的高精度可视化效果来启动EDA(探索性数据分析)。输出一个HTML。

如上图所示,它不仅能根据性别、年龄等不同栏目纵向分析数据,还能对每个栏目做众数、最大值、最小值等横向对比。

所有输入的数值、文本信息都会被自动检测,并进行数据分析、可视化和对比,最后自动帮你进行总结,是一个探索性数据分析的好帮手。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install sweetviz

2.sweetviz 基本用法

sweetviz 使用的原理是,使用一行代码,生成一个数据报告的对象(其中,my_dataframe是pandas中的DataFrame,一种表格型数据结构):

import pandas as pd
import sweetviz as sv

# 读取数据
my_dataframe = pd.read_csv('../ImpartData/iris.csv')
# 分析数据
my_report = sv.analyze(my_dataframe)
# 生成报告
my_report.show_html()

执行完成后,会在当前文件夹下生成一个HTML的报告文件

双击这个html,你就能看到精美的分析报告了:

其中,分析数据有三种函数可以用,除了上面提到的analyze函数,还有 compare 和 compare_intra 函数。

首先是analyze函数:

analyze(source: Union[pd.DataFrame, Tuple[pd.DataFrame, str]],
            target_feat: str = None,
            feat_cfg: FeatureConfig = None,
            pairwise_analysis: str = 'auto')

可见其有以下4个参数可以配置:

  • source:以pandas中的DataFrame数据结构作为分析对象。
  • target_feat:需要被标记为目标对象的字符串。
  • feat_cfg:需要被跳过、或是需要被强制转换为某种数据类型的特征。
  • pairwise_analysis:相关性分析可能需要花费较长时间。如果超过了你的忍受范围,就需要设置这个参数为on或者off,以判断是否需要分析数据相关性。

compare()丨两个数据集比较

my_report = sv.compare([my_dataframe, "Training Data"], [test_df, "Test Data"], "Survived", feature_config)

要比较两个数据集,只需使用该 compare() 函数。它的参数与 analyze() 相同,只是插入了第二个参数来覆盖比较数据帧。建议使用 [dataframe, “name”] 参数格式以更好地区分基础数据帧和比较数据帧。(例如 [my_df, "Train"] 比 my_df 更好)

compare_intra()丨数据集栏目比较

my_report = sv.compare_intra(my_dataframe, my_dataframe["Sex"] == "male", ["Male", "Female"], feature_config)

想要对数据集中某个栏目下的参数进行分析,就采用这个函数进行。
例如,如果需要比较“性别”栏目下的“男性”和“女性”,就可以采用这个函数。

3.调整报告布局

一旦你创建了你的报告对象,只需将它传递给两个show函数中的一个:

1. show_html():

show_html(  filepath='SWEETVIZ_REPORT.html', 
            open_browser=True, 
            layout='widescreen', 
            scale=None)

show_html(…)将在当前文件路径中创建并保存 HTML 报告。有以下参数:

  • layout (布局):无论是'widescreen''vertical'。当鼠标移过每个功能时,宽屏布局会在屏幕右侧显示详细信息。新的(从 2.0 开始)垂直布局在水平方向上更加紧凑,并且可以在单击时扩展每个细节区域。
  • scale:使用浮点数(scale= 0.8None)来缩放整个报告。
  • open_browser:启用 Web 浏览器的自动打开以显示报告。如果不需要,可以在此处禁用它。

2.show_notebook():

show_notebook(  w=None, 
                h=None, 
                scale=None,
                layout='widescreen',
                filepath=None)

它将嵌入一个 IFRAME 元素,在notebook中显示报告(例如 Jupyter、Google Colab 等)。

请注意,由于Notebook通常是一个更受限制的环境,因此使用自定义宽度/高度/比例值 ( whscale) 可能是个好主意。选项是:

  • w(宽度):设置报告输出窗口的宽度。可以是百分比字符串 ( w="100%") 或像素 (w=900)。
  • h(高度):设置报告输出窗口的高度。可以是像素数 ( h=700) 或将窗口拉伸到与所有特征 ( h="Full")一样高。
  • scale:与上面的 show_html 相同。
  • layout:与上面的 show_html 相同。
  • scale:与上面的 show_html 相同。
  • filepath:可选的输出 HTML 报告。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Lean — 优秀好用的开源量化交易平台

Lean 是 QuantConnect 开源的一款非常强大的开源量化交易平台,可以回测或运行Python或者C#写的策略,并内置了上百个C#和Python写的策略算法在代码仓库中。

这个开源算法交易引擎,专为轻松地进行策略研究、回测和实时交易而构建。它集成了常见的数据提供商和券商,因此可以快速部署算法交易策略。

LEAN Engine 的核心是用 C# 编写的;但它可以在 Linux、Mac 和 Windows 操作系统上无缝运行。它支持用 Python 3.6 或 C# 编写的算法。

引擎分为许多模块化部分,可以在不接触其他文件的情况下对某个模块进行扩展。

最重要的几个模块是:

  • 结果处理(IResultHandler)处理来自算法交易引擎的所有消息。决定应该发送什么,以及消息应该去哪里。结果处理系统可以将消息发送到本地 GUI 或 Web 界面。
  • 数据源(IDataFeed)连接并下载算法交易引擎所需的数据。从磁盘文件中读取文件进行回测;实时交易则连接到一个流并生成数据对象。
  • 事务处理(ITransactionHandler)处理新的订单请求;要么使用算法提供的模拟模型,要么使用实际券商。
  • 实时事件管理(IRealtimeHandler)生成实时事件 – 例如一天结束的事件。触发对实时事件处理程序的回调。
  • 算法状态设置(ISetupHandler)配置算法资金、投资组合和请求的数据。初始化所需的所有状态参数。

这些都可以从 Launcher 项目中的 config.json 文件进行配置。

1. Leon 安装教程

由于Leon是基于C#开发的,因此我推荐使用Visual Studio进行开发。

1、克隆项目。从 https://github.com/QuantConnect/Lean 克隆项目到本地(如果你网络不通可在公众号后台回复 Lean 下载)。

2、使用 Visual Studio 打开项目中的 QuantConnect.Lean.sln

3、点击 生成 – 生成解决方案

4、点击 F5 则可以运行程序。

如果你在生成解决方案的过程中遇到了类似于如下的错误:

请在工具 – NuGet包管理器 – 程序包管理器设置 中 添加如下的源, 名字任取,链接对了就行: https://api.nuget.org/v3/index.json

2. 回测 Lean 内置的C#策略

Lean 中比较有意思的一点是,其所有C#策略算法都位于 QuantConnect.Algorithm.CSharp 中,所有的Python策略算法都位于 QuantConnect.Algorithm.Python 中:

如果你想回测C#的策略,你只需要修改 QuantConnect.Lean.Launcher 中的 config.json,将 QuantConnect.Algorithm.CSharp 中对应策略名称,修改到 algorithm-type-name 字段对应的值中,如图所示:

然后按 F5 运行程序,回测开始,此时会弹出一个cmd窗口,里面有本次回测的统计数据:

3. 回测 Lean 内置的 Python策略

如果你想要回测内置的Python策略,我们需要先指定Lean使用的Python环境位置:

1.打开系统变量(我的电脑-右键属性-高级系统设置->环境变量->系统变量)

2.点击新建变量,name为 PYTHONNET_PYDLL;value则为你的Python环境的dll文件所在文件夹,如我的为 G:\Anaconda3\python36.dll

3.在此Python环境中安装Lean的依赖:

pip install pandas
pip install wrapt==1.11.2

然后在项目的 config.json 中需要多改几个配置:

然后按F5进行回测,效果如下:

这些统计指标令人眼花缭乱,对于股票的回测我们只要重点关注这些即可:

  • Total Trades: 总交易量
  • Average Win: 平均盈利率
  • Average Loss: 平均亏损率
  • Compounding Annual Return: 复合年回报率
  • Drawdown: 最大回撤率
  • Expectancy: 期望值
  • Net Profit: 净利润
  • Sharpe Ratio: 夏普比率
  • Probabilistic Sharpe Ratio: 概率性夏普比率
  • Loss Rate: 失败率
  • Win Rate: 胜率
  • Profit-Loss Ratio: 盈亏比
  • Alpha: Alpha值
  • Beta: Beta值
  • Total Fees: 总手续费

其他的,按需关注即可。

4. Lean 策略是怎么写的?

开始之前,让我们先学习下 Lean 内置策略的写法:

from AlgorithmImports import *


class MACDTrendAlgorithm(QCAlgorithm):

    def Initialize(self):
        '''Initialise the data and resolution required, as well as the cash and start-end dates for your algorithm. All algorithms must initialized.'''

        self.SetStartDate(2004, 1, 1)    #Set Start Date
        self.SetEndDate(2015, 1, 1)      #Set End Date
        self.SetCash(100000)             #Set Strategy Cash
        # Find more symbols here: http://quantconnect.com/data
        self.AddEquity("SPY", Resolution.Daily)

        # define our daily macd(12,26) with a 9 day signal
        self.__macd = self.MACD("SPY", 12, 26, 9, MovingAverageType.Exponential, Resolution.Daily)
        self.__previous = datetime.min
        self.PlotIndicator("MACD", True, self.__macd, self.__macd.Signal)
        self.PlotIndicator("SPY", self.__macd.Fast, self.__macd.Slow)


    def OnData(self, data):
        '''OnData event is the primary entry point for your algorithm. Each new data point will be pumped in here.'''
        # wait for our macd to fully initialize
        if not self.__macd.IsReady: return

        # only once per day
        if self.__previous.date() == self.Time.date(): return

        # define a small tolerance on our checks to avoid bouncing
        tolerance = 0.0025

        holdings = self.Portfolio["SPY"].Quantity

        signalDeltaPercent = (self.__macd.Current.Value - self.__macd.Signal.Current.Value)/self.__macd.Fast.Current.Value

        # if our macd is greater than our signal, then let's go long
        if holdings <= 0 and signalDeltaPercent > tolerance:  # 0.01%
            # longterm says buy as well
            self.SetHoldings("SPY", 1.0)

        # of our macd is less than our signal, then let's go short
        elif holdings >= 0 and signalDeltaPercent < -tolerance:
            self.Liquidate("SPY")


        self.__previous = self.Time

可以看到,其实它和Backtrader的写法相差无几,Initialize 函数设置基本的回测参数,如:

  • self.SetStartDate: 回测起始时间
  • self.SetEndDate: 回测结束时间
  • self.setCash: 回测资金
  • self.AddEquity: 回测对象(Resolution.Daily 是指按日回测)
  • self.PlotIndicator: 绘图时添加指标

而 onData 函数则会在每个数据点上做操作,如果是日线,则每天的数据都会流入到这个函数并运行一遍。因此 onData 就是算法分析的主逻辑。

在这里,你可以检查需要的指标是否已经准备完毕,因为可能存在一些滞后性指标在回测刚开始的时候并没有对应的值;此外,在日线的情况下,你还可以检测上一个数据点是不是和这个点在同一天上,如果是的话则不作任何操作返回:

if not self.__macd.IsReady: return
if self.__previous.date() == self.Time.date(): return

然后就是核心的买入卖出逻辑:

tolerance = 0.0025

holdings = self.Portfolio["SPY"].Quantity

signalDeltaPercent = (self.__macd.Current.Value - self.__macd.Signal.Current.Value)/self.__macd.Fast.Current.Value

# if our macd is greater than our signal, then let's go long
if holdings <= 0 and signalDeltaPercent > tolerance:  # 0.01%
    # longterm says buy as well
    self.SetHoldings("SPY", 1.0)

# of our macd is less than our signal, then let's go short
elif holdings >= 0 and signalDeltaPercent < -tolerance:
    self.Liquidate("SPY")
    
self.__previous = self.Time

如果我持仓的股数<=0, 且信号值大于我设定的阈值,则将我资产的1%买入这只股票。这里和backtrader最大的不同,买入是以资产的百分比为单位的动态买入。当然,你也可以使用限定数量的买入方式:

self.LimitOrder("IBM", 100, self.Securities["IBM"].Price)

如果持仓股市>=0, 且触发卖出信号,则进行清仓操作:

elif holdings >= 0 and signalDeltaPercent < -tolerance:
    self.Liquidate("SPY")

如果你不希望全部清仓,也可以使用 SetHoldings 来调整仓位。

可以看到,Lean相对于Backtrader有更灵活的仓位管理方式,甚至能够进行自动仓位调整、构建投资组合、实时交易等等。而且针对一些比较复杂的策略,你还可以用C#而不是Python来编写以提高运行速度。

综上所述,Lean是一个非常值得深入学习的量化交易平台,有兴趣的同学可以在他们官网学习到更多的内容:

https://www.quantconnect.com/docs

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Box 为你的字典添加点符号访问特性

正常情况下,我们想访问字典中的某个值,都是通过中括号访问,比如:

test_dict = {"test": {"imdb stars": 6.7, "length": 104}}

print(test_dict["test"]["imdb stars"])
# 104

而通过Box模块,我们可以扩展字典功能,使用点符号访问元素:

from box import Box

movie_box = Box({ "Robin Hood: Men in Tights": { "imdb stars": 6.7, "length": 104 } })

movie_box.Robin_Hood_Men_in_Tights.imdb_stars

# 6.7

另外,可以看到默认情况下转换后,字典键值中的空格被转化为了下划线。

下面具体介绍 Box 模块的使用方法。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install --upgrade python-box[all]

2.基本使用

我们可以像文章开头那样传入一个字典给 Box,生成一个Box对象;也可以直接使用参数赋值的方式生成一个Box对象:

from box import Box

my_box = Box(funny_movie='Hudson Hawk', best_movie='Kung Fu Panda')
my_box.funny_movie
# 'Hudson Hawk'

请记住,任何情况下,你往Box对象里添加字典或是数组,这些字典或数组都会被转变为Box对象:

my_box = Box({"team": {"red": {"leader": "Sarge", "members": []}}})
print(my_box.team.red.leader)
# Sarge

my_box.team.blue = {"leader": "Church", "members": []} 
print(repr(my_box.team.blue))
# <Box: {'leader': 'Church', 'members': []}>

访问列表中的 Box 对象也非常轻松:

my_box.team.red.members = [
    {"name": "Grif", "rank": "Minor Junior Private Negative First Class"},
    {"name": "Dick Simmons", "rank": "Captain"}
]

print(my_box.team.red.members[0].name)
# Grif

局限性

请注意,字典中有些默认方法,如:clear, copy, fromkeys, get, items, keys, pop, popitem, setdefault, to_dict, update, merge_update, values,当你的键值和这些方法名称冲突时,你无法使用点符号访问它们。

不过冲突时,你依然可以使用传统的字典取值访问它们,例如:

my_box['keys']

合并

要合并两个Box对象,你只需要通过 merge_update 方法:

from box import Box

box_1 = Box(val={'important_key': 1}) 
box_2 = Box(val={'less_important_key': 2})

box_1.merge_update(box_2)

print(box_1)
# {'val': {'important_key': 1, 'less_important_key': 2}}

当然,你也可以用传统的 update 方法:

from box import Box

box_1 = Box(val={'important_key': 1}) 
box_2 = Box(val={'less_important_key': 2})

box_1.update(box_2)

print(box_1)
# {'val': {'less_important_key': 2}}

转换为原始列表/字典

如果你需要把一个 Box 对象的字典转化为原始字典,.to_dict() 方法就可以帮你实现:

from box import Box

box_1 = Box(val={'important_key': 1}) 

print(box_1)
# {'val': {'less_important_key': 2}}
print(type(box_1))
# <class 'box.box.Box'>
print(type(box_1.to_dict()))
# <class 'dict'>

如果你需要把一个 Box 对象的列表转化为原始列表,你可以使用 .to_list() 方法:

from box import BoxList

my_boxlist = BoxList({'item': x} for x in range(10))
#  <BoxList: [<Box: {'item': 0}>, <Box: {'item': 1}>, ...

my_boxlist[5].item
# 5

print(type(my_boxlist.to_list()))
# <class 'list'>

3.导入导出功能

Box对象有一个很方便的功能,就是能够轻松地将Box对象导出为Json/yaml/csv/msgpack文件:

from box import BoxList

my_boxlist = BoxList({'item': x} for x in range(10))
#  <BoxList: [<Box: {'item': 0}>, <Box: {'item': 1}>, ...

my_boxlist.to_json(filename="test.json")
# 在当前文件夹下生成一个 test.json 文件

此外,还能接受 Json/yaml/csv/msgpack 文件导入:

new_box = Box.from_json(filename="films.json")

各种类型的文件对应的方法如下:

转换器方法描述
to_dict递归地将所有 Box(和 BoxList)对象转换回字典(和列表)
to_json将 Box 对象另存为 JSON 字符串或使用filename参数写入文件
to_yaml将 Box 对象另存为 YAML 字符串或使用filename参数写入文件
to_msgpack将 Box 对象另存为 msgpack 字节或使用filename参数写入文件
to_toml*将 Box 对象另存为 TOML 字符串或使用filename参数写入文件
to_csv**将 BoxList 对象另存为 CSV 字符串或使用filename参数写入文件
from_jsonClassmethod,从一个 JSON 文件或字符串创建一个 Box 对象(所有 Box 参数都可以传递)
from_yaml类方法,从 YAML 文件或字符串创建一个 Box 对象(所有 Box 参数都可以传递)
from_msgpackClassmethod,从msgpack文件或字节创建一个Box对象(所有Box参数都可以传递)
from_toml*Classmethod,从TOML文件或字符串创建一个Box对象(所有Box参数都可以传递)
from_csv**Classmethod,从一个CSV文件或字符串创建一个BoxList对象(可以传递所有BoxList参数)

* 不适用于 BoxList,仅适用于 Box ** 不适用于 Box,仅适用于 BoxList。

还有更多的特性,大家可以参考 Box 模块官方WIki:

https://github.com/cdgriffith/Box/wiki

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Delorean 优秀的Python时间格式转换工具

DeLorean是一个Python的第三方模块,基于 pytz 和 dateutil 开发的,用于处理Python中日期时间的格式转换。

由于时间转换是一个足够微妙的问题,DeLorean希望为移位、操作和生成日期时间提供一种更干净、更省事的解决方案。比如,实例化字符串形式的时间对象,Delorean只需要 parse 指定字符串,不需要声明其格式就可以进行转换。

至于 Delorean 这个模块名称的由来,Delorean 是电影《回到未来》里的那辆极为炫酷的鸥翼汽车,采用这部电影里的非常具有代表性的汽车的名字作为库名,作者估计也是想表达使用这个库能让你在时空里任意遨游,没有掣肘。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install Delorean

2.Delorean 基础使用

轻松获取当前时间:

from delorean import Delorean

d = Delorean()
print(d)
# Delorean(datetime=datetime.datetime(2021, 10, 6, 9, 5, 57, 611589), timezone='UTC')

将datetime格式的时间转化为Delorean:

import datetime
from delorean import Delorean

d = Delorean()
print(d)
d = Delorean(datetime=datetime.datetime(2018, 5, 10, 8, 52, 23, 560811), timezone='UTC')
# 这里默认的是UTC时间
print(d)
# Delorean(datetime=datetime.datetime(2021, 10, 6, 9, 5, 57, 611589), timezone='UTC')
# Delorean(datetime=datetime.datetime(2018, 5, 10, 8, 52, 23, 560811), timezone='UTC')

转换为国内时区:

import datetime
from delorean import Delorean

d = Delorean(datetime=datetime.datetime(2018, 5, 10, 8, 52, 23, 560811), timezone='UTC')
d = d.shift("Asia/Shanghai")
print(d)
# Delorean(datetime=datetime.datetime(2018, 5, 10, 16, 52, 23, 560811), timezone='Asia/Shanghai')

输出为 datetime、date 也不在话下:

import datetime
from delorean import Delorean

d = Delorean(datetime=datetime.datetime(2018, 5, 10, 8, 52, 23, 560811), timezone='UTC')
d = d.shift("Asia/Shanghai")
print(d.datetime)
print(d.date)
# 2018-05-10 16:52:23.560811+08:00
# 2018-05-10

查看无时区时间及时间戳:

import datetime
from delorean import Delorean

d = Delorean(datetime=datetime.datetime(2018, 5, 10, 8, 52, 23, 560811), timezone='UTC')
d = d.shift("Asia/Shanghai")
print(d.epoch)
print(d.naive)
# 1525942343.560811
# 2018-05-10 08:52:23.560811

用unix时间戳初始化Delorean:

from delorean import epoch
d = epoch(1357971038.102223).shift("Asia/Shanghai")
print(d)
# Delorean(datetime=datetime.datetime(2013, 1, 12, 14, 10, 38, 102223), timezone='Asia/Shanghai')

Delorean支持timedelta的时间加减法。Delorean可以使用timedelta进行加减,得到一个Delorean对象:

import datetime
from delorean import Delorean

d = Delorean(datetime=datetime.datetime(2018, 5, 10, 8, 52, 23, 560811), timezone='UTC')
d = d.shift("Asia/Shanghai")
print(d)
d2 = d + datetime.timedelta(hours=2)
print(d2)
d3 = d - datetime.timedelta(hours=3)
print(d3)
# Delorean(datetime=datetime.datetime(2018, 5, 10, 16, 52, 23, 560811), timezone='Asia/Shanghai')
# Delorean(datetime=datetime.datetime(2018, 5, 10, 18, 52, 23, 560811), timezone='Asia/Shanghai')
# Delorean(datetime=datetime.datetime(2018, 5, 10, 13, 52, 23, 560811), timezone='Asia/Shanghai')

3. Delorean 高级使用

通常情况下我们不关心有多少微妙或者多少秒,因此Delorean提供了非常方便的过滤方式:

from delorean import Delorean

d = Delorean()
print(d)
# Delorean(datetime=datetime.datetime(2019, 3, 14, 4, 0, 50, 597357), timezone='UTC')
d.truncate('second')
# Delorean(datetime=datetime.datetime(2019, 3, 14, 4, 0, 50), timezone='UTC')
d.truncate('hour')
# Delorean(datetime=datetime.datetime(2019, 3, 14, 4, 0), timezone='UTC')
d.truncate('month')
# Delorean(datetime=datetime.datetime(2019, 3, 1, 0, 0), timezone='UTC')
d.truncate('year')
# Delorean(datetime=datetime.datetime(2019, 1, 1, 0, 0), timezone='UTC')

另外,datetime格式的字符串处理的时候转换需要标明各种各样的格式,在Delorean你直接parse就可以了:

from delorean import parse
parse("2011/01/01 00:00:00 -0700")
# Delorean(datetime=datetime.datetime(2011, 1, 1, 0, 0), timezone=pytz.FixedOffset(-420))
parse("2018-05-06")
# Delorean(datetime=datetime.datetime(2018, 6, 5, 0, 0), timezone='UTC')

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

有趣好用的Python教程

退出移动版