分类目录归档:开发工具

Schedule—简单实用的Python 周期任务调度工具

如果你想周期性地执行某个Python函数或脚本,最出名的选择应该是Crontab,但是Crontab具有以下缺点:

  • 1.不方便执行秒级任务。
  • 2.当需要执行的定时任务有上百个的时候,Crontab的管理就会特别不方便。

还有一个选择是Celery,但是Celery的配置比较麻烦,如果你只是需要一个轻量级的调度工具,那么Celery不是一个好选择。

在你想要使用一个轻量级的任务调度工具,而且希望它尽量简单、容易使用、不需要外部依赖,最好能够容纳Crontab的所有基本功能,那么Schedule模块是你的不二之选。

使用它来调度任务可能只需要几行代码,感受一下:

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

上面的代码表示每10分钟执行一次job函数,非常简单方便。你只需要引入schedule模块,通过调用 `scedule.every(时间数).时间类型.do(job)` 发布周期任务。

发布后的周期任务需要用 run_pending 函数来检测是否执行,因此需要一个While循环不断地轮询这个函数。

下面具体讲讲Schedule模块的安装和初级、进阶使用方法。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install schedule

2.Schedule 基本使用

最基本的使用在文首已经提到过,下面给大家展示更多的调度任务例子:

# Python 实用宝典
import schedule
import time

def job():
    print("I'm working...")

# 每十分钟执行任务
schedule.every(10).minutes.do(job)
# 每个小时执行任务
schedule.every().hour.do(job)
# 每天的10:30执行任务
schedule.every().day.at("10:30").do(job)
# 每个月执行任务
schedule.every().monday.do(job)
# 每个星期三的13:15分执行任务
schedule.every().wednesday.at("13:15").do(job)
# 每分钟的第17秒执行任务
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

可以看到,从月到秒的配置,上面的例子都覆盖到了。不过如果你想只运行一次任务的话,可以这么配:

# Python 实用宝典
import schedule
import time

def job_that_executes_once():
    # 此处编写的任务只会执行一次...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)

while True:
    schedule.run_pending()
    time.sleep(1)

参数传递

如果你有参数需要传递给作业去执行,你只需要这么做:

# Python 实用宝典
import schedule

def greet(name):
    print('Hello', name)

# do() 将额外的参数传递给job函数
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')

获取目前所有的作业

如果你想获取目前所有的作业,可以这么做:

# Python 实用宝典
import schedule

def hello():
    print('Hello world')

schedule.every().second.do(hello)

all_jobs = schedule.get_jobs()

取消所有作业

如果某些机制触发了,你需要立即清除当前程序的所有作业,需要这么使用:

# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

schedule.every().second.do(greet)

schedule.clear()

标签功能

在设置作业的时候,为了后续方便管理作业,你可以给作业打个标签,这样你可以通过标签过滤获取作业或取消作业。

# Python 实用宝典
import schedule

def greet(name):
    print('Hello {}'.format(name))

# .tag 打标签
schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# get_jobs(标签):可以获取所有该标签的任务
friends = schedule.get_jobs('friend')

# 取消所有 daily-tasks 标签的任务
schedule.clear('daily-tasks')

设定作业截止时间

如果你需要让某个作业到某个时间截止,你可以通过这个方法:

# Python 实用宝典
import schedule
from datetime import datetime, timedelta, time

def job():
    print('Boo')

# 每个小时运行作业,18:30后停止
schedule.every(1).hours.until("18:30").do(job)

# 每个小时运行作业,2030-01-01 18:33 today
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# 每个小时运行作业,8个小时后停止
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# 每个小时运行作业,11:32:42后停止
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# 每个小时运行作业,2020-5-17 11:36:20后停止
schedule.every(1).hours.until(datetime(2020, 5, 17, 11, 36, 20)).do(job)

截止日期之后,该作业将无法运行。

立即运行所有作业,而不管其安排如何

如果某个机制触发了,你需要立即运行所有作业,可以调用 schedule.run_all() :

# Python 实用宝典
import schedule

def job_1():
    print('Foo')

def job_2():
    print('Bar')

schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

schedule.run_all()

# 立即运行所有作业,每次作业间隔10秒
schedule.run_all(delay_seconds=10)

3.高级使用

装饰器安排作业

如果你觉得设定作业这种形式太啰嗦了,也可以使用装饰器模式:

# Python 实用宝典
from schedule import every, repeat, run_pending
import time

# 此装饰器效果等同于 schedule.every(10).minutes.do(job)
@repeat(every(10).minutes)
def job():
    print("I am a scheduled job")

while True:
    run_pending()
    time.sleep(1)

并行执行

默认情况下,Schedule 按顺序执行所有作业。其背后的原因是,很难找到让每个人都高兴的并行执行模型。

不过你可以通过多线程的形式来运行每个作业以解决此限制:

# Python 实用宝典
import threading
import time
import schedule

def job1():
    print("I'm running on thread %s" % threading.current_thread())
def job2():
    print("I'm running on thread %s" % threading.current_thread())
def job3():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job1)
schedule.every(10).seconds.do(run_threaded, job2)
schedule.every(10).seconds.do(run_threaded, job3)

while True:
    schedule.run_pending()
    time.sleep(1)

日志记录

Schedule模块同时也支持logging日志记录,这么使用:

# Python 实用宝典
import schedule
import logging

logging.basicConfig()
schedule_logger = logging.getLogger('schedule')
# 日志级别为DEBUG
schedule_logger.setLevel(level=logging.DEBUG)

def job():
    print("Hello, Logs")

schedule.every().second.do(job)

schedule.run_all()

schedule.clear()

效果如下:

DEBUG:schedule:Running *all* 1 jobs with 0s delay in between
DEBUG:schedule:Running job Job(interval=1, unit=seconds, do=job, args=(), kwargs={})
Hello, Logs
DEBUG:schedule:Deleting *all* jobs

异常处理

Schedule不会自动捕捉异常,它遇到异常会直接抛出,这会导致一个严重的问题:后续所有的作业都会被中断执行,因此我们需要捕捉到这些异常。

你可以手动捕捉,但是某些你预料不到的情况需要程序进行自动捕获,加一个装饰器就能做到了:

# Python 实用宝典
import functools

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except:
                import traceback
                print(traceback.format_exc())
                if cancel_on_failure:
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)

这样,bad_task在执行时遇到的任何错误,都会被catch_exceptions捕获,这点在保证调度任务正常运转的时候非常关键。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Jupyter 中 IPython 可以用的12个方便命令

我在日常编程中一般都会用到两个工具——Pycharm和Jupyter,在刷算法、写爬虫时会用到前者,因为我习惯用Pycharm里的Debug功能调试,很容易找出代码中的Bug。而进行数据分析、机器学习时就会用到后者,因为Jupyter编译器利用的IPython是一种交互式计算和开发环境,而且有许多方便命令。
Jupyter对数据的可视化十分友好,这类单元格的形式每一步都有运行结果,便于整理自己思路,并且很大程度上节约了运行时间,在调试的时候只需要运行出错的部分代码,而不是全部。

IPython中有一些特有的魔法命令,如果能合理的利用这些魔法命令,会省去很多不必要的操作,为编程带来很大程度的便利,下面就来安利十二个常用的魔法命令。

方便命令的基础常识

  • ?和?? ->例:%matplotlib?、%matplotlib??

后缀为?可以获取一个对象的相关信息,比如描述一个方法该怎么用;后缀为??可以获取该对象更加详细的信息,比如源码。这个对象可以是IPython中自带的、也可以是导入的、也可以是自己定义的。

  • %和%% ->例:%time、%%time

前缀为%被称作行魔法命令(line magics),只能在单个输入行上运行;前缀为%%被称作单元格魔法命令(cell magics),可以在多个输入行上运行。

1.%Ismagic和%magic

如果你还不了解IPython的魔法命令,那这两个魔法命令一定是最重要的,记牢这两个命令之后慢慢了解剩下的。%lsmagic的作用就是列出所有存在的行魔法命令和单元格魔法命令,部分截图如下:

%magic的作用就是给出所有魔法命令的详细介绍,比如介绍、样例等等,比较考验英语功底,耐下心慢慢了解。

2.%pdb

输入这个命令并且运行之后,如果后面的代码出现了异常,这个指令就会主动进入调试器,几十行几百行代码难免会有几个或一堆Bug。可能比较笨的方法就是找断点然后print,最后还要把print删掉,而%pdb调试找到Bug后直接退出就好,相对前者更方便些。

比如两个数相加,不小心把一个整数定义成字符型,在调用函数计算时会发生报错,然后就可以进入调试器进行调试,切记最后要通过exit()退出,不能直接终止单元格运行。

3.%debug

%debug的作用与%pdb几乎是一样的,不同之处就是%pdb在遇到异常自动进入调试器,而%debug是人遇到报错主动输入指令进入调试器,仍然是上面那个例子,调试界面如下:
主动和被动两种调试方式大家可以靠自己喜好选择,我个人比较喜欢%debug。

4.%who和%whos

代码一多变量可能就会变多,变量一多可能就会混淆,或者在删除单元格的时候不小心把变量定义的单元格也删掉了,%who和%whos这两条命令就起到大作用了。
%who给出的信息只有全局变量的名称,而%whos给出的信息更加详细,包括变量名称、类型、和数据。

5.%time和%timeit

这两条命令都是用来输出代码的执行时间,比如可以用来粗略的比较两种算法在相同的问题上执行时间哪一个更少,不同点在于%time只执行一次就输出执行时间,而%timeit是执行多次然后计算平均时间再输出。
比如这里%timeit命令输出中有7 runs代表共执行7次,这两个命令都为行命令,%%time和%%timeit为单元格命令,区别同上。

6.%store

如果你在一个文件中花了很长的时间清理了一些数据,比如对原始数据缺失值填充呀、降维呀、转换呀等等,然后在另一个文件中需要用到同样的数据,笨一点方法就是将数据保存然后在新文件中调用,但这种操作一条%store命令就能完成,我们先在一个文件中利用%store保存一个变量。
然后在另一个文件中调用这个变量:
可以看到直接调用是会报错的,但利用了%store -r命令之后就可以成功调用被%store保存的变量,所以%store用来保存,%store -r用来读取。

7.%xdel和%reset

这条命令的作用就是删除变量,并且删除其在IPython中的对象上的一切引用。平时在数据清洗时,从原始数据到清洗后的数据中间要经过很多步骤,我们不可能全程用一个变量名称,所以中间步骤很容易为数据起一些类似的名称,而利用%xdel就可以将无用的单个变量名称删掉,防止混淆。
%reset的作用就是删除所有变量名。

8.%cls

在数据清洗时候,通常都是做一步然后输出一次数据集,观察一下变化,我们都知道展示数据集是很占网页的,久而久之,这个notebook就特别长,再想查看文件前面的内容不仅需要滚动很长时间滑轮,而且数据间很容易混淆,所以每当输出一次数据集后可以利用%cls命令清除一次,使notebook看起来更整洁。
可以看到正常的话data之后会打印数据集,但利用%cls之后数据集的输出被清除了。

9.%%writefile

如果我们想写一个函数,例如去除中文符号的函数,这样的函数在很多情景下都可以利用,所以我们可以将这个函数写入一个单独文件,想用的时候直接调用,这个操作可以利用%%writefile命令进行写入。

10.%run

%run命令的作用就是运行脚本文件,不仅可以直接使用脚本文件中的代码,脚本文件也可以使用IPython环境中的变量,仍用上面的例子,可以用%run命令直接运行。

11.%psource

如果你在notebook定义了一个函数,但隔了比较久需要用到这个函数,但是可能忘记了这个函数需要传入哪些参数、或者传入参数的类型应该是什么,这种情况下就不得不往前翻寻找这个函数的代码,但利用%psource可以偷懒,这个命令就是输出源代码。
前面提及的??也有相同的作用,但是输出的形式没有%psource直观,还混有其它的信息在里面。

12.%hist

%hist的作用就是打印所有命令行输入的历史记录,方便查看之前输入的代码信息。
这个命令允许设置查询的区间,也就是命令行输入对应的序号。
这些魔法命令有一部分能被常用的代码语句代替,但是却没有魔法命令简单明了,只是个人习惯的问题,如果可能尽量改掉自己的思维定式,用更加便捷的代码处理问题。
转自Python编程时光。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 调用 Shodan 实战教程 — 互联网上最可怕的搜索引擎

Shodan 在百度百科里被给出了这么一句话介绍:Shodan是互联网上最可怕的搜索引擎。

为什么呢?与谷歌、百度等搜索引擎爬取网页信息不同,Shodan爬取的是互联网上所有设备的IP地址及其端口号。

而随着智能家电的普及,家家户户都有许多电器连接到互联网,这些设备存在被入侵的可能性,这是十分危险的。

说了这么多,给大家体验下shodan,让你们有更切身的理解。打开shodan,在搜索框输入 Hikvision-Webs:

你会搜素到这个品牌的摄像头设备遍及全球的IP及其暴露的端口号:

可以看到,这台机器暴露了17、80、111、995、3128、5000、6000、20547端口,黑客可以根据这些端口进行针对性的攻击。

不过也不需要过于担心,如果你的服务不存在漏洞,一般是无法攻入的。但有些端口号会暴露摄像头的web管理端,如下:

那么黑客可能可以用暴力破解的方式,强行进入摄像头后台管理端,获取到实时的录像。

谨记这会侵犯别人的隐私权,是违法的行为,我们是遵纪守法的好公民所以知道它的原理和危害就足够。我们的目的是运用技术保护好个人隐私,如非必要不将摄像头接入互联网,一定要接入的话,不能使用容易被破解的弱口令。

Shodan Web端非常好用,但如果我们有从Python搜索的需求怎么办?

没关系,shodan 官方也提供了python官方SDK包,下面就来讲讲这个SDK包的使用。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install shodan

2.Shodan 注册账号获取API

使用 Shodan 必须注册账号,注册网址https://account.shodan.io/register

输入完相关信息,点击 CREATE 会跳转到个人账户页:

此时 API Key 会显示你的API秘钥,请记录这个秘钥,后续会使用到这个秘钥去请求接口。

3.Shodan 基本调用实战教程

Shodan本质上就是一个搜索引擎,你只需要输入搜索的关键词:

# 公众号:Python 实用宝典
# 2021-05-04
from shodan import Shodan

api = Shodan('你的API KEY')

def search_shodan(keyword):
    # 调用搜索接口
    result = api.search(keyword)

    # 显示所有IP
    for service in result['matches']:
            print(service['ip_str'])

search_shodan("Hikvision-Webs")

结果如下:

可惜的是,普通API只能像这样搜索关键字,无法使用过滤条件如:Hikvision-Webs country:”US” 搜索美国内的所有Hikvision网站管理端。

如果你想要使用过滤条件,Shodan需要你升级API权限:

挺贵的,不过还好是一次性支付,永久使用。

4. Shodan 高级使用教程

Shodan 的用处当然不仅仅是在黑客攻防中,它还能用于统计。如果你想要了解哪些国家的使用这款摄像头的数量最多,可以使用 Facets 特性。

# 公众号:Python 实用宝典
# 2021-05-04
from shodan import Shodan

api = Shodan('你的API KEY')
def try_facets(query):
    FACETS = [
        'org',
        'domain',
        'port',
        'asn',
        ('country', 3),
    ]

    FACET_TITLES = {
        'org': 'Top 5 Organizations',
        'domain': 'Top 5 Domains',
        'port': 'Top 5 Ports',
        'asn': 'Top 5 Autonomous Systems',
        'country': 'Top 3 Countries',
    }

    try:
        # 使用 count() 方法可以不需要升级API,且比 search 方法更快。
        result = api.count(query, facets=FACETS)

        print('Shodan Summary Information')
        print('Query: %s' % query)
        print('Total Results: %s\n' % result['total'])

        # 显示每个要素的摘要
        for facet in result['facets']:
            print(FACET_TITLES[facet])

            for term in result['facets'][facet]:
                print('%s: %s' % (term['value'], term['count']))

    except Exception as e:
        print('Error: %s' % e)

try_facets("Hikvision-Webs")

得到结果如下:

从 Top 3 Countries 中可以看到,这款摄像头使用数量排名前三的国家分别是:美国、日本和德国。

没想到吧,Shodan居然还能用于产品分析。同样地原理,如果你把关键词改为”apache”,你可以知道目前哪些国家使用apache服务器数量最多,最普遍被使用的版本号是什么。

简而言之,Shodan是一个非常强大的搜索引擎,它在好人手里,能被发挥出巨大的潜能。如果Shodan落入坏人之手的话,那真是一个可怕的东西。

为了避免受到不必要的攻击,请大家及时检查所有联网设备的管理端的密码,如果有使用默认密码及弱口令,立即进行密码的更改,以保证服务的安全。

本文所有源代码可在 Python 实用宝典 公众号后台回复:shodan 下载。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 性能测试工具 Locust 极简入门

作者:dongfanger
来源:dongfanger
Locust是一款Python技术栈的开源的性能测试工具。Locust直译为蝗虫,寓意着它能产生蝗虫般成千上万的并发用户:
Locust并不小众,从它Github的Star数量就可见一斑:
截止文章写作时,一共15951Star。
Locust生态良好,它已在多家外企(包括世界500强)投入使用:
如此看来,Locust是非常值得学习和掌握的一款工具。
Python的魔力在于化繁为简,基于Python的Locust也能给仍然困惑于性能测试的我们带来启发。


1.Locust特点

  • 以纯Python方式编写用户脚本,提供极大自由度。

  • 用户脚本可以串行方式编写,Locust会通过轻量级进程/协程产生并发,无需自己做并发编程。

  • 并发量大,借助于gevent库,Locust能产生成千上万并发请求。

  • 开销小,Locust用户运行时开销很小。

  • 良好的Web UI对性能结果实时监测。

  • 能测任何系统任何协议,只需要写个client即可。

  • 开放REST API,尽情发挥。


2.安装Locust

需要Python版本3.6及以上。
执行pip命令:
$ pip install locust
验证安装成功:
$ locust -V
安装时会一并安装依赖库:
Installing collected packages: Werkzeug, pywin32, zope.event, greenlet, gevent, geventhttpclient, itsdangerous, flask, Flask-BasicAuth, ConfigArgParse, pyzmq, psutil, locust
能看出来flask为Locust提供了Web功能。


3.快速上手

使用Locust一般按照以下步骤进行:
  1. 编写Python用户脚本。

  2. 使用locust命令执行性能测试。

  3. (可选)通过Web界面监测结果。

示例代码如下,新建locustfile.py文件:
import time
from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
wait_time = between(1, 2.5)

@task
def hello_world(self):
self.client.get(“/hello”)
self.client.get(“/world”)

@task(3)
def view_items(self):
for item_id in range(10):
self.client.get(f”/item?id={item_id}, name=“/item”)
time.sleep(1)

def on_start(self):
self.client.post(“/login”, json={“username”:“foo”, “password”:“bar”})
路径切换到locustfile.py文件所在目录,执行命令:
$ locust
也可以通过-f指定某个目录文件:
$ locust -f locust_files/my_locust_file.py
运行后,打开http://127.0.0.1:8089看到Web界面:
填写信息后,就能开始压测了。Web界面提供了结果统计数据:
和性能指标走势图:


4.脚本解析

示例脚本解析如下:
# Locust用户脚本就是Python模块
import time
from locust import HttpUser, task, between

# 类继承自HttpUser
class QuickstartUser(HttpUser):
# 每个模拟用户等待1~2.5秒
wait_time = between(1, 2.5)

# 被@task装饰的才会并发执行
@task
def hello_world(self):
# client属性是HttpSession实例,用来发送HTTP请求
self.client.get(“/hello”)
self.client.get(“/world”)

# 每个类只会有一个task被选中执行
# 3代表weight权重
# 权重越大越容易被选中执行
# view_items比hello_wolrd多3倍概率被选中执行
@task(3)
def view_items(self):
for item_id in range(10):
# name参数作用是把统计结果按同一名称进行分组
# 这里防止URL参数不同会产生10个不同记录不便于观察
# 把10个汇总成1个”/item”记录
self.client.get(f”/item?id={item_id}, name=“/item”)
time.sleep(1)

# 每个模拟用户开始运行时都会执行
def on_start(self):
self.client.post(“/login”, json={“username”:“foo”, “password”:“bar”})

小结

本文先了解了Locust的背景和生态,它是值得学习的,对于Python技术栈来说更加如此。接着介绍了使用pip命令安装Locust,其中发现顺带安装了flask,Locust的Web功能是flask提供的
然后给出了一段示例代码,按照步骤上手Locust。最后对示例代码进行了解析,浅尝辄止。locustfile实际上该怎么写呢?
参考资料:
https://locust.io/
https://docs.locust.io/en/stable/

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Birdseye 极其强大的Python调试工具

Birdseye是一个Python调试器,它在函数调用中记录表达式的值,并让你在函数**退出**后轻松查看它们。例如:

无论你如何运行或编辑代码,都可以使用Birdseye。只需要你安装好依赖:

pip install birdseye

并在代码函数上方添加 @eye 装饰器(如上所示),即可根据需要运行函数,并在浏览器中查看结果。

它还可以与一些常用工具集成在一起,如 Pycharm 和 Vscode,以提供更流畅的体验,后续我们会介绍如何将其与这些工具结合使用。

它不仅仅能够单步执行,还能在循环迭代中来回移动,并查看所选表达式的值如何变化:

通过 birdseye 你能很容易地知道哪些表达式引发了异常:

你也能够展开具体的数据结构和对象以查看其内容:

调用会按功能组织(文件组织)并进行时间排序进行显示,让你一目了然地看到发生了什么:

1.快速上手

首先,使用 pip 安装 birdseye :

pip install birdseye

然后,对需要进行调试的函数使用eye装饰器:

from birdseye import eye

@eye
def foo():

在你调用该函数完成后,在终端运行命令打开Birdseye的Web服务:

python -m birdseye

在浏览器打开 http://localhost:7777 就能看到需要调试的函数执行流程了。点击下图的按钮即可跳转到最新的函数调用。

2.Birdseye在Pycharm中集成调试

在 Pycharm 的 Settings 中,点击 Plugins 插件市场搜索 birdseye 点击 install 安装。

安装完成后重启Pycharm,就可以在 Pycharm 中使用 birdseye了:

默认情况下,该插件还可以为你自动运行Birdseye服务器,因此就不需要输入 python -m birdseye 那行命令了。

3.Birdseye在VSCode中集成调试

在VSCode中继承调试Birdseye也非常方便,点击左侧的扩展商店,在弹出框中输入搜索 birdseye,并点击 install 安装:

安装完成后,点击 F1 输入Birdseye,就能显示调试界面:

如果无法正常显示右侧调试界面,并提示未安装birdseye,但实际上你已经安装成功了,这一般是路径错误导致的,请在扩展设置中手动更改python路径为你安装了Birdseye的Python。

4.美中不足

Birdseye 是一个非常强大的调试工具,但我认为这还是有缺点可以改善的:

1.为了防止堆栈过大,每个迭代它最多只保留6个(前三、末三)元素:

因此如果你想看一些特殊元素值的执行情况,它可能不会如你所愿。

不过,不需要担心某些分支你调试不到,因为 birdseye 有个保险机制:如果一个表达式仅在某种特定情况下会被执行,那么执行时的元素也会被加入到可调试元素中

2.由于需要记录堆栈,程序会大大减慢速度,因此它绝对不适合上到生产环境。

3.每个函数调用,Birdseye 都需要收集许多数据,对于某些极其复杂的函数调用,可能会引发内存问题。

如果你不担心这三个缺点,而且希望能快速方便地看到函数中不同分支的执行情况,那么Birdseye就是你的不二之选。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pyforest !自动导入代码所需的 Python 库

今天给大家介绍一个懒人 Python 库:Pyforest,只用一行代码,就能导入所有的 Python 库(本地已经安装的)。

项目地址:https://github.com/8080labs/pyforest

/ 01 / 介绍

Python 因为有着成千上万个功能强大开源库,备受大家的欢迎

目前,通过 PyPl 可以导入超过 23.5 万个 Python 库,数量庞大

在大家平常的实践当中,一般都是需要导入多个库或者框架来执行任务

而且每当新建一个程序文件时,都需要根据自己的需求导入相关的库

如果是相同类型的任务,比如想做一个数据可视化的小项目,可能会一直使用到某个库

如此,反复编写同一条 import 语句,就算是复制粘贴,也会感觉到麻烦,这时 Pyforest 库就可以上场了

Pyforest 是一个开源的 Python 库,可以自动导入代码中使用到的 Python 库

在进行数据可视化的时候,一般都需要导入多个库,比如 pandas、numpy、matplotlib 等等

使用了 Pyforest,每个程序文件中就不需要导入相同的 Python 库,而且也不必使用确切的导入语句

比如下面这行代码,就可以省略掉

from sklearn.ensemble import RandomForestClassifier

在你使用 import 语句导入Pyforest 库后,你就可以直接使用所有的 Python 库

import pyforest

df = pd.read_csv(‘test.csv’)
print(df)

你使用的任何库都不需要使用 import 语句导入,Pyforest 会为你自动导入。

只有在代码中调用库或创建库的对象后,才会导入库。如果一个库没有被使用或调用,Pyforest 将不会导入它。

/ 02 / 使用

安装,使用以下命令安装 Pyforest

pip install pyforest -i https://pypi.tuna.tsinghua.edu.cn/simple

安装成功后,使用 import 语句导入它

现在,你可以直接使用相关的 Python 库,无需编写 import 导入

先以 jupiter notebook 为例,我们没有导入 pandas、seaborn 和 matplotlib 库,但是我们可以通过导入 Pyforest 库直接使用它们

读取数据,这个是国内棉花产量排行前三的省份,新疆全国第一(数据来源:国家统计局)

那么 Pyforest 可以导入所有库吗?

目前这个包包含了大部分流行的 Python 库,比如

pandas as pd
NumPy as np
matplotlob.pyplot as plt
seaborn as sns 

除了这些库之外,它还提供了一些辅助的 Python 库,如 os、tqdm、re 等

如果你想查看库列表,可以使用 dir(pyforest) 进行查看,内置的是 68 个库

import pyforest

print(len(dir(pyforest)))
for i in dir(pyforest):
    print(i)

————————-
68
GradientBoostingClassifier
GradientBoostingRegressor
LazyImport
OneHotEncoder
Path
RandomForestClassifier
RandomForestRegressor
SparkContext
TSNE
TfidfVectorizer

如果没有的话,可以进行自定义添加,在主目录中的文件写入 import 语句

示例如下

vim ~/.pyforest/user_imports.py

添加语句,此处便能在代码中使用 requests 这个库

# Add your imports here, line by line
# e.g
# import pandas as pd
# from pathlib import Path
# import re

import requests as req
~                                                                               
~                                                                                                                                                                                                      
“~/.pyforest/user_imports.py” 7L129C

这回我们在 PyCharm 中来实验一下。

发现 PyCharm 的自动补全的功能失效了,看来这个库还是比较适合 jupyter notebook(自动补全代码还可以使用)

除了上面这个地方可以自定义添加,还可以在库的 _import.py 文件中添加

此处以 Pyechars 为例,缩写为 chart

可视化代码如下

新疆棉花产量年年上升,其它省份年年下降…

最后 Pyforest 还提供了一些函数来了解库的使用情况

# 返回已导入并且正在使用的库列表
print(pyforest.active_imports())
——————————–
[‘import pandas as pd’‘import requests as req’‘import pyg2plot’]


# 返回pyforest中所有Python库的列表
print(pyforest.lazy_imports())
——————————–
[‘import glob’‘import numpy as np’‘import matplotlib.pyplot as plt’…]

只有代码中有使用到的库,程序才会 import 进去,否则不会导入的哦!

/ 03 / 总结

好了,到此本期的分享就结束了

使用 Pyforest 库有时候确实是可以节省一些时间,不过也是有弊端存在的。比如调试的时候(大型项目),可能会很痛苦,不知道是哪里来的库

所以建议大家,在一些独立的脚本程序中使用,效果应该还是不错的

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Redis适合做队列吗?详解三种Redis队列的使用方式

作者:Magic Kaito

来源:水滴与银弹

我经常听到很多人讨论,关于「把 Redis 当作队列来用是否合适」的问题。

有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便。

也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。

究竟哪种方案更好呢?

这篇文章,我就和你聊一聊把 Redis 当作队列,究竟是否合适这个问题。

我会从简单到复杂,一步步带你梳理其中的细节,把这个问题真正的讲清楚。

看完这篇文章后,我希望你对这个问题你会有全新的认识。

在文章的最后,我还会告诉你关于「技术选型」的思路,文章有点长,希望你可以耐心读完。

从最简单的开始:List 队列

首先,我们先从最简单的场景开始讲起。

如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型。

因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。

如果把 List 当作队列,你可以这么来用。

生产者使用 LPUSH 发布消息:

127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2

消费者这一侧,使用 RPOP 拉取消息:

127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"

这个模型非常简单,也很容易理解。

但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。

127.0.0.1:6379> RPOP queue
(nil)   // 没消息了

而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:

while true:
    msg = redis.rpop("queue")
    // 没有消息,继续循环
    if msg == null:
        continue
    // 处理消息
    handle(msg)

如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。

怎么解决这个问题呢?

也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:

while true:
    msg = redis.rpop("queue")
    // 没有消息,休眠2s
    if msg == null:
        sleep(2)
        continue
    // 处理消息        
    handle(msg)

这就解决了 CPU 空转问题。

这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。

假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。

要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。

鱼和熊掌不可兼得。

那如何做,既能及时处理新消息,还能避免 CPU 空转呢?

Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?

幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。

现在,你可以这样来拉取消息了:

while true:
    // 没消息阻塞等待,0表示不设置超时时间
    msg = redis.brpop("queue"0)
    if msg == null:
        continue
    // 处理消息
    handle(msg)

使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。

这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。

注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?

我们一起来分析一下:

  1. 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
  2. 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了

第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景。

第二个问题就比较棘手了,因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。

这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。

针对这 2 个问题怎么解决呢?我们一个个来看。

发布/订阅模型:Pub/Sub

从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。

它正好可以解决前面提到的第一个问题:重复消费。

即多组生产者、消费者的场景,我们来看它是如何做的。

Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。

假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。

首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。

// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1

此时,2 个消费者都会被阻塞住,等待新消息的到来。

之后,再启动一个生产者,发布一条消息。

127.0.0.1:6379> PUBLISH queue msg1
(integer) 1

这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"

看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。

除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。

// 订阅符合规则的队列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1

这里的消费者,订阅了 queue.* 相关的队列消息。

之后,生产者分别向 queue.p1 和 queue.p2 发布消息。

127.0.0.1:6379> PUBLISH queue.p1 msg1
(integer) 1
127.0.0.1:6379> PUBLISH queue.p2 msg2
(integer) 1

这时再看消费者,它就可以接收到这 2 个生产者的消息了。

127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
...
// 来自queue.p1的消息
1) "pmessage"
2) "queue.*"
3) "queue.p1"
4) "msg1"

// 来自queue.p2的消息
1) "pmessage"
2) "queue.*"
3) "queue.p2"
4) "msg2"

我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。

讲完了它的优点,那它有什么缺点呢?

其实,Pub/Sub 最大问题是:丢数据

如果发生以下场景,就有可能导致数据丢失:

  1. 消费者下线
  2. Redis 宕机
  3. 消息堆积

究竟是怎么回事?

这其实与 Pub/Sub 的实现方式有很大关系。

Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。

一个完整的发布、订阅消息处理流程是这样的:

  1. 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
  2. 生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它

看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。

这种设计方案,就导致了上面提到的那些问题。

例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。

如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。

所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。

另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。

也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?

当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。

如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。

但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失

这是怎么回事?

还是回到 Pub/Sub 的实现细节上来说。

每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。

当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。

之后,消费者不断地从缓冲区读取消息,处理消息。

但是,问题就出在这个缓冲区上。

因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。

如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。

这时消费者就会消费失败,也会丢失数据。

如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60。

它的参数含义如下:

  • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
  • 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线

Pub/Sub 的这一点特点,是与 List 作队列差异比较大的。

从这里你应该可以看出,List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型

List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。

但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。

当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。

好了,现在我们总结一下 Pub/Sub 的优缺点:

  1. 支持发布 / 订阅,支持多组生产者、消费者处理消息
  2. 消费者下线,数据会丢失
  3. 不支持数据持久化,Redis 宕机,数据也会丢失
  4. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

有没有发现,除了第一个是优点之外,剩下的都是缺点。

所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很「鸡肋」。

也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。

目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。

我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?

其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。

好,现在我们重新梳理一下,我们在使用消息队列时的需求。

当我们在使用一个消息队列时,希望它的功能如下:

  • 支持阻塞等待拉取消息
  • 支持发布 / 订阅模式
  • 消费失败,可重新消费,消息不丢失
  • 实例宕机,消息不丢失,数据可持久化
  • 消息可堆积

Redis 除了 List 和 Pub/Sub 之外,还有符合这些要求的数据类型吗?

其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。

Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。

这个项目的定位,就是一个基于内存的分布式消息队列中间件。

但由于种种原因,这个项目一直不温不火。

终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream

下面我们就来看看,它能符合上面提到的这些要求吗?

趋于成熟的队列:Stream

我们来看 Stream 是如何解决上面这些问题的。

我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?

首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

  • XADD:发布消息
  • XREAD:读取消息

生产者发布 2 条消息:

// *表示让Redis自动生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"

使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。

这个消息 ID 的格式是「时间戳-自增序号」。

消费者拉取消息:

// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"

如果想继续拉取消息,需要传入上一条消息的 ID:

127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)

没有消息,Redis 会返回 NULL。

以上就是 Stream 最简单的生产、消费。

这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。

下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

1) Stream 是否支持「阻塞式」拉取消息?

可以的,在读取消息时,只需要增加 BLOCK 参数即可。

// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0

这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

2) Stream 是否支持发布 / 订阅模式?

也没问题,Stream 通过以下命令完成发布订阅:

  • XGROUP:创建消费者组
  • XREADGROUP:在指定消费组下,开启消费者拉取消息

下面我们来看具体如何做?

首先,生产者依旧发布 2 条消息:

127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"

之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

第一个消费组开始消费:

// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

同样地,第二个消费组开始消费:

// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

这样一来,就达到了多组消费者「订阅」消费的目的。

3) 消息处理时异常,Stream 能否保证消息不丢失,重新消费?

除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。

当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。

待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
   2) 1) 1) "1618472043089-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618472045158-0"
         2) 1) "name"
            2) "lisi"

4) Stream 数据会写入到 RDB 和 AOF 做持久化吗?

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

5) 消息堆积时,Stream 是怎么处理的?

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

  1. 生产者限流:避免消费者处理不及时,导致持续积压
  2. 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案。

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

除了以上介绍到的命令,Stream 还支持查看消息长度(XLEN)、查看消费者状态(XINFO)等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。

好了,通过以上介绍,我们可以看到,Redis 的 Stream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?

既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?

但是还「差一点」,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。

原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。

到这里,就不得不把 Redis 与专业的队列中间件做对比了。

下面我们就来看一下,Redis 在作队列时,到底还有哪些欠缺?

与专业的消息队列对比

其实,一个专业的消息队列,必须要做到两大块:

  1. 消息不丢
  2. 消息可堆积

前面我们讨论的重点,很大篇幅围绕的是第一点展开的。

这里我们换个角度,从一个消息队列的「使用模型」来分析一下,怎么做,才能保证数据不丢?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者

消息是否会发生丢失,其重点也就在于以下 3 个环节:

  1. 生产者会不会丢消息?
  2. 消费者会不会丢消息?
  3. 队列中间件会不会丢消息?

1) 生产者会不会丢消息?

当生产者在发布消息时,可能发生以下异常情况:

  1. 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败
  2. 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了

如果是情况 1,消息根本没发出去,那么重新发一次就好了。

如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。

生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。

也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。

但发现没有?这也意味着消息可能会重复发送。

是的,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃。

那消费者这边,就需要多做一些逻辑了。

对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。

所以,无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。

2) 消费者会不会丢消息?

这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?

要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。

无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。

所以,从这个角度来看,Redis 也是合格的。

3) 队列中间件会不会丢消息?

前面 2 个问题都比较好处理,只要客户端和服务端配合好,就能保证生产端、消费端都不丢消息。

但是,如果队列中间件本身就不可靠呢?

毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢。

在这个方面,Redis 其实没有达到要求。

Redis 在以下 2 个场景下,都会导致数据丢失。

  1. AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
  2. 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性

所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

再来看那些专业的消息队列中间件是如何解决这个问题的?

像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。

但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。

最后,我们来看消息积压怎么办?

4) 消息积压怎么办?

因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。

综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

  1. Redis 本身可能会丢数据
  2. 面对消息积压,Redis 内存资源紧张

到这里,Redis 是否可以用作队列,我想这个答案你应该会比较清晰了。

如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。

而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。

如果你的业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么我建议你使用专业的消息队列中间件。

总结

好了,总结一下。这篇文章我们从「Redis 能否用作队列」这个角度出发,介绍了 List、Pub/Sub、Stream 在做队列的使用方式,以及它们各自的优劣。

之后又把 Redis 和专业的消息队列中间件做对比,发现 Redis 的不足之处。

最后,我们得出 Redis 做队列的合适场景。

这里我也列了一个表格,总结了它们各自的优缺点。

后记

最后,我想和你再聊一聊关于「技术方案选型」的问题。

你应该也看到了,这篇文章虽然始于 Redis,但并不止于 Redis。

我们在分析 Redis 细节时,一直在提出问题,然后寻找更好的解决方案,在文章最后,又聊到一个专业的消息队列应该怎么做。

其实,我们在讨论技术选型时,就是一个关于如何取舍的问题。

而这里我想传达给你的信息是,在面对技术选型时,不要不经过思考就觉得哪个方案好,哪个方案不好

你需要根据具体场景具体分析,这里我把这个分析过程分为 2 个层面:

  1. 业务功能角度
  2. 技术资源角度

这篇文章所讲到的内容,都是以业务功能角度出发做决策的。

但这里的第二点,从技术资源角度出发,其实也很重要。

技术资源的角度是说,你所处的公司环境、技术资源能否匹配这些技术方案

这个怎么解释呢?

简单来讲,就是你所在的公司、团队,是否有匹配的资源能 hold 住这些技术方案。

我们都知道 Kafka、RabbitMQ 是非常专业的消息中间件,但它们的部署和运维,相比于 Redis 来说,也会更复杂一些。

如果你在一个大公司,公司本身就有优秀的运维团队,那么使用这些中间件肯定没问题,因为有足够优秀的人能 hold 住这些中间件,公司也会投入人力和时间在这个方向上。

但如果你是在一个初创公司,业务正处在快速发展期,暂时没有能 hold 住这些中间件的团队和人,如果贸然使用这些组件,当发生故障时,排查问题也会变得很困难,甚至会阻碍业务的发展。

而这种情形下,如果公司的技术人员对于 Redis 都很熟,综合评估来看,Redis 也基本可以满足业务 90% 的需求,那当下选择 Redis 未必不是一个好的决策。

所以,做技术选型不只是技术问题,还与人、团队、管理、组织结构有关

也正是因为这些原因,当你在和别人讨论技术选型问题时,你会发现每个公司的做法都不相同。

毕竟每个公司所处的环境和文化不一样,做出的决策当然就会各有差异。

如果你不了解这其中的逻辑,那在做技术选型时,只会趋于表面现象,无法深入到问题根源。

而一旦你理解了这个逻辑,那么你在看待这个问题时,不仅对于技术会有更加深刻认识,对技术资源和人的把握,也会更加清晰。

希望你以后在做技术选型时,能够把这些因素也考虑在内,这对你的技术成长之路也是非常有帮助的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

最全总结!一张图整理了 Python 所有内置异常

 

在编写程序时,可能会经常报出一些异常,很大一方面原因是自己的疏忽大意导致程序给出错误信息,另一方面是因为有些异常是程序运行时不可避免的,比如:在爬虫时可能有几个网页的结构不一致,这时两种结构的网页用同一套代码就会出错

所以,我们就需要捕获出现的异常,以防止程序因为错误信息而终止运行

Python 有很多的内置异常,也就是说 Python 开发者提前考虑到了用户编程过程中可能会出现这类错误,所以制造了这些内置异常可以快速准确向用户反馈出错信息帮助找出代码中的 Bug

Python 官方文档中也给出了所有内置异常及触发条件,为了更好的阅读体验,我把所有异常及触发条件整理成了一张思维导图:

文末附有高清版本的获取方式

伙伴们可以直接划至文末取图,下面针对几个常见的异常单独介绍一下,通过举例深入了解在什么条件下会触发哪一种异常。

1、SyntaxError

SyntaxError 主要是 Python 语法发生了错误,比如少个冒号、多个引号之类的,编程时稍微疏忽大意一下就会出错,应该是最常见的一种异常错误了

In [1]: While True print(‘1’)
  File “<ipython-input-1-8ebf67bb4c2b>”, line 1
    While True print(‘1’)
          ^
SyntaxError: invalid syntax

2、TypeError

TypeError 是类型错误,也就是说将某个操作或功能应用于不合适类型的对象时引发,比如整型与字符型进行加减法、在两个列表之间进行相减操作等等

In [8]: a = [1,2];b = [2,3]
In [9]: a-b
—————————————————————————
TypeError                                 Traceback (most recent call last)
<ipython-input-9-5ae0619f8fe1> in <module>
—-> 1 a-b

TypeError: unsupported operand type(s) for -: ‘list’ and ‘list’

3、IndexError

IndexError 是指索引出现了错误,比如最常见下标索引超出了序列边界,比如当某个序列 m 只有三个元素,却试图访问 m[4]

In [16]: m = [1,2,3]
In [17]: m[4]
—————————————————————————
IndexError                                Traceback (most recent call last)
<ipython-input-17-94e0dfab3ff6> in <module>
—-> 1 m[4]

IndexError: list index out of range

4、KeyError

KeyError 是关键字错误,这个异常主要发生在字典中,比如当用户试图访问一个字典中不存在的键时会被引发

In [18]: dict_ = {‘1’:‘yi’,‘2’:‘er’}
In [19]: dict_[‘3’]
—————————————————————————
KeyError                                  Traceback (most recent call last)
<ipython-input-19-c2e43847635f> in <module>
—-> 1 dict_[‘3’]

KeyError: ‘3’

5、ValueError

ValueError 为值错误,当用户传入一个调用者不期望的值时会引发,即使这个值的类型是正确的,比如想获取一个列表中某个不存在值的索引

In [22]: n = [1,2,3]
In [23]: n.index(4)
—————————————————————————
ValueError                                Traceback (most recent call last)
<ipython-input-23-9a1887cf29d7> in <module>
—-> 1 n.index(4)

ValueError: 4 is not in list

6、AttributeError

AttributeError 是属性错误,当用户试图访问一个对象不存在的属性时会引发,比如列表有 index 方法,而字典却没有,所以对一个字典对象调用该方法就会引发该异常

In [25]: dict_ = {‘1’:‘yi’,‘2’:‘er’}
In [26]: dict_.index(‘1’)
—————————————————————————
AttributeError                            Traceback (most recent call last)
<ipython-input-26-516844ad2563> in <module>
—-> 1 dict_.index(‘1’)

AttributeError: ‘dict’ object has no attribute ‘index’

7、NameError

NameError 是指变量名称发生错误,比如用户试图调用一个还未被赋值或初始化的变量时会被触发

In [27]: print(list_)
—————————————————————————
NameError                                 Traceback (most recent call last)
<ipython-input-27-87ebf02ffcab> in <module>
—-> 1 print(list_)

NameError: name ‘list_’ is not defined

8、FileNotFoundError

FileNotFoundError 为打开文件错误,当用户试图以读取方式打开一个不存在的文件时引发

In [29]: fb = open(‘./list’,‘r’)
—————————————————————————
FileNotFoundError                         Traceback (most recent call last)
<ipython-input-29-1b65fe5400ea> in <module>
—-> 1 fb = open(‘./list’,‘r’)

FileNotFoundError: [Errno 2] No such file or directory: ‘./list’

9、StopIteration

StopIteration 为迭代器错误,当访问至迭代器最后一个值时仍然继续访问,就会引发这种异常,提醒用户迭代器中已经没有值可供访问了

In [30]: list1 = [1,2]
In [31]: list2 = iter(list1)
In [33]: next(list2)
Out[33]: 1

In [34]: next(list2)
Out[34]: 2

In [35]: next(list2)
—————————————————————————
StopIteration                             Traceback (most recent call last)
<ipython-input-35-5a5a8526e73b> in <module>
—-> 1 next(list2)

10、AssertionError

AssertionError 为断言错误,当用户利用断言语句检测异常时,如果断言语句检测的表达式为假,则会引发这种异常

In [45]: list3 = [1,2]

In [46]: assert len(list3)>2
—————————————————————————
AssertionError                            Traceback (most recent call last)
<ipython-input-46-ffd051e2ba94> in <module>
—-> 1 assert len(list3)>2

AssertionError:

上面这些异常应该是平时编程中遇见频率比较高的一部分,完整的还是要看上文的思维导图或者查阅官方文档,当然除此之外,Python 也支持用户根据自己的需求自定义异常,这里就不再过多概述了。

对于异常的处理 Python 也有着比较强大的功能,比如可以捕获异常,主动抛出异常等等,主要有下面几种方式:

  • 1.try … except 结构语句捕获
  • 2.try … except … finally 结构语句捕获
  • 3.try … except … else 结构语句捕获
  • 4.raise关键字主动抛出异常
  • 5.try … raise … except 触发异常
  • 6.assert断言语句
  • 7.traceback模块跟踪查看异常

除了已经下载好的思维导图,也有一份在线版思维导图,我是用百度脑图绘制的

如果你觉得导图有哪部分不合理的话,可以根据自己的想法在网页端在线编辑,左下角阅读原文是参考的官方文档链接

本文转自AirPython.

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

数据分析实战教程|Pandas处理数据太慢,来试试Polars吧!

很多人在学习数据分析的时候,肯定都会用到Pandas这个库,非常的实用!

从创建数据到读取各种格式的文件(text、csv、json),或者对数据进行切片和分割组合多个数据源,Pandas都能够很好的满足。

Pandas最初发布于2008年,使用Python、Cython和C编写的。是一个超级强大、快速易于使用的Python库,用于数据分析和处理。

当然Pandas也是有不足之处的,比如不具备多处理器,处理较大的数据集速度很慢。

今天,小F就给大家介绍一个新兴的Python库——Polars。

使用语法和Pandas差不多,处理数据的速度却比Pandas快了不少。

一个是大熊猫,一个是北极熊~

GitHub地址:https://github.com/ritchie46/polars

使用文档:https://ritchie46.github.io/polars-book/

Polars是通过Rust编写的一个库,Polars的内存模型是基于Apache Arrow。

Polars存在两种API,一种是Eager API,另一种则是Lazy API。

其中Eager API和Pandas的使用类似,语法差不太多,立即执行就能产生结果。

而Lazy API就像Spark,首先将查询转换为逻辑计划,然后对计划进行重组优化,以减少执行时间和内存使用。

安装Polars,使用百度pip源。

# 安装polars
pip install polars -i https://mirror.baidu.com/pypi/simple/

安装成功后,开始测试,比较Pandas和Polars处理数据的情况。

使用某网站注册用户的用户名数据进行分析,包含约2600万个用户名的CSV文件。

import pandas as pd

df = pd.read_csv(‘users.csv’)
print(df)

数据情况如下。

此外还使用了一个自己创建的CSV文件,用以数据整合测试。

import pandas as pd

df = pd.read_csv(‘fake_user.csv’)
print(df)

得到结果如下。

首先比较一下两个库的排序算法耗时。

import timeit
import pandas as pd

start = timeit.default_timer()

df = pd.read_csv(‘users.csv’)
df.sort_values(‘n’, ascending=False)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

————————-
Time:  27.555776743218303

可以看到使用Pandas对数据进行排序,花费了大约28s。

import timeit
import polars as pl

start = timeit.default_timer()

df = pl.read_csv(‘users.csv’)
df.sort(by_column=‘n’, reverse=True)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

———————–
Time:  9.924110282212496

Polars只花费了约10s,这意味着Polars比Pandas快了2.7倍。

下面,我们来试试数据整合的效果,纵向连接。

import timeit
import pandas as pd

start = timeit.default_timer()

df_users = pd.read_csv(‘users.csv’)
df_fake = pd.read_csv(‘fake_user.csv’)
df_users.append(df_fake, ignore_index=True)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

————————
Time:  15.556222308427095

使用Pandas耗时15s。

import timeit
import polars as pl

start = timeit.default_timer()

df_users = pl.read_csv(‘users.csv’)
df_fake = pl.read_csv(‘fake_user.csv’)
df_users.vstack(df_fake)
stop = timeit.default_timer()

print(‘Time: ‘, stop – start)

———————–
Time:  3.475433263927698

Polars居然最使用了约3.5s,这里Polars比Pandas快了4.5倍。

通过上面的比较,Polars在处理速度上表现得相当不错。

可以是大家在未来处理数据时,另一种选择~

当然,Pandas目前历时12年,已经形成了很成熟的生态,支持很多其它的数据分析库。

Polars则是一个较新的库,不足的地方还有很多。

如果你的数据集对于Pandas来说太大,对于Spark来说太小,那么Polars便是你可以考虑的一个选择。

本文转载自公众号【法纳斯特】

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 实战教程 | 轻松实现APP自动化记账

前情回顾

不知道大家有没有手动记账的习惯,我大概从大学开始就坚持记账,中途也换过几个账本APP。目前使用的是圈子账本 ,它的记账界面如下图所示:

再说说我现在的情况,毕业之后支出越来越多越琐碎,每月的账单多到再手动记账有些过于浪费时间了。

不过有几点让我注意到了,似乎可以实现自动化记账:

一是我目前支出首选信用卡(支付宝、微信也绑定信用卡),几乎全部支出都在这里;

二是圈子账本可以通过上传模板文件来直接上传账单,现在也支持支付宝账单了;

三是我的支出类别比较单一,主要就下面几种:

折中方案

根据上面的几点,我搞了个折中的方案,并一直用到现在。

就是电脑登录信用卡官网,手动复制或者下载账单。

然后使用python调整成账本官网支持的格式,导出成excel格式,直接上传至官网。

下面给大家对比一下操作前和操作后的格式:

信用卡里的账单:

官网规定格式:

操作实战

先手动复制账单到excel里,先命名为测试数据.xlsx

然后我们开始使用python处理,导入数据

df = pd.read_excel('测试数据.xlsx',header = None)
df.head(6)

👆每隔三行其实是一条数据,所以我们要跳行提取数据

df2 = pd.DataFrame(columns=['日期','时间','入账金额','交易说明'])

df2['日期'] = df.iloc[[ i+1 for i in range(0,len(df),3)],[0]].reset_index()[0]
df2['时间'] = df.iloc[[ i+2 for i in range(0,len(df),3)],[0]].reset_index(drop=True)
df2['入账金额'] = df.iloc[[ i+1 for i in range(0,len(df),3)],[1]].reset_index(drop=True)
df2['交易说明'] = df.iloc[[ i+2 for i in range(0,len(df),3)],[1]].reset_index(drop=True)

创建了一个df2,并从df中隔行提取数据,结果如下

调整格式

下一步调整格式,先参考目标格式要求:

df2['时间'] = df2['日期'].apply(lambda x : str(x).replace('00:00:00','')) + df2['时间'].apply(lambda x : str(x)[:-3])
df2['入账金额'] = df2['入账金额'].str.lstrip('入账金额:¥')
df2['交易说明'] = df2['交易说明'].str.lstrip('交易说明:财付通公司-')
df2 = df2.drop(columns = '日期')

这样我们就调整好了时间入账金额(金额)交易说明(备注),还剩下一个关键的值就是类别,其实我自己的消费类别没几个,可以简单的利用交易说明判断类别,无法分辨的类别归为其他。

def fenlei(comment):
    if '美团' in comment or '拉扎斯'in comment:
        data = "餐饮"
    elif '花小猪'in comment or '滴滴'in comment:
        data = "交通"
    elif '燃气'in comment or '电费'in comment:
        data = "水电燃气"
    elif '拼多多'in comment:
        data = "生活用品"
    else:
        data = "其他"
    return data

利用上面的函数,就可以统计出类别这一列的值。

其中拉扎斯指的是饿了么,大家查一下自己账单就知道了。至于其他的种类,大家根据自己实际情况改改就可。

另外再添加另外两个固定列,就齐了。

df2['分类'] = df2.apply(lambda x :fenlei(x['交易说明']), axis=1)
df2 = df2[df2['入账金额'].astype(float) > 0]
df2['类型'] = '支出'
df2['账户'] = '交通银行'
df2.head()

👆这里面我还筛选了只大于0的入账金额,这是为了排除还款记录。

注:还款记录在信用卡账单里是负值

最后结果如下图所示:

导出数据

由于这次导出数据要固定格式,所以使用了openpyxl来向官网模板中直接写入,这样导入比较规范。

workbook = load_workbook(filename="朱小五.xlsx")
sheet = workbook.active
df2 = df2.iloc[:,[4,0,3,1,5,2]]
# 先删除第4行之后的旧数据,预计1000行完全够用
sheet.delete_rows(idx=2, amount=1000)
# 然后在进行添加数据
for row in df2.values.tolist():
    sheet.append(row)
    print(row)
print("已经保存到文件中")
workbook.save(filename="朱小五.xlsx")
workbook.close()

打开朱小五.xlsx,查看结果

没什么问题,将Excel导入账本官网中

完美导入
再打开手机记账APP
发现账单已经安安静静地躺在账本里啦!

本文转自快学Python.

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典