分类目录归档:Python 数据分析

为什么说Python大数据处理一定要用Numpy Array?

Numpy 是Python科学计算的一个核心模块。它提供了非常高效的数组对象,以及用于处理这些数组对象的工具。一个Numpy数组由许多值组成,所有值的类型是相同的。

Python的核心库提供了 List 列表。列表是最常见的Python数据类型之一,它可以调整大小并且包含不同类型的元素,非常方便。

那么List和Numpy Array到底有什么区别?为什么我们需要在大数据处理的时候使用Numpy Array?答案是性能。

Numpy数据结构在以下方面表现更好:

1.内存大小—Numpy数据结构占用的内存更小。

2.性能—Numpy底层是用C语言实现的,比列表更快。

3.运算方法—内置优化了代数运算等方法。

下面分别讲解在大数据处理时,Numpy数组相对于List的优势。

1.Numpy Array内存占用更小

适当地使用Numpy数组替代List,你能让你的内存占用降低20倍。

对于Python原生的List列表,由于每次新增对象,都需要8个字节来引用新对象,新的对象本身占28个字节(以整数为例)。所以,列表 list 的大小可以用以下公式计算:

64 + 8 * len(lst) + len(lst) * 28 字节

list

而使用Numpy,就能减少非常多的空间占用。比如长度为n的Numpy整形Array,它需要:

96 + len(a) * 8 字节

Numpy Array

可见,数组越大,你节省的内存空间越多。假设你的数组有10亿个元素,那么这个内存占用大小的差距会是GB级别的。

2.Numpy Array速度更快、内置计算方法

运行下面这个脚本,同样是生成某个维度的两个数组并相加,你就能看到原生List和Numpy Array的性能差距。

import time
import numpy as np

size_of_vec = 1000

def pure_python_version():
    t1 = time.time()
    X = range(size_of_vec)
    Y = range(size_of_vec)
    Z = [X[i] + Y[i] for i in range(len(X)) ]
    return time.time() - t1

def numpy_version():
    t1 = time.time()
    X = np.arange(size_of_vec)
    Y = np.arange(size_of_vec)
    Z = X + Y
    return time.time() - t1


t1 = pure_python_version()
t2 = numpy_version()
print(t1, t2)
print("Numpy is in this example " + str(t1/t2) + " faster!")

结果如下:

0.00048732757568359375 0.0002491474151611328
Numpy is in this example 1.955980861244019 faster!

可以看到,Numpy比原生数组快1.95倍。

如果你细心的话,还能发现,Numpy array可以直接执行加法操作。而原生的数组是做不到这点的,这就是Numpy 运算方法的优势。

我们再做几次重复试验,以证明这个性能优势是持久性的。

import numpy as np
from timeit import Timer

size_of_vec = 1000
X_list = range(size_of_vec)
Y_list = range(size_of_vec)
X = np.arange(size_of_vec)
Y = np.arange(size_of_vec)

def pure_python_version():
    Z = [X_list[i] + Y_list[i] for i in range(len(X_list)) ]

def numpy_version():
    Z = X + Y

timer_obj1 = Timer("pure_python_version()", 
                   "from __main__ import pure_python_version")
timer_obj2 = Timer("numpy_version()", 
                   "from __main__ import numpy_version")

print(timer_obj1.timeit(10))
print(timer_obj2.timeit(10))  # Runs Faster!

print(timer_obj1.repeat(repeat=3, number=10))
print(timer_obj2.repeat(repeat=3, number=10)) # repeat to prove it!

结果如下:

0.0029753120616078377
0.00014940369874238968
[0.002683573868125677, 0.002754641231149435, 0.002803879790008068]
[6.536301225423813e-05, 2.9387418180704117e-05, 2.9171351343393326e-05]

可以看到,第二个输出的时间总是小得多,这就证明了这个性能优势是具有持久性的。

所以,如果你在做一些大数据研究,比如金融数据、股票数据的研究,使用Numpy能够节省你不少内存空间,并拥有更强大的性能。

参考文献:https://webcourses.ucf.edu/courses/1249560/pages/python-lists-vs-numpy-arrays-what-is-the-difference

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

教你使用 Python 获取Fredapi美国重要经济指标数据

美国的金融市场(主要是股市、债市和汇市)经常受到其国内各项经济数据影响而波动。不仅如此,这些经济数据甚至会影响远在太平洋对岸的港股和A股。因此对于世界经济火车头的美国,我们要有对其经济数据有一定程度的理解。

对于股市而言,几个比较重要的经济指标为:

1. 联储局公开市场委员会会议声明

联储局公开市场委员会(FOMC)是决定美国利率走向的主宰者,而利率是美国经济未来增长的最重要因素,它的变化都会令消费支出、公司利润、政府预算及股票债券和美元的价值都受到影响。

2.消费者物价指数 Consumer Price Index (CPI)

这个指标是市场上最瞩目的经济指标之一,通胀(缩)率是联储局决定是否加息的最主要参考指标,而消费者物价指数则是最重要的通胀(缩)指标。通胀(缩)会影响到民生、政府的财政政策和民间的所有经济活动。通胀(缩)对于投资市场来说是件非常可怕的事情,因为它制造了经济的不稳定性和不确定性,给股市会带来波动和风险。

3.生产者物价指数 Producer Price Index (PPI)

跟CPI一样,这个指标是预测通货膨胀的重要指标之一,不过它反映的是生产者这个环节,是在通胀转移到消费者之前的数据,也就是说它对通胀和利率政策更具前瞻性,尽管其对消费者的影响力不如CPI。

4.采购经理人指数 Purchasing Managers Index (PMI)

PMI是一项全面的经济指标,概括了美国整体制造业状况、就业及物价表现,是全球最受关注的经济资料之一。采购经理人指数为每月第一个公布的重要数据,加上其所反映的经济状况较为全面,因此市场十分重视数据所反映的具体结果。在一般意义上讲采购经理人指数上升,会带来美元汇价上涨;采购经理人指数下降,会带来美元汇价的下跌。

5.非农就业数据 Non-farm Payrolls (NFP)

是美国非农业人口的就业数据,由美国劳工部每月公布一次,反应美国经济的趋势,数据好说明经济好转,数据差说明经济转坏。非农数据会影响美联储对美元的货币政策,经济差,美联储会倾向减息,美元贬值,经济好,美联储会倾向加息,美元升值。

本文将教你如何使用Python调用 FRED(Federal Reserve Economic Data) 数据库API获取以上相关数据。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install fredapi

2.注册账户获取FredApi权限

为了能够获取FRED的数据,你需要先注册账号、然后申请一个API秘钥,完全免费,三分钟就能解决。

进入FRED主页:https://research.stlouisfed.org

右上角有一个『My Account』,点进去后,选择『Create New Account』申请 FRED 账号:

注册完成后,会跳转到会员页,点击左侧API Keys:

申请API秘钥:

申请完毕后就能获得一个API Key了。

3.通过接口获取Fredapi经济指标数据

FRED 数据量非常庞大,其分为大分类和大分类的子项目。大分类我们可以通过这样的代码获得:

import requests
import pandas as pd
import datetime as dt
def fetch_releases(api_key):
    """
    取得 FRED 大分类信息
    Args:
        api_key (str): 秘钥
    """
    r = requests.get('https://api.stlouisfed.org/fred/releases?api_key='+api_key+'&file_type=json', verify=True)
    full_releases = r.json()['releases']
    full_releases = pd.DataFrame.from_dict(full_releases)
    full_releases = full_releases.set_index('id')
    # full_releases.to_csv("full_releases.csv")
    return full_releases

导出为CSV后,你能看到所有的大分类ID及其说明。

fredapi经济指标数据

每个大分类中有许多子项目,比如 355:Minimum Wage Rates 底下会有:
* FEDMINNFRWG:Nonfarm Workers Minimum Hourly Wage
* FEDMINFRMWG:Farm Workers Minimum Hourly Wage

每一个子项目也有一个专属的『子项目英文代码』,就是前面的那串英文字。

我们可以通过对大分类进行关键字搜索,获取我们文首提到的五个指标:

from fredapi import Fred
import requests
import numpy as np
import pandas as pd
import datetime as dt


def fetch_releases(api_key):
    """
    取得 FRED 大分类信息
    Args:
        api_key (str): 秘钥
    """
    r = requests.get('https://api.stlouisfed.org/fred/releases?api_key='+api_key+'&file_type=json', verify=True)
    full_releases = r.json()['releases']
    full_releases = pd.DataFrame.from_dict(full_releases)
    full_releases = full_releases.set_index('id')
    # full_releases.to_csv("full_releases.csv")
    return full_releases

  
def fetch_release_id_data(release_id):
    """
    按照分类ID获取数据

    Args:
        release_id (int): 大分类ID

    Returns:
        dataframe: 数据
    """
    econ_data = pd.DataFrame(index=pd.date_range(start='2000-01-01', end=dt.datetime.today(), freq='MS'))
    series_df = fred.search_by_release(release_id, limit=3, order_by='popularity', sort_order='desc')
    for topic_label in series_df.index:
        econ_data[series_df.loc[topic_label].title] = fred.get_series(topic_label, observation_start='2000-01-01', observation_end=dt.datetime.today())
    return econ_data


api_key = '填入你的API秘钥'

fred = Fred(api_key)

full_releases = fetch_releases(api_key)

keywords = ["producer price", "consumer price", "fomc", "manufacturing", "employment"]

for search_keywords in keywords:
    search_result = full_releases.name[full_releases.name.apply(lambda x: search_keywords in x.lower())]
    econ_data = pd.DataFrame(index=pd.date_range(start='2000-01-01', end=dt.datetime.today(), freq='MS'))

    for release_id in search_result.index:
        print("scraping release_id: ", release_id)
        econ_data = pd.concat([econ_data, fetch_release_id_data(release_id)], axis=1)
    econ_data.to_csv(f"{search_keywords}.csv")

上面就是完整的数据下载代码,如果你想直接获取脚本文件,请在Python实用宝典后台回复:FRED 下载。

填入你申请的 API 秘钥,运行脚本,就能获取我们想要的五个指标数据。会在当前文件夹下生成相应关键词的csv文件。比如 employment.csv 的内容如下:

里面包含了就业相关的许多数据,包括我们关注的非农数据等。

如果你想要更换关键词下载其他关键词的数据,也可以在keywords中进行增删。请注意,这里搜索必须使用小写单词。

获取数据只是第一步,最重要的是如何分析这些数据与股市的相关性。

德意志银行有一个研究发现历年来ISM(即PMI)指数的数值和标普500的同比增长数值是高度相关的。

类似于这样的数据分析切入点是非常有意思的,大家也可以尝试基于这些数据做一些自己的研究分析,说不定会有意外的发现。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

什么格式是保存 Pandas 数据的最好格式?

在数据分析相关项目工作时,我通常使用Jupyter笔记本和pandas库来处理和移动我的数据。对于中等大小的数据集来说,这是一个非常直接的过程,你甚至可以将其存储为纯文本文件而没有太多的开销。

然而,当你的数据集中的观测数据数量较多时,保存和加载数据回内存的过程就会变慢,现在程序的重新启动都会迫使你等待数据重新加载。所以最终,CSV文件或任何其他纯文本格式都会失去吸引力。

我们可以做得更好。有很多二进制格式可以用来将数据存储到磁盘上,其中有很多格式pandas都支持。我们怎么能知道哪一种更适合我们的目的呢?

来吧,我们尝试其中的几个,然后进行对比!这就是我决定在这篇文章中要做的:通过几种方法将 pandas.DataFrame 保存到磁盘上,看看哪一种在I/O速度、内存消耗和磁盘空间方面做的更好。

在这篇文章中,我将展示我的测试结果。

1.要比较的格式

我们将考虑采用以下格式来存储我们的数据:

1. CSV — 数据科学家的一个好朋友
2. Pickle — 一种Python的方式来序列化事物
3. MessagePack — 它就像JSON,但又快又小
4. HDF5 — 一种设计用于存储和组织大量数据的文件格式
5. Feather — 一种快速、轻量级、易于使用的二进制文件格式,用于存储数据框架
6. Parquet — Apache Hadoop的柱状存储格式

所有这些格式都是被广泛使用的,而且(也许除了MessagePack)在你做一些数据分析的事情时非常经常遇到。

为了追求找到最好的缓冲格式来存储程序会话之间的数据,我选择了以下指标进行比较。

1. size_mb – 文件大小(Mb)。
2. save_time – 将数据帧保存到磁盘上所需的时间量。
3. load_time – 将之前转储的数据帧加载到内存中所需要的时间量。
4. save_ram_delta_mb – 数据帧保存过程中最大的内存消耗增长量。
5. load_ram_delta_mb – 数据帧加载过程中的最大内存消耗增长量。

请注意,当我们使用高效压缩的二进制数据格式,如 Parquet 时,最后两个指标变得非常重要。它们可以帮助我们估计加载序列化数据所需的内存量,此外还有数据大小本身。我们将在接下来的章节中更详细地讨论这个问题。

2.测试及结果

我决定使用一个合成数据集进行测试,以便更好地控制序列化的数据结构和属性。

另外,我在我的基准中使用了两种不同的方法:

(a) 将生成的分类变量保留为字符串。

(b) 在执行任何I/O之前将它们转换为 pandas.Categorical 数据类型。

函数generate_dataset显示了我在基准中是如何生成数据集的:

def generate_dataset(n_rows, num_count, cat_count, max_nan=0.1, max_cat_size=100):
    """
    随机生成具有数字和分类特征的数据集。
    
    数字特征取自正态分布X ~ N(0, 1)。
    分类特征则被生成为随机的uuid4字符串。
    
    此外,数字和分类特征的max_nan比例被替换为NaN值。
    """
    dataset, types = {}, {}
    
    def generate_categories():
        from uuid import uuid4
        category_size = np.random.randint(2, max_cat_size)
        return [str(uuid4()) for _ in range(category_size)]
    
    for col in range(num_count):
        name = f'n{col}'
        values = np.random.normal(0, 1, n_rows)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'float32'
        
    for col in range(cat_count):
        name = f'c{col}'
        cats = generate_categories()
        values = np.array(np.random.choice(cats, n_rows, replace=True), dtype=object)
        nan_cnt = np.random.randint(1, int(max_nan*n_rows))
        index = np.random.choice(n_rows, nan_cnt, replace=False)
        values[index] = np.nan
        dataset[name] = values
        types[name] = 'object'
    
    return pd.DataFrame(dataset), types

我们将CSV文件的保存和加载性能作为一个基准。

五个随机生成的具有一百万个观测值的数据集被转储到CSV中,并读回内存以获得平均指标。

每种二进制格式都针对20个随机生成的具有相同行数的数据集进行测试。

这些数据集包括15个数字特征和15个分类特征。你可以在这个资源库中找到带有基准测试功能和所需的完整源代码:

https://github.com/devforfu/pandas-formats-benchmark

或在Python实用宝典后台回复 Pandas IO对比 ,下载完整代码。

(a) 数据为字符串特征时的性能

下图显示了每种数据格式的平均I/O时间。一个有趣的观察是,hdf显示出比csv更慢的加载速度,而其他二进制格式的表现明显更好。其中最令人印象深刻的是feather和parquet。

在保存数据和从磁盘上读取数据时,内存开销如何?

下一张图片告诉我们,hdf 的表现就不是那么好了。可以肯定的是,csv在保存/加载纯文本字符串时不需要太多的额外内存,而Feather和parquet则相当接近:

最后,让我们看看文件的大小。这次parquet显示了一个令人印象深刻的结果,考虑到这种格式是为有效存储大量数据而开发的,这并不令人惊讶。

(b) 字符串特征转换为数字时的性能

在上一节中,我们没有尝试有效地存储我们的分类特征而是使用普通的字符串。让我们来弥补这个遗漏吧! 这一次我们使用一个专门的 pandas.Categorical 类型,转字符串特征为数字特征。

看看现在与纯文本的csv相比,它看起来如何!

现在所有的二进制格式都显示出它们的真正力量。Csv的基准结果已经远远落后了,所以让我们把它去掉,以便更清楚地看到各种二进制格式之间的差异:

Feather 和 Pickle 显示了最好的 I/O 速度,而 hdf 仍然显示了明显的性能开销。

现在是时候比较数据进程加载时的内存消耗了。下面的柱状图显示了我们之前提到的关于parquet格式的一个重要事实。

可以看到 parquet 读写时的内存空间差距有多大,你有可能你无法将比较大的 parquet 文件加载到内存中。

最后的图显示了各格式的文件大小。所有的格式都显示出良好的效果,除了hdf仍然需要比其他格式多得多的空间:

3.结论

正如我们的测试所显示的,似乎 feather 格式是存储Python会话数据的理想候选者。它显示了很快的I/O速度,在磁盘上不占用太多内存,并且在加载回RAM时不需要消耗太大的内存。

当然,这种比较并不意味着你应该在每个可能的情况下使用这种格式。例如,feather格式一般不会被用作长期文件存储的格式。

另外,某些特定情况下也无法使用 feather,这由你的整个程序架构决定。然而,就如本帖开头所述的目的,它在不被任何特殊事项限制的情况下是一个很好的选择。

本文译自 towardsdatascience
作者: Ilia Zaitsev
有部分修改。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Sweetviz 让你三行代码实现数据分析

Sweetviz是一个开源Python库,它只需三行代码就可以生成漂亮的高精度可视化效果来启动EDA(探索性数据分析)。输出一个HTML。

如上图所示,它不仅能根据性别、年龄等不同栏目纵向分析数据,还能对每个栏目做众数、最大值、最小值等横向对比。

所有输入的数值、文本信息都会被自动检测,并进行数据分析、可视化和对比,最后自动帮你进行总结,是一个探索性数据分析的好帮手。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install sweetviz

2.sweetviz 基本用法

sweetviz 使用的原理是,使用一行代码,生成一个数据报告的对象(其中,my_dataframe是pandas中的DataFrame,一种表格型数据结构):

import pandas as pd
import sweetviz as sv

# 读取数据
my_dataframe = pd.read_csv('../ImpartData/iris.csv')
# 分析数据
my_report = sv.analyze(my_dataframe)
# 生成报告
my_report.show_html()

执行完成后,会在当前文件夹下生成一个HTML的报告文件

双击这个html,你就能看到精美的分析报告了:

其中,分析数据有三种函数可以用,除了上面提到的analyze函数,还有 compare 和 compare_intra 函数。

首先是analyze函数:

analyze(source: Union[pd.DataFrame, Tuple[pd.DataFrame, str]],
            target_feat: str = None,
            feat_cfg: FeatureConfig = None,
            pairwise_analysis: str = 'auto')

可见其有以下4个参数可以配置:

  • source:以pandas中的DataFrame数据结构作为分析对象。
  • target_feat:需要被标记为目标对象的字符串。
  • feat_cfg:需要被跳过、或是需要被强制转换为某种数据类型的特征。
  • pairwise_analysis:相关性分析可能需要花费较长时间。如果超过了你的忍受范围,就需要设置这个参数为on或者off,以判断是否需要分析数据相关性。

compare()丨两个数据集比较

my_report = sv.compare([my_dataframe, "Training Data"], [test_df, "Test Data"], "Survived", feature_config)

要比较两个数据集,只需使用该 compare() 函数。它的参数与 analyze() 相同,只是插入了第二个参数来覆盖比较数据帧。建议使用 [dataframe, “name”] 参数格式以更好地区分基础数据帧和比较数据帧。(例如 [my_df, "Train"] 比 my_df 更好)

compare_intra()丨数据集栏目比较

my_report = sv.compare_intra(my_dataframe, my_dataframe["Sex"] == "male", ["Male", "Female"], feature_config)

想要对数据集中某个栏目下的参数进行分析,就采用这个函数进行。
例如,如果需要比较“性别”栏目下的“男性”和“女性”,就可以采用这个函数。

3.调整报告布局

一旦你创建了你的报告对象,只需将它传递给两个show函数中的一个:

1. show_html():

show_html(  filepath='SWEETVIZ_REPORT.html', 
            open_browser=True, 
            layout='widescreen', 
            scale=None)

show_html(…)将在当前文件路径中创建并保存 HTML 报告。有以下参数:

  • layout (布局):无论是'widescreen''vertical'。当鼠标移过每个功能时,宽屏布局会在屏幕右侧显示详细信息。新的(从 2.0 开始)垂直布局在水平方向上更加紧凑,并且可以在单击时扩展每个细节区域。
  • scale:使用浮点数(scale= 0.8None)来缩放整个报告。
  • open_browser:启用 Web 浏览器的自动打开以显示报告。如果不需要,可以在此处禁用它。

2.show_notebook():

show_notebook(  w=None, 
                h=None, 
                scale=None,
                layout='widescreen',
                filepath=None)

它将嵌入一个 IFRAME 元素,在notebook中显示报告(例如 Jupyter、Google Colab 等)。

请注意,由于Notebook通常是一个更受限制的环境,因此使用自定义宽度/高度/比例值 ( whscale) 可能是个好主意。选项是:

  • w(宽度):设置报告输出窗口的宽度。可以是百分比字符串 ( w="100%") 或像素 (w=900)。
  • h(高度):设置报告输出窗口的高度。可以是像素数 ( h=700) 或将窗口拉伸到与所有特征 ( h="Full")一样高。
  • scale:与上面的 show_html 相同。
  • layout:与上面的 show_html 相同。
  • scale:与上面的 show_html 相同。
  • filepath:可选的输出 HTML 报告。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

数学神器!Sympy 模块解数学方程解微积分

SymPy 是一个Python库,专注于符号数学,它的目标是成为一个全功能的计算机代数系统,同时保持代码简洁、易于理解和扩展。

举一个简单的例子,比如说展开二次方程:

from sympy import *
x = Symbol('x')
y = Symbol('y')
d = ((x+y)**2).expand()
print(d)
# 结果:x**2 + 2*x*y + y**2

你可以随便输入表达式,即便是十次方,它都能轻易的展开,非常方便:

from sympy import *
x = Symbol('x')
y = Symbol('y')
d = ((x+y)**10).expand()
print(d)
# 结果:x**10 + 10*x**9*y + 45*x**8*y**2 + 120*x**7*y**3 + 210*x**6*y**4 + 252*x**5*y**5 + 210*x**4*y**6 + 120*x**3*y**7 + 45*x**2*y**8 + 10*x*y**9 + y**10

下面就来讲讲这个模块的具体使用方法和例子。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install Sympy

2.基本使用

简化表达式(化简)

sympy支持三种化简方式,分别是普通化简、三角化简、指数化简。

普通化简 simplify( ):

from sympy import *
x = Symbol('x')
d = simplify((x**3 + x**2 - x - 1)/(x**2 + 2*x + 1))
print(d)
# 结果:x - 1

三角化简 trigsimp( ):

from sympy import *
x = Symbol('x')
d = trigsimp(sin(x)/cos(x))
print(d)
# 结果:tan(x)

指数化简 powsimp( ):

from sympy import *
x = Symbol('x')
a = Symbol('a')
b = Symbol('b')
d = powsimp(x**a*x**b)
print(d)
# 结果:x**(a + b)

解方程 solve()

第一个参数为要解的方程,要求右端等于0,第二个参数为要解的未知数。

如一元一次方程:

from sympy import *
x = Symbol('x')
d = solve(x * 3 - 6, x)
print(d)
# 结果:[2]

二元一次方程:

from sympy import *
x = Symbol('x')
y = Symbol('y')
d = solve([2 * x - y - 3, 3 * x + y - 7],[x, y])
print(d)
# 结果:{x: 2, y: 1}

求极限 limit()

dir=’+’表示求解右极限,dir=’-‘表示求解左极限:

from sympy import *
x = Symbol('x')
d = limit(1/x,x,oo,dir='+')
print(d)
# 结果:0
d = limit(1/x,x,oo,dir='-')
print(d)
# 结果:0

求积分 integrate( )

先试试求解不定积分:

from sympy import *
x = Symbol('x')
d = integrate(sin(x),x)
print(d)
# 结果:-cos(x)

再试试定积分:

from sympy import *
x = Symbol('x')
d = integrate(sin(x),(x,0,pi/2))
print(d)
# 结果:1

求导 diff()

使用 diff 函数可以对方程进行求导:

from sympy import *
x = Symbol('x')
d = diff(x**3,x)
print(d)
# 结果:3*x**2

d = diff(x**3,x,2)
print(d)
# 结果:6*x

解微分方程 dsolve( )

以 y′=2xy 为例:

from sympy import *
x = Symbol('x')
f = Function('f')
d = dsolve(diff(f(x),x) - 2*f(x)*x,f(x))
print(d)
# 结果:Eq(f(x), C1*exp(x**2))

3.实战一下

今天群里有同学问了这个问题,“大佬们,我想问问,如果这个积分用Python应该怎么写呢,谢谢大家”:

from sympy import *
x = Symbol('x')
y = Symbol('y')
d = integrate(x-y, (y, 0, 1))
print(d)
# 结果:x - 1/2

为了计算这个结果,integrate的第一个参数是公式,第二个参数是积分变量及积分范围下标和上标。

运行后得到的结果便是 x – 1/2 与预期一致。

如果大家也有求解微积分、复杂方程的需要,可以试试sympy,它几乎是完美的存在。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Python 全自动解密解码神器 — Ciphey

Python 全自动解密解码神器 — Ciphey

Ciphey 是一个使用自然语言处理和人工智能的全自动解密/解码/破解工具。

简单地来讲,你只需要输入加密文本,它就能给你返回解密文本。就是这么牛逼。

有了Ciphey,你根本不需要知道你的密文是哪种类型的加密,你只知道它是加密的,那么Ciphey就能在3秒甚至更短的时间内给你解密,返回你想要的大部分密文的答案。

下面就给大家介绍 Ciphey 的实战使用教程

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install -U ciphey

2.Ciphey 基本使用

有3种方法可以运行 Ciphey:

1. 文件输入:

ciphey -f encrypted.txt
# 或
python -m ciphey -f encrypted.txt

2.不规范的方法:

ciphey -- "Encrypted input"
# 或
python -m ciphey -- "Encrypted input"

3.正常方式

ciphey -t "Encrypted input"
# 或
python -m ciphey -t "Encrypted input"

如下图所示:

要去除进度条、概率表和所有噪音,请使用安静模式:

ciphey -t "encrypted text here" -q

3.在Python中调用Ciphey

如果你需要在Python中使用Ciphey,请这样使用:

# Python实用宝典
# 2021/07/19
from ciphey.__main__ import main, main_decrypt, make_default_config
main_decrypt(make_default_config("SGVsbG8gbXkgbmFtZSBpcyBiZWUgYW5kIEkgbGlrZSBkb2cgYW5kIGFwcGxlIGFuZCB0cmVl"))
# >> Hello my name is bee and I like dog and apple and tree

运行后会输出如下的结果:

效果还是相当不错的,如果你不想输出概率表,只想要解密内容,代码需要这么写:

# Python实用宝典
# 2021/07/19
from ciphey.__main__ import main, main_decrypt, make_default_config
config = make_default_config("SGVsbG8gbXkgbmFtZSBpcyBiZWUgYW5kIEkgbGlrZSBkb2cgYW5kIGFwcGxlIGFuZCB0cmVl")
config["grep"] = True
main_decrypt(config)
# >> Hello my name is bee and I like dog and apple and tree

非常Nice,你根本无需知道这是什么编码。

Ciphey 支持解密的密文和编码多达51种,下面列出一些基本的选项

基本密码:

  • Caesar Cipher
  • ROT47 (up to ROT94 with the ROT47 alphabet)
  • ASCII shift (up to ROT127 with the full ASCII alphabet)
  • Vigenère Cipher
  • Affine Cipher
  • Binary Substitution Cipher (XY-Cipher)
  • Baconian Cipher (both variants)
  • Soundex
  • Transposition Cipher
  • Pig Latin

现代密码学:

  • Repeating-key XOR
  • Single XOR

编码:

  • Base32
  • Base64
  • Z85 (release candidate stage)
  • Base65536 (release candidate stage)
  • ASCII
  • Reversed text
  • Morse Code
  • DNA codons (release candidate stage)
  • Atbash
  • Standard Galactic Alphabet (aka Minecraft Enchanting Language)
  • Leetspeak
  • Baudot ITA2
  • URL encoding
  • SMS Multi-tap
  • DMTF (release candidate stage)
  • UUencode
  • Braille (Grade 1)
  • ……

Ciphey 的功能不仅于本文介绍的这些,本文所介绍的只是冰山一角,它还可以添加属于你自己的解码器:

https://github.com/Ciphey/Ciphey/wiki/Adding-your-own-ciphers

如果要进一步的学习,你可以在上述 Ciphey 的 Github Wiki 介绍中,查阅到更多的资料,进行更深层次的学习。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

太酷炫了,我用python画出了北上广深的地铁路线动态图

今天教大家用python制作北上广深——地铁线路动态图,这可能是全网最全最详细的教程了。

坐标点的采集

小五之前做过类似的地理可视化,不过都是使用网络上收集到的json数据。但很多数据其实是过时的,甚至是错误/不全的。所以我们最好还是要自己动手,丰衣足食(爬虫大法好)。

打开高德地图的地铁网页,http://map.amap.com/subway/index.html?&1100

可以轻松得到北京地铁数据的接口,同理也把其他三个城市的url复制出来。

有了api,解析json即可获得数据👇

url = 'http://map.amap.com/service/subway?_1615466846985&srhdata=1100_drw_beijing.json'
response = requests.get(url)
result = json.loads(response.text)
stations = []
for i in result['l']:
    station = []
    for a in i['st']:
        station.append([float(b) for b in a['sl'].split(',')])
    stations.append(station)
pprint.pprint(stations)

pprint格式化打印结果,方便预览

坐标系的转换

其实我之前有看到类似地理可视化文章,结果自己一试发现缩小看还行,一放大就会发现坐标点飘出二里地了😂

正好拿上文获取的坐标点给大家演示一下,看看同样的经纬度在不同地图里的地理位置👇

👆可以看到该经纬度在高德地图里指的是金安桥地铁站,然而在百度地图里,地理位置则指向了几公里外的某大厦。

为什么会出现这个问题呢?

其实是不同地图产品的地理坐标系导致的。

下面说一下常见的地理坐标系:地球坐标系是国际通用坐标系,比较适合国际地图可视化。不过在我国范围内,一般不会直接使用它,而是使用由国家测绘局在其基础上加密的火星坐标系。另外还有公司会在火星坐标系上进行二次加密,比如百度坐标系、搜狗坐标系等。

我网上找到了一张图,来自知乎@师大Giser[1]👇

上图可以作为参考,具体原因我们就不细究了。重点是什么,如何利用python转换坐标系?

例如在本文中,我们是在高德地图中获得的坐标点集合,那么也就是使用的是GCJ-02坐标系。而下文可视化中会调用百度地图的接口,也就是需要在BD-09坐标系中进行可视化。

幸好我在网上搜到了GCJ-02BD-09的公式,并用python实现此公式:

#需要的两个常量先设置好
pi = 3.1415926535897932384 
r_pi = pi * 3000.0/180.0

def gcj02_bd09(lon_gcj02,lat_gcj02):
    b = math.sqrt(lon_gcj02 * lon_gcj02 + lat_gcj02 * lat_gcj02) + 0.00002 * math.sin(lat_gcj02 * r_pi)
    o = math.atan2(lat_gcj02 , lon_gcj02) + 0.000003 * math.cos(lon_gcj02 * r_pi)
    lon_bd09 = b * math.cos(o) + 0.0065
    lat_bd09 = b * math.sin(o) + 0.006
    return [lon_bd09,lat_bd09]

这样我们就写好了一个python将GCJ-02坐标系转成BD-09的函数,调用这个函数,就可以将高德地图获取的坐标点集合统统转换成百度坐标系。

result = []
for station in stations:
    result.append([gcj02_bd09(*point) for point in station])

以其中一个坐标点为例:

到此,我们的前期数据工作终于准备齐了。

当然,如果我们一开始获取的数据就是BD_09(百度地图)坐标系的,转换这步就可以直接省略喽~

地理可视化

接下来就要利用pyecharts中的BMap来可视化了,不过需要先获取百度开放平台的密钥。

百度地图开放平台👉http://lbsyun.baidu.com/apiconsole/key#/home

登录百度账户,查看应用管理-我的应用。点击创建应用,全部默认随便创建。

复制👆上图中的访问应用(AK),保存好,这在后续的可视化中将要用到。

我们使用pyecharts中的BMap,先导入模块

from pyecharts.charts import BMap 
from pyecharts import options as opts 
from pyecharts.globals import BMapType, ChartType 

在导入数据(也就是上文转换后的经纬度数据result)后,可以调整一下参数以及增添一些控件。

👇关键参数都做了注释,方便大家查看(其中百度appkey记得替换成自己的)

map_b = (
    BMap(init_opts = opts.InitOpts(width = "800px", height = "600px"))
    .add_schema(
        baidu_ak = '****************'#百度地图开发应用appkey
        center = [116.40396339.915119], #当前视角的中心点
        zoom = 10#当前视角的缩放比例
        is_roam = True#开启鼠标缩放和平移漫游
    )
    .add(
        series_name = "",
        type_ = ChartType.LINES, #设置Geo图类型
        data_pair = result, #数据项
        is_polyline = True#是否是多段线,在画lines图情况下#
        linestyle_opts = opts.LineStyleOpts(color = "blue", opacity = 0.5, width = 1), # 线样式配置项
    )
    .add_control_panel(
        maptype_control_opts = opts.BMapTypeControlOpts(type_ = BMapType.MAPTYPE_CONTROL_DROPDOWN), #切换地图类型的控件
        scale_control_opts = opts.BMapScaleControlOpts(), #比例尺控件
        overview_map_opts = opts.BMapOverviewMapControlOpts(is_open = True), #添加缩略地图
        navigation_control_opts = opts.BMapNavigationControlOpts() #地图的平移缩放控件
    )
)

map_b.render(path = 'subway_beijing.html')

注:因为是北京地图,所以设置天安门的经纬度[116.403963, 39.915119]为视角中心。

让我们看一下可视化的结果吧:

👆上图中的四个角都有控件,这是我们在代码中添加了控件参数,它们分别为:地图的平移缩放控件、切换地图类型的控件、缩略地图、以及比例尺控件。

是不是还阔以

其他效果展示

上文已经基本实现了用python制作地铁线路动态图。不过大家都用同一种颜色背景制作动态图的话,就显得就太单调了。

正好我们还要绘制其他三个城市的地铁图,那就调整一些参数,看看能获得什么效果吧?

上海-变色

上海的数据接口是:

http://map.amap.com/service/subway?_1615467204533&srhdata=3100_drw_shanghai.json

上海市的地铁图我们改一下line的颜色,可在参数linestyle_opts中修改color。

👇下图中的线条颜色是lilac——浅紫色

广州-卫星图

广州的数据接口是:

http://map.amap.com/service/subway?_1615494419554&srhdata=4401_drw_guangzhou.json

其实我们还可以调整可视化背景为卫星图。不过这一操作并不需要额外写代码,因为刚刚上文提到我在调整参数时添加了4个控件,其中右上角的就可以直接切换地图类型,具体操作见下图。

深圳-个性化配色

深圳的数据接口是:

http://map.amap.com/service/subway?_1615494473615&srhdata=4403_drw_shenzhen.json

如果不满意百度地图设置好的地图背景,我们还可以个性化设置mapStyle,调整自己的配色styleJson

下图就是小五参考网上公开的配色方案制作的,大家也可以用来参考https://blog.csdn.net/weixin_41290949/article/details/106379134[2]

小结

今天带大家学习了如何利用python绘制一线城市的地铁线路动图。

主要分为四个部分:坐标点的采集、坐标系的转换、利用pyecharts地理可视化、其他效果展示。

如果你读完本文觉得有收获,希望可以给文章右下角点个赞👍

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红字验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

点击下方阅读原文可获得更好的阅读体验

Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 电信用户流失预测模型实战教程

1.流失预测 研究背景

1、做好用户流失预测可以降低营销成本。老生常谈,新客户开发成本老客户维护成本的5倍。

2、获得更好的用户体验。并不是所有的增值服务都可以有效留住客户。

3、获得更高的销售回报。价格敏感型客户和非价格敏感性客户。

2.提出问题

1、流失客户有哪些显著性特征?

2、当客户在哪些特征下什么条件下比较容易发生流失

3.数据集描述

该数据是datafountain上的《电信客户流失数据》,这里提供一个下载地址。
https://www.datafountain.cn/datasets/35guide

该数据集有21个变量,7043个数据点。变量可分为以下三个部分:用户属性、用户行为、研究对象。

用户属性
customerID :用户ID
gender:性别(Female & Male)
SeniorCitizen :老年人(1表示是,0表示不是)
Partner :是否有配偶(Yes or No)
Dependents :是否经济独立(Yes or No)
tenure :客户的职位(0-72,共73个职位)

用户行为
PhoneService :是否开通电话服务业务(Yes or No)
MultipleLines :是否开通了多线业务(Yes 、No or No phoneservice 三种)
InternetService :是否开通互联网服务(No, DSL数字网络,fiber optic光纤网络 三种)
OnlineSecurity :是否开通网络安全服务(Yes,No,No internetserive 三种)
OnlineBackup :是否开通在线备份业务(Yes,No,No internetserive 三种)
DeviceProtection :是否开通了设备保护业务(Yes,No,No internetserive 三种)
TechSupport :是否开通了技术支持服务(Yes,No,No internetserive 三种)
StreamingTV :是否开通网络电视(Yes,No,No internetserive 三种)
StreamingMovies :是否开通网络电影(Yes,No,No internetserive 三种)
Contract :签订合同方式 (按月,一年,两年)
PaperlessBilling :是否开通电子账单(Yes or No)
PaymentMethod :付款方式(bank transfer,credit card,electronic check,mailed check)
MonthlyCharges :月费用
TotalCharges :总费用

研究对象Churn:该用户是否流失(Yes or No)

4.分析思路

分析视角分析方法的灵魂。

分析方法有上百种,但分析视角只有四种:

  • 对比视角
  • 分类视角
  • 相关视角
  • 描述视角

一旦将业务需求拆解成指标,接下来只需要针对每个指标进行分析视角四选一即可。

数据集描述,已经将变量分为三个维度了:用户属性、用户行为、研究对象(是否流失客户),三个维度组合一下就得出了以下解题思路了:

  • 哪些属性的用户比较容易流失?
  • 哪些行为的用户比较容易流失?

以上两个分析思路运用的是【对比视角】,该视角下具体的分析方法有:

  • 数值型数据:均值比较
  • 分类型数据:频数分布比较(交叉分析)

以上的分析方法是统计分析,只能一个维度一个维度地去比较。但实际情况中,并不是每个维度的权重都一样的,那如何去研究各个维度的权重?

权重问题属于分类视角,故我们可以采用分类模型,要用哪个分类模型呢?不知道。可以全部采用,看模型精度得分,然后选得分最高的模型进行进一步预测。

  • Random Forest 随机森林
  • SVC 支持向量机
  • LogisticRegression 逻辑回归
  • KNN 近邻算法
  • Naive Bayes  朴素贝叶斯
  • Decision Tree 决策树
  • AdaBoost
  • GradientBoosting
  • XGB
  • CatBoost

5.分析结论及运营建议

5.1 分析结论

Python 电信用户流失预测模型实战教程

综合统计分析XGB算法输出特征重要性得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure :1-5号职位的用户比较容易流失
  2. PaymentMethod :使用电子支票支付的人
  3. MonthlyCharges 、TotalCharges : 总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling : 开通电子账单
  5. Partner : 单身
  6. OnlineBackup : 没开通在线备份业务
  7. InternetService :开通了Fiber optic 光纤网络
  8. TechSupport :没开通“技术支持服务”
  9. DeviceProtection :没开通设备保护业务
  10. OnlineSecurity :没开通网络安全服务
  11. Contract :按月签订合同方式
  12. Dependents :无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges :总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

当条件覆盖得越多,人群越精确,但与此同时,覆盖的人群也会越少。业务方可直接在数据库中,通过SQL检索符合要求的客户,然后做针对性的运营工作。

5.2 运营建议

如何留住客户,可以从两方面去思考:

  • 增加用户的沉没成本(损失厌恶)
    • 会员等级
    • 积分制
    • 充值赠送
    • 满减券
    • 其他增值服务
  • 培养用户的条件反射(习惯)
    • 会员日
    • 定期用户召回
    • 签到
    • 每日定时抽奖
    • 小游戏

电子账单解锁新权益

  • 现象:“开通电子账单”的人反而容易流失。
  • 基本假设:价格敏感型客户。电子账单,让客户理性消费。
  • 建议:让“电子账单”变成一项“福利。跟连锁便利店,联名发”商品满减券”,每月的账单时间,就将”商品满减券“和账单一起推送过去。文案:您上月消费了XX元,解锁了xx会员权益。
  • 底层规律:增加沉没成本。

“单身用户”尊享亲情网

  • 现象:“单身用户”容易流失。
  • 基本假设:社交欲望低。
  • 建议:一个单身用户拥有建立3个人以内的“亲情网”的权益。
  • 底层规律:增加沉没成本。

推广“在线备份、设备保护、技术支持、网络保护”等增值服务。

6.实战教程-数据清洗

6.1 导入模块

6.1.1 数据处理

import pandas as pd
import numpy as np

6.1.2 可视化

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='darkgrid',font_scale=1.3)
plt.rcParams['font.family']='SimHei'
plt.rcParams['axes.unicode_minus']=False

6.1.3 特征工程

import sklearn
from sklearn import preprocessing                            #数据预处理模块
from sklearn.preprocessing import LabelEncoder               #编码转换
from sklearn.preprocessing import StandardScaler             #归一化
from sklearn.model_selection import StratifiedShuffleSplit   #分层抽样
from sklearn.model_selection import train_test_split         #数据分区
from sklearn.decomposition import PCA                        #主成分分析 (降维)

6.1.4 分类算法

from sklearn.ensemble import RandomForestClassifier     #随机森林
from sklearn.svm import SVC,LinearSVC                   #支持向量机
from sklearn.linear_model import LogisticRegression     #逻辑回归
from sklearn.neighbors import KNeighborsClassifier      #KNN算法
from sklearn.cluster import KMeans                     #K-Means 聚类算法
from sklearn.naive_bayes import GaussianNB              #朴素贝叶斯
from sklearn.tree import DecisionTreeClassifier         #决策树

6.1.5 分类算法–集成学习

import xgboost as xgb
from xgboost import XGBClassifier                      
from catboost import CatBoostClassifier                
from sklearn.ensemble import AdaBoostClassifier        
from sklearn.ensemble import GradientBoostingClassifier 

6.1.6 模型评估

from sklearn.metrics import classification_report,precision_score,recall_score,f1_score  #分类报告
from sklearn.metrics import confusion_matrix           #混淆矩阵
from sklearn.metrics import silhouette_score           #轮廓系数(评价k-mean聚类效果)
from sklearn.model_selection import GridSearchCV       #交叉验证
from sklearn.metrics import make_scorer
from sklearn.ensemble import VotingClassifier          #投票

6.1.7 忽略警告

import warnings
warnings.filterwarnings('ignore')

6.2 读取数据

df=pd.read_csv(r'C:\Users\Think\Desktop\刻意练习数据\电信数据集\Customer-Churn.csv',header=0)
#预览数据
df.head()
#查看数据大小
df.shape
#查看数据数据及分布
df.describe()

这里安利一下spyder编辑器,下图是这个编辑器的界面。编程过程中,有赋值变量的操作,该编辑器都会在右上角呈现,双击一下,就可以像在Execel上查看数据,非常方便。图片查看该数据集的详情。图片

6.3 数据清洗

6.3.1 缺失值处理

#查看缺失值
df.isnull().sum()

注:缺失值的数据类型是 float 类型。一旦有变量的数据类型转换成float 类型,需再次查看缺失值。图片

6.3.2 重复值处理

#查看重复值
df.duplicated().sum()

【输出】图片

6.3.3 数值类型转换

#查看数据类型
df.info()

【输出】图片TotalCharages总费用应该跟MonthlvCharges是同一个数据类型(float64)。故需将TotalCharages由object转换成float64,且需要再次查看缺失值。

#总费用 TotalCharges  该列的数据类型应是float64,不是object
# df['TotalCharges'].astype('float64')
# 此处用“astype”转化数据类型报错 “could not convert string to float”
#改用强制转化 convert_numeric=True   
df['TotalCharges']=df['TotalCharges'].convert_objects(convert_numeric=True)
df['TotalCharges'].dtype

输出如下:图片再次查看缺失值:图片TotalCharges列有11个缺失值,处理缺失值的原则是尽量填充,最后才是删除。
缺失值填充的原则:

  • 分类型数据:众数填充
  • 数值型数据:正态分布,均值/中位数填充;偏态分布,中位数填充。

TotalCharges列是数值型数据,先画直方图查看数据分布形态。

#分别作直方图:全部客户类型、流失客户类型、留存客户类型
plt.figure(figsize=(14,5))
plt.subplot(1,3,1)
plt.title('全部客户的总付费直方图')
sns.distplot(df['TotalCharges'].dropna())

plt.subplot(1,3,2)
plt.title('流失客户的总付费直方图')
sns.distplot(df[df['Churn']=='Yes']['TotalCharges'].dropna())

plt.subplot(1,3,3)
plt.title('留存客户的总付费直方图')
sns.distplot(df[df['Churn']=='No']['TotalCharges'].dropna())

结果如下:图片从三个直方图看,该列数据是偏态分布,故选择中位数填充。

df.fillna({'TotalCharges':df['TotalCharges'].median()},inplace=True)
#再次确认是否还有空值
df.isnull().sum()

结果如下:图片

6.4 查看样本分布

研究对象’Churn’列重新编码“Yes”=1,“No”=0。重新编码有下面两种方法。

方法一:replace

df['Churn'].replace(to_replace = 'Yes', value = 1,inplace = True)
df['Churn'].replace(to_replace = 'No', value = 0,inplace = True)

方法二:map函数

df['Churn']=df['Churn'].map({'Yes':1,'No':0})

预览数据:

df['Churn'].head()

结果如下:图片绘制饼图,查看流失客户占比。

churn_value=df["Churn"].value_counts()
labels=df["Churn"].value_counts().index

plt.figure(figsize=(7,7))
plt.pie(churn_value,labels=labels,colors=["b","w"], explode=(0.1,0),autopct='%1.1f%%', shadow=True)
plt.title("流失客户占比高达26.5%")
plt.show()  

结果如下:图片【分析】:流失客户样本占比26.5%,留存客户样本占比73.5%,明显的“样本不均衡”。

解决样本不均衡有以下方法可以选择:

  • 分层抽样
  • 过抽样
  • 欠抽样

7.实战教程-特征选择

提取特征

feature=df.iloc[:,1:20]

7.1 整数编码

查看变量间的两两相关性

#重新编码
corr_df = feature.apply(lambda x: pd.factorize(x)[0])
corr_df.head()
#相关性矩阵
corr=corr_df.corr()
corr

结果如下:图片相关性矩阵可视化

#绘制热力图观察变量之间的相关性强弱
plt.figure(figsize=(15,12))
ax = sns.heatmap(corr, xticklabels=corr.columns, yticklabels=corr.columns, 
                 linewidths=0.2, cmap="RdYlGn",annot=True)
plt.title("Correlation between variables")

结果如下:图片【分析】:从热力图来看,互联网服务、网络安全、在线备份、设备维护服务、技术支持服务、开通网络电视服务、开通网络电影之间相关性很强,且是正相关。电话服务和多线业务之间也存在很强的正相关关系。

7.2 独热编码

查看研究对象”Churn”与其他变量下的标签相关性。独热编码,可以将分类变量下的标签转化成列

df_onehot = pd.get_dummies(df.iloc[:,1:21])
df_onehot.head()

结果如下:图片绘图查看用户流失(‘Churn’)与各个维度之间的关系

plt.figure(figsize=(15,6))
df_onehot.corr()['Churn'].sort_values(ascending=False).plot(kind='bar')
plt.title('Correlation between Churn  and variables ')

结果如下:图片【分析】:从图看gender(性别)、PhoneService(电话服务)相关性几乎为0,故两个维度可以忽略。[‘SeniorCitizen’,’Partner’,’Dependents’, ‘Contract’,MultipleLines,’InternetService’,  ‘OnlineSecurity’, ‘OnlineBackup’, ‘DeviceProtection’,’TechSupport’, ‘StreamingTV’, ‘StreamingMovies’,’PaperlessBilling’,’PaymentMethod’]等都有较高的相关性,将以上维度合并成一个列表kf_var,然后进行频数比较。

kf_var=list(df.columns[2:5])
for var in list(df.columns[7:18]):
    kf_var.append(var)
print('kf_var=',kf_var)

结果如下:图片

8.实战教程-统计分析

8.1 频数分布比较

8.1.1 卡方检验

组间有显著性差异,频数分布比较才有意义,否则可能会做无用功。
“卡方检验”,就是提高频数比较结论可信度的统计方法。

#分组间确实是有显著性差异,频数比较的结论才有可信度,故需进行”卡方检验“
from scipy.stats import chi2_contingency   #统计分析 卡方检验
#自定义卡方检验函数
def KF(x):
    df1=pd.crosstab(df['Churn'],df[x])
    li1=list(df1.iloc[0,:])
    li2=list(df1.iloc[1,:])
    kf_data=np.array([li1,li2])
    kf=chi2_contingency(kf_data)
    if kf[1]<0.05:
        print('Churn by {} 的卡方临界值是{:.2f},小于0.05,表明{}组间有显著性差异,可进行【交叉分析】'.format(x,kf[1],x),'\n')
    else:
        print('Churn by {} 的卡方临界值是{:.2f},大于0.05,表明{}组间无显著性差异,不可进行交叉分析'.format(x,kf[1],x),'\n')
#对 kf_var进行卡方检验
print('kf_var的卡方检验结果如下:','\n')
print(list(map(KF, kf_var)))

kf_var的卡方检验结果如下:

Churn by SeniorCitizen 的卡方临界值是0.00,小于0.05,表明SeniorCitizen组间有显著性差异,可进行【交叉分析】

Churn by Partner 的卡方临界值是0.00,小于0.05,表明Partner组间有显著性差异,可进行【交叉分析】

Churn by Dependents 的卡方临界值是0.00,小于0.05,表明Dependents组间有显著性差异,可进行【交叉分析】

Churn by MultipleLines 的卡方临界值是0.99,大于0.05,表明MultipleLines组间无显著性差异,不可进行交叉分析

Churn by InternetService 的卡方临界值是0.00,小于0.05,表明InternetService组间有显著性差异,可进行【交叉分析】

Churn by OnlineSecurity 的卡方临界值是0.00,小于0.05,表明OnlineSecurity组间有显著性差异,可进行【交叉分析】

Churn by OnlineBackup 的卡方临界值是0.00,小于0.05,表明OnlineBackup组间有显著性差异,可进行【交叉分析】

Churn by DeviceProtection 的卡方临界值是0.00,小于0.05,表明DeviceProtection组间有显著性差异,可进行【交叉分析】

Churn by TechSupport 的卡方临界值是0.00,小于0.05,表明TechSupport组间有显著性差异,可进行【交叉分析】

Churn by StreamingTV 的卡方临界值是0.00,小于0.05,表明StreamingTV组间有显著性差异,可进行【交叉分析】

Churn by StreamingMovies 的卡方临界值是0.00,小于0.05,表明StreamingMovies组间有显著性差异,可进行【交叉分析】

Churn by Contract 的卡方临界值是0.00,小于0.05,表明Contract组间有显著性差异,可进行【交叉分析】

Churn by PaperlessBilling 的卡方临界值是0.00,小于0.05,表明PaperlessBilling组间有显著性差异,可进行【交叉分析】

Churn by PaymentMethod 的卡方临界值是0.00,小于0.05,表明PaymentMethod组间有显著性差异,可进行【交叉分析】

从卡方检验的结果,kf_var包含的特征,组间都有显著性差异,可进行频数比较。

8.1.2 柱形图

频数比较–柱形图

plt.figure(figsize=(20,25))
a=0
for k in kf_var:
    a=a+1
    plt.subplot(4,4,a)
    plt.title('Churn BY '+ k)
    sns.countplot(x=k,hue='Churn',data=df)

结果如下:图片图片因为PaymentMethod的标签比较长,影响看图,所以单独画。

plt.xticks(rotation=45)
sns.countplot(x='PaymentMethod',hue='Churn',data=df)

图片可以直接从柱形图去判断对哪个维度对流失客户的影响大吗?不能,因为“样本不均衡”(流失客户样本占比26.5%,留存客户样本占比73.5%),基数不一样,故不能直接通过“频数”的柱形图去分析。
解决办法:交叉分析,且作同行百分比(’Churn’作为“行”)

8.1.3 交叉分析

print('ka_var列表中的维度与Churn交叉分析结果如下:','\n')
for i in kf_var:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n'#交叉分析,同行百分比

ka_var列表中的维度与Churn交叉分析结果如下:图片【SeniorCitizen 分析】:年轻用户 在流失、留存,两个标签的人数占比都高。图片【Parter 分析】:单身用户更容易流失。图片【Denpendents 分析】:经济不独立的用户更容易流失。图片【MultipleLines 分析】:是否开通MultipleLines,对留存和流失都没有明显的促进作用。图片【InternetService 分析】:办理了 “Fiber optic 光纤网络”的客户容易流失。图片【OnlineSecurity 分析】:没开通“网络安全服务”的客户容易流失。图片【OnlineBackup 分析】:没开通“在线备份服务”的客户容易流失。图片【DeviceProtection 分析】:没开通“设备保护业务”的用户比较容易流失图片【TechSupport 分析】:没开通“技术支持服务”的用户容易流失。图片【StreamingTV 分析】:是否开通“网络电视”服务,对用户留存、流失,没有明显的促进作用。图片【StreamingMovies 分析】:是否开通“网络电影”服务,对用户留存、流失,没有明显的促进作用。图片【Contract 分析】逐月签订合同的用户最容易流失。图片

因为”Churn BY PaymentMethod”打印出来显示不全,故我就从临时表将“交叉表”给截图出来了: 图片【分析】使用“电子支票”支付的人更容易流失。

8.2 均值比较

组间有显著性差异,均值比较才有意义。
显著性检验,先通过了齐性检验,再通过方差分析,最后才能做均值比较。

8.2.0 齐性检验,方差分析

#自定义齐性检验 & 方差分析 函数
def ANOVA(x):
    li_index=list(df['Churn'].value_counts().keys())
    args=[]
    for i in li_index:
        args.append(df[df['Churn']==i][x])
    w,p=stats.levene(*args)             #齐性检验
    if p<0.05:
        print('警告:Churn BY {}的P值为{:.2f},小于0.05,表明齐性检验不通过,不可作方差分析'.format(x,p),'\n')
    else:
        f,p_value=stats.f_oneway(*args) #方差分析
        print('Churn BY {} 的f值是{},p_value值是{}'.format(x,f,p_value),'\n')
        if p_value<0.05:
            print('Churn BY {}的均值有显著性差异,可进行均值比较'.format(x),'\n')
        else:
            print('Churn BY {}的均值无显著性差异,不可进行均值比较'.format(x),'\n')

对MonthlyCharges、TotalCharges维度分别进行齐性检验和方差分析

print('MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:','\n')
ANOVA('MonthlyCharges')
ANOVA('TotalCharges')

【输出】:
MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:

警告:Churn BY MonthlyCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

警告:Churn BY TotalCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

8.3 总结

用户出现以下特征比较容易流失:

  • SeniorCitizen:青年人
  • Partner :单身
  • Dependents :无经济独立
  • InternetService:开通了 “Fiber optic 光纤网络”
  • OnlineSecurity:没开通“网络安全服务”
  • OnlineBackup:没开通“在线备份业务”
  • DeviceProtection:没开通通了“设备保护业务
  • TechSupport:没开通“技术支持服务”
  • Contract:“按月”签订合同方式
  • PaperlessBilling:开通电子账单
  • PaymentMethod:使用“电子支票”支付的人

我们可以在SQL(数据库)上找有以上特征的客户,进行精准营销,即可以降低用户流失。虽然特征选得越多,越精确,但覆盖的人群也会越少。故,我们还需要计算“特征”的【重要性】,将最为重要的几个特征作为筛选条件。

计算特征的【重要性】,是“分类视角”,接下来我们会挑选常见的分类模型,进行批量训练,然后挑出得分最高的模型,进一步计算“特征重要性”。

9.实战教程-特征工程

9.1 提取特征

有前面的流失率与各个维度的相关系数柱状图可知:
流失率与gender(性别)、PhoneService(电话服务)相关性几乎为0,可以筛选掉,而customerID是随机数,不影响建模,故可以筛选掉。最终得到特征 churn_var

churn_var=df.iloc[:,2:20]
churn_var.drop("PhoneService",axis=1, inplace=True)
churn_var.head()

结果如下:图片

9.2 处理“量纲差异大”

“MonthlyCharges”、”TotalCharges”两个特征跟其他特征相比,量纲差异大。图片处理量纲差异大,有两种方法:

  1. 标准化
  2. 离散化

以上两种方法,哪个能让模型精度提高,就选哪个。根据模型的最后得分,我选了“离散化”来处理量纲差异大。

9.2.1 标准化

scaler = StandardScaler(copy=False)
scaler.fit_transform(churn_var[['MonthlyCharges','TotalCharges']])  #fit_transform拟合数据
churn_var[['MonthlyCharges','TotalCharges']]=scaler.transform(churn_var[['MonthlyCharges','TotalCharges']])  #transform标准化

print(churn_var[['MonthlyCharges','TotalCharges']].head() )#查看拟合结果

【输出】图片

9.2.2 特征离散化

特征离散化后,模型易于快速迭代,且模型更稳定。

1、处理’MonthlyCharges’:

#查看'MonthlyCharges'列的4分位
churn_var['MonthlyCharges'].describe() 

图片离散操作
18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”

#用四分位数进行离散
churn_var['MonthlyCharges']=pd.qcut(churn_var['MonthlyCharges'],4,labels=['1','2','3','4'])
churn_var['MonthlyCharges'].head()

结果如下:图片2、处理’TotalCharges’:

#查看'TotalCharges'列的4分位
churn_var['TotalCharges'].describe()

结果如下:图片

离散操作:
18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”

#用四分位数进行离散 
churn_var['TotalCharges']=pd.qcut(churn_var['TotalCharges'],4,labels=['1','2','3','4'])
churn_var['TotalCharges'].head()

【输出】图片

9.3 分类数据转换成“整数编码”

9.3.1 查看churn_var中分类变量的label(标签)

#自定义函数获取分类变量中的label
def Label(x):
    print(x,"--" ,churn_var[x].unique()) 
#筛选出数据类型为“object”的数据点
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object)))

结果如下:图片图片

通过同行百分比的“交叉分析”发现,label “No internetserive”的人数占比在以下特征[OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingTV]都是惊人的一致,故我们可以判断label “No internetserive”不影响流失率。
因为这6项增值服务,都是需要开通“互联网服务”的基础上才享受得到的。不开通“互联网服务”视为没开通这6项增值服务,故可以将 6个特正中的“No internetserive” 并到 “No”里面。

churn_var.replace(to_replace='No internet service',value='No',inplace=True)

而特征MultipleLines的“ No phoneservice”在流失客户、留存客户样本中的人数占比几乎接近,且比较少,故可以将“ No phoneservice”并到“No”。

churn_var.replace(to_replace='No phone service',value='No',inplace=True)
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object.columns)))

结果如下:图片

9.3.2 整数编码

整数编码的方法有两种:

  1. sklearn中的LabelEncoder()
  2. pandas中的factorize
    此处选用 LabelEncoder()
def labelencode(x):
    churn_var[x] = LabelEncoder().fit_transform(churn_var[x])
for i in range(0,len(df_object.columns)):
    labelencode(df_object.columns[i])
print(list(map(Label,df_object.columns)))

结果如下:图片

9.4 处理“样本不均衡”

分拆变量

x=churn_var
y=df['Churn'].values
print('抽样前的数据特征',x.shape)
print('抽样前的数据标签',y.shape)

【输出】
抽样前的数据特征 (7043, 17)
抽样前的数据标签 (7043,)

处理样本不均衡常用的方式有三种:

  1. 分层抽样
  2. 过抽样
  3. 欠抽样

笔者先后尝试了“分层抽样”和“欠抽样”,前者最终得到的模型中精度最高的是0.63,而后者最终得到的模型中精度最低是0.78,最高是0.84。所以说“抽样方式”的选择极为重要,大家要在这里多试错。

分层抽样

sss=StratifiedShuffleSplit(n_splits=5, test_size=0.2, random_state=0)
print(sss)
print("训练数据和测试数据被分成的组数:",sss.get_n_splits(x,y))
# 分拆训练集和测试集
for train_index, test_index in sss.split(x, y):
    print("train:", train_index, "test:", test_index)
    x_train,x_test=x.iloc[train_index], x.iloc[test_index]
    y_train,y_test=y[train_index], y[test_index]

“过抽样”让模型精度更高,故我选“过抽样”。

from imblearn.over_sampling import SMOTE
model_smote=SMOTE()
x,y=model_smote.fit_sample(x,y)
x=pd.DataFrame(x,columns=churn_var.columns)
#分拆数据集:训练集 和 测试集
x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.3,random_state=0)

输出数据集大小

print('过抽样数据特征:', x.shape,
      '训练数据特征:',x_train.shape,
      '测试数据特征:',x_test.shape)

print('过抽样后数据标签:', y.shape,
      '   训练数据标签:',y_train.shape,
      '   测试数据标签:',y_test.shape)

【输出】

过抽样后数据特征: (10348, 17) 训练数据特征: (7243, 17) 测试数据特征: (3105, 17)
过抽样后数据标签: (10348,)    训练数据标签: (7243,)    测试数据标签: (3105,)

10.实战教程-数据建模

使用分类算法

Classifiers=[["Random Forest",RandomForestClassifier()],
             ["Support Vector Machine",SVC()],
             ["LogisticRegression",LogisticRegression()],
             ["KNN",KNeighborsClassifier(n_neighbors=5)],
             ["Naive Bayes",GaussianNB()],
             ["Decision Tree",DecisionTreeClassifier()],
             ["AdaBoostClassifier", AdaBoostClassifier()],
             ["GradientBoostingClassifier", GradientBoostingClassifier()],
             ["XGB", XGBClassifier()],
             ["CatBoost", CatBoostClassifier(logging_level='Silent')]  
]

训练模型

Classify_result=[]
names=[]
prediction=[]
for name,classifier in Classifiers:
    classifier=classifier
    classifier.fit(x_train,y_train)
    y_pred=classifier.predict(x_test)
    recall=recall_score(y_test,y_pred)
    precision=precision_score(y_test,y_pred)
    f1score = f1_score(y_test, y_pred)
    class_eva=pd.DataFrame([recall,precision,f1score])
    Classify_result.append(class_eva)
    name=pd.Series(name)
    names.append(name)
    y_pred=pd.Series(y_pred)
    prediction.append(y_pred)

11.模型评估

names=pd.DataFrame(names)
names=names[0].tolist()
result=pd.concat(Classify_result,axis=1)
result.columns=names
result.index=["recall","precision","f1score"]
result

【输出】
特征工程,采用“标准化”处理量纲差异,采用“分层抽样”处理样本不均衡。
最终模型精度得分,最高分是0.63,是“朴素贝叶斯”模型图片

特征工程,采用“离散化”处理量纲差异,采用“过抽样”处理样本不均衡。
最终模型精度得分,最高分是0.84,是“XGB”模型图片

12.基于“XGB”模型输出特征重要性

笔者尝试了两个算法分别输出“特征重要性”:CatBoost算法 和 XGB 算法

  • CatBoost算法
model = CatBoostClassifier()
model.fit(x_train,y_train,eval_set=(x_test, y_test),plot=True)
#特征重要性可视化
catboost=pd.DataFrame(columns=['feature','feature_importance'])
catboost['feature']=model.feature_names_
catboost['feature_importance']=model.feature_importances_
catboost=catboost.sort_values('feature_importance',ascending=False#降序排列
plt.figure(figsize=(10,10))
plt.title('特征重要性')
sns.barplot(x='feature_importance',y='feature',data=catboost)

【输出】图片-XGB 算法

model_xgb= XGBClassifier()
model_xgb.fit(x_train,y_train)
from xgboost import plot_importance
plot_importance(model_xgb,height=0.5)
plt.show()

【输出】图片由于 XGB算法精度得分最高,故我们以XGB得到的“特征重要性”进行分析。
【分析】

  1. 第一重要特征:tenure
plt.figure(figsize=(20,4))
sns.countplot(x='tenure',hue='Churn',data=df)

【输出】图片【分析】
由图可知,流失客户集中在1-5号职位,运营团队需要重点关注1-5号职位。

  1. 第二重要特征:PaymentMethod

【分析】
使用“电子支票”支付的人更容易流失。

  1. 第三重要特征:MonthlyCharges
    查看流失用户、留存用户在付费方面的偏好:
    ‘MonthlyCharges’、’TotalCharges’,离散化后,可进行卡方检验,然后交叉分析。
  • 卡方检验:’MonthlyCharges’、’TotalCharges’
df['MonthlyCharges-']=churn_var['MonthlyCharges']
df['TotalCharges-']=churn_var['TotalCharges']
print('kf_var的卡方检验结果如下:','\n')
KF('MonthlyCharges-')
KF('TotalCharges-')

【输出】
kf_var的卡方检验结果如下:

Churn by MonthlyCharges 的卡方临界值是0.00,小于0.05,表明MonthlyCharges组间有显著性差异,可进行【交叉分析】

Churn by TotalCharges 的卡方临界值是0.00,小于0.05,表明TotalCharges组间有显著性差异,可进行【交叉分析】

  • 交叉分析
for i in ['MonthlyCharges','TotalCharges']:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n')

【输出】图片18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”
【分析】
月付费70.35–118.75元的用户更容易流失图片18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”
【分析】
总付费18–1397元的用户更容易流失

基于”MonthlyCharges”和“TotalCharges”画四分图:
求两个维度的均值

 print('MonthlyCharges的均值是{:.2f},TotalCharges的均值是{:.2f}'.format(df['MonthlyCharges'].mean(),df['TotalCharges'].mean()))

流失客户四分图:

df_1=df[df['Churn']==1#流失客户
df_0=df[df['Churn']==0#留存客户
plt.figure(figsize=(10,10))   
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu,data=df_1)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】图片【分析】
四分图的右下区域,流失客户比较集中,即总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失。

留存客户四分图

plt.figure(figsize=(10,10)) 
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu_r,data=df_0)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】图片【结论】
综合“ 统计分析” 和 “XGB算法输出特征重要性” 得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure:1-5号职位的用户比较容易流失
  2. PaymentMethod:使用“电子支票”支付的人
  3. MonthlyCharges 、TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling:开通电子账单
  5. Partner:单身
  6. OnlineBackup:没开通“在线备份业务”
  7. InternetService:开通了 “Fiber optic 光纤网络”
  8. TechSupport:没开通“技术支持服务”
  9. DeviceProtection:没开通通了“设备保护业务
  10. OnlineSecurity:没开通“网络安全服务”
  11. Contract:“按月”签订合同方式
  12. Dependents:无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

转自猫有九条命。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Selenium 实战教程-爬取人民网留言板单进程、多线程、多进程版

本文讲解了如何通过selenium爬取人民网留言板留言,并从单进程、多线程、多进程版角度进行爬虫性能优化,是一篇优秀的 selenium 实战教程。

第一节:单进程版+selenium实战教程

一、项目概述

1.项目说明

本项目主要是对领导留言板内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。网站链接是http://liuyan.people.com.cn/home?p=0,任意选择一条留言点击进入详情页后,如下Selenium 实战教程-爬取人民网留言板单进程、多线程、多进程版对于图中标出的数据,均要进行爬取,以此构成一条留言的组成部分。

2.环境配置

(1)Python:3.x
(2)所需库:

  • dateutil
    • 安装方法:
    • pip install python-dateutil
  • selenium
    • 安装方法:
    • pip install selenium

(3)模拟驱动:chromedriver,可点击https://download.csdn.net/download/CUFEECR/12193208进行下载Google浏览器80.0.3987.16版对应版本,或点击http://chromedriver.storage.googleapis.com/index.html下载与Google对应版本,并放入Python对应安装路径下的Scripts目录下。

二、项目实施

1.导入所需要的库

import csv
import os
import random
import re
import time

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    while True:
        datestr = WebDriverWait(drivertemp, 10).until(
            lambda driver: driver.find_element_by_xpath(
                '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
        datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
        date = dparser.parse(datestr, fuzzy=True)
        print('正在爬取链接 --', position, '--', date)
        if date < start_date:
            break
        ## 模拟点击加载
        try:
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more")))
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight, document.body.scrollHeight - 600)')
            time.sleep(get_time())
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight - 600, document.body.scrollHeight)')
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.XPATH, '//*[@id="show_more"]')))
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
        except:
            break
        time.sleep(get_time() - 1)
    detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
    ## 获取所有链接
    for element in detail_elements:
        detail_url = element.get_attribute('href')
        yield detail_url
    drivertemp.quit()

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下图片每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个内容,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    position = WebDriverWait(driver, 10).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
    ## time.sleep(get_time())
    print(index, '-- 正在爬取 --', position)
    start_time = time.time()
    ## encoding='gb18030'
    csv_name = position + '.csv'
    ## 文件存在则删除重新创建
    if os.path.exists(csv_name):
        os.remove(csv_name)
    with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for detail_url in get_detail_urls(position, list_url):
            get_message_detail(driver, detail_url, writer, position)
            time.sleep(get_time())
    end_time = time.time()
    crawl_time = int(end_time - start_time)
    crawl_minute = crawl_time // 60
    crawl_second = crawl_time % 60
    print(position, '已爬取结束!!!')
    print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
    driver.quit()
    time.sleep(5)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    for index, fid in enumerate(fids):
        try:
            get_officer_messages(index + 1, fid)
        except:
            get_officer_messages(index + 1, fid)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12198734下载,仅供交流学习,请勿滥用。整个执行过程较长,因为是单线程的,必须要等一个领导数据爬取完毕之后才能爬取下一个,我选择了10个领导进行测试,在云服务器中的运行结果分别如下图片图片图片图片图片图片图片图片图片图片图片显然,整个运行时间将近5小时,效率相对较低,有很大的提升空间。最终得到了合并的DATA.csv图片

2.改进分析

(1)该版本的代码未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页也是采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本是单进程(线程)的,必须要一个领导爬取完之后才能进行下一个领导的爬取,效率较低,特别是留言较多的领导耗时很长,可以考虑使用多进程或多线程进行优化。

第二节:多线程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟。本篇在第一篇的基础上做了一些改进

  1. 采用了多线程,设定同时运行的线程的数量为3,线程数量适中,这样在保证在同一时刻有多个线程在执行爬取的同时,也能避免线程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。
  2. 对异常处理进行了优化,之前异常处理是放在获取一个领导对应的所有的留言链接函数里的,当获取不到加载更多按钮并且超时时就会抛出异常,这样使得如果异常发生在其他部分如获取留言详情时会被忽略,改进之后将其放入主函数,对于每一个领导都放入异常处理,这里涵盖了对该领导爬取时的所有操作,只要在任一环节报错都会捕捉到,同时增加了5层嵌套异常处理,增加了对出现异常的容忍度(在发生网络环境不好而加载不出页面、内存消耗较多而卡顿、被被爬取方反爬而不能爬取等情况时,可以对官员重新爬取以保证数据的完整程度和程序的健壮性)。

二、项目实施

由于在实现过程中有3种常用的方法实现多线程,因此对应也有3种不同的具体实现,这里选第1种进行说明:

1.导入所需要的库

import csv
import os
import random
import re
import time
import threading

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 控制同时运行的线程数为3
sem = threading.Semaphore(3)
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,同时在全局中设置同时运行的线程数为3,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下图片每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有3种方式实现:

  • 这通过threading.Semaphore()指定线程数量,后边在实现作为线程参数的函数时使用上下文处理器
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    thread_list = []
    ## 将所有线程加入线程列表,便于控制同时执行的线程数量
    for index, fid in enumerate(fids):
        t = threading.Thread(target=get_officer_messages, args=(index + 1, fid))
        thread_list.append([t, fid])
    for thread, fid in thread_list:
        ## 5层嵌套进行异常处理
        try:
            thread.start()
        except:
            try:
                thread.start()
            except:
                try:
                    thread.start()
                except:
                    try:
                        thread.start()
                    except:
                        try:
                            thread.start()
                        except:
                            ## 如果仍出现异常加入失败名单
                            print('该官员爬取失败,已存入失败名单,以备再爬')
                            if not os.path.exists('fid_not_success.txt'):
                                with open('fid_not_success.txt''a+'as f:
                                    f.write(fid)
                            else:
                                with open('fid_not_success.txt''a+'as f:
                                    f.write('\n' + fid)
                            continue
    for thread, fid in thread_list:
        thread.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用submit()函数实现线程的调用执行
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        for index, fid in enumerate(fids):
            ## 5层嵌套进行异常处理
            try:
                executor.submit(get_officer_messages, index + 1, fid)
            except:
                try:
                    executor.submit(get_officer_messages, index + 1, fid)
                except:
                    try:
                        executor.submit(get_officer_messages, index + 1, fid)
                    except:
                        try:
                            executor.submit(get_officer_messages, index + 1, fid)
                        except:
                            try:
                                executor.submit(get_officer_messages, index + 1, fid)
                            except:
                                ## 如果仍出现异常加入失败名单
                                print('该官员爬取失败,已存入失败名单,以备再爬')
                                if not os.path.exists('fid_not_success.txt'):
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write(fid)
                                else:
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write('\n' + fid)
                                continue
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用map()函数实现函数和多个参数的映射来执行线程

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        executor.map(get_officer_messages, range(1, len(fids) + 1), fids)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多线程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

3个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12199138下载,欢迎进行测试和交流学习,请勿滥用。整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下图片图片图片图片图片图片图片图片图片图片图片运行时间缩短到不到1小时半左右,约等于第一篇单线程的三分之一,因为同一时刻有3个子线程执行,大大降低了运行时间,效率比之前提高很多,加入多线程之后,可以让运行时间较长和较短的相互补充,同时多个线程同时运行,在同一时刻爬取多个领导,很显然大大缩短了运行时间。最后得到了合并的DATA.csv:图片可以进一步总结多线程的优势 :

  • 易于调度
  • 提高并发性:通过线程可方便有效地实现并发性。进程可创建多个线程来执行同一程序的不同部分。
  • 开销少:创建线程比创建进程要快,所需开销很少。
  • 利于充分发挥多处理器的功能:通过创建多线程进程,每个线程在一个处理器上运行,从而实现应用程序的并发性,使每个处理器都得到充分运行。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

第三节:多进程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟
本篇在第二篇的基础上做了一个主要改进:
从多线程改变为多进程,设定同时运行的进程的数量为3,数量适中,这样在保证在同一时刻有多个进程在执行爬取的同时,也能避免进程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。

二、项目实施

由于在实现过程中有2种常用的方法实现多线程,因此对应也有2种不同的具体实现。

1.导入所需要的库

import csv
import os
import random
import re
import time


import dateutil.parser as dparser
from random import choice
from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下图片每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。
函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有2种方式实现:

  • 通过for循环遍历所有任务和参数,并调用apply_async()函数将任务加入进程池
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 创建进程池
    pool = Pool(3)
    ## 将任务加入进程池并传入参数
    for index, fid in zip(range(1, len(fids) + 1), fids):
        pool.apply_async(get_officer_messages, (index, fid))
    pool.close()
    pool.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过调用map()函数将任务加入进程池并实现函数与参数的映射
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 处理传入的参数,使之对应索引合并并且可迭代
    itera_merge = list(zip(range(1, len(fids) + 1), fids))
    ## 创建进程池
    pool = Pool(3)
    ## 将任务传入进程池并通过映射传入参数
    pool.map(get_officer_messages_enc, itera_merge)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多进程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

2个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12200752下载,欢迎进行测试和交流学习,请勿滥用
整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下图片图片图片图片图片图片图片图片图片图片图片运行时间缩短到不到100分钟,与单进程相比大大缩短了时间、提高了效率,因为同一时刻有3个子进程执行。加入多进程之后,可以让运行时间较长和较短的相互补充,在任意时刻多个进程同时运行。但是也可以看出来与多线程相比,多进程的运行时间相对稍长,虽然差别不大,但是这可能就是性能的瓶颈。可能的原因是进程需要的资源更多,这对内存、CPU和网络的要求更高,从而对设备提出了更高的要求,有时可能设备性能跟不上程序要求,从而降低了效率。
最后得到了合并的DATA.csv:图片对于多线程和多进程的简单对比分析如下:
一个线程至少有一个进程,一个进程至少有一个线程,线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高,进程在执行过程中拥有独立的存储单元,而多个线程共享存储器,从而极大的提高了程序的运行效率。线程不能够独立执行,必须依存在进程中。

多线程:

  • 线程执行开销小(占用的资源非常少)但是不利于资源的管理和保护;
  • 如果需要共享数据,建议使用线程;
  • 适用于IO密集型任务(Web和文档读写等),遇到IO阻塞,速度远远小于CPU的运行速度,可以多开辟一些线程,有线程阻塞了,其他线程依然正常工作。

多进程:

  • 执行额开销比较大(占用的资源多),但是利于资源的管理和保护;
  • 适用于计算密集型(视频的译码编码和科学数据计算等)。

显然,在爬虫中应该偏向使用多线程。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。
(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。
(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

3.合法性说明

  • 本项目是为了学习和科研的目的,所有读者可以参考执行思路和程序代码,但不能用于恶意和非法目的(恶意攻击网站服务器、非法盈利等),如有违者请自行负责。
  • 本项目所获取的数据都是在进一步的分析之后用于对电子政务的实施改进,对政府的决策能起到一定的参考作用,并非于恶意抓取数据来攫取不正当竞争的优势,也未用于商业目的牟取不法利益,运行代码只是用几个fid进行测试,并非大范围地爬取,同时严格控制爬取的速率、争取不对服务器造成压力,如侵犯当事者(即被抓取的网络主体)的利益,请联系更改或删除。
  • 本项目是留言板爬取系列的第二篇,后期会继续更新,欢迎读者交流,以期不断改进。

作者:Corley

源自:快学python

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Prometheus 实战教程 + Grafana + Python — 实时监控东方财富人气榜股票

上次我们讲过普罗米修斯(prometheus)这个接近完美的监控系统,有很多读者不了解它到底要如何搭建、应用,需要一篇 Prometheus 实战教程。今天我们就结合普罗米修斯、Grafana和Python采集脚本,写一个小小的东方财富人气榜 TOP100监控系统。

跟着本文的教程耐心往下走,你可能只需要花30分钟便可完成环境的搭建,非常舒服,下面先介绍基本概念。

普罗米修斯(prometheus)上次我们已经使用一整篇文章介绍过了,它是一个开源监控报警系统和时序列数据库。如果你没有阅读过这篇文章,请花五分钟读一下:

Grafana 是一个开源的数据可视化网络应用程序平台。用户配置连接的数据源之后,Grafana可以在网络浏览器里显示数据图表和警告。

比如说我基于 普罗米修斯(prometheus) + node_exporter 监控主机性能指标,然后由Grafana构建主机实时监控仪表盘,它是长这样的:

至于东方财富人气榜,指的是这个:

它能将市场目前最活跃的一些股票提取出来,可供我们作为投资的一种参考。

而我们今天要做的,就是自己搭建一套监控系统,实时监控某只股票在TOP100上的排名变化。

1.Prometheus 安装教程

创建 Prometheus 安装目录并添加 promethus 用户:

PROM_PATH='/data/prometheus'
mkdir -p ${PROM_PATH}
mkdir -p ${PROM_PATH}/{data,conf,logs,bin}
useradd prometheus
cd /usr/local/src

下载解压 prometheus, 这里我们选用2021年5月18日更新的最新版 v2.27.1:

wget https://github.com/prometheus/prometheus/releases/download/v2.27.1/prometheus-2.27.1.linux-amd64.tar.gz
tar -xvf prometheus-2.27.1.linux-amd64.tar.gz
cd prometheus-2.27.1.linux-amd64/
cp prometheus promtool ${PROM_PATH}/bin/
cp prometheus.yml ${PROM_PATH}/conf/
chown -R prometheus.prometheus /data/prometheus

设置环境变量:

cat >> /etc/profile <<EOF
PATH=/data/prometheus/bin:$PATH:$HOME/bin
EOF

将 Promethus 配置为系统服务之一,以便使用 systemctl 命令管控服务:

cat >>/etc/systemd/system/prometheus.service <<EOF
[Unit]
Description=Prometheus
Documentation=https://prometheus.io/
After=network.target

[Service]
Type=simple
User=prometheus
ExecStart=/data/prometheus/bin/prometheus --config.file=/data/prometheus/conf/prometheus.yml --storage.tsdb.path=/data/prometheus/data --storage.tsdb.retention=90d
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

现在使用下面的systemctl命令重新加载systemd系统,并查看服务是否启动:

systemctl daemon-reload
systemctl enable prometheus
systemctl start prometheus
systemctl status prometheus

看到 running 状态说明一切正常:

记得开放9090端口,这样才可以访问 Prometheus 的 Web 端,访问 http://服务器IP:9090 查看得到 Prometheus Web界面,说明安装成功:

2.Grafana 安装教程

Grafana 我们也使用最新的 8.0.1 版本,安装方式如下:

CentOS系列系统使用以下命令安装:

cd /usr/local/src
wget https://dl.grafana.com/oss/release/grafana-8.0.1-1.x86_64.rpm
sudo yum localinstall grafana-6.5.2-1.x86_64.rpm

Ubuntu和Debian系列系统使用以下命令安装:

cd /usr/local/src
sudo apt-get install -y adduser libfontconfig1
wget https://dl.grafana.com/oss/release/grafana_8.0.1_amd64.deb
sudo dpkg -i grafana_8.0.1_amd64.deb

然后启动系统服务即可:

systemctl start grafana-server
systemctl status grafana-server

看到 running 状态说明一切正常:

记得开放3000端口,这样你才可以访问你的Grafana: http://你的服务器IP:3000 如下所示:

输入用户名,密码登录系统。用户名与密码都是”admin”,如果能打开页面则已经安装成功了。

3.初尝Grafana+Prometheus实战教程

为了初步尝试这套系统,我们可以通过简单的采集主机性能数据开始。Node_exporter是一个Prometheus推出的官方主机性能采集工具。通过它我们能很方便地输出主机性能指标到Prometheus.

3.1 下载安装Node_Exporter:

NODE_PATH='/data/prometheus/node_exporter/'
cd /usr/local/src/
mkdir -p ${NODE_PATH}
wget https://github.com/prometheus/node_exporter/releases/download/v1.1.2/node_exporter-1.1.2.linux-amd64.tar.gz
tar -xvf node_exporter-1.1.2.linux-amd64.tar.gz
cp node_exporter-1.1.2.linux-amd64/node_exporter ${NODE_PATH}
chown -R prometheus.prometheus ${NODE_PATH}

配置node_exporter为系统服务:

cat > /lib/systemd/system/node_exporter.service <<EOF
[Unit]
Description=node_exporter
Documentation=https://prometheus.io/
After=network.target
 
[Service]
Type=simple
User=prometheus
ExecStart=/data/prometheus/node_exporter/node_exporter
Restart=on-failure
 
[Install]
WantedBy=multi-user.target
EOF

现在使用systemctl命令重新加载系统命令,并查看服务是否启动:

systemctl daemon-reload
systemctl enable node_exporter
systemctl start node_exporter
systemctl status node_exporter

看到如下图的状态说明启动成功。

放行9100端口,访问http://你的服务器地址:9100/metrics 看到如下指标页面说明安装成功:

配置 prometheus.yaml (ubuntu 下为 prometheus.yml), 让 prometheus 采集 node_exporter 输出的指标数据:

vim /data/prometheus/conf/prometheus.yml

配置如下:

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090']

   # 主要是新增了node_exporter的job,如果有多个node_exporter,在targets数组后面加即可

  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

保存后重启prometheus:

systemctl restart prometheus

最后配置Grafana:

然后选择 Prometheus 数据源:

输入 Prometheus url 然后点击 save&test 保存:

然后导入官方仪表盘,官方提供的模板号为8919:

然后你就能看见本机非常漂亮的性能指标数据仪表盘了。

不看不知道,一看吓一跳,看来我需要升级这台机器的内存了。

4.编写采集脚本

为了能够采集东方财富人气榜前100名,我们需要用Python编写一个人气榜采集脚本,并使其像 node_exporter 一样输出指标信息:

为了达到这个目的,我们必须安装 prometheus_client 模块:

pip3 install prometheus_client

获取股票排名的代码如下:

# Python实用宝典
# 2021-06-13
# 文件名: fetch_stock.py
import time
import requests
from prometheus_client import start_http_server, CollectorRegistry, Gauge


reg = CollectorRegistry()
gauge = Gauge(
    'rank', '人气榜排名',
    ['stock_id'], registry=reg
)


def process_request():
    url = "https://emappdata.eastmoney.com/stockrank/getAllCurrentList"
    kwargs = {
        "appId": "appId01",
        "pageNo": 1,
        "pageSize": "100",
    }
    result = requests.post(url, json=kwargs).json()
    for i in result.get("data", []):
        gauge.labels(stock_id=i["sc"]).set(i["rk"])
    time.sleep(60)


if __name__ == '__main__':
    start_http_server(8000, registry=reg)
    while True:
        process_request()

这里我们只捕获人气榜前100名,并通过Prometheus客户端的start_http_server开启一个Web服务,这样你通过http服务访问8000端口的时候就能输出这些指标。

为了让其能持续输出指标数据,我们要用nohup使其成为一个常驻进程:

nohup python3 fetch_stock.py &

开放8000端口,访问 http://你的服务器IP:8000 就能查看输出的指标:

5.应用采集脚本

同配置Node_exporter一样,我们需要将自己编写好的采集脚本落入Prometheus,配置prometheus.yaml:

配置 prometheus.yaml, 让 prometheus 采集 node_exporter 输出的指标数据:

#(CentOS) vim /data/prometheus/conf/prometheus.yaml
vim /data/prometheus/conf/prometheus.yml # ubuntu

配置如下:

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090']

   # 主要是新增了node_exporter的job,如果有多个node_exporter,在targets数组后面加即可
  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

   # 新增我们的Python股票采集脚本
  - job_name: 'hot_list'
    static_configs:
      - targets: ['localhost:8000']

保存后重启prometheus:

systemctl restart prometheus

最后配置Grafana, 选择新建一个dashboard:

然后选择rank指标:

点击 Use query 就能获取所有股票的排名曲线:

Prometheus 实战教程

6.配置Grafana告警

为了在某只股票达到某种排名的时候触发通知,我们需要先配置好告警渠道:

然后配置邮件告警,点击 Test, 此时 Grafana 会告诉你一个错误:

就是我们还没有配置好 SMTP 相关服务,需要配置 SMTP 相关服务才能正常发送邮件,如果你是按照本文按照Grafana的教程走下来的,那么Grafana.ini的文件位于 /etc/grafana/grafana.ini.

vim /etc/grafana/grafana.ini

然后在 smtp 部分配置你的 host、user、password、from_address、from_name,并打开 enabled 如下图所示:

然后重启 Grafana-server

systemctl restart grafana-server

再点击Test,你的邮箱里收到这样的邮件说明通知可以正常发送了:

然后我们进入正题,监控某只股票的排名变化,比如 SH600070:

然后点击 Alert 配置告警,一旦其排名高于65名则发送邮件通知:

完成后点击右上角的 save 保存即可:

然后进入 Alerting 告警中心,你会看到刚刚配置的告警规则在这里可以进行管控:

点击Pause可以暂停这个告警,Edit alert可以去更改告警条件。

一旦触发告警,这个状态便会更改,你就会收到邮件:

邮件效果如下:

邮件里的告警图片没显示出来,因为我们没有安装 “grafana image renderer”, 需要在你的服务器执行以下命令安装并重启 Grafana:

grafana-cli plugins install grafana-image-renderer
systemctl restart grafana-server

新的告警邮件便能看到图片了:

怎么样,用Prometheus+Grafana+Python采集搭建一个股票监控系统还是非常简单的吧?创新性地监控东方财富人气榜上某只股票的变化并产生告警,能让你熟悉监控策略的配置,见微知著。跟着本文的教程走,相信你会有不少收获。

如果我们延伸一下,结合量化投资系列教程的可转债交易策略 — Python 量化投资实战教程(10),是否可以构建一些更有意义的策略?答案是肯定的。

我们可以监控所有100元以下的可转债对应的股票,如果这些股票进入了人气榜TOP100或者飙升榜(本文没有采集,有兴趣的读者可以自行采集),就购入这些低价可转债,这种买入策略或许也不错。

你也可以抛弃东方财富的榜单分类,构建自己的排名环比增长买入策略,环比下跌卖出策略,我相信这会非常有意思。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化