标签归档:转载

Python 电信用户流失预测模型实战教程

1.流失预测 研究背景

1、做好用户流失预测可以降低营销成本。老生常谈,新客户开发成本老客户维护成本的5倍。

2、获得更好的用户体验。并不是所有的增值服务都可以有效留住客户。

3、获得更高的销售回报。价格敏感型客户和非价格敏感性客户。

2.提出问题

1、流失客户有哪些显著性特征?

2、当客户在哪些特征下什么条件下比较容易发生流失

3.数据集描述

该数据是datafountain上的《电信客户流失数据》,这里提供一个下载地址。
https://www.datafountain.cn/datasets/35guide

该数据集有21个变量,7043个数据点。变量可分为以下三个部分:用户属性、用户行为、研究对象。

用户属性
customerID :用户ID
gender:性别(Female & Male)
SeniorCitizen :老年人(1表示是,0表示不是)
Partner :是否有配偶(Yes or No)
Dependents :是否经济独立(Yes or No)
tenure :客户的职位(0-72,共73个职位)

用户行为
PhoneService :是否开通电话服务业务(Yes or No)
MultipleLines :是否开通了多线业务(Yes 、No or No phoneservice 三种)
InternetService :是否开通互联网服务(No, DSL数字网络,fiber optic光纤网络 三种)
OnlineSecurity :是否开通网络安全服务(Yes,No,No internetserive 三种)
OnlineBackup :是否开通在线备份业务(Yes,No,No internetserive 三种)
DeviceProtection :是否开通了设备保护业务(Yes,No,No internetserive 三种)
TechSupport :是否开通了技术支持服务(Yes,No,No internetserive 三种)
StreamingTV :是否开通网络电视(Yes,No,No internetserive 三种)
StreamingMovies :是否开通网络电影(Yes,No,No internetserive 三种)
Contract :签订合同方式 (按月,一年,两年)
PaperlessBilling :是否开通电子账单(Yes or No)
PaymentMethod :付款方式(bank transfer,credit card,electronic check,mailed check)
MonthlyCharges :月费用
TotalCharges :总费用

研究对象Churn:该用户是否流失(Yes or No)

4.分析思路

分析视角分析方法的灵魂。

分析方法有上百种,但分析视角只有四种:

  • 对比视角
  • 分类视角
  • 相关视角
  • 描述视角

一旦将业务需求拆解成指标,接下来只需要针对每个指标进行分析视角四选一即可。

数据集描述,已经将变量分为三个维度了:用户属性、用户行为、研究对象(是否流失客户),三个维度组合一下就得出了以下解题思路了:

  • 哪些属性的用户比较容易流失?
  • 哪些行为的用户比较容易流失?

以上两个分析思路运用的是【对比视角】,该视角下具体的分析方法有:

  • 数值型数据:均值比较
  • 分类型数据:频数分布比较(交叉分析)

以上的分析方法是统计分析,只能一个维度一个维度地去比较。但实际情况中,并不是每个维度的权重都一样的,那如何去研究各个维度的权重?

权重问题属于分类视角,故我们可以采用分类模型,要用哪个分类模型呢?不知道。可以全部采用,看模型精度得分,然后选得分最高的模型进行进一步预测。

  • Random Forest 随机森林
  • SVC 支持向量机
  • LogisticRegression 逻辑回归
  • KNN 近邻算法
  • Naive Bayes  朴素贝叶斯
  • Decision Tree 决策树
  • AdaBoost
  • GradientBoosting
  • XGB
  • CatBoost

5.分析结论及运营建议

5.1 分析结论

Python 电信用户流失预测模型实战教程

综合统计分析XGB算法输出特征重要性得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure :1-5号职位的用户比较容易流失
  2. PaymentMethod :使用电子支票支付的人
  3. MonthlyCharges 、TotalCharges : 总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling : 开通电子账单
  5. Partner : 单身
  6. OnlineBackup : 没开通在线备份业务
  7. InternetService :开通了Fiber optic 光纤网络
  8. TechSupport :没开通“技术支持服务”
  9. DeviceProtection :没开通设备保护业务
  10. OnlineSecurity :没开通网络安全服务
  11. Contract :按月签订合同方式
  12. Dependents :无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges :总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

当条件覆盖得越多,人群越精确,但与此同时,覆盖的人群也会越少。业务方可直接在数据库中,通过SQL检索符合要求的客户,然后做针对性的运营工作。

5.2 运营建议

如何留住客户,可以从两方面去思考:

  • 增加用户的沉没成本(损失厌恶)
    • 会员等级
    • 积分制
    • 充值赠送
    • 满减券
    • 其他增值服务
  • 培养用户的条件反射(习惯)
    • 会员日
    • 定期用户召回
    • 签到
    • 每日定时抽奖
    • 小游戏

电子账单解锁新权益

  • 现象:“开通电子账单”的人反而容易流失。
  • 基本假设:价格敏感型客户。电子账单,让客户理性消费。
  • 建议:让“电子账单”变成一项“福利。跟连锁便利店,联名发”商品满减券”,每月的账单时间,就将”商品满减券“和账单一起推送过去。文案:您上月消费了XX元,解锁了xx会员权益。
  • 底层规律:增加沉没成本。

“单身用户”尊享亲情网

  • 现象:“单身用户”容易流失。
  • 基本假设:社交欲望低。
  • 建议:一个单身用户拥有建立3个人以内的“亲情网”的权益。
  • 底层规律:增加沉没成本。

推广“在线备份、设备保护、技术支持、网络保护”等增值服务。

6.实战教程-数据清洗

6.1 导入模块

6.1.1 数据处理

import pandas as pd
import numpy as np

6.1.2 可视化

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='darkgrid',font_scale=1.3)
plt.rcParams['font.family']='SimHei'
plt.rcParams['axes.unicode_minus']=False

6.1.3 特征工程

import sklearn
from sklearn import preprocessing                            #数据预处理模块
from sklearn.preprocessing import LabelEncoder               #编码转换
from sklearn.preprocessing import StandardScaler             #归一化
from sklearn.model_selection import StratifiedShuffleSplit   #分层抽样
from sklearn.model_selection import train_test_split         #数据分区
from sklearn.decomposition import PCA                        #主成分分析 (降维)

6.1.4 分类算法

from sklearn.ensemble import RandomForestClassifier     #随机森林
from sklearn.svm import SVC,LinearSVC                   #支持向量机
from sklearn.linear_model import LogisticRegression     #逻辑回归
from sklearn.neighbors import KNeighborsClassifier      #KNN算法
from sklearn.cluster import KMeans                     #K-Means 聚类算法
from sklearn.naive_bayes import GaussianNB              #朴素贝叶斯
from sklearn.tree import DecisionTreeClassifier         #决策树

6.1.5 分类算法–集成学习

import xgboost as xgb
from xgboost import XGBClassifier                      
from catboost import CatBoostClassifier                
from sklearn.ensemble import AdaBoostClassifier        
from sklearn.ensemble import GradientBoostingClassifier 

6.1.6 模型评估

from sklearn.metrics import classification_report,precision_score,recall_score,f1_score  #分类报告
from sklearn.metrics import confusion_matrix           #混淆矩阵
from sklearn.metrics import silhouette_score           #轮廓系数(评价k-mean聚类效果)
from sklearn.model_selection import GridSearchCV       #交叉验证
from sklearn.metrics import make_scorer
from sklearn.ensemble import VotingClassifier          #投票

6.1.7 忽略警告

import warnings
warnings.filterwarnings('ignore')

6.2 读取数据

df=pd.read_csv(r'C:\Users\Think\Desktop\刻意练习数据\电信数据集\Customer-Churn.csv',header=0)
#预览数据
df.head()
#查看数据大小
df.shape
#查看数据数据及分布
df.describe()

这里安利一下spyder编辑器,下图是这个编辑器的界面。编程过程中,有赋值变量的操作,该编辑器都会在右上角呈现,双击一下,就可以像在Execel上查看数据,非常方便。查看该数据集的详情。

6.3 数据清洗

6.3.1 缺失值处理

#查看缺失值
df.isnull().sum()

注:缺失值的数据类型是 float 类型。一旦有变量的数据类型转换成float 类型,需再次查看缺失值。

6.3.2 重复值处理

#查看重复值
df.duplicated().sum()

【输出】

6.3.3 数值类型转换

#查看数据类型
df.info()

【输出】TotalCharages总费用应该跟MonthlvCharges是同一个数据类型(float64)。故需将TotalCharages由object转换成float64,且需要再次查看缺失值。

#总费用 TotalCharges  该列的数据类型应是float64,不是object
# df['TotalCharges'].astype('float64')
# 此处用“astype”转化数据类型报错 “could not convert string to float”
#改用强制转化 convert_numeric=True   
df['TotalCharges']=df['TotalCharges'].convert_objects(convert_numeric=True)
df['TotalCharges'].dtype

输出如下:再次查看缺失值:TotalCharges列有11个缺失值,处理缺失值的原则是尽量填充,最后才是删除。
缺失值填充的原则:

  • 分类型数据:众数填充
  • 数值型数据:正态分布,均值/中位数填充;偏态分布,中位数填充。

TotalCharges列是数值型数据,先画直方图查看数据分布形态。

#分别作直方图:全部客户类型、流失客户类型、留存客户类型
plt.figure(figsize=(14,5))
plt.subplot(1,3,1)
plt.title('全部客户的总付费直方图')
sns.distplot(df['TotalCharges'].dropna())

plt.subplot(1,3,2)
plt.title('流失客户的总付费直方图')
sns.distplot(df[df['Churn']=='Yes']['TotalCharges'].dropna())

plt.subplot(1,3,3)
plt.title('留存客户的总付费直方图')
sns.distplot(df[df['Churn']=='No']['TotalCharges'].dropna())

结果如下:从三个直方图看,该列数据是偏态分布,故选择中位数填充。

df.fillna({'TotalCharges':df['TotalCharges'].median()},inplace=True)
#再次确认是否还有空值
df.isnull().sum()

结果如下:

6.4 查看样本分布

研究对象’Churn’列重新编码“Yes”=1,“No”=0。重新编码有下面两种方法。

方法一:replace

df['Churn'].replace(to_replace = 'Yes', value = 1,inplace = True)
df['Churn'].replace(to_replace = 'No', value = 0,inplace = True)

方法二:map函数

df['Churn']=df['Churn'].map({'Yes':1,'No':0})

预览数据:

df['Churn'].head()

结果如下:绘制饼图,查看流失客户占比。

churn_value=df["Churn"].value_counts()
labels=df["Churn"].value_counts().index

plt.figure(figsize=(7,7))
plt.pie(churn_value,labels=labels,colors=["b","w"], explode=(0.1,0),autopct='%1.1f%%', shadow=True)
plt.title("流失客户占比高达26.5%")
plt.show()  

结果如下:【分析】:流失客户样本占比26.5%,留存客户样本占比73.5%,明显的“样本不均衡”。

解决样本不均衡有以下方法可以选择:

  • 分层抽样
  • 过抽样
  • 欠抽样

7.实战教程-特征选择

提取特征

feature=df.iloc[:,1:20]

7.1 整数编码

查看变量间的两两相关性

#重新编码
corr_df = feature.apply(lambda x: pd.factorize(x)[0])
corr_df.head()
#相关性矩阵
corr=corr_df.corr()
corr

结果如下:相关性矩阵可视化

#绘制热力图观察变量之间的相关性强弱
plt.figure(figsize=(15,12))
ax = sns.heatmap(corr, xticklabels=corr.columns, yticklabels=corr.columns, 
                 linewidths=0.2, cmap="RdYlGn",annot=True)
plt.title("Correlation between variables")

结果如下:【分析】:从热力图来看,互联网服务、网络安全、在线备份、设备维护服务、技术支持服务、开通网络电视服务、开通网络电影之间相关性很强,且是正相关。电话服务和多线业务之间也存在很强的正相关关系。

7.2 独热编码

查看研究对象”Churn”与其他变量下的标签相关性。独热编码,可以将分类变量下的标签转化成列

df_onehot = pd.get_dummies(df.iloc[:,1:21])
df_onehot.head()

结果如下:绘图查看用户流失(‘Churn’)与各个维度之间的关系

plt.figure(figsize=(15,6))
df_onehot.corr()['Churn'].sort_values(ascending=False).plot(kind='bar')
plt.title('Correlation between Churn  and variables ')

结果如下:【分析】:从图看gender(性别)、PhoneService(电话服务)相关性几乎为0,故两个维度可以忽略。[‘SeniorCitizen’,’Partner’,’Dependents’, ‘Contract’,MultipleLines,’InternetService’,  ‘OnlineSecurity’, ‘OnlineBackup’, ‘DeviceProtection’,’TechSupport’, ‘StreamingTV’, ‘StreamingMovies’,’PaperlessBilling’,’PaymentMethod’]等都有较高的相关性,将以上维度合并成一个列表kf_var,然后进行频数比较。

kf_var=list(df.columns[2:5])
for var in list(df.columns[7:18]):
    kf_var.append(var)
print('kf_var=',kf_var)

结果如下:

8.实战教程-统计分析

8.1 频数分布比较

8.1.1 卡方检验

组间有显著性差异,频数分布比较才有意义,否则可能会做无用功。
“卡方检验”,就是提高频数比较结论可信度的统计方法。

#分组间确实是有显著性差异,频数比较的结论才有可信度,故需进行”卡方检验“
from scipy.stats import chi2_contingency   #统计分析 卡方检验
#自定义卡方检验函数
def KF(x):
    df1=pd.crosstab(df['Churn'],df[x])
    li1=list(df1.iloc[0,:])
    li2=list(df1.iloc[1,:])
    kf_data=np.array([li1,li2])
    kf=chi2_contingency(kf_data)
    if kf[1]<0.05:
        print('Churn by {} 的卡方临界值是{:.2f},小于0.05,表明{}组间有显著性差异,可进行【交叉分析】'.format(x,kf[1],x),'\n')
    else:
        print('Churn by {} 的卡方临界值是{:.2f},大于0.05,表明{}组间无显著性差异,不可进行交叉分析'.format(x,kf[1],x),'\n')
#对 kf_var进行卡方检验
print('kf_var的卡方检验结果如下:','\n')
print(list(map(KF, kf_var)))

kf_var的卡方检验结果如下:

Churn by SeniorCitizen 的卡方临界值是0.00,小于0.05,表明SeniorCitizen组间有显著性差异,可进行【交叉分析】

Churn by Partner 的卡方临界值是0.00,小于0.05,表明Partner组间有显著性差异,可进行【交叉分析】

Churn by Dependents 的卡方临界值是0.00,小于0.05,表明Dependents组间有显著性差异,可进行【交叉分析】

Churn by MultipleLines 的卡方临界值是0.99,大于0.05,表明MultipleLines组间无显著性差异,不可进行交叉分析

Churn by InternetService 的卡方临界值是0.00,小于0.05,表明InternetService组间有显著性差异,可进行【交叉分析】

Churn by OnlineSecurity 的卡方临界值是0.00,小于0.05,表明OnlineSecurity组间有显著性差异,可进行【交叉分析】

Churn by OnlineBackup 的卡方临界值是0.00,小于0.05,表明OnlineBackup组间有显著性差异,可进行【交叉分析】

Churn by DeviceProtection 的卡方临界值是0.00,小于0.05,表明DeviceProtection组间有显著性差异,可进行【交叉分析】

Churn by TechSupport 的卡方临界值是0.00,小于0.05,表明TechSupport组间有显著性差异,可进行【交叉分析】

Churn by StreamingTV 的卡方临界值是0.00,小于0.05,表明StreamingTV组间有显著性差异,可进行【交叉分析】

Churn by StreamingMovies 的卡方临界值是0.00,小于0.05,表明StreamingMovies组间有显著性差异,可进行【交叉分析】

Churn by Contract 的卡方临界值是0.00,小于0.05,表明Contract组间有显著性差异,可进行【交叉分析】

Churn by PaperlessBilling 的卡方临界值是0.00,小于0.05,表明PaperlessBilling组间有显著性差异,可进行【交叉分析】

Churn by PaymentMethod 的卡方临界值是0.00,小于0.05,表明PaymentMethod组间有显著性差异,可进行【交叉分析】

从卡方检验的结果,kf_var包含的特征,组间都有显著性差异,可进行频数比较。

8.1.2 柱形图

频数比较–柱形图

plt.figure(figsize=(20,25))
a=0
for k in kf_var:
    a=a+1
    plt.subplot(4,4,a)
    plt.title('Churn BY '+ k)
    sns.countplot(x=k,hue='Churn',data=df)

结果如下:因为PaymentMethod的标签比较长,影响看图,所以单独画。

plt.xticks(rotation=45)
sns.countplot(x='PaymentMethod',hue='Churn',data=df)

可以直接从柱形图去判断对哪个维度对流失客户的影响大吗?不能,因为“样本不均衡”(流失客户样本占比26.5%,留存客户样本占比73.5%),基数不一样,故不能直接通过“频数”的柱形图去分析。
解决办法:交叉分析,且作同行百分比(’Churn’作为“行”)

8.1.3 交叉分析

print('ka_var列表中的维度与Churn交叉分析结果如下:','\n')
for i in kf_var:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n'#交叉分析,同行百分比

ka_var列表中的维度与Churn交叉分析结果如下:【SeniorCitizen 分析】:年轻用户 在流失、留存,两个标签的人数占比都高。【Parter 分析】:单身用户更容易流失。【Denpendents 分析】:经济不独立的用户更容易流失。【MultipleLines 分析】:是否开通MultipleLines,对留存和流失都没有明显的促进作用。【InternetService 分析】:办理了 “Fiber optic 光纤网络”的客户容易流失。【OnlineSecurity 分析】:没开通“网络安全服务”的客户容易流失。【OnlineBackup 分析】:没开通“在线备份服务”的客户容易流失。【DeviceProtection 分析】:没开通“设备保护业务”的用户比较容易流失【TechSupport 分析】:没开通“技术支持服务”的用户容易流失。【StreamingTV 分析】:是否开通“网络电视”服务,对用户留存、流失,没有明显的促进作用。【StreamingMovies 分析】:是否开通“网络电影”服务,对用户留存、流失,没有明显的促进作用。【Contract 分析】逐月签订合同的用户最容易流失。

因为”Churn BY PaymentMethod”打印出来显示不全,故我就从临时表将“交叉表”给截图出来了: 【分析】使用“电子支票”支付的人更容易流失。

8.2 均值比较

组间有显著性差异,均值比较才有意义。
显著性检验,先通过了齐性检验,再通过方差分析,最后才能做均值比较。

8.2.0 齐性检验,方差分析

#自定义齐性检验 & 方差分析 函数
def ANOVA(x):
    li_index=list(df['Churn'].value_counts().keys())
    args=[]
    for i in li_index:
        args.append(df[df['Churn']==i][x])
    w,p=stats.levene(*args)             #齐性检验
    if p<0.05:
        print('警告:Churn BY {}的P值为{:.2f},小于0.05,表明齐性检验不通过,不可作方差分析'.format(x,p),'\n')
    else:
        f,p_value=stats.f_oneway(*args) #方差分析
        print('Churn BY {} 的f值是{},p_value值是{}'.format(x,f,p_value),'\n')
        if p_value<0.05:
            print('Churn BY {}的均值有显著性差异,可进行均值比较'.format(x),'\n')
        else:
            print('Churn BY {}的均值无显著性差异,不可进行均值比较'.format(x),'\n')

对MonthlyCharges、TotalCharges维度分别进行齐性检验和方差分析

print('MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:','\n')
ANOVA('MonthlyCharges')
ANOVA('TotalCharges')

【输出】:
MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:

警告:Churn BY MonthlyCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

警告:Churn BY TotalCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

8.3 总结

用户出现以下特征比较容易流失:

  • SeniorCitizen:青年人
  • Partner :单身
  • Dependents :无经济独立
  • InternetService:开通了 “Fiber optic 光纤网络”
  • OnlineSecurity:没开通“网络安全服务”
  • OnlineBackup:没开通“在线备份业务”
  • DeviceProtection:没开通通了“设备保护业务
  • TechSupport:没开通“技术支持服务”
  • Contract:“按月”签订合同方式
  • PaperlessBilling:开通电子账单
  • PaymentMethod:使用“电子支票”支付的人

我们可以在SQL(数据库)上找有以上特征的客户,进行精准营销,即可以降低用户流失。虽然特征选得越多,越精确,但覆盖的人群也会越少。故,我们还需要计算“特征”的【重要性】,将最为重要的几个特征作为筛选条件。

计算特征的【重要性】,是“分类视角”,接下来我们会挑选常见的分类模型,进行批量训练,然后挑出得分最高的模型,进一步计算“特征重要性”。

9.实战教程-特征工程

9.1 提取特征

有前面的流失率与各个维度的相关系数柱状图可知:
流失率与gender(性别)、PhoneService(电话服务)相关性几乎为0,可以筛选掉,而customerID是随机数,不影响建模,故可以筛选掉。最终得到特征 churn_var

churn_var=df.iloc[:,2:20]
churn_var.drop("PhoneService",axis=1, inplace=True)
churn_var.head()

结果如下:

9.2 处理“量纲差异大”

“MonthlyCharges”、”TotalCharges”两个特征跟其他特征相比,量纲差异大。处理量纲差异大,有两种方法:

  1. 标准化
  2. 离散化

以上两种方法,哪个能让模型精度提高,就选哪个。根据模型的最后得分,我选了“离散化”来处理量纲差异大。

9.2.1 标准化

scaler = StandardScaler(copy=False)
scaler.fit_transform(churn_var[['MonthlyCharges','TotalCharges']])  #fit_transform拟合数据
churn_var[['MonthlyCharges','TotalCharges']]=scaler.transform(churn_var[['MonthlyCharges','TotalCharges']])  #transform标准化

print(churn_var[['MonthlyCharges','TotalCharges']].head() )#查看拟合结果

【输出】

9.2.2 特征离散化

特征离散化后,模型易于快速迭代,且模型更稳定。

1、处理’MonthlyCharges’:

#查看'MonthlyCharges'列的4分位
churn_var['MonthlyCharges'].describe() 

离散操作
18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”

#用四分位数进行离散
churn_var['MonthlyCharges']=pd.qcut(churn_var['MonthlyCharges'],4,labels=['1','2','3','4'])
churn_var['MonthlyCharges'].head()

结果如下:2、处理’TotalCharges’:

#查看'TotalCharges'列的4分位
churn_var['TotalCharges'].describe()

结果如下:

离散操作:
18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”

#用四分位数进行离散 
churn_var['TotalCharges']=pd.qcut(churn_var['TotalCharges'],4,labels=['1','2','3','4'])
churn_var['TotalCharges'].head()

【输出】

9.3 分类数据转换成“整数编码”

9.3.1 查看churn_var中分类变量的label(标签)

#自定义函数获取分类变量中的label
def Label(x):
    print(x,"--" ,churn_var[x].unique()) 
#筛选出数据类型为“object”的数据点
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object)))

结果如下:

通过同行百分比的“交叉分析”发现,label “No internetserive”的人数占比在以下特征[OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingTV]都是惊人的一致,故我们可以判断label “No internetserive”不影响流失率。
因为这6项增值服务,都是需要开通“互联网服务”的基础上才享受得到的。不开通“互联网服务”视为没开通这6项增值服务,故可以将 6个特正中的“No internetserive” 并到 “No”里面。

churn_var.replace(to_replace='No internet service',value='No',inplace=True)

而特征MultipleLines的“ No phoneservice”在流失客户、留存客户样本中的人数占比几乎接近,且比较少,故可以将“ No phoneservice”并到“No”。

churn_var.replace(to_replace='No phone service',value='No',inplace=True)
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object.columns)))

结果如下:

9.3.2 整数编码

整数编码的方法有两种:

  1. sklearn中的LabelEncoder()
  2. pandas中的factorize
    此处选用 LabelEncoder()
def labelencode(x):
    churn_var[x] = LabelEncoder().fit_transform(churn_var[x])
for i in range(0,len(df_object.columns)):
    labelencode(df_object.columns[i])
print(list(map(Label,df_object.columns)))

结果如下:

9.4 处理“样本不均衡”

分拆变量

x=churn_var
y=df['Churn'].values
print('抽样前的数据特征',x.shape)
print('抽样前的数据标签',y.shape)

【输出】
抽样前的数据特征 (7043, 17)
抽样前的数据标签 (7043,)

处理样本不均衡常用的方式有三种:

  1. 分层抽样
  2. 过抽样
  3. 欠抽样

笔者先后尝试了“分层抽样”和“欠抽样”,前者最终得到的模型中精度最高的是0.63,而后者最终得到的模型中精度最低是0.78,最高是0.84。所以说“抽样方式”的选择极为重要,大家要在这里多试错。

分层抽样

sss=StratifiedShuffleSplit(n_splits=5, test_size=0.2, random_state=0)
print(sss)
print("训练数据和测试数据被分成的组数:",sss.get_n_splits(x,y))
# 分拆训练集和测试集
for train_index, test_index in sss.split(x, y):
    print("train:", train_index, "test:", test_index)
    x_train,x_test=x.iloc[train_index], x.iloc[test_index]
    y_train,y_test=y[train_index], y[test_index]

“过抽样”让模型精度更高,故我选“过抽样”。

from imblearn.over_sampling import SMOTE
model_smote=SMOTE()
x,y=model_smote.fit_sample(x,y)
x=pd.DataFrame(x,columns=churn_var.columns)
#分拆数据集:训练集 和 测试集
x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.3,random_state=0)

输出数据集大小

print('过抽样数据特征:', x.shape,
      '训练数据特征:',x_train.shape,
      '测试数据特征:',x_test.shape)

print('过抽样后数据标签:', y.shape,
      '   训练数据标签:',y_train.shape,
      '   测试数据标签:',y_test.shape)

【输出】

过抽样后数据特征: (10348, 17) 训练数据特征: (7243, 17) 测试数据特征: (3105, 17)
过抽样后数据标签: (10348,)    训练数据标签: (7243,)    测试数据标签: (3105,)

10.实战教程-数据建模

使用分类算法

Classifiers=[["Random Forest",RandomForestClassifier()],
             ["Support Vector Machine",SVC()],
             ["LogisticRegression",LogisticRegression()],
             ["KNN",KNeighborsClassifier(n_neighbors=5)],
             ["Naive Bayes",GaussianNB()],
             ["Decision Tree",DecisionTreeClassifier()],
             ["AdaBoostClassifier", AdaBoostClassifier()],
             ["GradientBoostingClassifier", GradientBoostingClassifier()],
             ["XGB", XGBClassifier()],
             ["CatBoost", CatBoostClassifier(logging_level='Silent')]  
]

训练模型

Classify_result=[]
names=[]
prediction=[]
for name,classifier in Classifiers:
    classifier=classifier
    classifier.fit(x_train,y_train)
    y_pred=classifier.predict(x_test)
    recall=recall_score(y_test,y_pred)
    precision=precision_score(y_test,y_pred)
    f1score = f1_score(y_test, y_pred)
    class_eva=pd.DataFrame([recall,precision,f1score])
    Classify_result.append(class_eva)
    name=pd.Series(name)
    names.append(name)
    y_pred=pd.Series(y_pred)
    prediction.append(y_pred)

11.模型评估

names=pd.DataFrame(names)
names=names[0].tolist()
result=pd.concat(Classify_result,axis=1)
result.columns=names
result.index=["recall","precision","f1score"]
result

【输出】
特征工程,采用“标准化”处理量纲差异,采用“分层抽样”处理样本不均衡。
最终模型精度得分,最高分是0.63,是“朴素贝叶斯”模型

特征工程,采用“离散化”处理量纲差异,采用“过抽样”处理样本不均衡。
最终模型精度得分,最高分是0.84,是“XGB”模型

12.基于“XGB”模型输出特征重要性

笔者尝试了两个算法分别输出“特征重要性”:CatBoost算法 和 XGB 算法

  • CatBoost算法
model = CatBoostClassifier()
model.fit(x_train,y_train,eval_set=(x_test, y_test),plot=True)
#特征重要性可视化
catboost=pd.DataFrame(columns=['feature','feature_importance'])
catboost['feature']=model.feature_names_
catboost['feature_importance']=model.feature_importances_
catboost=catboost.sort_values('feature_importance',ascending=False#降序排列
plt.figure(figsize=(10,10))
plt.title('特征重要性')
sns.barplot(x='feature_importance',y='feature',data=catboost)

【输出】-XGB 算法

model_xgb= XGBClassifier()
model_xgb.fit(x_train,y_train)
from xgboost import plot_importance
plot_importance(model_xgb,height=0.5)
plt.show()

【输出】由于 XGB算法精度得分最高,故我们以XGB得到的“特征重要性”进行分析。
【分析】

  1. 第一重要特征:tenure
plt.figure(figsize=(20,4))
sns.countplot(x='tenure',hue='Churn',data=df)

【输出】【分析】
由图可知,流失客户集中在1-5号职位,运营团队需要重点关注1-5号职位。

  1. 第二重要特征:PaymentMethod

【分析】
使用“电子支票”支付的人更容易流失。

  1. 第三重要特征:MonthlyCharges
    查看流失用户、留存用户在付费方面的偏好:
    ‘MonthlyCharges’、’TotalCharges’,离散化后,可进行卡方检验,然后交叉分析。
  • 卡方检验:’MonthlyCharges’、’TotalCharges’
df['MonthlyCharges-']=churn_var['MonthlyCharges']
df['TotalCharges-']=churn_var['TotalCharges']
print('kf_var的卡方检验结果如下:','\n')
KF('MonthlyCharges-')
KF('TotalCharges-')

【输出】
kf_var的卡方检验结果如下:

Churn by MonthlyCharges 的卡方临界值是0.00,小于0.05,表明MonthlyCharges组间有显著性差异,可进行【交叉分析】

Churn by TotalCharges 的卡方临界值是0.00,小于0.05,表明TotalCharges组间有显著性差异,可进行【交叉分析】

  • 交叉分析
for i in ['MonthlyCharges','TotalCharges']:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n')

【输出】18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”
【分析】
月付费70.35–118.75元的用户更容易流失18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”
【分析】
总付费18–1397元的用户更容易流失

基于”MonthlyCharges”和“TotalCharges”画四分图:
求两个维度的均值

 print('MonthlyCharges的均值是{:.2f},TotalCharges的均值是{:.2f}'.format(df['MonthlyCharges'].mean(),df['TotalCharges'].mean()))

流失客户四分图:

df_1=df[df['Churn']==1#流失客户
df_0=df[df['Churn']==0#留存客户
plt.figure(figsize=(10,10))   
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu,data=df_1)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】【分析】
四分图的右下区域,流失客户比较集中,即总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失。

留存客户四分图

plt.figure(figsize=(10,10)) 
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu_r,data=df_0)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】【结论】
综合“ 统计分析” 和 “XGB算法输出特征重要性” 得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure:1-5号职位的用户比较容易流失
  2. PaymentMethod:使用“电子支票”支付的人
  3. MonthlyCharges 、TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling:开通电子账单
  5. Partner:单身
  6. OnlineBackup:没开通“在线备份业务”
  7. InternetService:开通了 “Fiber optic 光纤网络”
  8. TechSupport:没开通“技术支持服务”
  9. DeviceProtection:没开通通了“设备保护业务
  10. OnlineSecurity:没开通“网络安全服务”
  11. Contract:“按月”签订合同方式
  12. Dependents:无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

转自猫有九条命。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化全方位实战教程

本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。

1.Pandas 性能优化 40 倍 – DataFrame

1. 1 性能优化小试牛刀

大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。

比如下面这个简单例子,我们随机生成100万条数据,对val这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform函数的平均运行时间是284ms:

import pandas as pd
import numpy as np

def gen_data(size):
    d = dict()
    d["genre"] = np.random.choice(["A""B""C""D"], size=size)
    d["val"] = np.random.randint(low=0, high=100, size=size)
    return pd.DataFrame(d)

data = gen_data(1000000)
data.head()
def transform(data):
    data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)

%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.2. 用Cython编写C扩展

试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1 这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。

%load_ext cython
%%cython
cpdef int _transform(int x):
    if x % 2:
        return x + 1
    return x - 1

def transform(data):
    data.loc[:, "new_val"] = data.val.apply(_transform)

%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.3. 减少类型转换

为了减少C和Python之间的类型转换,我们直接把val这一列作为Numpy数组传递给Cython函数,注意区分cnpnp。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。

%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.4. 使用不安全的数组

利用@cython.boundscheck(False)@cython.wraparound(False)装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。

%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

来源:Python中文社区

作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu

2.Pandas性能优化:基础篇

Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。

2.1 按行迭代优化

很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。

我们测试一下速度:

import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
                     'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                   'x':  np.random.randint(1, 10, (1000))})

%%timeit
a2=[]
for index,row in df.iterrows():
    temp=row['a']
    a2.append(temp*temp)
df['a2']=a2    

67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
a2=[]
for row in df.itertuples():
    temp=getattr(row, 'a')
    a2.append(temp*temp)
df['a2']=a2

1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到直接循环,itertuples速度是iterrows的很多倍。

所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。

2.2 apply 优化

一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:

%%timeit
df['a2']=df.a.apply(lambda x: x*x)

360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到,效率又有提升,apply的速度是itertuples的5倍左右。

注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。

%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)

15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。

我们可以简单的这样改写:

%%timeit
df['a3']=df['a']*df['b']

204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高

%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)

93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

2.3 聚合agg效率优化

有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。

还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:

%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())

1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

采用agg内置函数:

%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()

415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

刚才是单列聚合,我们看下多列聚合:
自定义函数:

%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})

2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

采用内置函数:

%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})

1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。

下面列出了一些内置函数:

内置函数描述
count计数
sum求和
mean平均值
median中位数
min,max最大值/最小值
std,var标准差/方差
prod求积

当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如

df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))

在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。

2.4 数据读取优化

如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。

这里有几种方法可以优化:

1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。

chunksize = 10 ** 6for chunk in pd.read_csv(filename,  chunksize=chunksize):
     process(chunk)

这里也可以使用多进程方式,具体我们在后面进阶篇介绍。

  1. 过滤掉不需要的列

如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。

df = pd.read_csv(filename,  usecols=["a","b","c"]):
  1. 为列指定类型
    panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
  1. 保存为其他格式
    如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
 import pandas as pd
 #读取csv
 df = pd.read_csv('filename.csv')
 
 #pickle格式
 df.to_pickle('filename.pkl') 
 df = pd.read_pickle('filename.pkl') 
 
 #hdf格式
 df.to_hdf('filename.hdf','df') 
 df = pd.read_hdf('filename.hdf','df') 

file typetimespeed
csv1.93 s1
pickle415 m4.6
hdf808 ms2.3
  1. 用第三方的包读取
    如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。

2.5 优化数据处理逻辑

这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。

这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:

call_areacall_seconds
03364
23075
25847
12032

call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。

我们随机生成一批数据:

import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area':  np.random.randint(0, 3, (row_number)),
                   'call_seconds':  np.random.randint(0, 10000, (row_number))})

如果我们想计算每个电话的费用:

def get_cost(call_area,call_seconds):
    if call_area==1:
        rate=0.1
    elif call_area==0:
        rate=0.7
    else:
        return 0
    return math.ceil(call_seconds/60)*rate

方法1,采用按行迭代循环

%%timeit
cost_list = []
for i,row in df.iterrows():
    call_area=row['call_area']
    call_seconds=row['call_seconds']
    cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list

方法2,采用apply行的方式

%%timeit
df['cost']=df.apply(
         lambda row: get_cost(
             row['call_area'],
             row['call_seconds']),
         axis=1)

方法3,采用mask+loc,分组计算

%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0

方法4,使用numpy的方式,可以使用index的方式找到对应的费用。

%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins

测试结果:

方法运行时间运行速度
iterrows513 ms1
apply181 ms2.8
loc6.44 ms79.6
numpy219 µs2342

方法4速度快是因为它采用了numpy向量化的数据处理方式。

总结

在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。

3.Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

3.1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

3.2 cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

3.3 numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

3.4 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

3.5 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Selenium 实战教程-爬取人民网留言板单进程、多线程、多进程版

本文讲解了如何通过selenium爬取人民网留言板留言,并从单进程、多线程、多进程版角度进行爬虫性能优化,是一篇优秀的 selenium 实战教程。

第一节:单进程版+selenium实战教程

一、项目概述

1.项目说明

本项目主要是对领导留言板内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。网站链接是http://liuyan.people.com.cn/home?p=0,任意选择一条留言点击进入详情页后,如下对于图中标出的数据,均要进行爬取,以此构成一条留言的组成部分。

2.环境配置

(1)Python:3.x
(2)所需库:

  • dateutil
    • 安装方法:
    • pip install python-dateutil
  • selenium
    • 安装方法:
    • pip install selenium

(3)模拟驱动:chromedriver,可点击https://download.csdn.net/download/CUFEECR/12193208进行下载Google浏览器80.0.3987.16版对应版本,或点击http://chromedriver.storage.googleapis.com/index.html下载与Google对应版本,并放入Python对应安装路径下的Scripts目录下。

二、项目实施

1.导入所需要的库

import csv
import os
import random
import re
import time

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    while True:
        datestr = WebDriverWait(drivertemp, 10).until(
            lambda driver: driver.find_element_by_xpath(
                '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
        datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
        date = dparser.parse(datestr, fuzzy=True)
        print('正在爬取链接 --', position, '--', date)
        if date < start_date:
            break
        ## 模拟点击加载
        try:
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more")))
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight, document.body.scrollHeight - 600)')
            time.sleep(get_time())
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight - 600, document.body.scrollHeight)')
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.XPATH, '//*[@id="show_more"]')))
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
        except:
            break
        time.sleep(get_time() - 1)
    detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
    ## 获取所有链接
    for element in detail_elements:
        detail_url = element.get_attribute('href')
        yield detail_url
    drivertemp.quit()

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个内容,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    position = WebDriverWait(driver, 10).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
    ## time.sleep(get_time())
    print(index, '-- 正在爬取 --', position)
    start_time = time.time()
    ## encoding='gb18030'
    csv_name = position + '.csv'
    ## 文件存在则删除重新创建
    if os.path.exists(csv_name):
        os.remove(csv_name)
    with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for detail_url in get_detail_urls(position, list_url):
            get_message_detail(driver, detail_url, writer, position)
            time.sleep(get_time())
    end_time = time.time()
    crawl_time = int(end_time - start_time)
    crawl_minute = crawl_time // 60
    crawl_second = crawl_time % 60
    print(position, '已爬取结束!!!')
    print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
    driver.quit()
    time.sleep(5)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    for index, fid in enumerate(fids):
        try:
            get_officer_messages(index + 1, fid)
        except:
            get_officer_messages(index + 1, fid)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12198734下载,仅供交流学习,请勿滥用。整个执行过程较长,因为是单线程的,必须要等一个领导数据爬取完毕之后才能爬取下一个,我选择了10个领导进行测试,在云服务器中的运行结果分别如下显然,整个运行时间将近5小时,效率相对较低,有很大的提升空间。最终得到了合并的DATA.csv

2.改进分析

(1)该版本的代码未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页也是采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本是单进程(线程)的,必须要一个领导爬取完之后才能进行下一个领导的爬取,效率较低,特别是留言较多的领导耗时很长,可以考虑使用多进程或多线程进行优化。

第二节:多线程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟。本篇在第一篇的基础上做了一些改进

  1. 采用了多线程,设定同时运行的线程的数量为3,线程数量适中,这样在保证在同一时刻有多个线程在执行爬取的同时,也能避免线程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。
  2. 对异常处理进行了优化,之前异常处理是放在获取一个领导对应的所有的留言链接函数里的,当获取不到加载更多按钮并且超时时就会抛出异常,这样使得如果异常发生在其他部分如获取留言详情时会被忽略,改进之后将其放入主函数,对于每一个领导都放入异常处理,这里涵盖了对该领导爬取时的所有操作,只要在任一环节报错都会捕捉到,同时增加了5层嵌套异常处理,增加了对出现异常的容忍度(在发生网络环境不好而加载不出页面、内存消耗较多而卡顿、被被爬取方反爬而不能爬取等情况时,可以对官员重新爬取以保证数据的完整程度和程序的健壮性)。

二、项目实施

由于在实现过程中有3种常用的方法实现多线程,因此对应也有3种不同的具体实现,这里选第1种进行说明:

1.导入所需要的库

import csv
import os
import random
import re
import time
import threading

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 控制同时运行的线程数为3
sem = threading.Semaphore(3)
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,同时在全局中设置同时运行的线程数为3,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有3种方式实现:

  • 这通过threading.Semaphore()指定线程数量,后边在实现作为线程参数的函数时使用上下文处理器
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    thread_list = []
    ## 将所有线程加入线程列表,便于控制同时执行的线程数量
    for index, fid in enumerate(fids):
        t = threading.Thread(target=get_officer_messages, args=(index + 1, fid))
        thread_list.append([t, fid])
    for thread, fid in thread_list:
        ## 5层嵌套进行异常处理
        try:
            thread.start()
        except:
            try:
                thread.start()
            except:
                try:
                    thread.start()
                except:
                    try:
                        thread.start()
                    except:
                        try:
                            thread.start()
                        except:
                            ## 如果仍出现异常加入失败名单
                            print('该官员爬取失败,已存入失败名单,以备再爬')
                            if not os.path.exists('fid_not_success.txt'):
                                with open('fid_not_success.txt''a+'as f:
                                    f.write(fid)
                            else:
                                with open('fid_not_success.txt''a+'as f:
                                    f.write('\n' + fid)
                            continue
    for thread, fid in thread_list:
        thread.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用submit()函数实现线程的调用执行
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        for index, fid in enumerate(fids):
            ## 5层嵌套进行异常处理
            try:
                executor.submit(get_officer_messages, index + 1, fid)
            except:
                try:
                    executor.submit(get_officer_messages, index + 1, fid)
                except:
                    try:
                        executor.submit(get_officer_messages, index + 1, fid)
                    except:
                        try:
                            executor.submit(get_officer_messages, index + 1, fid)
                        except:
                            try:
                                executor.submit(get_officer_messages, index + 1, fid)
                            except:
                                ## 如果仍出现异常加入失败名单
                                print('该官员爬取失败,已存入失败名单,以备再爬')
                                if not os.path.exists('fid_not_success.txt'):
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write(fid)
                                else:
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write('\n' + fid)
                                continue
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用map()函数实现函数和多个参数的映射来执行线程

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        executor.map(get_officer_messages, range(1, len(fids) + 1), fids)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多线程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

3个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12199138下载,欢迎进行测试和交流学习,请勿滥用。整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到1小时半左右,约等于第一篇单线程的三分之一,因为同一时刻有3个子线程执行,大大降低了运行时间,效率比之前提高很多,加入多线程之后,可以让运行时间较长和较短的相互补充,同时多个线程同时运行,在同一时刻爬取多个领导,很显然大大缩短了运行时间。最后得到了合并的DATA.csv:可以进一步总结多线程的优势 :

  • 易于调度
  • 提高并发性:通过线程可方便有效地实现并发性。进程可创建多个线程来执行同一程序的不同部分。
  • 开销少:创建线程比创建进程要快,所需开销很少。
  • 利于充分发挥多处理器的功能:通过创建多线程进程,每个线程在一个处理器上运行,从而实现应用程序的并发性,使每个处理器都得到充分运行。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

第三节:多进程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟
本篇在第二篇的基础上做了一个主要改进:
从多线程改变为多进程,设定同时运行的进程的数量为3,数量适中,这样在保证在同一时刻有多个进程在执行爬取的同时,也能避免进程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。

二、项目实施

由于在实现过程中有2种常用的方法实现多线程,因此对应也有2种不同的具体实现。

1.导入所需要的库

import csv
import os
import random
import re
import time


import dateutil.parser as dparser
from random import choice
from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。
函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有2种方式实现:

  • 通过for循环遍历所有任务和参数,并调用apply_async()函数将任务加入进程池
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 创建进程池
    pool = Pool(3)
    ## 将任务加入进程池并传入参数
    for index, fid in zip(range(1, len(fids) + 1), fids):
        pool.apply_async(get_officer_messages, (index, fid))
    pool.close()
    pool.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过调用map()函数将任务加入进程池并实现函数与参数的映射
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 处理传入的参数,使之对应索引合并并且可迭代
    itera_merge = list(zip(range(1, len(fids) + 1), fids))
    ## 创建进程池
    pool = Pool(3)
    ## 将任务传入进程池并通过映射传入参数
    pool.map(get_officer_messages_enc, itera_merge)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多进程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

2个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12200752下载,欢迎进行测试和交流学习,请勿滥用
整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到100分钟,与单进程相比大大缩短了时间、提高了效率,因为同一时刻有3个子进程执行。加入多进程之后,可以让运行时间较长和较短的相互补充,在任意时刻多个进程同时运行。但是也可以看出来与多线程相比,多进程的运行时间相对稍长,虽然差别不大,但是这可能就是性能的瓶颈。可能的原因是进程需要的资源更多,这对内存、CPU和网络的要求更高,从而对设备提出了更高的要求,有时可能设备性能跟不上程序要求,从而降低了效率。
最后得到了合并的DATA.csv:对于多线程和多进程的简单对比分析如下:
一个线程至少有一个进程,一个进程至少有一个线程,线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高,进程在执行过程中拥有独立的存储单元,而多个线程共享存储器,从而极大的提高了程序的运行效率。线程不能够独立执行,必须依存在进程中。

多线程:

  • 线程执行开销小(占用的资源非常少)但是不利于资源的管理和保护;
  • 如果需要共享数据,建议使用线程;
  • 适用于IO密集型任务(Web和文档读写等),遇到IO阻塞,速度远远小于CPU的运行速度,可以多开辟一些线程,有线程阻塞了,其他线程依然正常工作。

多进程:

  • 执行额开销比较大(占用的资源多),但是利于资源的管理和保护;
  • 适用于计算密集型(视频的译码编码和科学数据计算等)。

显然,在爬虫中应该偏向使用多线程。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。
(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。
(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

3.合法性说明

  • 本项目是为了学习和科研的目的,所有读者可以参考执行思路和程序代码,但不能用于恶意和非法目的(恶意攻击网站服务器、非法盈利等),如有违者请自行负责。
  • 本项目所获取的数据都是在进一步的分析之后用于对电子政务的实施改进,对政府的决策能起到一定的参考作用,并非于恶意抓取数据来攫取不正当竞争的优势,也未用于商业目的牟取不法利益,运行代码只是用几个fid进行测试,并非大范围地爬取,同时严格控制爬取的速率、争取不对服务器造成压力,如侵犯当事者(即被抓取的网络主体)的利益,请联系更改或删除。
  • 本项目是留言板爬取系列的第二篇,后期会继续更新,欢迎读者交流,以期不断改进。

作者:Corley

源自:快学python

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Prometheus + Granafa 40分钟构建MySQL监控平台实战教程

Prometheus + Granafa 概述

对于MySQL的监控平台,相信大家实现起来有很多了:基于天兔的监控,还有基于zabbix相关的二次开发。相信很多同行都应该已经开始玩起来了。我这边的选型是Prometheus + Granafa的实现方式。简而言之就是我现在的生产环境使用的是prometheus,还有就是granafa满足的我的日常工作需要。在入门的简介和安装,大家可以参考这里:

https://blog.51cto.com/cloumn/detail/77

1、首先看下我们的监控效果、mysql主从

构建高大上的MySQL监控平台

2、mysql状态:

构建高大上的MySQL监控平台

构建高大上的MySQL监控平台

3、缓冲池状态:

构建高大上的MySQL监控平台

exporter 相关部署实战教程

1、安装exporter

    [root@controller2 opt]# https://github.com/prometheus/mysqld_exporter/releases/download/v0.10.0/mysqld_exporter-0.10.0.linux-amd64.tar.gz
    [root@controller2 opt]# tar -xf mysqld_exporter-0.10.0.linux-amd64.tar.gz 

2、添加mysql 账户:

    GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO 'exporter'@'%' IDENTIFIED BY 'localhost';
    flush privileges;

3、编辑配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf 
    [client]
    user=exporter
    password=123456

4、设置配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /etc/systemd/system/mysql_exporter.service 
    [Unit]
    Description=mysql Monitoring System
    Documentation=mysql Monitoring System

    [Service]
    ExecStart=/opt/mysqld_exporter-0.10.0.linux-amd64/mysqld_exporter \
             -collect.info_schema.processlist \
             -collect.info_schema.innodb_tablespaces \
             -collect.info_schema.innodb_metrics  \
             -collect.perf_schema.tableiowaits \
             -collect.perf_schema.indexiowaits \
             -collect.perf_schema.tablelocks \
             -collect.engine_innodb_status \
             -collect.perf_schema.file_events \
             -collect.info_schema.processlist \
             -collect.binlog_size \
             -collect.info_schema.clientstats \
             -collect.perf_schema.eventswaits \
             -config.my-cnf=/opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf

    [Install]
    WantedBy=multi-user.target

5、添加配置到prometheus server

      - job_name: 'mysql'
        static_configs:
         - targets: ['192.168.1.11:9104','192.168.1.12:9104']

6、测试看有没有返回数值:

http://192.168.1.12:9104/metrics

正常我们通过mysql_up可以查询倒mysql监控是否已经生效,是否起起来

    #HELP mysql_up Whether the MySQL server is up.
    #TYPE mysql_up gauge
    mysql_up 1

监控相关指标

在做任何一个东西监控的时候,我们要时刻明白我们要监控的是什么,指标是啥才能更好的去监控我们的服务,在mysql里面我们通常可以通过一下指标去衡量mysql的运行情况:mysql主从运行情况、查询吞吐量、慢查询情况、连接数情况、缓冲池使用情况以及查询执行性能等。

主从复制运行指标:

1、主从复制线程监控:

大部分情况下,很多企业使用的都是主从复制的环境,监控两个线程是非常重要的,在mysql里面我们通常是通过命令:

    MariaDB [(none)]> show slave status\G;
    *************************** 1. row ***************************
                   Slave_IO_State: Waiting for master to send event
                      Master_Host: 172.16.1.1
                      Master_User: repl
                      Master_Port: 3306
                    Connect_Retry: 60
                  Master_Log_File: mysql-bin.000045
              Read_Master_Log_Pos: 72904854
                   Relay_Log_File: mariadb-relay-bin.000127
                    Relay_Log_Pos: 72905142
            Relay_Master_Log_File: mysql-bin.000045
                 Slave_IO_Running: Yes
                Slave_SQL_Running: Yes

Slave_IO_Running、Slave_SQL_Running两个线程正常那么说明我们的复制集群是健康状态的。

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_slave_sql_running来获取主从集群的健康状况。

    # HELP mysql_slave_status_slave_sql_running Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_slave_sql_running untyped
    mysql_slave_status_slave_sql_running{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 1

2、主从复制落后时间:

在使用show slave status
里面还有一个关键的参数Seconds_Behind_Master。Seconds_Behind_Master表示slave上SQL thread与IO thread之间的延迟,我们都知道在MySQL的复制环境中,slave先从master上将binlog拉取到本地(通过IO thread),然后通过SQL
thread将binlog重放,而Seconds_Behind_Master表示本地relaylog中未被执行完的那部分的差值。所以如果slave拉取到本地的relaylog(实际上就是binlog,只是在slave上习惯称呼relaylog而已)都执行完,此时通过show slave status看到的会是0

Seconds_Behind_Master: 0

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_seconds_behind_master 来获取相关状态。

    # HELP mysql_slave_status_seconds_behind_master Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_seconds_behind_master untyped
    mysql_slave_status_seconds_behind_master{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 0

查询吞吐量:

说到吞吐量,那么我们如何从那方面来衡量呢? 
通常来说我们可以根据mysql 的插入、查询、删除、更新等操作来

为了获取吞吐量,MySQL 有一个名为 Questions 的内部计数器(根据 MySQL
用语,这是一个服务器状态变量),客户端每发送一个查询语句,其值就会加一。由 Questions 指标带来的以客户端为中心的视角常常比相关的Queries
计数器更容易解释。作为存储程序的一部分,后者也会计算已执行语句的数量,以及诸如PREPARE 和 DEALLOCATE PREPARE
指令运行的次数,作为服务器端预处理语句的一部分。可以通过命令来查询:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Questions";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Questions     | 15071 |
    +---------------+-------+

MySQLD Exporter中返回的样本数据中通过mysql_global_status_questions反映当前Questions计数器的大小:

    # HELP mysql_global_status_questions Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_questions untyped
    mysql_global_status_questions 13253

当然由于prometheus
具有非常丰富的查询语言,我们可以通过这个累加的计数器来查询某一短时间内的查询增长率情况,可以做相关的阈值告警处理、例如一下查询2分钟时间内的查询情况:

rate(mysql_global_status_questions[2m])

当然上面是总量,我们可以分别从监控读、写指令的分解情况,从而更好地理解数据库的工作负载、找到可能的瓶颈。通常,通常,读取查询会由 Com_select
指标抓取,而写入查询则可能增加三个状态变量中某一个的值,这取决于具体的指令:

Writes = Com_insert + Com_update + Com_delete

下面我们通过命令获取插入的情况:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Com_insert";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Com_insert    | 10578 |
    +---------------+-------+

从MySQLD
Exporter的/metrics返回的监控样本中,可以通过global_status_commands_total获取当前实例各类指令执行的次数:

    # HELP mysql_global_status_commands_total Total number of executed MySQL commands.
    # TYPE mysql_global_status_commands_total counter
    mysql_global_status_commands_total{command="create_trigger"} 0
    mysql_global_status_commands_total{command="create_udf"} 0
    mysql_global_status_commands_total{command="create_user"} 1
    mysql_global_status_commands_total{command="create_view"} 0
    mysql_global_status_commands_total{command="dealloc_sql"} 0
    mysql_global_status_commands_total{command="delete"} 3369
    mysql_global_status_commands_total{command="delete_multi"} 0

慢查询性能

查询性能方面,慢查询也是查询告警的一个重要的指标。MySQL还提供了一个Slow_queries的计数器,当查询的执行时间超过long_query_time的值后,计数器就会+1,其默认值为10秒,可以通过以下指令在MySQL中查询当前long_query_time的设置:

    MariaDB [(none)]> SHOW VARIABLES LIKE 'long_query_time';
    +-----------------+-----------+
    | Variable_name   | Value     |
    +-----------------+-----------+
    |
 long_query_time | 10.000000 |
    +-----------------+-----------+
    1 row in set (0.00 sec)

当然我们也可以修改时间

    MariaDB [(none)]> SET GLOBAL long_query_time = 5;
    Query OK, 0 rows affected (0.00 sec)

然后我们而已通过sql语言查询MySQL实例中Slow_queries的数量:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Slow_queries";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Slow_queries  | 0     |
    +---------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,通过mysql_global_status_slow_queries指标展示当前的Slow_queries的值:

    # HELP mysql_global_status_slow_queries Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_slow_queries untyped
    mysql_global_status_slow_queries 0

同样的,更具根据Prometheus 慢查询语句我们也可以查询倒他某段时间内的增长率:

rate(mysql_global_status_slow_queries[5m])

连接数监控

监控客户端连接情况相当重要,因为一旦可用连接耗尽,新的客户端连接就会遭到拒绝。MySQL 默认的连接数限制为 151。

    MariaDB [(none)]> SHOW VARIABLES LIKE 'max_connections';
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 max_connections | 151   |
    +-----------------+-------+

当然我们可以修改配置文件的形式来增加这个数值。与之对应的就是当前连接数量,当我们当前连接出来超过系统设置的最大值之后常会出现我们看到的Too many
connections(连接数过多),下面我查找一下当前连接数:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_connected";
    +-------------------+-------+
    | Variable_name     | Value |
    +-------------------+-------+
    |
 Threads_connected | 41     |
    +-------------------+-------

当然mysql 还提供Threads_running 这个指标,帮助你分隔在任意时间正在积极处理查询的线程与那些虽然可用但是闲置的连接。

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_running";
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 Threads_running | 10     |
    +-----------------+-------+

如果服务器真的达到 max_connections
限制,它就会开始拒绝新的连接。在这种情况下,Connection_errors_max_connections
指标就会开始增加,同时,追踪所有失败连接尝试的Aborted_connects 指标也会开始增加。

MySQLD Exporter返回的样本数据中:

    # HELP mysql_global_variables_max_connections Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_max_connections gauge
    mysql_global_variables_max_connections 151         

表示最大连接数

    # HELP mysql_global_status_threads_connected Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_connected untyped
    mysql_global_status_threads_connected 41

表示当前的连接数

    # HELP mysql_global_status_threads_running Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_running untyped
    mysql_global_status_threads_running 1

表示当前活跃的连接数

    # HELP mysql_global_status_aborted_connects Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_aborted_connects untyped
    mysql_global_status_aborted_connects 31

累计所有的连接数

    # HELP mysql_global_status_connection_errors_total Total number of MySQL connection errors.
    # TYPE mysql_global_status_connection_errors_total counter
    mysql_global_status_connection_errors_total{error="internal"} 0
    #服务器内部引起的错误、如内存硬盘等
    mysql_global_status_connection_errors_total{error="max_connections"} 0
    #超出连接处引起的错误

当然根据prom表达式,我们可以查询当前剩余可用的连接数:

mysql_global_variables_max_connections - mysql_global_status_threads_connected

查询mysq拒绝连接数

mysql_global_status_aborted_connects

缓冲池情况:

MySQL 默认的存储引擎 InnoDB
使用了一片称为缓冲池的内存区域,用于缓存数据表与索引的数据。缓冲池指标属于资源指标,而非工作指标,前者更多地用于调查(而非检测)性能问题。如果数据库性能开始下滑,而磁盘
I/O 在不断攀升,扩大缓冲池往往能带来性能回升。 
默认设置下,缓冲池的大小通常相对较小,为 128MiB。不过,MySQL 建议可将其扩大至专用数据库服务器物理内存的 80% 大小。我们可以查看一下:

    MariaDB [(none)]> show global variables like 'innodb_buffer_pool_size';
    +-------------------------+-----------+
    | Variable_name           | Value     |
    +-------------------------+-----------+
    |
 innodb_buffer_pool_size | 134217728 |
    +-------------------------+-----------+

MySQLD Exporter返回的样本数据中,使用mysql_global_variables_innodb_buffer_pool_size来表示。

    # HELP mysql_global_variables_innodb_buffer_pool_size Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_innodb_buffer_pool_size gauge
    mysql_global_variables_innodb_buffer_pool_size 1.34217728e+08

    Innodb_buffer_pool_read_requests记录了正常从缓冲池读取数据的请求数量。可以通过以下指令查看

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_read_requests";
    +----------------------------------+-------------+
    | Variable_name                    | Value       |
    +----------------------------------+-------------+
    |
 Innodb_buffer_pool_read_requests | 38465 |
    +----------------------------------+-------------+

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_read_requests Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_read_requests untyped
    mysql_global_status_innodb_buffer_pool_read_requests 2.7711547168e+10

当缓冲池无法满足时,MySQL只能从磁盘中读取数据。Innodb_buffer_pool_reads即记录了从磁盘读取数据的请求数量。通常来说从内存中读取数据的速度要比从磁盘中读取快很多,因此,如果Innodb_buffer_pool_reads的值开始增加,可能意味着数据库的性能有问题。
可以通过以下只能查看Innodb_buffer_pool_reads的数量

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_reads";
    +--------------------------+-------+
    | Variable_name            | Value |
    +--------------------------+-------+
    |
 Innodb_buffer_pool_reads | 138  |
    +--------------------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_reads Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_reads untyped
    mysql_global_status_innodb_buffer_pool_reads 138

通过以上监控指标,以及实际监控的场景,我们可以利用PromQL快速建立多个监控项。可以查看两分钟内读取磁盘的增长率的增长率:

rate(mysql_global_status_innodb_buffer_pool_reads[2m])

官方模板ID

上面是我们简单列举的一些指标,下面我们使用granafa给 MySQLD_Exporter添加监控图表:

  • 主从主群监控(模板7371):

  • 相关mysql 状态监控7362:

  • 缓冲池状态7365:

  • 简单的告警规则

除了相关模板之外,没有告警规则那么我们的监控就是不完美的,下面列一下我们的监控告警规则

    groups:
    - name: MySQL-rules
      rules:
      - alert: MySQL Status 
        expr: up == 0
        for: 5s 
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: MySQL has stop !!!"
          description: "检测MySQL数据库运行状态"

      - alert: MySQL Slave IO Thread Status
        expr: mysql_slave_status_slave_io_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave IO Thread has stop !!!"
          description: "检测MySQL主从IO线程运行状态"

      - alert: MySQL Slave SQL Thread Status 
        expr: mysql_slave_status_slave_sql_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave SQL Thread has stop !!!"
          description: "检测MySQL主从SQL线程运行状态"

      - alert: MySQL Slave Delay Status 
        expr: mysql_slave_status_sql_delay == 30
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave Delay has more than 30s !!!"
          description: "检测MySQL主从延时状态"

      - alert: Mysql_Too_Many_Connections
        expr: rate(mysql_global_status_threads_connected[5m]) > 200
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 连接数过多"
          description: "{{$labels.instance}}: 连接数过多,请处理 ,(current value is: {{ $value }})"  

      - alert: Mysql_Too_Many_slow_queries
        expr: rate(mysql_global_status_slow_queries[5m]) > 3
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 慢查询有点多,请检查处理"
          description: "{{$labels.instance}}: Mysql slow_queries is more than 3 per second ,(current value is: {{ $value }})"

2、添加规则到prometheus:

    rule_files:
      - "rules/*.yml" 

3、打开web ui我们可以看到规则生效了:

构建高大上的MySQL监控平台

总结

到处监控mysql的相关状态已经完成,大家可以根据mysql更多的监控指标去完善自己的监控,当然这一套就是我用在线上环境的,可以参考参考。

来源:https://blog.51cto.com/xiaoluoge/2476375
作者:小罗ge11

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pyzbar 两步批量识别快递单号条形码实战教程

这是Python改变生活系列的第三篇,讲到了如何通过Python的pyzbar批量识别快递单号的条形码,以提高我们的生活工作效率,这是一篇实战教程。

1.识别快递单号的前情提要

了解我的小伙伴可能都知道,小五经常给大家送书。最近一年,不算联合抽奖送书,单独我自购+出版社赞助已送出1000本书籍。

如果是自购的话,还需要自己联系快递小哥寄出书籍。

寄出后快递小哥会给我截图来反馈,然而我想要单号的时候就遇到问题了。

每次寄完书,我都只能得到n个截图(内含快递信息)。

为了及时反馈大家物流信息,我需要尽快将快递单号提取出来。

2.思考解决办法

每次大概都有十几到几十张截图,手动去识别真的太麻烦。

不如先看看每张截图大概是什么样子,再去想想批量处理的办法吧。

主要是为了批量获取图片中的快递单号,我想到了两个解决办法:

  1. 用python识别条形码来直接获得准确快递单号

  2. 用python调用ocr,识别截图中的快递单号文字

大家觉得哪个更简单更准确呢?

今天我先聊聊第一种方法的流程和踩坑经历。

 

3.实战教程-遍历图片

首先,第一步需要先获取文件夹中的所有截图,再依次进行条形码识别。

具体操作可以参考注释

import os

def get_jpg():
    jpgs = []
    path = os.getcwd()
    for i in os.listdir(path):  #获取文件列表
        if i.split(".")[-1] == "jpg":  #筛选jpg文件(截图)
            oldname=os.path.join(path,i)  #旧文件名
            i = i.replace('微信图片_','')
            newname=os.path.join(path,i)  #新文件名
            os.rename(oldname,newname)  #改名
            jpgs.append(i)
    return jpgs

上面的代码中除了遍历筛选图片,还涉及了改名的操作。

这是因为我在后面使用 opencv 时,打开的路径只要含有中文就会一直报错,于是我就干脆把截图名称里的中文去除。

执行构建的get_jpg()函数,得到

这些就是演示文件中的四个截图文件,下面开始对他们进行识别。

4.实战教程-识别条形码

python的第三方模块 pyzbar 可以很方便地处理二维码的识别。我们这次用它来识别一维条形码的话,用法也大致一样。不过还要搭配 cv2 使用,主要是为了利用cv2.imread()来读取图片文件。

注意:对于cv2模块,安装时需要输入pip3 install opencv-python,但在导入的时候采用import cv2

识别条形码的具体语句如下所示:

import pyzbar.pyzbar as pyzbar
import cv2

def get_barcode(img):
    image = cv2.imread(img)
    barcodes = pyzbar.decode(image)
    barcode = barcodes[0]
    barcode_data = barcode.data.decode("utf-8")
    return barcode_data

上面构建的get_barcode()函数可以实现识别条形码,并返回结果数据。

我们可以用for循环遍历前文获取的所有图片,再依次使用get_barcode()函数来识别条形码。

data_m =[]
for i in jpgs:
    data = get_barcode(i)
    data_m.append(data)
data_m

可以发现,成功识别了四张截图里的条形码,并获取了对应的快递单号。

小结

回顾今天的问题案例,我先通过思考想出了两种解决办法。第一种的优点是识别条形码比OCR更准确,但是其只获取了快递单号。后续在给获得赠书的同学反馈时,我还需要手动将名字和单号对应,不够偷懒。后续将给大家介绍第二种方法的流程和优缺点。

如果想看更多python改变生活的真实问题案例,给本文右下角点个赞吧👍

如果你也有一直想用python解决的问题,欢迎在评论区告诉我🚀

本文转自快学Python。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红字验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

点击下方阅读原文可获得更好的阅读体验

Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

强大的Json解析工具 Jsonpath Python 实战教程

 

JsonPath是一种简单的方法来提取给定JSON文档的部分内容。 JsonPath有许多编程语言,如Javascript,Python和PHP,Java。JsonPath提供的json解析非常强大,它提供了类似正则表达式的语法,基本上可以满足所有你想要获得的json内容。

本文介绍了Json相关的基础知识,引入XML和Jsonpath的对比,说明Jsonpath出现的必要性,并在文末附上了 Jsonpath 实战教程。

1.关于JSON

JSON是一个标记符序列。这套标记符包括:构造字符、字符串、数字和三个字面值

构造字符

JSON包括六个构造字符,分别是:左方括号、右方括号、左大括号、右大括号、冒号与逗号。

JSON值

JSON值可以是对象、数组、数字、字符串或者三个字面值(false、true、null),并且字面值必须是小写英文字母。

对象

对象是由花括号括起来,逗号分割的成员构成,成员是字符串键和上面所说的JSON值构成,例如:

{"name":"jack","age":18,"address":{"country"}}

数组

数组是由方括号括起来的一组数值构成,例如:

[1,2,32,3,6,5,5]

字符串与数字想必就不用我过多叙述吧。

下面我就举例一些合法的JSON格式的数据:

{"a":1,"b":[1.2.3]}
[1,2,"3",{"a":4}]
3.14
"json_data"

2.为什么要使用JSON

JSON是一种轻量级的数据交互格式,它使得人们很容易的进行阅读和编写。同时也方便机器进行解析和生成。适用于进行数据交互的场景,比如网站前台与后台之间的数据交互。

JSON的使用方法

json.loads()

把JSON格式字符串解码转成Python对象,从JSON到Python类型转换表如下:

JSON Python
object dict
array list
string str
number(int) int
number(real) float
true True
false False
null None
  • 将数组转成列表对象
import json


strList = "[1,2,3,3,4]"
print(json.loads(strList))
print(type(json.loads(strList)))

试着运行上面的代码,你会发现已经成功的将strList转换为列表对象。

  • 将对象转换成字典
import json


strDict = '{"city":"上海","name":"jack","age":18}'
print(json.loads(strDict))
print(type(json.loads(strDict)))

试着运行上面的代码,你会发现已经成功的将object转换为dict类型的数据。

json.dumps()

其实这个方法也很好理解,就是将Python类型的对象转换为json字符串。从Python类型向JSON类型转换的对照表如下:

 

python JSON
dict object
list, tuple array
str string
int, float number
True true
False false
None null
  • 将Python列表对象转换为JSON字符串
import json


list_str = [1,2,3,6,5]
print(json.dumps(list_str))
print(type(json.dumps(list_str)))

试着运行上面的代码,你会发现成功的将列表类型转换成了字符串类型。

  • 将Python元组对象转换为JSON字符串
import json


tuple_str = (1,2,3,6,5)
print(json.dumps(tuple_str))
print(type(json.dumps(tuple_str)))

试着运行上面的代码,你会发现成功的将元组类型的数据转换成了字符串。

  • 将Python字典对象转换为JSON字符串
import json 


dict_str = {"name""小明""age":18"city""中国深圳"}
print(json.dumps(dict_str))
print(type(json.dumps(dict_str)))

输出结果:

{"name""\u5c0f\u660e""age": 18, "city""\u4e2d\u56fd\u6df1\u5733"}
<class 'str'>

看到上面的输出结果也许你会有点疑惑,其实不需要疑惑,这是ASCII编码方式造成的,因为**json.dumps()**做序列化操作时默认使用的就是ASCII编码,因此我们可以这样写:

import json


dict_str = {"name""小明""age":18"city""中国深圳"}
print(json.dumps(dict_str, ensure_ascii=False))
print(type(json.dumps(dict_str)))

输出结果:

{"name""小明""age": 18, "city""中国深圳"}
<class 'str'>

因为ensure_ascii的默认值是True,因此我们可以添加参数ensure_ascii将它的默认值改成False,这样编码方式就会更改为utf-8了。

json.load()

该方法的主要作用是将文件中JSON形式的字符串转换为Python类型。

具体代码示例如下:

import json

str_list = json.load(open('position.json', encoding='utf-8'))
print(str_dict)
print(type(str_dict))

运行上面的代码,你会发现成功的将字符串类型的JSON数据转换为了dict类型。

代码中的文件position.json我也会分享给大家。

  • json.dump()

将Python内置类型序列化为JSON对象后写入文件。具体代码示例如下所示:

import json

list_str = [{'city':'深圳'}, {'name''小明'},{'age':18}]
dict_str = {'city':'深圳','name':'小明','age':18}

json.dump(list_str, open('listStr.json''w'), ensure_ascii=False)
json.dump(list_str, open('dictStr.json''w'), ensure_ascii=False)

3.jsonpath

XML的优点是提供了大量的工具来分析、转换和有选择地从XML文档中提取数据。Xpath是这些功能强大的工具之一。

对于JSON数据来说,也出现了jsonpath这样的工具来解决这些问题:

  • 数据可以通过交互方式从客户端上的JSON结构提取,不需要特殊的脚本。
  • 客户端请求的JSON数据可以减少到服务器的上的相关部分,从而大幅度减少服务器响应的带宽使用。

jsonpath表达式始终引用JSON结构的方式与Xpath表达式与XML文档使用的方式相同。

jsonpath的安装方法

pip install jsonpath

jsonpath与Xpath

下面表格是jsonpath语法与Xpath的完整概述和比较。

Xpath jsonpath 概述
/ $ 根节点
. @ 当前节点
/ .or[] 取子节点
* * 匹配所有节点
[] [] 迭代器标识(如数组下标,根据内容选值)
// 不管在任何位置,选取符合条件的节点
n/a [,] 支持迭代器中多选
n/a ?() 支持过滤操作
n/a () 支持表达式计算

下面我们就通过几个示例来学习jsonxpath的使用方法。

我们先来看下面这段json数据

"store": {
    "book": [
      { "category""reference",
        "author""Nigel Rees",
        "title""Sayings of the Century",
        "price"8.95
      },
      { "category""fiction",
        "author""Evelyn Waugh",
        "title""Sword of Honour",
        "price"12.99
      },
      { "category""fiction",
        "author""Herman Melville",
        "title""Moby Dick",
        "isbn""0-553-21311-3",
        "price"8.99
      },
      { "category""fiction",
        "author""J. R. R. Tolkien",
        "title""The Lord of the Rings",
        "isbn""0-395-19395-8",
        "price"22.99
      }
    ],
    "bicycle": {
      "color""red",
      "price"19.95
    }
  }
}

获取符合条件的节点

假如我需要获取到作者的名称该怎么样写呢?

如果通过Python的字典方法来获取是非常麻烦的,所以在这里我们可以选择使用jsonpath.。

具体代码示例如下所示:

import jsonpath


author = jsonpath.jsonpath(data_json, '$.store.book[*].author')
print(author)

运行上面的代码你会发现,成功的获取到了所有的作者名称,并保存在列表中。

或者还可以这样写:

import jsonpath

author = jsonpath.jsonpath(data_json, '$..author')
print(author)

使用指定索引

还是使用上面的json数据,假如我现在需要获取第三本书的价格。

third_book_price = jsonpath.jsonpath(data_json, '$.store.book[2].price')
print(third_book_price)

运行上面的代码,你会发现成功的获取到了第三本书的价格。

使用过滤器

isbn_book = jsonpath.jsonpath(data_json, '$..book[?(@.isbn)]')
print(isbn_book)
print(type(isbn_book))

通过运行上面的代码,你会发现,成功的将含有isbn编号的书籍过滤出来了。

同样的道理,根据上面的例子,我们也可以将价格小于10元的书过滤出来。

book = jsonpath.jsonpath(data_json, '$..book[?(@.price<10)]')
print(book)
print(type(book))

通过运行上面的代码,你会发现这里已经成功的将价格小于10元的书提取出来了。

jsonpath其实是非常适合用来获取json格式的数据的一款工具,最重要的是这款工具轻量简单容使用。关于jsonpath的介绍到这里就结束了,下面我们就进入实战演练吧!

4.Jsonpath 实战教程

前言

每年的6月份都是高校学生的毕业季,作为计算机专业的你来说,如果刚刚毕业就可以进入大厂,想必是一个非常不错的选择。因此,今天我带来的项目就是爬取腾讯招聘的网站,获取职位名称、职位类别、工作地点、工作国家、职位的更新时间、职位描述

爬取内容一共有329页,在前329页的职位都是在这个月发布的,还是比较新,对大家来说更有参考的价值。

网页链接:https://careers.tencent.com/search.html

准备

工欲善其事,必现利其器。首先我们要准备好几个库:pandas、requests、jsonpath

如果没有安装,请参考下面的安装过程:

pip install requests
pip install pandas
pip install jsonpath

需求分析与功能实现

获取所有的职位信息

对网页进行分析的时候,我发现想从网页上直接获取信息是是做不到的,该网页的响应信息如下所示:

<!DOCTYPE html><html><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="initial-scale=1,maximum-scale=1,user-scalable=no"><meta name=keywords content=""><meta name=description content=""><meta name=apple-mobile-web-app-capable content=no><meta name=format-detection content="telephone=no"><title>搜索 | 腾讯招聘</title><link rel=stylesheet href=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/css/main.css><link rel=stylesheet href=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/css/jquery-ui.min.css></head><body><div id=app></div><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/careersmlr/HeadFoot_zh-cn.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/careersmlr/HostMsg_zh-cn.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/careersmlr/Search_zh-cn.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/config.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/jquery.min.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/jquery.ellipsis.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/report.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/qrcode.min.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/manifest.build.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor.build.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/p_zh-cn_search.build.js></script></body><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/common.js></script></html>

因此我判断,这个是动态Ajax加载的数据,因此就要去网页控制器上查找职位数据是否存在。

经过一番查找,果然发现是动态加载的数据,信息如下所示:

格式化之后的数据如下所示:

{
    "Code":200,
    "Data":{
        "Count":8500,
        "Posts":[
            {
                "Id":0,
                "PostId":"1346716678288842752",
                "RecruitPostId":71330,
                "RecruitPostName":"41071-腾讯会议项目经理(西安)(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"腾讯云",
                "CategoryName":"产品",
                "Responsibility":"1、负责研发项目及研发效能的计划制定、进度驱动和跟踪、风险识别以及应对,确保项目按计划完成;
2、负责组织项目各项评审会议及项目例会,制定并推广项目流程规范,确保项目有序进行;
3、负责与项目外部合作伙伴进行沟通,制定流程规范双方合作,并推动合作事宜;
4、及时发现并跟踪解决项目问题,有效管理项目风险。
"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1346716678288842752",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1346716729744564224",
                "RecruitPostId":71331,
                "RecruitPostName":"41071-腾讯会议产品策划(平台方向)(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"腾讯云",
                "CategoryName":"产品",
                "Responsibility":"1、负责腾讯会议企业管理平台的产品策划工作,包括企业运营平台、运维、会控平台和工具的产品设计和迭代优化;
2、协调和推动研发团队完成产品开发、需求落地,并能在需求上线后进行持续数据分析和反馈跟进,不断提升产品竞争力;
3、根据行业场景抽象用户需求,沉淀面向不同类型客户的云端管控平台解决方案;
 "
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1346716729744564224",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1346062593894129664",
                "RecruitPostId":71199,
                "RecruitPostName":"41071-腾讯会议产品策划(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"腾讯云",
                "CategoryName":"产品",
                "Responsibility":"负责腾讯会议的产品策划工作:
1、研究海外用户办公习惯及SaaS市场动态,调研海外相关SaaS产品并输出产品调研结论,综合市场情况和用户需求输出高质量的产品需求或解决方案;
2、负责腾讯会议各产品线的英文版的功能同步和产品设计工作,把关产品功能同步和国际版需求改造等;
3、协调和推动研发团队完成产品开发、需求落地,并能在需求上线后进行持续数据分析和反馈跟进,不断提升产品竞争力; "
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1346062593894129664",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1352161575309418496",
                "RecruitPostId":72134,
                "RecruitPostName":"CSIG16-推荐算法高级工程师",
                "CountryName":"中国",
                "LocationName":"北京",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"技术",
                "Responsibility":"1. 参与地图场景下推荐算法优化,持续提升转化效果和用户体验;
2. 负责地图场景下推荐引擎架构设计和开发工作;
3. 跟进业界推荐领域最新进展,并推动其在地图场景下落地。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=0",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1352158432852975616",
                "RecruitPostId":72133,
                "RecruitPostName":"41071-腾讯云SDK 终端研发工程师(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"技术",
                "Responsibility":"1. 负责腾讯云 GME SDK(游戏多媒体引擎)的开发和优化工作,并配套开发相应的场景解决方案业务流程,以满足不同场景和不同行业的客户需求; 
2. 全流程参与客户需求咨询、需求评估、方案设计、方案编码实施及交付工作; 
3. 负责优化腾讯云GME产品易用性,并跟踪客户的接入成本、完善服务体系,解决客户使用产品服务和解决方案过程中的技术问题,不断完善问题处理机制和流程。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=0",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1352155053116366848",
                "RecruitPostId":72131,
                "RecruitPostName":"40931-智慧交通数据平台前端开发工程师(北京)",
                "CountryName":"中国",
                "LocationName":"北京",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"技术",
                "Responsibility":"负责腾讯智慧交通领域的平台前端开发工作;
负责规划与制定前端整体发展计划与基础建设;
负责完成前端基础架构设计与组件抽象。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=0",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1306860769169645568",
                "RecruitPostId":66367,
                "RecruitPostName":"35566-HRBP(腾讯全资子公司)",
                "CountryName":"中国",
                "LocationName":"武汉",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"人力资源",
                "Responsibility":"负责区域研发公司的HR政策、制度、体系与重点项目在部门内部的落地与推动执行;
深入了解所负责领域业务与人员发展状况,评估并明确组织与人才发展对HR的需求;
驱动平台资源提供HR解决方案,并整合内部资源推动执行;提升管理干部的人力资源管理能力,关注关键人才融入与培养,确保持续的沟通与反馈;
协助管理层进行人才管理、团队发展、组织氛围建设等,确保公司文化在所属业务领域的落地;
负责所对接部门的人才招聘工作;
"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1306860769169645568",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1351353005709991936",
                "RecruitPostId":71981,
                "RecruitPostName":"35566-招聘经理(腾讯云全资子公司)",
                "CountryName":"中国",
                "LocationName":"武汉",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"人力资源",
                "Responsibility":"1、负责CSIG区域研发公司相关部门的社会招聘及校园招聘工作,制定有效的招聘策略并推动落地执行,保障人才开源、甄选和吸引;
2、负责相关部门人力资源市场分析,有效管理并优化招聘渠道;
3、参与招聘体系化建设,甄选相关优化项目,有效管理及优化招聘渠道。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1351353005709991936",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1351838518279675904",
                "RecruitPostId":72081,
                "RecruitPostName":"35566-雇主品牌经理(腾讯云全资子公司)",
                "CountryName":"中国",
                "LocationName":"武汉",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"人力资源",
                "Responsibility":"1、负责腾讯云区域研发公司雇主品牌的规划和建设工作,结合业务招聘需求,制定有效的品牌方案;
2、负责讯云区域研发公司的公众号、媒体账号的内容策划、撰写,协调相关资源完成高质量内容输出;
3、负责招聘创意项目的策划和项目统筹,借助各种平台渠道,完成创意内容的传播触达,提升人选对腾讯云区域研发公司的认知和意向度;"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1351838518279675904",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1199244591342030848",
                "RecruitPostId":55432,
                "RecruitPostName":"22989-数据库解决方案架构师(北京/上海/深圳)",
                "CountryName":"中国",
                "LocationName":"上海",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"产品",
                "Responsibility":"支持客户的应用架构设计,了解客户的业务逻辑和应用架构,给出合理的产品方案建议; 
支持客户的数据库方案设计,从运维、成本、流程等角度主导云数据库产品落地; 
梳理客户的核心诉求,提炼为普适性的产品能力,推动研发团队提升产品体验;
根据客户的行业属性,定制行业场景的解决方案,提升云数据库的影响力;"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1199244591342030848",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            }
        ]
    }
}

经过对比发现上面的json数据与网页信息是完全相同的。

看到json数据你有没有一丝的惊喜,终于到了可以大显身手的时候了。

你会发现,上面每一个节点的参数都是独立的,不会存在重复,那我们可以这样写:

def get_info(data):
    recruit_post_name = jsonpath.jsonpath(data, '$..RecruitPostName')
    category_name = jsonpath.jsonpath(data, '$..CategoryName')
    country_name= jsonpath.jsonpath(data, '$..CountryName')
    location_name = jsonpath.jsonpath(data, '$.Data.Posts..LocationName')
    responsibility = jsonpath.jsonpath(data, '$..Responsibility')
    responsibility = [i.replace('\n''').replace('\r'''for i in responsibility]
    last_update_time = jsonpath.jsonpath(data, '$..LastUpdateTime')

运行上面的代码,你会发现成功的获取到了每一组数据。

关于翻页

打开网页之后你会发现腾讯的职位信息一共有850页,但是前面的json数据仅仅只有第一页的数据怎么办呢?

不用担心,直接点击第二页看看网络数据有什么变化。

如上图所示,当点击第二页的时候,又加载出来了一个数据,点击进去之后你就会发现,这个数据刚好就是第二页的职位信息。

那接下来就是发现规律的时候了,第一页与第二页保存JSON数据的URL如下所示:

# 第一页
https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1611215870971&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=1&pageSize=10&language=zh-cn&area=cn

# 第二页
https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1611217026103&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=2&pageSize=10&language=zh-cn&area=cn

经过测试发现,可以将URL地址进行简化,简化后的URL如下所示:

# 第一页
https://careers.tencent.com/tencentcareer/api/post/Query?pageIndex=1&pageSize=10

# 第二页
https://careers.tencent.com/tencentcareer/api/post/Query?pageIndex=1&pageSize=10

数据保存

将爬取下来的数据保存至csv文件,核心代码如下所示:

df = pd.DataFrame({
        'country_name': country_name,
        'location_name': location_name,
        'recruit_post_name':recruit_post_name,
        'category_name': category_name,
        'responsibility':responsibility,
        'last_update_time':last_update_time
    })

if __name__ == '__main__':
    tengxun = TengXun()
    df = pd.DataFrame(columns=['country_name''location_name''category_name','recruit_post_name''responsibility''last_update_time'])

    for page in range(1330):
        print(f'正在获取第{page}页')
        url = tengxun.get_url(page)
        data = tengxun.get_json(url)
        time.sleep(0.03)

        df1 = get_info(data)
        df = pd.concat([df, df1])
        df = df.reset_index(drop=True)
    # pprint.pprint(data)

    df.to_csv('../data/腾讯招聘.csv', encoding='utf-8-sig')

最后结果

 

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红字验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

点击下方阅读原文可获得更好的阅读体验

Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

手把手Django+Vue前后端分离入门实战教程

1.前言

众所周知,Django对于网站快速开发非常友好,这得益于框架为我们做了很多事情,让我们只需要做一些简单的配置和逻辑即可把网站的功能开发出来。

但是,在使用Django的过程中,有一个地方一直是比较难受的,那就是使用Django自带的模版,这种通常需要自己利用HTML+CSS+Jquery的方式总感觉是上一个时代的做法,前后端分离无论对于开发效率、多端支持等等都是很有好处的。

所以,本文希望通过一个简单的demo,讲一讲基于Django+Vue的前后端分离开发,将Django作为一个纯后端的服务,为前端Vue页面提供数据支持。

本文采用的django版本号2.2.5,Vue版本2.9.6。

如果看不完可以先收藏关注哈~

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,可以访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器,它有许多的优点:Python 编程的最好搭档—VSCode 详细指南

请选择以下任一种方式输入命令安装依赖
1. Windows 环境 打开 Cmd (开始-运行-CMD)。
2. MacOS 环境 打开 Terminal (command+空格输入Terminal)。
3. 如果你用的是 VSCode编辑器 或 Pycharm,可以直接使用界面下方的Terminal.

pip install django

vue的安装可见:
https://www.runoob.com/vue2/vue-install.html

创建前后端项目:创建一个文件夹,然后命令行创建项目即可,如下图:

命令行进入后端文件夹 book_demo,输入下面命令,浏览器登陆 127.0.0.1:8000 看见欢迎页即成功。

python manage.py runserver

再进入前端文件夹 appfront ,输入下面命令,浏览器登陆 127.0.0.1:8080 看见欢迎页即成功。

npm run dev

上面两个命令也是对应前后端项目的启动命令,后面就直接将过程说成启动前/后端项目。

2.Django+Vue后端实现

为了方便后端的实现,作为django做后端api服务的一种常用插件,django-rest-framework(DRF)提供了许多好用的特性,所以本文demo中也应用一下,命令行输入命令安装:

pip install django-rest-framework

进入book_demo目录,创建一个新的名为books的应用,并在新应用中添加urls.py文件,方便后面的路由配置,命令行输入:

python manage.py startapp books
cd books touch urls.py

现在的目录结构如下:

下面开始做一些简单的配置:

将DRF和books配置到django项目中,打开项目中的 settings.py 文件,添加:

# book_demo/settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    # demo add
    'rest_framework',
    'books',
]

对整个项目的路由进行配置,让访问 api/ 路径时候转到books应用中的urls.py文件配置进行处理。

# book_demo/settings.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('books.urls')), # demo add
]

下面在books应用中写简单的逻辑,demo只最简单涉及对书本的增删改查。因为这一部分不是本文重点,所以这里只介绍写代码流程和贴代码,对代码不做详细解释:

在models.py文件中写简单数据类Books:

# books/models.py
from django.db import models


class Books(models.Model):
    name = models.CharField(max_length=30)
    author = models.CharField(max_length=30, blank=True, null=True)

在books文件夹中创建serializer.py文件,并写对应序列化器BooksSerializer:

# books/serializer.py
from rest_framework import serializers

from books.models import Books


class BooksSerializer(serializers.ModelSerializer):
    class Meta:
        model = Books
        fields = '__all__'

在views.py文件中写对应的视图集BooksViewSet来处理请求:

# books/views.py
from rest_framework import viewsets

from books.models import Books
from books.serializer import BooksSerializer


class BooksViewSet(viewsets.ModelViewSet):
    queryset = Books.objects.all()
    serializer_class = BooksSerializer

在urls.py文件中写对应的路由映射:

# books/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter

from books import views


router = DefaultRouter()
router.register('books', views.BooksViewSet)

urlpatterns = [
    path('', include(router.urls)),
]

对于books应用中的内容,如果对DRF和Django流程熟悉的同学肯定知道都干了些什么,篇幅关系这里只能简单解释,DRF通过视图集ViewSet的方式让我们对某一个数据类Model可以进行增删改查,而且不同的操作对应于不同的请求方式,比如查看所有books用get方法,添加一本book用post方法等,让整个后端服务是restful的。

如果实在看不懂代码含义,只需知道这样做之后就可以通过不同的网络请求对后端数据库的Books数据进行操作即可,后续可以结合Django和DRF官方文档对代码进行学习,或关注本人未来分享的内容。

到这里,可以运行一下后端项目看看效果,命令行运行:

python manage.py makemigrations
python manage.py migrate
python manage.py runserver

得益于DRF提供的api可视化界面,浏览器访问 127.0.0.1:8000/api/books/ ,如果出现了以下界面并添加数据正常,则说明后端的基本逻辑已经ok了~下图为添加了一条数据之后的效果。

3.Vue前端实现教程