自定义MySQL数据流 — Python 量化投资实战教程(9)

前面八篇量化投资实战教程,我们所使用到的数据仅仅只有收盘价、成交量等普通指标,如果我们有其他的指标需要进行回测怎么办?

此外,前面使用的数据源都是基于csv文件的,我们能否从数据库(比如MySQL)中直接提取数据作为回测的数据源呢?

​事实上,backtrader虽然没有直接提供接口给我们做这样的优化,但是我们可以通过继承DataBase基类重写DataFeed实现目的。下面就给大家演示一下如何从MySQL中提取数据并增加换手率指标进行回测。

本文完整源代码和数据均在开源代码仓库中:
https://github.com/Ckend/pythondict-quant

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

在终端输入以下命令安装我们所需要的依赖模块:

pip install backtrader
pip install numpy
pip install matplotlib

看到 Successfully installed xxx 则说明安装成功。

2.自定义DataFeed

何为DataFeed?DataFeed 即 backtrader 中的“数据源”,任何数据进入策略回测前都要通过DataFeed,而DataFeed中会对数据进行处理,使得策略可以高效地进行计算。

而我们今天要做的,就是增加一个基于MySQL的DataFeed,使得整个流程变得更加自动化。

首先,需要定义一个类,使其继承与backtrader的数据基类 DataBase:

from backtrader.feed import DataBase
from backtrader import date2num


class MySQLData(DataBase):
    pass

如果需要从外部传入所需股票数据的代码和其一定范围内的K线数据,需要提前定义params. 此外,如果你有除了datetime(时间)open(开盘价)close(收盘价)high(最高价)low(最低价)volume(成交量) 之外的指标。需要提前定义lines,如下所示:

class MySQLData(DataBase):
    params = (
        ('fromdate', datetime.datetime.min),
        ('todate', datetime.datetime.max),
        ('ts_code', ''),
    )
    lines = (
        "turnover",
        "turnover_rate"
    )

可见在lines中,我增加了两个指标:turnover(成交额)turnover_rate(换手率)

接下来,编写一个函数根据params参数从MySQL中获取数据:

    def load_data_from_db(self, table, ts_code, start_time, end_time):
        """
        从MySQL加载指定数据
        Args:
            table (str): 表名
            ts_code (str): 股票代码
            start_time (str): 起始时间
            end_time (str): 终止时间
        return:
            data (List): 数据集
        """
        db = pymysql.connect(
            host="localhost",
            user="root",
            password="12345678",
            db="golden_stone",
            port=3306
        )

        cur = db.cursor()
        sql = cur.execute(
            f"SELECT * FROM {table} WHERE trade_time >= '{start_time}' and trade_time < '{end_time}'"
            f"and ts_code = '{ts_code}' order by trade_time asc"
        )
        data = cur.fetchall()
        db.close()
        return iter(list(data))

代码本身没有什么可说的,记得替换你本地的mysql配置,值得注意的是最后一行,拿到mysql数据后需要转化为迭代器。

在类初始化的时候,需要定义相关的数据存放变量并调用上述函数获取数据:

    def __init__(self, *args, **kwargs):
        self.result = []
        self.empty = False

    def start(self):
        self.result = self.load_data_from_db("stock_normalk", self.p.ts_code, self.p.fromdate, self.p.todate)

接下来到了关键的步骤,在 `cerebro.adddata(data)` 的时候,cerebro会遍历Datafeed的所有数据,此时会调用_load函数, 因此我们需要在这里,将数据库中提取的每列数据对应到lines上:

    def _load(self):
        if self.empty:
            return False
        try:
            one_row = next(self.result)
        except StopIteration:
            return False
        self.lines.datetime[0] = date2num(one_row[3])
        self.lines.open[0] = float(one_row[4])
        self.lines.high[0] = float(one_row[5])
        self.lines.low[0] = float(one_row[6])
        self.lines.close[0] = float(one_row[7])
        self.lines.volume[0] = float(one_row[8])
        self.lines.turnover[0] = float(one_row[9])
        self.lines.turnover_rate[0] = float(one_row[12])
        return True

如果你完整地看完了我的上述分析,那么理解下面整个DataFeed,甚至自己写一个DataFeed,是非常容易的。

# Python 实用宝典
# 自定义数据流 — Python 量化投资实战教程(9)
import datetime
import traceback
import pymysql

from backtrader.feed import DataBase
from backtrader import date2num


class MySQLData(DataBase):
    params = (
        ('fromdate', datetime.datetime.min),
        ('todate', datetime.datetime.max),
        ('ts_code', ''),
    )
    lines = (
        "turnover",
        "turnover_rate"
    )

    def load_data_from_db(self, table, ts_code, start_time, end_time):
        """
        从MySQL加载指定数据
        Args:
            table (str): 表名
            ts_code (str): 股票代码
            start_time (str): 起始时间
            end_time (str): 终止时间
        return:
            data (List): 数据集
        """
        db = pymysql.connect(
            host="localhost",
            user="root",
            password="12345678",
            db="golden_stone",
            port=3306
        )

        cur = db.cursor()
        sql = cur.execute(
            f"SELECT * FROM {table} WHERE trade_time >= '{start_time}' and trade_time < '{end_time}'"
            f"and ts_code = '{ts_code}' order by trade_time asc"
        )
        data = cur.fetchall()
        db.close()
        return iter(list(data))

    def __init__(self, *args, **kwargs):
        self.result = []
        self.empty = False

    def start(self):
        self.result = self.load_data_from_db("stock_normalk", self.p.ts_code, self.p.fromdate, self.p.todate)

    def _load(self):
        if self.empty:
            return False
        try:
            one_row = next(self.result)
        except StopIteration:
            return False
        self.lines.datetime[0] = date2num(one_row[3])
        self.lines.open[0] = float(one_row[4])
        self.lines.high[0] = float(one_row[5])
        self.lines.low[0] = float(one_row[6])
        self.lines.close[0] = float(one_row[7])
        self.lines.volume[0] = float(one_row[8])
        self.lines.turnover[0] = float(one_row[9])
        self.lines.turnover_rate[0] = float(one_row[12])
        return True

3.使用自定义数据流进行回测

接下来,让我们尝试使用这个自定义数据流输入数据,采用第二章的macd策略辅助增加换手率指标进行回测。

这里当然需要你先读懂第二章的内容,如果有点忘记了,可以回头阅读一下,非常简单:

Python 量化投资实战教程(2) —MACD策略

首先,在回测模块及next函数中,引入换手率指标:

class TestStrategy(bt.Strategy):
    def log(self, txt, dt=None):
        ''' Logging function fot this strategy'''
        dt = dt or self.datas[0].datetime.date(0)
        print('%s, %s' % (dt.isoformat(), txt))

    @staticmethod
    def percent(today, yesterday):
        return float(today - yesterday) / today

    def __init__(self):
        self.dataclose = self.datas[0].close
        self.volume = self.datas[0].volume
        # 新的变更:引入换手率指标
        self.turnover_rate = self.datas[0].turnover_rate

next函数买入时增加判断换手率必须小于3%的条件:

    # Python 实用宝典
    def next(self):
        self.log('Close, %.2f' % self.dataclose[0])
        if self.order:
            return

        if not self.position:
            condition1 = self.macd[-1] - self.signal[-1]
            condition2 = self.macd[0] - self.signal[0]
            # 增加判断换手率小于3%的条件
            if condition1 < 0 and condition2 > 0 and self.turnover_rate[0] < 3:
                self.log('BUY CREATE, %.2f' % self.dataclose[0])
                self.order = self.buy()

        else:
            condition = (self.dataclose[0] - self.bar_executed_close) / self.dataclose[0]
            if condition > 0.1 or condition < -0.1:
                self.log('SELL CREATE, %.2f' % self.dataclose[0])
                self.order = self.sell()

最后,引入我们刚刚编写完成的MySQLData Feed,传入相关参数读取股票为603520.SH的数据流,取2017年1月1日至2020年4月12日的数据,并调用回测函数:

if __name__ == '__main__':
    cerebro = bt.Cerebro()

    cerebro.addstrategy(TestStrategy)

    # 加载数据到模型中
    data = MySQLData(
        ts_code="sh603520",
        fromdate=datetime.datetime(2017, 1, 1),
        todate=datetime.datetime(2020, 4, 12),
    )
    cerebro.adddata(data)

    cerebro.broker.setcash(10000)

    cerebro.addsizer(bt.sizers.FixedSize, stake=100)

    cerebro.broker.setcommission(commission=0.005)

    print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

    cerebro.run()

    print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())

    cerebro.plot()

效果如下:

与原来相比,加入换手率指标后,利润率有一定的提高,不过就如我们之前所说的,单一股票维度的回测是不准确的,如果大家有兴趣,可以将第三章:

Python 量化投资实战教程(3) —A股回测MACD策略

改造一下并加入换手率指标进行回测,看看这个指标是否真的有正效益。

本文完整源代码和数据均在开源代码仓库中:
https://github.com/Ckend/pythondict-quant

如果你访问不了github,也可以在公众号后台回复 量化投资9 下载相关代码。

欢迎在公众号后台回复:加群,回答相应红字验证信息,进入互助群交流。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注