分类目录归档:性能优化

Isort 自动整理”import”的超实用工具详细教程

isort 是一个Python的实用程序/库,它会按字母顺序对导入(import)的库进行排序,并自动分组。它提供多种使用方式,包括命令行、Python调用等。

它基于Python 3.6+实现,但也支持格式化Python 2代码。

在使用 isort 格式化你的 import 之前,你的代码可能是长这样的:

from my_lib import Object
import os
from my_lib import Object3
from my_lib import Object2
import sys
from third_party import lib15, lib1, lib2, lib3, lib4, lib5, lib6, lib7, lib8, lib9, lib10, lib11, lib12, lib13, lib14
import sys
from __future__ import absolute_import
from third_party import lib3
print("Hey")
print("yo")

使用 isort 格式化后的代码是这样的:

from __future__ import absolute_import import os
import sys from third_party import (lib1, lib2, lib3, lib4, lib5, lib6, lib7, lib8,
                        lib9, lib10, lib11, lib12, lib13, lib14, lib15)

from my_lib import Object, Object2, Object3 
print("Hey")
print("yo")

​杂乱无章的格式瞬间变得井然有序,可见这是一款多么优秀的整理工具,下面就来介绍这个工具的安装及使用过程,及进阶用法。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install isort

如果你需要让他支持对 requirements.txt 的整理,请这样安装:

pip install isort[requirements_deprecated_finder]

2.使用 isort 整理你的python引用

isort 有2种使用方法,一种是从命令行直接针对py文件进行整理、另一种是在Python内导入 isort 进行整理。

命令行整理

要在特定文件上运行 isort,请在命令行执行以下操作:

isort mypythonfile.py mypythonfile2.py
# 或
python -m isort mypythonfile.py mypythonfile2.py

要对本文件夹递归进行isort整理,请执行以下操作:

isort .
# 或
python -m isort .

要查看更改建议的而不直接应用它们,请执行以下操作:

isort mypythonfile.py --diff

如果你要对项目自动运行isort,但是希望仅在未引入语法错误的情况下应用更改:

isort --atomic .

(注意:这在默认情况下是禁用的,因为它阻止了 isort 去整理不同版本的Python代码。)

从Python内部

import isort
isort.file("pythonfile.py")

或者:

import isort
sorted_code = isort.code("import b\nimport a\n")

3. 智能平衡格式化

从 isort 3.1.0 开始,添加了对平衡多行导入的支持。启用此选项后,isort 将动态地将导入长度更改为生成最平衡网格的长度,同时保持低于定义的最大导入长度。

开启了平衡导入的格式化:

from __future__ import (absolute_import, division,
                        print_function, unicode_literals)

未开启平衡的格式化:

from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

要启用此设置, 在你的配置设置 balanced_wrapping=True 或 通过命令行添加 -e 参数执行整理。

4.跳过某个import

要使 isort 忽略单个 import,只需在包含文本的导入行的末尾添加注释 isort:skip,如下:

import module  # isort:skip

或者:

from xyz import (abc,  # isort:skip
                 yo,
                 hey)

要使 isort 跳过整个文件,只需添加 isort:skip_file 到文件的开头注释中:

""" 
my_module.py
Best module ever

isort:skip_file
"""

import b
import a

这个工具还是相当方便的,尤其是针对一些杂乱无章、多年沉淀下来的项目代码的 import 进行整理的时候,它会变得非常香。有需要的小伙伴可以赶快试一下。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

干货源码剖析!详解 Celery Beat 实现原理

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,它是一个专注于实时处理的任务队列,同时也支持任务调度。

为了讲解 Celery Beat 的周期调度机制及实现原理,我们会基于Django从制作一个简单的周期任务开始,然后一步一步拆解 Celery Beat 的源代码。

相关前置应用知识,可以阅读以下文章:

1.Django Celery 异步与定时任务实战教程
2.Python Celery 异步快速下载股票数据

1.Celery 简单周期任务示例

在 celery_app.tasks.py 中添加如下任务:

@shared_task
def pythondict_task():
    print("pythondict_task")

在 django.celery.py 文件中添加如下配置,

from celery_django import settings
from datetime import timedelta


app.autodiscover_tasks(lambda : settings.INSTALLED_APPS)

CELERYBEAT_SCHEDULE = {
    'pythondict_task': {
        'task': 'celery_app.tasks.pythondict_task',
        'schedule': timedelta(seconds=3),
    },
}

app.conf.update(CELERYBEAT_SCHEDULE=CELERYBEAT_SCHEDULE)

至此,配置完成,此时,先启动 Celery Beat 定时任务命令:

celery beat -A celery_django -S django

然后打开第二个终端进程启动消费者:

celery -A celery_django worker 

此时在worker的终端上就会输出类似如下的信息:

    [2021-07-11 16:34:11,546: WARNING/PoolWorker-3] pythondict_task
    [2021-07-11 16:34:11,550: WARNING/PoolWorker-4] pythondict_task
    [2021-07-11 16:34:11,551: WARNING/PoolWorker-2] pythondict_task
    [2021-07-11 16:34:11,560: WARNING/PoolWorker-1] pythondict_task

看到结果正常输出,说明任务成功定时执行。

2.源码剖析

为了明白 Celery Beat 是如何实现周期任务调度的,我们需要从 Celery 源码入手。

当你执行 Celery Beat 启动命令的时候,到底发生了什么?

celery beat -A celery_django -S django

当你执行这个命令的时候,Celery/bin/celery.py 中的 CeleryCommand 类接收到命令后,会选择 beat 对应的类执行如下代码:

# Python 实用宝典
# https://pythondict.com

from celery.bin.beat import beat

class CeleryCommand(Command):
    commands = {
        # ...
        'beat': beat,
        # ...
    }
    # ...
    def execute(self, command, argv=None):
        try:
            cls = self.commands[command]
        except KeyError:
            cls, argv = self.commands['help'], ['help']
        cls = self.commands.get(command) or self.commands['help']
        try:
            return cls(
                app=self.app, on_error=self.on_error,
                no_color=self.no_color, quiet=self.quiet,
                on_usage_error=partial(self.on_usage_error, command=command),
            ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

此时cls对应的是beat类,通过查看位于bin/beat.py中的 beat 类可知,该类只重写了run方法和add_arguments方法。

所以此时执行的 run_from_argv 方法是 beat 继承的 Command 的 run_from_argv 方法:

# Python 实用宝典
# https://pythondict.com

def run_from_argv(self, prog_name, argv=None, command=None):
    return self.handle_argv(prog_name, sys.argv if argv is None else argv, command)

该方法中会调用 Command 的 handle_argv 方法,而该方法在经过相关参数处理后会调用 self(*args, **options) 到 __call__ 函数:

    # Python 实用宝典
    # https://pythondict.com
    
    def handle_argv(self, prog_name, argv, command=None):
        """Parse command-line arguments from ``argv`` and dispatch
        to :meth:`run`.

        :param prog_name: The program name (``argv[0]``).
        :param argv: Command arguments.

        Exits with an error message if :attr:`supports_args` is disabled
        and ``argv`` contains positional arguments.

        """
        options, args = self.prepare_args(
            *self.parse_options(prog_name, argv, command))
        return self(*args, **options)

Command 类的 __call__函数:

    # Python 实用宝典
    # https://pythondict.com
    
    def __call__(self, *args, **kwargs):
        random.seed()  # maybe we were forked.
        self.verify_args(args)
        try:
            ret = self.run(*args, **kwargs)
            return ret if ret is not None else EX_OK
        except self.UsageError as exc:
            self.on_usage_error(exc)
            return exc.status
        except self.Error as exc:
            self.on_error(exc)
            return exc.status

可见,在该函数中会调用到run方法,此时调用的run方法就是beat类中重写的run方法,查看该方法:

# Python 实用宝典
# https://pythondict.com
    
class beat(Command):
    """Start the beat periodic task scheduler.

    Examples::

        celery beat -l info
        celery beat -s /var/run/celery/beat-schedule --detach
        celery beat -S djcelery.schedulers.DatabaseScheduler

    """
    doc = __doc__
    enable_config_from_cmdline = True
    supports_args = False

    def run(self, detach=False, logfile=None, pidfile=None, uid=None,
            gid=None, umask=None, working_directory=None, **kwargs):
        # 是否开启后台运行
        if not detach:
            maybe_drop_privileges(uid=uid, gid=gid)
        workdir = working_directory
        kwargs.pop('app', None)
        # 设定偏函数
        beat = partial(self.app.Beat,
                       logfile=logfile, pidfile=pidfile, **kwargs)

        if detach:
            with detached(logfile, pidfile, uid, gid, umask, workdir):
                return beat().run() # 后台运行
        else:
            return beat().run() # 立即运行

这里引用了偏函数的知识,偏函数就是从基函数创建一个新的带默认参数的函数,详细可见廖雪峰老师的介绍:
https://www.liaoxuefeng.com/wiki/1016959663602400/1017454145929440

可见,此时创建了app的Beat方法的偏函数,并通过 .run 函数执行启动 beat 进程,首先看看这个 beat 方法:

    # Python 实用宝典
    # https://pythondict.com
    @cached_property
    def Beat(self, **kwargs):
        # 导入celery.apps.beat:Beat类
        return self.subclass_with_self('celery.apps.beat:Beat')

可以看到此时就实例化了 celery.apps.beat 中的 Beat 类,并调用了该实例的 run 方法:

    # Python 实用宝典
    # https://pythondict.com
    def run(self):
        print(str(self.colored.cyan(
            'celery beat v{0} is starting.'.format(VERSION_BANNER))))
        # 初始化loader
        self.init_loader()
        # 设置进程
        self.set_process_title()
        # 开启任务调度
        self.start_scheduler()

init_loader 中,会导入默认的modules,此时会引入相关的定时任务,这些不是本文重点。我们重点看 start_scheduler 是如何开启任务调度的:

    # Python 实用宝典
    # https://pythondict.com
    def start_scheduler(self):
        c = self.colored
        if self.pidfile: # 是否设定了pid文件
            platforms.create_pidlock(self.pidfile)  # 创建pid文件
        # 初始化service
        beat = self.Service(app=self.app,
                            max_interval=self.max_interval,
                            scheduler_cls=self.scheduler_cls,
                            schedule_filename=self.schedule)
        
        # 打印启动信息
        print(str(c.blue('__    ', c.magenta('-'),
                  c.blue('    ... __   '), c.magenta('-'),
                  c.blue('        _\n'),
                  c.reset(self.startup_info(beat)))))
        # 开启日志
        self.setup_logging()
        if self.socket_timeout:
            logger.debug('Setting default socket timeout to %r',
                         self.socket_timeout)
            # 设置超时
            socket.setdefaulttimeout(self.socket_timeout)
        try:
            # 注册handler
            self.install_sync_handler(beat)
            # 开启beat
            beat.start()
        except Exception as exc:
            logger.critical('beat raised exception %s: %r',
                            exc.__class__, exc,
                            exc_info=True)

我们看下beat是如何开启的:

    # Python 实用宝典
    # https://pythondict.com
    def start(self, embedded_process=False, drift=-0.010):
        info('beat: Starting...')
        # 打印最大间隔时间
        debug('beat: Ticking with max interval->%s',
              humanize_seconds(self.scheduler.max_interval))
        
        # 通知注册该signal的函数
        signals.beat_init.send(sender=self)
        if embedded_process:
            signals.beat_embedded_init.send(sender=self)
            platforms.set_process_title('celery beat')

        try:
            while not self._is_shutdown.is_set():
                # 调用scheduler.tick()函数检查还剩多余时间
                interval = self.scheduler.tick()
                interval = interval + drift if interval else interval
                # 如果大于0
                if interval and interval > 0:
                    debug('beat: Waking up %s.',
                          humanize_seconds(interval, prefix='in '))
                    # 休眠
                    time.sleep(interval)
                    if self.scheduler.should_sync():
                        self.scheduler._do_sync()
        except (KeyboardInterrupt, SystemExit):
            self._is_shutdown.set()
        finally:
            self.sync()

这里重点看 self.scheduler.tick() 方法:

    # Python 实用宝典
    # https://pythondict.com
    def tick(self):
        """Run a tick, that is one iteration of the scheduler.

        Executes all due tasks.

        """
        remaining_times = []
        try:
            # 遍历每个周期任务设定
            for entry in values(self.schedule):
                # 下次运行时间
                next_time_to_run = self.maybe_due(entry, self.publisher)
                if next_time_to_run:
                    remaining_times.append(next_time_to_run)
        except RuntimeError:
            pass

        return min(remaining_times + [self.max_interval])

这里通过 self.schedule 拿到了所有存放在用 shelve 写入的 celerybeat-schedule 文件的定时任务,遍历所有定时任务,调用 self.maybe_due 方法:

    # Python 实用宝典
    # https://pythondict.com
    def maybe_due(self, entry, publisher=None):
        # 是否到达运行时间
        is_due, next_time_to_run = entry.is_due()

        if is_due:
            # 打印任务发送日志
            info('Scheduler: Sending due task %s (%s)', entry.name, entry.task)
            try:
                # 执行任务
                result = self.apply_async(entry, publisher=publisher)
            except Exception as exc:
                error('Message Error: %s\n%s',
                      exc, traceback.format_stack(), exc_info=True)
            else:
                debug('%s sent. id->%s', entry.task, result.id)
        return next_time_to_run

可以看到,此处会判断任务是否到达定时时间,如果是的话,会调用 apply_async 调用Worker执行任务。如果不是,则返回下次运行时间,让 Beat 进程进行 Sleep,减少进程资源消耗。

到此,我们就讲解完了 Celery Beat 在周期定时任务的检测调度机制,怎么样,小伙伴们有没有什么疑惑?可以在下方留言区留言一起讨论哦。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 高效流程编排引擎 bamboo-pipeline 实战教程

Bamboo-pipeline 是蓝鲸智云旗下SaaS标准运维的流程编排引擎。其具备以下特点:

  • 1. 多种流程模式:支持串行、并行,支持子流程,可以根据全局参数自动选择分支执行,节点失败处理机制可配置。
  • 2. 参数引擎:支持参数共享,支持参数替换。
  • 3. 可交互的任务执行:任务执行中可以随时暂停、继续、撤销,节点失败后可以重试、跳过。

本文需要涉及到 Django, Celery 的前置知识,如果你还不了解这两者,建议谷歌或百度搜索了解一下,或者阅读我之前的文章:

Django:

Python Django快速开发音乐高潮提取网(1)

Python Django快速开发音乐高潮提取网(2)

Python Django快速开发音乐高潮提取网(3)

Pycharm+Django 安装及配置指南

手把手Django+Vue前后端分离入门实战教程

Celery:

Python celery异步快速下载股票数据

Django Celery 异步与定时任务实战教程

网络IO谁更快?Python与Go请求速度对比

什么?Python Celery 也能调度Go worker?

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install bamboo-engine
pip install bamboo-pipeline
pip install django
pip install celery

2. 项目初始化

(选项一:无Django项目)如果你没有任何的现成Django项目,请按下面的流程初始化

由于 bamboo-pipeline 运行时基于 Django 实现,所以需要新建一个 Django 项目:

django-admin startproject easy_pipeline
cd easy_pipeline

在 easy_pipeline.settings.py 下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

在 easy_pipeline目录下初始化数据库:

python manage.py migrate

(选项二:有Django项目需要使用流程引擎)如果你有现成的PipeLine项目需要使用此流程引擎,请在项目的 settings.py 下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

然后重新执行migrate,生成pipeline相关的流程模型:

python manage.py migrate

migrate 执行完毕后会如下图所示:

由于是在原有项目上使用流程引擎,可能会遇到一些版本不匹配的问题,如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

3. 简单的流程编排实战

首先在项目目录下启动 celery worker:

python manage.py celery worker -Q er_execute,er_schedule --pool=solo -l info

启动成功类似下图所示:

(注意)如果你是在你的原有Django项目上做改造,它并不一定能够顺利地启动成功,这是因为Pipeline使用了 Django 2.2.24,会存在许多版本不兼容的情况。如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

在下面的例子中,我们将会创建并执行一个简单的流程:

3.1 创建流程APP

在 bamboo_pipeline 中,一个流程由多个组件组成,官方推荐使用APP统一管控组件:

python manage.py create_plugins_app big_calculator

该命令会在 Django 工程根目录下生成拥有以下目录结构的 APP:

big_calculator
├── __init__.py
├── components
│   ├── __init__.py
│   └── collections
│       ├── __init__.py
│       └── plugins.py
├── migrations
│   └── __init__.py
└── static
    └── big_calculator
        └── plugins.js

别忘了把新创建的这个插件添加到 Django 配置的 INSTALLED_APPS 中:

INSTALLED_APPS = (
    ...
    'big_calculator',
    ...
)

3.2 编写流程的Service原子

组件服务 Service 是组件的核心,Service 定义了组件被调用时执行的逻辑,下面让我们实现一个计算传入的参数 n 的阶乘,并把结果写到输出中的 Service,在 big_calculator/components/collections/plugins.py 中输入以下代码:

import math
from pipeline.core.flow.activity import Service


class FactorialCalculateService(Service):

    def execute(self, data, parent_data):
        """
        组件被调用时的执行逻辑
        :param data: 当前节点的数据对象
        :param parent_data: 该节点所属流程的数据对象
        :return:
        """
        n = data.get_one_of_inputs('n')
        if not isinstance(n, int):
            data.outputs.ex_data = 'n must be a integer!'
            return False

        data.outputs.factorial_of_n = math.factorial(n)
        return True

    def inputs_format(self):
        """
        组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。
        :return:必须返回一个 InputItem 的数组,返回的这些信息能够用于确认该组件需要获取什么样的输入数据。
        """
        return [
            Service.InputItem(name='integer n', key='n', type='int', required=True)
        ]

    def outputs_format(self):
        """
        组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明
        :return: 必须返回一个 OutputItem 的数组, 便于在流程上下文或后续节点中进行引用
        """
        return [
            Service.OutputItem(name='factorial of n', key='factorial_of_n', type='int')
        ]

首先我们继承了 Service 基类,并实现了 execute() 和 outputs_format() 这两个方法,他们的作用如下:

  • execute:组件被调用时执行的逻辑。接收 data 和 parent_data 两个参数。
    其中,data 是当前节点的数据对象,这个数据对象存储了用户传递给当前节点的参数的值以及当前节点输出的值。parent_data 则是该节点所属流程的数据对象,通常会将一些全局使用的常量存储在该对象中,如当前流程的执行者、流程的开始时间等。
  • outputs_format:组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明。这个方法必须返回一个 OutputItem 的数组,返回的这些信息能够用于确认某个组件在执行成功时输出的数据,便于在流程上下文或后续节点中进行引用。
  • inputs_format:组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。这个方法必须返回一个 InputItem 的数组,返回的这些信息能够用于确认某个组件需要获取什么样的输入数据。

下面我们来看一下 execute() 方法内部执行的逻辑,首先我们尝试从当前节点数据对象的输出中获取输入参数 n,如果获取到的参数不是一个 int 实例,那么我们会将异常信息写入到当前节点输出的 ex_data 字段中,这个字段是引擎内部的保留字段,节点执行失败时产生的异常信息都应该写入到该字段中。随后我们返回 False 代表组件本次执行失败,随后节点会进入失败状态:

n = data.get_one_of_inputs('n')
if not isinstance(n, int):
    data.outputs.ex_data = 'n must be a integer!'
    return False

若获取到的 n 是一个正常的 int,我们就调用 math.factorial() 函数来计算 n 的阶乘,计算完成后,我们会将结果写入到输出的 factorial_of_n 字段中,以供流程中的其他节点使用:

data.outputs.factorial_of_n = math.factorial(n)
return True

3.3 编写流程组件,绑定Service原子

完成 Service 的编写后,我们需要将其与一个 Component 绑定起来,才能够注册到组件库中,在 big_calculator\components\__init__.py 文件下添加如下的代码:

import logging
from pipeline.component_framework.component import Component
from big_calculator.components.collections.plugins import FactorialCalculateService

logger = logging.getLogger('celery')


class FactorialCalculateComponent(Component):
    name = 'FactorialCalculateComponent'
    code = 'fac_cal_comp'
    bound_service = FactorialCalculateService

我们定义了一个继承自基类 Component 的类 FactorialCalculateComponent,他拥有以下属性:

  • name:组件名。
  • code:组件代码,这个代码必须是全局唯一的。
  • bound_service:与该组件绑定的 Service

这样一来,我们就完成了一个流程原子的开发。

3.4 生成流程,测试刚编写的组件

在 big_calculator\test.py 写入以下内容,生成一个流程,测试刚刚编写的组件:

# Python 实用宝典
# 2021/06/20

import time

from bamboo_engine.builder import *
from big_calculator.components import FactorialCalculateComponent
from pipeline.eri.runtime import BambooDjangoRuntime
from bamboo_engine import api
from bamboo_engine import builder


def bamboo_playground():
    """
    测试流程引擎
    """
    # 使用 builder 构造出流程描述结构
    start = EmptyStartEvent()
    # 这里使用 我们刚创建好的n阶乘组件
    act = ServiceActivity(component_code=FactorialCalculateComponent.code)
    # 传入参数
    act.component.inputs.n = Var(type=Var.PLAIN, value=4)
    end = EmptyEndEvent()

    start.extend(act).extend(end)

    pipeline = builder.build_tree(start)
    api.run_pipeline(runtime=BambooDjangoRuntime(), pipeline=pipeline)

    # 等待 1s 后获取流程执行结果
    time.sleep(1)

    result = api.get_execution_data_outputs(BambooDjangoRuntime(), act.id).data

    print(result)

随后,在命令行输入

python manage.py shell

打开 django console, 输入以下命令,执行此流程:

from big_calculator.test import bamboo_playground
bamboo_playground()

流程运行完后,获取节点的执行结果,可以看到,该节点输出了 factorial_of_n,并且值为 24(4 * 3 * 2 *1),这正是我们需要的效果:

{'_loop': 0, '_result': True, 'factorial_of_n': 24}

恭喜你,你已经成功的创建了一个流程并把它运行起来了!在这期间你可能会遇到不少的坑,建议尝试先自行解决,如果实在无法解决,可以前往 标准运维 仓库提 issues,或者前往蓝鲸智云官网提问。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化全方位实战教程

本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。

1.Pandas 性能优化 40 倍 – DataFrame

1. 1 性能优化小试牛刀

大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。

比如下面这个简单例子,我们随机生成100万条数据,对val这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform函数的平均运行时间是284ms:

import pandas as pd
import numpy as np

def gen_data(size):
    d = dict()
    d["genre"] = np.random.choice(["A""B""C""D"], size=size)
    d["val"] = np.random.randint(low=0, high=100, size=size)
    return pd.DataFrame(d)

data = gen_data(1000000)
data.head()
def transform(data):
    data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)

%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.2. 用Cython编写C扩展

试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1 这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。

%load_ext cython
%%cython
cpdef int _transform(int x):
    if x % 2:
        return x + 1
    return x - 1

def transform(data):
    data.loc[:, "new_val"] = data.val.apply(_transform)

%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.3. 减少类型转换

为了减少C和Python之间的类型转换,我们直接把val这一列作为Numpy数组传递给Cython函数,注意区分cnpnp。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。

%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.4. 使用不安全的数组

利用@cython.boundscheck(False)@cython.wraparound(False)装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。

%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

来源:Python中文社区

作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu

2.Pandas性能优化:基础篇

Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。

2.1 按行迭代优化

很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。

我们测试一下速度:

import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
                     'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                   'x':  np.random.randint(1, 10, (1000))})

%%timeit
a2=[]
for index,row in df.iterrows():
    temp=row['a']
    a2.append(temp*temp)
df['a2']=a2    

67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
a2=[]
for row in df.itertuples():
    temp=getattr(row, 'a')
    a2.append(temp*temp)
df['a2']=a2

1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到直接循环,itertuples速度是iterrows的很多倍。

所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。

2.2 apply 优化

一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:

%%timeit
df['a2']=df.a.apply(lambda x: x*x)

360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到,效率又有提升,apply的速度是itertuples的5倍左右。

注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。

%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)

15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。

我们可以简单的这样改写:

%%timeit
df['a3']=df['a']*df['b']

204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高

%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)

93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

2.3 聚合agg效率优化

有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。

还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:

%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())

1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

采用agg内置函数:

%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()

415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

刚才是单列聚合,我们看下多列聚合:
自定义函数:

%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})

2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

采用内置函数:

%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})

1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。

下面列出了一些内置函数:

内置函数描述
count计数
sum求和
mean平均值
median中位数
min,max最大值/最小值
std,var标准差/方差
prod求积

当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如

df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))

在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。

2.4 数据读取优化

如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。

这里有几种方法可以优化:

1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。

chunksize = 10 ** 6for chunk in pd.read_csv(filename,  chunksize=chunksize):
     process(chunk)

这里也可以使用多进程方式,具体我们在后面进阶篇介绍。

  1. 过滤掉不需要的列

如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。

df = pd.read_csv(filename,  usecols=["a","b","c"]):
  1. 为列指定类型
    panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
  1. 保存为其他格式
    如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
 import pandas as pd
 #读取csv
 df = pd.read_csv('filename.csv')
 
 #pickle格式
 df.to_pickle('filename.pkl') 
 df = pd.read_pickle('filename.pkl') 
 
 #hdf格式
 df.to_hdf('filename.hdf','df') 
 df = pd.read_hdf('filename.hdf','df') 

file typetimespeed
csv1.93 s1
pickle415 m4.6
hdf808 ms2.3
  1. 用第三方的包读取
    如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。

2.5 优化数据处理逻辑

这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。

这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:

call_areacall_seconds
03364
23075
25847
12032

call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。

我们随机生成一批数据:

import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area':  np.random.randint(0, 3, (row_number)),
                   'call_seconds':  np.random.randint(0, 10000, (row_number))})

如果我们想计算每个电话的费用:

def get_cost(call_area,call_seconds):
    if call_area==1:
        rate=0.1
    elif call_area==0:
        rate=0.7
    else:
        return 0
    return math.ceil(call_seconds/60)*rate

方法1,采用按行迭代循环

%%timeit
cost_list = []
for i,row in df.iterrows():
    call_area=row['call_area']
    call_seconds=row['call_seconds']
    cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list

方法2,采用apply行的方式

%%timeit
df['cost']=df.apply(
         lambda row: get_cost(
             row['call_area'],
             row['call_seconds']),
         axis=1)

方法3,采用mask+loc,分组计算

%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0

方法4,使用numpy的方式,可以使用index的方式找到对应的费用。

%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins

测试结果:

方法运行时间运行速度
iterrows513 ms1
apply181 ms2.8
loc6.44 ms79.6
numpy219 µs2342

方法4速度快是因为它采用了numpy向量化的数据处理方式。

总结

在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。

3.Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

3.1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

3.2 cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

3.3 numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

3.4 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

3.5 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Prometheus + Granafa 40分钟构建MySQL监控平台实战教程

Prometheus + Granafa 概述

对于MySQL的监控平台,相信大家实现起来有很多了:基于天兔的监控,还有基于zabbix相关的二次开发。相信很多同行都应该已经开始玩起来了。我这边的选型是Prometheus + Granafa的实现方式。简而言之就是我现在的生产环境使用的是prometheus,还有就是granafa满足的我的日常工作需要。在入门的简介和安装,大家可以参考这里:

https://blog.51cto.com/cloumn/detail/77

1、首先看下我们的监控效果、mysql主从

构建高大上的MySQL监控平台

2、mysql状态:

构建高大上的MySQL监控平台

构建高大上的MySQL监控平台

3、缓冲池状态:

构建高大上的MySQL监控平台

exporter 相关部署实战教程

1、安装exporter

    [root@controller2 opt]# https://github.com/prometheus/mysqld_exporter/releases/download/v0.10.0/mysqld_exporter-0.10.0.linux-amd64.tar.gz
    [root@controller2 opt]# tar -xf mysqld_exporter-0.10.0.linux-amd64.tar.gz 

2、添加mysql 账户:

    GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO 'exporter'@'%' IDENTIFIED BY 'localhost';
    flush privileges;

3、编辑配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf 
    [client]
    user=exporter
    password=123456

4、设置配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /etc/systemd/system/mysql_exporter.service 
    [Unit]
    Description=mysql Monitoring System
    Documentation=mysql Monitoring System

    [Service]
    ExecStart=/opt/mysqld_exporter-0.10.0.linux-amd64/mysqld_exporter \
             -collect.info_schema.processlist \
             -collect.info_schema.innodb_tablespaces \
             -collect.info_schema.innodb_metrics  \
             -collect.perf_schema.tableiowaits \
             -collect.perf_schema.indexiowaits \
             -collect.perf_schema.tablelocks \
             -collect.engine_innodb_status \
             -collect.perf_schema.file_events \
             -collect.info_schema.processlist \
             -collect.binlog_size \
             -collect.info_schema.clientstats \
             -collect.perf_schema.eventswaits \
             -config.my-cnf=/opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf

    [Install]
    WantedBy=multi-user.target

5、添加配置到prometheus server

      - job_name: 'mysql'
        static_configs:
         - targets: ['192.168.1.11:9104','192.168.1.12:9104']

6、测试看有没有返回数值:

http://192.168.1.12:9104/metrics

正常我们通过mysql_up可以查询倒mysql监控是否已经生效,是否起起来

    #HELP mysql_up Whether the MySQL server is up.
    #TYPE mysql_up gauge
    mysql_up 1

监控相关指标

在做任何一个东西监控的时候,我们要时刻明白我们要监控的是什么,指标是啥才能更好的去监控我们的服务,在mysql里面我们通常可以通过一下指标去衡量mysql的运行情况:mysql主从运行情况、查询吞吐量、慢查询情况、连接数情况、缓冲池使用情况以及查询执行性能等。

主从复制运行指标:

1、主从复制线程监控:

大部分情况下,很多企业使用的都是主从复制的环境,监控两个线程是非常重要的,在mysql里面我们通常是通过命令:

    MariaDB [(none)]> show slave status\G;
    *************************** 1. row ***************************
                   Slave_IO_State: Waiting for master to send event
                      Master_Host: 172.16.1.1
                      Master_User: repl
                      Master_Port: 3306
                    Connect_Retry: 60
                  Master_Log_File: mysql-bin.000045
              Read_Master_Log_Pos: 72904854
                   Relay_Log_File: mariadb-relay-bin.000127
                    Relay_Log_Pos: 72905142
            Relay_Master_Log_File: mysql-bin.000045
                 Slave_IO_Running: Yes
                Slave_SQL_Running: Yes

Slave_IO_Running、Slave_SQL_Running两个线程正常那么说明我们的复制集群是健康状态的。

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_slave_sql_running来获取主从集群的健康状况。

    # HELP mysql_slave_status_slave_sql_running Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_slave_sql_running untyped
    mysql_slave_status_slave_sql_running{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 1

2、主从复制落后时间:

在使用show slave status
里面还有一个关键的参数Seconds_Behind_Master。Seconds_Behind_Master表示slave上SQL thread与IO thread之间的延迟,我们都知道在MySQL的复制环境中,slave先从master上将binlog拉取到本地(通过IO thread),然后通过SQL
thread将binlog重放,而Seconds_Behind_Master表示本地relaylog中未被执行完的那部分的差值。所以如果slave拉取到本地的relaylog(实际上就是binlog,只是在slave上习惯称呼relaylog而已)都执行完,此时通过show slave status看到的会是0

Seconds_Behind_Master: 0

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_seconds_behind_master 来获取相关状态。

    # HELP mysql_slave_status_seconds_behind_master Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_seconds_behind_master untyped
    mysql_slave_status_seconds_behind_master{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 0

查询吞吐量:

说到吞吐量,那么我们如何从那方面来衡量呢? 
通常来说我们可以根据mysql 的插入、查询、删除、更新等操作来

为了获取吞吐量,MySQL 有一个名为 Questions 的内部计数器(根据 MySQL
用语,这是一个服务器状态变量),客户端每发送一个查询语句,其值就会加一。由 Questions 指标带来的以客户端为中心的视角常常比相关的Queries
计数器更容易解释。作为存储程序的一部分,后者也会计算已执行语句的数量,以及诸如PREPARE 和 DEALLOCATE PREPARE
指令运行的次数,作为服务器端预处理语句的一部分。可以通过命令来查询:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Questions";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Questions     | 15071 |
    +---------------+-------+

MySQLD Exporter中返回的样本数据中通过mysql_global_status_questions反映当前Questions计数器的大小:

    # HELP mysql_global_status_questions Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_questions untyped
    mysql_global_status_questions 13253

当然由于prometheus
具有非常丰富的查询语言,我们可以通过这个累加的计数器来查询某一短时间内的查询增长率情况,可以做相关的阈值告警处理、例如一下查询2分钟时间内的查询情况:

rate(mysql_global_status_questions[2m])

当然上面是总量,我们可以分别从监控读、写指令的分解情况,从而更好地理解数据库的工作负载、找到可能的瓶颈。通常,通常,读取查询会由 Com_select
指标抓取,而写入查询则可能增加三个状态变量中某一个的值,这取决于具体的指令:

Writes = Com_insert + Com_update + Com_delete

下面我们通过命令获取插入的情况:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Com_insert";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Com_insert    | 10578 |
    +---------------+-------+

从MySQLD
Exporter的/metrics返回的监控样本中,可以通过global_status_commands_total获取当前实例各类指令执行的次数:

    # HELP mysql_global_status_commands_total Total number of executed MySQL commands.
    # TYPE mysql_global_status_commands_total counter
    mysql_global_status_commands_total{command="create_trigger"} 0
    mysql_global_status_commands_total{command="create_udf"} 0
    mysql_global_status_commands_total{command="create_user"} 1
    mysql_global_status_commands_total{command="create_view"} 0
    mysql_global_status_commands_total{command="dealloc_sql"} 0
    mysql_global_status_commands_total{command="delete"} 3369
    mysql_global_status_commands_total{command="delete_multi"} 0

慢查询性能

查询性能方面,慢查询也是查询告警的一个重要的指标。MySQL还提供了一个Slow_queries的计数器,当查询的执行时间超过long_query_time的值后,计数器就会+1,其默认值为10秒,可以通过以下指令在MySQL中查询当前long_query_time的设置:

    MariaDB [(none)]> SHOW VARIABLES LIKE 'long_query_time';
    +-----------------+-----------+
    | Variable_name   | Value     |
    +-----------------+-----------+
    |
 long_query_time | 10.000000 |
    +-----------------+-----------+
    1 row in set (0.00 sec)

当然我们也可以修改时间

    MariaDB [(none)]> SET GLOBAL long_query_time = 5;
    Query OK, 0 rows affected (0.00 sec)

然后我们而已通过sql语言查询MySQL实例中Slow_queries的数量:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Slow_queries";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Slow_queries  | 0     |
    +---------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,通过mysql_global_status_slow_queries指标展示当前的Slow_queries的值:

    # HELP mysql_global_status_slow_queries Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_slow_queries untyped
    mysql_global_status_slow_queries 0

同样的,更具根据Prometheus 慢查询语句我们也可以查询倒他某段时间内的增长率:

rate(mysql_global_status_slow_queries[5m])

连接数监控

监控客户端连接情况相当重要,因为一旦可用连接耗尽,新的客户端连接就会遭到拒绝。MySQL 默认的连接数限制为 151。

    MariaDB [(none)]> SHOW VARIABLES LIKE 'max_connections';
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 max_connections | 151   |
    +-----------------+-------+

当然我们可以修改配置文件的形式来增加这个数值。与之对应的就是当前连接数量,当我们当前连接出来超过系统设置的最大值之后常会出现我们看到的Too many
connections(连接数过多),下面我查找一下当前连接数:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_connected";
    +-------------------+-------+
    | Variable_name     | Value |
    +-------------------+-------+
    |
 Threads_connected | 41     |
    +-------------------+-------

当然mysql 还提供Threads_running 这个指标,帮助你分隔在任意时间正在积极处理查询的线程与那些虽然可用但是闲置的连接。

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_running";
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 Threads_running | 10     |
    +-----------------+-------+

如果服务器真的达到 max_connections
限制,它就会开始拒绝新的连接。在这种情况下,Connection_errors_max_connections
指标就会开始增加,同时,追踪所有失败连接尝试的Aborted_connects 指标也会开始增加。

MySQLD Exporter返回的样本数据中:

    # HELP mysql_global_variables_max_connections Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_max_connections gauge
    mysql_global_variables_max_connections 151         

表示最大连接数

    # HELP mysql_global_status_threads_connected Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_connected untyped
    mysql_global_status_threads_connected 41

表示当前的连接数

    # HELP mysql_global_status_threads_running Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_running untyped
    mysql_global_status_threads_running 1

表示当前活跃的连接数

    # HELP mysql_global_status_aborted_connects Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_aborted_connects untyped
    mysql_global_status_aborted_connects 31

累计所有的连接数

    # HELP mysql_global_status_connection_errors_total Total number of MySQL connection errors.
    # TYPE mysql_global_status_connection_errors_total counter
    mysql_global_status_connection_errors_total{error="internal"} 0
    #服务器内部引起的错误、如内存硬盘等
    mysql_global_status_connection_errors_total{error="max_connections"} 0
    #超出连接处引起的错误

当然根据prom表达式,我们可以查询当前剩余可用的连接数:

mysql_global_variables_max_connections - mysql_global_status_threads_connected

查询mysq拒绝连接数

mysql_global_status_aborted_connects

缓冲池情况:

MySQL 默认的存储引擎 InnoDB
使用了一片称为缓冲池的内存区域,用于缓存数据表与索引的数据。缓冲池指标属于资源指标,而非工作指标,前者更多地用于调查(而非检测)性能问题。如果数据库性能开始下滑,而磁盘
I/O 在不断攀升,扩大缓冲池往往能带来性能回升。 
默认设置下,缓冲池的大小通常相对较小,为 128MiB。不过,MySQL 建议可将其扩大至专用数据库服务器物理内存的 80% 大小。我们可以查看一下:

    MariaDB [(none)]> show global variables like 'innodb_buffer_pool_size';
    +-------------------------+-----------+
    | Variable_name           | Value     |
    +-------------------------+-----------+
    |
 innodb_buffer_pool_size | 134217728 |
    +-------------------------+-----------+

MySQLD Exporter返回的样本数据中,使用mysql_global_variables_innodb_buffer_pool_size来表示。

    # HELP mysql_global_variables_innodb_buffer_pool_size Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_innodb_buffer_pool_size gauge
    mysql_global_variables_innodb_buffer_pool_size 1.34217728e+08

    Innodb_buffer_pool_read_requests记录了正常从缓冲池读取数据的请求数量。可以通过以下指令查看

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_read_requests";
    +----------------------------------+-------------+
    | Variable_name                    | Value       |
    +----------------------------------+-------------+
    |
 Innodb_buffer_pool_read_requests | 38465 |
    +----------------------------------+-------------+

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_read_requests Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_read_requests untyped
    mysql_global_status_innodb_buffer_pool_read_requests 2.7711547168e+10

当缓冲池无法满足时,MySQL只能从磁盘中读取数据。Innodb_buffer_pool_reads即记录了从磁盘读取数据的请求数量。通常来说从内存中读取数据的速度要比从磁盘中读取快很多,因此,如果Innodb_buffer_pool_reads的值开始增加,可能意味着数据库的性能有问题。
可以通过以下只能查看Innodb_buffer_pool_reads的数量

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_reads";
    +--------------------------+-------+
    | Variable_name            | Value |
    +--------------------------+-------+
    |
 Innodb_buffer_pool_reads | 138  |
    +--------------------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_reads Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_reads untyped
    mysql_global_status_innodb_buffer_pool_reads 138

通过以上监控指标,以及实际监控的场景,我们可以利用PromQL快速建立多个监控项。可以查看两分钟内读取磁盘的增长率的增长率:

rate(mysql_global_status_innodb_buffer_pool_reads[2m])

官方模板ID

上面是我们简单列举的一些指标,下面我们使用granafa给 MySQLD_Exporter添加监控图表:

  • 主从主群监控(模板7371):

  • 相关mysql 状态监控7362:

  • 缓冲池状态7365:

  • 简单的告警规则

除了相关模板之外,没有告警规则那么我们的监控就是不完美的,下面列一下我们的监控告警规则

    groups:
    - name: MySQL-rules
      rules:
      - alert: MySQL Status 
        expr: up == 0
        for: 5s 
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: MySQL has stop !!!"
          description: "检测MySQL数据库运行状态"

      - alert: MySQL Slave IO Thread Status
        expr: mysql_slave_status_slave_io_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave IO Thread has stop !!!"
          description: "检测MySQL主从IO线程运行状态"

      - alert: MySQL Slave SQL Thread Status 
        expr: mysql_slave_status_slave_sql_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave SQL Thread has stop !!!"
          description: "检测MySQL主从SQL线程运行状态"

      - alert: MySQL Slave Delay Status 
        expr: mysql_slave_status_sql_delay == 30
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave Delay has more than 30s !!!"
          description: "检测MySQL主从延时状态"

      - alert: Mysql_Too_Many_Connections
        expr: rate(mysql_global_status_threads_connected[5m]) > 200
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 连接数过多"
          description: "{{$labels.instance}}: 连接数过多,请处理 ,(current value is: {{ $value }})"  

      - alert: Mysql_Too_Many_slow_queries
        expr: rate(mysql_global_status_slow_queries[5m]) > 3
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 慢查询有点多,请检查处理"
          description: "{{$labels.instance}}: Mysql slow_queries is more than 3 per second ,(current value is: {{ $value }})"

2、添加规则到prometheus:

    rule_files:
      - "rules/*.yml" 

3、打开web ui我们可以看到规则生效了:

构建高大上的MySQL监控平台

总结

到处监控mysql的相关状态已经完成,大家可以根据mysql更多的监控指标去完善自己的监控,当然这一套就是我用在线上环境的,可以参考参考。

来源:https://blog.51cto.com/xiaoluoge/2476375
作者:小罗ge11

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 函数耗时异常自动化监控实战教程

来源:文呓

本文内容包括Python性能可视化分析,逻辑优化,及根据不同的模型动态计算安全阈值,实现各个函数耗时及程序总耗时的自动化监控预警

在做Python性能分析优化的时候,可以借助cProfile生成性能数据文件,通过pstats获取详细耗时分布数据,结合gprof2dot脚本生成函数调用栈结构图做可视化分析,提高性能分析的效率。

接着从具体的耗时分布,先从占用大头的函数分析具体逻辑实现,逐步优化,同时保存pstats函数耗时平均值数据作为后续异常自动化监控的样本数据。

实现耗时自动化监控必须是可以根据算法动态调整安全阈值,而不是人工定死安全阈值范围,这样才可以实现异常监控的自循环和迭代校准。

一、性能数据函数耗时采集及可视化报表生成

1. 性能数据文件保存(cProfile)

首先是性能数据文件的保存,cProfile和profile提供了Python程序的确定性性能分析。profile是一组统计数据,用来描述程序的各个部分执行的频率和时间。在程序开始的时候调用enable开始性能数据采集,结束的时候调用dump_stats停止性能数据采集并保存性能数据到指定路径的文件。

import cProfile
# 程序开始的时候打开数据采集开关
pr = cProfile.Profile()
pr.enable()

# 在程序运行结束的时候dump性能数据到指定路径文件中,profliePath为保存文件的绝对路径参数
pr.dump_stats(profliePath)

2. 详细性能数据读取查看

保存性能数据到文件之后,可以用pstats读取文件中的数据,profile统计数据可以通过pstats模块格式化为报表。

import pstats 
# 读取性能数据 
pS = pstats.Stats(profliePath) 
# 根据函数自身累计耗时做排序 
pS.sort_stats('tottime') 
# 打印所有耗时函数信息 
pS.print_stats()
print_stats()输出示例:
79837 function calls (75565 primitive calls) in 37.311 seconds
Ordered by: internal time
ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
 2050    30.167    0.015   30.167    0.015  {time.sleep}
   16     6.579    0.411    6.579    0.411  {select.select}
    1     0.142    0.142    0.142    0.142  {method 'do_handshake' of '_ssl._SSLSocket' objects}
  434     0.074    0.000    0.074    0.000  {method 'read' of '_ssl._SSLSocket' objects}
    1     0.062    0.062    0.062    0.062  {method 'connect' of '_socket.socket' objects}
   37     0.046    0.001    0.046    0.001  {posix.read}
   14     0.024    0.002    0.024    0.002  {posix.fork}

输出字段说明:

  • ncalls  函数被调用次数(只有一个数字时表示不存在递归,有斜杠分割数字时,后面的数字表示非递归调用的次数)
  • tottime  函数总计运行时间,不包括子函数调用时间
  • percall  函数运行一次的平均时间,等于tottime/ncalls
  • cumtime 函数总计运行时间,包括子函数调用时间
  • percall  函数运行一次的平均时间,等于cumtime/ncalls
  • filename:lineno(function) 函数所在的文件名,函数的行号,函数名或基础框架函数类

如果要获取print_stats()里面各个字段信息可以通过如下方式:

# func————filename:lineno(function)
# cc ———— call count,调用次数 
# nc ———— ncalls
# tt ———— tottime
# ct ———— cumtime
# callers ———— 调用堆栈数组,每项数据包括了func, (cc, nc, tt, ct) 字段
for index in range(len(pS.fcn_list)): 
    func = pS.fcn_list[index] 
    cc, nc, tt, ct, callers = pS.stats[func]  
    print cc, nc, tt, ct, func, callers
    for func, (cc, nc, tt, ct) in callers.iteritems():
        print func,cc, nc, tt, ct

二、生成函数调用栈结构图(gprof2dot)教程

gprof2dot脚本把gprof或callgrind分析获得的信息,转化成一个以DOT语言描述的程序调用有向图对象,再通过Graphviz将DOT有向图对象渲染成图片,这样就可以很直观地看出整个程序的调用栈,包括函数所在的类和行数、耗时占比、函数递归次数、以及被调用的次数。

先从GitHub上下载gprof2dot.py脚本到本地,和执行的程序的脚本文件放在同一目录下,当然要使用这个脚本还需要安装graphviz,使用brew命令安装,若安装过程中遇到异常,根据异常提示执行命令安装需要的工具

brew install graphviz

生成程序函数调用栈结构图的逻辑可以参考如下逻辑实现,具体根据自身需求做下修改。

import os
# 获取当前gprof2dot.py脚本路径
gprof2dotPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'gprof2dot.py')
# 函数调用栈结构图保存文件名路径,这边使用生成PNG图片结果
dumpProfPath = profliePath.replace("stats", "png")
dumpCmd = "python %s -f pstats %s | dot -Tpng -o %s" % (gprof2dotPath, profliePath, dumpProfPath)
os.popen(dumpCmd)

三、性能分析及优化实战

在生成函数调用栈结构图之后,就可以很容易的看出各个函数之间的调用关系,每个方块内包括的信息包括函数所在的类和行数耗时百分比被调用次数,如果这个函数存在递归的情况,方块边缘会有一个回旋的箭头标明递归的次数

从结构图里面找到耗时占用较多的部分,分析具体函数的实现逻辑,定位具体耗时的原因,优化的策略如下:

  • 去除多余的逻辑:去除冗余代码
  • 优化递归函数:加日志打印递归时候的各个参数,如果发现很多参数都是重复的,可以加缓存,避免多余的递归消耗。
  • 归并通用逻辑调用:一个函数多次调用同一个子函数获取参数,查看这个子函数的调用是否可以进行整合归并,避免多余的函数调用。
  • 通过上下文环境判断测试程序的初始化是否必要,非必要情况下不进行测试环境的重置操作。

四、耗时异常自动化监控

如果是通过历史的耗时数据计算得到平均值+固定浮动百分比的方式,来配置耗时安全阈值参数实施异常监控存在很大的问题,因为函数执行的耗时容易受设备和运行环境的影响,人工固定浮动百分比的方式维护性差,数据本身不可迭代自循环,总不能每次出现误报问题之后都去手动调整参数。

这边监控的维度包括两方面,一方面是程序各个函数执行耗时的平均值,另一方面是完整程序执行的总耗时,在前期先把这些历史耗时数据保存在数据库中,供后续自动化异常监控的实现提供样本数据。

要实现自动化阈值调整,需要借助常规的模型算法实现,这边只对耗时单个维度的异常做自动化监控实现。

根据原理,无监督异常检测模型一般可分为以下几类:

  • 基于统计和概率模型:主要是对数据的分布做出假设,并找出假设下所定义的“异常”;
  • 线性模型:主要思想是通过线性方法找到合适的低维子空间使得异常点在其中区别于正常点;
  • 基于距离:这种方法认为异常点距离正常点比较远,通过比较数据点之间的距离区分异常点;
  • 基于密度:由于数据分布不均匀,绝对距离无法衡量数据点之间相对远近时,用局部密度表示数据点的异常情况;
  • 基于聚类:将数据点聚类,不属于任何簇、距离最近的簇较远、稀疏聚类里的点认为是异常点;
  • 基于树:通过划分子空间构建树模型寻找异常点。

异常耗时数据是波动的一维数据,这边就直接采用基于统计和概率模型的方式,根据保存的历史数据判断数据是否符合正态分布

若符合正态分布则用 μ+3δ(平均值+3倍标准差)的方式计算得到安全阈值

若不符合正态分布,则用Turkey 箱型图方案 Q+1.5IQR 计算安全阈值。

根据实际测试来看,随着样本数据的增加,会出现前期符合正态分布的函数耗时曲线,随着样本数据的增加会变成不符合正态分布。

Python中用于判断数据是否符合正态分布的代码如下,当pvalue值大于0.05时为正态分布,dataList是耗时数组数据:

from scipy import stats
import numpy
percallMean = numpy.mean(dataList) # 计算均值
# percallVar = numpy.var(dataList) # 求方差
percallStd = numpy.std(dataList) # 计算标准差
kstestResult = stats.kstest(dataList, 'norm', (percallMean, percallStd))
# 当pvalue值大于0.05为正态分布
if kstestResult[1] > 0.05:
    pass

1. 正态分布数据方案

在统计学中,如果一个数据分布近似正态,那么大约 68% 的数据值会在均值的一个标准差范围内,大约 95% 会在两个标准差范围内,大约 99.7% 会在三个标准差范围内。因此,如果任何数据点超过标准差的 3 倍,那么这些点很有可能是异常值或离群点。即正态分布的安全阈值上限为:percallMean + 3 * percallStd

 

2. Turkey 箱型图方案

基于正态分布的 3σ 法则或 Z 分数方法的异常检测是以假定数据服从正态分布为前提的,但实际数据往往并不严格服从正态分布。应用这种方法于非正态分布数据中判断异常值,其有效性是有限的。Tukey 箱型图是一种用于反映原始数据分布的特征常用方法,也可用于异常点识别。在识别异常点时其主要依靠实际数据,因此有其自身的优越性。

箱型图为我们提供了识别异常值的一个标准:异常值被定义为小于 Q1-1.5IQR 或大于 Q+1.5IQR 的值。虽然这种标准有点任意性,但它来源于经验判断,经验表明它在处理需要特别注意的数据方面表现不错。

计算箱型图安全阈值Python实现逻辑如下:

import numpy
percallMean = numpy.mean(dataList)  # 计算均值
boxplotQ1 = numpy.percentile(dataList, 25)
boxplotQ2 = numpy.percentile(dataList, 75)
boxplotIQR = boxplotQ2 - boxplotQ1
upperLimit =  boxplotQ2 + 1.5 * boxplotIQR

在程序实现中就是,在一个程序或用例执行完毕之后,先拿历史数据判断是否符合正态分布,当然历史样本数据至少要达到20个才比较准确,小于20个的时候就继续收集数据,不做异常判断。根据正态分布模型或箱型图模型计算安全阈值参数,判断当前各个函数耗时平均值或用例总耗时是否超过阈值,超过则预警。

高斯模型和箱型图两种方式阈值范围对比

这边给出stats文件数据汇总解析之后,根据相应的模型绘制耗时曲线及阈值或正态曲线及阈值的代码实现,statFolder参数替换成自己stats文件所在文件夹即可。

# coding=utf-8
import os
import pstats
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import traceback
from scipy import stats
import numpy

"""
汇总函数耗时平均值数据
"""
def dataSummary(array, fileName, fcn, percall):
    (funcPath, line, func) = fcn
    exists = False
    for item in array:
        if item["func"] == func and item["funcPath"] == funcPath and item["line"] == line:
            exists = True
            item["cost"].append({
                "percall": percall,
                "fileName": fileName
            })
    if not exists:
        array.append({
            "func": func,
            "funcPath": funcPath,
            "line": line,
            "cost": [{
                "percall": percall,
                "fileName": fileName
            }]
        })

"""
高斯函数计算Y值
"""
def gaussian(x, mu, delta):
    exp = numpy.exp(- numpy.power(x - mu, 2) / (2 * numpy.power(delta, 2)))
    c = 1 / (delta * numpy.sqrt(2 * numpy.pi))
    return c * exp

"""
读取汇总所有stats文件数据
"""
def readStatsFile(statFolder, filterData):
    for path, dir_list, file_list in os.walk(statFolder, "r"):
        for fileName in file_list:
            if fileName.find(".stats") > 0:
                fileAbsolutePath = os.path.join(path, fileName)
                pS = pstats.Stats(fileAbsolutePath)
             # 先对耗时数据从大到小进行排序
                pS.sort_stats('cumtime')
                # pS.print_stats()
                # 统计前100条耗时数据
                for index in range(100):
                    fcn = pS.fcn_list[index]
                    (funcPath, line, func) = fcn
                    # cc ———— call count,调用次数
                    # nc ———— ncalls,调用次数(只有一个数字时表示不存在递归;有斜杠分割数字时,后面的数字表示非递归调用的次数)
                    # tt ———— tottime,函数总计运行时间,除去函数中调用的子函数运行时间
                    # ct ———— cumtime,函数总计运行时间,含调用的子函数运行时间
                    cc, nc, tt, ct, callers = pS.stats[fcn]
                    # print fileName, func, cc, nc, tt, ct, callers
                    percall = ct / nc
                    # 只统计单次函数调用大于1毫秒的数据
                    if percall >= 0.001:
                        dataSummary(filterData, fileName, fcn, percall)

"""
绘制高斯函数曲线和安全阈值
"""
def drawGaussian(func, line, percallMean, threshold, percallList, dumpFolder):
    plt.title(func)
    plt.figure(figsize=(10, 8))
    for delta in [0.2, 0.5, 1]:
        gaussY = []
        gaussX = []
        for item in percallList:
            # 这边为了呈现正态曲线效果,减去平均值
            gaussX.append(item - percallMean)
            y = gaussian(item - percallMean, 0, delta)
            gaussY.append(y)
        plt.plot(gaussX, gaussY, label='sigma={}'.format(delta))
    # 绘制水位线
    plt.plot([threshold - percallMean, threshold - percallMean], [0, 5 * gaussian(percallMean, 0, 1)], color='red',
             linestyle="-", label="Threshold:" + str("%.5f" % threshold))
    plt.xlabel("Time(s)", fontsize=12)
    plt.legend()
    plt.tight_layout()
    # 可能不同类中包含相同的函数名,加上行数参数避免覆盖
    imagePath = dumpFolder + "cost_%s_%s.png" % (func, str(line))
    plt.savefig(imagePath)

"""
绘制耗时曲线和安全阈值
"""
def drawCurve(func, line, percallList, dumpFolder):
    boxplotQ1 = numpy.percentile(percallList, 25)
    boxplotQ2 = numpy.percentile(percallList, 75)
    boxplotIQR = boxplotQ2 - boxplotQ1
    upperLimit = boxplotQ2 + 1.5 * boxplotIQR
    # 不符合正态分布,绘制波动曲线
    timeArray = [i for i in range(len(percallList))]
    plt.title(dataItem["func"])
    plt.figure(figsize=(10, 8))
    # 绘制水位线
    plt.plot([0, len(percallList)], [upperLimit, upperLimit], color='red', linestyle="-",
             label="Threshold:" + str("%.5f" % upperLimit))
    plt.plot(timeArray, percallList, label=dataItem["func"] + "_" + str(dataItem["line"]))
    plt.ylabel("Time(s)", fontsize=12)
    plt.legend()
    plt.tight_layout()
    imagePath = dumpFolder + "cost_%s_%s.png" % (func, str(line))
    plt.savefig(imagePath)

if __name__ == "__main__":
    try:
        statFolder = "/Users/chenwenguan/Downloads/2aab7e17-a1b6-1253/"
        chartFolder = statFolder + "chart/"
        if not os.path.exists(chartFolder):
            os.mkdir(chartFolder)
        filterData = []
        readStatsFile(statFolder, filterData);
        for dataItem in filterData:
            percallList = map(lambda x: x["percall"], dataItem["cost"])
            func = dataItem["func"]
            line = dataItem["line"]
            # 样本个数大于20才进行绘制
            if len(percallList) > 20:
                percallMean = numpy.mean(percallList) # 计算均值
                # percallVar = numpy.var(percallMap) # 求方差
                percallStd = numpy.std(percallList)  # 计算标准差
                # pvalue值大于0.05为正太分布
                kstestResult = stats.kstest(percallList, 'norm', (percallMean, percallStd))
                print "percallStd:%s, pvalue:%s" % (percallStd, kstestResult[1])
                # 符合正态分布绘制分布曲线
                if kstestResult[1] > 0.05:
                    threshold = percallMean + 3 * percallStd
                    drawGaussian(func, line, percallMean, threshold, percallList, chartFolder)
                else:
                    drawCurve(func, line, percallList, chartFolder)
            else:
                pass
    except Exception:
        print 'exeption:' + traceback.format_exc()

两种耗时模型绘制的曲线效果图如下:

函数耗时高斯分布曲线及阈值效果示例

 

函数耗时曲线及Turkey箱型图阈值示例

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

30 个好用的 Python 编程技巧、小贴士

作者 | Erik-Jan van Baaren 
译者 | 弯月,责编 | 屠敏 
出品 | CSDN(ID:CSDNnews)
以下为译文:
借本文为大家献上 Python 语言的 30 个最佳实践、小贴士和技巧,希望能对各位勤劳的程序员有所帮助,并希望大家工作顺利! 

1. Python 版本

在此想提醒各位:自2020年1月1日起,Python 官方不再支持 Python 2。本文中的很多示例只能在 Python 3 中运行。如果你仍在使用 Python 2.7,请立即升级。

2. Python 编程技巧 – 检查 Python 的最低版本

你可以在代码中检查 Python 的版本,以确保你的用户没有在不兼容的版本中运行脚本。检查方式如下:
if not sys.version_info > (27):
   # berate your user for running a 10 year
   # python version
elif not sys.version_info >= (35):
   # Kindly tell your user (s)he needs to upgrade
   # because you’re using 3.5 features

3.Python 编程技巧 – IPython

IPython 本质上就是一个增强版的shell。就冲着自动补齐就值得一试,而且它的功能还不止于此,它还有很多令我爱不释手的命令,例如:
  • %cd:改变当前的工作目录

  • %edit:打开编辑器,并关闭编辑器后执行键入的代码

  • %env:显示当前环境变量

  • %pip install [pkgs]:无需离开交互式shell,就可以安装软件包

  • %time 和 %timeit:测量执行Python代码的时间

完整的命令列表,请点击此处查看(https://ipython.readthedocs.io/en/stable/interactive/magics.html)。
还有一个非常实用的功能:引用上一个命令的输出。In 和 Out 是实际的对象。你可以通过 Out[3] 的形式使用第三个命令的输出。
IPython 的安装命令如下:
pip3 install ipython

4.Python 编程技巧 – 列表推导式

你可以利用列表推导式,避免使用循环填充列表时的繁琐。列表推导式的基本语法如下:
[ expression for item in list if conditional ]
举一个基本的例子:用一组有序数字填充一个列表:
mylist = [i for i in range(10)]
print(mylist)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
由于可以使用表达式,所以你也可以做一些算术运算:
squares = [x**2 for x in range(10)]
print(squares)
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
甚至可以调用外部函数:
def some_function(a):
    return (a + 5) / 2

 

my_formula = [some_function(i) for i in range(10)]
print(my_formula)
# [2, 3, 3, 4, 4, 5, 5, 6, 6, 7]

最后,你还可以使用 ‘if’ 来过滤列表。在如下示例中,我们只保留能被2整除的数字:
filtered = [i for i in range(20) if i%2==0]
print(filtered)
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

5. Python 编程技巧 -检查对象使用内存的状况

你可以利用 sys.getsizeof() 来检查对象使用内存的状况:
import sys

 

mylist = range(010000)
print(sys.getsizeof(mylist))
# 48

等等,为什么这个巨大的列表仅包含48个字节?
因为这里的 range 函数返回了一个类,只不过它的行为就像一个列表。在使用内存方面,range 远比实际的数字列表更加高效。
你可以试试看使用列表推导式创建一个范围相同的数字列表: 
import sys

 

myreallist = [x for x in range(010000)]
print(sys.getsizeof(myreallist))
# 87632

6. Python 编程技巧 – 返回多个值

Python 中的函数可以返回一个以上的变量,而且还无需使用字典、列表或类。如下所示:
def get_user(id):
    # fetch user from database
    # ….
    return name, birthdate

 

name, birthdate = get_user(4)

如果返回值的数量有限当然没问题。但是,如果返回值的数量超过3个,那么你就应该将返回值放入一个(数据)类中。

7. Python 编程技巧 – 使用数据类

Python从版本3.7开始提供数据类。与常规类或其他方法(比如返回多个值或字典)相比,数据类有几个明显的优势:
  • 数据类的代码量较少

  • 你可以比较数据类,因为数据类提供了 __eq__ 方法

  • 调试的时候,你可以轻松地输出数据类,因为数据类还提供了 __repr__ 方法

  • 数据类需要类型提示,因此可以减少Bug的发生几率 

数据类的示例如下:
from dataclasses import dataclass

 

@dataclass
class Card:
    rank: str
    suit: str

card = Card(“Q”“hearts”)

print(card == card)
# True

print(card.rank)
# ‘Q’

print(card)
Card(rank=‘Q’, suit=‘hearts’)

详细的使用指南请点击这里(https://realpython.com/python-data-classes/)。

8. Python 编程技巧 – 交换变量

如下的小技巧很巧妙,可以为你节省多行代码:
a = 1
b = 2
a, b = b, a
print (a)
# 2
print (b)
# 1

9. Python 编程技巧 – 合并字典(Python 3.5以上的版本)

从Python 3.5开始,合并字典的操作更加简单了:
dict1 = { ‘a’: 1, ‘b’: 2 }
dict2 = { ‘b’: 3, ‘c’: 4 }
merged = { **dict1, **dict2 }
print (merged)
# {‘a’: 1, ‘b’: 3, ‘c’: 4}
如果 key 重复,那么第一个字典中的 key 会被覆盖。

10. Python 编程技巧 – 字符串的首字母大写

如下技巧真是一个小可爱:
mystring = “10 awesome python tricks”
print(mystring.title())
’10 Awesome Python Tricks’

11. Python 编程技巧 – 将字符串分割成列表

你可以将字符串分割成一个字符串列表。在如下示例中,我们利用空格分割各个单词:
mystring = “The quick brown fox”
mylist = mystring.split(‘ ‘)
print(mylist)
# [‘The’, ‘quick’, ‘brown’, ‘fox’]

12. Python 编程技巧 – 根据字符串列表创建字符串

与上述技巧相反,我们可以根据字符串列表创建字符串,然后在各个单词之间加入空格:
mylist = [‘The’‘quick’‘brown’‘fox’]
mystring = ” “.join(mylist)
print(mystring)
# ‘The quick brown fox’
你可能会问为什么不是 mylist.join(” “),这是个好问题!
根本原因在于,函数 String.join() 不仅可以联接列表,而且还可以联接任何可迭代对象。将其放在String中是为了避免在多个地方重复实现同一个功能。

13. Python 编程技巧 – 表情符

有些人非常喜欢表情符,而有些人则深恶痛绝。我在此郑重声明:在分析社交媒体数据时,表情符可以派上大用场。
首先,我们来安装表情符模块:
pip3 install emoji
安装完成后,你可以按照如下方式使用:
import emoji
result = emoji.emojize(‘Python is :thumbs_up:’)
print(result)
# ‘Python is 👍’

 

# You can also reverse this:
result = emoji.demojize(‘Python is 👍’)
print(result)
# ‘Python is :thumbs_up:’

更多有关表情符的示例和文档,请点击此处(https://pypi.org/project/emoji/)。

14. Python 编程技巧 – 列表切片

列表切片的基本语法如下:
a[start:stop:step]
start、stop 和 step 都是可选项。如果不指定,则会使用如下默认值:
  • start:0

  • end:字符串的结尾

  • step:1

示例如下:
# We can easily create a new list from 
# the first two elements of a list:
first_two = [1, 2, 3, 4, 5][0:2]
print(first_two)
# [1, 2]

 

# And if we use a step value of 2, 
# we can skip over every second number
# like this:
steps = [1, 2, 3, 4, 5][0:5:2]
print(steps)
# [1, 3, 5]

# This works on strings too. In Python,
# you can treat a string like a list of
# letters:
mystring = “abcdefdn nimt”[::2]
print(mystring)
# ‘aced it’

15. Python 编程技巧 – 反转字符串和列表

你可以利用如上切片的方法来反转字符串或列表。只需指定 step 为 -1,就可以反转其中的元素:
revstring = “abcdefg”[::-1]
print(revstring)
# ‘gfedcba’

 

revarray = [1, 2, 3, 4, 5][::-1]
print(revarray)
# [5, 4, 3, 2, 1]

16. Python 编程技巧 – 显示猫猫

我终于找到了一个充分的借口可以在我的文章中显示猫猫了,哈哈!当然,你也可以利用它来显示图片。首先你需要安装 Pillow,这是一个 Python 图片库的分支:
pip3 install Pillow
接下来,你可以将如下图片下载到一个名叫 kittens.jpg 的文件中:
然后,你就可以通过如下 Python 代码显示上面的图片:
from PIL import Image

 

im = Image.open(“kittens.jpg”)
im.show()
print(im.format, im.size, im.mode)
# JPEG (1920, 1357) RGB

Pillow 还有很多显示该图片之外的功能。它可以分析、调整大小、过滤、增强、变形等等。完整的文档,请点击这里(https://pillow.readthedocs.io/en/stable/)。

17. Python 编程技巧 – map()

Python 有一个自带的函数叫做 map(),语法如下:
map(functionsomething_iterable)
所以,你需要指定一个函数来执行,或者一些东西来执行。任何可迭代对象都可以。在如下示例中,我指定了一个列表:
def upper(s):
    return s.upper()

 

mylist = list(map(upper, [‘sentence’‘fragment’]))
print(mylist)
# [‘SENTENCE’, ‘FRAGMENT’]

# Convert a string representation of
# a number into a list of ints.
list_of_ints = list(map(int“1234567”)))
print(list_of_ints)
# [1, 2, 3, 4, 5, 6, 7]

你可以仔细看看自己的代码,看看能不能用 map() 替代某处的循环。

18. Python 编程技巧 – 获取列表或字符串中的唯一元素

如果你利用函数 set() 创建一个集合,就可以获取某个列表或类似于列表的对象的唯一元素:
mylist = [1, 1, 2, 3, 4, 5, 5, 5, 6, 6]
print (set(mylist))
# {1, 2, 3, 4, 5, 6}

 

# And since a string can be treated like a 
# list of letters, you can also get the 
# unique letters from a string this way:
print (set(“aaabbbcccdddeeefff”))
# {‘a’, ‘b’, ‘c’, ‘d’, ‘e’, ‘f’}

19. Python 编程技巧 – 查找出现频率最高的值

你可以通过如下方法查找出现频率最高的值:
test = [1, 2, 3, 4, 2, 2, 3, 1, 4, 4, 4]
print(max(set(test), key = test.count))
# 4
你能看懂上述代码吗?想法搞明白上述代码再往下读。
没看懂?我来告诉你吧:
  • max() 会返回列表的最大值。参数 key 会接受一个参数函数来自定义排序,在本例中为 test.count。该函数会应用于迭代对象的每一项。

  • test.count 是 list 的内置函数。它接受一个参数,而且还会计算该参数的出现次数。因此,test.count(1) 将返回2,而 test.count(4) 将返回4。

  • set(test) 将返回 test 中所有的唯一值,也就是 {1, 2, 3, 4}。

因此,这一行代码完成的操作是:首先获取 test 所有的唯一值,即{1, 2, 3, 4};然后,max 会针对每一个值执行 list.count,并返回最大值。
这一行代码可不是我个人的发明。

20. Python 编程技巧 – 创建一个进度条

你可以创建自己的进度条,听起来很有意思。但是,更简单的方法是使用 progress 包:
pip3 install progress
接下来,你就可以轻松地创建进度条了:
from progress.bar import Bar

 

bar = Bar(‘Processing’, max=20)
for i in range(20):
    # Do some work
    bar.next()
bar.finish()

21. Python 编程技巧 – 在交互式shell中使用_(下划线运算符)

你可以通过下划线运算符获取上一个表达式的结果,例如在 IPython 中,你可以这样操作:
In [1]: 3 * 3
Out[1]: 9In [2]: _ + 3
Out[2]: 12
Python Shell 中也可以这样使用。另外,在 IPython shell 中,你还可以通过 Out[n] 获取表达式 In[n] 的值。例如,在如上示例中,Out[1] 将返回数字9。

22. Python 编程技巧 – 快速创建Web服务器

你可以快速启动一个Web服务,并提供当前目录的内容:
python3 -m http.server
当你想与同事共享某个文件,或测试某个简单的HTML网站时,就可以考虑这个方法。

23. Python 编程技巧 – 多行字符串

虽然你可以用三重引号将代码中的多行字符串括起来,但是这种做法并不理想。所有放在三重引号之间的内容都会成为字符串,包括代码的格式,如下所示。
我更喜欢另一种方法,这种方法不仅可以将多行字符串连接在一起,而且还可以保证代码的整洁。唯一的缺点是你需要明确指定换行符。
s1 = “””Multi line strings can be put
        between triple quotes. It’s not ideal
        when formatting your code though”””

 

print (s1)
# Multi line strings can be put
#         between triple quotes. It’s not ideal
#         when formatting your code though

s2 = (“You can also concatenate multiple\n” +
        “strings this way, but you’ll have to\n”
        “explicitly put in the newlines”)

print(s2)
# You can also concatenate multiple
# strings this way, but you’ll have to
# explicitly put in the newlines

24. Python 编程技巧 – 条件赋值中的三元运算符

这种方法可以让代码更简洁,同时又可以保证代码的可读性:
[on_trueif [expression] else [on_false]
示例如下:
x = “Success!” if (y == 2) else “Failed!”

25. Python 编程技巧 – 统计元素的出现次数

你可以使用集合库中的 Counter 来获取列表中所有唯一元素的出现次数,Counter 会返回一个字典:
from collections import Counter

 

mylist = [1123455566]
c = Counter(mylist)
print(c)
# Counter({1: 2, 2: 1, 3: 1, 4: 1, 5: 3, 6: 2})

# And it works on strings too:
print(Counter(“aaaaabbbbbccccc”))
# Counter({‘a’: 5, ‘b’: 5, ‘c’: 5})

26. Python 编程技巧 – 比较运算符的链接

你可以在 Python 中将多个比较运算符链接到一起,如此就可以创建更易读、更简洁的代码:
x = 10

 

# Instead of:
if x > 5 and x < 15:
    print(“Yes”)
# yes

# You can also write:
if 5 < x < 15:
    print(“Yes”)
# Yes

27. Python 编程技巧 – 添加颜色

你可以通过 Colorama,设置终端的显示颜色:
from colorama import Fore, Back, Style

 

print(Fore.RED + ‘some red text’)
print(Back.GREEN + ‘and with a green background’)
print(Style.DIM + ‘and in dim text’)
print(Style.RESET_ALL)
print(‘back to normal now’)

28. Python 编程技巧 – 日期的处理

python-dateutil 模块作为标准日期模块的补充,提供了非常强大的扩展,你可以通过如下命令安装: 
pip3 install python-dateutil 
你可以利用该库完成很多神奇的操作。在此我只举一个例子:模糊分析日志文件中的日期:
from dateutil.parser import parse

 

logline = ‘INFO 2020-01-01T00:00:01 Happy new year, human.’
timestamp = parse(log_line, fuzzy=True)
print(timestamp)
# 2020-01-01 00:00:01

你只需记住:当遇到常规 Python 日期时间功能无法解决的问题时,就可以考虑 python-dateutil !

29.Python 编程技巧 – 整数除法

在 Python 2 中,除法运算符(/)默认为整数除法,除非其中一个操作数是浮点数。因此,你可以这么写:
# Python 2
5 / 2 = 2
5 / 2.0 = 2.5
在 Python 3 中,除法运算符(/)默认为浮点除法,而整数除法的运算符为 //。因此,你需要这么写:
Python 3
5 / 2 = 2.5
5 // 2 = 2
这项变更背后的动机,请参阅 PEP-0238(https://www.python.org/dev/peps/pep-0238/)。

30. Python 编程技巧 – 通过chardet 来检测字符集

你可以使用 chardet 模块来检测文件的字符集。在分析大量随机文本时,这个模块十分实用。安装方法如下:
pip install chardet
安装完成后,你就可以使用命令行工具 chardetect 了,使用方法如下:
chardetect somefile.txt
somefile.txtascii with confidence 1.0
你也可以在编程中使用该库,完整的文档请点击这里:
https://chardet.readthedocs.io/en/latest/usage.html
这 30 个小例子虽然有一些是老生长谈,但是确实非常经典,值得反复记忆、练习和收藏!

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 单线程、多线程和协程的爬虫性能对比

单线程、多线程和协程的爬虫性能对比

今天我要给大家分享的是如何爬取豆瓣上深圳近期即将上映的电影影讯,并分别用普通的单线程、多线程和协程来爬取,从而对比单线程、多线程和协程在网络爬虫中的性能。

具体要爬的网址是:https://movie.douban.com/cinema/later/shenzhen/

除了要爬入口页以外还需爬取每个电影的详情页,具体要爬取的结构信息如下:

1.基础爬取测试

下面我演示使用xpath解析数据。

入口页数据读取:

import requests
from lxml import etree
import pandas as pd
import re

main_url = "https://movie.douban.com/cinema/later/shenzhen/"
headers = {
    "Accept-Encoding""Gzip",
    "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}
r = requests.get(main_url, headers=headers)
r

结果:

<Response [200]>

检查一下所需数据的xpath:

可以看到每个电影信息都位于id为showing-soon下面的div里面,再分别分析内部的电影名称、url和想看人数所处的位置,于是可以写出如下代码:

html = etree.HTML(r.text)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
    #  imgurl, = e.xpath(".//img/@src")
    name, = e.xpath(".//div[@class='intro']/h3/a/text()")
    url, = e.xpath(".//div[@class='intro']/h3/a/@href")
    # date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
    like_num, = e.xpath(
        ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
    result.append((name, int(like_num[:like_num.find("人")]), url))
main_df = pd.DataFrame(result, columns=["影名""想看人数""url"])
main_df

结果:

然后再选择一个详情页的url进行测试,我选择了熊出没·狂野大陆这部电影,因为文本数据相对最复杂,也最具备代表性:

url = main_df.at[17"url"]
url

结果:

'https://movie.douban.com/subject/34825886/'

分析详情页结构:

文本信息都在这个位置中,下面我们直接提取这个div下面的所有文本节点:

r = requests.get(url, headers=headers)
html = etree.HTML(r.text)
movie_infos = html.xpath("//div[@id='info']//text()")
print(movie_infos)

结果:

['\n        ''导演'': ''丁亮''\n        ''编剧'': ''徐芸'' / ''崔铁志'' / ''张宇''\n        ''主演'': ''张伟'' / ''张秉君'' / ''谭笑''\n        ''类型:'' ''喜剧'' / ''科幻'' / ''动画''\n        \n        ''制片国家/地区:'' 中国大陆''\n        ''语言:'' 汉语普通话''\n        ''上映日期:'' ''2021-02-12(中国大陆)'' / ''2020-08-01(上海电影节)''\n        ''片长:'' ''100分钟''\n        ''又名:'' 熊出没大电影7 / 熊出没科幻大电影 / Boonie Bears: The Wild Life''\n        ''IMDb链接:'' ''tt11654032''\n\n']

为了阅读方便,拼接一下:

movie_info_txt = "".join(movie_infos)
print(movie_info_txt)

结果:

        导演: 丁亮
        编剧: 徐芸 / 崔铁志 / 张宇
        主演: 张伟 / 张秉君 / 谭笑
        类型: 喜剧 / 科幻 / 动画
        
        制片国家/地区: 中国大陆
        语言: 汉语普通话
        上映日期: 2021-02-12(中国大陆) / 2020-08-01(上海电影节)
        片长: 100分钟
        又名: 熊出没大电影7 / 熊出没科幻大电影 / Boonie Bears: The Wild Life
        IMDb链接: tt11654032

接下来就简单了:

row = {}
for line in re.split("[\n ]*\n[\n ]*", movie_info_txt):
    line = line.strip()
    arr = line.split(": ", maxsplit=1)
    if len(arr) != 2:
        continue
    k, v = arr
    row[k] = v
row

结果:

{'导演''丁亮',
 '编剧''徐芸 / 崔铁志 / 张宇',
 '主演''张伟 / 张秉君 / 谭笑',
 '类型''喜剧 / 科幻 / 动画',
 '制片国家/地区''中国大陆',
 '语言''汉语普通话',
 '上映日期''2021-02-12(中国大陆) / 2020-08-01(上海电影节)',
 '片长''100分钟',
 '又名''熊出没大电影7 / 熊出没科幻大电影 / Boonie Bears: The Wild Life',
 'IMDb链接''tt11654032'}

可以看到成功的切割出了每一项。

下面根据上面的测试基础,我们完善整体的爬虫代码:

2.单线程 爬虫性能

import requests
from lxml import etree
import pandas as pd
import re

main_url = "https://movie.douban.com/cinema/later/shenzhen/"
headers = {
    "Accept-Encoding""Gzip",
    "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}
r = requests.get(main_url, headers=headers)
html = etree.HTML(r.text)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
    imgurl, = e.xpath(".//img/@src")
    name, = e.xpath(".//div[@class='intro']/h3/a/text()")
    url, = e.xpath(".//div[@class='intro']/h3/a/@href")
    print(url)
#     date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
    like_num, = e.xpath(
        ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
    r = requests.get(url, headers=headers)
    html = etree.HTML(r.text)
    row = {}
    row["电影名称"] = name
    for line in re.split("[\n ]*\n[\n ]*""".join(html.xpath("//div[@id='info']//text()")).strip()):
        line = line.strip()
        arr = line.split(": ", maxsplit=1)
        if len(arr) != 2:
            continue
        k, v = arr
        row[k] = v
    row["想看人数"] = int(like_num[:like_num.find("人")])
#     row["url"] = url
#     row["图片地址"] = imgurl
#     print(row)
    result.append(row)
df = pd.DataFrame(result)
df.sort_values("想看人数", ascending=False, inplace=True)
df.to_csv("shenzhen_movie.csv", index=False)

结果:

https://movie.douban.com/subject/26752564/
https://movie.douban.com/subject/35172699/
https://movie.douban.com/subject/34992142/
https://movie.douban.com/subject/30349667/
https://movie.douban.com/subject/30283209/
https://movie.douban.com/subject/33457717/
https://movie.douban.com/subject/30487738/
https://movie.douban.com/subject/35068230/
https://movie.douban.com/subject/27039358/
https://movie.douban.com/subject/30205667/
https://movie.douban.com/subject/30476403/
https://movie.douban.com/subject/30154423/
https://movie.douban.com/subject/27619748/
https://movie.douban.com/subject/26826330/
https://movie.douban.com/subject/26935283/
https://movie.douban.com/subject/34841067/
https://movie.douban.com/subject/34880302/
https://movie.douban.com/subject/34825886/
https://movie.douban.com/subject/34779692/
https://movie.douban.com/subject/35154209/

爬到的文件:

整体耗时:

42.5 秒

3.多线程 爬虫性能

单线程的爬取耗时还是挺长的,下面看看使用多线程的爬取效率:

import requests
from lxml import etree
import pandas as pd
import re
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


def fetch_content(url):
    print(url)
    headers = {
        "Accept-Encoding""Gzip",  # 使用gzip压缩传输数据让访问更快
        "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
    }
    r = requests.get(url, headers=headers)
    return r.text


url = "https://movie.douban.com/cinema/later/shenzhen/"
init_page = fetch_content(url)
html = etree.HTML(init_page)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
#     imgurl, = e.xpath(".//img/@src")
    name, = e.xpath(".//div[@class='intro']/h3/a/text()")
    url, = e.xpath(".//div[@class='intro']/h3/a/@href")
#     date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
    like_num, = e.xpath(
        ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
    result.append((name, int(like_num[:like_num.find("人")]), url))
main_df = pd.DataFrame(result, columns=["影名""想看人数""url"])

max_workers = main_df.shape[0]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_tasks = [executor.submit(fetch_content, url) for url in main_df.url]
    wait(future_tasks, return_when=ALL_COMPLETED)
    pages = [future.result() for future in future_tasks]

result = []
for url, html_text in zip(main_df.url, pages):
    html = etree.HTML(html_text)
    row = {}
    for line in re.split("[\n ]*\n[\n ]*""".join(html.xpath("//div[@id='info']//text()")).strip()):
        line = line.strip()
        arr = line.split(": ", maxsplit=1)
        if len(arr) != 2:
            continue
        k, v = arr
        row[k] = v
    row["url"] = url
    result.append(row)
detail_df = pd.DataFrame(result)
df = main_df.merge(detail_df, on="url")
df.drop(columns=["url"], inplace=True)
df.sort_values("想看人数", ascending=False, inplace=True)
df.to_csv("shenzhen_movie2.csv", index=False)
df

结果:

耗时 8 秒

由于每个子页面都是单独的线程爬取,每个线程几乎都是同时在工作,所以最终耗时仅取决于爬取最慢的子页面

4.协程 异步爬虫性能

由于我在jupyter中运行,为了使协程能够直接在jupyter中直接运行,所以我在代码中增加了下面两行代码,在普通编辑器里面可以去掉:

import nest_asyncio
nest_asyncio.apply()

这个问题是因为jupyter所依赖的高版本Tornado存在bug,将Tornado退回到低版本也可以解决这个问题。

下面我使用协程来完成这个需求的爬取:

import aiohttp
from lxml import etree
import pandas as pd
import re
import asyncio
import nest_asyncio
nest_asyncio.apply()


async def fetch_content(url):
    print(url)
    header = {
        "Accept-Encoding""Gzip",  # 使用gzip压缩传输数据让访问更快
        "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
    }
    async with aiohttp.ClientSession(
        headers=header, connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()


async def main():
    url = "https://movie.douban.com/cinema/later/shenzhen/"
    init_page = await fetch_content(url)
    html = etree.HTML(init_page)
    all_movies = html.xpath("//div[@id='showing-soon']/div")
    result = []
    for e in all_movies:
        #         imgurl, = e.xpath(".//img/@src")
        name, = e.xpath(".//div[@class='intro']/h3/a/text()")
        url, = e.xpath(".//div[@class='intro']/h3/a/@href")
    #     date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
        like_num, = e.xpath(
            ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
        result.append((name, int(like_num[:like_num.find("人")]), url))
    main_df = pd.DataFrame(result, columns=["影名""想看人数""url"])

    tasks = [fetch_content(url) for url in main_df.url]
    pages = await asyncio.gather(*tasks)

    result = []
    for url, html_text in zip(main_df.url, pages):
        html = etree.HTML(html_text)
        row = {}
        for line in re.split("[\n ]*\n[\n ]*""".join(html.xpath("//div[@id='info']//text()")).strip()):
            line = line.strip()
            arr = line.split(": ", maxsplit=1)
            if len(arr) != 2:
                continue
            k, v = arr
            row[k] = v
        row["url"] = url
        result.append(row)
    detail_df = pd.DataFrame(result)
    df = main_df.merge(detail_df, on="url")
    df.drop(columns=["url"], inplace=True)
    df.sort_values("想看人数", ascending=False, inplace=True)
    return df

df = asyncio.run(main())
df.to_csv("shenzhen_movie3.csv", index=False)
df

结果:

耗时仅 7 秒,相对比多线程更快一点

由于request库不支持协程,所以我使用了支持协程的aiohttp进行页面抓取

当然实际爬取的耗时还取绝于当时的网络,但整体来说,协程爬取会比多线程爬虫稍微快一些

5.性能对比回顾

今天我向你演示了,单线程爬虫、多线程爬虫和协程爬虫

可以看到,一般情况下协程爬虫速度最快,多线程爬虫略慢一点,单线程爬虫则必须上一个页面爬取完成才能继续爬取。

但协程爬虫相对来说并不是那么好编写,数据抓取无法使用request库,只能使用aiohttp

所以在实际编写爬虫时,我们一般都会使用多线程爬虫来提速,但必须注意的是网站都有ip访问频率限制,爬的过快可能会被封ip,所以一般我们在多线程提速的同时使用代理ip来并发的爬取数据

6.彩蛋:xpath+pandas解析表格并提取url

我们在深圳影的底部能够看到一个[查看全部即将上映的影片] (https://movie.douban.com/coming)的按钮,点进去能够看到一张完整近期上映电影的列表,发现这个列表是个table标签的数据:

那就简单了,解析table我们可能压根就不需要用xpath,直接用pandas即可,但片名中包含的url地址还需解析,所以我采用xpath+pandas来解析这个网页,看看我的代码吧:

import pandas as pd
import requests
from lxml import etree

headers = {
    "Accept-Encoding""Gzip",
    "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}
r = requests.get("https://movie.douban.com/coming", headers=headers)
html = etree.HTML(r.text)
table_tag = html.xpath("//table")[0]
df, = pd.read_html(etree.tostring(table_tag))
urls = table_tag.xpath(".//td[2]/a/@href")
df["url"] = urls
df

结果

这样就能到了主页面的完整数据,再简单的处理一下即可

结语

感谢各位读者,有什么想法和收获欢迎留言评论噢!

本文转自AirPython.

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Jupyter 中 IPython 可以用的12个方便命令

我在日常编程中一般都会用到两个工具——Pycharm和Jupyter,在刷算法、写爬虫时会用到前者,因为我习惯用Pycharm里的Debug功能调试,很容易找出代码中的Bug。而进行数据分析、机器学习时就会用到后者,因为Jupyter编译器利用的IPython是一种交互式计算和开发环境,而且有许多方便命令。
Jupyter对数据的可视化十分友好,这类单元格的形式每一步都有运行结果,便于整理自己思路,并且很大程度上节约了运行时间,在调试的时候只需要运行出错的部分代码,而不是全部。

IPython中有一些特有的魔法命令,如果能合理的利用这些魔法命令,会省去很多不必要的操作,为编程带来很大程度的便利,下面就来安利十二个常用的魔法命令。

方便命令的基础常识

  • ?和?? ->例:%matplotlib?、%matplotlib??

后缀为?可以获取一个对象的相关信息,比如描述一个方法该怎么用;后缀为??可以获取该对象更加详细的信息,比如源码。这个对象可以是IPython中自带的、也可以是导入的、也可以是自己定义的。

  • %和%% ->例:%time、%%time

前缀为%被称作行魔法命令(line magics),只能在单个输入行上运行;前缀为%%被称作单元格魔法命令(cell magics),可以在多个输入行上运行。

1.%Ismagic和%magic

如果你还不了解IPython的魔法命令,那这两个魔法命令一定是最重要的,记牢这两个命令之后慢慢了解剩下的。%lsmagic的作用就是列出所有存在的行魔法命令和单元格魔法命令,部分截图如下:

%magic的作用就是给出所有魔法命令的详细介绍,比如介绍、样例等等,比较考验英语功底,耐下心慢慢了解。

2.%pdb

输入这个命令并且运行之后,如果后面的代码出现了异常,这个指令就会主动进入调试器,几十行几百行代码难免会有几个或一堆Bug。可能比较笨的方法就是找断点然后print,最后还要把print删掉,而%pdb调试找到Bug后直接退出就好,相对前者更方便些。

比如两个数相加,不小心把一个整数定义成字符型,在调用函数计算时会发生报错,然后就可以进入调试器进行调试,切记最后要通过exit()退出,不能直接终止单元格运行。

3.%debug

%debug的作用与%pdb几乎是一样的,不同之处就是%pdb在遇到异常自动进入调试器,而%debug是人遇到报错主动输入指令进入调试器,仍然是上面那个例子,调试界面如下:
主动和被动两种调试方式大家可以靠自己喜好选择,我个人比较喜欢%debug。

4.%who和%whos

代码一多变量可能就会变多,变量一多可能就会混淆,或者在删除单元格的时候不小心把变量定义的单元格也删掉了,%who和%whos这两条命令就起到大作用了。
%who给出的信息只有全局变量的名称,而%whos给出的信息更加详细,包括变量名称、类型、和数据。

5.%time和%timeit

这两条命令都是用来输出代码的执行时间,比如可以用来粗略的比较两种算法在相同的问题上执行时间哪一个更少,不同点在于%time只执行一次就输出执行时间,而%timeit是执行多次然后计算平均时间再输出。
比如这里%timeit命令输出中有7 runs代表共执行7次,这两个命令都为行命令,%%time和%%timeit为单元格命令,区别同上。

6.%store

如果你在一个文件中花了很长的时间清理了一些数据,比如对原始数据缺失值填充呀、降维呀、转换呀等等,然后在另一个文件中需要用到同样的数据,笨一点方法就是将数据保存然后在新文件中调用,但这种操作一条%store命令就能完成,我们先在一个文件中利用%store保存一个变量。
然后在另一个文件中调用这个变量:
可以看到直接调用是会报错的,但利用了%store -r命令之后就可以成功调用被%store保存的变量,所以%store用来保存,%store -r用来读取。

7.%xdel和%reset

这条命令的作用就是删除变量,并且删除其在IPython中的对象上的一切引用。平时在数据清洗时,从原始数据到清洗后的数据中间要经过很多步骤,我们不可能全程用一个变量名称,所以中间步骤很容易为数据起一些类似的名称,而利用%xdel就可以将无用的单个变量名称删掉,防止混淆。
%reset的作用就是删除所有变量名。

8.%cls

在数据清洗时候,通常都是做一步然后输出一次数据集,观察一下变化,我们都知道展示数据集是很占网页的,久而久之,这个notebook就特别长,再想查看文件前面的内容不仅需要滚动很长时间滑轮,而且数据间很容易混淆,所以每当输出一次数据集后可以利用%cls命令清除一次,使notebook看起来更整洁。
可以看到正常的话data之后会打印数据集,但利用%cls之后数据集的输出被清除了。

9.%%writefile

如果我们想写一个函数,例如去除中文符号的函数,这样的函数在很多情景下都可以利用,所以我们可以将这个函数写入一个单独文件,想用的时候直接调用,这个操作可以利用%%writefile命令进行写入。

10.%run

%run命令的作用就是运行脚本文件,不仅可以直接使用脚本文件中的代码,脚本文件也可以使用IPython环境中的变量,仍用上面的例子,可以用%run命令直接运行。

11.%psource

如果你在notebook定义了一个函数,但隔了比较久需要用到这个函数,但是可能忘记了这个函数需要传入哪些参数、或者传入参数的类型应该是什么,这种情况下就不得不往前翻寻找这个函数的代码,但利用%psource可以偷懒,这个命令就是输出源代码。
前面提及的??也有相同的作用,但是输出的形式没有%psource直观,还混有其它的信息在里面。

12.%hist

%hist的作用就是打印所有命令行输入的历史记录,方便查看之前输入的代码信息。
这个命令允许设置查询的区间,也就是命令行输入对应的序号。
这些魔法命令有一部分能被常用的代码语句代替,但是却没有魔法命令简单明了,只是个人习惯的问题,如果可能尽量改掉自己的思维定式,用更加便捷的代码处理问题。
转自Python编程时光。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 性能测试工具 Locust 极简入门

作者:dongfanger
来源:dongfanger
Locust是一款Python技术栈的开源的性能测试工具。Locust直译为蝗虫,寓意着它能产生蝗虫般成千上万的并发用户:
Locust并不小众,从它Github的Star数量就可见一斑:
截止文章写作时,一共15951Star。
Locust生态良好,它已在多家外企(包括世界500强)投入使用:
如此看来,Locust是非常值得学习和掌握的一款工具。
Python的魔力在于化繁为简,基于Python的Locust也能给仍然困惑于性能测试的我们带来启发。


1.Locust特点

  • 以纯Python方式编写用户脚本,提供极大自由度。

  • 用户脚本可以串行方式编写,Locust会通过轻量级进程/协程产生并发,无需自己做并发编程。

  • 并发量大,借助于gevent库,Locust能产生成千上万并发请求。

  • 开销小,Locust用户运行时开销很小。

  • 良好的Web UI对性能结果实时监测。

  • 能测任何系统任何协议,只需要写个client即可。

  • 开放REST API,尽情发挥。


2.安装Locust

需要Python版本3.6及以上。
执行pip命令:
$ pip install locust
验证安装成功:
$ locust -V
安装时会一并安装依赖库:
Installing collected packages: Werkzeug, pywin32, zope.event, greenlet, gevent, geventhttpclient, itsdangerous, flask, Flask-BasicAuth, ConfigArgParse, pyzmq, psutil, locust
能看出来flask为Locust提供了Web功能。


3.快速上手

使用Locust一般按照以下步骤进行:
  1. 编写Python用户脚本。

  2. 使用locust命令执行性能测试。

  3. (可选)通过Web界面监测结果。

示例代码如下,新建locustfile.py文件:
import time
from locust import HttpUser, task, between

class QuickstartUser(HttpUser):
wait_time = between(1, 2.5)

@task
def hello_world(self):
self.client.get(“/hello”)
self.client.get(“/world”)

@task(3)
def view_items(self):
for item_id in range(10):
self.client.get(f”/item?id={item_id}, name=“/item”)
time.sleep(1)

def on_start(self):
self.client.post(“/login”, json={“username”:“foo”, “password”:“bar”})
路径切换到locustfile.py文件所在目录,执行命令:
$ locust
也可以通过-f指定某个目录文件:
$ locust -f locust_files/my_locust_file.py
运行后,打开http://127.0.0.1:8089看到Web界面:
填写信息后,就能开始压测了。Web界面提供了结果统计数据:
和性能指标走势图:


4.脚本解析

示例脚本解析如下:
# Locust用户脚本就是Python模块
import time
from locust import HttpUser, task, between

# 类继承自HttpUser
class QuickstartUser(HttpUser):
# 每个模拟用户等待1~2.5秒
wait_time = between(1, 2.5)

# 被@task装饰的才会并发执行
@task
def hello_world(self):
# client属性是HttpSession实例,用来发送HTTP请求
self.client.get(“/hello”)
self.client.get(“/world”)

# 每个类只会有一个task被选中执行
# 3代表weight权重
# 权重越大越容易被选中执行
# view_items比hello_wolrd多3倍概率被选中执行
@task(3)
def view_items(self):
for item_id in range(10):
# name参数作用是把统计结果按同一名称进行分组
# 这里防止URL参数不同会产生10个不同记录不便于观察
# 把10个汇总成1个”/item”记录
self.client.get(f”/item?id={item_id}, name=“/item”)
time.sleep(1)

# 每个模拟用户开始运行时都会执行
def on_start(self):
self.client.post(“/login”, json={“username”:“foo”, “password”:“bar”})

小结

本文先了解了Locust的背景和生态,它是值得学习的,对于Python技术栈来说更加如此。接着介绍了使用pip命令安装Locust,其中发现顺带安装了flask,Locust的Web功能是flask提供的
然后给出了一段示例代码,按照步骤上手Locust。最后对示例代码进行了解析,浅尝辄止。locustfile实际上该怎么写呢?
参考资料:
https://locust.io/
https://docs.locust.io/en/stable/

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典