标签归档:爬虫

Python 教你批量下载多用户抖音去水印短视频

TikTokDownload 是由国人开源的抖音去水印视频下载工具。开源地址是:

https://github.com/Johnserf-Seed/TikTokDownload

对于某些做视频分析和研究的同学来说,这个工具非常有用,可以快速获取到视频资料。

下面就来介绍一下这个工具的使用方法。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

git clone https://github.com/Johnserf-Seed/TikTokDownload.git
cd TikTokDownload
pip install -r requirements.txt

如果你的网络环境无法访问Github,你可以在Python实用宝典公众号后台回复 TikTokDownload 下载最新源代码(2022-11-26)。

2.抖音去水印短视频下载方法

运行软件前先打开目录下 conf.ini 文件按照要求进行配置:

配置完成后,在 TikTokDownload 目录下新建一个py文件,填入以下代码即可使用:

# example.py
import TikTokDownload as TK
import Util

# 单视频下载
# TK.video_download(*TK.main())

# 批量下载
if __name__ == '__main__':
    # 获取命令行参数
    cmd = Util.Command()
    # 获取用户主页数据
    profile = Util.Profile()
    # 使用参数,没有则使用默认参数并下载
    profile.getProfile(cmd.setting())
	# 如果需要定时下载则注释这个input
    input('[  完成  ]:已完成批量下载,输入任意键后退出:')

效果如下:

​视频会被默认保存在当前目录的Download目录下。

单个视频链接,你可以通过 TK.video_download 下载

import TikTokDownload as TK
TK.video_download("视频链接", "yes")

video_download 第一个参数是视频的原始链接,第二个参数表明是否下载音乐原声,yes为下载。

3.批量多用户下载

通过修改配置的方式,我们只能实现逐个的用户短视频下载,每次下载新的用户的短视频都得修改配置,这非常麻烦。

如果我们想要在一次运行中就下载到全部用户的短视频应该怎么做呢?

方法很简单,我们把想要下载的抖音号和对应的Userid放在rooms.txt中, 用逗号分隔:

1545798353,MS4wLjABAAAAdv-v-WcZO48UMZRDLB-huZxYObcxv5Z5FFWXKw4-o_8
135180247,MS4wLjABAAAAtmTX6GSVN_AFW792_8srxdu1kPNXkuSGoG8Xl8xDHbE

使用下方这份代码,就可以将两个作者的全部短视频下载下来。

# 公众号:Python实用宝典
import Util

def read_rooms():
    f = open("rooms.txt", "r", encoding="utf-8")
    short_rooms = ["https://www.douyin.com/user/" + l.strip("\n").split(",")[1] for l in f.readlines()]
    return short_rooms

# 批量下载
if __name__ == '__main__':
    userids = []
    cmd = Util.Command()
    for room in read_rooms():
        setting = cmd.setting()
        setting[0] = room
        # 获取用户主页数据
        profile = Util.Profile()
        # 使用参数,没有则使用默认参数并下载
        profile.getProfile(tuple(setting))

在TikTokDownload目录下保存为batch_download.py,然后使用Python运行这个py文件即可:

cd TikTokDownload
python batch_download.py

4.常见错误

  1. 单个视频链接与用户主页链接要分清,软件闪退可以通过终端运行查看报错信息(一般是链接弄错的问题)如:
    • 链接一定要输入仔细,配置文件只支持用户主页
  2. 配置文件一定要注意编码格式(推荐Notepad++)

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 模拟登陆神库!集合了20+个平台的模拟登陆脚本

Awesome-python-login-model 是一个国人开发的模拟登陆仓库,在这个仓库上有20几个网站的模拟登陆脚本,你可以基于这个仓库实现的代码做简易的修改,以实现自己的自动化功能。

https://github.com/Kr1s77/awesome-python-login-model

其支持模拟登陆的网站有:

可以看到,支持的站点非常多,大家可以从他仓库里学到许多关于模拟登陆的方法,简单的来讲,大多数脚本采用的是直接登录的方式,有的网站直接登录难度很大,比如qq空间,bilibili等使用 selenium + webdriver 的方式就相对轻松一些。

一些网站虽然在登录的时候采用的是selenium的方式,为了效率,我们可以在登录过后得到的cookie维护起来,然后调用 requests 或者 scrapy 等进行数据采集,这样数据采集的速度可以得到保证。

如果你无法登陆 Github 克隆它的仓库,请在 Python 实用宝典 后台回复 Awesome-python-login-model 下载仓库代码。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

使用这个仓库的时候,你需要按需安装并加载相应的模块,不过无非就是以下几个模块:

pip install beautifulsoup4
pip install selenium
pip install pyppeteer
pip install pillow

上面的模块你并不需要全部安装,最好是找到你所需要模拟登陆的网站的脚本,查看它头部 import 了什么模块,按需安装即可。

2.简单的模拟登陆实战

下面来看一个拉勾网的登陆脚本:

# -*- coding:utf-8 -*-
import re
import os
import time
import json
import sys
import subprocess
import requests
import hashlib
from bs4 import BeautifulSoup

"""
info:
author:CriseLYJ
github:https://github.com/CriseLYJ/
update_time:2019-3-6
"""


class Lagou_login(object):
    def __init__(self):
        self.session = requests.session()
        self.CaptchaImagePath = os.path.split(os.path.realpath(__file__))[0] + os.sep + 'captcha.jpg'
        self.HEADERS = {'Referer': 'https://passport.lagou.com/login/login.html',
                        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36'
                                      ' (KHTML, like Gecko) Chrome/53.0.2785.104 Safari/537.36'
                                      ' Core/1.53.4882.400 QQBrowser/9.7.13059.400',
                        'X-Requested-With': 'XMLHttpRequest'}

    # 密码加密
    def encryptPwd(self, passwd):
        # 对密码进行了md5双重加密
        passwd = hashlib.md5(passwd.encode('utf-8')).hexdigest()
        # veennike 这个值是在js文件找到的一个写死的值
        passwd = 'veenike' + passwd + 'veenike'
        passwd = hashlib.md5(passwd.encode('utf-8')).hexdigest()
        return passwd

    # 获取请求token
    def getTokenCode(self):
        login_page = 'https://passport.lagou.com/login/login.html'

        data = self.session.get(login_page, headers=self.HEADERS)

        soup = BeautifulSoup(data.content, "lxml", from_encoding='utf-8')
        '''
            要从登录页面提取token,code, 在头信息里面添加
            <!-- 页面样式 --><!-- 动态token,防御伪造请求,重复提交 -->
            <script type="text/javascript">
                window.X_Anti_Forge_Token = 'dde4db4a-888e-47ca-8277-0c6da6a8fc19';
                window.X_Anti_Forge_Code = '61142241';
            </script>
        '''
        anti_token = {'X-Anit-Forge-Token': 'None',
                      'X-Anit-Forge-Code': '0'}
        anti = soup.findAll('script')[1].getText().splitlines()
        anti = [str(x) for x in anti]

        anti_token['X-Anit-Forge-Token'] = re.findall(r'= \'(.+?)\'', anti[1])[0]
        anti_token['X-Anit-Forge-Code'] = re.findall(r'= \'(.+?)\'', anti[2])[0]

        return anti_token

    # 人工读取验证码并返回
    def getCaptcha(self):
        captchaImgUrl = 'https://passport.lagou.com/vcode/create?from=register&refresh=%s' % time.time()
        # 写入验证码图片
        f = open(self.CaptchaImagePath, 'wb')
        f.write(self.session.get(captchaImgUrl, headers=self.HEADERS).content)
        f.close()
        # 打开验证码图片
        if sys.platform.find('darwin') >= 0:
            subprocess.cx5c r[p'6;-]l=09all(['open', self.CaptchaImagePath])
        elif sys.platform.find('linux') >= 0:
            subprocess.call(['xdg-open', self.CaptchaImagePath])
        else:
            os.startfile(self.CaptchaImagePath)

        # 输入返回验证码
        captcha = input("请输入当前地址(% s)的验证码: " % self.CaptchaImagePath)
        print('你输入的验证码是:% s' % captcha)
        return captcha

    # 登陆操作
    def login(self, user, passwd, captchaData=None, token_code=None):
        postData = {'isValidate': 'true',
                    'password': passwd,
                    # 如需验证码,则添加上验证码
                    'request_form_verifyCode': (captchaData if captchaData != None else ''),
                    'submit': '',
                    'username': user
                    }
        login_url = 'https://passport.lagou.com/login/login.json'

        # 头信息添加tokena
        login_headers = self.HEADERS.copy()
        token_code = self.getTokenCode() if token_code is None else token_code
        login_headers.update(token_code)

        # data = {"content":{"rows":[]},"message":"该帐号不存在或密码错误,请重新输入","state":400}
        response = self.session.post(login_url, data=postData, headers=login_headers)
        data = json.loads(response.content.decode('utf-8'))

        if data['state'] == 1:
            return response.content
        elif data['state'] == 10010:
            print(data['message'])
            captchaData = self.getCaptcha()
            token_code = {'X-Anit-Forge-Code': data['submitCode'], 'X-Anit-Forge-Token': data['submitToken']}
            return self.login(user, passwd, captchaData, token_code)
        else:
            print(data['message'])
            return False


if __name__ == "__main__":

    username = input("请输入你的手机号或者邮箱\n >>>:")
    passwd = input("请输入你的密码\n >>>:")

    lg = Lagou_login()
    passwd = lg.encryptPwd(passwd)

    data = lg.login(username, passwd)
    if data:
        print(data)
        print('登录成功')
    else:
        print('登录不成功')

从头部的 import 引入来看,你需要安装并加载 Beautifulsoup4 模块:

pip install beautifulsoup4

安装完成后,终端需要 cd 进入此脚本所在文件夹,执行脚本:

python Lagou.py

运行脚本后需要你输入一定的信息进行登陆,做得非常方便和贴心:

登陆完成后,你就可以做任何你想要做的事情了。

3.基于selenium的模拟登陆

有些网站的爬取没有那么简单,他们会做权限校验、会做反爬机制。这种情况下,我们可以用selenium解决一些比较困难和复杂的登陆场景。

基于selenium的模拟登陆稍微复杂一点,你需要设置chromedriver的路径到环境变量中。如果你没有设置,运行登陆脚本的时候会出现以下错误:

怎么下载并设置 Chromedriver 到环境变量里呢?你可以在这里下载到最新版的chromedriver:

https://chromedriver.chromium.org/

现在最新版 Chromedriver 版本号到了 91.0.4472.101,下载链接如下: https://chromedriver.storage.googleapis.com/index.html?path=91.0.4472.101

可以看到,每个系统需要下载的 Chromedriver 版本不一样,请对应你的系统下载指定的版本即可。mac64 和 mac_m1指的是使用了不同芯片的Mac笔记本,你可以在Mac上,单击菜单栏左上角的[Apple]图标,然后选择“关于本机”选项。看到如下写着芯片 Apple M1 则应该下载mac_m1版本。

如果你的网络存在问题无法下载,没关系,关注 Python实用宝典 公众号,后台回复 Chromedriver 即可下载,我已经把这4个版本放到了国内网盘上。

下载 Chromedriver 完成后,你还需要设置环境变量

(macOS 系统) 请这样设置环境变量:

1. 把解压得到的 Chromedriver 放到一个你不会经常变动的路径

如 /usr/local/bin/ ,你需要 Command+空格 输入并打开终端(Terminal),执行以下命令:

cd /usr/local/bin/
open .

然后将 Chromedriver 拖入,就能成功将 Chromedriver 放入其中。

2.添加环境变量

在终端输入下列命令就能添加到环境变量:

export PATH=$PATH:/usr/local/bin/chromedriver

执行完这一步,恭喜你成功在 macOS 上安装了 Chromedriver.

(Windows 系统) 请这样设置环境变量:

1.在左下角搜索环境变量,打开“编辑系统环境变量”的选项:

2.设置 Chromedriver 环境变量:

将你的 chromedriver 所在目录放入到 PATH 变量中,如图所示。比如我的 chromedriver.exe 的路径是C:\Users\83493\Documents\bin\chromedriver.exe, 那么此处就应该增加 C:\Users\83493\Documents\bin 路径。

设置完成后,你便成功在 Windows 上安装了 Chromedriver. 另外请注意设置后要重启终端或CMD让环境变量生效。

另外如果你在使用 Chromedriver 的时候出现了类似于以下的报错,不要慌:

这是由于当前 Chromedriver 版本是91, 而你现有的 Chrome 版本是 90 造成的,升级Chrome即可解决问题。

完成selenium的基本配置后,我们可以尝试运行QQ空间模拟登陆:

进入项目文件夹的qqzone文件夹:

cd awesome-python-login-model\qqzone

然后直接运行 qq_zone.py 文件:

python qq_zone.py

此时会弹出一个浏览器并让你输入信息:

输入信息后,就会正常走登陆流程:

看到如上的界面,说明登陆完成,此时Cookie什么的都已经被设定完毕,你可以把Cookie存下来,并做任何你想做的事情了。

如果你愿意研究作者的代码,你会发现其实很简单:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
info:
author:CriseLYJ
github:https://github.com/CriseLYJ/
update_time:2019-3-7
"""

import time  # 用来延时
from selenium import webdriver

driver = webdriver.Chrome()  # 选择浏览器,此处我选择的Chrome
QQ_NUMBER = input('请输入你的QQ号')
PASSWORD = input('请输入你的QQ密码')

driver.get('http://i.qq.com/')
driver.switch_to.frame('login_frame')
driver.find_element_by_id('switcher_plogin').click()

driver.find_element_by_name('u').clear()
driver.find_element_by_name('u').send_keys(QQ_NUMBER)  # 此处输入你的QQ号
driver.find_element_by_name('p').clear()
driver.find_element_by_name('p').send_keys(PASSWORD)  # 此处输入你的QQ密码

driver.execute_script("document.getElementById('login_button').parentNode.hidefocus=false;")

driver.find_element_by_xpath('//*[@id="loginform"]/div[4]/a').click()
driver.find_element_by_id('login_button').click()

time.sleep(10)  # 因为我曾经是QQ会员,所以每次登陆时都会提醒我要不要再续费的弹窗...

driver.find_element_by_id('dialog_button_1').click()  # 这个地方是我把那个弹窗给点击了,配合上面的延时用的,延时是等待那个弹窗出现,然后此处点击取消

btns = driver.find_elements_by_css_selector('a.item.qz_like_btn_v3')  # 此处是CSS选择器
for btn in btns:
    btn.click()

简单的讲,代码一共分了4个步骤,分别如下:

1.让使用者输入QQ号和密码。

2.切换浏览器焦点到登录框中,选择元素输入账号和密码。

3.为了显示登录按钮,执行了以下脚本:

driver.execute_script("document.getElementById('login_button').parentNode.hidefocus=false;")

4.点击确认按钮,完成登录。

可以看到,基于 Selenium 的自动化控制一点都不难,一旦熟悉控制流程及相应的方法后应该如鱼得水。只要你度过一开始安装 Chromedriver 时的繁琐阶段,后面代码开发时多参考他人的代码,Selenium这个自动化工具是可以被熟练掌握的。

总而言之,Awesome-python-login-model 这个模拟登陆的代码库,可以给你带来不少的便利,你可以直接基于它提供的登陆脚本开发,也可以参考这些脚本自己写一个其他网站的模拟登陆脚本,并给作者提交PR。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Selenium 实战教程-爬取人民网留言板单进程、多线程、多进程版

本文讲解了如何通过selenium爬取人民网留言板留言,并从单进程、多线程、多进程版角度进行爬虫性能优化,是一篇优秀的 selenium 实战教程。

第一节:单进程版+selenium实战教程

一、项目概述

1.项目说明

本项目主要是对领导留言板内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。网站链接是http://liuyan.people.com.cn/home?p=0,任意选择一条留言点击进入详情页后,如下对于图中标出的数据,均要进行爬取,以此构成一条留言的组成部分。

2.环境配置

(1)Python:3.x
(2)所需库:

  • dateutil
    • 安装方法:
    • pip install python-dateutil
  • selenium
    • 安装方法:
    • pip install selenium

(3)模拟驱动:chromedriver,可点击https://download.csdn.net/download/CUFEECR/12193208进行下载Google浏览器80.0.3987.16版对应版本,或点击http://chromedriver.storage.googleapis.com/index.html下载与Google对应版本,并放入Python对应安装路径下的Scripts目录下。

二、项目实施

1.导入所需要的库

import csv
import os
import random
import re
import time

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    while True:
        datestr = WebDriverWait(drivertemp, 10).until(
            lambda driver: driver.find_element_by_xpath(
                '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
        datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
        date = dparser.parse(datestr, fuzzy=True)
        print('正在爬取链接 --', position, '--', date)
        if date < start_date:
            break
        ## 模拟点击加载
        try:
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more")))
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight, document.body.scrollHeight - 600)')
            time.sleep(get_time())
            drivertemp.execute_script('window.scrollTo(document.body.scrollHeight - 600, document.body.scrollHeight)')
            WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.XPATH, '//*[@id="show_more"]')))
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
        except:
            break
        time.sleep(get_time() - 1)
    detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
    ## 获取所有链接
    for element in detail_elements:
        detail_url = element.get_attribute('href')
        yield detail_url
    drivertemp.quit()

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个内容,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    position = WebDriverWait(driver, 10).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
    ## time.sleep(get_time())
    print(index, '-- 正在爬取 --', position)
    start_time = time.time()
    ## encoding='gb18030'
    csv_name = position + '.csv'
    ## 文件存在则删除重新创建
    if os.path.exists(csv_name):
        os.remove(csv_name)
    with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for detail_url in get_detail_urls(position, list_url):
            get_message_detail(driver, detail_url, writer, position)
            time.sleep(get_time())
    end_time = time.time()
    crawl_time = int(end_time - start_time)
    crawl_minute = crawl_time // 60
    crawl_second = crawl_time % 60
    print(position, '已爬取结束!!!')
    print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
    driver.quit()
    time.sleep(5)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    for index, fid in enumerate(fids):
        try:
            get_officer_messages(index + 1, fid)
        except:
            get_officer_messages(index + 1, fid)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12198734下载,仅供交流学习,请勿滥用。整个执行过程较长,因为是单线程的,必须要等一个领导数据爬取完毕之后才能爬取下一个,我选择了10个领导进行测试,在云服务器中的运行结果分别如下显然,整个运行时间将近5小时,效率相对较低,有很大的提升空间。最终得到了合并的DATA.csv

2.改进分析

(1)该版本的代码未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页也是采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本是单进程(线程)的,必须要一个领导爬取完之后才能进行下一个领导的爬取,效率较低,特别是留言较多的领导耗时很长,可以考虑使用多进程或多线程进行优化。

第二节:多线程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟。本篇在第一篇的基础上做了一些改进

  1. 采用了多线程,设定同时运行的线程的数量为3,线程数量适中,这样在保证在同一时刻有多个线程在执行爬取的同时,也能避免线程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。
  2. 对异常处理进行了优化,之前异常处理是放在获取一个领导对应的所有的留言链接函数里的,当获取不到加载更多按钮并且超时时就会抛出异常,这样使得如果异常发生在其他部分如获取留言详情时会被忽略,改进之后将其放入主函数,对于每一个领导都放入异常处理,这里涵盖了对该领导爬取时的所有操作,只要在任一环节报错都会捕捉到,同时增加了5层嵌套异常处理,增加了对出现异常的容忍度(在发生网络环境不好而加载不出页面、内存消耗较多而卡顿、被被爬取方反爬而不能爬取等情况时,可以对官员重新爬取以保证数据的完整程度和程序的健壮性)。

二、项目实施

由于在实现过程中有3种常用的方法实现多线程,因此对应也有3种不同的具体实现,这里选第1种进行说明:

1.导入所需要的库

import csv
import os
import random
import re
import time
import threading

import dateutil.parser as dparser
from random import choice
from selenium import webdriver
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 控制同时运行的线程数为3
sem = threading.Semaphore(3)
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,同时在全局中设置同时运行的线程数为3,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 6.0; Acoo Browser; SLCC1; .NET CLR 2.0.50727; Media Center PC 5.0; .NET CLR 3.0.04506)",
        "Mozilla/4.0 (compatible; MSIE 7.0; AOL 9.5; AOLBuild 4337.35; Windows NT 5.1; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)",
        "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729; .NET CLR 2.0.50727; Media Center PC 6.0)",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有3种方式实现:

  • 这通过threading.Semaphore()指定线程数量,后边在实现作为线程参数的函数时使用上下文处理器
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    thread_list = []
    ## 将所有线程加入线程列表,便于控制同时执行的线程数量
    for index, fid in enumerate(fids):
        t = threading.Thread(target=get_officer_messages, args=(index + 1, fid))
        thread_list.append([t, fid])
    for thread, fid in thread_list:
        ## 5层嵌套进行异常处理
        try:
            thread.start()
        except:
            try:
                thread.start()
            except:
                try:
                    thread.start()
                except:
                    try:
                        thread.start()
                    except:
                        try:
                            thread.start()
                        except:
                            ## 如果仍出现异常加入失败名单
                            print('该官员爬取失败,已存入失败名单,以备再爬')
                            if not os.path.exists('fid_not_success.txt'):
                                with open('fid_not_success.txt''a+'as f:
                                    f.write(fid)
                            else:
                                with open('fid_not_success.txt''a+'as f:
                                    f.write('\n' + fid)
                            continue
    for thread, fid in thread_list:
        thread.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用submit()函数实现线程的调用执行
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        for index, fid in enumerate(fids):
            ## 5层嵌套进行异常处理
            try:
                executor.submit(get_officer_messages, index + 1, fid)
            except:
                try:
                    executor.submit(get_officer_messages, index + 1, fid)
                except:
                    try:
                        executor.submit(get_officer_messages, index + 1, fid)
                    except:
                        try:
                            executor.submit(get_officer_messages, index + 1, fid)
                        except:
                            try:
                                executor.submit(get_officer_messages, index + 1, fid)
                            except:
                                ## 如果仍出现异常加入失败名单
                                print('该官员爬取失败,已存入失败名单,以备再爬')
                                if not os.path.exists('fid_not_success.txt'):
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write(fid)
                                else:
                                    with open('fid_not_success.txt''a+'as f:
                                        f.write('\n' + fid)
                                continue
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过concurrent.futures.ThreadPoolExecutor指定线程数量,并调用map()函数实现函数和多个参数的映射来执行线程

def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    with ThreadPoolExecutor(3as executor:
        executor.map(get_officer_messages, range(1, len(fids) + 1), fids)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多线程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

3个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12199138下载,欢迎进行测试和交流学习,请勿滥用。整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到1小时半左右,约等于第一篇单线程的三分之一,因为同一时刻有3个子线程执行,大大降低了运行时间,效率比之前提高很多,加入多线程之后,可以让运行时间较长和较短的相互补充,同时多个线程同时运行,在同一时刻爬取多个领导,很显然大大缩短了运行时间。最后得到了合并的DATA.csv:可以进一步总结多线程的优势 :

  • 易于调度
  • 提高并发性:通过线程可方便有效地实现并发性。进程可创建多个线程来执行同一程序的不同部分。
  • 开销少:创建线程比创建进程要快,所需开销很少。
  • 利于充分发挥多处理器的功能:通过创建多线程进程,每个线程在一个处理器上运行,从而实现应用程序的并发性,使每个处理器都得到充分运行。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

第三节:多进程版+selenium实战教程

一、项目概述

本项目主要是对领导留言板http://liuyan.people.com.cn/home?p=0内的所有留言的具体内容进行抓取,对留言详情、回复详情和评价详情进行提取保存,并用于之后的数据分析和进一步处理,可以对政府的决策和电子政务的实施提供依据。具体项目说明和环境配置可参考本系列的第一篇Python 爬取留言板留言(一):单进程版+selenium模拟
本篇在第二篇的基础上做了一个主要改进:
从多线程改变为多进程,设定同时运行的进程的数量为3,数量适中,这样在保证在同一时刻有多个进程在执行爬取的同时,也能避免进程过多对内存、CPU和网络带宽的高要求,从而大大降低了整体运行时间,这是该项目的主要改进。

二、项目实施

由于在实现过程中有2种常用的方法实现多线程,因此对应也有2种不同的具体实现。

1.导入所需要的库

import csv
import os
import random
import re
import time


import dateutil.parser as dparser
from random import choice
from multiprocessing import Pool
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.chrome.options import Options

主要导入在爬取过程中需要用到的处理库和selenium中要用到的类。

2.全局变量和参数配置

## 时间节点
start_date = dparser.parse('2019-06-01')
## 浏览器设置选项
chrome_options = Options()
chrome_options.add_argument('blink-settings=imagesEnabled=false')

我们假设只爬取2019.6.1以后的留言,因为这之前的留言自动给好评,没有参考价值,因此设置时间节点,并禁止网页加载图片,减少对网络的带宽要求、提升加载速率。

3.产生随机时间和用户代理

def get_time():
    '''获取随机时间'''
    return round(random.uniform(36), 1)


def get_user_agent():
    '''获取随机用户代理'''
    user_agents = [
        "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
        "Mozilla/5.0 (iPhone; CPU iPhone OS 9_1 like Mac OS X) AppleWebKit/601.1.46 (KHTML, like Gecko) Version/9.0 Mobile/13B143 Safari/601.1",
        "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 6 Build/LYZ28E) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.23 Mobile Safari/537.36",
        "Mozilla/5.0 (iPod; U; CPU iPhone OS 2_1 like Mac OS X; ja-jp) AppleWebKit/525.18.1 (KHTML, like Gecko) Version/3.1.1 Mobile/5F137 Safari/525.20",
        "Mozilla/5.0 (Linux;u;Android 4.2.2;zh-cn;) AppleWebKit/534.46 (KHTML,like Gecko) Version/5.1 Mobile Safari/10600.6.3 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)",
        "Mozilla/5.0 (compatible; Baiduspider/2.0; +http://www.baidu.com/search/spider.html)"
    ]
    ## 在user_agent列表中随机产生一个代理,作为模拟的浏览器
    user_agent = choice(user_agents)
    return user_agent

产生随机时间并随机模拟浏览器用于访问网页,降低被服务器识别出是爬虫而被禁的可能。

4.获取领导的fid

def get_fid():
    '''获取所有领导id'''
    with open('url_fid.txt''r'as f:
        content = f.read()
        fids = content.split()
    return fids

每个领导都有一个fid用于区分,这里采用手动获取fid并保存到txt中,在开始爬取时再逐行读取。

5.获取领导所有留言链接

def get_detail_urls(position, list_url):
    '''获取每个领导的所有留言链接'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    drivertemp = webdriver.Chrome(options=chrome_options)
    drivertemp.maximize_window()
    drivertemp.get(list_url)
    time.sleep(2)
    ## 循环加载页面
    try:
        while WebDriverWait(drivertemp, 502).until(EC.element_to_be_clickable((By.ID, "show_more"))):
            datestr = WebDriverWait(drivertemp, 10).until(
                lambda driver: driver.find_element_by_xpath(
                    '//*[@id="list_content"]/li[position()=last()]/h3/span')).text.strip()
            datestr = re.search(r'\d{4}-\d{2}-\d{2}', datestr).group()
            date = dparser.parse(datestr, fuzzy=True)
            print('正在爬取链接 --', position, '--', date)
            if date < start_date:
                break
            ## 模拟点击加载
            drivertemp.find_element_by_xpath('//*[@id="show_more"]').click()
            time.sleep(get_time())
        detail_elements = drivertemp.find_elements_by_xpath('//*[@id="list_content"]/li/h2/b/a')
        ## 获取所有链接
        for element in detail_elements:
            detail_url = element.get_attribute('href')
            yield detail_url
        drivertemp.quit()
    except TimeoutException:
        drivertemp.quit()
        get_detail_urls(position, list_url)

根据第4步提供的fid找到一个领导对应的所有留言的链接,由于领导的留言列表并未一次显示完,下方有一个加载更多按钮,如下每次需要进行点击向下加载,所以要模拟点击的操作,向下滑动,等完全加载后再次点击,直到底部,有可能会滑倒页面最底部不再显示按钮或者由于被反爬或网络不好而未加载出来,此时定位元素会超时,增加异常处理,递归调用。
函数返回值时,不是一次返回一个列表,而是通过yield关键字生成生成器,按照程序执行的进度生成url,可以减少内存的压力。

6.获取留言详情

def get_message_detail(driver, detail_url, writer, position):
    '''获取留言详情'''
    print('正在爬取留言 --', position, '--', detail_url)
    driver.get(detail_url)
    ## 判断,如果没有评论则跳过
    try:
        satis_degree = WebDriverWait(driver, 2.5).until(
            lambda driver: driver.find_element_by_class_name("sec-score_firstspan")).text.strip()
    except:
        return
    ## 获取留言各部分内容
    message_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/h3/span")).text
    message_date = re.search(r'\d{4}-\d{2}-\d{2}', message_date_temp).group()
    message_datetime = dparser.parse(message_date, fuzzy=True)
    if message_datetime < start_date:
        return
    message_title = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_class_name("context-title-text")).text.strip()
    label_elements = WebDriverWait(driver, 2.5).until(lambda driver: driver.find_elements_by_class_name("domainType"))
    try:
        label1 = label_elements[0].text.strip()
        label2 = label_elements[1].text.strip()
    except:
        label1 = ''
        label2 = label_elements[0].text.strip()
    message_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[6]/p")).text.strip()
    replier = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[1]/i")).text.strip()
    reply_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/p")).text.strip()
    reply_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[1]/h3[2]/em")).text
    reply_date = re.search(r'\d{4}-\d{2}-\d{2}', reply_date_temp).group()
    review_scores = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_elements_by_xpath("/html/body/div[8]/ul/li[2]/h4[1]/span/span/span"))
    resolve_degree = review_scores[0].text.strip()[:-1]
    handle_atti = review_scores[1].text.strip()[:-1]
    handle_speed = review_scores[2].text.strip()[:-1]
    review_content = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/p")).text.strip()
    is_auto_review = '是' if (('自动默认好评' in review_content) or ('默认评价' in review_content)) else '否'
    review_date_temp = WebDriverWait(driver, 2.5).until(
        lambda driver: driver.find_element_by_xpath("/html/body/div[8]/ul/li[2]/h4[2]/em")).text
    review_date = re.search(r'\d{4}-\d{2}-\d{2}', review_date_temp).group()
    ## 存入CSV文件
    writer.writerow(
        [position, message_title, label1, label2, message_date, message_content, replier, reply_content, reply_date,
         satis_degree, resolve_degree, handle_atti, handle_speed, is_auto_review, review_content, review_date])

我们只需要有评论的留言,因此在最开始要过滤掉没有评论的留言。然后通过xpath、class_name等方式定位到相应的元素获取留言的各个部分的内容,每条留言共保存14个属性,并保存到csv中。

7.获取并保存领导所有留言

def get_officer_messages(index, fid):
    '''获取并保存领导的所有留言'''
    user_agent = get_user_agent()
    chrome_options.add_argument('user-agent=%s' % user_agent)
    driver = webdriver.Chrome(options=chrome_options)
    list_url = "http://liuyan.people.com.cn/threads/list?fid={}##state=4".format(fid)
    driver.get(list_url)
    try:
        position = WebDriverWait(driver, 10).until(
            lambda driver: driver.find_element_by_xpath("/html/body/div[4]/i")).text
        print(index, '-- 正在爬取 --', position)
        start_time = time.time()
        csv_name = position + '.csv'
        ## 文件存在则删除重新创建
        if os.path.exists(csv_name):
            os.remove(csv_name)
        with open(csv_name, 'a+', newline='', encoding='gb18030'as f:
            writer = csv.writer(f, dialect="excel")
            writer.writerow(
                ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
                 '办理速度分''是否自动好评''评价内容''评价日期'])
            for detail_url in get_detail_urls(position, list_url):
                get_message_detail(driver, detail_url, writer, position)
                time.sleep(get_time())
        end_time = time.time()
        crawl_time = int(end_time - start_time)
        crawl_minute = crawl_time // 60
        crawl_second = crawl_time % 60
        print(position, '已爬取结束!!!')
        print('该领导用时:{}分钟{}秒。'.format(crawl_minute, crawl_second))
        driver.quit()
        time.sleep(5)
    except:
        driver.quit()
        get_officer_messages(index, fid)

获取该领导的职位信息并为该领导创建一个独立的csv用于保存提取到的留言信息,增加异常处理递归调用,调用get_message_detail()方法获取每条留言的具体信息并保存,计算出每个领导的执行时间。

8.合并文件

def merge_csv():
    '''将所有文件合并'''
    file_list = os.listdir('.')
    csv_list = []
    for file in file_list:
        if file.endswith('.csv'):
            csv_list.append(file)
    ## 文件存在则删除重新创建
    if os.path.exists('DATA.csv'):
        os.remove('DATA.csv')
    with open('DATA.csv''a+', newline='', encoding='gb18030'as f:
        writer = csv.writer(f, dialect="excel")
        writer.writerow(
            ['职位姓名''留言标题''留言标签1''留言标签2''留言日期''留言内容''回复人''回复内容''回复日期''满意程度''解决程度分''办理态度分',
             '办理速度分''是否自动好评''评价内容''评价日期'])
        for csv_file in csv_list:
            with open(csv_file, 'r', encoding='gb18030'as csv_f:
                reader = csv.reader(csv_f)
                line_count = 0
                for line in reader:
                    line_count += 1
                    if line_count != 1:
                        writer.writerow(
                            (line[0], line[1], line[2], line[3], line[4], line[5], line[6], line[7], line[8],
                             line[9], line[10], line[11], line[12], line[13], line[14], line[15]))

将爬取的所有领导的数据进行合并。

9.主函数调用

多线程的实现主要在这部分,有2种方式实现:

  • 通过for循环遍历所有任务和参数,并调用apply_async()函数将任务加入进程池
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 创建进程池
    pool = Pool(3)
    ## 将任务加入进程池并传入参数
    for index, fid in zip(range(1, len(fids) + 1), fids):
        pool.apply_async(get_officer_messages, (index, fid))
    pool.close()
    pool.join()
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()
  • 通过调用map()函数将任务加入进程池并实现函数与参数的映射
def main():
    '''主函数'''
    fids = get_fid()
    print('爬虫程序开始执行:')
    s_time = time.time()
    ## 处理传入的参数,使之对应索引合并并且可迭代
    itera_merge = list(zip(range(1, len(fids) + 1), fids))
    ## 创建进程池
    pool = Pool(3)
    ## 将任务传入进程池并通过映射传入参数
    pool.map(get_officer_messages_enc, itera_merge)
    print('爬虫程序执行结束!!!')
    print('开始合成文件:')
    merge_csv()
    print('文件合成结束!!!')
    e_time = time.time()
    c_time = int(e_time - s_time)
    c_minute = c_time // 60
    c_second = c_time % 60
    print('{}位领导共计用时:{}分钟{}秒。'.format(len(fids), c_minute, c_second))


if __name__ == '__main__':
    '''执行主函数'''
    main()

主函数中先通过多进程获取领导所有留言,再合并所有数据文件,完成整个爬取过程,并统计整个程序的运行时间,便于分析运行效率。

三、结果、分析及说明

1.结果说明

2个完整代码和测试执行结果可点击https://download.csdn.net/download/CUFEECR/12200752下载,欢迎进行测试和交流学习,请勿滥用
整个执行过程相比于单线程大大缩短了时间,我选择了10个领导进行测试,它们的留言数量有差异,以便于发现多线程的优势,在云服务器中的运行结果分别如下运行时间缩短到不到100分钟,与单进程相比大大缩短了时间、提高了效率,因为同一时刻有3个子进程执行。加入多进程之后,可以让运行时间较长和较短的相互补充,在任意时刻多个进程同时运行。但是也可以看出来与多线程相比,多进程的运行时间相对稍长,虽然差别不大,但是这可能就是性能的瓶颈。可能的原因是进程需要的资源更多,这对内存、CPU和网络的要求更高,从而对设备提出了更高的要求,有时可能设备性能跟不上程序要求,从而降低了效率。
最后得到了合并的DATA.csv:对于多线程和多进程的简单对比分析如下:
一个线程至少有一个进程,一个进程至少有一个线程,线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高,进程在执行过程中拥有独立的存储单元,而多个线程共享存储器,从而极大的提高了程序的运行效率。线程不能够独立执行,必须依存在进程中。

多线程:

  • 线程执行开销小(占用的资源非常少)但是不利于资源的管理和保护;
  • 如果需要共享数据,建议使用线程;
  • 适用于IO密集型任务(Web和文档读写等),遇到IO阻塞,速度远远小于CPU的运行速度,可以多开辟一些线程,有线程阻塞了,其他线程依然正常工作。

多进程:

  • 执行额开销比较大(占用的资源多),但是利于资源的管理和保护;
  • 适用于计算密集型(视频的译码编码和科学数据计算等)。

显然,在爬虫中应该偏向使用多线程。

2.改进分析

(1)该版本的代码仍未实现自动爬取所有的fid,需要手动保存,是其中一点不足,可以在后期改进。
(2)爬取留言详情页仍然采用的selenium模拟,会降低请求效率,可以考虑用requests库请求。
(3)该版本对于反爬的措施较弱,因此很多时候会出现异常,比如得到的页面不正常找不到对应的元素,请求时间延长等,可以在之后的版本加入进一步的防反爬措施,进一步增加代码的健壮性。

3.合法性说明

  • 本项目是为了学习和科研的目的,所有读者可以参考执行思路和程序代码,但不能用于恶意和非法目的(恶意攻击网站服务器、非法盈利等),如有违者请自行负责。
  • 本项目所获取的数据都是在进一步的分析之后用于对电子政务的实施改进,对政府的决策能起到一定的参考作用,并非于恶意抓取数据来攫取不正当竞争的优势,也未用于商业目的牟取不法利益,运行代码只是用几个fid进行测试,并非大范围地爬取,同时严格控制爬取的速率、争取不对服务器造成压力,如侵犯当事者(即被抓取的网络主体)的利益,请联系更改或删除。
  • 本项目是留言板爬取系列的第二篇,后期会继续更新,欢迎读者交流,以期不断改进。

作者:Corley

源自:快学python

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Prometheus 实战教程 + Grafana + Python — 实时监控东方财富人气榜股票

上次我们讲过普罗米修斯(prometheus)这个接近完美的监控系统,有很多读者不了解它到底要如何搭建、应用,需要一篇 Prometheus 实战教程。今天我们就结合普罗米修斯、Grafana和Python采集脚本,写一个小小的东方财富人气榜 TOP100监控系统。

跟着本文的教程耐心往下走,你可能只需要花30分钟便可完成环境的搭建,非常舒服,下面先介绍基本概念。

普罗米修斯(prometheus)上次我们已经使用一整篇文章介绍过了,它是一个开源监控报警系统和时序列数据库。如果你没有阅读过这篇文章,请花五分钟读一下:

Grafana 是一个开源的数据可视化网络应用程序平台。用户配置连接的数据源之后,Grafana可以在网络浏览器里显示数据图表和警告。

比如说我基于 普罗米修斯(prometheus) + node_exporter 监控主机性能指标,然后由Grafana构建主机实时监控仪表盘,它是长这样的:

至于东方财富人气榜,指的是这个:

它能将市场目前最活跃的一些股票提取出来,可供我们作为投资的一种参考。

而我们今天要做的,就是自己搭建一套监控系统,实时监控某只股票在TOP100上的排名变化。

1.Prometheus 安装教程

创建 Prometheus 安装目录并添加 promethus 用户:

PROM_PATH='/data/prometheus'
mkdir -p ${PROM_PATH}
mkdir -p ${PROM_PATH}/{data,conf,logs,bin}
useradd prometheus
cd /usr/local/src

下载解压 prometheus, 这里我们选用2021年5月18日更新的最新版 v2.27.1:

wget https://github.com/prometheus/prometheus/releases/download/v2.27.1/prometheus-2.27.1.linux-amd64.tar.gz
tar -xvf prometheus-2.27.1.linux-amd64.tar.gz
cd prometheus-2.27.1.linux-amd64/
cp prometheus promtool ${PROM_PATH}/bin/
cp prometheus.yml ${PROM_PATH}/conf/
chown -R prometheus.prometheus /data/prometheus

设置环境变量:

cat >> /etc/profile <<EOF
PATH=/data/prometheus/bin:$PATH:$HOME/bin
EOF

将 Promethus 配置为系统服务之一,以便使用 systemctl 命令管控服务:

cat >>/etc/systemd/system/prometheus.service <<EOF
[Unit]
Description=Prometheus
Documentation=https://prometheus.io/
After=network.target

[Service]
Type=simple
User=prometheus
ExecStart=/data/prometheus/bin/prometheus --config.file=/data/prometheus/conf/prometheus.yml --storage.tsdb.path=/data/prometheus/data --storage.tsdb.retention=90d
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

现在使用下面的systemctl命令重新加载systemd系统,并查看服务是否启动:

systemctl daemon-reload
systemctl enable prometheus
systemctl start prometheus
systemctl status prometheus

看到 running 状态说明一切正常:

记得开放9090端口,这样才可以访问 Prometheus 的 Web 端,访问 http://服务器IP:9090 查看得到 Prometheus Web界面,说明安装成功:

2.Grafana 安装教程

Grafana 我们也使用最新的 8.0.1 版本,安装方式如下:

CentOS系列系统使用以下命令安装:

cd /usr/local/src
wget https://dl.grafana.com/oss/release/grafana-8.0.1-1.x86_64.rpm
sudo yum localinstall grafana-6.5.2-1.x86_64.rpm

Ubuntu和Debian系列系统使用以下命令安装:

cd /usr/local/src
sudo apt-get install -y adduser libfontconfig1
wget https://dl.grafana.com/oss/release/grafana_8.0.1_amd64.deb
sudo dpkg -i grafana_8.0.1_amd64.deb

然后启动系统服务即可:

systemctl start grafana-server
systemctl status grafana-server

看到 running 状态说明一切正常:

记得开放3000端口,这样你才可以访问你的Grafana: http://你的服务器IP:3000 如下所示:

输入用户名,密码登录系统。用户名与密码都是”admin”,如果能打开页面则已经安装成功了。

3.初尝Grafana+Prometheus实战教程

为了初步尝试这套系统,我们可以通过简单的采集主机性能数据开始。Node_exporter是一个Prometheus推出的官方主机性能采集工具。通过它我们能很方便地输出主机性能指标到Prometheus.

3.1 下载安装Node_Exporter:

NODE_PATH='/data/prometheus/node_exporter/'
cd /usr/local/src/
mkdir -p ${NODE_PATH}
wget https://github.com/prometheus/node_exporter/releases/download/v1.1.2/node_exporter-1.1.2.linux-amd64.tar.gz
tar -xvf node_exporter-1.1.2.linux-amd64.tar.gz
cp node_exporter-1.1.2.linux-amd64/node_exporter ${NODE_PATH}
chown -R prometheus.prometheus ${NODE_PATH}

配置node_exporter为系统服务:

cat > /lib/systemd/system/node_exporter.service <<EOF
[Unit]
Description=node_exporter
Documentation=https://prometheus.io/
After=network.target
 
[Service]
Type=simple
User=prometheus
ExecStart=/data/prometheus/node_exporter/node_exporter
Restart=on-failure
 
[Install]
WantedBy=multi-user.target
EOF

现在使用systemctl命令重新加载系统命令,并查看服务是否启动:

systemctl daemon-reload
systemctl enable node_exporter
systemctl start node_exporter
systemctl status node_exporter

看到如下图的状态说明启动成功。

放行9100端口,访问http://你的服务器地址:9100/metrics 看到如下指标页面说明安装成功:

配置 prometheus.yaml (ubuntu 下为 prometheus.yml), 让 prometheus 采集 node_exporter 输出的指标数据:

vim /data/prometheus/conf/prometheus.yml

配置如下:

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090']

   # 主要是新增了node_exporter的job,如果有多个node_exporter,在targets数组后面加即可

  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

保存后重启prometheus:

systemctl restart prometheus

最后配置Grafana:

然后选择 Prometheus 数据源:

输入 Prometheus url 然后点击 save&test 保存:

然后导入官方仪表盘,官方提供的模板号为8919:

然后你就能看见本机非常漂亮的性能指标数据仪表盘了。

不看不知道,一看吓一跳,看来我需要升级这台机器的内存了。

4.编写采集脚本

为了能够采集东方财富人气榜前100名,我们需要用Python编写一个人气榜采集脚本,并使其像 node_exporter 一样输出指标信息:

为了达到这个目的,我们必须安装 prometheus_client 模块:

pip3 install prometheus_client

获取股票排名的代码如下:

# Python实用宝典
# 2021-06-13
# 文件名: fetch_stock.py
import time
import requests
from prometheus_client import start_http_server, CollectorRegistry, Gauge


reg = CollectorRegistry()
gauge = Gauge(
    'rank', '人气榜排名',
    ['stock_id'], registry=reg
)


def process_request():
    url = "https://emappdata.eastmoney.com/stockrank/getAllCurrentList"
    kwargs = {
        "appId": "appId01",
        "pageNo": 1,
        "pageSize": "100",
    }
    result = requests.post(url, json=kwargs).json()
    for i in result.get("data", []):
        gauge.labels(stock_id=i["sc"]).set(i["rk"])
    time.sleep(60)


if __name__ == '__main__':
    start_http_server(8000, registry=reg)
    while True:
        process_request()

这里我们只捕获人气榜前100名,并通过Prometheus客户端的start_http_server开启一个Web服务,这样你通过http服务访问8000端口的时候就能输出这些指标。

为了让其能持续输出指标数据,我们要用nohup使其成为一个常驻进程:

nohup python3 fetch_stock.py &

开放8000端口,访问 http://你的服务器IP:8000 就能查看输出的指标:

5.应用采集脚本

同配置Node_exporter一样,我们需要将自己编写好的采集脚本落入Prometheus,配置prometheus.yaml:

配置 prometheus.yaml, 让 prometheus 采集 node_exporter 输出的指标数据:

#(CentOS) vim /data/prometheus/conf/prometheus.yaml
vim /data/prometheus/conf/prometheus.yml # ubuntu

配置如下:

# my global config
global:
  scrape_interval:     15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
  evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
  # scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
  alertmanagers:
  - static_configs:
    - targets:
      # - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
  # - "first_rules.yml"
  # - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: 'prometheus'

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
    - targets: ['localhost:9090']

   # 主要是新增了node_exporter的job,如果有多个node_exporter,在targets数组后面加即可
  - job_name: 'node_exporter'
    static_configs:
      - targets: ['localhost:9100']

   # 新增我们的Python股票采集脚本
  - job_name: 'hot_list'
    static_configs:
      - targets: ['localhost:8000']

保存后重启prometheus:

systemctl restart prometheus

最后配置Grafana, 选择新建一个dashboard:

然后选择rank指标:

点击 Use query 就能获取所有股票的排名曲线:

6.配置Grafana告警

为了在某只股票达到某种排名的时候触发通知,我们需要先配置好告警渠道:

然后配置邮件告警,点击 Test, 此时 Grafana 会告诉你一个错误:

就是我们还没有配置好 SMTP 相关服务,需要配置 SMTP 相关服务才能正常发送邮件,如果你是按照本文按照Grafana的教程走下来的,那么Grafana.ini的文件位于 /etc/grafana/grafana.ini.

vim /etc/grafana/grafana.ini

然后在 smtp 部分配置你的 host、user、password、from_address、from_name,并打开 enabled 如下图所示:

然后重启 Grafana-server

systemctl restart grafana-server

再点击Test,你的邮箱里收到这样的邮件说明通知可以正常发送了:

然后我们进入正题,监控某只股票的排名变化,比如 SH600070:

然后点击 Alert 配置告警,一旦其排名高于65名则发送邮件通知:

完成后点击右上角的 save 保存即可:

然后进入 Alerting 告警中心,你会看到刚刚配置的告警规则在这里可以进行管控:

点击Pause可以暂停这个告警,Edit alert可以去更改告警条件。

一旦触发告警,这个状态便会更改,你就会收到邮件:

邮件效果如下:

邮件里的告警图片没显示出来,因为我们没有安装 “grafana image renderer”, 需要在你的服务器执行以下命令安装并重启 Grafana:

grafana-cli plugins install grafana-image-renderer
systemctl restart grafana-server

新的告警邮件便能看到图片了:

怎么样,用Prometheus+Grafana+Python采集搭建一个股票监控系统还是非常简单的吧?创新性地监控东方财富人气榜上某只股票的变化并产生告警,能让你熟悉监控策略的配置,见微知著。跟着本文的教程走,相信你会有不少收获。

如果我们延伸一下,结合量化投资系列教程的可转债交易策略 — Python 量化投资实战教程(10),是否可以构建一些更有意义的策略?答案是肯定的。

我们可以监控所有100元以下的可转债对应的股票,如果这些股票进入了人气榜TOP100或者飙升榜(本文没有采集,有兴趣的读者可以自行采集),就购入这些低价可转债,这种买入策略或许也不错。

你也可以抛弃东方财富的榜单分类,构建自己的排名环比增长买入策略,环比下跌卖出策略,我相信这会非常有意思。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

抓包神器 MitmProxy 使用教程 – 结合Python

本文是一个较为完整的 mitmproxy教程,侧重于介绍如何开发拦截脚本,帮助读者能够快速得到一个自定义的代理工具。玩爬虫的小伙伴都知道,抓包工具除了MitmProxy 外,还有 FiddlerCharles以及浏览器 netwrok 

既然都有这么多抓包工具了,为什么还要会用 MitmProxy 呢??主要有以下几点:

  1. 不需要安装软件,直接在线(浏览器)进行抓包(包括手机端和 PC 端)
  2. 配合 Python 脚本抓包改包
  3. 抓包过程的所有数据包都可以自动保留到txt里面,方便过滤分析
  4. 使用相对简单,易上手

本文假设读者有基本的 python 知识,且已经安装好了一个 python 3 开发环境。如果你对 nodejs 的熟悉程度大于对 python,可移步到 anyproxy,anyproxy 的功能与 mitmproxy 基本一致,但使用 js 编写定制脚本。除此之外我就不知道有什么其他类似的工具了,如果你知道,欢迎评论告诉我。

本文基于 mitmproxy v5,当前版本号为 v5.0.1

1.基本介绍

顾名思义,mitmproxy 就是用于 MITM 的 proxy,MITM 即中间人攻击(Man-in-the-middle attack)。用于中间人攻击的代理首先会向正常的代理一样转发请求,保障服务端与客户端的通信,其次,会适时的查、记录其截获的数据,或篡改数据,引发服务端或客户端特定的行为。

不同于 fiddler 或 wireshark 等抓包工具,mitmproxy 不仅可以截获请求帮助开发者查看、分析,更可以通过自定义脚本进行二次开发。举例来说,利用 fiddler 可以过滤出浏览器对某个特定 url 的请求,并查看、分析其数据,但实现不了高度定制化的需求,类似于:“截获对浏览器对该 url 的请求,将返回内容置空,并将真实的返回内容存到某个数据库,出现异常时发出邮件通知”。而对于 mitmproxy,这样的需求可以通过载入自定义 python 脚本轻松实现。

但 mitmproxy 并不会真的对无辜的人发起中间人攻击,由于 mitmproxy 工作在 HTTP 层,而当前 HTTPS 的普及让客户端拥有了检测并规避中间人攻击的能力,所以要让 mitmproxy 能够正常工作,必须要让客户端(APP 或浏览器)主动信任 mitmproxy 的 SSL 证书,或忽略证书异常,这也就意味着 APP 或浏览器是属于开发者本人的——显而易见,这不是在做黑产,而是在做开发或测试。

事实上,以上说的仅是 mitmproxy 以正向代理模式工作的情况,通过调整配置,mitmproxy 还可以作为透明代理、反向代理、上游代理、SOCKS 代理等,但这些工作模式针对 mitmproxy 来说似乎不大常用,故本文仅讨论正向代理模式。

2.特性

  • 拦截HTTP和HTTPS请求和响应并即时修改它们
  • 保存完整的HTTP对话以供以之后重发和分析
  • 重发HTTP对话的客户端
  • 重发先前记录的服务的HTTP响应
  • 反向代理模式将流量转发到指定的服务器
  • 在macOS和Linux上实现透明代理模式
  • 使用Python对HTTP流量进行脚本化修改
  • 实时生成用于拦截的SSL / TLS证书
  • And much, much more…

3.配置 MitmProxy

MitmProxy可以说是客户端,也可以说是一个 python 库

方式一:客户端

  • https://mitmproxy.org/downloads/

在这个地址下可以下载对应的客户端安装即可

方式二:Python库

  • pip install mitmproxy

通过这个 pip 命令可以下载好 MitmProxy,下面将会以 Python 库的使用方式给大家讲解如何使用(推荐方式二

2.启动 MitmProxy

MitmProxy 启动有三个命令(三种模式)

  1. mitmproxy,提供命令行界面

  2. mitmdump,提供一个简单的终端输出(还可以配合Python抓包改包

  3. mitmweb,提供在线浏览器抓包界面

mitmdump启动

  • mitmdump -w d://lyc.txt

这样就启动mitmdump,接着在本地设置代理Ip是本机IP,端口8080

安装证书

访问下面这个链接

  • http://mitm.it/

 

可以选择自己的设备(window,或者Android、Apple设备去)安装证书。

然后随便打开一个网页,比如百度

这里是因为证书问题,提示访问百度提示https证书不安全,那么下面开始解决这个问题,因此就引出了下面的这种启动方式

浏览器代理式启动

哪一个浏览器都可以,下面以Chrome浏览器为例(其他浏览器操作一样)

先找到chrome浏览器位置,我的chrome浏览器位置如下图

通过下面命令启动

  • "C:\Users\Administrator\AppData\Local\Google\Chrome\Application\chrome.exe" --proxy-server=127.0.0.1:8080 --ignore-certificate-errors

proxy-server是设置代理和端口

–ignore-certificate-errors是忽略证书

然后会弹出来Chrome浏览器,接着我们搜索知乎

在cmd中就可以看到数据包

这些文本数据可以在编程中进行相应的操作,比如可以放到python中进行过来监听处理

4.启动 Mitmweb

新开一个cmd(终端)窗口,输入下来命令启动mitmweb

  • mitmweb

之后会在浏览器自动打开一个网页(其实手动打开也可以,地址就是:http://127.0.0.1:8081)

现在页面中什么也没有,那下面我们在刷新一个知乎页面

重点:关闭mitmproxy终端!关闭mitmproxy终端!关闭mitmproxy终端!

如果不改变在mitmweb中获取不到数据,数据只在mitmproxy中,因此需要关闭mitmproxy这个命令终端

刷新知乎页面之后如下:

在刚刚的网页版抓包页面就可以看到数据包了

并且还包括https类型,比如查看其中一个数据包,找到数据是对应的,说明抓包成功。

5.配合 Python 脚本

mitmproxy代理(抓包)工具最强大之处在于对python脚步的支持(可以在python代码中直接处理数据包)

下面开始演示,先新建一个py文件(lyc.py)

from mitmproxy import ctx

# 所有发出的请求数据包都会被这个方法所处理
# 所谓的处理,我们这里只是打印一下一些项;当然可以修改这些项的值直接给这些项赋值即可
def request(flow):
    # 获取请求对象
    request = flow.request
    # 实例化输出类
    info = ctx.log.info
    # 打印请求的url
    info(request.url)
    # 打印请求方法
    info(request.method)
    # 打印host头
    info(request.host)
    # 打印请求端口
    info(str(request.port))
    # 打印所有请求头部
    info(str(request.headers))
    # 打印cookie头
    info(str(request.cookies))
# 所有服务器响应的数据包都会被这个方法处理
# 所谓的处理,我们这里只是打印一下一些项
def response(flow):
    # 获取响应对象
    response = flow.response
    # 实例化输出类
    info = ctx.log.info
    # 打印响应码
    info(str(response.status_code))
    # 打印所有头部
    info(str(response.headers))
    # 打印cookie头部
    info(str(response.cookies))
    # 打印响应报文内容
    info(str(response.text))

在终端中输入一下命令启动

mitmdump.exe -s lyc.py

(PS:这里需要通过另一个端启动浏览器)

"C:\Users\Administrator\AppData\Local\Google\Chrome\Application\chrome.exe" --proxy-server=127.0.0.1:8080 --ignore-certificate-errors

然后访问某个网站

在终端中就可以看到信息

这些信息就是我们在 lyc.py 中指定的显示信息

PS:在手机上配置好代理之后,mitmproxy 同样可以抓取手机端数据

小结:

  1. 不需要安装软件,直接在线(浏览器)进行抓包(包括手机端和 PC 端)
  2. 配合 Python 脚本抓包改包
  3. 抓包过程的所有数据包都可以自动保留到 txt 里面,方便过滤分析
  4. 使用相对简单,易上手
本文转自星安果。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 单线程、多线程和协程的爬虫性能对比

单线程、多线程和协程的爬虫性能对比

今天我要给大家分享的是如何爬取豆瓣上深圳近期即将上映的电影影讯,并分别用普通的单线程、多线程和协程来爬取,从而对比单线程、多线程和协程在网络爬虫中的性能。

具体要爬的网址是:https://movie.douban.com/cinema/later/shenzhen/

除了要爬入口页以外还需爬取每个电影的详情页,具体要爬取的结构信息如下:

1.基础爬取测试

下面我演示使用xpath解析数据。

入口页数据读取:

import requests
from lxml import etree
import pandas as pd
import re

main_url = "https://movie.douban.com/cinema/later/shenzhen/"
headers = {
    "Accept-Encoding""Gzip",
    "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}
r = requests.get(main_url, headers=headers)
r

结果:

<Response [200]>

检查一下所需数据的xpath:

可以看到每个电影信息都位于id为showing-soon下面的div里面,再分别分析内部的电影名称、url和想看人数所处的位置,于是可以写出如下代码:

html = etree.HTML(r.text)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
    #  imgurl, = e.xpath(".//img/@src")
    name, = e.xpath(".//div[@class='intro']/h3/a/text()")
    url, = e.xpath(".//div[@class='intro']/h3/a/@href")
    # date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
    like_num, = e.xpath(
        ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
    result.append((name, int(like_num[:like_num.find("人")]), url))
main_df = pd.DataFrame(result, columns=["影名""想看人数""url"])
main_df

结果:

然后再选择一个详情页的url进行测试,我选择了熊出没·狂野大陆这部电影,因为文本数据相对最复杂,也最具备代表性:

url = main_df.at[17"url"]
url

结果:

'https://movie.douban.com/subject/34825886/'

分析详情页结构:

文本信息都在这个位置中,下面我们直接提取这个div下面的所有文本节点:

r = requests.get(url, headers=headers)
html = etree.HTML(r.text)
movie_infos = html.xpath("//div[@id='info']//text()")
print(movie_infos)

结果:

['\n        ''导演'': ''丁亮''\n        ''编剧'': ''徐芸'' / ''崔铁志'' / ''张宇''\n        ''主演'': ''张伟'' / ''张秉君'' / ''谭笑''\n        ''类型:'' ''喜剧'' / ''科幻'' / ''动画''\n        \n        ''制片国家/地区:'' 中国大陆''\n        ''语言:'' 汉语普通话''\n        ''上映日期:'' ''2021-02-12(中国大陆)'' / ''2020-08-01(上海电影节)''\n        ''片长:'' ''100分钟''\n        ''又名:'' 熊出没大电影7 / 熊出没科幻大电影 / Boonie Bears: The Wild Life''\n        ''IMDb链接:'' ''tt11654032''\n\n']

为了阅读方便,拼接一下:

movie_info_txt = "".join(movie_infos)
print(movie_info_txt)

结果:

        导演: 丁亮
        编剧: 徐芸 / 崔铁志 / 张宇
        主演: 张伟 / 张秉君 / 谭笑
        类型: 喜剧 / 科幻 / 动画
        
        制片国家/地区: 中国大陆
        语言: 汉语普通话
        上映日期: 2021-02-12(中国大陆) / 2020-08-01(上海电影节)
        片长: 100分钟
        又名: 熊出没大电影7 / 熊出没科幻大电影 / Boonie Bears: The Wild Life
        IMDb链接: tt11654032

接下来就简单了:

row = {}
for line in re.split("[\n ]*\n[\n ]*", movie_info_txt):
    line = line.strip()
    arr = line.split(": ", maxsplit=1)
    if len(arr) != 2:
        continue
    k, v = arr
    row[k] = v
row

结果:

{'导演''丁亮',
 '编剧''徐芸 / 崔铁志 / 张宇',
 '主演''张伟 / 张秉君 / 谭笑',
 '类型''喜剧 / 科幻 / 动画',
 '制片国家/地区''中国大陆',
 '语言''汉语普通话',
 '上映日期''2021-02-12(中国大陆) / 2020-08-01(上海电影节)',
 '片长''100分钟',
 '又名''熊出没大电影7 / 熊出没科幻大电影 / Boonie Bears: The Wild Life',
 'IMDb链接''tt11654032'}

可以看到成功的切割出了每一项。

下面根据上面的测试基础,我们完善整体的爬虫代码:

2.单线程 爬虫性能

import requests
from lxml import etree
import pandas as pd
import re

main_url = "https://movie.douban.com/cinema/later/shenzhen/"
headers = {
    "Accept-Encoding""Gzip",
    "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}
r = requests.get(main_url, headers=headers)
html = etree.HTML(r.text)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
    imgurl, = e.xpath(".//img/@src")
    name, = e.xpath(".//div[@class='intro']/h3/a/text()")
    url, = e.xpath(".//div[@class='intro']/h3/a/@href")
    print(url)
#     date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
    like_num, = e.xpath(
        ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
    r = requests.get(url, headers=headers)
    html = etree.HTML(r.text)
    row = {}
    row["电影名称"] = name
    for line in re.split("[\n ]*\n[\n ]*""".join(html.xpath("//div[@id='info']//text()")).strip()):
        line = line.strip()
        arr = line.split(": ", maxsplit=1)
        if len(arr) != 2:
            continue
        k, v = arr
        row[k] = v
    row["想看人数"] = int(like_num[:like_num.find("人")])
#     row["url"] = url
#     row["图片地址"] = imgurl
#     print(row)
    result.append(row)
df = pd.DataFrame(result)
df.sort_values("想看人数", ascending=False, inplace=True)
df.to_csv("shenzhen_movie.csv", index=False)

结果:

https://movie.douban.com/subject/26752564/
https://movie.douban.com/subject/35172699/
https://movie.douban.com/subject/34992142/
https://movie.douban.com/subject/30349667/
https://movie.douban.com/subject/30283209/
https://movie.douban.com/subject/33457717/
https://movie.douban.com/subject/30487738/
https://movie.douban.com/subject/35068230/
https://movie.douban.com/subject/27039358/
https://movie.douban.com/subject/30205667/
https://movie.douban.com/subject/30476403/
https://movie.douban.com/subject/30154423/
https://movie.douban.com/subject/27619748/
https://movie.douban.com/subject/26826330/
https://movie.douban.com/subject/26935283/
https://movie.douban.com/subject/34841067/
https://movie.douban.com/subject/34880302/
https://movie.douban.com/subject/34825886/
https://movie.douban.com/subject/34779692/
https://movie.douban.com/subject/35154209/

爬到的文件:

整体耗时:

42.5 秒

3.多线程 爬虫性能

单线程的爬取耗时还是挺长的,下面看看使用多线程的爬取效率:

import requests
from lxml import etree
import pandas as pd
import re
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED


def fetch_content(url):
    print(url)
    headers = {
        "Accept-Encoding""Gzip",  # 使用gzip压缩传输数据让访问更快
        "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
    }
    r = requests.get(url, headers=headers)
    return r.text


url = "https://movie.douban.com/cinema/later/shenzhen/"
init_page = fetch_content(url)
html = etree.HTML(init_page)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
#     imgurl, = e.xpath(".//img/@src")
    name, = e.xpath(".//div[@class='intro']/h3/a/text()")
    url, = e.xpath(".//div[@class='intro']/h3/a/@href")
#     date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
    like_num, = e.xpath(
        ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
    result.append((name, int(like_num[:like_num.find("人")]), url))
main_df = pd.DataFrame(result, columns=["影名""想看人数""url"])

max_workers = main_df.shape[0]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_tasks = [executor.submit(fetch_content, url) for url in main_df.url]
    wait(future_tasks, return_when=ALL_COMPLETED)
    pages = [future.result() for future in future_tasks]

result = []
for url, html_text in zip(main_df.url, pages):
    html = etree.HTML(html_text)
    row = {}
    for line in re.split("[\n ]*\n[\n ]*""".join(html.xpath("//div[@id='info']//text()")).strip()):
        line = line.strip()
        arr = line.split(": ", maxsplit=1)
        if len(arr) != 2:
            continue
        k, v = arr
        row[k] = v
    row["url"] = url
    result.append(row)
detail_df = pd.DataFrame(result)
df = main_df.merge(detail_df, on="url")
df.drop(columns=["url"], inplace=True)
df.sort_values("想看人数", ascending=False, inplace=True)
df.to_csv("shenzhen_movie2.csv", index=False)
df

结果:

耗时 8 秒

由于每个子页面都是单独的线程爬取,每个线程几乎都是同时在工作,所以最终耗时仅取决于爬取最慢的子页面

4.协程 异步爬虫性能

由于我在jupyter中运行,为了使协程能够直接在jupyter中直接运行,所以我在代码中增加了下面两行代码,在普通编辑器里面可以去掉:

import nest_asyncio
nest_asyncio.apply()

这个问题是因为jupyter所依赖的高版本Tornado存在bug,将Tornado退回到低版本也可以解决这个问题。

下面我使用协程来完成这个需求的爬取:

import aiohttp
from lxml import etree
import pandas as pd
import re
import asyncio
import nest_asyncio
nest_asyncio.apply()


async def fetch_content(url):
    print(url)
    header = {
        "Accept-Encoding""Gzip",  # 使用gzip压缩传输数据让访问更快
        "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
    }
    async with aiohttp.ClientSession(
        headers=header, connector=aiohttp.TCPConnector(ssl=False)
    ) as session:
        async with session.get(url) as response:
            return await response.text()


async def main():
    url = "https://movie.douban.com/cinema/later/shenzhen/"
    init_page = await fetch_content(url)
    html = etree.HTML(init_page)
    all_movies = html.xpath("//div[@id='showing-soon']/div")
    result = []
    for e in all_movies:
        #         imgurl, = e.xpath(".//img/@src")
        name, = e.xpath(".//div[@class='intro']/h3/a/text()")
        url, = e.xpath(".//div[@class='intro']/h3/a/@href")
    #     date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
        like_num, = e.xpath(
            ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
        result.append((name, int(like_num[:like_num.find("人")]), url))
    main_df = pd.DataFrame(result, columns=["影名""想看人数""url"])

    tasks = [fetch_content(url) for url in main_df.url]
    pages = await asyncio.gather(*tasks)

    result = []
    for url, html_text in zip(main_df.url, pages):
        html = etree.HTML(html_text)
        row = {}
        for line in re.split("[\n ]*\n[\n ]*""".join(html.xpath("//div[@id='info']//text()")).strip()):
            line = line.strip()
            arr = line.split(": ", maxsplit=1)
            if len(arr) != 2:
                continue
            k, v = arr
            row[k] = v
        row["url"] = url
        result.append(row)
    detail_df = pd.DataFrame(result)
    df = main_df.merge(detail_df, on="url")
    df.drop(columns=["url"], inplace=True)
    df.sort_values("想看人数", ascending=False, inplace=True)
    return df

df = asyncio.run(main())
df.to_csv("shenzhen_movie3.csv", index=False)
df

结果:

耗时仅 7 秒,相对比多线程更快一点

由于request库不支持协程,所以我使用了支持协程的aiohttp进行页面抓取

当然实际爬取的耗时还取绝于当时的网络,但整体来说,协程爬取会比多线程爬虫稍微快一些

5.性能对比回顾

今天我向你演示了,单线程爬虫、多线程爬虫和协程爬虫

可以看到,一般情况下协程爬虫速度最快,多线程爬虫略慢一点,单线程爬虫则必须上一个页面爬取完成才能继续爬取。

但协程爬虫相对来说并不是那么好编写,数据抓取无法使用request库,只能使用aiohttp

所以在实际编写爬虫时,我们一般都会使用多线程爬虫来提速,但必须注意的是网站都有ip访问频率限制,爬的过快可能会被封ip,所以一般我们在多线程提速的同时使用代理ip来并发的爬取数据

6.彩蛋:xpath+pandas解析表格并提取url

我们在深圳影的底部能够看到一个[查看全部即将上映的影片] (https://movie.douban.com/coming)的按钮,点进去能够看到一张完整近期上映电影的列表,发现这个列表是个table标签的数据:

那就简单了,解析table我们可能压根就不需要用xpath,直接用pandas即可,但片名中包含的url地址还需解析,所以我采用xpath+pandas来解析这个网页,看看我的代码吧:

import pandas as pd
import requests
from lxml import etree

headers = {
    "Accept-Encoding""Gzip",
    "User-Agent""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
}
r = requests.get("https://movie.douban.com/coming", headers=headers)
html = etree.HTML(r.text)
table_tag = html.xpath("//table")[0]
df, = pd.read_html(etree.tostring(table_tag))
urls = table_tag.xpath(".//td[2]/a/@href")
df["url"] = urls
df

结果

这样就能到了主页面的完整数据,再简单的处理一下即可

结语

感谢各位读者,有什么想法和收获欢迎留言评论噢!

本文转自AirPython.

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 3000+上市公司信息爬虫实战教程

入门爬虫很容易,几行代码就可以,可以说是学习 Python 最简单的途径。本实战教程便是教你怎么入门爬虫。

刚开始动手写爬虫,你只需要关注最核心的部分,也就是先成功抓到数据,其他的诸如:下载速度、存储方式、代码条理性等先不管,这样的代码简短易懂、容易上手,能够增强信心。

基本环境配置

  • 版本:Python3
  • 系统:Windows
  • 相关模块:pandas、csv

上市公司信息 – 爬取目标网站

目标地址:中商官网

爬虫实战教程 – 实现代码

import pandas as pd
import csv
for i in range(1,178):  # 爬取全部页
    tb = pd.read_html('http://s.askci.com/stock/a/?reportTime=2017-12-31&pageNum=%s' % (str(i)))[3
    tb.to_csv(r'1.csv', mode='a', encoding='utf_8_sig', header=1, index=0)

3000+ 上市公司的信息,安安静静地躺在 Excel 中:

有了上面的信心后,我开始继续完善代码,因为 5 行代码太单薄,功能也太简单,大致从以下几个方面进行了完善:

增加异常处理

由于爬取上百页的网页,中途很可能由于各种问题导致爬取失败,所以增加了 try except 、if 等语句,来处理可能出现的异常,让代码更健壮。

增加代码灵活性

初版代码由于固定了 URL 参数,所以只能爬取固定的内容,但是人的想法是多变的,一会儿想爬这个一会儿可能又需要那个,所以可以通过修改 URL 请求参数,来增加代码灵活性,从而爬取更灵活的数据。

修改存储方式

初版代码我选择了存储到 Excel 这种最为熟悉简单的方式,人是一种惰性动物,很难离开自己的舒适区。但是为了学习新知识,所以我选择将数据存储到 MySQL 中,以便练习 MySQL 的使用。

加快爬取速度

初版代码使用了最简单的单进程爬取方式,爬取速度比较慢,考虑到网页数量比较大,所以修改为了多进程的爬取方式。

经过以上这几点的完善,代码量从原先的 5 行增加到了下面的几十行:

import requests
import pandas as pd
from bs4 import BeautifulSoup
from lxml import etree
import time
import pymysql
from sqlalchemy import create_engine
from urllib.parse import urlencode  # 编码 URL 字符串

start_time = time.time()  #计算程序运行时间
def get_one_page(i):
    try:
        headers = {
            'User-Agent''Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/66.0.3359.181 Safari/537.36'
        }
        paras = {
        'reportTime''2017-12-31',
        #可以改报告日期,比如2018-6-30获得的就是该季度的信息
        'pageNum': i   #页码
        }
        url = 'http://s.askci.com/stock/a/?' + urlencode(paras)
        response = requests.get(url,headers = headers)
        if response.status_code == 200:
            return response.text
        return None
    except RequestException:
        print('爬取失败')

def parse_one_page(html):
    soup = BeautifulSoup(html,'lxml')
    content = soup.select('#myTable04')[0#[0]将返回的list改为bs4类型
    tbl = pd.read_html(content.prettify(),header = 0)[0]
    # prettify()优化代码,[0]从pd.read_html返回的list中提取出DataFrame
    tbl.rename(columns = {'序号':'serial_number''股票代码':'stock_code''股票简称':'stock_abbre''公司名称':'company_name''省份':'province''城市':'city''主营业务收入(201712)':'main_bussiness_income''净利润(201712)':'net_profit''员工人数':'employees''上市日期':'listing_date''招股书':'zhaogushu''公司财报':'financial_report''行业分类':'industry_classification''产品类型':'industry_type''主营业务':'main_business'},inplace = True)
    return tbl

def generate_mysql():
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='',
        port=3306,
        charset = 'utf8',  
        db = 'wade')
    cursor = conn.cursor()

    sql = 'CREATE TABLE IF NOT EXISTS listed_company (serial_number INT(20) NOT NULL,stock_code INT(20) ,stock_abbre VARCHAR(20) ,company_name VARCHAR(20) ,province VARCHAR(20) ,city VARCHAR(20) ,main_bussiness_income VARCHAR(20) ,net_profit VARCHAR(20) ,employees INT(20) ,listing_date DATETIME(0) ,zhaogushu VARCHAR(20) ,financial_report VARCHAR(20) , industry_classification VARCHAR(20) ,industry_type VARCHAR(100) ,main_business VARCHAR(200) ,PRIMARY KEY (serial_number))'
    cursor.execute(sql)
    conn.close()

def write_to_sql(tbl, db = 'wade'):
    engine = create_engine('mysql+pymysql://root:@localhost:3306/{0}?charset=utf8'.format(db))
    try:
        tbl.to_sql('listed_company2',con = engine,if_exists='append',index=False)
        # append表示在原有表基础上增加,但该表要有表头
    except Exception as e:
        print(e)

def main(page):
    generate_mysql()
    for i in range(1,page):  
        html = get_one_page(i)
        tbl = parse_one_page(html)
        write_to_sql(tbl)

# # 单进程
if __name__ == '__main__':    
    main(178)
    endtime = time.time()-start_time
    print('程序运行了%.2f秒' %endtime)

# 多进程
from multiprocessing import Pool
if __name__ == '__main__':
     pool = Pool(4)
     pool.map(main, [i for i in range(1,178)])  #共有178页
    endtime = time.time()-start_time
    print('程序运行了%.2f秒' %(time.time()-start_time))

结语

这个过程觉得很自然,因为每次修改都是针对一个小点,一点点去学,搞懂后添加进来,而如果让你上来就直接写出这几十行的代码,你很可能就放弃了。

所以,你可以看到,入门爬虫是有套路的,最重要的是给自己信心。

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对小编的支持。

作者:苏克

源自:https://www.makcyun.top/web_scraping_withpython18.html

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 新型的爬虫框架 feapder, 支持爬虫报警机制

1. 前言

众所周知,Python 最流行的爬虫框架是 Scrapy,它主要用于爬取网站结构性数据

今天推荐一款更加简单、轻量级,且功能强大的爬虫框架:feapder

项目地址:

https://github.com/Boris-code/feapder

2. 介绍及安装

和 Scrapy 类似,feapder 支持轻量级爬虫、分布式爬虫、批次爬虫、爬虫报警机制等功能

内置的 3 种爬虫如下:

  • AirSpider

    轻量级爬虫,适合简单场景、数据量少的爬虫

  • Spider

    分布式爬虫,基于 Redis,适用于海量数据,并且支持断点续爬、自动数据入库等功能

  • BatchSpider

    分布式批次爬虫,主要用于需要周期性采集的爬虫

在实战之前,我们在虚拟环境下安装对应的依赖库

# 安装依赖库
pip3 install feapder

3. 实战一下

我们以最简单的 AirSpider 来爬取一些简单的数据

目标网站:aHR0cHM6Ly90b3BodWIudG9kYXkvIA==

详细实现步骤如下( 5 步)

3-1  创建爬虫项目

首先,我们使用「 feapder create -p 」命令创建一个爬虫项目

# 创建一个爬虫项目
feapder create -p tophub_demo

3-2  创建爬虫 AirSpider

命令行进入到 spiders 文件夹目录下,使用「 feapder create -s 」命令创建一个爬虫

cd spiders

# 创建一个轻量级爬虫
feapder create -s tophub_spider 1

其中

  • 1 为默认,表示创建一个轻量级爬虫 AirSpider

  • 2 代表创建一个分布式爬虫 Spider

  • 3 代表创建一个分布式批次爬虫 BatchSpider

3-3  配置数据库、创建数据表、创建映射 Item

以 Mysql 为例,首先我们在数据库中创建一张数据表

# 创建一张数据表
create table topic
(
    id         int auto_increment
        primary key,
    title      varchar(100)  null comment '文章标题',
    auth       varchar(20)   null comment '作者',
    like_count     int default 0 null comment '喜欢数',
    collection int default 0 null comment '收藏数',
    comment    int default 0 null comment '评论数'
);

然后,打开项目根目录下的 settings.py 文件,配置数据库连接信息

# settings.py

MYSQL_IP = "localhost"
MYSQL_PORT = 3306
MYSQL_DB = "xag"
MYSQL_USER_NAME = "root"
MYSQL_USER_PASS = "root"

最后,创建映射 Item( 可选 )

进入到 items 文件夹,使用「 feapder create -i 」命令创建一个文件映射到数据库

PS:由于 AirSpider 不支持数据自动入库,所以这步不是必须

3-4  编写爬虫及数据解析

第一步,首先使「 MysqlDB初始化数据库

from feapder.db.mysqldb import MysqlDB

class TophubSpider(feapder.AirSpider):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.db = MysqlDB()

第二步,在 start_requests 方法中,指定爬取主链接地址,使用关键字「download_midware 配置随机 UA

import feapder
from fake_useragent import UserAgent

def start_requests(self):
    yield feapder.Request("https://tophub.today/", download_midware=self.download_midware)

def download_midware(self, request):
    # 随机UA
    # 依赖:pip3 install fake_useragent
    ua = UserAgent().random
    request.headers = {'User-Agent': ua}
    return request

第三步,爬取首页标题、链接地址

使用 feapder 内置方法 xpath 去解析数据即可

def parse(self, request, response):
    # print(response.text)
    card_elements = response.xpath('//div[@class="cc-cd"]')

    # 过滤出对应的卡片元素【什么值得买】
    buy_good_element = [card_element for card_element in card_elements if
                        card_element.xpath('.//div[@class="cc-cd-is"]//span/text()').extract_first() == '什么值得买'][0]

    # 获取内部文章标题及地址
    a_elements = buy_good_element.xpath('.//div[@class="cc-cd-cb nano"]//a')

    for a_element in a_elements:
        # 标题和链接
        title = a_element.xpath('.//span[@class="t"]/text()').extract_first()
        href = a_element.xpath('.//@href').extract_first()

        # 再次下发新任务,并带上文章标题
        yield feapder.Request(href, download_midware=self.download_midware, callback=self.parser_detail_page,
                              title=title)

第四步,爬取详情页面数据

上一步下发新的任务,通过关键字「 callback 」指定回调函数,最后在 parser_detail_page 中对详情页面进行数据解析

def parser_detail_page(self, request, response):
    """
    解析文章详情数据
    :param request:
    :param response:
    :return:
    """

    title = request.title

    url = request.url

    # 解析文章详情页面,获取点赞、收藏、评论数目及作者名称
    author = response.xpath('//a[@class="author-title"]/text()').extract_first().strip()

    print("作者:", author, '文章标题:', title, "地址:", url)

    desc_elements = response.xpath('//span[@class="xilie"]/span')

    print("desc数目:", len(desc_elements))

    # 点赞
    like_count = int(re.findall('\d+', desc_elements[1].xpath('./text()').extract_first())[0])
    # 收藏
    collection_count = int(re.findall('\d+', desc_elements[2].xpath('./text()').extract_first())[0])
    # 评论
    comment_count = int(re.findall('\d+', desc_elements[3].xpath('./text()').extract_first())[0])

    print("点赞:", like_count, "收藏:", collection_count, "评论:", comment_count)

3-5  数据入库

使用上面实例化的数据库对象执行 SQL,将数据插入到数据库中即可

# 插入数据库
sql = "INSERT INTO topic(title,auth,like_count,collection,commentvalues('%s','%s','%s','%d','%d')" % (
title, author, like_count, collection_count, comment_count)

# 执行
self.db.execute(sql)

4. 最后

本篇文章通过一个简单的实例,聊到了 feapder 中最简单的爬虫 AirSpider

关于 feapder 高级功能的使用,后面我将会通过一系列实例进行详细说明

如果你觉得文章还不错,请大家 点赞、分享、留言 下,因为这将是我持续输出更多优质文章的最强动力!

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Newspaper — 一个能下载38种语言新闻文章的 Python 模块

Newspaper 是一个很棒的python库,用于提取和整理文章。

它有以下的优点:

  • 多线程文章下载框架
  • 识别新闻网址
  • 从html提取文本
  • 从html提取顶部图像
  • 从html提取所有图像
  • 从文本中提取关键字
  • 自动提取摘要
  • 自动提取作者
  • 自动提取 Google 趋势词

下面是这个开源模块的安装和使用教程。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip3 install newspaper3k

遇到任何安装问题,可以在本文下方留言框或Python实用宝典公众号上留言,也可以访问项目官网查看相关安装指南:
https://github.com/codelucas/newspaper

2.基本使用

Newspaper 中是以文章为对象实现各种操作的,比如下载指定新闻的HTML:

from newspaper import Article

url = 'http://fox13now.com/2013/12/30/new-year-new-laws-obamacare-pot-guns-and-drones/'

# 根据url生成Article对象
article = Article(url)

# 下载文章
article.download()

# 文章的HTML
article.html
#'<!DOCTYPE HTML><html itemscope itemtype="http://...'

通过解析新闻和文章,你能获得此文章的作者、发布时间、摘要、顶部图像、所有图像、多媒体等:

"""
Python 实用宝典
《Newspaper — 一个能下载38种语言新闻文章的 Python 模块》
"""

# 解析文章
article.parse()

# 获取文章作者
article.authors
# ['Leigh Ann Caldwell', 'John Honway']

# 获取文章发布日期
article.publish_date
# datetime.datetime(2013, 12, 30, 0, 0)

# 获取文章文本
article.text
# 'Washington (CNN) -- Not everyone subscribes to a New Year's resolution...'

# 获取顶部图像
article.top_image
# 'http://someCDN.com/blah/blah/blah/file.png'

# 获取文章多媒体资源
article.movies
# ['http://youtube.com/path/to/link.com', ...]

除此之外,该模块还附带了 NLP 功能,你能用它来识别文章关键字并自动提取摘要:

# 使用 NLP 解析
article.nlp()

# 获取文章关键词
article.keywords
# ['New Years', 'resolution', ...]

# 获取文章摘要
article.summary
# 'The study shows that 93% of people ...'

你看,这个工具不无敌吗?它还能提取某个网站的所有新闻文章,比如我想提取CNN的新闻文章:

import newspaper

cnn_paper = newspaper.build('http://cnn.com')

for article in cnn_paper.articles:
    print(article.url)
# http://www.cnn.com/2013/11/27/justice/tucson-arizona-captive-girls/
# http://www.cnn.com/2013/12/11/us/texas-teen-dwi-wreck/index.html

在此之上,你还能拿到CNN的其他新闻门户分类:

for category in cnn_paper.category_urls():
    print(category)

# http://lifestyle.cnn.com
# http://cnn.com/world
# http://tech.cnn.com
# ...

许多中文媒体的文章下载也是支持的:

import newspaper
sina_paper = newspaper.build('http://www.sina.com.cn/', language='zh')

for category in sina_paper.category_urls():
    print(category)
# http://health.sina.com.cn
# http://eladies.sina.com.cn
# http://english.sina.com
# ...

article = sina_paper.articles[0]
article.download()
article.parse()

print(article.text)
# 新浪武汉汽车综合 随着汽车市场的日趋成熟,
# 传统的"集全家之力抱得爱车归"的全额购车模式已然过时,
# 另一种轻松的新兴 车模式――金融购车正逐步成为时下消费者购
# 买爱车最为时尚的消费理念,他们认为,这种新颖的购车
# 模式既能在短期内
# ...

print(article.title)
# 两年双免0手续0利率 科鲁兹掀背金融轻松购_武汉车市_武汉汽
# 车网_新浪汽车_新浪网

从上面的例子你可以看到,你可以非常容易地提取中文文章,仅需要在Article的language参数中指定 ‘zh’ :

"""
Python 实用宝典
《Newspaper — 一个能下载38种语言新闻文章的 Python 模块》
"""

from newspaper import Article
url = 'http://www.bbc.co.uk/zhongwen/simp/chinese_news/2012/12/121210_hongkong_politics.shtml'
a = Article(url, language='zh') # Chinese
a.download()
a.parse()
print(a.text[:150])

# 香港行政长官梁振英在各方压力下就其大宅的违章建
# 筑(僭建)问题到立法会接受质询,并向香港民众道歉。
# 梁振英在星期二(12月10日)的答问大会开始之际
# 在其演说中道歉,但强调他在违章建筑问题上没有隐瞒的
# 意图和动机。 一些亲北京阵营议员欢迎梁振英道歉,
# 且认为应能获得香港民众接受,但这些议员也质问梁振英有

print(a.title)
# 港特首梁振英就住宅违建事件道歉

这个工具所支持的所有语言如下:

input code      full name

  ar              Arabic
  be              Belarusian
  bg              Bulgarian
  da              Danish
  de              German
  el              Greek
  en              English
  es              Spanish
  et              Estonian
  fa              Persian
  fi              Finnish
  fr              French
  he              Hebrew
  hi              Hindi
  hr              Croatian
  hu              Hungarian
  id              Indonesian
  it              Italian
  ja              Japanese
  ko              Korean
  lt              Lithuanian
  mk              Macedonian
  nb              Norwegian (Bokmål)
  nl              Dutch
  no              Norwegian
  pl              Polish
  pt              Portuguese
  ro              Romanian
  ru              Russian
  sl              Slovenian
  sr              Serbian
  sv              Swedish
  sw              Swahili
  th              Thai
  tr              Turkish
  uk              Ukrainian
  vi              Vietnamese
  zh              Chinese

你可以按需选择自己所需要的语言。

3.高级玩法

前面我们说过,Newspaper 是一个可以并发下载文章的框架,它是这么玩的:

"""
Python 实用宝典
《Newspaper — 一个能下载38种语言新闻文章的 Python 模块》
"""

import newspaper
from newspaper import news_pool

slate_paper = newspaper.build('http://slate.com')
tc_paper = newspaper.build('http://techcrunch.com')
espn_paper = newspaper.build('http://espn.com')

papers = [slate_paper, tc_paper, espn_paper]
news_pool.set(papers, threads_per_source=2) # (3*2) = 总计 6 线程
news_pool.join()

# 到这一步,你可以假定三个新闻源的文章都下载完成了
print(slate_paper.articles[10].html)
# u'<html> ...'

可以看到,作者通过 build 三个新闻源,拿到一个总的新闻源池进行并发请求。

其中,.set 函数起到了调度作用,它能通过指定 threads_per_source 的值设定每个新闻源的线程。最后再 join 起来开始并发请求新闻源并开始下载新闻。

此外,Newspaper 还有一些参数可供你配置,比如:

keep_article_html,默认为False,“如果要保留正文文本的html,则设置为True”

http_success_only,默认为True,“设置为False也可以捕获非2XX响应”

MIN_WORD_COUNT,默认为300,“文章中的单词数量”

MIN_SENT_COUNT,默认为7,“句子数”

MAX_TITLE,默认值为200,“文章标题中的字符数”

MAX_TEXT,默认值为100000,“文章文字中的字符数”

MAX_KEYWORDS,默认值为35,“文章中的关键词数”

MAX_AUTHORS,默认值为10,“文章中的作者姓名数量”

MAX_SUMMARY,默认值为5000,“摘要的字符数”

MAX_SUMMARY_SENT,默认为5,“摘要中的句子数”

memoize_articles,默认为True,“运行后缓存并保存运行后的文章”

fetch_images,默认为True,“如果不需要获取图片,请将其设置为false”

request_timeout,默认为7,请求7秒后未响应完成则超时

number_threads,默认值为10,多线程数量

如果你需要使用以上参数,可以设一个Config对象,传入指定的 Article 对象或build 方法中,如:

import newspaper
from newspaper import Config, Article, Source

config = Config()
config.memoize_articles = False

cbs_paper = newspaper.build('http://cbs.com', config)

非常简单易懂,而且设置起来的维护成本不算很高。

在做一些舆情分析或者NLP算法训练/测试的时候,这个模块简直就是你的福音。你可以很方便地从网站上提取任意语言的文本数据,拿来测试或者训练都可以。

对于那些想要搞舆情分析,寻找市场热点的同学而言,这个模块也是非常方便,你能搭配邮件发布工具,并使用Newspaper的关键词提取功能,迅速制作一个关键词热点实时告警的工具。

总而言之,这是一个非常值得了解并学习使用的第三方模块,强烈推荐。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python Selenium 实现网站自动签到教程

前情提要

小五收藏了一些论坛网站,经常需要自己登录签到,以此来获得积分金币等等。

步骤倒是并不复杂,只需要填写账号密码登录,然后点击签到即可。

但天天手动太容易忘了这件事啦。毕竟我们都会用python了,那就可以使用Selenium操作;浏览器实现自动签到啊!

现在开始上手工作👉

准备工作

首先我们需要先安装Selenium,从而实现后续自动化操控浏览器。我们可以利用它来模拟鼠标按键,跟按键精灵很类似。

pip install selenium

待其安装完成后,准备另一个必需工具。

selenium可以操控多款浏览器,包括谷歌,火狐等,这里小五使用的是谷歌浏览器。

这里需要知道浏览器的版本信息,只需打开“关于Chrome”,就可以看到了。

如上图所示,我的浏览器版本是89.0.4389.114。然后我们需要前往(http://chromedriver.storage.googleapis.com/index.html)找到与浏览器相匹配的版本,没有一模一样的选择最近的版本也可以。

点击进去选择对应的系统版本,下载后将chromedriver.exe解压出来,最后将其放到与python.exe文件相同的路径下。

如果你是用的anaconda就放在下面目录下

C:\Users\Administrator\anaconda3

具体位置如下图所示👇

下面我们就可以正式用python自动签到了。

selenium 教程-代码及讲解

首先打开我要登录的网站,具体域名就不分享给大家了。

先导入selenium库,这里只需使用selenium中的webdriver模块,运行

from selenium import webdriver

打开下载的浏览器驱动,设置隐式等待时

wd=webdriver.Chrome()
wd.implicitly_wait(1)

👆执行代码的时候会自行去寻找chromedriver.exe(在python目录下寻找)。如果我们前面没有把它放在固定的路径下,就需要在这里指定chromedriver.exe路径。

打开登录网页

wd.get('待登录网站URL')

如上图所示,模拟浏览器已经打开了网站的登录界面。这个时候我们需要定位到输入框、密码框以及登录按钮等。

这里不用担心,Selenium提供了很多种定位DOM元素的方法,各有各的特点和优势。今天就主要使用 by_xpath() 这个方法来定位元素,这个方法比较灵活方便,大部分属性都可以通过它来定位。

【检查】→【进入开发者模式】点击左上角的图标,再点击你要找的对象,即可得到该对象的信息。点位该对象后,右键copy它的XPath!

input = wd.find_element_by_xpath('//*[@id="email"]')
input.send_keys('kxpython@163.com')

同理,我们可以定位到密码框,再send_keys输入密码

password = wd.find_element_by_xpath('//*[@id="password"]')
password.send_keys('kxpython')

至于需要点击的对象,可以使用click()来实现模拟点击的功能。

点击登录

button_login = wd.find_element_by_xpath('//*[@id="app"]/section/div/div/div/div[2]/form/div/div[5]/button')
button_login.click()

点击跳过弹窗

一般登录后会有个小弹窗,关闭即可。

wd.find_element_by_xpath('//*[@id="popup-ann-modal"]/div/div/div[3]/button').click()

尝试签到

try:
    wd.find_element_by_xpath('//*[@id="checkin-div"]/a').click()
except:
    print("已签到")

最后要记得关闭浏览器窗口

wd.quit()

注:close()关闭当前窗口,wd.quit()则是关闭所有窗口。

点击运行上面的全部代码,即可实现python对网站的自动签到。我们还可以将该python脚本设置为定时任务,这样就不用每天手动去签到啦!

小结

作为演示的这个网站非常简单,如果大家也想用python来自动签到的话,要根据自己的网站情况来改写脚本。

其实 Selenium 更多的用法是自动化测试、爬虫等,具体Selenium的详细用法,可以参考文档。

地址:http://selenium-python.readthedocs.org/

我们学习python时,可以尝试用来解决自己的生活问题,这样学得也更扎实,学得也更有趣味!

来源:快学Python

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典