标签归档:转载

Python 电信用户流失预测模型实战教程

1.流失预测 研究背景

1、做好用户流失预测可以降低营销成本。老生常谈,新客户开发成本老客户维护成本的5倍。

2、获得更好的用户体验。并不是所有的增值服务都可以有效留住客户。

3、获得更高的销售回报。价格敏感型客户和非价格敏感性客户。

2.提出问题

1、流失客户有哪些显著性特征?

2、当客户在哪些特征下什么条件下比较容易发生流失

3.数据集描述

该数据是datafountain上的《电信客户流失数据》,这里提供一个下载地址。
https://www.datafountain.cn/datasets/35guide

该数据集有21个变量,7043个数据点。变量可分为以下三个部分:用户属性、用户行为、研究对象。

用户属性
customerID :用户ID
gender:性别(Female & Male)
SeniorCitizen :老年人(1表示是,0表示不是)
Partner :是否有配偶(Yes or No)
Dependents :是否经济独立(Yes or No)
tenure :客户的职位(0-72,共73个职位)

用户行为
PhoneService :是否开通电话服务业务(Yes or No)
MultipleLines :是否开通了多线业务(Yes 、No or No phoneservice 三种)
InternetService :是否开通互联网服务(No, DSL数字网络,fiber optic光纤网络 三种)
OnlineSecurity :是否开通网络安全服务(Yes,No,No internetserive 三种)
OnlineBackup :是否开通在线备份业务(Yes,No,No internetserive 三种)
DeviceProtection :是否开通了设备保护业务(Yes,No,No internetserive 三种)
TechSupport :是否开通了技术支持服务(Yes,No,No internetserive 三种)
StreamingTV :是否开通网络电视(Yes,No,No internetserive 三种)
StreamingMovies :是否开通网络电影(Yes,No,No internetserive 三种)
Contract :签订合同方式 (按月,一年,两年)
PaperlessBilling :是否开通电子账单(Yes or No)
PaymentMethod :付款方式(bank transfer,credit card,electronic check,mailed check)
MonthlyCharges :月费用
TotalCharges :总费用

研究对象Churn:该用户是否流失(Yes or No)

4.分析思路

分析视角分析方法的灵魂。

分析方法有上百种,但分析视角只有四种:

  • 对比视角
  • 分类视角
  • 相关视角
  • 描述视角

一旦将业务需求拆解成指标,接下来只需要针对每个指标进行分析视角四选一即可。

数据集描述,已经将变量分为三个维度了:用户属性、用户行为、研究对象(是否流失客户),三个维度组合一下就得出了以下解题思路了:

  • 哪些属性的用户比较容易流失?
  • 哪些行为的用户比较容易流失?

以上两个分析思路运用的是【对比视角】,该视角下具体的分析方法有:

  • 数值型数据:均值比较
  • 分类型数据:频数分布比较(交叉分析)

以上的分析方法是统计分析,只能一个维度一个维度地去比较。但实际情况中,并不是每个维度的权重都一样的,那如何去研究各个维度的权重?

权重问题属于分类视角,故我们可以采用分类模型,要用哪个分类模型呢?不知道。可以全部采用,看模型精度得分,然后选得分最高的模型进行进一步预测。

  • Random Forest 随机森林
  • SVC 支持向量机
  • LogisticRegression 逻辑回归
  • KNN 近邻算法
  • Naive Bayes  朴素贝叶斯
  • Decision Tree 决策树
  • AdaBoost
  • GradientBoosting
  • XGB
  • CatBoost

5.分析结论及运营建议

5.1 分析结论

综合统计分析XGB算法输出特征重要性得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure :1-5号职位的用户比较容易流失
  2. PaymentMethod :使用电子支票支付的人
  3. MonthlyCharges 、TotalCharges : 总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling : 开通电子账单
  5. Partner : 单身
  6. OnlineBackup : 没开通在线备份业务
  7. InternetService :开通了Fiber optic 光纤网络
  8. TechSupport :没开通“技术支持服务”
  9. DeviceProtection :没开通设备保护业务
  10. OnlineSecurity :没开通网络安全服务
  11. Contract :按月签订合同方式
  12. Dependents :无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges :总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

当条件覆盖得越多,人群越精确,但与此同时,覆盖的人群也会越少。业务方可直接在数据库中,通过SQL检索符合要求的客户,然后做针对性的运营工作。

5.2 运营建议

如何留住客户,可以从两方面去思考:

  • 增加用户的沉没成本(损失厌恶)
    • 会员等级
    • 积分制
    • 充值赠送
    • 满减券
    • 其他增值服务
  • 培养用户的条件反射(习惯)
    • 会员日
    • 定期用户召回
    • 签到
    • 每日定时抽奖
    • 小游戏

电子账单解锁新权益

  • 现象:“开通电子账单”的人反而容易流失。
  • 基本假设:价格敏感型客户。电子账单,让客户理性消费。
  • 建议:让“电子账单”变成一项“福利。跟连锁便利店,联名发”商品满减券”,每月的账单时间,就将”商品满减券“和账单一起推送过去。文案:您上月消费了XX元,解锁了xx会员权益。
  • 底层规律:增加沉没成本。

“单身用户”尊享亲情网

  • 现象:“单身用户”容易流失。
  • 基本假设:社交欲望低。
  • 建议:一个单身用户拥有建立3个人以内的“亲情网”的权益。
  • 底层规律:增加沉没成本。

推广“在线备份、设备保护、技术支持、网络保护”等增值服务。

6.实战教程-数据清洗

6.1 导入模块

6.1.1 数据处理

import pandas as pd
import numpy as np

6.1.2 可视化

import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style='darkgrid',font_scale=1.3)
plt.rcParams['font.family']='SimHei'
plt.rcParams['axes.unicode_minus']=False

6.1.3 特征工程

import sklearn
from sklearn import preprocessing                            #数据预处理模块
from sklearn.preprocessing import LabelEncoder               #编码转换
from sklearn.preprocessing import StandardScaler             #归一化
from sklearn.model_selection import StratifiedShuffleSplit   #分层抽样
from sklearn.model_selection import train_test_split         #数据分区
from sklearn.decomposition import PCA                        #主成分分析 (降维)

6.1.4 分类算法

from sklearn.ensemble import RandomForestClassifier     #随机森林
from sklearn.svm import SVC,LinearSVC                   #支持向量机
from sklearn.linear_model import LogisticRegression     #逻辑回归
from sklearn.neighbors import KNeighborsClassifier      #KNN算法
from sklearn.cluster import KMeans                     #K-Means 聚类算法
from sklearn.naive_bayes import GaussianNB              #朴素贝叶斯
from sklearn.tree import DecisionTreeClassifier         #决策树

6.1.5 分类算法–集成学习

import xgboost as xgb
from xgboost import XGBClassifier                      
from catboost import CatBoostClassifier                
from sklearn.ensemble import AdaBoostClassifier        
from sklearn.ensemble import GradientBoostingClassifier 

6.1.6 模型评估

from sklearn.metrics import classification_report,precision_score,recall_score,f1_score  #分类报告
from sklearn.metrics import confusion_matrix           #混淆矩阵
from sklearn.metrics import silhouette_score           #轮廓系数(评价k-mean聚类效果)
from sklearn.model_selection import GridSearchCV       #交叉验证
from sklearn.metrics import make_scorer
from sklearn.ensemble import VotingClassifier          #投票

6.1.7 忽略警告

import warnings
warnings.filterwarnings('ignore')

6.2 读取数据

df=pd.read_csv(r'C:\Users\Think\Desktop\刻意练习数据\电信数据集\Customer-Churn.csv',header=0)
#预览数据
df.head()
#查看数据大小
df.shape
#查看数据数据及分布
df.describe()

这里安利一下spyder编辑器,下图是这个编辑器的界面。编程过程中,有赋值变量的操作,该编辑器都会在右上角呈现,双击一下,就可以像在Execel上查看数据,非常方便。查看该数据集的详情。

6.3 数据清洗

6.3.1 缺失值处理

#查看缺失值
df.isnull().sum()

注:缺失值的数据类型是 float 类型。一旦有变量的数据类型转换成float 类型,需再次查看缺失值。

6.3.2 重复值处理

#查看重复值
df.duplicated().sum()

【输出】

6.3.3 数值类型转换

#查看数据类型
df.info()

【输出】TotalCharages总费用应该跟MonthlvCharges是同一个数据类型(float64)。故需将TotalCharages由object转换成float64,且需要再次查看缺失值。

#总费用 TotalCharges  该列的数据类型应是float64,不是object
# df['TotalCharges'].astype('float64')
# 此处用“astype”转化数据类型报错 “could not convert string to float”
#改用强制转化 convert_numeric=True   
df['TotalCharges']=df['TotalCharges'].convert_objects(convert_numeric=True)
df['TotalCharges'].dtype

输出如下:再次查看缺失值:TotalCharges列有11个缺失值,处理缺失值的原则是尽量填充,最后才是删除。
缺失值填充的原则:

  • 分类型数据:众数填充
  • 数值型数据:正态分布,均值/中位数填充;偏态分布,中位数填充。

TotalCharges列是数值型数据,先画直方图查看数据分布形态。

#分别作直方图:全部客户类型、流失客户类型、留存客户类型
plt.figure(figsize=(14,5))
plt.subplot(1,3,1)
plt.title('全部客户的总付费直方图')
sns.distplot(df['TotalCharges'].dropna())

plt.subplot(1,3,2)
plt.title('流失客户的总付费直方图')
sns.distplot(df[df['Churn']=='Yes']['TotalCharges'].dropna())

plt.subplot(1,3,3)
plt.title('留存客户的总付费直方图')
sns.distplot(df[df['Churn']=='No']['TotalCharges'].dropna())

结果如下:从三个直方图看,该列数据是偏态分布,故选择中位数填充。

df.fillna({'TotalCharges':df['TotalCharges'].median()},inplace=True)
#再次确认是否还有空值
df.isnull().sum()

结果如下:

6.4 查看样本分布

研究对象’Churn’列重新编码“Yes”=1,“No”=0。重新编码有下面两种方法。

方法一:replace

df['Churn'].replace(to_replace = 'Yes', value = 1,inplace = True)
df['Churn'].replace(to_replace = 'No', value = 0,inplace = True)

方法二:map函数

df['Churn']=df['Churn'].map({'Yes':1,'No':0})

预览数据:

df['Churn'].head()

结果如下:绘制饼图,查看流失客户占比。

churn_value=df["Churn"].value_counts()
labels=df["Churn"].value_counts().index

plt.figure(figsize=(7,7))
plt.pie(churn_value,labels=labels,colors=["b","w"], explode=(0.1,0),autopct='%1.1f%%', shadow=True)
plt.title("流失客户占比高达26.5%")
plt.show()  

结果如下:【分析】:流失客户样本占比26.5%,留存客户样本占比73.5%,明显的“样本不均衡”。

解决样本不均衡有以下方法可以选择:

  • 分层抽样
  • 过抽样
  • 欠抽样

7.实战教程-特征选择

提取特征

feature=df.iloc[:,1:20]

7.1 整数编码

查看变量间的两两相关性

#重新编码
corr_df = feature.apply(lambda x: pd.factorize(x)[0])
corr_df.head()
#相关性矩阵
corr=corr_df.corr()
corr

结果如下:相关性矩阵可视化

#绘制热力图观察变量之间的相关性强弱
plt.figure(figsize=(15,12))
ax = sns.heatmap(corr, xticklabels=corr.columns, yticklabels=corr.columns, 
                 linewidths=0.2, cmap="RdYlGn",annot=True)
plt.title("Correlation between variables")

结果如下:【分析】:从热力图来看,互联网服务、网络安全、在线备份、设备维护服务、技术支持服务、开通网络电视服务、开通网络电影之间相关性很强,且是正相关。电话服务和多线业务之间也存在很强的正相关关系。

7.2 独热编码

查看研究对象”Churn”与其他变量下的标签相关性。独热编码,可以将分类变量下的标签转化成列

df_onehot = pd.get_dummies(df.iloc[:,1:21])
df_onehot.head()

结果如下:绘图查看用户流失(‘Churn’)与各个维度之间的关系

plt.figure(figsize=(15,6))
df_onehot.corr()['Churn'].sort_values(ascending=False).plot(kind='bar')
plt.title('Correlation between Churn  and variables ')

结果如下:【分析】:从图看gender(性别)、PhoneService(电话服务)相关性几乎为0,故两个维度可以忽略。[‘SeniorCitizen’,’Partner’,’Dependents’, ‘Contract’,MultipleLines,’InternetService’,  ‘OnlineSecurity’, ‘OnlineBackup’, ‘DeviceProtection’,’TechSupport’, ‘StreamingTV’, ‘StreamingMovies’,’PaperlessBilling’,’PaymentMethod’]等都有较高的相关性,将以上维度合并成一个列表kf_var,然后进行频数比较。

kf_var=list(df.columns[2:5])
for var in list(df.columns[7:18]):
    kf_var.append(var)
print('kf_var=',kf_var)

结果如下:

8.实战教程-统计分析

8.1 频数分布比较

8.1.1 卡方检验

组间有显著性差异,频数分布比较才有意义,否则可能会做无用功。
“卡方检验”,就是提高频数比较结论可信度的统计方法。

#分组间确实是有显著性差异,频数比较的结论才有可信度,故需进行”卡方检验“
from scipy.stats import chi2_contingency   #统计分析 卡方检验
#自定义卡方检验函数
def KF(x):
    df1=pd.crosstab(df['Churn'],df[x])
    li1=list(df1.iloc[0,:])
    li2=list(df1.iloc[1,:])
    kf_data=np.array([li1,li2])
    kf=chi2_contingency(kf_data)
    if kf[1]<0.05:
        print('Churn by {} 的卡方临界值是{:.2f},小于0.05,表明{}组间有显著性差异,可进行【交叉分析】'.format(x,kf[1],x),'\n')
    else:
        print('Churn by {} 的卡方临界值是{:.2f},大于0.05,表明{}组间无显著性差异,不可进行交叉分析'.format(x,kf[1],x),'\n')
#对 kf_var进行卡方检验
print('kf_var的卡方检验结果如下:','\n')
print(list(map(KF, kf_var)))

kf_var的卡方检验结果如下:

Churn by SeniorCitizen 的卡方临界值是0.00,小于0.05,表明SeniorCitizen组间有显著性差异,可进行【交叉分析】

Churn by Partner 的卡方临界值是0.00,小于0.05,表明Partner组间有显著性差异,可进行【交叉分析】

Churn by Dependents 的卡方临界值是0.00,小于0.05,表明Dependents组间有显著性差异,可进行【交叉分析】

Churn by MultipleLines 的卡方临界值是0.99,大于0.05,表明MultipleLines组间无显著性差异,不可进行交叉分析

Churn by InternetService 的卡方临界值是0.00,小于0.05,表明InternetService组间有显著性差异,可进行【交叉分析】

Churn by OnlineSecurity 的卡方临界值是0.00,小于0.05,表明OnlineSecurity组间有显著性差异,可进行【交叉分析】

Churn by OnlineBackup 的卡方临界值是0.00,小于0.05,表明OnlineBackup组间有显著性差异,可进行【交叉分析】

Churn by DeviceProtection 的卡方临界值是0.00,小于0.05,表明DeviceProtection组间有显著性差异,可进行【交叉分析】

Churn by TechSupport 的卡方临界值是0.00,小于0.05,表明TechSupport组间有显著性差异,可进行【交叉分析】

Churn by StreamingTV 的卡方临界值是0.00,小于0.05,表明StreamingTV组间有显著性差异,可进行【交叉分析】

Churn by StreamingMovies 的卡方临界值是0.00,小于0.05,表明StreamingMovies组间有显著性差异,可进行【交叉分析】

Churn by Contract 的卡方临界值是0.00,小于0.05,表明Contract组间有显著性差异,可进行【交叉分析】

Churn by PaperlessBilling 的卡方临界值是0.00,小于0.05,表明PaperlessBilling组间有显著性差异,可进行【交叉分析】

Churn by PaymentMethod 的卡方临界值是0.00,小于0.05,表明PaymentMethod组间有显著性差异,可进行【交叉分析】

从卡方检验的结果,kf_var包含的特征,组间都有显著性差异,可进行频数比较。

8.1.2 柱形图

频数比较–柱形图

plt.figure(figsize=(20,25))
a=0
for k in kf_var:
    a=a+1
    plt.subplot(4,4,a)
    plt.title('Churn BY '+ k)
    sns.countplot(x=k,hue='Churn',data=df)

结果如下:因为PaymentMethod的标签比较长,影响看图,所以单独画。

plt.xticks(rotation=45)
sns.countplot(x='PaymentMethod',hue='Churn',data=df)

可以直接从柱形图去判断对哪个维度对流失客户的影响大吗?不能,因为“样本不均衡”(流失客户样本占比26.5%,留存客户样本占比73.5%),基数不一样,故不能直接通过“频数”的柱形图去分析。
解决办法:交叉分析,且作同行百分比(’Churn’作为“行”)

8.1.3 交叉分析

print('ka_var列表中的维度与Churn交叉分析结果如下:','\n')
for i in kf_var:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n'#交叉分析,同行百分比

ka_var列表中的维度与Churn交叉分析结果如下:【SeniorCitizen 分析】:年轻用户 在流失、留存,两个标签的人数占比都高。【Parter 分析】:单身用户更容易流失。【Denpendents 分析】:经济不独立的用户更容易流失。【MultipleLines 分析】:是否开通MultipleLines,对留存和流失都没有明显的促进作用。【InternetService 分析】:办理了 “Fiber optic 光纤网络”的客户容易流失。【OnlineSecurity 分析】:没开通“网络安全服务”的客户容易流失。【OnlineBackup 分析】:没开通“在线备份服务”的客户容易流失。【DeviceProtection 分析】:没开通“设备保护业务”的用户比较容易流失【TechSupport 分析】:没开通“技术支持服务”的用户容易流失。【StreamingTV 分析】:是否开通“网络电视”服务,对用户留存、流失,没有明显的促进作用。【StreamingMovies 分析】:是否开通“网络电影”服务,对用户留存、流失,没有明显的促进作用。【Contract 分析】逐月签订合同的用户最容易流失。

因为”Churn BY PaymentMethod”打印出来显示不全,故我就从临时表将“交叉表”给截图出来了: 【分析】使用“电子支票”支付的人更容易流失。

8.2 均值比较

组间有显著性差异,均值比较才有意义。
显著性检验,先通过了齐性检验,再通过方差分析,最后才能做均值比较。

8.2.0 齐性检验,方差分析

#自定义齐性检验 & 方差分析 函数
def ANOVA(x):
    li_index=list(df['Churn'].value_counts().keys())
    args=[]
    for i in li_index:
        args.append(df[df['Churn']==i][x])
    w,p=stats.levene(*args)             #齐性检验
    if p<0.05:
        print('警告:Churn BY {}的P值为{:.2f},小于0.05,表明齐性检验不通过,不可作方差分析'.format(x,p),'\n')
    else:
        f,p_value=stats.f_oneway(*args) #方差分析
        print('Churn BY {} 的f值是{},p_value值是{}'.format(x,f,p_value),'\n')
        if p_value<0.05:
            print('Churn BY {}的均值有显著性差异,可进行均值比较'.format(x),'\n')
        else:
            print('Churn BY {}的均值无显著性差异,不可进行均值比较'.format(x),'\n')

对MonthlyCharges、TotalCharges维度分别进行齐性检验和方差分析

print('MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:','\n')
ANOVA('MonthlyCharges')
ANOVA('TotalCharges')

【输出】:
MonthlyCharges、TotalCharges的齐性检验 和方差分析结果如下:

警告:Churn BY MonthlyCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

警告:Churn BY TotalCharges的P值为0.00,小于0.05,表明齐性检验不通过,不可作方差分析

8.3 总结

用户出现以下特征比较容易流失:

  • SeniorCitizen:青年人
  • Partner :单身
  • Dependents :无经济独立
  • InternetService:开通了 “Fiber optic 光纤网络”
  • OnlineSecurity:没开通“网络安全服务”
  • OnlineBackup:没开通“在线备份业务”
  • DeviceProtection:没开通通了“设备保护业务
  • TechSupport:没开通“技术支持服务”
  • Contract:“按月”签订合同方式
  • PaperlessBilling:开通电子账单
  • PaymentMethod:使用“电子支票”支付的人

我们可以在SQL(数据库)上找有以上特征的客户,进行精准营销,即可以降低用户流失。虽然特征选得越多,越精确,但覆盖的人群也会越少。故,我们还需要计算“特征”的【重要性】,将最为重要的几个特征作为筛选条件。

计算特征的【重要性】,是“分类视角”,接下来我们会挑选常见的分类模型,进行批量训练,然后挑出得分最高的模型,进一步计算“特征重要性”。

9.实战教程-特征工程

9.1 提取特征

有前面的流失率与各个维度的相关系数柱状图可知:
流失率与gender(性别)、PhoneService(电话服务)相关性几乎为0,可以筛选掉,而customerID是随机数,不影响建模,故可以筛选掉。最终得到特征 churn_var

churn_var=df.iloc[:,2:20]
churn_var.drop("PhoneService",axis=1, inplace=True)
churn_var.head()

结果如下:

9.2 处理“量纲差异大”

“MonthlyCharges”、”TotalCharges”两个特征跟其他特征相比,量纲差异大。处理量纲差异大,有两种方法:

  1. 标准化
  2. 离散化

以上两种方法,哪个能让模型精度提高,就选哪个。根据模型的最后得分,我选了“离散化”来处理量纲差异大。

9.2.1 标准化

scaler = StandardScaler(copy=False)
scaler.fit_transform(churn_var[['MonthlyCharges','TotalCharges']])  #fit_transform拟合数据
churn_var[['MonthlyCharges','TotalCharges']]=scaler.transform(churn_var[['MonthlyCharges','TotalCharges']])  #transform标准化

print(churn_var[['MonthlyCharges','TotalCharges']].head() )#查看拟合结果

【输出】

9.2.2 特征离散化

特征离散化后,模型易于快速迭代,且模型更稳定。

1、处理’MonthlyCharges’:

#查看'MonthlyCharges'列的4分位
churn_var['MonthlyCharges'].describe() 

离散操作
18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”

#用四分位数进行离散
churn_var['MonthlyCharges']=pd.qcut(churn_var['MonthlyCharges'],4,labels=['1','2','3','4'])
churn_var['MonthlyCharges'].head()

结果如下:2、处理’TotalCharges’:

#查看'TotalCharges'列的4分位
churn_var['TotalCharges'].describe()

结果如下:

离散操作:
18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”

#用四分位数进行离散 
churn_var['TotalCharges']=pd.qcut(churn_var['TotalCharges'],4,labels=['1','2','3','4'])
churn_var['TotalCharges'].head()

【输出】

9.3 分类数据转换成“整数编码”

9.3.1 查看churn_var中分类变量的label(标签)

#自定义函数获取分类变量中的label
def Label(x):
    print(x,"--" ,churn_var[x].unique()) 
#筛选出数据类型为“object”的数据点
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object)))

结果如下:

通过同行百分比的“交叉分析”发现,label “No internetserive”的人数占比在以下特征[OnlineSecurity,OnlineBackup,DeviceProtection,TechSupport,StreamingTV,StreamingTV]都是惊人的一致,故我们可以判断label “No internetserive”不影响流失率。
因为这6项增值服务,都是需要开通“互联网服务”的基础上才享受得到的。不开通“互联网服务”视为没开通这6项增值服务,故可以将 6个特正中的“No internetserive” 并到 “No”里面。

churn_var.replace(to_replace='No internet service',value='No',inplace=True)

而特征MultipleLines的“ No phoneservice”在流失客户、留存客户样本中的人数占比几乎接近,且比较少,故可以将“ No phoneservice”并到“No”。

churn_var.replace(to_replace='No phone service',value='No',inplace=True)
df_object=churn_var.select_dtypes(['object']) 
print(list(map(Label,df_object.columns)))

结果如下:

9.3.2 整数编码

整数编码的方法有两种:

  1. sklearn中的LabelEncoder()
  2. pandas中的factorize
    此处选用 LabelEncoder()
def labelencode(x):
    churn_var[x] = LabelEncoder().fit_transform(churn_var[x])
for i in range(0,len(df_object.columns)):
    labelencode(df_object.columns[i])
print(list(map(Label,df_object.columns)))

结果如下:

9.4 处理“样本不均衡”

分拆变量

x=churn_var
y=df['Churn'].values
print('抽样前的数据特征',x.shape)
print('抽样前的数据标签',y.shape)

【输出】
抽样前的数据特征 (7043, 17)
抽样前的数据标签 (7043,)

处理样本不均衡常用的方式有三种:

  1. 分层抽样
  2. 过抽样
  3. 欠抽样

笔者先后尝试了“分层抽样”和“欠抽样”,前者最终得到的模型中精度最高的是0.63,而后者最终得到的模型中精度最低是0.78,最高是0.84。所以说“抽样方式”的选择极为重要,大家要在这里多试错。

分层抽样

sss=StratifiedShuffleSplit(n_splits=5, test_size=0.2, random_state=0)
print(sss)
print("训练数据和测试数据被分成的组数:",sss.get_n_splits(x,y))
# 分拆训练集和测试集
for train_index, test_index in sss.split(x, y):
    print("train:", train_index, "test:", test_index)
    x_train,x_test=x.iloc[train_index], x.iloc[test_index]
    y_train,y_test=y[train_index], y[test_index]

“过抽样”让模型精度更高,故我选“过抽样”。

from imblearn.over_sampling import SMOTE
model_smote=SMOTE()
x,y=model_smote.fit_sample(x,y)
x=pd.DataFrame(x,columns=churn_var.columns)
#分拆数据集:训练集 和 测试集
x_train,x_test,y_train,y_test=train_test_split(x,y,test_size=0.3,random_state=0)

输出数据集大小

print('过抽样数据特征:', x.shape,
      '训练数据特征:',x_train.shape,
      '测试数据特征:',x_test.shape)

print('过抽样后数据标签:', y.shape,
      '   训练数据标签:',y_train.shape,
      '   测试数据标签:',y_test.shape)

【输出】

过抽样后数据特征: (10348, 17) 训练数据特征: (7243, 17) 测试数据特征: (3105, 17)
过抽样后数据标签: (10348,)    训练数据标签: (7243,)    测试数据标签: (3105,)

10.实战教程-数据建模

使用分类算法

Classifiers=[["Random Forest",RandomForestClassifier()],
             ["Support Vector Machine",SVC()],
             ["LogisticRegression",LogisticRegression()],
             ["KNN",KNeighborsClassifier(n_neighbors=5)],
             ["Naive Bayes",GaussianNB()],
             ["Decision Tree",DecisionTreeClassifier()],
             ["AdaBoostClassifier", AdaBoostClassifier()],
             ["GradientBoostingClassifier", GradientBoostingClassifier()],
             ["XGB", XGBClassifier()],
             ["CatBoost", CatBoostClassifier(logging_level='Silent')]  
]

训练模型

Classify_result=[]
names=[]
prediction=[]
for name,classifier in Classifiers:
    classifier=classifier
    classifier.fit(x_train,y_train)
    y_pred=classifier.predict(x_test)
    recall=recall_score(y_test,y_pred)
    precision=precision_score(y_test,y_pred)
    f1score = f1_score(y_test, y_pred)
    class_eva=pd.DataFrame([recall,precision,f1score])
    Classify_result.append(class_eva)
    name=pd.Series(name)
    names.append(name)
    y_pred=pd.Series(y_pred)
    prediction.append(y_pred)

11.模型评估

names=pd.DataFrame(names)
names=names[0].tolist()
result=pd.concat(Classify_result,axis=1)
result.columns=names
result.index=["recall","precision","f1score"]
result

【输出】
特征工程,采用“标准化”处理量纲差异,采用“分层抽样”处理样本不均衡。
最终模型精度得分,最高分是0.63,是“朴素贝叶斯”模型

特征工程,采用“离散化”处理量纲差异,采用“过抽样”处理样本不均衡。
最终模型精度得分,最高分是0.84,是“XGB”模型

12.基于“XGB”模型输出特征重要性

笔者尝试了两个算法分别输出“特征重要性”:CatBoost算法 和 XGB 算法

  • CatBoost算法
model = CatBoostClassifier()
model.fit(x_train,y_train,eval_set=(x_test, y_test),plot=True)
#特征重要性可视化
catboost=pd.DataFrame(columns=['feature','feature_importance'])
catboost['feature']=model.feature_names_
catboost['feature_importance']=model.feature_importances_
catboost=catboost.sort_values('feature_importance',ascending=False#降序排列
plt.figure(figsize=(10,10))
plt.title('特征重要性')
sns.barplot(x='feature_importance',y='feature',data=catboost)

【输出】-XGB 算法

model_xgb= XGBClassifier()
model_xgb.fit(x_train,y_train)
from xgboost import plot_importance
plot_importance(model_xgb,height=0.5)
plt.show()

【输出】由于 XGB算法精度得分最高,故我们以XGB得到的“特征重要性”进行分析。
【分析】

  1. 第一重要特征:tenure
plt.figure(figsize=(20,4))
sns.countplot(x='tenure',hue='Churn',data=df)

【输出】【分析】
由图可知,流失客户集中在1-5号职位,运营团队需要重点关注1-5号职位。

  1. 第二重要特征:PaymentMethod

【分析】
使用“电子支票”支付的人更容易流失。

  1. 第三重要特征:MonthlyCharges
    查看流失用户、留存用户在付费方面的偏好:
    ‘MonthlyCharges’、’TotalCharges’,离散化后,可进行卡方检验,然后交叉分析。
  • 卡方检验:’MonthlyCharges’、’TotalCharges’
df['MonthlyCharges-']=churn_var['MonthlyCharges']
df['TotalCharges-']=churn_var['TotalCharges']
print('kf_var的卡方检验结果如下:','\n')
KF('MonthlyCharges-')
KF('TotalCharges-')

【输出】
kf_var的卡方检验结果如下:

Churn by MonthlyCharges 的卡方临界值是0.00,小于0.05,表明MonthlyCharges组间有显著性差异,可进行【交叉分析】

Churn by TotalCharges 的卡方临界值是0.00,小于0.05,表明TotalCharges组间有显著性差异,可进行【交叉分析】

  • 交叉分析
for i in ['MonthlyCharges','TotalCharges']:
    print('................Churn BY {}...............'.format(i))
    print(pd.crosstab(df['Churn'],df[i],normalize=0),'\n')

【输出】18.25=<churn_var[‘MonthlyCharges’]<=35.5,标记 “1”
35.5<churn_var[‘MonthlyCharges’]<=70.35,标记 “2”
70.35<churn_var[‘MonthlyCharges’]<=89.85,标记 “3”
89.85=<churn_varf[‘MonthlyCharges’]<=118.75,标记“4”
【分析】
月付费70.35–118.75元的用户更容易流失18=<churn_var[‘TotalCharges’]<=402,标记 “1”
402<churn_var[‘TotalCharges’]<=1397,标记 “2”
1397<churn_var[‘TotalCharges’]<=3786,标记 “3”
3786<churn_var[‘TotalCharges’]<=8684,标记 “4”
【分析】
总付费18–1397元的用户更容易流失

基于”MonthlyCharges”和“TotalCharges”画四分图:
求两个维度的均值

 print('MonthlyCharges的均值是{:.2f},TotalCharges的均值是{:.2f}'.format(df['MonthlyCharges'].mean(),df['TotalCharges'].mean()))

流失客户四分图:

df_1=df[df['Churn']==1#流失客户
df_0=df[df['Churn']==0#留存客户
plt.figure(figsize=(10,10))   
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu,data=df_1)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】【分析】
四分图的右下区域,流失客户比较集中,即总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失。

留存客户四分图

plt.figure(figsize=(10,10)) 
sns.scatterplot('MonthlyCharges','TotalCharges',hue='Churn', palette=plt.cm.RdYlBu_r,data=df_0)
plt.axhline(y=df['TotalCharges'].mean(),ls="-",c="k")
plt.axvline(x=df['MonthlyCharges'].mean(),ls="-",c="green")

【输出】【结论】
综合“ 统计分析” 和 “XGB算法输出特征重要性” 得出流失客户有以下特征(依特征重要性从大到小排列):

  1. tenure:1-5号职位的用户比较容易流失
  2. PaymentMethod:使用“电子支票”支付的人
  3. MonthlyCharges 、TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失
  4. PaperlessBilling:开通电子账单
  5. Partner:单身
  6. OnlineBackup:没开通“在线备份业务”
  7. InternetService:开通了 “Fiber optic 光纤网络”
  8. TechSupport:没开通“技术支持服务”
  9. DeviceProtection:没开通通了“设备保护业务
  10. OnlineSecurity:没开通“网络安全服务”
  11. Contract:“按月”签订合同方式
  12. Dependents:无经济独立
  13. SeniorCitizen :青年人
  14. TotalCharges:总费用在2281.92元以下,月费用在64.76元以上的客户比较容易流失

转自猫有九条命。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化全方位实战教程

本文讲解了Pandas性能优化的几种方法,比如第一篇文章讲到了transform函数的应用、Cython编写C扩展、减少类型转换、使用特殊的数组。第二篇Pandas基础优化讲到了尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。第三篇进阶版优化讲到了一些更高级的优化技巧。

1.Pandas 性能优化 40 倍 – DataFrame

1. 1 性能优化小试牛刀

大名鼎鼎的Pandas是数据分析的神器。有时候我们需要对上千万甚至上亿的数据进行非常复杂处理,那么运行效率就是一个不能忽视的问题。

比如下面这个简单例子,我们随机生成100万条数据,对val这一列进行处理:如果是偶数则减1,奇数则加1。实际的数据分析工作要比这个例子复杂的多,但考虑到我们没有那么多时间等待运行结果,所以就偷个懒吧。可以看到transform函数的平均运行时间是284ms:

import pandas as pd
import numpy as np

def gen_data(size):
    d = dict()
    d["genre"] = np.random.choice(["A""B""C""D"], size=size)
    d["val"] = np.random.randint(low=0, high=100, size=size)
    return pd.DataFrame(d)

data = gen_data(1000000)
data.head()
def transform(data):
    data.loc[:, "new_val"] = data.val.apply(lambda x: x + 1 if x % 2 else x - 1)

%timeit -n 1 transform(data)
284 ms ± 8.95 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.2. 用Cython编写C扩展

试试用我们的老朋友Cython来写一下 x + 1 if x % 2 else x - 1 这个函数。平均运行时间降低到了202ms,果然速度变快了。性能大约提升了1.4倍,离40倍的flag还差的好远。

%load_ext cython
%%cython
cpdef int _transform(int x):
    if x % 2:
        return x + 1
    return x - 1

def transform(data):
    data.loc[:, "new_val"] = data.val.apply(_transform)

%timeit -n 1 transform(data)
202 ms ± 13.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.3. 减少类型转换

为了减少C和Python之间的类型转换,我们直接把val这一列作为Numpy数组传递给Cython函数,注意区分cnpnp。平均运行时间直接降到10.8毫秒,性能大约提升了26倍,仿佛看到了一丝希望。

%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
10.8 ms ± 512 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

1.4. 使用不安全的数组

利用@cython.boundscheck(False)@cython.wraparound(False)装饰器关闭数组的边界检查和负下标处理,平均运行时间变为5.9毫秒。性能提升了42倍左右,顺利完成任务。

%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)

    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr

def transform(data):
    data.loc[:, "new_val"] = _transform(data.val.values)

%timeit -n 1 transform(data)
6.76 ms ± 545 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

来源:Python中文社区

作者:李小文,先后从事过数据分析、数据挖掘工作,主要开发语言是Python,现任一家小型互联网公司的算法工程师。https://github.com/tushushu

2.Pandas性能优化:基础篇

Pandas 号称“数据挖掘瑞士军刀”,是数据处理最常用的库。在数据挖掘或者kaggle比赛中,我们经常使用pandas进行数据提取、分析、构造特征。而如果数据量很大,操作算法复杂,那么pandas的运行速度可能非常慢。本文根据实际工作中的经验,总结了一些pandas的使用技巧,帮助提高运行速度或减少内存占用。

2.1 按行迭代优化

很多时候,我们会按行对dataframe进行迭代,一般我们会用iterrows这个函数。在新版的pandas中,提供了一个更快的itertuples函数。

我们测试一下速度:

import pandas as pd
import numpy as np
import time
df = pd.DataFrame({'a': np.random.randn(1000),
                     'b': np.random.randn(1000),
                    'N': np.random.randint(100, 1000, (1000)),
                   'x':  np.random.randint(1, 10, (1000))})

%%timeit
a2=[]
for index,row in df.iterrows():
    temp=row['a']
    a2.append(temp*temp)
df['a2']=a2    

67.6 ms ± 3.69 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)

%%timeit
a2=[]
for row in df.itertuples():
    temp=getattr(row, 'a')
    a2.append(temp*temp)
df['a2']=a2

1.54 ms ± 168 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到直接循环,itertuples速度是iterrows的很多倍。

所以如果在必须要对dataframe进行遍历的话,直接用itertuples替换iterrows。

2.2 apply 优化

一般情况下,如果要对dataframe里的数据逐行处理,而不需要上下文信息,可以使用apply函数。
对于上面的例子,我们使用apply看下:

%%timeit
df['a2']=df.a.apply(lambda x: x*x)

360 µs ± 355 ns per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看到,效率又有提升,apply的速度是itertuples的5倍左右。

注意一下,apply不光能对单个列值做处理,也能对多个列的值做处理。

%%timeit
df['a3']=df.apply( lambda row: row['a']*row['b'],axis=1)

15 ms ± 1.61 ms per loop (mean ± std. dev. of 7 runs, 100 loops each)

这里看出来,多值处理的时候几乎等于iterrows。因此比单列值apply慢了许多,所以这里不推荐对整行进行apply。

我们可以简单的这样改写:

%%timeit
df['a3']=df['a']*df['b']

204 µs ± 8.31 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

如果在计算的时候,使用series.values 提取numpy 数组并使用numpy原生函数计算,效率可能更高

%%timeit
df['a3']=np.multiply(df['a'].values,df['b'].values)

93.3 µs ± 1.45 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)

2.3 聚合agg效率优化

有的时候,我们会对一列数据按值进行分组(groupby),再分组后,依次对每一组数据中的其他列进行聚合(agg)。

还是上面的那个dataframe,我们看下:
采用自定义函数的agg函数:

%%timeit
df.groupby("x")['a'].agg(lambda x:x.sum())

1.27 ms ± 45.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

采用agg内置函数:

%%timeit
df.groupby("x")['a'].agg(sum)
#等价df.groupby("x")['a'].sum()

415 µs ± 20.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

刚才是单列聚合,我们看下多列聚合:
自定义函数:

%%timeit
df.groupby("x").agg({"a":lambda x:x.sum(),"b":lambda x:x.count()})

2.6 ms ± 8.17 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)

采用内置函数:

%%timeit
df.groupby("x").agg({"a":"sum","b":"count"})

1.33 ms ± 29.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)

可以看出,对于多列聚合,内置函数仍然比自定义函数快1倍。所以再进行聚合操作时,尽量使用内置函数提高效率。

下面列出了一些内置函数:

内置函数描述
count计数
sum求和
mean平均值
median中位数
min,max最大值/最小值
std,var标准差/方差
prod求积

当我们需要统计的方法没有对于内置函数的情况下,在自定义函数的时候,优先选用pandas或numpy内置的其他高效函数,如

df.groupby("x")['a'].agg(lambda x:np.sum(np.abs(x)))

在数据量很大的时候要比非numpy函数效率高(数据量小的时候差不多)。

2.4 数据读取优化

如果我们需要用pandas读取较多次文件,或者读取的文件较大,那么这方面占用的时间也会较长,我们也需要对其进行优化。pandas最常见读取函数是read_csv函数,可以对csv文件进行读取。但是pandas read_csv对读取较大的、数据结构复杂的文件,效率不是很理想。

这里有几种方法可以优化:

1. 分块读取csv文件
如果文件过大,内存不够或者时间上等不及,可以分块进行读取。

chunksize = 10 ** 6for chunk in pd.read_csv(filename,  chunksize=chunksize):
     process(chunk)

这里也可以使用多进程方式,具体我们在后面进阶篇介绍。

  1. 过滤掉不需要的列

如果读取的文件列很多,可以使用usecols字段,只load需要的列,提高效率,节约内存。

df = pd.read_csv(filename,  usecols=["a","b","c"]):
  1. 为列指定类型
    panda在read_csv的时候,会自动匹配列的数值类型,这样会导致速度很慢,并且占用内存较大。我们可以为每个列指定类型。
df = pd.read_csv("f500.csv", dtype = {"revenues" : "float64","name":str})
  1. 保存为其他格式
    如果需要频繁的读取和写入,则可以将文件保存为其他格式的,如pickle或hdf。pickle和hdf读取速度是csv的数倍。这里注意一下,pickle比原csv略小,hdf比原csv略大。
 import pandas as pd
 #读取csv
 df = pd.read_csv('filename.csv')
 
 #pickle格式
 df.to_pickle('filename.pkl') 
 df = pd.read_pickle('filename.pkl') 
 
 #hdf格式
 df.to_hdf('filename.hdf','df') 
 df = pd.read_hdf('filename.hdf','df') 

file typetimespeed
csv1.93 s1
pickle415 m4.6
hdf808 ms2.3
  1. 用第三方的包读取
    如可以使用Dask DataFrame读取大文件。第三方包放到最后细讲。

2.5 优化数据处理逻辑

这点算是业务角度优化。如果我们能直接数据处理的步骤,那么处理时间就少了很多。

这需要具体问题具体分析,举个例子,假设我们有一个通讯记录数据集:

call_areacall_seconds
03364
23075
25847
12032

call_seconds 是拨打的时长,单位是秒。
call_area=1 是拨打国内电话,费率是0.1/min,call_area=0 是拨打国外电话,费率是0.7/min
call_area=2 是接听电话,不收费。

我们随机生成一批数据:

import pandas as pd
import numpy as np
import time
import math
row_number=10000
df = pd.DataFrame({'call_area':  np.random.randint(0, 3, (row_number)),
                   'call_seconds':  np.random.randint(0, 10000, (row_number))})

如果我们想计算每个电话的费用:

def get_cost(call_area,call_seconds):
    if call_area==1:
        rate=0.1
    elif call_area==0:
        rate=0.7
    else:
        return 0
    return math.ceil(call_seconds/60)*rate

方法1,采用按行迭代循环

%%timeit
cost_list = []
for i,row in df.iterrows():
    call_area=row['call_area']
    call_seconds=row['call_seconds']
    cost_list.append(get_cost(call_area,call_seconds))
df['cost'] = cost_list

方法2,采用apply行的方式

%%timeit
df['cost']=df.apply(
         lambda row: get_cost(
             row['call_area'],
             row['call_seconds']),
         axis=1)

方法3,采用mask+loc,分组计算

%%timeit
mask1=df.call_area==1
mask2=df.call_area==0
mask3=df.call_area==2
df['call_mins']=np.ceil(df['call_seconds']/60)
df.loc[mask1,'cost']=df.loc[mask1,'call_mins']*0.1
df.loc[mask2,'cost']=df.loc[mask2,'call_mins']*0.7
df.loc[mask3,'cost']=0

方法4,使用numpy的方式,可以使用index的方式找到对应的费用。

%%timeit
prices = np.array([0.7, 0.1, 0])
mins=np.ceil(df['call_seconds'].values/60)
df['cost']=prices[df.call_area]*mins

测试结果:

方法运行时间运行速度
iterrows513 ms1
apply181 ms2.8
loc6.44 ms79.6
numpy219 µs2342

方法4速度快是因为它采用了numpy向量化的数据处理方式。

总结

在优化尽量使用内置原生函数,写代码尽量避免循环,尽量写能够向量化计算的代码,最后别忘了按照自己业务需求进行算法优化。

3.Pandas性能优化:进阶篇

在这里介绍一些更高级的pandas优化方法。

3.1 numpy

我们先来回顾一下上节说过的一个例子

import pandas as pd
import numpy as np
import time
row_number=100000
df = pd.DataFrame({'a': np.random.randn(row_number),
                     'b': np.random.randn(row_number),
                    'N': np.random.randint(100, 1000, (row_number)),
                   'x':  np.random.randint(1, 10, (row_number))})

我们要计算a列与b列的乘积

方法1,采用apply

%timeit df.apply( lambda row: row['a']*row['b'],axis=1)

方法2,直接对series做乘法

%timeit df['a']*df['b']

方法3,使用numpy函数

%timeit  np.multiply(df['a'].values,df['b'].values)
方法运行时间运行速度
方法11.45s1
方法2254µs5708
方法341.2 µs3536

这提示我们,采用一些好的方法可以大幅度提高pandas的运行速度。

3.2 cython

我们还继续使用上面的dataframe,现在定义一个函数:

def f(x):
    return x * (x - 1)

def integrate_f(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f(a + i * dx)
    return s * dx

我们要计算每一行integrate_f的值,

方法1,还是apply:

%timeit df.apply(lambda x: integrate_f(x['a'], x['b'], x['N'].astype(int)), axis=1)

这个函数运行时间就较长了:

7.05 s ± 54.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

考虑可以用cython重写程序,提高效率。
在使用cython的时候,可能需要安装gcc环境或者mingw(windows)。

方法1,直接加头编译

%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

看来直接加头,效率提升不大。

方法2,使用c type

%%cython
cdef double f_typed(double x) except? -2:
     return x * (x - 1)
cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx

345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

可以看到,使用cython的特定编程方法,效率提升较大。

3.3 numba

numba是一个动态JIT编译器,在一些数值计算中可以大幅度提高运行速度。
我们学cython,在python程序上直接加numba jit的头。

import numba

@numba.jitdef f_plain(x):
    return x * (x - 1)

@numba.jitdef integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jitdef apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')

 %timeit compute_numba(df)

6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)

我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!

3.4 进阶 并行化处理

并行化读取数据

在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。

第一种思路,分块读取,多进程处理。

import pandas as pd
from multiprocessing import Pool
 
def process(df):
    """
  数据处理
    """
    pass
 
# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4# Reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # 异步处理每个分块
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
            del f_list[:]

第二种思路,把大文件拆分成多份,多进程读取。

利用linux中的split命令,将csv切分成p个文件。

!split -l 200000 -d train.csv train_split
#将文件train.csv按每200000行分割,前缀名为train_split,并设置文件命名为数字

代码部分

from multiprocessing import Pool
import pandas as pd
import os
 
def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df
 
def read_file():
    file_list=["train_split%02d"%i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df
 
df = read_file()

并行化apply

apply的func如果在用了我们之前说的技术优化了速度之后仍然很慢,或者func遇到网络阻塞,那么我们需要去并行化执行apply。这里提供一种处理思路:

import multiprocessing as mp
import time
def slow_func(s):
    time.sleep(1)
    return "done"with mp.Pool(mp.cpu_count()) as pool:
    df['newcol'] = pool.map(slow_func, df['qid'])

3.5 进阶 第三方pandas库

由于padans的操作如apply,都是单线程的,直接调用效率不高。我可以使用第三方库进行并行操作。
当然第三方库会带来新的代码不兼容问题。我们有时候会考虑像上一章一样,手写并行化处理。这个权衡需要我们在编程之初就要规划好,避免后期因为bug需要重构。

dask库

pip install dask

类pandas库,可以并行读取、运行。

import pandas as pd
import dask.dataframe as dd
from dask.multiprocessing import getand the syntax isdata = <your_pandas_dataframe>
ddata = dd.from_pandas(data, npartitions=30)

def some_function(x,y,z):
    return x+y+z

res = ddata.map_partitions(lambda df: df.apply((lambda row: myfunc(*row)), axis=1))
.compute(get=get)  

swifter

pip install swifter

pandas的插件,可以直接在pandas上操作:

import swifter

def some_function(data):
    return data * 10

data['out'] = data['in'].swifter.apply(some_function)

Modin库

Modin后端使用dask或者ray,是个支持分布式运行的类pandas库,当然功能异常强大。具体请看官网,这里就不具体介绍了。

https://modin.readthedocs.io/en/latest/using_modin.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Selenium 实战教程-爬取人民网留言板单进程、多线程、多进程版

本文讲解了如何通过selenium爬取人民网留言板留言,并从单进程、多线程、多进程版角度进行爬虫性能优化,是一篇优秀的 selenium 实战教程。

第一节:单进程版+selenium实战教程

一、项目概述

1.项目说明

本项目主要是对领导留言板内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。网站链接是http://liuyan.people.com.cn/home?p=0,任意选择一条留言点击进入详情页后,如下对于图中标出的数据,均要进行爬取,以此构成一条留言的组成部分。

2.环境配置

(1)Python:3.x
(2)所需库:

  • dateutil
    • 安装方法:
    • pip install python-dateutil
  • selenium
    • 安装方法:
    • pip install selenium

(3)模拟驱动:chromedriver,可点击https://download.csdn.net/download/CUFEECR/12193208进行下载Google浏览器80.0.3987.16版对应版本,或点击http://chromedriver.storage.googleapis.com/index.html下载与Google对应版本,并放入Python对应安装路径下的Scripts目录下。

二、项目实施

1.导入所需要的库

import csv
import os
import random
import re
import time

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    while True:
        datestr = WebDriverWait(drivertemp, 10).until(
            lambda driver: driver.find_element_by_xpath(
                '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
        datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
        date = dparser.parse(datestr, fuzzy=True)
        print('正在爬取链接 --', position, '--', date)
        if date < start_date:
            break
        ## 模拟点击加载
        try:
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more")))
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight, document.body.scrollHeight - 600)')
            time.sleep(get_time())
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight - 600, document.body.scrollHeight)')
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.XPATH, '//*[@id="show_more"]')))
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
        except:
            break
        time.sleep(get_time() - 1)
    detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
    ## 获取所有链接
    for element in detail_elements:
        detail_url = element.get_attribute('href')
        yield detail_url
    drivertemp.quit()

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个内容,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    position = WebDriverWait(driver, 10).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
    ## time.sleep(get_time())
    print(index, '-- 正在爬取 --', position)
    start_time = time.time()
    ## encoding='gb18030'
    csv_name = position + '.csv'
    ## 文件存在则删除重新创建
    if os.path.exists(csv_name):
        os.remove(csv_name)
    with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for detail_url in get_detail_urls(position, list_url):
            get_message_detail(driver, detail_url, writer, position)
            time.sleep(get_time())
    end_time = time.time()
    crawl_time = int(end_time - start_time)
    crawl_minute = crawl_time // 60
    crawl_second = crawl_time % 60
    print(position, '已爬取结束!!!')
    print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
    driver.quit()
    time.sleep(5)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    for index, fid in enumerate(fids):
        try:
            get_officer_messages(index + 1, fid)
        except:
            get_officer_messages(index + 1, fid)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12198734下载,仅供交流学习,请勿滥用。整个执行过程较长,因为是单线程的,必须要等一个领导数据爬取完毕之后才能爬取下一个,我选择了10个领导进行测试,在云服务器中的运行结果分别如下显然,整个运行时间将近5小时,效率相对较低,有很大的提升空间。最终得到了合并的DATA.csv

2.改进分析

(1)该版本的代码未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页也是采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本是单进程(线程)的,必须要一个领导爬取完之后才能进行下一个领导的爬取,效率较低,特别是留言较多的领导耗时很长,可以考虑使用多进程或多线程进行优化。

第二节:多线程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟。本篇在第一篇的基础上做了一些改进

  1. 采用了多线程,设定同时运行的线程的数量为3,线程数量适中,这样在保证在同一时刻有多个线程在执行爬取的同时,也能避免线程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。
  2. 对异常处理进行了优化,之前异常处理是放在获取一个领导对应的所有的留言链接函数里的,当获取不到加载更多按钮并且超时时就会抛出异常,这样使得如果异常发生在其他部分如获取留言详情时会被忽略,改进之后将其放入主函数,对于每一个领导都放入异常处理,这里涵盖了对该领导爬取时的所有操作,只要在任一环节报错都会捕捉到,同时增加了5层嵌套异常处理,增加了对出现异常的容忍度(在发生网络环境不好而加载不出页面、内存消耗较多而卡顿、被被爬取方反爬而不能爬取等情况时,可以对官员重新爬取以保证数据的完整程度和程序的健壮性)。

二、项目实施

由于在实现过程中有3种常用的方法实现多线程,因此对应也有3种不同的具体实现,这里选第1种进行说明:

1.导入所需要的库

import csv
import os
import random
import re
import time
import threading

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 控制同时运行的线程数为3
sem = threading.Semaphore(3)
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,同时在全局中设置同时运行的线程数为3,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有3种方式实现:

  • 这通过threading.Semaphore()指定线程数量,后边在实现作为线程参数的函数时使用上下文处理器
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    thread_list = []
    ## 将所有线程加入线程列表,便于控制同时执行的线程数量
    for index, fid in enumerate(fids):
        t = threading.Thread(target=get_officer_messages, args=(index + 1, fid))
        thread_list.append([t, fid])
    for thread, fid in thread_list:
        ## 5层嵌套进行异常处理
        try:
            thread.start()
        except:
            try:
                thread.start()
            except:
                try:
                    thread.start()
                except:
                    try:
                        thread.start()
                    except:
                        try:
                            thread.start()
                        except:
                            ## 如果仍出现异常加入失败名单
                            print('该官员爬取失败,已存入失败名单,以备再爬')
                            if not os.path.exists('fid_not_success.txt'):
                                with open('fid_not_success.txt''a+'as f:
                                    f.write(fid)
                            else:
                                with open('fid_not_success.txt''a+'as f:
                                    f.write('\n' + fid)
                            continue
    for thread, fid in thread_list:
        thread.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用submit()函数实现线程的调用执行
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        for index, fid in enumerate(fids):
            ## 5层嵌套进行异常处理
            try:
                executor.submit(get_officer_messages, index + 1, fid)
            except:
                try:
                    executor.submit(get_officer_messages, index + 1, fid)
                except:
                    try:
                        executor.submit(get_officer_messages, index + 1, fid)
                    except:
                        try:
                            executor.submit(get_officer_messages, index + 1, fid)
                        except:
                            try:
                                executor.submit(get_officer_messages, index + 1, fid)
                            except:
                                ## 如果仍出现异常加入失败名单
                                print('该官员爬取失败,已存入失败名单,以备再爬')
                                if not os.path.exists('fid_not_success.txt'):
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write(fid)
                                else:
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write('\n' + fid)
                                continue
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用map()函数实现函数和多个参数的映射来执行线程

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        executor.map(get_officer_messages, range(1, len(fids) + 1), fids)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多线程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

3个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12199138下载,欢迎进行测试和交流学习,请勿滥用。整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到1小时半左右,约等于第一篇单线程的三分之一,因为同一时刻有3个子线程执行,大大降低了运行时间,效率比之前提高很多,加入多线程之后,可以让运行时间较长和较短的相互补充,同时多个线程同时运行,在同一时刻爬取多个领导,很显然大大缩短了运行时间。最后得到了合并的DATA.csv:可以进一步总结多线程的优势 :

  • 易于调度
  • 提高并发性:通过线程可方便有效地实现并发性。进程可创建多个线程来执行同一程序的不同部分。
  • 开销少:创建线程比创建进程要快,所需开销很少。
  • 利于充分发挥多处理器的功能:通过创建多线程进程,每个线程在一个处理器上运行,从而实现应用程序的并发性,使每个处理器都得到充分运行。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

第三节:多进程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟
本篇在第二篇的基础上做了一个主要改进:
从多线程改变为多进程,设定同时运行的进程的数量为3,数量适中,这样在保证在同一时刻有多个进程在执行爬取的同时,也能避免进程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。

二、项目实施

由于在实现过程中有2种常用的方法实现多线程,因此对应也有2种不同的具体实现。

1.导入所需要的库

import csv
import os
import random
import re
import time


import dateutil.parser as dparser
from random import choice
from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。
函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有2种方式实现:

  • 通过for循环遍历所有任务和参数,并调用apply_async()函数将任务加入进程池
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 创建进程池
    pool = Pool(3)
    ## 将任务加入进程池并传入参数
    for index, fid in zip(range(1, len(fids) + 1), fids):
        pool.apply_async(get_officer_messages, (index, fid))
    pool.close()
    pool.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过调用map()函数将任务加入进程池并实现函数与参数的映射
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 处理传入的参数,使之对应索引合并并且可迭代
    itera_merge = list(zip(range(1, len(fids) + 1), fids))
    ## 创建进程池
    pool = Pool(3)
    ## 将任务传入进程池并通过映射传入参数
    pool.map(get_officer_messages_enc, itera_merge)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多进程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

2个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12200752下载,欢迎进行测试和交流学习,请勿滥用
整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到100分钟,与单进程相比大大缩短了时间、提高了效率,因为同一时刻有3个子进程执行。加入多进程之后,可以让运行时间较长和较短的相互补充,在任意时刻多个进程同时运行。但是也可以看出来与多线程相比,多进程的运行时间相对稍长,虽然差别不大,但是这可能就是性能的瓶颈。可能的原因是进程需要的资源更多,这对内存、CPU和网络的要求更高,从而对设备提出了更高的要求,有时可能设备性能跟不上程序要求,从而降低了效率。
最后得到了合并的DATA.csv:对于多线程和多进程的简单对比分析如下:
一个线程至少有一个进程,一个进程至少有一个线程,线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高,进程在执行过程中拥有独立的存储单元,而多个线程共享存储器,从而极大的提高了程序的运行效率。线程不能够独立执行,必须依存在进程中。

多线程:

  • 线程执行开销小(占用的资源非常少)但是不利于资源的管理和保护;
  • 如果需要共享数据,建议使用线程;
  • 适用于IO密集型任务(Web和文档读写等),遇到IO阻塞,速度远远小于CPU的运行速度,可以多开辟一些线程,有线程阻塞了,其他线程依然正常工作。

多进程:

  • 执行额开销比较大(占用的资源多),但是利于资源的管理和保护;
  • 适用于计算密集型(视频的译码编码和科学数据计算等)。

显然,在爬虫中应该偏向使用多线程。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。
(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。
(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

3.合法性说明

  • 本项目是为了学习和科研的目的,所有读者可以参考执行思路和程序代码,但不能用于恶意和非法目的(恶意攻击网站服务器、非法盈利等),如有违者请自行负责。
  • 本项目所获取的数据都是在进一步的分析之后用于对电子政务的实施改进,对政府的决策能起到一定的参考作用,并非于恶意抓取数据来攫取不正当竞争的优势,也未用于商业目的牟取不法利益,运行代码只是用几个fid进行测试,并非大范围地爬取,同时严格控制爬取的速率、争取不对服务器造成压力,如侵犯当事者(即被抓取的网络主体)的利益,请联系更改或删除。
  • 本项目是留言板爬取系列的第二篇,后期会继续更新,欢迎读者交流,以期不断改进。

作者:Corley

源自:快学python

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Prometheus + Granafa 40分钟构建MySQL监控平台实战教程

Prometheus + Granafa 概述

对于MySQL的监控平台,相信大家实现起来有很多了:基于天兔的监控,还有基于zabbix相关的二次开发。相信很多同行都应该已经开始玩起来了。我这边的选型是Prometheus + Granafa的实现方式。简而言之就是我现在的生产环境使用的是prometheus,还有就是granafa满足的我的日常工作需要。在入门的简介和安装,大家可以参考这里:

https://blog.51cto.com/cloumn/detail/77

1、首先看下我们的监控效果、mysql主从

构建高大上的MySQL监控平台

2、mysql状态:

构建高大上的MySQL监控平台

构建高大上的MySQL监控平台

3、缓冲池状态:

构建高大上的MySQL监控平台

exporter 相关部署实战教程

1、安装exporter

    [root@controller2 opt]# https://github.com/prometheus/mysqld_exporter/releases/download/v0.10.0/mysqld_exporter-0.10.0.linux-amd64.tar.gz
    [root@controller2 opt]# tar -xf mysqld_exporter-0.10.0.linux-amd64.tar.gz 

2、添加mysql 账户:

    GRANT SELECT, PROCESS, SUPER, REPLICATION CLIENT, RELOAD ON *.* TO 'exporter'@'%' IDENTIFIED BY 'localhost';
    flush privileges;

3、编辑配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf 
    [client]
    user=exporter
    password=123456

4、设置配置文件:

    [root@controller2 mysqld_exporter-0.10.0.linux-amd64]# cat /etc/systemd/system/mysql_exporter.service 
    [Unit]
    Description=mysql Monitoring System
    Documentation=mysql Monitoring System

    [Service]
    ExecStart=/opt/mysqld_exporter-0.10.0.linux-amd64/mysqld_exporter \
             -collect.info_schema.processlist \
             -collect.info_schema.innodb_tablespaces \
             -collect.info_schema.innodb_metrics  \
             -collect.perf_schema.tableiowaits \
             -collect.perf_schema.indexiowaits \
             -collect.perf_schema.tablelocks \
             -collect.engine_innodb_status \
             -collect.perf_schema.file_events \
             -collect.info_schema.processlist \
             -collect.binlog_size \
             -collect.info_schema.clientstats \
             -collect.perf_schema.eventswaits \
             -config.my-cnf=/opt/mysqld_exporter-0.10.0.linux-amd64/.my.cnf

    [Install]
    WantedBy=multi-user.target

5、添加配置到prometheus server

      - job_name: 'mysql'
        static_configs:
         - targets: ['192.168.1.11:9104','192.168.1.12:9104']

6、测试看有没有返回数值:

http://192.168.1.12:9104/metrics

正常我们通过mysql_up可以查询倒mysql监控是否已经生效,是否起起来

    #HELP mysql_up Whether the MySQL server is up.
    #TYPE mysql_up gauge
    mysql_up 1

监控相关指标

在做任何一个东西监控的时候,我们要时刻明白我们要监控的是什么,指标是啥才能更好的去监控我们的服务,在mysql里面我们通常可以通过一下指标去衡量mysql的运行情况:mysql主从运行情况、查询吞吐量、慢查询情况、连接数情况、缓冲池使用情况以及查询执行性能等。

主从复制运行指标:

1、主从复制线程监控:

大部分情况下,很多企业使用的都是主从复制的环境,监控两个线程是非常重要的,在mysql里面我们通常是通过命令:

    MariaDB [(none)]> show slave status\G;
    *************************** 1. row ***************************
                   Slave_IO_State: Waiting for master to send event
                      Master_Host: 172.16.1.1
                      Master_User: repl
                      Master_Port: 3306
                    Connect_Retry: 60
                  Master_Log_File: mysql-bin.000045
              Read_Master_Log_Pos: 72904854
                   Relay_Log_File: mariadb-relay-bin.000127
                    Relay_Log_Pos: 72905142
            Relay_Master_Log_File: mysql-bin.000045
                 Slave_IO_Running: Yes
                Slave_SQL_Running: Yes

Slave_IO_Running、Slave_SQL_Running两个线程正常那么说明我们的复制集群是健康状态的。

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_slave_sql_running来获取主从集群的健康状况。

    # HELP mysql_slave_status_slave_sql_running Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_slave_sql_running untyped
    mysql_slave_status_slave_sql_running{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 1

2、主从复制落后时间:

在使用show slave status
里面还有一个关键的参数Seconds_Behind_Master。Seconds_Behind_Master表示slave上SQL thread与IO thread之间的延迟,我们都知道在MySQL的复制环境中,slave先从master上将binlog拉取到本地(通过IO thread),然后通过SQL
thread将binlog重放,而Seconds_Behind_Master表示本地relaylog中未被执行完的那部分的差值。所以如果slave拉取到本地的relaylog(实际上就是binlog,只是在slave上习惯称呼relaylog而已)都执行完,此时通过show slave status看到的会是0

Seconds_Behind_Master: 0

MySQLD Exporter中返回的样本数据中通过mysql_slave_status_seconds_behind_master 来获取相关状态。

    # HELP mysql_slave_status_seconds_behind_master Generic metric from SHOW SLAVE STATUS.
    # TYPE mysql_slave_status_seconds_behind_master untyped
    mysql_slave_status_seconds_behind_master{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 0

查询吞吐量:

说到吞吐量,那么我们如何从那方面来衡量呢? 
通常来说我们可以根据mysql 的插入、查询、删除、更新等操作来

为了获取吞吐量,MySQL 有一个名为 Questions 的内部计数器(根据 MySQL
用语,这是一个服务器状态变量),客户端每发送一个查询语句,其值就会加一。由 Questions 指标带来的以客户端为中心的视角常常比相关的Queries
计数器更容易解释。作为存储程序的一部分,后者也会计算已执行语句的数量,以及诸如PREPARE 和 DEALLOCATE PREPARE
指令运行的次数,作为服务器端预处理语句的一部分。可以通过命令来查询:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Questions";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Questions     | 15071 |
    +---------------+-------+

MySQLD Exporter中返回的样本数据中通过mysql_global_status_questions反映当前Questions计数器的大小:

    # HELP mysql_global_status_questions Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_questions untyped
    mysql_global_status_questions 13253

当然由于prometheus
具有非常丰富的查询语言,我们可以通过这个累加的计数器来查询某一短时间内的查询增长率情况,可以做相关的阈值告警处理、例如一下查询2分钟时间内的查询情况:

rate(mysql_global_status_questions[2m])

当然上面是总量,我们可以分别从监控读、写指令的分解情况,从而更好地理解数据库的工作负载、找到可能的瓶颈。通常,通常,读取查询会由 Com_select
指标抓取,而写入查询则可能增加三个状态变量中某一个的值,这取决于具体的指令:

Writes = Com_insert + Com_update + Com_delete

下面我们通过命令获取插入的情况:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Com_insert";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Com_insert    | 10578 |
    +---------------+-------+

从MySQLD
Exporter的/metrics返回的监控样本中,可以通过global_status_commands_total获取当前实例各类指令执行的次数:

    # HELP mysql_global_status_commands_total Total number of executed MySQL commands.
    # TYPE mysql_global_status_commands_total counter
    mysql_global_status_commands_total{command="create_trigger"} 0
    mysql_global_status_commands_total{command="create_udf"} 0
    mysql_global_status_commands_total{command="create_user"} 1
    mysql_global_status_commands_total{command="create_view"} 0
    mysql_global_status_commands_total{command="dealloc_sql"} 0
    mysql_global_status_commands_total{command="delete"} 3369
    mysql_global_status_commands_total{command="delete_multi"} 0

慢查询性能

查询性能方面,慢查询也是查询告警的一个重要的指标。MySQL还提供了一个Slow_queries的计数器,当查询的执行时间超过long_query_time的值后,计数器就会+1,其默认值为10秒,可以通过以下指令在MySQL中查询当前long_query_time的设置:

    MariaDB [(none)]> SHOW VARIABLES LIKE 'long_query_time';
    +-----------------+-----------+
    | Variable_name   | Value     |
    +-----------------+-----------+
    |
 long_query_time | 10.000000 |
    +-----------------+-----------+
    1 row in set (0.00 sec)

当然我们也可以修改时间

    MariaDB [(none)]> SET GLOBAL long_query_time = 5;
    Query OK, 0 rows affected (0.00 sec)

然后我们而已通过sql语言查询MySQL实例中Slow_queries的数量:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Slow_queries";
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    |
 Slow_queries  | 0     |
    +---------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,通过mysql_global_status_slow_queries指标展示当前的Slow_queries的值:

    # HELP mysql_global_status_slow_queries Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_slow_queries untyped
    mysql_global_status_slow_queries 0

同样的,更具根据Prometheus 慢查询语句我们也可以查询倒他某段时间内的增长率:

rate(mysql_global_status_slow_queries[5m])

连接数监控

监控客户端连接情况相当重要,因为一旦可用连接耗尽,新的客户端连接就会遭到拒绝。MySQL 默认的连接数限制为 151。

    MariaDB [(none)]> SHOW VARIABLES LIKE 'max_connections';
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 max_connections | 151   |
    +-----------------+-------+

当然我们可以修改配置文件的形式来增加这个数值。与之对应的就是当前连接数量,当我们当前连接出来超过系统设置的最大值之后常会出现我们看到的Too many
connections(连接数过多),下面我查找一下当前连接数:

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_connected";
    +-------------------+-------+
    | Variable_name     | Value |
    +-------------------+-------+
    |
 Threads_connected | 41     |
    +-------------------+-------

当然mysql 还提供Threads_running 这个指标,帮助你分隔在任意时间正在积极处理查询的线程与那些虽然可用但是闲置的连接。

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_running";
    +-----------------+-------+
    | Variable_name   | Value |
    +-----------------+-------+
    |
 Threads_running | 10     |
    +-----------------+-------+

如果服务器真的达到 max_connections
限制,它就会开始拒绝新的连接。在这种情况下,Connection_errors_max_connections
指标就会开始增加,同时,追踪所有失败连接尝试的Aborted_connects 指标也会开始增加。

MySQLD Exporter返回的样本数据中:

    # HELP mysql_global_variables_max_connections Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_max_connections gauge
    mysql_global_variables_max_connections 151         

表示最大连接数

    # HELP mysql_global_status_threads_connected Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_connected untyped
    mysql_global_status_threads_connected 41

表示当前的连接数

    # HELP mysql_global_status_threads_running Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_threads_running untyped
    mysql_global_status_threads_running 1

表示当前活跃的连接数

    # HELP mysql_global_status_aborted_connects Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_aborted_connects untyped
    mysql_global_status_aborted_connects 31

累计所有的连接数

    # HELP mysql_global_status_connection_errors_total Total number of MySQL connection errors.
    # TYPE mysql_global_status_connection_errors_total counter
    mysql_global_status_connection_errors_total{error="internal"} 0
    #服务器内部引起的错误、如内存硬盘等
    mysql_global_status_connection_errors_total{error="max_connections"} 0
    #超出连接处引起的错误

当然根据prom表达式,我们可以查询当前剩余可用的连接数:

mysql_global_variables_max_connections - mysql_global_status_threads_connected

查询mysq拒绝连接数

mysql_global_status_aborted_connects

缓冲池情况:

MySQL 默认的存储引擎 InnoDB
使用了一片称为缓冲池的内存区域,用于缓存数据表与索引的数据。缓冲池指标属于资源指标,而非工作指标,前者更多地用于调查(而非检测)性能问题。如果数据库性能开始下滑,而磁盘
I/O 在不断攀升,扩大缓冲池往往能带来性能回升。 
默认设置下,缓冲池的大小通常相对较小,为 128MiB。不过,MySQL 建议可将其扩大至专用数据库服务器物理内存的 80% 大小。我们可以查看一下:

    MariaDB [(none)]> show global variables like 'innodb_buffer_pool_size';
    +-------------------------+-----------+
    | Variable_name           | Value     |
    +-------------------------+-----------+
    |
 innodb_buffer_pool_size | 134217728 |
    +-------------------------+-----------+

MySQLD Exporter返回的样本数据中,使用mysql_global_variables_innodb_buffer_pool_size来表示。

    # HELP mysql_global_variables_innodb_buffer_pool_size Generic gauge metric from SHOW GLOBAL VARIABLES.
    # TYPE mysql_global_variables_innodb_buffer_pool_size gauge
    mysql_global_variables_innodb_buffer_pool_size 1.34217728e+08

    Innodb_buffer_pool_read_requests记录了正常从缓冲池读取数据的请求数量。可以通过以下指令查看

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_read_requests";
    +----------------------------------+-------------+
    | Variable_name                    | Value       |
    +----------------------------------+-------------+
    |
 Innodb_buffer_pool_read_requests | 38465 |
    +----------------------------------+-------------+

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_read_requests Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_read_requests untyped
    mysql_global_status_innodb_buffer_pool_read_requests 2.7711547168e+10

当缓冲池无法满足时,MySQL只能从磁盘中读取数据。Innodb_buffer_pool_reads即记录了从磁盘读取数据的请求数量。通常来说从内存中读取数据的速度要比从磁盘中读取快很多,因此,如果Innodb_buffer_pool_reads的值开始增加,可能意味着数据库的性能有问题。
可以通过以下只能查看Innodb_buffer_pool_reads的数量

    MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_reads";
    +--------------------------+-------+
    | Variable_name            | Value |
    +--------------------------+-------+
    |
 Innodb_buffer_pool_reads | 138  |
    +--------------------------+-------+
    1 row in set (0.00 sec)

MySQLD
Exporter返回的样本数据中,使用mysql_global_status_innodb_buffer_pool_read_requests来表示。

    # HELP mysql_global_status_innodb_buffer_pool_reads Generic metric from SHOW GLOBAL STATUS.
    # TYPE mysql_global_status_innodb_buffer_pool_reads untyped
    mysql_global_status_innodb_buffer_pool_reads 138

通过以上监控指标,以及实际监控的场景,我们可以利用PromQL快速建立多个监控项。可以查看两分钟内读取磁盘的增长率的增长率:

rate(mysql_global_status_innodb_buffer_pool_reads[2m])

官方模板ID

上面是我们简单列举的一些指标,下面我们使用granafa给 MySQLD_Exporter添加监控图表:

  • 主从主群监控(模板7371):

  • 相关mysql 状态监控7362:

  • 缓冲池状态7365:

  • 简单的告警规则

除了相关模板之外,没有告警规则那么我们的监控就是不完美的,下面列一下我们的监控告警规则

    groups:
    - name: MySQL-rules
      rules:
      - alert: MySQL Status 
        expr: up == 0
        for: 5s 
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: MySQL has stop !!!"
          description: "检测MySQL数据库运行状态"

      - alert: MySQL Slave IO Thread Status
        expr: mysql_slave_status_slave_io_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave IO Thread has stop !!!"
          description: "检测MySQL主从IO线程运行状态"

      - alert: MySQL Slave SQL Thread Status 
        expr: mysql_slave_status_slave_sql_running == 0
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave SQL Thread has stop !!!"
          description: "检测MySQL主从SQL线程运行状态"

      - alert: MySQL Slave Delay Status 
        expr: mysql_slave_status_sql_delay == 30
        for: 5s 
        labels:
          severity: warning
        annotations: 
          summary: "{{$labels.instance}}: MySQL Slave Delay has more than 30s !!!"
          description: "检测MySQL主从延时状态"

      - alert: Mysql_Too_Many_Connections
        expr: rate(mysql_global_status_threads_connected[5m]) > 200
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 连接数过多"
          description: "{{$labels.instance}}: 连接数过多,请处理 ,(current value is: {{ $value }})"  

      - alert: Mysql_Too_Many_slow_queries
        expr: rate(mysql_global_status_slow_queries[5m]) > 3
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "{{$labels.instance}}: 慢查询有点多,请检查处理"
          description: "{{$labels.instance}}: Mysql slow_queries is more than 3 per second ,(current value is: {{ $value }})"

2、添加规则到prometheus:

    rule_files:
      - "rules/*.yml" 

3、打开web ui我们可以看到规则生效了:

构建高大上的MySQL监控平台

总结

到处监控mysql的相关状态已经完成,大家可以根据mysql更多的监控指标去完善自己的监控,当然这一套就是我用在线上环境的,可以参考参考。

来源:https://blog.51cto.com/xiaoluoge/2476375
作者:小罗ge11

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pyzbar 两步批量识别快递单号条形码实战教程

这是Python改变生活系列的第三篇,讲到了如何通过Python的pyzbar批量识别快递单号的条形码,以提高我们的生活工作效率,这是一篇实战教程。

1.识别快递单号的前情提要

了解我的小伙伴可能都知道,小五经常给大家送书。最近一年,不算联合抽奖送书,单独我自购+出版社赞助已送出1000本书籍。

如果是自购的话,还需要自己联系快递小哥寄出书籍。

寄出后快递小哥会给我截图来反馈,然而我想要单号的时候就遇到问题了。

每次寄完书,我都只能得到n个截图(内含快递信息)。

为了及时反馈大家物流信息,我需要尽快将快递单号提取出来。

2.思考解决办法

每次大概都有十几到几十张截图,手动去识别真的太麻烦。

不如先看看每张截图大概是什么样子,再去想想批量处理的办法吧。

主要是为了批量获取图片中的快递单号,我想到了两个解决办法:

  1. 用python识别条形码来直接获得准确快递单号

  2. 用python调用ocr,识别截图中的快递单号文字

大家觉得哪个更简单更准确呢?

今天我先聊聊第一种方法的流程和踩坑经历。

 

3.实战教程-遍历图片

首先,第一步需要先获取文件夹中的所有截图,再依次进行条形码识别。

具体操作可以参考注释

import os

def get_jpg():
    jpgs = []
    path = os.getcwd()
    for i in os.listdir(path):  #获取文件列表
        if i.split(".")[-1] == "jpg":  #筛选jpg文件(截图)
            oldname=os.path.join(path,i)  #旧文件名
            i = i.replace('微信图片_','')
            newname=os.path.join(path,i)  #新文件名
            os.rename(oldname,newname)  #改名
            jpgs.append(i)
    return jpgs

上面的代码中除了遍历筛选图片,还涉及了改名的操作。

这是因为我在后面使用 opencv 时,打开的路径只要含有中文就会一直报错,于是我就干脆把截图名称里的中文去除。

执行构建的get_jpg()函数,得到

这些就是演示文件中的四个截图文件,下面开始对他们进行识别。

4.实战教程-识别条形码

python的第三方模块 pyzbar 可以很方便地处理二维码的识别。我们这次用它来识别一维条形码的话,用法也大致一样。不过还要搭配 cv2 使用,主要是为了利用cv2.imread()来读取图片文件。

注意:对于cv2模块,安装时需要输入pip3 install opencv-python,但在导入的时候采用import cv2

识别条形码的具体语句如下所示:

import pyzbar.pyzbar as pyzbar
import cv2

def get_barcode(img):
    image = cv2.imread(img)
    barcodes = pyzbar.decode(image)
    barcode = barcodes[0]
    barcode_data = barcode.data.decode("utf-8")
    return barcode_data

上面构建的get_barcode()函数可以实现识别条形码,并返回结果数据。

我们可以用for循环遍历前文获取的所有图片,再依次使用get_barcode()函数来识别条形码。

data_m =[]
for i in jpgs:
    data = get_barcode(i)
    data_m.append(data)
data_m

可以发现,成功识别了四张截图里的条形码,并获取了对应的快递单号。

小结

回顾今天的问题案例,我先通过思考想出了两种解决办法。第一种的优点是识别条形码比OCR更准确,但是其只获取了快递单号。后续在给获得赠书的同学反馈时,我还需要手动将名字和单号对应,不够偷懒。后续将给大家介绍第二种方法的流程和优缺点。

如果想看更多python改变生活的真实问题案例,给本文右下角点个赞吧👍

如果你也有一直想用python解决的问题,欢迎在评论区告诉我🚀

本文转自快学Python。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红字验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

点击下方阅读原文可获得更好的阅读体验

Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

强大的Json解析工具 Jsonpath Python 实战教程

 

JsonPath是一种简单的方法来提取给定JSON文档的部分内容。 JsonPath有许多编程语言,如Javascript,Python和PHP,Java。JsonPath提供的json解析非常强大,它提供了类似正则表达式的语法,基本上可以满足所有你想要获得的json内容。

本文介绍了Json相关的基础知识,引入XML和Jsonpath的对比,说明Jsonpath出现的必要性,并在文末附上了 Jsonpath 实战教程。

1.关于JSON

JSON是一个标记符序列。这套标记符包括:构造字符、字符串、数字和三个字面值

构造字符

JSON包括六个构造字符,分别是:左方括号、右方括号、左大括号、右大括号、冒号与逗号。

JSON值

JSON值可以是对象、数组、数字、字符串或者三个字面值(false、true、null),并且字面值必须是小写英文字母。

对象

对象是由花括号括起来,逗号分割的成员构成,成员是字符串键和上面所说的JSON值构成,例如:

{"name":"jack","age":18,"address":{"country"}}

数组

数组是由方括号括起来的一组数值构成,例如:

[1,2,32,3,6,5,5]

字符串与数字想必就不用我过多叙述吧。

下面我就举例一些合法的JSON格式的数据:

{"a":1,"b":[1.2.3]}
[1,2,"3",{"a":4}]
3.14
"json_data"

2.为什么要使用JSON

JSON是一种轻量级的数据交互格式,它使得人们很容易的进行阅读和编写。同时也方便机器进行解析和生成。适用于进行数据交互的场景,比如网站前台与后台之间的数据交互。

JSON的使用方法

json.loads()

把JSON格式字符串解码转成Python对象,从JSON到Python类型转换表如下:

JSON Python
object dict
array list
string str
number(int) int
number(real) float
true True
false False
null None
  • 将数组转成列表对象
import json


strList = "[1,2,3,3,4]"
print(json.loads(strList))
print(type(json.loads(strList)))

试着运行上面的代码,你会发现已经成功的将strList转换为列表对象。

  • 将对象转换成字典
import json


strDict = '{"city":"上海","name":"jack","age":18}'
print(json.loads(strDict))
print(type(json.loads(strDict)))

试着运行上面的代码,你会发现已经成功的将object转换为dict类型的数据。

json.dumps()

其实这个方法也很好理解,就是将Python类型的对象转换为json字符串。从Python类型向JSON类型转换的对照表如下:

 

python JSON
dict object
list, tuple array
str string
int, float number
True true
False false
None null
  • 将Python列表对象转换为JSON字符串
import json


list_str = [1,2,3,6,5]
print(json.dumps(list_str))
print(type(json.dumps(list_str)))

试着运行上面的代码,你会发现成功的将列表类型转换成了字符串类型。

  • 将Python元组对象转换为JSON字符串
import json


tuple_str = (1,2,3,6,5)
print(json.dumps(tuple_str))
print(type(json.dumps(tuple_str)))

试着运行上面的代码,你会发现成功的将元组类型的数据转换成了字符串。

  • 将Python字典对象转换为JSON字符串
import json 


dict_str = {"name""小明""age":18"city""中国深圳"}
print(json.dumps(dict_str))
print(type(json.dumps(dict_str)))

输出结果:

{"name""\u5c0f\u660e""age": 18, "city""\u4e2d\u56fd\u6df1\u5733"}
<class 'str'>

看到上面的输出结果也许你会有点疑惑,其实不需要疑惑,这是ASCII编码方式造成的,因为**json.dumps()**做序列化操作时默认使用的就是ASCII编码,因此我们可以这样写:

import json


dict_str = {"name""小明""age":18"city""中国深圳"}
print(json.dumps(dict_str, ensure_ascii=False))
print(type(json.dumps(dict_str)))

输出结果:

{"name""小明""age": 18, "city""中国深圳"}
<class 'str'>

因为ensure_ascii的默认值是True,因此我们可以添加参数ensure_ascii将它的默认值改成False,这样编码方式就会更改为utf-8了。

json.load()

该方法的主要作用是将文件中JSON形式的字符串转换为Python类型。

具体代码示例如下:

import json

str_list = json.load(open('position.json', encoding='utf-8'))
print(str_dict)
print(type(str_dict))

运行上面的代码,你会发现成功的将字符串类型的JSON数据转换为了dict类型。

代码中的文件position.json我也会分享给大家。

  • json.dump()

将Python内置类型序列化为JSON对象后写入文件。具体代码示例如下所示:

import json

list_str = [{'city':'深圳'}, {'name''小明'},{'age':18}]
dict_str = {'city':'深圳','name':'小明','age':18}

json.dump(list_str, open('listStr.json''w'), ensure_ascii=False)
json.dump(list_str, open('dictStr.json''w'), ensure_ascii=False)

3.jsonpath

XML的优点是提供了大量的工具来分析、转换和有选择地从XML文档中提取数据。Xpath是这些功能强大的工具之一。

对于JSON数据来说,也出现了jsonpath这样的工具来解决这些问题:

  • 数据可以通过交互方式从客户端上的JSON结构提取,不需要特殊的脚本。
  • 客户端请求的JSON数据可以减少到服务器的上的相关部分,从而大幅度减少服务器响应的带宽使用。

jsonpath表达式始终引用JSON结构的方式与Xpath表达式与XML文档使用的方式相同。

jsonpath的安装方法

pip install jsonpath

jsonpath与Xpath

下面表格是jsonpath语法与Xpath的完整概述和比较。

Xpath jsonpath 概述
/ $ 根节点
. @ 当前节点
/ .or[] 取子节点
* * 匹配所有节点
[] [] 迭代器标识(如数组下标,根据内容选值)
// 不管在任何位置,选取符合条件的节点
n/a [,] 支持迭代器中多选
n/a ?() 支持过滤操作
n/a () 支持表达式计算

下面我们就通过几个示例来学习jsonxpath的使用方法。

我们先来看下面这段json数据

"store": {
    "book": [
      { "category""reference",
        "author""Nigel Rees",
        "title""Sayings of the Century",
        "price"8.95
      },
      { "category""fiction",
        "author""Evelyn Waugh",
        "title""Sword of Honour",
        "price"12.99
      },
      { "category""fiction",
        "author""Herman Melville",
        "title""Moby Dick",
        "isbn""0-553-21311-3",
        "price"8.99
      },
      { "category""fiction",
        "author""J. R. R. Tolkien",
        "title""The Lord of the Rings",
        "isbn""0-395-19395-8",
        "price"22.99
      }
    ],
    "bicycle": {
      "color""red",
      "price"19.95
    }
  }
}

获取符合条件的节点

假如我需要获取到作者的名称该怎么样写呢?

如果通过Python的字典方法来获取是非常麻烦的,所以在这里我们可以选择使用jsonpath.。

具体代码示例如下所示:

import jsonpath


author = jsonpath.jsonpath(data_json, '$.store.book[*].author')
print(author)

运行上面的代码你会发现,成功的获取到了所有的作者名称,并保存在列表中。

或者还可以这样写:

import jsonpath

author = jsonpath.jsonpath(data_json, '$..author')
print(author)

使用指定索引

还是使用上面的json数据,假如我现在需要获取第三本书的价格。

third_book_price = jsonpath.jsonpath(data_json, '$.store.book[2].price')
print(third_book_price)

运行上面的代码,你会发现成功的获取到了第三本书的价格。

使用过滤器

isbn_book = jsonpath.jsonpath(data_json, '$..book[?(@.isbn)]')
print(isbn_book)
print(type(isbn_book))

通过运行上面的代码,你会发现,成功的将含有isbn编号的书籍过滤出来了。

同样的道理,根据上面的例子,我们也可以将价格小于10元的书过滤出来。

book = jsonpath.jsonpath(data_json, '$..book[?(@.price<10)]')
print(book)
print(type(book))

通过运行上面的代码,你会发现这里已经成功的将价格小于10元的书提取出来了。

jsonpath其实是非常适合用来获取json格式的数据的一款工具,最重要的是这款工具轻量简单容使用。关于jsonpath的介绍到这里就结束了,下面我们就进入实战演练吧!

4.Jsonpath 实战教程

前言

每年的6月份都是高校学生的毕业季,作为计算机专业的你来说,如果刚刚毕业就可以进入大厂,想必是一个非常不错的选择。因此,今天我带来的项目就是爬取腾讯招聘的网站,获取职位名称、职位类别、工作地点、工作国家、职位的更新时间、职位描述

爬取内容一共有329页,在前329页的职位都是在这个月发布的,还是比较新,对大家来说更有参考的价值。

网页链接:https://careers.tencent.com/search.html

准备

工欲善其事,必现利其器。首先我们要准备好几个库:pandas、requests、jsonpath

如果没有安装,请参考下面的安装过程:

pip install requests
pip install pandas
pip install jsonpath

需求分析与功能实现

获取所有的职位信息

对网页进行分析的时候,我发现想从网页上直接获取信息是是做不到的,该网页的响应信息如下所示:

<!DOCTYPE html><html><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="initial-scale=1,maximum-scale=1,user-scalable=no"><meta name=keywords content=""><meta name=description content=""><meta name=apple-mobile-web-app-capable content=no><meta name=format-detection content="telephone=no"><title>搜索 | 腾讯招聘</title><link rel=stylesheet href=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/css/main.css><link rel=stylesheet href=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/css/jquery-ui.min.css></head><body><div id=app></div><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/careersmlr/HeadFoot_zh-cn.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/careersmlr/HostMsg_zh-cn.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/careersmlr/Search_zh-cn.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/config.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/jquery.min.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/jquery.ellipsis.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/report.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/qrcode.min.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/manifest.build.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor.build.js></script><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/p_zh-cn_search.build.js></script></body><script type=text/javascript src=https://cdn.multilingualres.hr.tencent.com/tencentcareer/static/js/vendor/common.js></script></html>

因此我判断,这个是动态Ajax加载的数据,因此就要去网页控制器上查找职位数据是否存在。

经过一番查找,果然发现是动态加载的数据,信息如下所示:

格式化之后的数据如下所示:

{
    "Code":200,
    "Data":{
        "Count":8500,
        "Posts":[
            {
                "Id":0,
                "PostId":"1346716678288842752",
                "RecruitPostId":71330,
                "RecruitPostName":"41071-腾讯会议项目经理(西安)(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"腾讯云",
                "CategoryName":"产品",
                "Responsibility":"1、负责研发项目及研发效能的计划制定、进度驱动和跟踪、风险识别以及应对,确保项目按计划完成;
2、负责组织项目各项评审会议及项目例会,制定并推广项目流程规范,确保项目有序进行;
3、负责与项目外部合作伙伴进行沟通,制定流程规范双方合作,并推动合作事宜;
4、及时发现并跟踪解决项目问题,有效管理项目风险。
"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1346716678288842752",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1346716729744564224",
                "RecruitPostId":71331,
                "RecruitPostName":"41071-腾讯会议产品策划(平台方向)(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"腾讯云",
                "CategoryName":"产品",
                "Responsibility":"1、负责腾讯会议企业管理平台的产品策划工作,包括企业运营平台、运维、会控平台和工具的产品设计和迭代优化;
2、协调和推动研发团队完成产品开发、需求落地,并能在需求上线后进行持续数据分析和反馈跟进,不断提升产品竞争力;
3、根据行业场景抽象用户需求,沉淀面向不同类型客户的云端管控平台解决方案;
 "
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1346716729744564224",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1346062593894129664",
                "RecruitPostId":71199,
                "RecruitPostName":"41071-腾讯会议产品策划(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"腾讯云",
                "CategoryName":"产品",
                "Responsibility":"负责腾讯会议的产品策划工作:
1、研究海外用户办公习惯及SaaS市场动态,调研海外相关SaaS产品并输出产品调研结论,综合市场情况和用户需求输出高质量的产品需求或解决方案;
2、负责腾讯会议各产品线的英文版的功能同步和产品设计工作,把关产品功能同步和国际版需求改造等;
3、协调和推动研发团队完成产品开发、需求落地,并能在需求上线后进行持续数据分析和反馈跟进,不断提升产品竞争力; "
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1346062593894129664",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1352161575309418496",
                "RecruitPostId":72134,
                "RecruitPostName":"CSIG16-推荐算法高级工程师",
                "CountryName":"中国",
                "LocationName":"北京",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"技术",
                "Responsibility":"1. 参与地图场景下推荐算法优化,持续提升转化效果和用户体验;
2. 负责地图场景下推荐引擎架构设计和开发工作;
3. 跟进业界推荐领域最新进展,并推动其在地图场景下落地。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=0",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1352158432852975616",
                "RecruitPostId":72133,
                "RecruitPostName":"41071-腾讯云SDK 终端研发工程师(CSIG全资子公司)",
                "CountryName":"中国",
                "LocationName":"西安",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"技术",
                "Responsibility":"1. 负责腾讯云 GME SDK(游戏多媒体引擎)的开发和优化工作,并配套开发相应的场景解决方案业务流程,以满足不同场景和不同行业的客户需求; 
2. 全流程参与客户需求咨询、需求评估、方案设计、方案编码实施及交付工作; 
3. 负责优化腾讯云GME产品易用性,并跟踪客户的接入成本、完善服务体系,解决客户使用产品服务和解决方案过程中的技术问题,不断完善问题处理机制和流程。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=0",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1352155053116366848",
                "RecruitPostId":72131,
                "RecruitPostName":"40931-智慧交通数据平台前端开发工程师(北京)",
                "CountryName":"中国",
                "LocationName":"北京",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"技术",
                "Responsibility":"负责腾讯智慧交通领域的平台前端开发工作;
负责规划与制定前端整体发展计划与基础建设;
负责完成前端基础架构设计与组件抽象。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=0",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1306860769169645568",
                "RecruitPostId":66367,
                "RecruitPostName":"35566-HRBP(腾讯全资子公司)",
                "CountryName":"中国",
                "LocationName":"武汉",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"人力资源",
                "Responsibility":"负责区域研发公司的HR政策、制度、体系与重点项目在部门内部的落地与推动执行;
深入了解所负责领域业务与人员发展状况,评估并明确组织与人才发展对HR的需求;
驱动平台资源提供HR解决方案,并整合内部资源推动执行;提升管理干部的人力资源管理能力,关注关键人才融入与培养,确保持续的沟通与反馈;
协助管理层进行人才管理、团队发展、组织氛围建设等,确保公司文化在所属业务领域的落地;
负责所对接部门的人才招聘工作;
"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1306860769169645568",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1351353005709991936",
                "RecruitPostId":71981,
                "RecruitPostName":"35566-招聘经理(腾讯云全资子公司)",
                "CountryName":"中国",
                "LocationName":"武汉",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"人力资源",
                "Responsibility":"1、负责CSIG区域研发公司相关部门的社会招聘及校园招聘工作,制定有效的招聘策略并推动落地执行,保障人才开源、甄选和吸引;
2、负责相关部门人力资源市场分析,有效管理并优化招聘渠道;
3、参与招聘体系化建设,甄选相关优化项目,有效管理及优化招聘渠道。"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1351353005709991936",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1351838518279675904",
                "RecruitPostId":72081,
                "RecruitPostName":"35566-雇主品牌经理(腾讯云全资子公司)",
                "CountryName":"中国",
                "LocationName":"武汉",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"人力资源",
                "Responsibility":"1、负责腾讯云区域研发公司雇主品牌的规划和建设工作,结合业务招聘需求,制定有效的品牌方案;
2、负责讯云区域研发公司的公众号、媒体账号的内容策划、撰写,协调相关资源完成高质量内容输出;
3、负责招聘创意项目的策划和项目统筹,借助各种平台渠道,完成创意内容的传播触达,提升人选对腾讯云区域研发公司的认知和意向度;"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1351838518279675904",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            },
            {
                "Id":0,
                "PostId":"1199244591342030848",
                "RecruitPostId":55432,
                "RecruitPostName":"22989-数据库解决方案架构师(北京/上海/深圳)",
                "CountryName":"中国",
                "LocationName":"上海",
                "BGName":"CSIG",
                "ProductName":"",
                "CategoryName":"产品",
                "Responsibility":"支持客户的应用架构设计,了解客户的业务逻辑和应用架构,给出合理的产品方案建议; 
支持客户的数据库方案设计,从运维、成本、流程等角度主导云数据库产品落地; 
梳理客户的核心诉求,提炼为普适性的产品能力,推动研发团队提升产品体验;
根据客户的行业属性,定制行业场景的解决方案,提升云数据库的影响力;"
,
                "LastUpdateTime":"2021年01月21日",
                "PostURL":"http://careers.tencent.com/jobdesc.html?postId=1199244591342030848",
                "SourceID":1,
                "IsCollect":false,
                "IsValid":true
            }
        ]
    }
}

经过对比发现上面的json数据与网页信息是完全相同的。

看到json数据你有没有一丝的惊喜,终于到了可以大显身手的时候了。

你会发现,上面每一个节点的参数都是独立的,不会存在重复,那我们可以这样写:

def get_info(data):
    recruit_post_name = jsonpath.jsonpath(data, '$..RecruitPostName')
    category_name = jsonpath.jsonpath(data, '$..CategoryName')
    country_name= jsonpath.jsonpath(data, '$..CountryName')
    location_name = jsonpath.jsonpath(data, '$.Data.Posts..LocationName')
    responsibility = jsonpath.jsonpath(data, '$..Responsibility')
    responsibility = [i.replace('\n''').replace('\r'''for i in responsibility]
    last_update_time = jsonpath.jsonpath(data, '$..LastUpdateTime')

运行上面的代码,你会发现成功的获取到了每一组数据。

关于翻页

打开网页之后你会发现腾讯的职位信息一共有850页,但是前面的json数据仅仅只有第一页的数据怎么办呢?

不用担心,直接点击第二页看看网络数据有什么变化。

如上图所示,当点击第二页的时候,又加载出来了一个数据,点击进去之后你就会发现,这个数据刚好就是第二页的职位信息。

那接下来就是发现规律的时候了,第一页与第二页保存JSON数据的URL如下所示:

# 第一页
https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1611215870971&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=1&pageSize=10&language=zh-cn&area=cn

# 第二页
https://careers.tencent.com/tencentcareer/api/post/Query?timestamp=1611217026103&countryId=&cityId=&bgIds=&productId=&categoryId=&parentCategoryId=&attrId=&keyword=&pageIndex=2&pageSize=10&language=zh-cn&area=cn

经过测试发现,可以将URL地址进行简化,简化后的URL如下所示:

# 第一页
https://careers.tencent.com/tencentcareer/api/post/Query?pageIndex=1&pageSize=10

# 第二页
https://careers.tencent.com/tencentcareer/api/post/Query?pageIndex=1&pageSize=10

数据保存

将爬取下来的数据保存至csv文件,核心代码如下所示:

df = pd.DataFrame({
        'country_name': country_name,
        'location_name': location_name,
        'recruit_post_name':recruit_post_name,
        'category_name': category_name,
        'responsibility':responsibility,
        'last_update_time':last_update_time
    })

if __name__ == '__main__':
    tengxun = TengXun()
    df = pd.DataFrame(columns=['country_name''location_name''category_name','recruit_post_name''responsibility''last_update_time'])

    for page in range(1330):
        print(f'正在获取第{page}页')
        url = tengxun.get_url(page)
        data = tengxun.get_json(url)
        time.sleep(0.03)

        df1 = get_info(data)
        df = pd.concat([df, df1])
        df = df.reset_index(drop=True)
    # pprint.pprint(data)

    df.to_csv('../data/腾讯招聘.csv', encoding='utf-8-sig')

最后结果

 

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红字验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

点击下方阅读原文可获得更好的阅读体验

Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

手把手Django+Vue前后端分离入门实战教程

1.前言

众所周知,Django对于网站快速开发非常友好,这得益于框架为我们做了很多事情,让我们只需要做一些简单的配置和逻辑即可把网站的功能开发出来。

但是,在使用Django的过程中,有一个地方一直是比较难受的,那就是使用Django自带的模版,这种通常需要自己利用HTML+CSS+Jquery的方式总感觉是上一个时代的做法,前后端分离无论对于开发效率、多端支持等等都是很有好处的。

所以,本文希望通过一个简单的demo,讲一讲基于Django+Vue的前后端分离开发,将Django作为一个纯后端的服务,为前端Vue页面提供数据支持。

本文采用的django版本号2.2.5,Vue版本2.9.6。

如果看不完可以先收藏关注哈~

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,可以访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器,它有许多的优点:Python 编程的最好搭档—VSCode 详细指南

请选择以下任一种方式输入命令安装依赖
1. Windows 环境 打开 Cmd (开始-运行-CMD)。
2. MacOS 环境 打开 Terminal (command+空格输入Terminal)。
3. 如果你用的是 VSCode编辑器 或 Pycharm,可以直接使用界面下方的Terminal.

pip install django

vue的安装可见:
https://www.runoob.com/vue2/vue-install.html

创建前后端项目:创建一个文件夹,然后命令行创建项目即可,如下图:

命令行进入后端文件夹 book_demo,输入下面命令,浏览器登陆 127.0.0.1:8000 看见欢迎页即成功。

python manage.py runserver

再进入前端文件夹 appfront ,输入下面命令,浏览器登陆 127.0.0.1:8080 看见欢迎页即成功。

npm run dev

上面两个命令也是对应前后端项目的启动命令,后面就直接将过程说成启动前/后端项目。

2.Django+Vue后端实现

为了方便后端的实现,作为django做后端api服务的一种常用插件,django-rest-framework(DRF)提供了许多好用的特性,所以本文demo中也应用一下,命令行输入命令安装:

pip install django-rest-framework

进入book_demo目录,创建一个新的名为books的应用,并在新应用中添加urls.py文件,方便后面的路由配置,命令行输入:

python manage.py startapp books
cd books touch urls.py

现在的目录结构如下:

下面开始做一些简单的配置:

将DRF和books配置到django项目中,打开项目中的 settings.py 文件,添加:

# book_demo/settings.py
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',

    # demo add
    'rest_framework',
    'books',
]

对整个项目的路由进行配置,让访问 api/ 路径时候转到books应用中的urls.py文件配置进行处理。

# book_demo/settings.py
from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('books.urls')), # demo add
]

下面在books应用中写简单的逻辑,demo只最简单涉及对书本的增删改查。因为这一部分不是本文重点,所以这里只介绍写代码流程和贴代码,对代码不做详细解释:

在models.py文件中写简单数据类Books:

# books/models.py
from django.db import models


class Books(models.Model):
    name = models.CharField(max_length=30)
    author = models.CharField(max_length=30, blank=True, null=True)

在books文件夹中创建serializer.py文件,并写对应序列化器BooksSerializer:

# books/serializer.py
from rest_framework import serializers

from books.models import Books


class BooksSerializer(serializers.ModelSerializer):
    class Meta:
        model = Books
        fields = '__all__'

在views.py文件中写对应的视图集BooksViewSet来处理请求:

# books/views.py
from rest_framework import viewsets

from books.models import Books
from books.serializer import BooksSerializer


class BooksViewSet(viewsets.ModelViewSet):
    queryset = Books.objects.all()
    serializer_class = BooksSerializer

在urls.py文件中写对应的路由映射:

# books/urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter

from books import views


router = DefaultRouter()
router.register('books', views.BooksViewSet)

urlpatterns = [
    path('', include(router.urls)),
]

对于books应用中的内容,如果对DRF和Django流程熟悉的同学肯定知道都干了些什么,篇幅关系这里只能简单解释,DRF通过视图集ViewSet的方式让我们对某一个数据类Model可以进行增删改查,而且不同的操作对应于不同的请求方式,比如查看所有books用get方法,添加一本book用post方法等,让整个后端服务是restful的。

如果实在看不懂代码含义,只需知道这样做之后就可以通过不同的网络请求对后端数据库的Books数据进行操作即可,后续可以结合Django和DRF官方文档对代码进行学习,或关注本人未来分享的内容。

到这里,可以运行一下后端项目看看效果,命令行运行:

python manage.py makemigrations
python manage.py migrate
python manage.py runserver

得益于DRF提供的api可视化界面,浏览器访问 127.0.0.1:8000/api/books/ ,如果出现了以下界面并添加数据正常,则说明后端的基本逻辑已经ok了~下图为添加了一条数据之后的效果。

3.Vue前端实现教程

接下来的工作以appfront项目目录为根目录进行,开始写一点前端的展示和表单,希望达到两个目标,一是能从后端请求到所有的books列表,二是能往后端添加一条book数据。说白了就是希望把上面的页面用Vue简单实现一下,然后能达到相同的功能。

对于Vue项目中各个文件的功能这里也不多解释,可以参考其文档系统学习。这里只需要知道欢迎页面中的样式是写在App.vue和components/HelloWorld.vue中即可。

这里直接用HelloWorld.vue进行修改,只求功能不追求页面了~

// appfront/src/components/HelloWorld.vue
<template>
  <div class="hello">
    <h1>{{ msg }}</h1>
    <!-- show books list -->
    <ul>
      <li v-for="(book, index) in books" :key="index" style="display:block">
        {{index}}-{{book.name}}-{{book.author}}
      </li>
    </ul>
    <!-- form to add a book -->
    <form action="">
      输入书名:<input type="text" placeholder="book name" v-model="inputBook.name"><br>
      输入作者:<input type="text" placeholder="book author" v-model="inputBook.author"><br>
    </form>
    <button type="submit" @click="bookSubmit()">submit</button>
  </div>
</template>

<script>
export default {
  name: 'HelloWorld',
  data () {
    return {
      msg: 'Welcome to Your Vue.js App',
      // books list data
      books: [
        {name: 'test', author: 't'},
        {name: 'test2', author: 't2'}
      ],
      // book data in the form
      inputBook: {
        "name": "",
        "author": "",
      }
    }
  },
  methods: {
    loadBooks () {...}, // load books list when visit the page
    bookSubmit () {...} // add a book to backend when click the button
  },
  created: function () {
    this.loadBooks()
  }
}
</script>
...


启动前端项目,浏览器访问127.0.0.1:8080,可以看到刚写的页面已经更新上去了,丑是丑了点,意思到了就行~如下图:

4.前后端联调

敲黑板,重点来了!!上面的页面中数据其实是写死在前端页面的模拟数据,这一节希望通过从后端拿数据并展示在前端页面的方式来完成前后端联调。前后端联调,涉及最多的就是跨域的问题,为了保证安全,通常需要遵循同源策略,即“协议、域名、端口”三者都相同,具体可以看看相关的博客,这里只需知道上述三者相同才算同源即可。

后端部分,对于django的跨域问题,网上比较常用的做法就是利用django-cors-headers模块来解决,这里也不能免俗,操作如下。

先在命令行中进行对应模块的安装:‍‍‍‍‍‍

pip install django-cors-headers

然后在项目中添加该模块:

# books_demo/settings.py
INSTALLED_APPS = [
    ...
    # demo
    'corsheaders',
    ...
]

MIDDLEWARE = [
    'corsheaders.middleware.CorsMiddleware', # 需注意与其他中间件顺序,这里放在最前面即可
    ...
]

# 支持跨域配置开始
CORS_ORIGIN_ALLOW_ALL = True
CORS_ALLOW_CREDENTIALS = True

后端部分告于段落,接下来需要补充一下前端的逻辑,Vue框架现在一般都用axios模块进行网络请求,这里沿用这种方式,下面是在前端项目中操作:

首先命令行安装axios模块,如果没有安装cnpm就还是用npm安装:

cnpm install axios

为了方便管理api请求的各种逻辑,在前端项目的src目录下创建api目录,然后创建api.js和index.js文件。index.js文件是对axios做配置:

// appfront/src/api/index.js
import Vue from 'vue'
import Axios from 'axios'

const axiosInstance = Axios.create({
    withCredentials: true
})

// 通过拦截器处理csrf问题,这里的正则和匹配下标可能需要根据实际情况小改动
axiosInstance.interceptors.request.use((config) => {
    config.headers['X-Requested-With'] = 'XMLHttpRequest'
    const regex = /.*csrftoken=([^;.]*).*$/
    config.headers['X-CSRFToken'] = document.cookie.match(regex) === null ? null : document.cookie.match(regex)[1]
    return config
})

axiosInstance.interceptors.response.use(
    response => {
        return response
    },
    error => {
        return Promise.reject(error)
    }
)

Vue.prototype.axios = axiosInstance

export default axiosInstance

api.js文件是对后端进行请求,可以看到,获取books列表和添加一本book各对应一个请求:

// appfront/src/api/api.js
import axiosInstance from './index'

const axios = axiosInstance

export const getBooks = () => {return axios.get(`http://localhost:8000/api/books/`)}

export const postBook = (bookName, bookAuthor) => {return axios.post(`http://localhost:8000/api/books/`, {'name': bookName, 'author': bookAuthor})}

然后更新HelloWorld.vue中的处理逻辑:

// appfront/src/components/HelloWorld.vue
<script>
import {getBooks, postBook} from '../api/api.js'
export default {
  ...
  methods: {
    loadBooks () {
      getBooks().then(response => {
        this.books = response.data
      })
    }, // load books list when visit the page
    bookSubmit () {
      postBook(this.inputBook.name, this.inputBook.author).then(response => {
        console.log(response)
        this.loadBooks()
      })
    } // add a book to backend when click the button
  },
  ...
}
</script>

至此,一个极其简陋的查询和添加书籍的功能算是完成了~如下图:

可以看到,列表里面的数据是从后端读取到的,同时前端的提交数据库也能有对应的操作,所以前后端至此是打通了。

5.打包

现阶段是前后端分开开发,但是当最后要用的时候,还需要把代码合在一起。

首先对前端项目进行打包,这里用Vue的自动打包:

npm run build

可以看到前端项目中多出了一个dist文件夹,这个就是前端文件的打包结果。需要把dist文件夹复制到books_demo项目文件夹中。

然后对settings.py文件进行相应的修改,其实就是帮django指定模版文件和静态文件的搜索地址:

# books_demo/books_demo/settings.py
...
TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [os.path.join(BASE_DIR, 'dist')], # demo add
        ...
    },
]
...
STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'dist/static'),
]
..

最后在根urls.py文件中配置一下入口html文件的对应路由:

# books_demo/books_demo/urls.py
...
from django.views.generic.base import TemplateView

urlpatterns = [
    ...
    path('', TemplateView.as_view(template_name='index.html'))
]

重新启动项目,这次用浏览器访问 127.0.0.1:8000 ,即django服务的对应端口即可。

可以看到,项目的交互是正常的,符合我们的预期。

总结

本文以一个非常简单的demo为例,介绍了利用django+drf+vue的前后端分离开发模式,基本可以算是手把手入门。有了这个小demo之后,不管是前端页面还是后端功能,都可以做相应的扩展,从而开发出更加复杂使用的网站。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红字验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

点击下方阅读原文可获得更好的阅读体验

Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

抓包神器 MitmProxy 使用教程 – 结合Python

本文是一个较为完整的 mitmproxy教程,侧重于介绍如何开发拦截脚本,帮助读者能够快速得到一个自定义的代理工具。玩爬虫的小伙伴都知道,抓包工具除了MitmProxy 外,还有 FiddlerCharles以及浏览器 netwrok 

既然都有这么多抓包工具了,为什么还要会用 MitmProxy 呢??主要有以下几点:

  1. 不需要安装软件,直接在线(浏览器)进行抓包(包括手机端和 PC 端)
  2. 配合 Python 脚本抓包改包
  3. 抓包过程的所有数据包都可以自动保留到txt里面,方便过滤分析
  4. 使用相对简单,易上手

本文假设读者有基本的 python 知识,且已经安装好了一个 python 3 开发环境。如果你对 nodejs 的熟悉程度大于对 python,可移步到 anyproxy,anyproxy 的功能与 mitmproxy 基本一致,但使用 js 编写定制脚本。除此之外我就不知道有什么其他类似的工具了,如果你知道,欢迎评论告诉我。

本文基于 mitmproxy v5,当前版本号为 v5.0.1

1.基本介绍

顾名思义,mitmproxy 就是用于 MITM 的 proxy,MITM 即中间人攻击(Man-in-the-middle attack)。用于中间人攻击的代理首先会向正常的代理一样转发请求,保障服务端与客户端的通信,其次,会适时的查、记录其截获的数据,或篡改数据,引发服务端或客户端特定的行为。

不同于 fiddler 或 wireshark 等抓包工具,mitmproxy 不仅可以截获请求帮助开发者查看、分析,更可以通过自定义脚本进行二次开发。举例来说,利用 fiddler 可以过滤出浏览器对某个特定 url 的请求,并查看、分析其数据,但实现不了高度定制化的需求,类似于:“截获对浏览器对该 url 的请求,将返回内容置空,并将真实的返回内容存到某个数据库,出现异常时发出邮件通知”。而对于 mitmproxy,这样的需求可以通过载入自定义 python 脚本轻松实现。

但 mitmproxy 并不会真的对无辜的人发起中间人攻击,由于 mitmproxy 工作在 HTTP 层,而当前 HTTPS 的普及让客户端拥有了检测并规避中间人攻击的能力,所以要让 mitmproxy 能够正常工作,必须要让客户端(APP 或浏览器)主动信任 mitmproxy 的 SSL 证书,或忽略证书异常,这也就意味着 APP 或浏览器是属于开发者本人的——显而易见,这不是在做黑产,而是在做开发或测试。

事实上,以上说的仅是 mitmproxy 以正向代理模式工作的情况,通过调整配置,mitmproxy 还可以作为透明代理、反向代理、上游代理、SOCKS 代理等,但这些工作模式针对 mitmproxy 来说似乎不大常用,故本文仅讨论正向代理模式。

2.特性

  • 拦截HTTP和HTTPS请求和响应并即时修改它们
  • 保存完整的HTTP对话以供以之后重发和分析
  • 重发HTTP对话的客户端
  • 重发先前记录的服务的HTTP响应
  • 反向代理模式将流量转发到指定的服务器
  • 在macOS和Linux上实现透明代理模式
  • 使用Python对HTTP流量进行脚本化修改
  • 实时生成用于拦截的SSL / TLS证书
  • And much, much more…

3.配置 MitmProxy

MitmProxy可以说是客户端,也可以说是一个 python 库

方式一:客户端

  • https://mitmproxy.org/downloads/

在这个地址下可以下载对应的客户端安装即可

方式二:Python库

  • pip install mitmproxy

通过这个 pip 命令可以下载好 MitmProxy,下面将会以 Python 库的使用方式给大家讲解如何使用(推荐方式二

2.启动 MitmProxy

MitmProxy 启动有三个命令(三种模式)

  1. mitmproxy,提供命令行界面

  2. mitmdump,提供一个简单的终端输出(还可以配合Python抓包改包

  3. mitmweb,提供在线浏览器抓包界面

mitmdump启动

  • mitmdump -w d://lyc.txt

这样就启动mitmdump,接着在本地设置代理Ip是本机IP,端口8080

安装证书

访问下面这个链接

  • http://mitm.it/

 

可以选择自己的设备(window,或者Android、Apple设备去)安装证书。

然后随便打开一个网页,比如百度

这里是因为证书问题,提示访问百度提示https证书不安全,那么下面开始解决这个问题,因此就引出了下面的这种启动方式

浏览器代理式启动

哪一个浏览器都可以,下面以Chrome浏览器为例(其他浏览器操作一样)

先找到chrome浏览器位置,我的chrome浏览器位置如下图

通过下面命令启动

  • "C:\Users\Administrator\AppData\Local\Google\Chrome\Application\chrome.exe" --proxy-server=127.0.0.1:8080 --ignore-certificate-errors

proxy-server是设置代理和端口

–ignore-certificate-errors是忽略证书

然后会弹出来Chrome浏览器,接着我们搜索知乎

在cmd中就可以看到数据包

这些文本数据可以在编程中进行相应的操作,比如可以放到python中进行过来监听处理

4.启动 Mitmweb

新开一个cmd(终端)窗口,输入下来命令启动mitmweb

  • mitmweb

之后会在浏览器自动打开一个网页(其实手动打开也可以,地址就是:http://127.0.0.1:8081)

现在页面中什么也没有,那下面我们在刷新一个知乎页面

重点:关闭mitmproxy终端!关闭mitmproxy终端!关闭mitmproxy终端!

如果不改变在mitmweb中获取不到数据,数据只在mitmproxy中,因此需要关闭mitmproxy这个命令终端

刷新知乎页面之后如下:

在刚刚的网页版抓包页面就可以看到数据包了

并且还包括https类型,比如查看其中一个数据包,找到数据是对应的,说明抓包成功。

5.配合 Python 脚本

mitmproxy代理(抓包)工具最强大之处在于对python脚步的支持(可以在python代码中直接处理数据包)

下面开始演示,先新建一个py文件(lyc.py)

from mitmproxy import ctx

# 所有发出的请求数据包都会被这个方法所处理
# 所谓的处理,我们这里只是打印一下一些项;当然可以修改这些项的值直接给这些项赋值即可
def request(flow):
    # 获取请求对象
    request = flow.request
    # 实例化输出类
    info = ctx.log.info
    # 打印请求的url
    info(request.url)
    # 打印请求方法
    info(request.method)
    # 打印host头
    info(request.host)
    # 打印请求端口
    info(str(request.port))
    # 打印所有请求头部
    info(str(request.headers))
    # 打印cookie头
    info(str(request.cookies))
# 所有服务器响应的数据包都会被这个方法处理
# 所谓的处理,我们这里只是打印一下一些项
def response(flow):
    # 获取响应对象
    response = flow.response
    # 实例化输出类
    info = ctx.log.info
    # 打印响应码
    info(str(response.status_code))
    # 打印所有头部
    info(str(response.headers))
    # 打印cookie头部
    info(str(response.cookies))
    # 打印响应报文内容
    info(str(response.text))

在终端中输入一下命令启动

mitmdump.exe -s lyc.py

(PS:这里需要通过另一个端启动浏览器)

"C:\Users\Administrator\AppData\Local\Google\Chrome\Application\chrome.exe" --proxy-server=127.0.0.1:8080 --ignore-certificate-errors

然后访问某个网站

在终端中就可以看到信息

这些信息就是我们在 lyc.py 中指定的显示信息

PS:在手机上配置好代理之后,mitmproxy 同样可以抓取手机端数据

小结:

  1. 不需要安装软件,直接在线(浏览器)进行抓包(包括手机端和 PC 端)
  2. 配合 Python 脚本抓包改包
  3. 抓包过程的所有数据包都可以自动保留到 txt 里面,方便过滤分析
  4. 使用相对简单,易上手
本文转自星安果。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 函数耗时异常自动化监控实战教程

来源:文呓

本文内容包括Python性能可视化分析,逻辑优化,及根据不同的模型动态计算安全阈值,实现各个函数耗时及程序总耗时的自动化监控预警

在做Python性能分析优化的时候,可以借助cProfile生成性能数据文件,通过pstats获取详细耗时分布数据,结合gprof2dot脚本生成函数调用栈结构图做可视化分析,提高性能分析的效率。

接着从具体的耗时分布,先从占用大头的函数分析具体逻辑实现,逐步优化,同时保存pstats函数耗时平均值数据作为后续异常自动化监控的样本数据。

实现耗时自动化监控必须是可以根据算法动态调整安全阈值,而不是人工定死安全阈值范围,这样才可以实现异常监控的自循环和迭代校准。

一、性能数据函数耗时采集及可视化报表生成

1. 性能数据文件保存(cProfile)

首先是性能数据文件的保存,cProfile和profile提供了Python程序的确定性性能分析。profile是一组统计数据,用来描述程序的各个部分执行的频率和时间。在程序开始的时候调用enable开始性能数据采集,结束的时候调用dump_stats停止性能数据采集并保存性能数据到指定路径的文件。

import cProfile
# 程序开始的时候打开数据采集开关
pr = cProfile.Profile()
pr.enable()

# 在程序运行结束的时候dump性能数据到指定路径文件中,profliePath为保存文件的绝对路径参数
pr.dump_stats(profliePath)

2. 详细性能数据读取查看

保存性能数据到文件之后,可以用pstats读取文件中的数据,profile统计数据可以通过pstats模块格式化为报表。

import pstats 
# 读取性能数据 
pS = pstats.Stats(profliePath) 
# 根据函数自身累计耗时做排序 
pS.sort_stats('tottime') 
# 打印所有耗时函数信息 
pS.print_stats()
print_stats()输出示例:
79837 function calls (75565 primitive calls) in 37.311 seconds
Ordered by: internal time
ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
 2050    30.167    0.015   30.167    0.015  {time.sleep}
   16     6.579    0.411    6.579    0.411  {select.select}
    1     0.142    0.142    0.142    0.142  {method 'do_handshake' of '_ssl._SSLSocket' objects}
  434     0.074    0.000    0.074    0.000  {method 'read' of '_ssl._SSLSocket' objects}
    1     0.062    0.062    0.062    0.062  {method 'connect' of '_socket.socket' objects}
   37     0.046    0.001    0.046    0.001  {posix.read}
   14     0.024    0.002    0.024    0.002  {posix.fork}

输出字段说明:

  • ncalls  函数被调用次数(只有一个数字时表示不存在递归,有斜杠分割数字时,后面的数字表示非递归调用的次数)
  • tottime  函数总计运行时间,不包括子函数调用时间
  • percall  函数运行一次的平均时间,等于tottime/ncalls
  • cumtime 函数总计运行时间,包括子函数调用时间
  • percall  函数运行一次的平均时间,等于cumtime/ncalls
  • filename:lineno(function) 函数所在的文件名,函数的行号,函数名或基础框架函数类

如果要获取print_stats()里面各个字段信息可以通过如下方式:

# func————filename:lineno(function)
# cc ———— call count,调用次数 
# nc ———— ncalls
# tt ———— tottime
# ct ———— cumtime
# callers ———— 调用堆栈数组,每项数据包括了func, (cc, nc, tt, ct) 字段
for index in range(len(pS.fcn_list)): 
    func = pS.fcn_list[index] 
    cc, nc, tt, ct, callers = pS.stats[func]  
    print cc, nc, tt, ct, func, callers
    for func, (cc, nc, tt, ct) in callers.iteritems():
        print func,cc, nc, tt, ct

二、生成函数调用栈结构图(gprof2dot)教程

gprof2dot脚本把gprof或callgrind分析获得的信息,转化成一个以DOT语言描述的程序调用有向图对象,再通过Graphviz将DOT有向图对象渲染成图片,这样就可以很直观地看出整个程序的调用栈,包括函数所在的类和行数、耗时占比、函数递归次数、以及被调用的次数。

先从GitHub上下载gprof2dot.py脚本到本地,和执行的程序的脚本文件放在同一目录下,当然要使用这个脚本还需要安装graphviz,使用brew命令安装,若安装过程中遇到异常,根据异常提示执行命令安装需要的工具

brew install graphviz

生成程序函数调用栈结构图的逻辑可以参考如下逻辑实现,具体根据自身需求做下修改。

import os
# 获取当前gprof2dot.py脚本路径
gprof2dotPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'gprof2dot.py')
# 函数调用栈结构图保存文件名路径,这边使用生成PNG图片结果
dumpProfPath = profliePath.replace("stats", "png")
dumpCmd = "python %s -f pstats %s | dot -Tpng -o %s" % (gprof2dotPath, profliePath, dumpProfPath)
os.popen(dumpCmd)

三、性能分析及优化实战

在生成函数调用栈结构图之后,就可以很容易的看出各个函数之间的调用关系,每个方块内包括的信息包括函数所在的类和行数耗时百分比被调用次数,如果这个函数存在递归的情况,方块边缘会有一个回旋的箭头标明递归的次数

从结构图里面找到耗时占用较多的部分,分析具体函数的实现逻辑,定位具体耗时的原因,优化的策略如下:

  • 去除多余的逻辑:去除冗余代码
  • 优化递归函数:加日志打印递归时候的各个参数,如果发现很多参数都是重复的,可以加缓存,避免多余的递归消耗。
  • 归并通用逻辑调用:一个函数多次调用同一个子函数获取参数,查看这个子函数的调用是否可以进行整合归并,避免多余的函数调用。
  • 通过上下文环境判断测试程序的初始化是否必要,非必要情况下不进行测试环境的重置操作。

四、耗时异常自动化监控

如果是通过历史的耗时数据计算得到平均值+固定浮动百分比的方式,来配置耗时安全阈值参数实施异常监控存在很大的问题,因为函数执行的耗时容易受设备和运行环境的影响,人工固定浮动百分比的方式维护性差,数据本身不可迭代自循环,总不能每次出现误报问题之后都去手动调整参数。

这边监控的维度包括两方面,一方面是程序各个函数执行耗时的平均值,另一方面是完整程序执行的总耗时,在前期先把这些历史耗时数据保存在数据库中,供后续自动化异常监控的实现提供样本数据。

要实现自动化阈值调整,需要借助常规的模型算法实现,这边只对耗时单个维度的异常做自动化监控实现。

根据原理,无监督异常检测模型一般可分为以下几类:

  • 基于统计和概率模型:主要是对数据的分布做出假设,并找出假设下所定义的“异常”;
  • 线性模型:主要思想是通过线性方法找到合适的低维子空间使得异常点在其中区别于正常点;
  • 基于距离:这种方法认为异常点距离正常点比较远,通过比较数据点之间的距离区分异常点;
  • 基于密度:由于数据分布不均匀,绝对距离无法衡量数据点之间相对远近时,用局部密度表示数据点的异常情况;
  • 基于聚类:将数据点聚类,不属于任何簇、距离最近的簇较远、稀疏聚类里的点认为是异常点;
  • 基于树:通过划分子空间构建树模型寻找异常点。

异常耗时数据是波动的一维数据,这边就直接采用基于统计和概率模型的方式,根据保存的历史数据判断数据是否符合正态分布

若符合正态分布则用 μ+3δ(平均值+3倍标准差)的方式计算得到安全阈值

若不符合正态分布,则用Turkey 箱型图方案 Q+1.5IQR 计算安全阈值。

根据实际测试来看,随着样本数据的增加,会出现前期符合正态分布的函数耗时曲线,随着样本数据的增加会变成不符合正态分布。

Python中用于判断数据是否符合正态分布的代码如下,当pvalue值大于0.05时为正态分布,dataList是耗时数组数据:

from scipy import stats
import numpy
percallMean = numpy.mean(dataList) # 计算均值
# percallVar = numpy.var(dataList) # 求方差
percallStd = numpy.std(dataList) # 计算标准差
kstestResult = stats.kstest(dataList, 'norm', (percallMean, percallStd))
# 当pvalue值大于0.05为正态分布
if kstestResult[1] > 0.05:
    pass

1. 正态分布数据方案

在统计学中,如果一个数据分布近似正态,那么大约 68% 的数据值会在均值的一个标准差范围内,大约 95% 会在两个标准差范围内,大约 99.7% 会在三个标准差范围内。因此,如果任何数据点超过标准差的 3 倍,那么这些点很有可能是异常值或离群点。即正态分布的安全阈值上限为:percallMean + 3 * percallStd

 

2. Turkey 箱型图方案

基于正态分布的 3σ 法则或 Z 分数方法的异常检测是以假定数据服从正态分布为前提的,但实际数据往往并不严格服从正态分布。应用这种方法于非正态分布数据中判断异常值,其有效性是有限的。Tukey 箱型图是一种用于反映原始数据分布的特征常用方法,也可用于异常点识别。在识别异常点时其主要依靠实际数据,因此有其自身的优越性。

箱型图为我们提供了识别异常值的一个标准:异常值被定义为小于 Q1-1.5IQR 或大于 Q+1.5IQR 的值。虽然这种标准有点任意性,但它来源于经验判断,经验表明它在处理需要特别注意的数据方面表现不错。

计算箱型图安全阈值Python实现逻辑如下:

import numpy
percallMean = numpy.mean(dataList)  # 计算均值
boxplotQ1 = numpy.percentile(dataList, 25)
boxplotQ2 = numpy.percentile(dataList, 75)
boxplotIQR = boxplotQ2 - boxplotQ1
upperLimit =  boxplotQ2 + 1.5 * boxplotIQR

在程序实现中就是,在一个程序或用例执行完毕之后,先拿历史数据判断是否符合正态分布,当然历史样本数据至少要达到20个才比较准确,小于20个的时候就继续收集数据,不做异常判断。根据正态分布模型或箱型图模型计算安全阈值参数,判断当前各个函数耗时平均值或用例总耗时是否超过阈值,超过则预警。

高斯模型和箱型图两种方式阈值范围对比

这边给出stats文件数据汇总解析之后,根据相应的模型绘制耗时曲线及阈值或正态曲线及阈值的代码实现,statFolder参数替换成自己stats文件所在文件夹即可。

# coding=utf-8
import os
import pstats
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import traceback
from scipy import stats
import numpy

"""
汇总函数耗时平均值数据
"""
def dataSummary(array, fileName, fcn, percall):
    (funcPath, line, func) = fcn
    exists = False
    for item in array:
        if item["func"] == func and item["funcPath"] == funcPath and item["line"] == line:
            exists = True
            item["cost"].append({
                "percall": percall,
                "fileName": fileName
            })
    if not exists:
        array.append({
            "func": func,
            "funcPath": funcPath,
            "line": line,
            "cost": [{
                "percall": percall,
                "fileName": fileName
            }]
        })

"""
高斯函数计算Y值
"""
def gaussian(x, mu, delta):
    exp = numpy.exp(- numpy.power(x - mu, 2) / (2 * numpy.power(delta, 2)))
    c = 1 / (delta * numpy.sqrt(2 * numpy.pi))
    return c * exp

"""
读取汇总所有stats文件数据
"""
def readStatsFile(statFolder, filterData):
    for path, dir_list, file_list in os.walk(statFolder, "r"):
        for fileName in file_list:
            if fileName.find(".stats") > 0:
                fileAbsolutePath = os.path.join(path, fileName)
                pS = pstats.Stats(fileAbsolutePath)
             # 先对耗时数据从大到小进行排序
                pS.sort_stats('cumtime')
                # pS.print_stats()
                # 统计前100条耗时数据
                for index in range(100):
                    fcn = pS.fcn_list[index]
                    (funcPath, line, func) = fcn
                    # cc ———— call count,调用次数
                    # nc ———— ncalls,调用次数(只有一个数字时表示不存在递归;有斜杠分割数字时,后面的数字表示非递归调用的次数)
                    # tt ———— tottime,函数总计运行时间,除去函数中调用的子函数运行时间
                    # ct ———— cumtime,函数总计运行时间,含调用的子函数运行时间
                    cc, nc, tt, ct, callers = pS.stats[fcn]
                    # print fileName, func, cc, nc, tt, ct, callers
                    percall = ct / nc
                    # 只统计单次函数调用大于1毫秒的数据
                    if percall >= 0.001:
                        dataSummary(filterData, fileName, fcn, percall)

"""
绘制高斯函数曲线和安全阈值
"""
def drawGaussian(func, line, percallMean, threshold, percallList, dumpFolder):
    plt.title(func)
    plt.figure(figsize=(10, 8))
    for delta in [0.2, 0.5, 1]:
        gaussY = []
        gaussX = []
        for item in percallList:
            # 这边为了呈现正态曲线效果,减去平均值
            gaussX.append(item - percallMean)
            y = gaussian(item - percallMean, 0, delta)
            gaussY.append(y)
        plt.plot(gaussX, gaussY, label='sigma={}'.format(delta))
    # 绘制水位线
    plt.plot([threshold - percallMean, threshold - percallMean], [0, 5 * gaussian(percallMean, 0, 1)], color='red',
             linestyle="-", label="Threshold:" + str("%.5f" % threshold))
    plt.xlabel("Time(s)", fontsize=12)
    plt.legend()
    plt.tight_layout()
    # 可能不同类中包含相同的函数名,加上行数参数避免覆盖
    imagePath = dumpFolder + "cost_%s_%s.png" % (func, str(line))
    plt.savefig(imagePath)

"""
绘制耗时曲线和安全阈值
"""
def drawCurve(func, line, percallList, dumpFolder):
    boxplotQ1 = numpy.percentile(percallList, 25)
    boxplotQ2 = numpy.percentile(percallList, 75)
    boxplotIQR = boxplotQ2 - boxplotQ1
    upperLimit = boxplotQ2 + 1.5 * boxplotIQR
    # 不符合正态分布,绘制波动曲线
    timeArray = [i for i in range(len(percallList))]
    plt.title(dataItem["func"])
    plt.figure(figsize=(10, 8))
    # 绘制水位线
    plt.plot([0, len(percallList)], [upperLimit, upperLimit], color='red', linestyle="-",
             label="Threshold:" + str("%.5f" % upperLimit))
    plt.plot(timeArray, percallList, label=dataItem["func"] + "_" + str(dataItem["line"]))
    plt.ylabel("Time(s)", fontsize=12)
    plt.legend()
    plt.tight_layout()
    imagePath = dumpFolder + "cost_%s_%s.png" % (func, str(line))
    plt.savefig(imagePath)

if __name__ == "__main__":
    try:
        statFolder = "/Users/chenwenguan/Downloads/2aab7e17-a1b6-1253/"
        chartFolder = statFolder + "chart/"
        if not os.path.exists(chartFolder):
            os.mkdir(chartFolder)
        filterData = []
        readStatsFile(statFolder, filterData);
        for dataItem in filterData:
            percallList = map(lambda x: x["percall"], dataItem["cost"])
            func = dataItem["func"]
            line = dataItem["line"]
            # 样本个数大于20才进行绘制
            if len(percallList) > 20:
                percallMean = numpy.mean(percallList) # 计算均值
                # percallVar = numpy.var(percallMap) # 求方差
                percallStd = numpy.std(percallList)  # 计算标准差
                # pvalue值大于0.05为正太分布
                kstestResult = stats.kstest(percallList, 'norm', (percallMean, percallStd))
                print "percallStd:%s, pvalue:%s" % (percallStd, kstestResult[1])
                # 符合正态分布绘制分布曲线
                if kstestResult[1] > 0.05:
                    threshold = percallMean + 3 * percallStd
                    drawGaussian(func, line, percallMean, threshold, percallList, chartFolder)
                else:
                    drawCurve(func, line, percallList, chartFolder)
            else:
                pass
    except Exception:
        print 'exeption:' + traceback.format_exc()

两种耗时模型绘制的曲线效果图如下:

函数耗时高斯分布曲线及阈值效果示例

 

函数耗时曲线及Turkey箱型图阈值示例

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

30 个好用的 Python 编程技巧、小贴士

作者 | Erik-Jan van Baaren 
译者 | 弯月,责编 | 屠敏 
出品 | CSDN(ID:CSDNnews)
以下为译文:
借本文为大家献上 Python 语言的 30 个最佳实践、小贴士和技巧,希望能对各位勤劳的程序员有所帮助,并希望大家工作顺利! 

1. Python 版本

在此想提醒各位:自2020年1月1日起,Python 官方不再支持 Python 2。本文中的很多示例只能在 Python 3 中运行。如果你仍在使用 Python 2.7,请立即升级。

2. Python 编程技巧 – 检查 Python 的最低版本

你可以在代码中检查 Python 的版本,以确保你的用户没有在不兼容的版本中运行脚本。检查方式如下:
if not sys.version_info > (27):
   # berate your user for running a 10 year
   # python version
elif not sys.version_info >= (35):
   # Kindly tell your user (s)he needs to upgrade
   # because you’re using 3.5 features

3.Python 编程技巧 – IPython

IPython 本质上就是一个增强版的shell。就冲着自动补齐就值得一试,而且它的功能还不止于此,它还有很多令我爱不释手的命令,例如:
  • %cd:改变当前的工作目录

  • %edit:打开编辑器,并关闭编辑器后执行键入的代码

  • %env:显示当前环境变量

  • %pip install [pkgs]:无需离开交互式shell,就可以安装软件包

  • %time 和 %timeit:测量执行Python代码的时间

完整的命令列表,请点击此处查看(https://ipython.readthedocs.io/en/stable/interactive/magics.html)。
还有一个非常实用的功能:引用上一个命令的输出。In 和 Out 是实际的对象。你可以通过 Out[3] 的形式使用第三个命令的输出。
IPython 的安装命令如下:
pip3 install ipython

4.Python 编程技巧 – 列表推导式

你可以利用列表推导式,避免使用循环填充列表时的繁琐。列表推导式的基本语法如下:
[ expression for item in list if conditional ]
举一个基本的例子:用一组有序数字填充一个列表:
mylist = [i for i in range(10)]
print(mylist)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
由于可以使用表达式,所以你也可以做一些算术运算:
squares = [x**2 for x in range(10)]
print(squares)
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
甚至可以调用外部函数:
def some_function(a):
    return (a + 5) / 2

 

my_formula = [some_function(i) for i in range(10)]
print(my_formula)
# [2, 3, 3, 4, 4, 5, 5, 6, 6, 7]

最后,你还可以使用 ‘if’ 来过滤列表。在如下示例中,我们只保留能被2整除的数字:
filtered = [i for i in range(20) if i%2==0]
print(filtered)
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

5. Python 编程技巧 -检查对象使用内存的状况

你可以利用 sys.getsizeof() 来检查对象使用内存的状况:
import sys

 

mylist = range(010000)
print(sys.getsizeof(mylist))
# 48

等等,为什么这个巨大的列表仅包含48个字节?
因为这里的 range 函数返回了一个类,只不过它的行为就像一个列表。在使用内存方面,range 远比实际的数字列表更加高效。
你可以试试看使用列表推导式创建一个范围相同的数字列表: 
import sys

 

myreallist = [x for x in range(010000)]
print(sys.getsizeof(myreallist))
# 87632

6. Python 编程技巧 – 返回多个值

Python 中的函数可以返回一个以上的变量,而且还无需使用字典、列表或类。如下所示:
def get_user(id):
    # fetch user from database
    # ….
    return name, birthdate

 

name, birthdate = get_user(4)

如果返回值的数量有限当然没问题。但是,如果返回值的数量超过3个,那么你就应该将返回值放入一个(数据)类中。

7. Python 编程技巧 – 使用数据类

Python从版本3.7开始提供数据类。与常规类或其他方法(比如返回多个值或字典)相比,数据类有几个明显的优势:
  • 数据类的代码量较少

  • 你可以比较数据类,因为数据类提供了 __eq__ 方法

  • 调试的时候,你可以轻松地输出数据类,因为数据类还提供了 __repr__ 方法

  • 数据类需要类型提示,因此可以减少Bug的发生几率 

数据类的示例如下:
from dataclasses import dataclass

 

@dataclass
class Card:
    rank: str
    suit: str

card = Card(“Q”“hearts”)

print(card == card)
# True

print(card.rank)
# ‘Q’

print(card)
Card(rank=‘Q’, suit=‘hearts’)

详细的使用指南请点击这里(https://realpython.com/python-data-classes/)。

8. Python 编程技巧 – 交换变量

如下的小技巧很巧妙,可以为你节省多行代码:
a = 1
b = 2
a, b = b, a
print (a)
# 2
print (b)
# 1

9. Python 编程技巧 – 合并字典(Python 3.5以上的版本)

从Python 3.5开始,合并字典的操作更加简单了:
dict1 = { ‘a’: 1, ‘b’: 2 }
dict2 = { ‘b’: 3, ‘c’: 4 }
merged = { **dict1, **dict2 }
print (merged)
# {‘a’: 1, ‘b’: 3, ‘c’: 4}
如果 key 重复,那么第一个字典中的 key 会被覆盖。

10. Python 编程技巧 – 字符串的首字母大写

如下技巧真是一个小可爱:
mystring = “10 awesome python tricks”
print(mystring.title())
’10 Awesome Python Tricks’

11. Python 编程技巧 – 将字符串分割成列表

你可以将字符串分割成一个字符串列表。在如下示例中,我们利用空格分割各个单词:
mystring = “The quick brown fox”
mylist = mystring.split(‘ ‘)
print(mylist)
# [‘The’, ‘quick’, ‘brown’, ‘fox’]

12. Python 编程技巧 – 根据字符串列表创建字符串

与上述技巧相反,我们可以根据字符串列表创建字符串,然后在各个单词之间加入空格:
mylist = [‘The’‘quick’‘brown’‘fox’]
mystring = ” “.join(mylist)
print(mystring)
# ‘The quick brown fox’
你可能会问为什么不是 mylist.join(” “),这是个好问题!
根本原因在于,函数 String.join() 不仅可以联接列表,而且还可以联接任何可迭代对象。将其放在String中是为了避免在多个地方重复实现同一个功能。

13. Python 编程技巧 – 表情符

有些人非常喜欢表情符,而有些人则深恶痛绝。我在此郑重声明:在分析社交媒体数据时,表情符可以派上大用场。
首先,我们来安装表情符模块:
pip3 install emoji
安装完成后,你可以按照如下方式使用:
import emoji
result = emoji.emojize(‘Python is :thumbs_up:’)
print(result)
# ‘Python is 👍’

 

# You can also reverse this:
result = emoji.demojize(‘Python is 👍’)
print(result)
# ‘Python is :thumbs_up:’

更多有关表情符的示例和文档,请点击此处(https://pypi.org/project/emoji/)。

14. Python 编程技巧 – 列表切片

列表切片的基本语法如下:
a[start:stop:step]
start、stop 和 step 都是可选项。如果不指定,则会使用如下默认值:
  • start:0

  • end:字符串的结尾

  • step:1

示例如下:
# We can easily create a new list from 
# the first two elements of a list:
first_two = [1, 2, 3, 4, 5][0:2]
print(first_two)
# [1, 2]

 

# And if we use a step value of 2, 
# we can skip over every second number
# like this:
steps = [1, 2, 3, 4, 5][0:5:2]
print(steps)
# [1, 3, 5]

# This works on strings too. In Python,
# you can treat a string like a list of
# letters:
mystring = “abcdefdn nimt”[::2]
print(mystring)
# ‘aced it’

15. Python 编程技巧 – 反转字符串和列表

你可以利用如上切片的方法来反转字符串或列表。只需指定 step 为 -1,就可以反转其中的元素:
revstring = “abcdefg”[::-1]
print(revstring)
# ‘gfedcba’

 

revarray = [1, 2, 3, 4, 5][::-1]
print(revarray)
# [5, 4, 3, 2, 1]

16. Python 编程技巧 – 显示猫猫

我终于找到了一个充分的借口可以在我的文章中显示猫猫了,哈哈!当然,你也可以利用它来显示图片。首先你需要安装 Pillow,这是一个 Python 图片库的分支:
pip3 install Pillow
接下来,你可以将如下图片下载到一个名叫 kittens.jpg 的文件中:
然后,你就可以通过如下 Python 代码显示上面的图片:
from PIL import Image

 

im = Image.open(“kittens.jpg”)
im.show()
print(im.format, im.size, im.mode)
# JPEG (1920, 1357) RGB

Pillow 还有很多显示该图片之外的功能。它可以分析、调整大小、过滤、增强、变形等等。完整的文档,请点击这里(https://pillow.readthedocs.io/en/stable/)。

17. Python 编程技巧 – map()

Python 有一个自带的函数叫做 map(),语法如下:
map(functionsomething_iterable)
所以,你需要指定一个函数来执行,或者一些东西来执行。任何可迭代对象都可以。在如下示例中,我指定了一个列表:
def upper(s):
    return s.upper()

 

mylist = list(map(upper, [‘sentence’‘fragment’]))
print(mylist)
# [‘SENTENCE’, ‘FRAGMENT’]

# Convert a string representation of
# a number into a list of ints.
list_of_ints = list(map(int“1234567”)))
print(list_of_ints)
# [1, 2, 3, 4, 5, 6, 7]

你可以仔细看看自己的代码,看看能不能用 map() 替代某处的循环。

18. Python 编程技巧 – 获取列表或字符串中的唯一元素

如果你利用函数 set() 创建一个集合,就可以获取某个列表或类似于列表的对象的唯一元素:
mylist = [1, 1, 2, 3, 4, 5, 5, 5, 6, 6]
print (set(mylist))
# {1, 2, 3, 4, 5, 6}

 

# And since a string can be treated like a 
# list of letters, you can also get the 
# unique letters from a string this way:
print (set(“aaabbbcccdddeeefff”))
# {‘a’, ‘b’, ‘c’, ‘d’, ‘e’, ‘f’}

19. Python 编程技巧 – 查找出现频率最高的值

你可以通过如下方法查找出现频率最高的值:
test = [1, 2, 3, 4, 2, 2, 3, 1, 4, 4, 4]
print(max(set(test), key = test.count))
# 4
你能看懂上述代码吗?想法搞明白上述代码再往下读。
没看懂?我来告诉你吧:
  • max() 会返回列表的最大值。参数 key 会接受一个参数函数来自定义排序,在本例中为 test.count。该函数会应用于迭代对象的每一项。

  • test.count 是 list 的内置函数。它接受一个参数,而且还会计算该参数的出现次数。因此,test.count(1) 将返回2,而 test.count(4) 将返回4。

  • set(test) 将返回 test 中所有的唯一值,也就是 {1, 2, 3, 4}。

因此,这一行代码完成的操作是:首先获取 test 所有的唯一值,即{1, 2, 3, 4};然后,max 会针对每一个值执行 list.count,并返回最大值。
这一行代码可不是我个人的发明。

20. Python 编程技巧 – 创建一个进度条

你可以创建自己的进度条,听起来很有意思。但是,更简单的方法是使用 progress 包:
pip3 install progress
接下来,你就可以轻松地创建进度条了:
from progress.bar import Bar

 

bar = Bar(‘Processing’, max=20)
for i in range(20):
    # Do some work
    bar.next()
bar.finish()

21. Python 编程技巧 – 在交互式shell中使用_(下划线运算符)

你可以通过下划线运算符获取上一个表达式的结果,例如在 IPython 中,你可以这样操作:
In [1]: 3 * 3
Out[1]: 9In [2]: _ + 3
Out[2]: 12
Python Shell 中也可以这样使用。另外,在 IPython shell 中,你还可以通过 Out[n] 获取表达式 In[n] 的值。例如,在如上示例中,Out[1] 将返回数字9。

22. Python 编程技巧 – 快速创建Web服务器

你可以快速启动一个Web服务,并提供当前目录的内容:
python3 -m http.server
当你想与同事共享某个文件,或测试某个简单的HTML网站时,就可以考虑这个方法。

23. Python 编程技巧 – 多行字符串

虽然你可以用三重引号将代码中的多行字符串括起来,但是这种做法并不理想。所有放在三重引号之间的内容都会成为字符串,包括代码的格式,如下所示。
我更喜欢另一种方法,这种方法不仅可以将多行字符串连接在一起,而且还可以保证代码的整洁。唯一的缺点是你需要明确指定换行符。
s1 = “””Multi line strings can be put
        between triple quotes. It’s not ideal
        when formatting your code though”””

 

print (s1)
# Multi line strings can be put
#         between triple quotes. It’s not ideal
#         when formatting your code though

s2 = (“You can also concatenate multiple\n” +
        “strings this way, but you’ll have to\n”
        “explicitly put in the newlines”)

print(s2)
# You can also concatenate multiple
# strings this way, but you’ll have to
# explicitly put in the newlines

24. Python 编程技巧 – 条件赋值中的三元运算符

这种方法可以让代码更简洁,同时又可以保证代码的可读性:
[on_trueif [expression] else [on_false]
示例如下:
x = “Success!” if (y == 2) else “Failed!”

25. Python 编程技巧 – 统计元素的出现次数

你可以使用集合库中的 Counter 来获取列表中所有唯一元素的出现次数,Counter 会返回一个字典:
from collections import Counter

 

mylist = [1123455566]
c = Counter(mylist)
print(c)
# Counter({1: 2, 2: 1, 3: 1, 4: 1, 5: 3, 6: 2})

# And it works on strings too:
print(Counter(“aaaaabbbbbccccc”))
# Counter({‘a’: 5, ‘b’: 5, ‘c’: 5})

26. Python 编程技巧 – 比较运算符的链接

你可以在 Python 中将多个比较运算符链接到一起,如此就可以创建更易读、更简洁的代码:
x = 10

 

# Instead of:
if x > 5 and x < 15:
    print(“Yes”)
# yes

# You can also write:
if 5 < x < 15:
    print(“Yes”)
# Yes

27. Python 编程技巧 – 添加颜色

你可以通过 Colorama,设置终端的显示颜色:
from colorama import Fore, Back, Style

 

print(Fore.RED + ‘some red text’)
print(Back.GREEN + ‘and with a green background’)
print(Style.DIM + ‘and in dim text’)
print(Style.RESET_ALL)
print(‘back to normal now’)

28. Python 编程技巧 – 日期的处理

python-dateutil 模块作为标准日期模块的补充,提供了非常强大的扩展,你可以通过如下命令安装: 
pip3 install python-dateutil 
你可以利用该库完成很多神奇的操作。在此我只举一个例子:模糊分析日志文件中的日期:
from dateutil.parser import parse

 

logline = ‘INFO 2020-01-01T00:00:01 Happy new year, human.’
timestamp = parse(log_line, fuzzy=True)
print(timestamp)
# 2020-01-01 00:00:01

你只需记住:当遇到常规 Python 日期时间功能无法解决的问题时,就可以考虑 python-dateutil !

29.Python 编程技巧 – 整数除法

在 Python 2 中,除法运算符(/)默认为整数除法,除非其中一个操作数是浮点数。因此,你可以这么写:
# Python 2
5 / 2 = 2
5 / 2.0 = 2.5
在 Python 3 中,除法运算符(/)默认为浮点除法,而整数除法的运算符为 //。因此,你需要这么写:
Python 3
5 / 2 = 2.5
5 // 2 = 2
这项变更背后的动机,请参阅 PEP-0238(https://www.python.org/dev/peps/pep-0238/)。

30. Python 编程技巧 – 通过chardet 来检测字符集

你可以使用 chardet 模块来检测文件的字符集。在分析大量随机文本时,这个模块十分实用。安装方法如下:
pip install chardet
安装完成后,你就可以使用命令行工具 chardetect 了,使用方法如下:
chardetect somefile.txt
somefile.txtascii with confidence 1.0
你也可以在编程中使用该库,完整的文档请点击这里:
https://chardet.readthedocs.io/en/latest/usage.html
这 30 个小例子虽然有一些是老生长谈,但是确实非常经典,值得反复记忆、练习和收藏!

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典