上一篇股票文章中,我们讲了如何通过tushare下载股票数据,并存入mongodb:
其中有非常大的优化空间,比如,单线程的下载速度太慢了,能不能用多线程的方式?有时候网络连接会失败,能不能增加重试机制?
这些问题,我们将在这篇文章里进行优化。
1.准备
开始之前,你要确保Python和pip已经成功安装在电脑上噢,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda
Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),准备开始输入命令安装依赖。
当然,我更推荐大家用VSCode编辑器,把本文代码Copy下来,在编辑器下方的终端运行命令安装依赖模块,多舒服的一件事啊:Python 编程的最好搭档—VSCode 详细指南。
此外,本文章是 Python 获取股票数据并存入MongoDB实战教程 的优化版,请根据该文章提前装好tushare和mongodb.
除此之外,你还需要安装celery和eventlet:
pip install celery pip install eventlet
看到 Successfully installed xxx 则说明安装成功。
2.使用Celery异步下载股票数据
Celery是一个强大的异步任务队列,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。
我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
为了能异步下载股票,我们需要将上篇文章的内容拆分为两个部分:
1.Celery任务:获得某股票的日线数据;
2.分发任务:分发获取指定股票的日线数据;
这样,在我们的场景中,使用Celery实现异步任务主要包含三个步骤:
1.创建Celery任务实例(获得某股票的日线数据)
2.启动Celery Worker,用于运行任务
3.应用程序分发任务(分发获取指定股票的日线数据)
思路已经理清楚了,下面动手实践一下:
2.1 创建Celery任务实例:
此处大部分代码和上篇文章相似,因此下面仅显示和celery相关的核心代码:
# Python实用宝典
# https://pythondict.com
from celery import Celery
# 设置BROKER
BROKER_URL = 'mongodb://127.0.0.1:27017/celery'
# 新建celery任务
app = Celery('my_task', broker=BROKER_URL)
@app.task
def get_stock_daily(start_date, end_date, code):
"""
Celery任务:获得某股票的日数据
Args:
start_date (str): 起始日
end_date (str): 结束日
code (str): 指定股票
"""
# 请求tushare数据,并转化为json格式
df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date)
data = json.loads(df.T.to_json()).values()
# 这里为了保证数据的绝对稳定性,选择一条条创建
for row in data:
daily.update({"_id": f"{row['ts_code']}-{row['trade_date']}"}, row, upsert=True)
print(f"{code}: 插入\更新 完毕 - {start_date} to {end_date}")
2.2 启动worker
在cmd执行以下命令启动celery worker:
python -m celery worker -A tasks --loglevel=info --pool=eventlet
注意,这里使用了–pool=eventlet,是为了让windows机器具有并行运行的能力。
2.3 分发获取指定股票的日数据
遍历一遍股票列表,通过delay调用Celery任务实例,将任务分发给worker:
# Python实用宝典 # https://pythondict.com from tasks import get_stock_daily def delay_stock_data(start_date, end_date): """ 获得A股所有股票日数据 Args: start_date (str): 起始日 end_date (str): 结束日 """ codes = open('./codes.csv', 'r', encoding='utf-8').readlines() # 遍历所有股票ID for code in codes: get_stock_daily.delay(start_date, end_date, code) delay_stock_data("20180101", "20200725")
这样,worker就会在后台异步执行这些任务,切换到worker的命令行中,你会看到输出如丝般润滑:
好景不长,不久后你肯定会受到tushare发送回来的CPS限制错误:
Exception: 抱歉,您每分钟最多访问该接口800次,权限的具体详情访问:https://tushare.pro/document/1?doc_id=108。
3.限制访问次数与重试机制
为了解决这个CPS问题,我确实花了不少时间,尝试控制worker频率,无果,最终选择了一个不是办法的办法:
在Tushare报错的时候,捕捉起来,控制其60秒后重试
# 请求tushare数据,并转化为json格式 try: df = pro.daily(ts_code=code, start_date=start_date, end_date=end_date) except Exception as e: # 触发普通用户CPS限制,60秒后重试 print(e) get_stock_daily.retry(countdown=60)
简单,但有效。
此外,为了防止网络请求出现问题,导致某个任务丢失,我们还可以在下发任务的时候使用apply_async配置失败重试。默认重试三次:
def delay_stock_data(start_date, end_date): """ 获得A股所有股票日数据 Args: start_date (str): 起始日 end_date (str): 结束日 """ codes = open('./codes.csv', 'r', encoding='utf-8').readlines() # 遍历所有股票ID for code in codes: get_stock_daily.apply_async( (start_date, end_date, code), retry=True )
这样,一个相对健壮的股票数据异步下载器就完成了。
用该方法把A股所有股票下载一遍,应该不会超过5分钟。
如果你给tushare氪了金,没有cps限制,下载时间不会超过1分钟。
目前github上好像缺少这样的项目,因此我将该项目开源到了GitHub上:
https://github.com/Ckend/stock_download_celery
目前仅支持日线数据,欢迎补充。
我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。
有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。
原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!
Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典
评论(0)