Python celery异步快速下载股票数据

上一篇股票文章中,我们讲了如何通过tushare下载股票数据,并存入mongodb:

Python 获取股票数据并存入MongoDB实战教程

其中有非常大的优化空间,比如,单线程的下载速度太慢了,能不能用多线程的方式?有时候网络连接会失败,能不能增加重试机制?

这些问题,我们将在这篇文章里进行优化。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上噢,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),准备开始输入命令安装依赖。

当然,我更推荐大家用VSCode编辑器,把本文代码Copy下来,在编辑器下方的终端运行命令安装依赖模块,多舒服的一件事啊:Python 编程的最好搭档—VSCode 详细指南。

此外,本文章是 Python 获取股票数据并存入MongoDB实战教程 的优化版,请根据该文章提前装好tushare和mongodb.

除此之外,你还需要安装celery和eventlet:

pip install celery
pip install eventlet

看到 Successfully installed xxx 则说明安装成功。

2.使用Celery异步下载股票数据

Celery是一个强大的异步任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。

我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

为了能异步下载股票,我们需要将上篇文章的内容拆分为两个部分:

1.Celery任务:获得某股票的日线数据
2.分发任务:分发获取指定股票的日线数据

这样,在我们的场景中,使用Celery实现异步任务主要包含三个步骤:

1.创建Celery任务实例(获得某股票的日线数据)
2.启动Celery Worker,用于运行任务
3.应用程序分发任务(分发获取指定股票的日线数据)

思路已经理清楚了,下面动手实践一下:

2.1 创建Celery任务实例:

此处大部分代码和上篇文章相似,因此下面仅显示和celery相关的核心代码:

# Python实用宝典
# https://pythondict.com
from celery import Celery

# 设置BROKER
BROKER_URL = 'mongodb://127.0.0.1:27017/celery'
# 新建celery任务
app = Celery('my_task', broker=BROKER_URL)

@app.task
def get_stock_daily(start_date, end_date, code):
    """
    Celery任务:获得某股票的日数据

    Args:
        start_date (str): 起始日
        end_date (str): 结束日
        code (str): 指定股票
    """


    # 请求tushare数据,并转化为json格式
    df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
    data = json.loads(df.T.to_json()).values()

    # 这里为了保证数据的绝对稳定性,选择一条条创建
    for row in data:
        daily.update({"_id": f"{row['ts_code']}-{row['trade_date']}"}, row, upsert=True)

    print(f"{code}: 插入\更新 完毕 - {start_date} to {end_date}")

2.2 启动worker

在cmd执行以下命令启动celery worker:

python -m celery worker -A tasks --loglevel=info --pool=eventlet

注意,这里使用了–pool=eventlet,是为了让windows机器具有并行运行的能力。

2.3 分发获取指定股票的日数据

遍历一遍股票列表,通过delay调用Celery任务实例,将任务分发给worker:

# Python实用宝典
# https://pythondict.com
from tasks import get_stock_daily

def delay_stock_data(start_date, end_date):
    """
    获得A股所有股票日数据

    Args:
        start_date (str): 起始日
        end_date (str): 结束日
    """

    codes = open('./codes.csv', 'r', encoding='utf-8').readlines()

    # 遍历所有股票ID
    for code in codes:
        get_stock_daily.delay(start_date, end_date, code)

delay_stock_data("20180101", "20200725")

这样,worker就会在后台异步执行这些任务,切换到worker的命令行中,你会看到输出如丝般润滑:

好景不长,不久后你肯定会受到tushare发送回来的CPS限制错误:

Exception: 抱歉,您每分钟最多访问该接口800次,权限的具体详情访问:https://tushare.pro/document/1?doc_id=108。

3.限制访问次数与重试机制

为了解决这个CPS问题,我确实花了不少时间,尝试控制worker频率,无果,最终选择了一个不是办法的办法:

在Tushare报错的时候,捕捉起来,控制其60秒后重试

    # 请求tushare数据,并转化为json格式
    try:
        df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
    except Exception as e:
        # 触发普通用户CPS限制,60秒后重试
        print(e)
        get_stock_daily.retry(countdown=60)

简单,但有效。

此外,为了防止网络请求出现问题,导致某个任务丢失,我们还可以在下发任务的时候使用apply_async配置失败重试。默认重试三次:

def delay_stock_data(start_date, end_date):
    """
    获得A股所有股票日数据

    Args:
        start_date (str): 起始日
        end_date (str): 结束日
    """

    codes = open('./codes.csv', 'r', encoding='utf-8').readlines()

    # 遍历所有股票ID
    for code in codes:
        get_stock_daily.apply_async(
            (start_date, end_date, code), retry=True
        )

这样,一个相对健壮的股票数据异步下载器就完成了。

用该方法把A股所有股票下载一遍,应该不会超过5分钟。

如果你给tushare氪了金,没有cps限制,下载时间不会超过1分钟。

目前github上好像缺少这样的项目,因此我将该项目开源到了GitHub上:

https://github.com/Ckend/stock_download_celery

目前仅支持日线数据,欢迎补充。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注