专属Python开发者的完美终端工具—Rich

Rich 是一个 Python 库,可以为你在终端中提供富文本和漂亮、精美的格式。

使用 Rich API 可以很容易的在终端输出添加各种颜色和不同风格。它可以绘制漂亮的表格,进度条,markdown,突出显示语法的源代码及回溯等等,优秀的功能不胜枚举。

我已经将本文全部示例存放在网盘中,在Python实用宝典公众号后台回复 rich示例 可以下载全部示例。

1.Rich 兼容性

Rich 适用于 Linux,OSX 和 Windows。可与新的 Windows 终端一起使用,Windows 的经典终端仅限 8 种颜色。

Rich 还可以与Jupyter NoteBook一起使用,而无需其他配置。

2.Rich 安装说明

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install rich

3.Rich 的 Print 功能

想毫不费力地将 Rich 的输出功能添加到你的Python脚本程序中,你只需导入 rich print 方法,该方法和其他 Python 的自带功能的参数类似。 你可以试试:

from rich import print

print("Hello, [bold magenta]World[/bold magenta]!", ":vampire:", locals())

可以看到,基于 rich 的 print 方法输出的内容都是带颜色、带重点的,相比于Python自带的 print 有明显的优势。

4.自定义 Console 控制台输出

想要对 Rich 终端内容进行更多的自定义设置,你需要导入并构造一个控制台对象:

from rich.console import Console

console = Console()

Console 对象含有一个 print 方法,它的界面与 python 内置的 print 功能相似。你可以试试:

console.print("Hello", "World!")

你可能已经料到,这时终端上会显示“ Hello World!”,请注意,与内置的“打印”功能不同,Rich 会将文字自动换行以适合终端宽度。

有几种方法可以为输出添加自定义颜色和样式。你可以通过添加 style 关键字参数来为整个输出设置样式。例子如下:

console.print("Hello", "World!", style="bold red")

输出如下图:

这个范例一次只设置了一行文字的样式。如果想获得更细腻更复杂的样式,Rich 可以渲染一个特殊的标记,其语法类似于 bbcode。示例如下:

console.print("Where there is a [bold cyan]Will[/bold cyan] there [u]is[/u] a [i]way[/i].")

5.Console 控制台记录

Console 对象具有一个 log() 方法,该方法具有与 print() 类似的界面,除此之外,还能显示当前时间以及被调用的文件和行。

默认情况下,Rich 将针对 Python 结构和 repr 字符串进行语法突出显示。如果你记录一个集合(如字典或列表),Rich 会把它漂亮地打印出来,使其切合可用空间。下面是其中一些功能的示例:

from rich.console import Console
console = Console()

test_data = [
    {"jsonrpc": "2.0", "method": "sum", "params": [None, 1, 2, 4, False, True], "id": "1",},
    {"jsonrpc": "2.0", "method": "notify_hello", "params": [7]},
    {"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": "2"},
]

def test_log():
    enabled = False
    context = {
        "foo": "bar",
    }
    movies = ["Deadpool", "Rise of the Skywalker"]
    console.log("Hello from", console, "!")
    console.log(test_data, log_locals=True)


test_log()

以上范例的输出如下:

注意其中的 log_locals 参数会输出一个表格,该表格包含调用 log 方法的局部变量。

log 方法既可用于将长时间运行应用程序(例如服务器)的日志记录到终端,也可用于辅助调试。

Logging 处理程序

你还可以使用内置的处理类来对 Python 日志记录模块的输出进行格式化和着色。下面是输出示例:

6. 表情符号

将名称放在两个冒号之间即可在控制台输出中插入表情符号。示例如下:

>>> console.print(":smiley: :vampire: :pile_of_poo: :thumbs_up: :raccoon:")
😃 🧛 💩 👍 🦝

请谨慎地使用此功能。

7.表格

Rich 包含多种边框,样式,单元格对齐等格式设置的选项。下面是一个简单的示例:

from rich.console import Console
from rich.table import Column, Table

console = Console()

table = Table(show_header=True, header_style="bold magenta")
table.add_column("Date", style="dim", width=12)
table.add_column("Title")
table.add_column("Production Budget", justify="right")
table.add_column("Box Office", justify="right")
table.add_row(
    "Dev 20, 2019", "Star Wars: The Rise of Skywalker", "$275,000,000", "$375,126,118"
)
table.add_row(
    "May 25, 2018",
    "[red]Solo[/red]: A Star Wars Story",
    "$275,000,000",
    "$393,151,347",
)
table.add_row(
    "Dec 15, 2017",
    "Star Wars Ep. VIII: The Last Jedi",
    "$262,000,000",
    "[bold]$1,332,539,889[/bold]",
)

console.print(table)

该示例的输出如下:

请注意,控制台标记的呈现方式与 print() 和 log() 相同。实际上,由 Rich 渲染的任何内容都可以添加到标题/行(甚至其他表格)中。

Table 类很聪明,可以调整列的大小以适合终端的可用宽度,并能根据需要做文本环绕的处理。下面是相同的示例,输出与比上表小的终端上:

8.进度条

Rich 可以渲染多个不闪烁的进度条形图,以跟踪长时间运行的任务。

基本用法:用 track 函数调用程序并迭代结果。下面是一个例子:

from rich.progress import track

for step in track(range(100)):
    do_step(step)

添加多个进度条并不难。以下是效果示例:

这些列可以配置为显示你所需的任何详细信息。

内置列包括完成百分比,文件大小,文件速度和剩余时间。下面是显示正在进行的下载的示例:

它可以在显示进度的同时下载多个 URL。要自己尝试一下,请参阅示例文件中的 examples/downloader.py ,在Python实用宝典公众号后台回复 rich示例下载全部示例。

9.按列输出数据

Rich 可以将内容通过排列整齐的,具有相等或最佳的宽度的来呈现。下面是(macOS / Linux)ls 命令的一个非常基本的克隆,用列来显示目录列表:

import os
import sys

from rich import print
from rich.columns import Columns

directory = os.listdir(sys.argv[1])
print(Columns(directory))

以下屏幕截图是列示例的输出,该列显示了从 API 提取的数据:

10.Markdown

Rich 可以呈现markdown,相当不错的将其格式显示到终端。

为了渲染 markdown,请导入 Markdown 类,将其打印到控制台。例子如下:

from rich.console import Console
from rich.markdown import Markdown

console = Console()
with open("README.md") as readme:
    markdown = Markdown(readme.read())
console.print(markdown)

该例子的输出如下图:

11.语法突出显示

Rich 使用 pygments 库来实现语法高亮显示。用法类似于渲染 markdown。构造一个 Syntax 对象并将其打印到控制台。下面是一个例子:

from rich.console import Console
from rich.syntax import Syntax

my_code = '''
def iter_first_last(values: Iterable[T]) -> Iterable[Tuple[bool, bool, T]]:
    """Iterate and generate a tuple with a flag for first and last value."""
    iter_values = iter(values)
    try:
        previous_value = next(iter_values)
    except StopIteration:
        return
    first = True
    for value in iter_values:
        yield first, False, previous_value
        first = False
        previous_value = value
    yield first, True, previous_value
'''
syntax = Syntax(my_code, "python", theme="monokai", line_numbers=True)
console = Console()
console.print(syntax)

输出如下:

12.错误回溯(traceback)

Rich 可以渲染漂亮的错误回溯日志,比标准的 Python 回溯更容易阅读,并能显示更多代码。

你可以将 Rich 设置为默认的回溯处理程序,这样所有异常都将由 Rich 为你呈现。

下面是在 OSX(与 Linux 类似)上的外观:

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

什么是m3u8格式? Python合并ts文件至mp4格式及解密教程

m3u8是什么格式?m3u8是苹果公司推出的视频播放标准,是m3u的一种,只是编码格式采用的是UTF-8。使用m3u8格式文件主要因为可以实现多码率视频的适配,视频网站可以根据用户的网络带宽情况,自动为客户端匹配一个合适的码率文件进行播放,从而保证视频的流畅度。

m3u8准确来说是一种索引文件,使用m3u8文件实际上是通过它来解析对应的放在服务器上的视频网络地址,从而实现在线播放。它将视频切割成一小段一小段的ts格式的视频文件,然后存在服务器中(现在为了减少I/o访问次数,一般存在服务器的内存中),通过m3u8解析出来路径,然后去请求。

合并 ts 文件其实有很多种方法,有一些教程直接使用 cmd 的 copy 命令直接合并 ts 文件:

copy /b  movie*.ts movie_new.ts

这个方法虽然可以合并,但是无法转化为 mp4 格式,而且也有可能出现视频缺损的情况。因此本文将讲解如何使用 ffmpeg 合并 ts 文件为mp4格式,使用 ffmpeg 也能有效防止视频出现缺损的问题。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

安装ffmpeg

Mac (打开终端(Terminal), 用 homebrew 安装):

brew install ffmpeg --with-libvorbis --with-sdl2 --with-theora

Linux:

apt-get install ffmpeg libavcodec-extra

Windows:

1. 进入 http://ffmpeg.org/download.html#build-windows,点击 windows 对应的图标,进入下载界面点击 download 下载按钮,
2. 解压下载好的zip文件到指定目录
3. 将解压后的文件目录中 bin 目录(包含 ffmpeg.exe )添加进 path 环境变量中
4. DOS 命令行输入 ffmpeg -version, 出现以下界面说明安装完成:

2.简单合并ts文件

使用 ffmpeg 合并一些 ts 文件非常简单,你只需要在终端输入一行命令即可:

ffmpeg -f concat -i file_list.txt -c copy output.mp4

其中 file_list.txt 为如下格式文本文件:

      file 'input1.ts'
      file 'input2.ts'
      file 'input3.ts'

我们可以用 Python 脚本生成这个 file_list.txt:

import os
filePath = "你的ts视频存放路径"
file_list = sorted(os.listdir(filePath))
with open("你的ts视频存放路径/file_list.txt","w+") as f:
    for file in file_list:
        f.write("file '{}'\n".format(file))

注意,这个 file_list.txt 需要和你的 ts 视频存放在同一个目录下,然后 cd 进入此目录,并执行上面提到过的 ffmpeg 合并转换命令:

ffmpeg -f concat -i file_list.txt -c copy output.mp4

3.解密处理

上面我们讲的是没有经过加密的 ts 文件,这些文件下载后直接可以播放,但经过AES-128加密后的文件下载后会无法播放,所以还需要进行解密。

如何判断是否需要加密?观察视频网站是否有m3u8的文件传输,下载下来并打开:

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:13
#EXT-X-MEDIA-SEQUENCE:0

//这里是注释,真实文件里不会有的,
//看看你的m3u8文件里有没有这一行,如果有的话,这个m3u8文件是加密的,请继续往下看
//如果没有这行的话,特别是没有这个#EXT-X-KEY,那么恭喜你,这个m3u8没有加密,你直接按本文教程第二点合并即可
#EXT-X-KEY:METHOD=AES-128,URI="http://www.example.com/20180125/key.key"


#EXTINF:12.5,
//下面的这个其实才是视频真正的地址,你放在浏览器地址栏上直接回车是可以直接下载的
//不过这样的链接在m3u8文件里会有很多,建议使用工具下载(迅雷)、ffmpeg、vlc等
//这里还可能出现GBDYO3576000.ts这种情况,其实是把前面的路径省略了,可根据m3u8文件的路径自行加上
http://www.example.com/20180125/GBDYO3576000.ts
#EXTINF:12.5,
http://www.example.com/20180125/GBDYO3576001.ts
#EXTINF:12.5,
http://www.example.com/20180125/GBDYO3576002.ts

如果你的文件是加密的,那么你还需要一个key文件,Key文件下载的方法和m3u8文件类似,如下所示 key.key 就是我们需要下载的 key 文件,并注意这里 m3u8 有2个,需要使用的是像上面一样存在 ts 文件超链接的 m3u8 文件:

下载所有 ts 文件,将下载好的所有的 ts 文件、m3u8、key.key 放到一个文件夹中,将 m3u8 文件改名为 index.m3u8,将 key.key 改名为 key.m3u8 。更改 index.m3u8 里的 URL,变为你本地路径的 key 文件,将所有 ts 也改为你本地的路径,如下所示:

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:13
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-KEY:METHOD=AES-128,URI="e:/20180125/key.m3u8"
#EXTINF:12.5,
e:/20180125/GBDYO3576000.ts
#EXTINF:12.5,
e:/20180125/GBDYO3576001.ts
#EXTINF:12.5,
e:/20180125/GBDYO3576002.ts

然后用ffmpeg进行合并:

ffmpeg -allowed_extensions ALL -i index.m3u8 -c copy new.mp4

这样就大功告成了!我们成功解密并使用 ffmpeg 合并了这些 ts 视频片段,实际应用场景可能和这不一样,希望我们这篇文章能起到抛砖引玉的作用。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

智能监控时代 – 5步道出监控建设之道

 

在正式阅读本文之前,我们先思考一个问题-几乎每个IT公司都有一套自己的运维监控系统,每家公司的运维都在做监控系统,而似乎每家都在面临一个问题,监控系统不好用,不能解决实际的监控问题,有没有更好的监控系统呢?答案是有的,本文将为您揭晓谜底。

 

 

1.什么是监控-凡事预则立不预则废

  
    运维曰:监控,业务之大事,死生之地,存亡之道,不可不察也。

    监控的建设,不亚于一场战争的准备,无论是使用监控的用户,还是建设监控的人员,都是面临着监控是否好用、好使的现实挑战。因此,我们必须充分认识并理解监控

 

 

    监控(Monitoring),顾名思义,一是监测,二是控制,重点在第一个字眼“”上,即监测、预防的意思。在计算机运维领域,特指对目标状态进行数据采样,从而判断其运行状态的一种方式,通常我们会关注以下监控数据。

    在本文中,我们重点探讨后4种监控,因APM属于特定领域监控,此处不详细展开论述。那么,既然用户关心如此多的监控数据,如何建设符合业务需求的监控呢?是不是存在一款监控系统,就完全能够满足用户的监控需求呢?

 

2.监控建设的现状-千里之行始于足下

    
    监控平台的建设,是一个长期的过程,而非一蹴而就。总体而言,有四种玩法,一是基于开源监控平台搭建,二是使用商业平台搭建,三是二次开发开源产品,四是完全自主研发。而每种玩法,均有自己的特点和其局限。

 

    1.基于开源监控软件搭建监控平台,可选Nagios,Cacti,Ganglia,Zabbix,Graphite,Prometheus,TIGK(Telegraf、InfluxDB、Grafana、Kapacitor)等,通常只需要将开源软件部署起来,然后丰富采集数据即可。其特点是开源,社区解决方案众多,可以自由定制。

 

    2.基于商业软件搭建监控平台,在监控的商业领域,以前基本都是国外公司的天下,如IBM,HP,卓豪等,但随着本土基础设施厂商的崛起,国产厂商奋发图强,出现了一些优秀的商业监控软件,如云智慧,监控易,OneAPM等,以及一些专有场景的监控软件。其特点是只要花钱,就可以实现相应的监控服务,免去了监控构建的重复摸索,适合于监控场景较复杂,缺少人力,又急需监控解决方案的项目。

 

    3.基于开源软件二次开发搭建监控平台,开源软件本身具备完备的功能,如提供API,提供数据查询接口,可以通过API进行管控等,如基于Zabbix、Prometheus、Open-falcon等的二次开发,可以实现完备的监控功能和友好的管理功能。其特点是可以按需扩展监控采集源,按需集成,自由定制,不满足于已有软件提供的功能,可以按场景灵活定制,建议在“花钱买服务不能解决问题”的情况下而为之。

 

    4.基于自主研发,从零开始构建监控平台,这是一个庞大的工程,对技术和工程管理都是个不小的挑战。为何需要自主从头开始研发一个监控系统,原因可能有以下几个,1.市面监控软件无法满足其业务需求,功能上不够用,性能上不满足,管理上不足以支撑其业务和组织架构的发展;2.生态上无法满足业务发展需求;3.基于开源软件二次开发版权存在风险,受制于他人;4.业务需要,管理层支持,技术人员充足,天时地利人和均具备。其特点是开发周期长,目标期望与现实差距,开发速度和业务发展速度能否及时跟进,稍有不慎就有软件项目失控风险,考验的是项目管理水平和工程实现能力。

 

    从上面四种方式来看,其实现成本从低到高,从易到难,而具体采用什么方式,需要根据实际情况定夺,那么,主要取决于什么呢?人力、物力、财力,还和公司所处的阶段有密切关系。

 

    如公司刚起步,追求速度和成本,则花半天时间搭建一个开源监控系统,则不失为明智选择,具体选择哪款开源软件,则可以选择自己最熟悉的,使用人群最多的。而公司已初具规模,业务需求较多,选择商业监控软件和基于开源二次开发,则可以根据具体业务需求进行详细评估,规则是,花多少成本,换取多少收益,还需考虑收益是长期的,还是短期的。

 

    当公司发展到一定规模,其组织架构和业务需求,决定了软件架构需求,因此此时的监控系统,也必须具备这种能力,因此这个时候,建设基础设施不仅仅是业务需求和产品能力那问题,而是战略规划紧密关联的问题,所以选择完全自主开发,还是选择商业、开源优良产品二次开发,均是可选方向,取决于技术储备和组织的执行力,天时地利人和缺一不可。
 
    诚如“孙子曰:凡用兵之法,驰车千驷,革车千乘,带甲十万,千里馈粮,则内外之费,宾客之用,胶漆之材,车甲之奉,日费千金,然后十万之师举矣。”,对应监控的自主开发,需要项目,产品,设计,开发,测试,运维,运营等多种人员共同参与,然后历经数月,出demo,测试验证,经过一个又一个的迭代,然后十万服务器可监控之。自主开发耗费人力物力财力,犹如一场战争的准备,不可轻举妄动,要详细周密的规划方可行动。

 

3.监控建设的挑战-吾将上下而求索

    在第2节中,我们探讨了监控建设方案选型问题,那么,除了上面提到的问题,我们在建设过程中还会有哪些问题呢?

 

1.系统关键指标缺失

    监控建设从来都是一个持续的过程,没有一劳永逸的解决方案。在持续不断的监控运维中,我们不断去丰富和完善相关监控,常见的系统和应用层面的监控指标如下所示。

 

 

    从上图中我们可以看到,监控的具体使用用户对监控指标的采集是非常宽泛的,不管是市面哪款监控系统,其提供的默认监控指标,可能是无法满足实际的场景需求。随着监控系统的持续运营和业务不断发展,采集更多监控指标的需求会越来越迫切,因此,我们希望监控系统能够提供自由扩充和灵活定制的能力。

 

2.功能扩展困难重重

 

    在持续的监控系统建设过程中,我们根据实际需求不断地完善采集指标。因此,监控平台是不是原生支持多种采集方式,可能会限制我们能力的发挥,比如监控网络、存储等硬件设备,我们就必须使用SNMP协议来获取监控指标数据,这应该是平台自带的一个基本能力。如果还要我们自己去从零实现,就相当于写了一个小型的监控软件。因此,这个监控系统应该提供扩展能力,要么是代码开源,要么是接口开放,我们可以根据实际需求去扩展模块和组件,从而达到我们业务不断发展的需求。

 

    监控指标的不断扩充,指标数据需要存储的数据量也会越来越大,此时,对监控系统的QPS,就有非常高的要求。监控系统能否支持高并发的请求,直接就决定了这个监控系统能否使用。试想,如果一个监控系统三天两头出问题,那么用户可能就会流失,甚至是放弃这个监控系统的使用,转而去寻求更好的解决方案。

 

    监控用户的使用过程中,对监控平台的期望SLA可能是100%,而实际能达到的SLA可能是99.9%(每年停机约9小时)。随着业务和技术的不断发展,监控用户对监控系统的要求也越来越高,SLA能不能再继续提高呢?

 

    实际上,SLA的提高1个9,对系统的挑战都是非常大的,比如我们的架构是否支持,架构设计是否合理,架构有没有冗余,能不能支持水平横向扩容,存不存在单点故障,服务器资源是否足够,系统的并发是否是一条直线,等等众多的因素,直接决定了我们提供的监控系统SLA能否再继续提高。理想情况是,架构有冗余,当链路出现故障时,能够自动切换,能够有备机顶替;当容量出现不足时,能够加服务器就扩容,而能够自动负载均衡。

 

    所以,我们在设计监控系统的时候,一定要学习互联网架构中的高并发,高可用,分布式架构设计方案。此后,无论是增加功能模块,或者是系统的整体升级,有了架构上的保障,可以做到按需进行升级扩展,而不用担心系统可用性问题,升级变更扩展对用户都是无感知的。

 

3.系统可靠性毫无保障

 

    当监控的主机规模达到5000设备,1万设备的时候,一般监控系统会出现瓶颈,系统的QPS持续增长,能否支撑7*24小时、365天稳定运行,是一个非常大的挑战,可以说,监控系统从来都是一个高并发系统,同时,也是一个大数据库系统,比如日增5T,10T,50T的数据,并且要求详细历史数据,保存周期需要7天,30天,甚至是1年,而趋势数据(将历史数据进行归档,如按小时的max,min,avg存储)则要求保留1年,2年或者是更久,那么监控系统的数据可能达到PB数据级别,其数据处理之道,和海量的大数据处理系统,有异曲同工之妙,采集->清洗->分析->入库->使用

 

    数据上报延迟,大致的原因有三个,一是采集器出现的问题,无法按照既定的周期采集到数据,或因原始数据不存在,或因采集器达到本身性能上限问题;二是监控系统的清洗,处理分析环节出现堵塞,表现为采集上报正常,数据未入库;三是数据处理后,不得正常入库,即为监控的数据库存在问题,表现为数据写入慢,查询慢,超过数据库的上限。这3种情况,不论是哪种情况,对用户来说,都是不可用的。

 

    不误报,不漏报,不延迟,这是对监控系统的基本要求。误报就是数据处理出现问题,让用户降低对监控的信任感,如果长期存在误报情况,那么用户就会失去对监控的信任,逐步抛弃此监控系统。漏报就是本该发出的告警而没有发送出来,这种情况就更严重了,严重影响了用户的正常使用,严重降低了用户的预期,宛如晚点的飞机,不能准时达到目的地。延迟就是告警现在产生,明天才收到,这个情况说明监控系统已处于不可用状态。在本该故障的时候,告警收不到,在故障结束时,告警发出来,用户会完全不信任这个监控系统。如果监控系统连告警这个基本事情都做不好,那么算不上一个合格的监控系统,用户会将这个监控系统当做一个噪音对待。

 

 

    当我们把数据上报,告警问题都解决后,系统能够正常工作,又会面临新的问题。用户反馈,能不能将告警做得更智能一点呢?试想一下,告警模块正常工作,用户每天收到1000条告警,甚至收到10000条告警,用户也会疯掉,这简直就是告警“轰炸机”,过多的告警,成为了噪音,干扰正常判断。因此,告警能否收敛的问题,成为了头等大事。

 

    告警收敛,是指将多条策略相同、目标范围不同的告警进行合并发送,按照一定的规则进行收敛的一种告警发送方式。比如,我们一个云区域的网络故障,会造成该区域下所有设备不可达,那么ping不可达告警,则会逐条发送,如果该区域下1000条机器,是发送1条告警好呢?还是1000条好呢?相信绝大多数正常人会只想接收1条重要告警即可。告警收敛,将告警极大提高告警的精确性,真正让我们做到运筹帷幄之中而不慌乱,不至于让我们天天神经紧张,天天处于狼来了的局面,因为过多的告警,有温水煮青蛙效应,让我们会逐步失去对告警的敏感度,逐渐会因为过多的告警而完全不关注告警。

 

    有了告警收敛,是不是就可以高枕无忧了呢?不,我们还需要故障关联 ,故障自动分析,为什么需要这个功能呢?试想,一个机架掉电,引起了15台设备全部宕机,从而引发了一系列故障,如API超时,HTTP拨测失败,DB连接数增长,那么能否找到root-cause呢?能否提供一条重要告警帮我们自动分析故障的根因呢?此时,故障关联和故障自动分析,显得格外重要。所以,监控系统,必须具备故障关联分析的能力,为我们的运维决策提供更准确的信息。

 

    此外,监控系统能否对当前环境的性能进行分析,对系统容量分析,也将是一个重要能力,如对趋势预测,什么时候我们应该对业务进行扩容服务器,什么时候应该进行缩容服务器,当前的性能是否足够,是否还有优化的空间。而监控系统,因为有数据存在,是完全可以提供这一系列数据作为重要依据的。

 

4.技术落后于业务发展

    随着业务不断发展,组织架构会根据业务形态进行调整,不同人员,需要对应不同的权限,需要划分更多角色,如超级管理员,分级管理员,普通管理员,普通用户,甚至于针对菜单按钮级别更细粒度权限管控需求。如果监控系统一开始没有考虑到这些需求,则很难应对业务的增长需求。随着业务的发展,公司为了降低成本,可以将一些常规事项外包出去,此时,对权限的分级控制显得格外重要。在这个时候,监控系统不再是一个孤立的系统,必须和统一用户登录认证系统集成,实现配置,查询分离,才能够满足组织的业务发展。

 

    随着业务不断发展,业务对监控系统提出了更高要求,比如要求“微秒级监控数据采样”,要求“秒级告警”,要求监控系统提供100%可靠信息,根据监控系统提供的系统容量指标数据,对业务应用的目标服务器、容器进行扩容或者缩容。监控系统作为一个底层依赖系统,此时将发挥更大价值。

 

4.智能监控建设的实践-纸上得来终觉浅

    在建设监控系统的时候,我们清楚以上的问题所在,那么,我们是如何解决以上这些问题的呢?下面,我们来详细看看具体的实践。
 

1.降低使用门槛-开箱即用

 

    对于服务器设备,系统初始化后,默认就安装了Agent,无需任何配置,就可以采集到主机监控数据,如在CMDB中配置了进程信息,则会采集进程状态数据。

    其中进程指标基本的如下所示

 

2.和告警风暴说再见

    当主机默认接入后,即可自动添加默认告警策略,如达到阈值,即可产生告警,如下所示,是一些默认告警策略。

 

    为了防止告警过多对用户造成干扰 ,我们使用了四大锦囊法宝,确保用户收到的告警是有效的。

 

    锦囊一-告警收敛。如下所示的事件中,可以看到收敛规则有效减少了告警事件收敛,产生的事件和通知比例1:100,甚至更高。从设计上原生支持的告警收敛功能,有效阻止了告警风暴产生。

    锦囊二-告警抑制。通常,由于用户不同的实际需求,会对同一告警内容配置不同的阈值。比如配置磁盘空间使用率告警,有80%预警,90%警告,95%严重,那么这3条策略如何工作呢?假如当前磁盘空间使用率已经达到96%,是产生3条告警通知吗?实际情况中,如果监控系统产生了3条告警,那这个监控系统就会被用户认为是智障了。所以,对于同纬度的策略,我们只会发送告警级别的告警,即只会产生95%严重级别的告警通知。

 

    锦囊三-告警汇总。既使我们的监控系统已经有了告警收敛,告警抑制这两大功能,我们依然不能解决同一时间发送大量告警问题,比如多条告警规则同时满足,那么还是可能产生告警洪水风暴问题。因此,对于告警汇总功能,实在是有必要。对于同一时候涌入的大量告警,对于同维度的告警进行汇总,对于不同策略,则进行合并告警。有了告警汇总功能,我们就可以放心接收告警了。

 

    锦囊四-告警分析。通过对历史告警数据的挖掘分析,我们还可以找到异常告警,以便更好的分析原始数据和告警阈值,从而为告警配置和无阈值告警提供更好的数据支撑。

3.功能扩展容易-即插即用

4.用户权限控制-按需授权

    按监控提供的功能进行查看和管理两种基本的划分,一般来说有如下几个用户场景:

  • 告警通知的接收者:如运维,开发,测试,产品等。适用申请查看类和屏蔽类操作。

  • 监控的配置者:如运维。适用于申请查看+管理类操作。

  • 监控平台的管理者:适用于全局的功能。

5.自动化筑基石-效率优先

    有别于其他的监控系统,定义采集,直接在系统中就可以完成,无需使用其他第三方控制系统或者登录服务器部署。所有的配置都能够在界面上完成,包括插件编写,我们只需要打开页面制作插件即可。

 

     动态采集目标,当模块中的主机有增加或减少的时候,可以自动下发插件到目标机器,而无法人工部署采集插件。

 

    同理,告警策略中的目标范围也是自动匹配,而不用对新增加的主机或模块进行策略编辑,范围自动生效。

 

5.监控建设的总结-身经百战忆往昔

 

    在监控平台的建设过程中,经过不断实践,不断总结经验,监控平台建设的功能会逐步成熟,但是,如果只把监控平台当做核心目标来建设,会遇到非监控本身的问题,比如怎么将发布和监控结合,如何在发布期间屏蔽告警。如何让监控联动CMDB,怎么打通监控系统和运维自动化系统,怎么打通监控与流水线发布系统,怎么打通监控和运维的各个环节?这个问题,不是独立的一个监控系统能解决的,而是需要一个可扩展的可定制的运维平台才能解决的。

 

    回顾监控体系的建设,我们总结如下的经验:

1.确定目标,搞清楚需求,请不要盲目行动,仔细分析业务需求,然后选择相应的监控方案,指定清晰的业务需求规划和项目规划,弄清楚是需要一个烟囱式监控系统还是一个互通有无的运维平台体系。

 

2.尽量使用成熟稳定的开源平台,如考虑使用Zabbix、Pormetheus等,如果监控规模过大,可能会存在性能和其他使用方面问题。这里,笔者推荐选择使用蓝鲸[5]这样成熟稳定的平台,则可以很好的解决容量和性能问题,因为蓝鲸平台是天然支持海量并发场景,能够水平扩容,遇到了容量不够的问题,也只需要扩容对应的服务模块即可。

 

3.伴随着业务不断的发展和技术的不断迭代更新,监控系统也会不断优化总结,迭代更新。因此,没有一劳永逸的方案,只有不断的动态发展平衡。

 

4.要从软件设计者的角度去做监控系统,面对监控需求,从现象到本质去更好的满足使用场景,而不是仅仅作为一个工具,因为监控和运维是密切关联的,是需要从数据生产到消费,再到分析关联,为运维的目标-“提效、提质、提能”而添砖加瓦。

 

     因此,运维曰:监控,业务之大事,死生之地,存亡之道,不可不察也

 

体验地址

如需下载体验,可点击下方 监控日志套餐 下载按钮。

监控日志套餐需在基础套餐上运行。

蓝鲸社区版V6.0.3正式版

部署福利活动:点击跳转

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 模拟登陆神库!集合了20+个平台的模拟登陆脚本

Awesome-python-login-model 是一个国人开发的模拟登陆仓库,在这个仓库上有20几个网站的模拟登陆脚本,你可以基于这个仓库实现的代码做简易的修改,以实现自己的自动化功能。

https://github.com/Kr1s77/awesome-python-login-model

其支持模拟登陆的网站有:

可以看到,支持的站点非常多,大家可以从他仓库里学到许多关于模拟登陆的方法,简单的来讲,大多数脚本采用的是直接登录的方式,有的网站直接登录难度很大,比如qq空间,bilibili等使用 selenium + webdriver 的方式就相对轻松一些。

一些网站虽然在登录的时候采用的是selenium的方式,为了效率,我们可以在登录过后得到的cookie维护起来,然后调用 requests 或者 scrapy 等进行数据采集,这样数据采集的速度可以得到保证。

如果你无法登陆 Github 克隆它的仓库,请在 Python 实用宝典 后台回复 Awesome-python-login-model 下载仓库代码。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

使用这个仓库的时候,你需要按需安装并加载相应的模块,不过无非就是以下几个模块:

pip install beautifulsoup4
pip install selenium
pip install pyppeteer
pip install pillow

上面的模块你并不需要全部安装,最好是找到你所需要模拟登陆的网站的脚本,查看它头部 import 了什么模块,按需安装即可。

2.简单的模拟登陆实战

下面来看一个拉勾网的登陆脚本:

# -*- coding:utf-8 -*-
import re
import os
import time
import json
import sys
import subprocess
import requests
import hashlib
from bs4 import BeautifulSoup

"""
info:
author:CriseLYJ
github:https://github.com/CriseLYJ/
update_time:2019-3-6
"""


class Lagou_login(object):
    def __init__(self):
        self.session = requests.session()
        self.CaptchaImagePath = os.path.split(os.path.realpath(__file__))[0] + os.sep + 'captcha.jpg'
        self.HEADERS = {'Referer': 'https://passport.lagou.com/login/login.html',
                        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36'
                                      ' (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36'
                                      ' Core/1.53.4882.400 QQBrowser/9.7.13059.400',
                        'X-Requested-With': 'XMLHttpRequest'}

    # 密码加密
    def encryptPwd(self, passwd):
        # 对密码进行了md5双重加密
        passwd = hashlib.md5(passwd.encode('utf-8')).hexdigest()
        # veennike 这个值是在js文件找到的一个写死的值
        passwd = 'veenike' + passwd + 'veenike'
        passwd = hashlib.md5(passwd.encode('utf-8')).hexdigest()
        return passwd

    # 获取请求token
    def getTokenCode(self):
        login_page = 'https://passport.lagou.com/login/login.html'

        data = self.session.get(login_page, headers=self.HEADERS)

        soup = BeautifulSoup(data.content, "lxml", from_encoding='utf-8')
        '''
            要从登录页面提取token,code, 在头信息里面添加
            <!-- 页面样式 --><!-- 动态token,防御伪造请求,重复提交 -->
            <script type="text/javascript">
                window.X_Anti_Forge_Token = 'dde4db4a-888e-47ca-8277-0c6da6a8fc19';
                window.X_Anti_Forge_Code = '61142241';
            </script>
        '''
        anti_token = {'X-Anit-Forge-Token': 'None',
                      'X-Anit-Forge-Code': '0'}
        anti = soup.findAll('script')[1].getText().splitlines()
        anti = [str(x) for x in anti]

        anti_token['X-Anit-Forge-Token'] = re.findall(r'= \'(.+?)\'', anti[1])[0]
        anti_token['X-Anit-Forge-Code'] = re.findall(r'= \'(.+?)\'', anti[2])[0]

        return anti_token

    # 人工读取验证码并返回
    def getCaptcha(self):
        captchaImgUrl = 'https://passport.lagou.com/vcode/create?from=register&refresh=%s' % time.time()
        # 写入验证码图片
        f = open(self.CaptchaImagePath, 'wb')
        f.write(self.session.get(captchaImgUrl, headers=self.HEADERS).content)
        f.close()
        # 打开验证码图片
        if sys.platform.find('darwin') >= 0:
            subprocess.cx5c r[p'6;-]l=09all(['open', self.CaptchaImagePath])
        elif sys.platform.find('linux') >= 0:
            subprocess.call(['xdg-open', self.CaptchaImagePath])
        else:
            os.startfile(self.CaptchaImagePath)

        # 输入返回验证码
        captcha = input("请输入当前地址(% s)的验证码: " % self.CaptchaImagePath)
        print('你输入的验证码是:% s' % captcha)
        return captcha

    # 登陆操作
    def login(self, user, passwd, captchaData=None, token_code=None):
        postData = {'isValidate': 'true',
                    'password': passwd,
                    # 如需验证码,则添加上验证码
                    'request_form_verifyCode': (captchaData if captchaData != None else ''),
                    'submit': '',
                    'username': user
                    }
        login_url = 'https://passport.lagou.com/login/login.json'

        # 头信息添加tokena
        login_headers = self.HEADERS.copy()
        token_code = self.getTokenCode() if token_code is None else token_code
        login_headers.update(token_code)

        # data = {"content":{"rows":[]},"message":"该帐号不存在或密码错误,请重新输入","state":400}
        response = self.session.post(login_url, data=postData, headers=login_headers)
        data = json.loads(response.content.decode('utf-8'))

        if data['state'] == 1:
            return response.content
        elif data['state'] == 10010:
            print(data['message'])
            captchaData = self.getCaptcha()
            token_code = {'X-Anit-Forge-Code': data['submitCode'], 'X-Anit-Forge-Token': data['submitToken']}
            return self.login(user, passwd, captchaData, token_code)
        else:
            print(data['message'])
            return False


if __name__ == "__main__":

    username = input("请输入你的手机号或者邮箱\n >>>:")
    passwd = input("请输入你的密码\n >>>:")

    lg = Lagou_login()
    passwd = lg.encryptPwd(passwd)

    data = lg.login(username, passwd)
    if data:
        print(data)
        print('登录成功')
    else:
        print('登录不成功')

从头部的 import 引入来看,你需要安装并加载 Beautifulsoup4 模块:

pip install beautifulsoup4

安装完成后,终端需要 cd 进入此脚本所在文件夹,执行脚本:

python Lagou.py

运行脚本后需要你输入一定的信息进行登陆,做得非常方便和贴心:

登陆完成后,你就可以做任何你想要做的事情了。

3.基于selenium的模拟登陆

有些网站的爬取没有那么简单,他们会做权限校验、会做反爬机制。这种情况下,我们可以用selenium解决一些比较困难和复杂的登陆场景。

基于selenium的模拟登陆稍微复杂一点,你需要设置chromedriver的路径到环境变量中。如果你没有设置,运行登陆脚本的时候会出现以下错误:

怎么下载并设置 Chromedriver 到环境变量里呢?你可以在这里下载到最新版的chromedriver:

https://chromedriver.chromium.org/

现在最新版 Chromedriver 版本号到了 91.0.4472.101,下载链接如下: https://chromedriver.storage.googleapis.com/index.html?path=91.0.4472.101

可以看到,每个系统需要下载的 Chromedriver 版本不一样,请对应你的系统下载指定的版本即可。mac64 和 mac_m1指的是使用了不同芯片的Mac笔记本,你可以在Mac上,单击菜单栏左上角的[Apple]图标,然后选择“关于本机”选项。看到如下写着芯片 Apple M1 则应该下载mac_m1版本。

如果你的网络存在问题无法下载,没关系,关注 Python实用宝典 公众号,后台回复 Chromedriver 即可下载,我已经把这4个版本放到了国内网盘上。

下载 Chromedriver 完成后,你还需要设置环境变量

(macOS 系统) 请这样设置环境变量:

1. 把解压得到的 Chromedriver 放到一个你不会经常变动的路径

如 /usr/local/bin/ ,你需要 Command+空格 输入并打开终端(Terminal),执行以下命令:

cd /usr/local/bin/
open .

然后将 Chromedriver 拖入,就能成功将 Chromedriver 放入其中。

2.添加环境变量

在终端输入下列命令就能添加到环境变量:

export PATH=$PATH:/usr/local/bin/chromedriver

执行完这一步,恭喜你成功在 macOS 上安装了 Chromedriver.

(Windows 系统) 请这样设置环境变量:

1.在左下角搜索环境变量,打开“编辑系统环境变量”的选项:

2.设置 Chromedriver 环境变量:

将你的 chromedriver 所在目录放入到 PATH 变量中,如图所示。比如我的 chromedriver.exe 的路径是C:\Users\83493\Documents\bin\chromedriver.exe, 那么此处就应该增加 C:\Users\83493\Documents\bin 路径。

设置完成后,你便成功在 Windows 上安装了 Chromedriver. 另外请注意设置后要重启终端或CMD让环境变量生效。

另外如果你在使用 Chromedriver 的时候出现了类似于以下的报错,不要慌:

这是由于当前 Chromedriver 版本是91, 而你现有的 Chrome 版本是 90 造成的,升级Chrome即可解决问题。

完成selenium的基本配置后,我们可以尝试运行QQ空间模拟登陆:

进入项目文件夹的qqzone文件夹:

cd awesome-python-login-model\qqzone

然后直接运行 qq_zone.py 文件:

python qq_zone.py

此时会弹出一个浏览器并让你输入信息:

输入信息后,就会正常走登陆流程:

看到如上的界面,说明登陆完成,此时Cookie什么的都已经被设定完毕,你可以把Cookie存下来,并做任何你想做的事情了。

如果你愿意研究作者的代码,你会发现其实很简单:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
info:
author:CriseLYJ
github:https://github.com/CriseLYJ/
update_time:2019-3-7
"""

import time  # 用来延时
from selenium import webdriver

driver = webdriver.Chrome()  # 选择浏览器,此处我选择的Chrome
QQ_NUMBER = input('请输入你的QQ号')
PASSWORD = input('请输入你的QQ密码')

driver.get('http://i.qq.com/')
driver.switch_to.frame('login_frame')
driver.find_element_by_id('switcher_plogin').click()

driver.find_element_by_name('u').clear()
driver.find_element_by_name('u').send_keys(QQ_NUMBER)  # 此处输入你的QQ号
driver.find_element_by_name('p').clear()
driver.find_element_by_name('p').send_keys(PASSWORD)  # 此处输入你的QQ密码

driver.execute_script("document.getElementById('login_button').parentNode.hidefocus=false;")

driver.find_element_by_xpath('//*[@id="loginform"]/div[4]/a').click()
driver.find_element_by_id('login_button').click()

time.sleep(10)  # 因为我曾经是QQ会员,所以每次登陆时都会提醒我要不要再续费的弹窗...

driver.find_element_by_id('dialog_button_1').click()  # 这个地方是我把那个弹窗给点击了,配合上面的延时用的,延时是等待那个弹窗出现,然后此处点击取消

btns = driver.find_elements_by_css_selector('a.item.qz_like_btn_v3')  # 此处是CSS选择器
for btn in btns:
    btn.click()

简单的讲,代码一共分了4个步骤,分别如下:

1.让使用者输入QQ号和密码。

2.切换浏览器焦点到登录框中,选择元素输入账号和密码。

3.为了显示登录按钮,执行了以下脚本:

driver.execute_script("document.getElementById('login_button').parentNode.hidefocus=false;")

4.点击确认按钮,完成登录。

可以看到,基于 Selenium 的自动化控制一点都不难,一旦熟悉控制流程及相应的方法后应该如鱼得水。只要你度过一开始安装 Chromedriver 时的繁琐阶段,后面代码开发时多参考他人的代码,Selenium这个自动化工具是可以被熟练掌握的。

总而言之,Awesome-python-login-model 这个模拟登陆的代码库,可以给你带来不少的便利,你可以直接基于它提供的登陆脚本开发,也可以参考这些脚本自己写一个其他网站的模拟登陆脚本,并给作者提交PR。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python yield 关键字有什么作用?详细解答

yield 关键字有什么作用?要了解 yield 的作用,您必须了解生成器是什么。在了解生成器之前,您必须了解iterables(可迭代对象)。

1. 什么是可迭代对象?

对于列表时,您可以一项一项地输出它的值。一项一项地读取列表的内容,这种形式被称为迭代:

>>> mylist = [1, 2, 3]
>>> for i in mylist:
...    print(i)
1
2
3

mylist是一个可迭代的对象。当您使用列表推导式时,您将创建一个列表,以及一个可迭代对象:

>>> mylist = [x*x for x in range(3)]
>>> for i in mylist:
...    print(i)
0
1
4

那些可以被你使用 “ for... in...” 迭代的所有对象都是可迭代的,比如 数组、字符串等。

这些可迭代对象很方便,因为您可以随心所欲地读取它们,但是您将所有值存储在内存中,当您有很多值时,这并不总是您想要的。

2. 什么是生成器?

生成器是迭代器,一种只能迭代一次的可迭代对象。生成器不会将所有值存储在内存中,它们会即时生成值

>>> mygenerator = (x*x for x in range(3))
>>> for i in mygenerator:
...    print(i)
0
1
4

生成器创建的时候需要用 () 而不是 [] . 但是,您不能重复执行 for i in mygenerator ,因为生成器只能使用一次:它们计算出 0 (0*0),然后忘记它并计算得到 1 (1*1),然后一一结束计算得到 4 (2*2)。

3. 重点来了,什么是yield? yield 关键字有什么作用?

yield 是一个像 return 一样使用的关键字,不同的是使用yield会使该函数返回一个生成器。

>>> def create_generator():
...    mylist = range(3)
...    for i in mylist:
...        yield i*i
...
>>> mygenerator = create_generator() # create a generator
>>> print(mygenerator) # mygenerator is an object!
<generator object create_generator at 0xb7555c34>
>>> for i in mygenerator:
...     print(i)
0
1
4

这是一个无用的示例,但是当您知道您的函数将返回大量您只需要读取一次的值时,它会很方便。

要掌握yield,你必须明白,当你调用函数时,你写在函数体中的代码并没有运行。该函数只返回生成器对象。然后,您的代码将在每次for循环使用生成器时从停止的地方继续。

现在是困难的部分:

第一次 for 调用从您的函数创建的生成器对象时,它将从头开始运行您的函数中的代码,直到命中yield,然后它将返回循环的第一个值。然后,每个后续调用将运行您在函数中编写的循环的另一次迭代并返回下一个值。这将一直持续到生成器被认为是空的为止。


4. 控制生成器耗尽的一个例子

>>> class Bank(): # Let's create a bank, building ATMs
...    crisis = False
...    def create_atm(self):
...        while not self.crisis:
...            yield "$100"
>>> hsbc = Bank() # When everything's ok the ATM gives you as much as you want
>>> corner_street_atm = hsbc.create_atm()
>>> print(corner_street_atm.next())
$100
>>> print(corner_street_atm.next())
$100
>>> print([corner_street_atm.next() for cash in range(5)])
['$100', '$100', '$100', '$100', '$100']
>>> hsbc.crisis = True # Crisis is coming, no more money!
>>> print(corner_street_atm.next())
<type 'exceptions.StopIteration'>
>>> wall_street_atm = hsbc.create_atm() # It's even true for new ATMs
>>> print(wall_street_atm.next())
<type 'exceptions.StopIteration'>
>>> hsbc.crisis = False # The trouble is, even post-crisis the ATM remains empty
>>> print(corner_street_atm.next())
<type 'exceptions.StopIteration'>
>>> brand_new_atm = hsbc.create_atm() # Build a new one to get back in business
>>> for cash in brand_new_atm:
...    print cash
$100
$100
$100
$100
$100
$100
$100
$100
$100
...

注意:对于 Python 3,使用print(corner_street_atm.__next__())print(next(corner_street_atm))

它可用于控制对资源的访问等各种事情。

5.Itertools,你最好的朋友

itertools 模块包含操作可迭代对象的特殊函数。曾经想复制一个生成器吗?连接两个生成器?使用单行对嵌套列表中的值进行分组?Map/Zip 不创建另一个列表?

那么就 import itertools.

一个例子?让我们看看四马比赛可能的到达顺序:

>>> horses = [1, 2, 3, 4]
>>> races = itertools.permutations(horses)
>>> print(races)
<itertools.permutations object at 0xb754f1dc>
>>> print(list(itertools.permutations(horses)))
[(1, 2, 3, 4),
 (1, 2, 4, 3),
 (1, 3, 2, 4),
 (1, 3, 4, 2),
 (1, 4, 2, 3),
 (1, 4, 3, 2),
 (2, 1, 3, 4),
 (2, 1, 4, 3),
 (2, 3, 1, 4),
 (2, 3, 4, 1),
 (2, 4, 1, 3),
 (2, 4, 3, 1),
 (3, 1, 2, 4),
 (3, 1, 4, 2),
 (3, 2, 1, 4),
 (3, 2, 4, 1),
 (3, 4, 1, 2),
 (3, 4, 2, 1),
 (4, 1, 2, 3),
 (4, 1, 3, 2),
 (4, 2, 1, 3),
 (4, 2, 3, 1),
 (4, 3, 1, 2),
 (4, 3, 2, 1)]

6.理解迭代的内部机制

迭代是一个包含可迭代对象(实现__iter__()方法)和迭代器(实现__next__()方法)的过程。

可迭代对象是您可以从中获取迭代器的任何对象。迭代器是让你迭代可迭代对象的对象。

这篇文章中有更多关于for循环如何工作的内容。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 高效流程编排引擎 bamboo-pipeline 实战教程

Bamboo-pipeline 是蓝鲸智云旗下SaaS标准运维的流程编排引擎。其具备以下特点:

  • 1. 多种流程模式:支持串行、并行,支持子流程,可以根据全局参数自动选择分支执行,节点失败处理机制可配置。
  • 2. 参数引擎:支持参数共享,支持参数替换。
  • 3. 可交互的任务执行:任务执行中可以随时暂停、继续、撤销,节点失败后可以重试、跳过。

本文需要涉及到 Django, Celery 的前置知识,如果你还不了解这两者,建议谷歌或百度搜索了解一下,或者阅读我之前的文章:

Django:

Python Django快速开发音乐高潮提取网(1)

Python Django快速开发音乐高潮提取网(2)

Python Django快速开发音乐高潮提取网(3)

Pycharm+Django 安装及配置指南

手把手Django+Vue前后端分离入门实战教程

Celery:

Python celery异步快速下载股票数据

Django Celery 异步与定时任务实战教程

网络IO谁更快?Python与Go请求速度对比

什么?Python Celery 也能调度Go worker?

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install bamboo-engine
pip install bamboo-pipeline
pip install django
pip install celery

2. 项目初始化

(选项一:无Django项目)如果你没有任何的现成Django项目,请按下面的流程初始化

由于 bamboo-pipeline 运行时基于 Django 实现,所以需要新建一个 Django 项目:

django-admin startproject easy_pipeline
cd easy_pipeline

在 easy_pipeline.settings.py 下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

在 easy_pipeline目录下初始化数据库:

python manage.py migrate

(选项二:有Django项目需要使用流程引擎)如果你有现成的PipeLine项目需要使用此流程引擎,请在项目的 settings.py 下添加如下配置:

from pipeline.eri.celery.queues import *
from celery import Celery

app = Celery("proj")

app.config_from_object("django.conf:settings")

INSTALLED_APPS = [
    ...
    "pipeline",
    "pipeline.engine",
    "pipeline.component_framework",
    "pipeline.eri",
    ...
]

然后重新执行migrate,生成pipeline相关的流程模型:

python manage.py migrate

migrate 执行完毕后会如下图所示:

由于是在原有项目上使用流程引擎,可能会遇到一些版本不匹配的问题,如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

3. 简单的流程编排实战

首先在项目目录下启动 celery worker:

python manage.py celery worker -Q er_execute,er_schedule --pool=solo -l info

启动成功类似下图所示:

(注意)如果你是在你的原有Django项目上做改造,它并不一定能够顺利地启动成功,这是因为Pipeline使用了 Django 2.2.24,会存在许多版本不兼容的情况。如果遇到报错,请排查解决或到蓝鲸官网上进行询问。

在下面的例子中,我们将会创建并执行一个简单的流程:

3.1 创建流程APP

在 bamboo_pipeline 中,一个流程由多个组件组成,官方推荐使用APP统一管控组件:

python manage.py create_plugins_app big_calculator

该命令会在 Django 工程根目录下生成拥有以下目录结构的 APP:

big_calculator
├── __init__.py
├── components
│   ├── __init__.py
│   └── collections
│       ├── __init__.py
│       └── plugins.py
├── migrations
│   └── __init__.py
└── static
    └── big_calculator
        └── plugins.js

别忘了把新创建的这个插件添加到 Django 配置的 INSTALLED_APPS 中:

INSTALLED_APPS = (
    ...
    'big_calculator',
    ...
)

3.2 编写流程的Service原子

组件服务 Service 是组件的核心,Service 定义了组件被调用时执行的逻辑,下面让我们实现一个计算传入的参数 n 的阶乘,并把结果写到输出中的 Service,在 big_calculator/components/collections/plugins.py 中输入以下代码:

import math
from pipeline.core.flow.activity import Service


class FactorialCalculateService(Service):

    def execute(self, data, parent_data):
        """
        组件被调用时的执行逻辑
        :param data: 当前节点的数据对象
        :param parent_data: 该节点所属流程的数据对象
        :return:
        """
        n = data.get_one_of_inputs('n')
        if not isinstance(n, int):
            data.outputs.ex_data = 'n must be a integer!'
            return False

        data.outputs.factorial_of_n = math.factorial(n)
        return True

    def inputs_format(self):
        """
        组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。
        :return:必须返回一个 InputItem 的数组,返回的这些信息能够用于确认该组件需要获取什么样的输入数据。
        """
        return [
            Service.InputItem(name='integer n', key='n', type='int', required=True)
        ]

    def outputs_format(self):
        """
        组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明
        :return: 必须返回一个 OutputItem 的数组, 便于在流程上下文或后续节点中进行引用
        """
        return [
            Service.OutputItem(name='factorial of n', key='factorial_of_n', type='int')
        ]

首先我们继承了 Service 基类,并实现了 execute() 和 outputs_format() 这两个方法,他们的作用如下:

  • execute:组件被调用时执行的逻辑。接收 data 和 parent_data 两个参数。
    其中,data 是当前节点的数据对象,这个数据对象存储了用户传递给当前节点的参数的值以及当前节点输出的值。parent_data 则是该节点所属流程的数据对象,通常会将一些全局使用的常量存储在该对象中,如当前流程的执行者、流程的开始时间等。
  • outputs_format:组件执行成功时输出的字段,每个字段都包含字段名、字段键及字段类型的说明。这个方法必须返回一个 OutputItem 的数组,返回的这些信息能够用于确认某个组件在执行成功时输出的数据,便于在流程上下文或后续节点中进行引用。
  • inputs_format:组件所需的输入字段,每个字段都包含字段名、字段键、字段类型及是否必填的说明。这个方法必须返回一个 InputItem 的数组,返回的这些信息能够用于确认某个组件需要获取什么样的输入数据。

下面我们来看一下 execute() 方法内部执行的逻辑,首先我们尝试从当前节点数据对象的输出中获取输入参数 n,如果获取到的参数不是一个 int 实例,那么我们会将异常信息写入到当前节点输出的 ex_data 字段中,这个字段是引擎内部的保留字段,节点执行失败时产生的异常信息都应该写入到该字段中。随后我们返回 False 代表组件本次执行失败,随后节点会进入失败状态:

n = data.get_one_of_inputs('n')
if not isinstance(n, int):
    data.outputs.ex_data = 'n must be a integer!'
    return False

若获取到的 n 是一个正常的 int,我们就调用 math.factorial() 函数来计算 n 的阶乘,计算完成后,我们会将结果写入到输出的 factorial_of_n 字段中,以供流程中的其他节点使用:

data.outputs.factorial_of_n = math.factorial(n)
return True

3.3 编写流程组件,绑定Service原子

完成 Service 的编写后,我们需要将其与一个 Component 绑定起来,才能够注册到组件库中,在 big_calculator\components\__init__.py 文件下添加如下的代码:

import logging
from pipeline.component_framework.component import Component
from big_calculator.components.collections.plugins import FactorialCalculateService

logger = logging.getLogger('celery')


class FactorialCalculateComponent(Component):
    name = 'FactorialCalculateComponent'
    code = 'fac_cal_comp'
    bound_service = FactorialCalculateService

我们定义了一个继承自基类 Component 的类 FactorialCalculateComponent,他拥有以下属性:

  • name:组件名。
  • code:组件代码,这个代码必须是全局唯一的。
  • bound_service:与该组件绑定的 Service

这样一来,我们就完成了一个流程原子的开发。

3.4 生成流程,测试刚编写的组件

在 big_calculator\test.py 写入以下内容,生成一个流程,测试刚刚编写的组件:

# Python 实用宝典
# 2021/06/20

import time

from bamboo_engine.builder import *
from big_calculator.components import FactorialCalculateComponent
from pipeline.eri.runtime import BambooDjangoRuntime
from bamboo_engine import api
from bamboo_engine import builder


def bamboo_playground():
    """
    测试流程引擎
    """
    # 使用 builder 构造出流程描述结构
    start = EmptyStartEvent()
    # 这里使用 我们刚创建好的n阶乘组件
    act = ServiceActivity(component_code=FactorialCalculateComponent.code)
    # 传入参数
    act.component.inputs.n = Var(type=Var.PLAIN, value=4)
    end = EmptyEndEvent()

    start.extend(act).extend(end)

    pipeline = builder.build_tree(start)
    api.run_pipeline(runtime=BambooDjangoRuntime(), pipeline=pipeline)

    # 等待 1s 后获取流程执行结果
    time.sleep(1)

    result = api.get_execution_data_outputs(BambooDjangoRuntime(), act.id).data

    print(result)

随后,在命令行输入

python manage.py shell

打开 django console, 输入以下命令,执行此流程:

from big_calculator.test import bamboo_playground
bamboo_playground()

流程运行完后,获取节点的执行结果,可以看到,该节点输出了 factorial_of_n,并且值为 24(4 * 3 * 2 *1),这正是我们需要的效果:

{'_loop': 0, '_result': True, 'factorial_of_n': 24}

恭喜你,你已经成功的创建了一个流程并把它运行起来了!在这期间你可能会遇到不少的坑,建议尝试先自行解决,如果实在无法解决,可以前往 标准运维 仓库提 issues,或者前往蓝鲸智云官网提问。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 电信用户流失预测模型实战教程

1.流失预测 研究背景

1、做好用户流失预测可以降低营销成本。老生常谈,新客户开发成本老客户维护成本的5倍。

2、获得更好的用户体验。并不是所有的增值服务都可以有效留住客户。

3、获得更高的销售回报。价格敏感型客户和非价格敏感性客户。

2.提出问题

1、流失客户有哪些显著性特征?

2、当客户在哪些特征下什么条件下比较容易发生流失

3.数据集描述

该数据是datafountain上的《电信客户流失数据》,这里提供一个下载地址。
https://www.datafountain.cn/datasets/35guide

该数据集有21个变量,7043个数据点。变量可分为以下三个部分:用户属性、用户行为、研究对象。

用户属性
customerID :用户ID
gender:性别(Female & Male)
SeniorCitizen :老年人(1表示是,0表示不是)
Partner :是否有配偶(Yes or No)
Dependents :是否经济独立(Yes or No)
tenure :客户的职位(0-72,共73个职位)

用户行为
PhoneService :是否开通电话服务业务(Yes or No)
MultipleLines :是否开通了多线业务(Yes 、No or No phoneservice 三种)
InternetService :是否开通互联网服务(No, DSL数字网络,fiber optic光纤网络 三种)
OnlineSecurity :是否开通网络安全服务(Yes,No,No internetserive 三种)
OnlineBackup :是否开通在线备份业务(Yes,No,No internetserive 三种)
DeviceProtection :是否开通了设备保护业务(Yes,No,No internetserive 三种)
TechSupport :是否开通了技术支持服务(Yes,No,No internetserive 三种)
StreamingTV :是否开通网络电视(Yes,No,No internetserive 三种)
StreamingMovies :是否开通网络电影(Yes,No,No internetserive 三种)
Contract :签订合同方式 (按月,一年,两年)
PaperlessBilling :是否开通电子账单(Yes or No)
PaymentMethod :付款方式(bank transfer,credit card,electronic check,mailed check)
MonthlyCharges :月费用
TotalCharges :总费用

研究对象Churn:该用户是否流失(Yes or No)

4.分析思路

分析视角分析方法的灵魂。

分析方法有上百种,但分析视角只有四种:

  • 对比视角
  • 分类视角
  • 相关视角
  • 描述视角

一旦将业务需求拆解成指标,接下来只需要针对每个指标进行分析视角四选一即可。

数据集描述,已经将变量分为三个维度了:用户属性、用户行为、研究对象(是否流失客户),三个维度组合一下就得出了以下解题思路了:

  • 哪些属性的用户比较容易流失?
  • 哪些行为的用户比较容易流失?

以上两个分析思路运用的是【对比视角】,该视角下具体的分析方法有:

  • 数值型数据:均值比较
  • 分类型数据:频数分布比较(交叉分析)

以上的分析方法是统计分析,只能一个维度一个维度地去比较。但实际情况中,并不是每个维度的权重都一样的,那如何去研究各个维度的权重?

权重问题属于分类视角,故我们可以采用分类模型,要用哪个分类模型呢?不知道。可以全部采用,看模型精度得分,然后选得分最高的模型进行进一步预测。

  • Random Forest 随机森林
  • SVC 支持向量机
  • LogisticRegression 逻辑回归
  • KNN 近邻算法
  • Naive Bayes  朴素贝叶斯
  • Decision Tree 决策树
  • AdaBoost
  • GradientBoosting
  • XGB
  • CatBoost

5.分析结论及运营建议

5.1 分析结论

综合统计分析XGB算法输出特征重要性得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure :1-5号职位的用户比较容易流失
  2. PaymentMethod :使用电子支票支付的人
  3. MonthlyCharges 、TotalCharges : 总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling : 开通电子账单
  5. Partner : 单身
  6. OnlineBackup : 没开通在线备份业务
  7. InternetService :开通了Fiber optic 光纤网络
  8. TechSupport :没开通“技术支持服务”
  9. DeviceProtection :没开通设备保护业务
  10. OnlineSecurity :没开通网络安全服务
  11. Contract :按月签订合同方式
  12. Dependents :无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges :总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

当条件覆盖得越多,人群越精确,但与此同时,覆盖的人群也会越少。业务方可直接在数据库中,通过SQL检索符合要求的客户,然后做针对性的运营工作。

5.2 运营建议

如何留住客户,可以从两方面去思考:

  • 增加用户的沉没成本(损失厌恶)
    • 会员等级
    • 积分制
    • 充值赠送
    • 满减券
    • 其他增值服务
  • 培养用户的条件反射(习惯)
    • 会员日
    • 定期用户召回
    • 签到
    • 每日定时抽奖
    • 小游戏

电子账单解锁新权益

  • 现象:“开通电子账单”的人反而容易流失。
  • 基本假设:价格敏感型客户。电子账单,让客户理性消费。
  • 建议:让“电子账单”变成一项“福利。跟连锁便利店,联名发”商品满减券”,每月的账单时间,就将”商品满减券“和账单一起推送过去。文案:您上月消费了XX元,解锁了xx会员权益。
  • 底层规律:增加沉没成本。

“单身用户”尊享亲情网

  • 现象:“单身用户”容易流失。
  • 基本假设:社交欲望低。
  • 建议:一个单身用户拥有建立3个人以内的“亲情网”的权益。
  • 底层规律:增加沉没成本。

推广“在线备份、设备保护、技术支持、网络保护”等增值服务。

6.实战教程-数据清洗

6.1 导入模块

6.1.1 数据处理

import pandas as pd
import numpy as np

6.1.2 可视化

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='darkgrid',font_scale=1.3)
plt.rcParams['font.family']='SimHei'
plt.rcParams['axes.unicode_minus']=False

6.1.3 特征工程

import sklearn
from sklearn import preprocessing                            #数据预处理模块
from sklearn.preprocessing import LabelEncoder               #编码转换
from sklearn.preprocessing import StandardScaler             #归一化
from sklearn.model_selection import StratifiedShuffleSplit   #分层抽样
from sklearn.model_selection import train_test_split         #数据分区
from sklearn.decomposition import PCA                        #主成分分析 (降维)

6.1.4 分类算法

from sklearn.ensemble import RandomForestClassifier     #随机森林
from sklearn.svm import SVC,LinearSVC                   #支持向量机
from sklearn.linear_model import LogisticRegression     #逻辑回归
from sklearn.neighbors import KNeighborsClassifier      #KNN算法
from sklearn.cluster import KMeans                     #K-Means 聚类算法
from sklearn.naive_bayes import GaussianNB              #朴素贝叶斯
from sklearn.tree import DecisionTreeClassifier         #决策树

6.1.5 分类算法–集成学习

import xgboost as xgb
from xgboost import XGBClassifier                      
from catboost import CatBoostClassifier                
from sklearn.ensemble import AdaBoostClassifier        
from sklearn.ensemble import GradientBoostingClassifier 

6.1.6 模型评估

from sklearn.metrics import classification_report,precision_score,recall_score,f1_score  #分类报告
from sklearn.metrics import confusion_matrix           #混淆矩阵
from sklearn.metrics import silhouette_score           #轮廓系数(评价k-mean聚类效果)
from sklearn.model_selection import GridSearchCV       #交叉验证
from sklearn.metrics import make_scorer
from sklearn.ensemble import VotingClassifier          #投票

6.1.7 忽略警告

import warnings
warnings.filterwarnings('ignore')

6.2 读取数据

df=pd.read_csv(r'C:\Users\Think\Desktop\刻意练习数据\电信数据集\Customer-Churn.csv',header=0)
#预览数据
df.head()
#查看数据大小
df.shape
#查看数据数据及分布
df.describe()

这里安利一下spyder编辑器,下图是这个编辑器的界面。编程过程中,有赋值变量的操作,该编辑器都会在右上角呈现,双击一下,就可以像在Execel上查看数据,非常方便。查看该数据集的详情。

6.3 数据清洗

6.3.1 缺失值处理

#查看缺失值
df.isnull().sum()

注:缺失值的数据类型是 float 类型。一旦有变量的数据类型转换成float 类型,需再次查看缺失值。

6.3.2 重复值处理

#查看重复值
df.duplicated().sum()

【输出】

6.3.3 数值类型转换

#查看数据类型
df.info()

【输出】TotalCharages总费用应该跟MonthlvCharges是同一个数据类型(float64)。故需将TotalCharages由object转换成float64,且需要再次查看缺失值。

#总费用 TotalCharges  该列的数据类型应是float64,不是object
# df['TotalCharges'].astype('float64')
# 此处用“astype”转化数据类型报错 “could not convert string to float”
#改用强制转化 convert_numeric=True   
df['TotalCharges']=df['TotalCharges'].convert_objects(convert_numeric=True)
df['TotalCharges'].dtype

输出如下:再次查看缺失值:TotalCharges列有11个缺失值,处理缺失值的原则是尽量填充,最后才是删除。
缺失值填充的原则:

  • 分类型数据:众数填充
  • 数值型数据:正态分布,均值/中位数填充;偏态分布,中位数填充。

TotalCharges列是数值型数据,先画直方图查看数据分布形态。

#分别作直方图:全部客户类型、流失客户类型、留存客户类型
plt.figure(figsize=(14,5))
plt.subplot(1,3,1)
plt.title('全部客户的总付费直方图')
sns.distplot(df['TotalCharges'].dropna())

plt.subplot(1,3,2)
plt.title('流失客户的总付费直方图')
sns.distplot(df[df['Churn']=='Yes']['TotalCharges'].dropna())

plt.subplot(1,3,3)
plt.title('留存客户的总付费直方图')
sns.distplot(df[df['Churn']=='No']['TotalCharges'].dropna())

结果如下:从三个直方图看,该列数据是偏态分布,故选择中位数填充。

df.fillna({'TotalCharges':df['TotalCharges'].median()},inplace=True)
#再次确认是否还有空值
df.isnull().sum()

结果如下:

6.4 查看样本分布

研究对象’Churn’列重新编码“Yes”=1,“No”=0。重新编码有下面两种方法。

方法一:replace

df['Churn'].replace(to_replace = 'Yes', value = 1,inplace = True)
df['Churn'].replace(to_replace = 'No', value = 0,inplace = True)

方法二:map函数

df['Churn']=df['Churn'].map({'Yes':1,'No':0})

预览数据:

df['Churn'].head()

结果如下:绘制饼图,查看流失客户占比。

churn_value=df["Churn"].value_counts()
labels=df["Churn"].value_counts().index

plt.figure(figsize=(7,7))
plt.pie(churn_value,labels=labels,colors=["b","w"], explode=(0.1,0),autopct='%1.1f%%', shadow=True)
plt.title("流失客户占比高达26.5%")
plt.show()  

结果如下:【分析】:流失客户样本占比26.5%,留存客户样本占比73.5%,明显的“样本不均衡”。

解决样本不均衡有以下方法可以选择:

  • 分层抽样
  • 过抽样
  • 欠抽样

7.实战教程-特征选择

提取特征

feature=df.iloc[:,1:20]

7.1 整数编码

查看变量间的两两相关性

#重新编码
corr_df = feature.apply(lambda x: pd.factorize(x)[0])
corr_df.head()
#相关性矩阵
corr=corr_df.corr()
corr

结果如下:相关性矩阵可视化

#绘制热力图观察变量之间的相关性强弱
plt.figure(figsize=(15,12))
ax = sns.heatmap(corr, xticklabels=corr.columns, yticklabels=corr.columns, 
                 linewidths=0.2, cmap="RdYlGn",annot=True)
plt.title("Correlation between variables")

结果如下:【分析】:从热力图来看,互联网服务、网络安全、在线备份、设备维护服务、技术支持服务、开通网络电视服务、开通网络电影之间相关性很强,且是正相关。电话服务和多线业务之间也存在很强的正相关关系。

7.2 独热编码

查看研究对象”Churn”与其他变量下的标签相关性。独热编码,可以将分类变量下的标签转化成列

df_onehot = pd.get_dummies(df.iloc[:,1:21])
df_onehot.head()

结果如下:绘图查看用户流失(‘Churn’)与各个维度之间的关系

plt.figure(figsize=(15,6))
df_onehot.corr()['Churn'].sort_values(ascending=False).plot(kind='bar')
plt.title('Correlation between Churn  and variables ')

结果如下:【分析】:从图看gender(性别)、PhoneService(电话服务)相关性几乎为0,故两个维度可以忽略。[‘SeniorCitizen’,’Partner’,’Dependents’, ‘Contract’,MultipleLines,’InternetService’,  ‘OnlineSecurity’, ‘OnlineBackup’, ‘DeviceProtection’,’TechSupport’, ‘StreamingTV’, ‘StreamingMovies’,’PaperlessBilling’,’PaymentMethod’]等都有较高的相关性,将以上维度合并成一个列表kf_var,然后进行频数比较。

kf_var=list(df.columns[2:5])
for var in list(df.columns[7:18]):
    kf_var.append(var)
print('kf_var=',kf_var)

结果如下:

8.实战教程-统计分析

8.1 频数分布比较

8.1.1 卡方检验

组间有显著性差异,频数分布比较才有意义,否则可能会做无用功。
“卡方检验”,就是提高频数比较结论可信度的统计方法。

#分组间确实是有显著性差异,频数比较的结论才有可信度,故需进行”卡方检验“
from scipy.stats import chi2_contingency   #统计分析 卡方检验
#自定义卡方检验函数
def KF(x):
    df1=pd.crosstab(df['Churn'],df[x])
    li1=list(df1.iloc[0,:])
    li2=list(df1.iloc[1,:])
    kf_data=np.array([li1,li2])
    kf=chi2_contingency(kf_data)
    if kf[1]<0.05:
        print('Churn by {} 的卡方临界值是{:.2f},小于0.05,表明{}组间有显著性差异,可进行【交叉分析】'.format(x,kf[1],x),'\n')
    else:
        print('Churn by {} 的卡方临界值是{:.2f},大于0.05,表明{}组间无显著性差异,不可进行交叉分析'.format(x,kf[1],x),'\n')
#对 kf_var进行卡方检验
print('kf_var的卡方检验结果如下:','\n')
print(list(map(KF, kf_var)))

kf_var的卡方检验结果如下:

Churn by SeniorCitizen 的卡方临界值是0.00,小于0.05,表明SeniorCitizen组间有显著性差异,可进行【交叉分析】

Churn by Partner 的卡方临界值是0.00,小于0.05,表明Partner组间有显著性差异,可进行【交叉分析】

Churn by Dependents 的卡方临界值是0.00,小于0.05,表明Dependents组间有显著性差异,可进行【交叉分析】

Churn by MultipleLines 的卡方临界值是0.99,大于0.05,表明MultipleLines组间无显著性差异,不可进行交叉分析

Churn by InternetService 的卡方临界值是0.00,小于0.05,表明InternetService组间有显著性差异,可进行【交叉分析】

Churn by OnlineSecurity 的卡方临界值是0.00,小于0.05,表明OnlineSecurity组间有显著性差异,可进行【交叉分析】

Churn by OnlineBackup 的卡方临界值是0.00,小于0.05,表明OnlineBackup组间有显著性差异,可进行【交叉分析】

Churn by DeviceProtection 的卡方临界值是0.00,小于0.05,表明DeviceProtection组间有显著性差异,可进行【交叉分析】

Churn by TechSupport 的卡方临界值是0.00,小于0.05,表明TechSupport组间有显著性差异,可进行【交叉分析】

Churn by StreamingTV 的卡方临界值是0.00,小于0.05,表明StreamingTV组间有显著性差异,可进行【交叉分析】

Churn by StreamingMovies 的卡方临界值是0.00,小于0.05,表明StreamingMovies组间有显著性差异,可进行【交叉分析】

Churn by Contract 的卡方临界值是0.00,小于0.05,表明Contract组间有显著性差异,可进行【交叉分析】

Churn by PaperlessBilling 的卡方临界值是0.00,小于0.05,表明PaperlessBilling组间有显著性差异,可进行【交叉分析】

Churn by PaymentMethod 的卡方临界值是0.00,小于0.05,表明PaymentMethod组间有显著性差异,可进行【交叉分析】

从卡方检验的结果,kf_var包含的特征,组间都有显著性差异,可进行频数比较。

8.1.2 柱形图

频数比较–柱形图

plt.figure(figsize=(20,25))
a=0
for k in kf_var:
    a=a+1
    plt.subplot(4,4,a)
    plt.title('Churn BY '+ k)
    sns.countplot(x=k,hue='Churn',data=df)

结果如下:因为PaymentMethod的标签比较长,影响看图,所以单独画。

plt.xticks(rotation=45)
sns.countplot(x='PaymentMethod',hue='Churn',data=df)

可以直接从柱形图去判断对哪个维度对流失客户的影响大吗?不能,因为“样本不均衡”(流失客户样本占比26.5%,留存客户样本占比73.5%),基数不一样,故不能直接通过“频数”的柱形图去分析。
解决办法:交叉分析,且作同行百分比(’Churn’作为“行”)

8.1.3 交叉分析

print('ka_var列表中的维度与Churn交叉分析结果如下:','\n')
for i in kf_var:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n'#交叉分析,同行百分比

ka_var列表中的维度与Churn交叉分析结果如下:【SeniorCitizen 分析】:年轻用户 在流失、留存,两个标签的人数占比都高。【Parter 分析】:单身用户更容易流失。【Denpendents 分析】:经济不独立的用户更容易流失。【MultipleLines 分析】:是否开通MultipleLines,对留存和流失都没有明显的促进作用。【InternetService 分析】:办理了 “Fiber optic 光纤网络”的客户容易流失。【OnlineSecurity 分析】:没开通“网络安全服务”的客户容易流失。【OnlineBackup 分析】:没开通“在线备份服务”的客户容易流失。【DeviceProtection 分析】:没开通“设备保护业务”的用户比较容易流失【TechSupport 分析】:没开通“技术支持服务”的用户容易流失。【StreamingTV 分析】:是否开通“网络电视”服务,对用户留存、流失,没有明显的促进作用。【StreamingMovies 分析】:是否开通“网络电影”服务,对用户留存、流失,没有明显的促进作用。【Contract 分析】逐月签订合同的用户最容易流失。

因为”Churn BY PaymentMethod”打印出来显示不全,故我就从临时表将“交叉表”给截图出来了: 【分析】使用“电子支票”支付的人更容易流失。

8.2 均值比较

组间有显著性差异,均值比较才有意义。
显著性检验,先通过了齐性检验,再通过方差分析,最后才能做均值比较。

8.2.0 齐性检验,方差分析

#自定义齐性检验 & 方差分析 函数
def ANOVA(x):
    li_index=list(df['Churn'].value_counts().keys())
    args=[]
    for i in li_index:
        args.append(df[df['Churn']==i][x])
    w,p=stats.levene(*args)             #齐性检验
    if p<0.05:
        print('警告:Churn BY {}的P值为{:.2f},小于0.05,表明齐性检验不通过,不可作方差分析'.format(x,p),'\n')
    else:
        f,p_value=stats.f_oneway(*args) #方差分析
        print('Churn BY {} 的f值是{},p_value值是{}'.format(x,f,p_value),'\n')
        if p_value<0.05:
            print('Churn BY {}的均值有显著性差异,可进行均值比较'.format(x),'\n')
        else:
            print('Churn BY {}的均值无显著性差异,不可进行均值比较'.format(x),'\n')

对MonthlyCharges、TotalCharges维度分别进行齐性检验和方差分析

print('MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:','\n')
ANOVA('MonthlyCharges')
ANOVA('TotalCharges')

【输出】:
MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:

警告:Churn BY MonthlyCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

警告:Churn BY TotalCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

8.3 总结

用户出现以下特征比较容易流失:

  • SeniorCitizen:青年人
  • Partner :单身
  • Dependents :无经济独立
  • InternetService:开通了 “Fiber optic 光纤网络”
  • OnlineSecurity:没开通“网络安全服务”
  • OnlineBackup:没开通“在线备份业务”
  • DeviceProtection:没开通通了“设备保护业务
  • TechSupport:没开通“技术支持服务”
  • Contract:“按月”签订合同方式
  • PaperlessBilling:开通电子账单
  • PaymentMethod:使用“电子支票”支付的人

我们可以在SQL(数据库)上找有以上特征的客户,进行精准营销,即可以降低用户流失。虽然特征选得越多,越精确,但覆盖的人群也会越少。故,我们还需要计算“特征”的【重要性】,将最为重要的几个特征作为筛选条件。

计算特征的【重要性】,是“分类视角”,接下来我们会挑选常见的分类模型,进行批量训练,然后挑出得分最高的模型,进一步计算“特征重要性”。

9.实战教程-特征工程

9.1 提取特征

有前面的流失率与各个维度的相关系数柱状图可知:
流失率与gender(性别)、PhoneService(电话服务)相关性几乎为0,可以筛选掉,而customerID是随机数,不影响建模,故可以筛选掉。最终得到特征 churn_var

churn_var=df.iloc[:,2:20]
churn_var.drop("PhoneService",axis=1, inplace=True)
churn_var.head()

结果如下:

9.2 处理“量纲差异大”

“MonthlyCharges”、”TotalCharges”两个特征跟其他特征相比,量纲差异大。处理量纲差异大,有两种方法:

  1. 标准化
  2. 离散化

以上两种方法,哪个能让模型精度提高,就选哪个。根据模型的最后得分,我选了“离散化”来处理量纲差异大。

9.2.1 标准化

scaler = StandardScaler(copy=False)
scaler.fit_transform(churn_var[['MonthlyCharges','TotalCharges']])  #fit_transform拟合数据
churn_var[['MonthlyCharges','TotalCharges']]=scaler.transform(churn_var[['MonthlyCharges','TotalCharges']])  #transform标准化

print(churn_var[['MonthlyCharges','TotalCharges']].head() )#查看拟合结果

【输出】

9.2.2 特征离散化

特征离散化后,模型易于快速迭代,且模型更稳定。

1、处理’MonthlyCharges’:

#查看'MonthlyCharges'列的4分位
churn_var['MonthlyCharges'].describe() 

离散操作
18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”

#用四分位数进行离散
churn_var['MonthlyCharges']=pd.qcut(churn_var['MonthlyCharges'],4,labels=['1','2','3','4'])
churn_var['MonthlyCharges'].head()

结果如下:2、处理’TotalCharges’:

#查看'TotalCharges'列的4分位
churn_var['TotalCharges'].describe()

结果如下:

离散操作:
18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”

#用四分位数进行离散 
churn_var['TotalCharges']=pd.qcut(churn_var['TotalCharges'],4,labels=['1','2','3','4'])
churn_var['TotalCharges'].head()

【输出】

9.3 分类数据转换成“整数编码”

9.3.1 查看churn_var中分类变量的label(标签)

#自定义函数获取分类变量中的label
def Label(x):
    print(x,"--" ,churn_var[x].unique()) 
#筛选出数据类型为“object”的数据点
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object)))

结果如下:

通过同行百分比的“交叉分析”发现,label “No internetserive”的人数占比在以下特征[OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingTV]都是惊人的一致,故我们可以判断label “No internetserive”不影响流失率。
因为这6项增值服务,都是需要开通“互联网服务”的基础上才享受得到的。不开通“互联网服务”视为没开通这6项增值服务,故可以将 6个特正中的“No internetserive” 并到 “No”里面。

churn_var.replace(to_replace='No internet service',value='No',inplace=True)

而特征MultipleLines的“ No phoneservice”在流失客户、留存客户样本中的人数占比几乎接近,且比较少,故可以将“ No phoneservice”并到“No”。

churn_var.replace(to_replace='No phone service',value='No',inplace=True)
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object.columns)))

结果如下:

9.3.2 整数编码

整数编码的方法有两种:

  1. sklearn中的LabelEncoder()
  2. pandas中的factorize
    此处选用 LabelEncoder()
def labelencode(x):
    churn_var[x] = LabelEncoder().fit_transform(churn_var[x])
for i in range(0,len(df_object.columns)):
    labelencode(df_object.columns[i])
print(list(map(Label,df_object.columns)))

结果如下:

9.4 处理“样本不均衡”

分拆变量

x=churn_var
y=df['Churn'].values
print('抽样前的数据特征',x.shape)
print('抽样前的数据标签',y.shape)

【输出】
抽样前的数据特征 (7043, 17)
抽样前的数据标签 (7043,)

处理样本不均衡常用的方式有三种:

  1. 分层抽样
  2. 过抽样
  3. 欠抽样

笔者先后尝试了“分层抽样”和“欠抽样”,前者最终得到的模型中精度最高的是0.63,而后者最终得到的模型中精度最低是0.78,最高是0.84。所以说“抽样方式”的选择极为重要,大家要在这里多试错。

分层抽样

sss=StratifiedShuffleSplit(n_splits=5, test_size=0.2, random_state=0)
print(sss)
print("训练数据和测试数据被分成的组数:",sss.get_n_splits(x,y))
# 分拆训练集和测试集
for train_index, test_index in sss.split(x, y):
    print("train:", train_index, "test:", test_index)
    x_train,x_test=x.iloc[train_index], x.iloc[test_index]
    y_train,y_test=y[train_index], y[test_index]

“过抽样”让模型精度更高,故我选“过抽样”。

from imblearn.over_sampling import SMOTE
model_smote=SMOTE()
x,y=model_smote.fit_sample(x,y)
x=pd.DataFrame(x,columns=churn_var.columns)
#分拆数据集:训练集 和 测试集
x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.3,random_state=0)

输出数据集大小

print('过抽样数据特征:', x.shape,
      '训练数据特征:',x_train.shape,
      '测试数据特征:',x_test.shape)

print('过抽样后数据标签:', y.shape,
      '   训练数据标签:',y_train.shape,
      '   测试数据标签:',y_test.shape)

【输出】

过抽样后数据特征: (10348, 17) 训练数据特征: (7243, 17) 测试数据特征: (3105, 17)
过抽样后数据标签: (10348,)    训练数据标签: (7243,)    测试数据标签: (3105,)

10.实战教程-数据建模

使用分类算法

Classifiers=[["Random Forest",RandomForestClassifier()],
             ["Support Vector Machine",SVC()],
             ["LogisticRegression",LogisticRegression()],
             ["KNN",KNeighborsClassifier(n_neighbors=5)],
             ["Naive Bayes",GaussianNB()],
             ["Decision Tree",DecisionTreeClassifier()],
             ["AdaBoostClassifier", AdaBoostClassifier()],
             ["GradientBoostingClassifier", GradientBoostingClassifier()],
             ["XGB", XGBClassifier()],
             ["CatBoost", CatBoostClassifier(logging_level='Silent')]  
]

训练模型

Classify_result=[]
names=[]
prediction=[]
for name,classifier in Classifiers:
    classifier=classifier
    classifier.fit(x_train,y_train)
    y_pred=classifier.predict(x_test)
    recall=recall_score(y_test,y_pred)
    precision=precision_score(y_test,y_pred)
    f1score = f1_score(y_test, y_pred)
    class_eva=pd.DataFrame([recall,precision,f1score])
    Classify_result.append(class_eva)
    name=pd.Series(name)
    names.append(name)
    y_pred=pd.Series(y_pred)
    prediction.append(y_pred)

11.模型评估

names=pd.DataFrame(names)
names=names[0].tolist()
result=pd.concat(Classify_result,axis=1)
result.columns=names
result.index=["recall","precision","f1score"]
result

【输出】
特征工程,采用“标准化”处理量纲差异,采用“分层抽样”处理样本不均衡。
最终模型精度得分,最高分是0.63,是“朴素贝叶斯”模型

特征工程,采用“离散化”处理量纲差异,采用“过抽样”处理样本不均衡。
最终模型精度得分,最高分是0.84,是“XGB”模型

12.基于“XGB”模型输出特征重要性

笔者尝试了两个算法分别输出“特征重要性”:CatBoost算法 和 XGB 算法

  • CatBoost算法
model = CatBoostClassifier()
model.fit(x_train,y_train,eval_set=(x_test, y_test),plot=True)
#特征重要性可视化
catboost=pd.DataFrame(columns=['feature','feature_importance'])
catboost['feature']=model.feature_names_
catboost['feature_importance']=model.feature_importances_
catboost=catboost.sort_values('feature_importance',ascending=False#降序排列
plt.figure(figsize=(10,10))
plt.title('特征重要性')
sns.barplot(x='feature_importance',y='feature',data=catboost)

【输出】-XGB 算法

model_xgb= XGBClassifier()
model_xgb.fit(x_train,y_train)
from xgboost import plot_importance
plot_importance(model_xgb,height=0.5)
plt.show()

【输出】由于 XGB算法精度得分最高,故我们以XGB得到的“特征重要性”进行分析。
【分析】

  1. 第一重要特征:tenure
plt.figure(figsize=(20,4))
sns.countplot(x='tenure',hue='Churn',data=df)

【输出】【分析】
由图可知,流失客户集中在1-5号职位,运营团队需要重点关注1-5号职位。

  1. 第二重要特征:PaymentMethod

【分析】
使用“电子支票”支付的人更容易流失。

  1. 第三重要特征:MonthlyCharges
    查看流失用户、留存用户在付费方面的偏好:
    ‘MonthlyCharges’、’TotalCharges’,离散化后,可进行卡方检验,然后交叉分析。
  • 卡方检验:’MonthlyCharges’、’TotalCharges’
df['MonthlyCharges-']=churn_var['MonthlyCharges']
df['TotalCharges-']=churn_var['TotalCharges']
print('kf_var的卡方检验结果如下:','\n')
KF('MonthlyCharges-')
KF('TotalCharges-')

【输出】
kf_var的卡方检验结果如下:

Churn by MonthlyCharges 的卡方临界值是0.00,小于0.05,表明MonthlyCharges组间有显著性差异,可进行【交叉分析】

Churn by TotalCharges 的卡方临界值是0.00,小于0.05,表明TotalCharges组间有显著性差异,可进行【交叉分析】

  • 交叉分析
for i in ['MonthlyCharges','TotalCharges']:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n')

【输出】18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”
【分析】
月付费70.35–118.75元的用户更容易流失18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”
【分析】
总付费18–1397元的用户更容易流失

基于”MonthlyCharges”和“TotalCharges”画四分图:
求两个维度的均值

 print('MonthlyCharges的均值是{:.2f},TotalCharges的均值是{:.2f}'.format(df['MonthlyCharges'].mean(),df['TotalCharges'].mean()))

流失客户四分图:

df_1=df[df['Churn']==1#流失客户
df_0=df[df['Churn']==0#留存客户
plt.figure(figsize=(10,10))   
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu,data=df_1)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】【分析】
四分图的右下区域,流失客户比较集中,即总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失。

留存客户四分图

plt.figure(figsize=(10,10)) 
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu_r,data=df_0)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】【结论】
综合“ 统计分析” 和 “XGB算法输出特征重要性” 得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure:1-5号职位的用户比较容易流失
  2. PaymentMethod:使用“电子支票”支付的人
  3. MonthlyCharges 、TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling:开通电子账单
  5. Partner:单身
  6. OnlineBackup:没开通“在线备份业务”
  7. InternetService:开通了 “Fiber optic 光纤网络”
  8. TechSupport:没开通“技术支持服务”
  9. DeviceProtection:没开通通了“设备保护业务
  10. OnlineSecurity:没开通“网络安全服务”
  11. Contract:“按月”签订合同方式
  12. Dependents:无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

转自猫有九条命。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化全方位实战教程

本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。

1.Pandas 性能优化 40 倍 – DataFrame

1. 1 性能优化小试牛刀

大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。

比如下面这个简单例子,我们随机生成100万条数据,对val这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform函数的平均运行时间是284ms:

import pandas as pd
import numpy as np

def gen_data(size):
    d = dict()
    d["genre"] = np.random.choice(["A""B""C""D"], size=size)
    d["val"] = np.random.randint(low=0, high=100, size=size)
    return pd.DataFrame(d)

data = gen_data(1000000)
data.head()
def transform(data):
    data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)

%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.2. 用Cython编写C扩展

试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1 这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。

%load_ext cython
%%cython
cpdef int _transform(int x):
    if x % 2:
        return x + 1
    return x - 1

def transform(data):
    data.loc[:, "new_val"] = data.val.apply(_transform)

%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.3. 减少类型转换

为了减少C和Python之间的类型转换,我们直接把val这一列作为Numpy数组传递给Cython函数,注意区分cnpnp。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。

%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.4. 使用不安全的数组

利用@cython.boundscheck(False)@cython.wraparound(False)装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。

%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

来源:Python中文社区

作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu

2.Pandas性能优化:基础篇

Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。

2.1 按行迭代优化

很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。

我们测试一下速度:

import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
                     'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                   'x':  np.random.randint(1, 10, (1000))})

%%timeit
a2=[]
for index,row in df.iterrows():
    temp=row['a']
    a2.append(temp*temp)
df['a2']=a2    

67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
a2=[]
for row in df.itertuples():
    temp=getattr(row, 'a')
    a2.append(temp*temp)
df['a2']=a2

1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到直接循环,itertuples速度是iterrows的很多倍。

所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。

2.2 apply 优化

一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:

%%timeit
df['a2']=df.a.apply(lambda x: x*x)

360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到,效率又有提升,apply的速度是itertuples的5倍左右。

注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。

%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)

15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。

我们可以简单的这样改写:

%%timeit
df['a3']=df['a']*df['b']

204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高

%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)

93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

2.3 聚合agg效率优化

有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。

还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:

%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())

1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

采用agg内置函数:

%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()

415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

刚才是单列聚合,我们看下多列聚合:
自定义函数:

%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})

2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

采用内置函数:

%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})

1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。

下面列出了一些内置函数:

内置函数描述
count计数
sum求和
mean平均值
median中位数
min,max最大值/最小值
std,var标准差/方差
prod求积

当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如

df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))

在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。

2.4 数据读取优化

如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。

这里有几种方法可以优化:

1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。

chunksize = 10 ** 6for chunk in pd.read_csv(filename,  chunksize=chunksize):
     process(chunk)

这里也可以使用多进程方式,具体我们在后面进阶篇介绍。

  1. 过滤掉不需要的列

如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。

df = pd.read_csv(filename,  usecols=["a","b","c"]):
  1. 为列指定类型
    panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
  1. 保存为其他格式
    如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
 import pandas as pd
 #读取csv
 df = pd.read_csv('filename.csv')
 
 #pickle格式
 df.to_pickle('filename.pkl') 
 df = pd.read_pickle('filename.pkl') 
 
 #hdf格式
 df.to_hdf('filename.hdf','df') 
 df = pd.read_hdf('filename.hdf','df') 

file typetimespeed
csv1.93 s1
pickle415 m4.6
hdf808 ms2.3
  1. 用第三方的包读取
    如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。

2.5 优化数据处理逻辑

这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。

这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:

call_areacall_seconds
03364
23075
25847
12032

call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。

我们随机生成一批数据:

import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area':  np.random.randint(0, 3, (row_number)),
                   'call_seconds':  np.random.randint(0, 10000, (row_number))})

如果我们想计算每个电话的费用:

def get_cost(call_area,call_seconds):
    if call_area==1:
        rate=0.1
    elif call_area==0:
        rate=0.7
    else:
        return 0
    return math.ceil(call_seconds/60)*rate

方法1,采用按行迭代循环

%%timeit
cost_list = []
for i,row in df.iterrows():
    call_area=row['call_area']
    call_seconds=row['call_seconds']
    cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list

方法2,采用apply行的方式

%%timeit
df['cost']=df.apply(
         lambda row: get_cost(
             row['call_area'],
             row['call_seconds']),
         axis=1)

方法3,采用mask+loc,分组计算

%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0

方法4,使用numpy的方式,可以使用index的方式找到对应的费用。

%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins

测试结果:

方法运行时间运行速度
iterrows513 ms1
apply181 ms2.8
loc6.44 ms79.6
numpy219 µs2342

方法4速度快是因为它采用了numpy向量化的数据处理方式。

总结

在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。

3.Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

3.1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

3.2 cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

3.3 numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

3.4 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

3.5 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Selenium 实战教程-爬取人民网留言板单进程、多线程、多进程版

本文讲解了如何通过selenium爬取人民网留言板留言,并从单进程、多线程、多进程版角度进行爬虫性能优化,是一篇优秀的 selenium 实战教程。

第一节:单进程版+selenium实战教程

一、项目概述

1.项目说明

本项目主要是对领导留言板内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。网站链接是http://liuyan.people.com.cn/home?p=0,任意选择一条留言点击进入详情页后,如下对于图中标出的数据,均要进行爬取,以此构成一条留言的组成部分。

2.环境配置

(1)Python:3.x
(2)所需库:

  • dateutil
    • 安装方法:
    • pip install python-dateutil
  • selenium
    • 安装方法:
    • pip install selenium

(3)模拟驱动:chromedriver,可点击https://download.csdn.net/download/CUFEECR/12193208进行下载Google浏览器80.0.3987.16版对应版本,或点击http://chromedriver.storage.googleapis.com/index.html下载与Google对应版本,并放入Python对应安装路径下的Scripts目录下。

二、项目实施

1.导入所需要的库

import csv
import os
import random
import re
import time

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    while True:
        datestr = WebDriverWait(drivertemp, 10).until(
            lambda driver: driver.find_element_by_xpath(
                '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
        datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
        date = dparser.parse(datestr, fuzzy=True)
        print('正在爬取链接 --', position, '--', date)
        if date < start_date:
            break
        ## 模拟点击加载
        try:
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more")))
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight, document.body.scrollHeight - 600)')
            time.sleep(get_time())
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight - 600, document.body.scrollHeight)')
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.XPATH, '//*[@id="show_more"]')))
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
        except:
            break
        time.sleep(get_time() - 1)
    detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
    ## 获取所有链接
    for element in detail_elements:
        detail_url = element.get_attribute('href')
        yield detail_url
    drivertemp.quit()

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个内容,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    position = WebDriverWait(driver, 10).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
    ## time.sleep(get_time())
    print(index, '-- 正在爬取 --', position)
    start_time = time.time()
    ## encoding='gb18030'
    csv_name = position + '.csv'
    ## 文件存在则删除重新创建
    if os.path.exists(csv_name):
        os.remove(csv_name)
    with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for detail_url in get_detail_urls(position, list_url):
            get_message_detail(driver, detail_url, writer, position)
            time.sleep(get_time())
    end_time = time.time()
    crawl_time = int(end_time - start_time)
    crawl_minute = crawl_time // 60
    crawl_second = crawl_time % 60
    print(position, '已爬取结束!!!')
    print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
    driver.quit()
    time.sleep(5)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    for index, fid in enumerate(fids):
        try:
            get_officer_messages(index + 1, fid)
        except:
            get_officer_messages(index + 1, fid)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12198734下载,仅供交流学习,请勿滥用。整个执行过程较长,因为是单线程的,必须要等一个领导数据爬取完毕之后才能爬取下一个,我选择了10个领导进行测试,在云服务器中的运行结果分别如下显然,整个运行时间将近5小时,效率相对较低,有很大的提升空间。最终得到了合并的DATA.csv

2.改进分析

(1)该版本的代码未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页也是采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本是单进程(线程)的,必须要一个领导爬取完之后才能进行下一个领导的爬取,效率较低,特别是留言较多的领导耗时很长,可以考虑使用多进程或多线程进行优化。

第二节:多线程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟。本篇在第一篇的基础上做了一些改进

  1. 采用了多线程,设定同时运行的线程的数量为3,线程数量适中,这样在保证在同一时刻有多个线程在执行爬取的同时,也能避免线程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。
  2. 对异常处理进行了优化,之前异常处理是放在获取一个领导对应的所有的留言链接函数里的,当获取不到加载更多按钮并且超时时就会抛出异常,这样使得如果异常发生在其他部分如获取留言详情时会被忽略,改进之后将其放入主函数,对于每一个领导都放入异常处理,这里涵盖了对该领导爬取时的所有操作,只要在任一环节报错都会捕捉到,同时增加了5层嵌套异常处理,增加了对出现异常的容忍度(在发生网络环境不好而加载不出页面、内存消耗较多而卡顿、被被爬取方反爬而不能爬取等情况时,可以对官员重新爬取以保证数据的完整程度和程序的健壮性)。

二、项目实施

由于在实现过程中有3种常用的方法实现多线程,因此对应也有3种不同的具体实现,这里选第1种进行说明:

1.导入所需要的库

import csv
import os
import random
import re
import time
import threading

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 控制同时运行的线程数为3
sem = threading.Semaphore(3)
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,同时在全局中设置同时运行的线程数为3,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有3种方式实现:

  • 这通过threading.Semaphore()指定线程数量,后边在实现作为线程参数的函数时使用上下文处理器
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    thread_list = []
    ## 将所有线程加入线程列表,便于控制同时执行的线程数量
    for index, fid in enumerate(fids):
        t = threading.Thread(target=get_officer_messages, args=(index + 1, fid))
        thread_list.append([t, fid])
    for thread, fid in thread_list:
        ## 5层嵌套进行异常处理
        try:
            thread.start()
        except:
            try:
                thread.start()
            except:
                try:
                    thread.start()
                except:
                    try:
                        thread.start()
                    except:
                        try:
                            thread.start()
                        except:
                            ## 如果仍出现异常加入失败名单
                            print('该官员爬取失败,已存入失败名单,以备再爬')
                            if not os.path.exists('fid_not_success.txt'):
                                with open('fid_not_success.txt''a+'as f:
                                    f.write(fid)
                            else:
                                with open('fid_not_success.txt''a+'as f:
                                    f.write('\n' + fid)
                            continue
    for thread, fid in thread_list:
        thread.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用submit()函数实现线程的调用执行
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        for index, fid in enumerate(fids):
            ## 5层嵌套进行异常处理
            try:
                executor.submit(get_officer_messages, index + 1, fid)
            except:
                try:
                    executor.submit(get_officer_messages, index + 1, fid)
                except:
                    try:
                        executor.submit(get_officer_messages, index + 1, fid)
                    except:
                        try:
                            executor.submit(get_officer_messages, index + 1, fid)
                        except:
                            try:
                                executor.submit(get_officer_messages, index + 1, fid)
                            except:
                                ## 如果仍出现异常加入失败名单
                                print('该官员爬取失败,已存入失败名单,以备再爬')
                                if not os.path.exists('fid_not_success.txt'):
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write(fid)
                                else:
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write('\n' + fid)
                                continue
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用map()函数实现函数和多个参数的映射来执行线程

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        executor.map(get_officer_messages, range(1, len(fids) + 1), fids)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多线程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

3个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12199138下载,欢迎进行测试和交流学习,请勿滥用。整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到1小时半左右,约等于第一篇单线程的三分之一,因为同一时刻有3个子线程执行,大大降低了运行时间,效率比之前提高很多,加入多线程之后,可以让运行时间较长和较短的相互补充,同时多个线程同时运行,在同一时刻爬取多个领导,很显然大大缩短了运行时间。最后得到了合并的DATA.csv:可以进一步总结多线程的优势 :

  • 易于调度
  • 提高并发性:通过线程可方便有效地实现并发性。进程可创建多个线程来执行同一程序的不同部分。
  • 开销少:创建线程比创建进程要快,所需开销很少。
  • 利于充分发挥多处理器的功能:通过创建多线程进程,每个线程在一个处理器上运行,从而实现应用程序的并发性,使每个处理器都得到充分运行。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

第三节:多进程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟
本篇在第二篇的基础上做了一个主要改进:
从多线程改变为多进程,设定同时运行的进程的数量为3,数量适中,这样在保证在同一时刻有多个进程在执行爬取的同时,也能避免进程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。

二、项目实施

由于在实现过程中有2种常用的方法实现多线程,因此对应也有2种不同的具体实现。

1.导入所需要的库

import csv
import os
import random
import re
import time


import dateutil.parser as dparser
from random import choice
from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。
函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有2种方式实现:

  • 通过for循环遍历所有任务和参数,并调用apply_async()函数将任务加入进程池
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 创建进程池
    pool = Pool(3)
    ## 将任务加入进程池并传入参数
    for index, fid in zip(range(1, len(fids) + 1), fids):
        pool.apply_async(get_officer_messages, (index, fid))
    pool.close()
    pool.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过调用map()函数将任务加入进程池并实现函数与参数的映射
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 处理传入的参数,使之对应索引合并并且可迭代
    itera_merge = list(zip(range(1, len(fids) + 1), fids))
    ## 创建进程池
    pool = Pool(3)
    ## 将任务传入进程池并通过映射传入参数
    pool.map(get_officer_messages_enc, itera_merge)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多进程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

2个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12200752下载,欢迎进行测试和交流学习,请勿滥用
整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到100分钟,与单进程相比大大缩短了时间、提高了效率,因为同一时刻有3个子进程执行。加入多进程之后,可以让运行时间较长和较短的相互补充,在任意时刻多个进程同时运行。但是也可以看出来与多线程相比,多进程的运行时间相对稍长,虽然差别不大,但是这可能就是性能的瓶颈。可能的原因是进程需要的资源更多,这对内存、CPU和网络的要求更高,从而对设备提出了更高的要求,有时可能设备性能跟不上程序要求,从而降低了效率。
最后得到了合并的DATA.csv:对于多线程和多进程的简单对比分析如下:
一个线程至少有一个进程,一个进程至少有一个线程,线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高,进程在执行过程中拥有独立的存储单元,而多个线程共享存储器,从而极大的提高了程序的运行效率。线程不能够独立执行,必须依存在进程中。

多线程:

  • 线程执行开销小(占用的资源非常少)但是不利于资源的管理和保护;
  • 如果需要共享数据,建议使用线程;
  • 适用于IO密集型任务(Web和文档读写等),遇到IO阻塞,速度远远小于CPU的运行速度,可以多开辟一些线程,有线程阻塞了,其他线程依然正常工作。

多进程:

  • 执行额开销比较大(占用的资源多),但是利于资源的管理和保护;
  • 适用于计算密集型(视频的译码编码和科学数据计算等)。

显然,在爬虫中应该偏向使用多线程。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。
(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。
(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

3.合法性说明

  • 本项目是为了学习和科研的目的,所有读者可以参考执行思路和程序代码,但不能用于恶意和非法目的(恶意攻击网站服务器、非法盈利等),如有违者请自行负责。
  • 本项目所获取的数据都是在进一步的分析之后用于对电子政务的实施改进,对政府的决策能起到一定的参考作用,并非于恶意抓取数据来攫取不正当竞争的优势,也未用于商业目的牟取不法利益,运行代码只是用几个fid进行测试,并非大范围地爬取,同时严格控制爬取的速率、争取不对服务器造成压力,如侵犯当事者(即被抓取的网络主体)的利益,请联系更改或删除。
  • 本项目是留言板爬取系列的第二篇,后期会继续更新,欢迎读者交流,以期不断改进。

作者:Corley

源自:快学python

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Prometheus 实战教程 + Grafana + Python — 实时监控东方财富人气榜股票

上次我们讲过普罗米修斯(prometheus)这个接近完美的监控系统,有很多读者不了解它到底要如何搭建、应用,需要一篇 Prometheus 实战教程。今天我们就结合普罗米修斯、Grafana和Python采集脚本,写一个小小的东方财富人气榜 TOP100监控系统。

跟着本文的教程耐心往下走,你可能只需要花30分钟便可完成环境的搭建,非常舒服,下面先介绍基本概念。

普罗米修斯(prometheus)上次我们已经使用一整篇文章介绍过了,它是一个开源监控报警系统和时序列数据库。如果你没有阅读过这篇文章,请花五分钟读一下:

Grafana 是一个开源的数据可视化网络应用程序平台。用户配置连接的数据源之后,Grafana可以在网络浏览器里显示数据图表和警告。

比如说我基于 普罗米修斯(prometheus) + node_exporter 监控主机性能指标,然后由Grafana构建主机实时监控仪表盘,它是长这样的:

至于东方财富人气榜,指的是这个:

它能将市场目前最活跃的一些股票提取出来,可供我们作为投资的一种参考。

而我们今天要做的,就是自己搭建一套监控系统,实时监控某只股票在TOP100上的排名变化。

1.Prometheus 安装教程

创建 Prometheus 安装目录并添加 promethus 用户:

PROM_PATH='/data/prometheus'
mkdir -p ${PROM_PATH}
mkdir -p ${PROM_PATH}/{data,conf,logs,bin}
useradd prometheus
cd /usr/local/src

下载解压 prometheus, 这里我们选用2021年5月18日更新的最新版 v2.27.1:

wget https://github.com/prometheus/prometheus/releases/download/v2.27.1/prometheus-2.27.1.linux-amd64.tar.gz
tar -xvf prometheus-2.27.1.linux-amd64.tar.gz
cd prometheus-2.27.1.linux-amd64/
cp prometheus promtool ${PROM_PATH}/bin/
cp prometheus.yml ${PROM_PATH}/conf/
chown -R prometheus.prometheus /data/prometheus

设置环境变量:

cat >> /etc/profile <<EOF
PATH=/data/prometheus/bin:$PATH:$HOME/bin
EOF

将 Promethus 配置为系统服务之一,以便使用 systemctl 命令管控服务:

cat >>/etc/systemd/system/prometheus.service <<EOF
[Unit]
Description=Prometheus
Documentation=https://prometheus.io/
After=network.target

[Service]
Type=simple
User=prometheus
ExecStart=/data/prometheus/bin/prometheus --config.file=/data/prometheus/conf/prometheus.yml --storage.tsdb.path=/data/prometheus/data --storage.tsdb.retention=90d
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

现在使用下面的systemctl命令重新加载systemd系统,并查看服务是否启动:

systemctl daemon-reload
systemctl enable prometheus
systemctl start prometheus
systemctl status prometheus

看到 running 状态说明一切正常:

记得开放9090端口,这样才可以访问 Prometheus 的 Web 端,访问 http://服务器IP:9090 查看得到 Prometheus Web界面,说明安装成功:

2.Grafana 安装教程

Grafana 我们也使用最新的 8.0.1 版本,安装方式如下:

CentOS系列系统使用以下命令安装:

cd /usr/local/src
wget https://dl.grafana.com/oss/release/grafana-8.0.1-1.x86_64.rpm
sudo yum localinstall grafana-6.5.2-1.x86_64.rpm

Ubuntu和Debian系列系统使用以下命令安装:

cd /usr/local/src
sudo apt-get install -y adduser libfontconfig1
wget https://dl.grafana.com/oss/release/grafana_8.0.1_amd64.deb
sudo dpkg -i grafana_8.0.1_amd64.deb

然后启动系统服务即可:

systemctl start grafana-server
systemctl status grafana-server

看到 running 状态说明一切正常:

记得开放3000端口,这样你才可以访问你的Grafana: http://你的服务器IP:3000 如下所示:

输入用户名,密码登录系统。用户名与密码都是”admin”,如果能打开页面则已经安装成功了。

3.初尝Grafana+Prometheus实战教程

为了初步尝试这套系统,我们可以通过简单的采集主机性能数据开始。Node_exporter是一个Prometheus推出的官方主机性能采集工具。通过它我们能很方便地输出主机性能指标到Prometheus.

3.1 下载安装Node_Exporter:

NODE_PATH='/data/prometheus/node_exporter/'
cd /usr/local/src/
mkdir -p ${NODE_PATH}
wget https://github.com/prometheus/node_exporter/releases/download/v1.1.2/node_exporter-1.1.2.linux-amd64.tar.gz
tar -xvf node_exporter-1.1.2.linux-amd64.tar.gz
cp node_exporter-1.1.2.linux-amd64/node_exporter ${NODE_PATH}
chown -R prometheus.prometheus ${NODE_PATH}

配置node_exporter为系统服务:

cat > /lib/systemd/system/node_exporter.service <<EOF
[Unit]
Description=node_exporter
Documentation=https://prometheus.io/
After=network.target
 
[Service]
Type=simple
User=prometheus
ExecStart=/data/prometheus/node_exporter/node_exporter
Restart=on-failure
 
[Install]
WantedBy=multi-user.target
EOF

现在使用systemctl命令重新加载系统命令,并查看服务是否启动:

systemctl daemon-reload
systemctl enable node_exporter
systemctl start node_exporter
systemctl status node_exporter

看到如下图的状态说明启动成功。

放行9100端口,访问http://你的服务器地址:9100/metrics 看到如下指标页面说明安装成功:

配置 prometheus.yaml (ubuntu 下为 prometheus.yml), 让 prometheus 采集 node_exporter 输出的指标数据:

vim /data/prometheus/conf/prometheus.yml

配置如下:

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090']

   # 主要是新增了node_exporter的job,如果有多个node_exporter,在targets数组后面加即可

  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

保存后重启prometheus:

systemctl restart prometheus

最后配置Grafana:

然后选择 Prometheus 数据源:

输入 Prometheus url 然后点击 save&test 保存:

然后导入官方仪表盘,官方提供的模板号为8919:

然后你就能看见本机非常漂亮的性能指标数据仪表盘了。

不看不知道,一看吓一跳,看来我需要升级这台机器的内存了。

4.编写采集脚本

为了能够采集东方财富人气榜前100名,我们需要用Python编写一个人气榜采集脚本,并使其像 node_exporter 一样输出指标信息:

为了达到这个目的,我们必须安装 prometheus_client 模块:

pip3 install prometheus_client

获取股票排名的代码如下:

# Python实用宝典
# 2021-06-13
# 文件名: fetch_stock.py
import time
import requests
from prometheus_client import start_http_server, CollectorRegistry, Gauge


reg = CollectorRegistry()
gauge = Gauge(
    'rank', '人气榜排名',
    ['stock_id'], registry=reg
)


def process_request():
    url = "https://emappdata.eastmoney.com/stockrank/getAllCurrentList"
    kwargs = {
        "appId": "appId01",
        "pageNo": 1,
        "pageSize": "100",
    }
    result = requests.post(url, json=kwargs).json()
    for i in result.get("data", []):
        gauge.labels(stock_id=i["sc"]).set(i["rk"])
    time.sleep(60)


if __name__ == '__main__':
    start_http_server(8000, registry=reg)
    while True:
        process_request()

这里我们只捕获人气榜前100名,并通过Prometheus客户端的start_http_server开启一个Web服务,这样你通过http服务访问8000端口的时候就能输出这些指标。

为了让其能持续输出指标数据,我们要用nohup使其成为一个常驻进程:

nohup python3 fetch_stock.py &

开放8000端口,访问 http://你的服务器IP:8000 就能查看输出的指标:

5.应用采集脚本

同配置Node_exporter一样,我们需要将自己编写好的采集脚本落入Prometheus,配置prometheus.yaml:

配置 prometheus.yaml, 让 prometheus 采集 node_exporter 输出的指标数据:

#(CentOS) vim /data/prometheus/conf/prometheus.yaml
vim /data/prometheus/conf/prometheus.yml # ubuntu

配置如下:

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090']

   # 主要是新增了node_exporter的job,如果有多个node_exporter,在targets数组后面加即可
  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

   # 新增我们的Python股票采集脚本
  - job_name: 'hot_list'
    static_configs:
      - targets: ['localhost:8000']

保存后重启prometheus:

systemctl restart prometheus

最后配置Grafana, 选择新建一个dashboard:

然后选择rank指标:

点击 Use query 就能获取所有股票的排名曲线:

6.配置Grafana告警

为了在某只股票达到某种排名的时候触发通知,我们需要先配置好告警渠道:

然后配置邮件告警,点击 Test, 此时 Grafana 会告诉你一个错误:

就是我们还没有配置好 SMTP 相关服务,需要配置 SMTP 相关服务才能正常发送邮件,如果你是按照本文按照Grafana的教程走下来的,那么Grafana.ini的文件位于 /etc/grafana/grafana.ini.

vim /etc/grafana/grafana.ini

然后在 smtp 部分配置你的 host、user、password、from_address、from_name,并打开 enabled 如下图所示:

然后重启 Grafana-server

systemctl restart grafana-server

再点击Test,你的邮箱里收到这样的邮件说明通知可以正常发送了:

然后我们进入正题,监控某只股票的排名变化,比如 SH600070:

然后点击 Alert 配置告警,一旦其排名高于65名则发送邮件通知:

完成后点击右上角的 save 保存即可:

然后进入 Alerting 告警中心,你会看到刚刚配置的告警规则在这里可以进行管控:

点击Pause可以暂停这个告警,Edit alert可以去更改告警条件。

一旦触发告警,这个状态便会更改,你就会收到邮件:

邮件效果如下:

邮件里的告警图片没显示出来,因为我们没有安装 “grafana image renderer”, 需要在你的服务器执行以下命令安装并重启 Grafana:

grafana-cli plugins install grafana-image-renderer
systemctl restart grafana-server

新的告警邮件便能看到图片了:

怎么样,用Prometheus+Grafana+Python采集搭建一个股票监控系统还是非常简单的吧?创新性地监控东方财富人气榜上某只股票的变化并产生告警,能让你熟悉监控策略的配置,见微知著。跟着本文的教程走,相信你会有不少收获。

如果我们延伸一下,结合量化投资系列教程的可转债交易策略 — Python 量化投资实战教程(10),是否可以构建一些更有意义的策略?答案是肯定的。

我们可以监控所有100元以下的可转债对应的股票,如果这些股票进入了人气榜TOP100或者飙升榜(本文没有采集,有兴趣的读者可以自行采集),就购入这些低价可转债,这种买入策略或许也不错。

你也可以抛弃东方财富的榜单分类,构建自己的排名环比增长买入策略,环比下跌卖出策略,我相信这会非常有意思。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

有趣好用的Python教程

退出移动版
微信支付
请使用 微信 扫码支付