分类目录归档:解决方案

Python 调用 Shodan 实战教程 — 互联网上最可怕的搜索引擎

Shodan 在百度百科里被给出了这么一句话介绍:Shodan是互联网上最可怕的搜索引擎。

为什么呢?与谷歌、百度等搜索引擎爬取网页信息不同,Shodan爬取的是互联网上所有设备的IP地址及其端口号。

而随着智能家电的普及,家家户户都有许多电器连接到互联网,这些设备存在被入侵的可能性,这是十分危险的。

说了这么多,给大家体验下shodan,让你们有更切身的理解。打开shodan,在搜索框输入 Hikvision-Webs:

你会搜素到这个品牌的摄像头设备遍及全球的IP及其暴露的端口号:

可以看到,这台机器暴露了17、80、111、995、3128、5000、6000、20547端口,黑客可以根据这些端口进行针对性的攻击。

不过也不需要过于担心,如果你的服务不存在漏洞,一般是无法攻入的。但有些端口号会暴露摄像头的web管理端,如下:

那么黑客可能可以用暴力破解的方式,强行进入摄像头后台管理端,获取到实时的录像。

谨记这会侵犯别人的隐私权,是违法的行为,我们是遵纪守法的好公民所以知道它的原理和危害就足够。我们的目的是运用技术保护好个人隐私,如非必要不将摄像头接入互联网,一定要接入的话,不能使用容易被破解的弱口令。

Shodan Web端非常好用,但如果我们有从Python搜索的需求怎么办?

没关系,shodan 官方也提供了python官方SDK包,下面就来讲讲这个SDK包的使用。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install shodan

2.Shodan 注册账号获取API

使用 Shodan 必须注册账号,注册网址https://account.shodan.io/register

输入完相关信息,点击 CREATE 会跳转到个人账户页:

此时 API Key 会显示你的API秘钥,请记录这个秘钥,后续会使用到这个秘钥去请求接口。

3.Shodan 基本调用实战教程

Shodan本质上就是一个搜索引擎,你只需要输入搜索的关键词:

# 公众号:Python 实用宝典
# 2021-05-04
from shodan import Shodan

api = Shodan('你的API KEY')

def search_shodan(keyword):
    # 调用搜索接口
    result = api.search(keyword)

    # 显示所有IP
    for service in result['matches']:
            print(service['ip_str'])

search_shodan("Hikvision-Webs")

结果如下:

可惜的是,普通API只能像这样搜索关键字,无法使用过滤条件如:Hikvision-Webs country:”US” 搜索美国内的所有Hikvision网站管理端。

如果你想要使用过滤条件,Shodan需要你升级API权限:

挺贵的,不过还好是一次性支付,永久使用。

4. Shodan 高级使用教程

Shodan 的用处当然不仅仅是在黑客攻防中,它还能用于统计。如果你想要了解哪些国家的使用这款摄像头的数量最多,可以使用 Facets 特性。

# 公众号:Python 实用宝典
# 2021-05-04
from shodan import Shodan

api = Shodan('你的API KEY')
def try_facets(query):
    FACETS = [
        'org',
        'domain',
        'port',
        'asn',
        ('country', 3),
    ]

    FACET_TITLES = {
        'org': 'Top 5 Organizations',
        'domain': 'Top 5 Domains',
        'port': 'Top 5 Ports',
        'asn': 'Top 5 Autonomous Systems',
        'country': 'Top 3 Countries',
    }

    try:
        # 使用 count() 方法可以不需要升级API,且比 search 方法更快。
        result = api.count(query, facets=FACETS)

        print('Shodan Summary Information')
        print('Query: %s' % query)
        print('Total Results: %s\n' % result['total'])

        # 显示每个要素的摘要
        for facet in result['facets']:
            print(FACET_TITLES[facet])

            for term in result['facets'][facet]:
                print('%s: %s' % (term['value'], term['count']))

    except Exception as e:
        print('Error: %s' % e)

try_facets("Hikvision-Webs")

得到结果如下:

从 Top 3 Countries 中可以看到,这款摄像头使用数量排名前三的国家分别是:美国、日本和德国。

没想到吧,Shodan居然还能用于产品分析。同样地原理,如果你把关键词改为”apache”,你可以知道目前哪些国家使用apache服务器数量最多,最普遍被使用的版本号是什么。

简而言之,Shodan是一个非常强大的搜索引擎,它在好人手里,能被发挥出巨大的潜能。如果Shodan落入坏人之手的话,那真是一个可怕的东西。

为了避免受到不必要的攻击,请大家及时检查所有联网设备的管理端的密码,如果有使用默认密码及弱口令,立即进行密码的更改,以保证服务的安全。

本文所有源代码可在 Python 实用宝典 公众号后台回复:shodan 下载。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 下划线_的五大作用—临时变量、保护变量、私有变量、魔术方法…

Python有很多地方使用下划线。在不同场合下,有不同含义:比如_var表示内部变量;__var表示私有属性;__var__表示魔术方法;这些含义有的是程序员群体的约定,如_var;有的是Python解释器规定的形式,如__var

本文总结Python语言编程中常用下划线的地方,力图一次搞懂_用法。目前常见的用法有五种:

  • _用于临时变量
  • var_用于解决命名冲突问题
  • _var用于保护变量
  • __var用于私有变量
  • __var__用于魔术方法

下面我们具体看看这些下划线应用场景。

一、_用于临时变量

单下划线一般用于表示临时变量,在REPL、for循环和元组拆包等场景中比较常见。

1.1 REPL

单下划线在REPL中关联的是上一次计算的非None结果。

>>> 1+1
2
>>> _
2
>>> a=2+2
>>> _
2

1+1,结果为2,赋值给_;而赋值表达式a=2+2a为4,但整个表达式结果为None,故不会关联到_。这有点类似日常大家使用的计算器中的ANS按键,直接保存了上次的计算结果。

1.2 for循环中的_

for循环中_作为临时变量用。下划线来指代没什么意义的变量。例如在如下函数中,当我们只关心函数执行次数,而不关心具体次序的情况下,可以使用_作为参数。

nums = 13
for _ in range(nums):
    fun_oper()

1.3 元组拆包中的_

第三个用法是元组拆包,赋值的时候可以用_来表示略过的内容。如下代码忽略北京市人口数,只取得名字和区号。

>>> city,_,code = ('Beijing',21536000,'010')
>>> print(city,code)
Beijing 010

如果需要略过的内容多于一个的话,可以使用*开头的参数,表示忽略多个内容。如下代码忽略面积和人口数,只取得名字和区号

city,*_,code = ('Beijing',21536000,16410.54,'010')

1.4 国际化函数

在一些国际化编程中,_常用来表示翻译函数名。例如gettext包使用时:

import gettext
zh = gettext.tranlation('dict','locale',languages=['zh_CN'])
zh.install()
_('hello world')

依据设定的字典文件,其返回相应的汉字“你好世界”。

1.5 大数字表示形式

_也可用于数字的分割,这在数字比较长的时候常用。

>>> a = 9_999_999_999
>>> a
9999999999

a的值自动忽略了下划线。这样用_分割数字,有利于便捷读取比较大的数。

二、var_用于解决命名冲突问题

变量后面加一个下划线。主要用于解决命名冲突问题,元编程中遇时Python保留的关键字时,需要临时创建一个变量的副本时,都可以使用这种机制。

def type_obj_class(name,class_):
    pass

def tag(name,*content,class_):
    pass

以上代码中出现的class是Python的保留关键字,直接使用会报错,使用下划线后缀的方式解决了这个问题。

三、_var用于保护变量

前面一个下划线,后面加上变量,这是仅供内部使用的“保护变量”。比如函数、方法或者属性。

这种保护不是强制规定,而是一种程序员的约定,解释器不做访问控制。一般来讲这些属性都作为实现细节而不需要调用者关心,随时都可能改变,我们编程时虽然能访问,但是不建议访问。

这种属性,只有在导入时,才能发挥保护作用。而且必须是from XXX import *这种导入形式才能发挥保护作用。

使用from XXX import *是一种通配导入(wildcard import),这是Python社区不推荐的方式,因为你根本搞不清你到底导入了什么属性、方法,很可能搞乱你自己的命名空间。PEP8推荐的导入方式是from XXX import aVar , b_func , c_func这种形式。

比如在下例汽车库函数tools.py里定义的“保护属性”:发动机型号和轮胎型号,这属于实现细节,没必要暴露给用户。当我们使用from tools import * 语句调用时,其实际并没有导入所有_开头的属性,只导入了普通drive方法。

_moto_type = 'L15b2'
_wheel_type = 'michelin'

def drive():
    _start_engine()
    _drive_wheel()

def _start_engine():
    print('start engine %s'%_moto_type)
    
def _drive_wheel():
    print('drive wheel %s'%_wheel_type)

查看命令空间print(vars())可见,只有drive函数被导入进来,其他下划线开头的“私有属性”都没有导入进来。

{'__name__''__main__''__doc__': None, '__package__': None, '__loader__': <_frozen_importlib_external.SourceFileLoader object at 0x005CF868>, '__spec__': None, '__annotations__':{}, '__builtins__': <module 'builtins' (built-in)>, '__file__''.\\xiahuaxian.py''__cached__': None, 'walk': <function walk at 0x01DA8C40>, 'root''.\\__pycache__''_': [21536000, 16410.54], 'dirs': ['tools.cpython-38.pyc'], 'city''Beijing''code''010''drive': <function drive at 0x01DBC4A8>}

3.1 突破保护属性

之所以说是“保护”并不是“私有”,是因为Python没有提供解释器机制来控制访问权限。我们依然可以访问这些属性:

import tools
tools._moto_type = 'EA211'
tools.drive()

以上代码,以越过“保护属性”。此外,还有两种方法能突破这个限制,一种是将“私有属性”添加到tool.py文件的__all__列表里,使from tools import *也导入这些本该隐藏的属性。

__all__ = ['drive','_moto_type','_wheel_type']

另一种是导入时指定“受保护属性”名。

from tools import drive,_start_engine
_start_engine()

甚至是,使用import tools也可以轻易突破保护限制。所以可见,“保护属性”是一种简单的隐藏机制,只有在from tools import *时,由解释器提供简单的保护,但是可以轻易突破。这种保护更多地依赖程序员的共识:不访问、修改“保护属性”。除此之外,有没有更安全的保护机制呢?有,就是下一部分讨论的私有变量。

四、__var用于私有变量

私有属性解决的之前的保护属性保护力度不够的问题。变量前面加上两个下划线,类里面作为属性名和方法都可以。两个下划线属性由Python的改写机制来实现对这个属性的保护。

看下面汽车例子中,品牌为普通属性,发动机为“保护属性”,车轮品牌为“私有属性”。

class Car:
    def __init__(self):
        self.brand = 'Honda'
        self._moto_type = 'L15B2'
        self.__wheel_type = 'michelin'

    def drive(self):
        print('Start the engine %s,drive the wheel %s,I get a running %s car'%
        (self._moto_type,
        self.__wheel_type,
        self.brand))

我们用var(car1)查看下具体属性值,

['_Car__wheel_type''__class__''__delattr__''__dict__''__dir__''__doc__''__eq__''__format__''__ge__''__getattribute__''__gt__''__hash__''__init__''__init_subclass__''__le__''__lt__''__module__''__ne__''__new__''__reduce__''__reduce_ex__''__repr__''__setattr__''__sizeof__''__str__''__subclasshook__''__weakref__''_moto_type''brand''drive']

可见,实例化car1中,普通属性self.brand和保护属性self._moto_type都得以保存,两个下划线的私有属性__wheel_type没有了。取而代之的是_Car_wheel_type这个属性。这就是改写机制(Name mangling)。两个下划线的属性,被改写成带有类名前缀的变量,这样子类很难明明一个和如此复杂名字重名的属性。保证了属性不被重载,保证了其的私有性。

4.1 突破私有属性

这里“私有变量”的实现,是从解释器层面给与的改写,保护了私有变量。但是这个机制并非绝对安全,因为我们依然可以通过obj._ClasssName__private来访问__private私有属性。

car1.brand = 'Toyota'
car1._moto_type = '6AR-FSE'
car1._Car__wheel_type = 'BRIDGESTONE'
car1.drive()

结果

Start the engine 6AR-FSE,\
drive the wheel BRIDGESTONE,\
I get a running Toyota car

可见,对改写机制改写的私有变量,虽然保护性加强了,但依然可以访问并修改。只是这种修改,只是一种杂耍般的操作,并不可取。

五、__var__用于魔术方法

变量前面两个下划线,后面两个下划线。这是Python当中的魔术方法,一般是给系统程序调用的。例如上例中的__init__就是类的初始化魔术方法,还有支持len函数的__len__方法,支持上下文管理器协议的__enter__和__exit__方法,支持迭代器协议的__iter__方法,支持格式化显示的__repr__和__str__方法等等。这里我们为上例的Car类添加魔术方法__repr__来支持格式化显示。

    def __repr__(self):
        return '***Car %s:with %s Engine,%sWheel***'%
        (self.brand,self._moto_type,self.__wheel_type)

未添加__repr__魔术方法之前,print(car1)结果为<__main__.Car object at 0x0047F7F0>,这个结果让人看的一头雾水,增加repr魔术方法之后,显示结果为***Car Toyota:with 6AR-FSE Engine,BRIDGESTONE Wheel***清晰明了,利于调试。这就是魔术方法的功效:支持系统调用,改进用户类表现,增加协议支持,使用户类表现得更像系统类。

5.1 Python魔术方法分类

以下所有魔术方法均需要在前后加上__,这里省略了这些双下划线。

  • 一元运算符 neg pos abs invert
  • 转换 complex int float round inex
  • 算术运算 add sub mul truediv floordiv mod divmod pow lshift rshift and xor or

算术运算除and之外,前面再加上r,表示反运算。除dimod外,前面加上i,表示就地运算。

  • 比较 lt le eq ne gt ge
  • 类属性 getattr getattribute setattr delattr dir get set delete
  • 格式化 bytes hash bool format
  • 类相关 init del new
  • 列表 getitem
  • 迭代器 iter next
  • 上下文管理器 enter exit

六、总结

总之,下划线在 Python 当中应用还是很广泛的,甚至可以说 Python 对下划线有所偏爱

可以看到 _常用于临时变量,在REPL,for循环,元组拆包和国际化中得到了广泛应用

var_用于解决命名冲突问题,使用时比较简单易懂的。_var对变量的保护,只是一种脆弱的保护,更多依靠程序员的约定。__var用于私有变量,借助改写机制支持,已经支持了私有变量,但是仍然存在漏洞

__var__用于魔术方法,进行了一个简单的介绍,魔术方法较多,但是理解并不复杂。希望以后可以进一步介绍这些魔术方法

继续阅读Python 下划线_的五大作用—临时变量、保护变量、私有变量、魔术方法…

超详细快速部署 Python 脚本到手机上

1. 前言

最近有读者后台给我留言,说这段时间云服务器涨价了,自己日常就运行一些简单的脚本,因此不太想入坑云服务器,问我能不能提供一个不一样的思路给他

本篇文章将介绍一款软件,即:iSH

这款 App 功能非常强大,可以运行各种脚本,适用于需求不是很大的小伙伴

2. 介绍

iSH 是一款运行在 iOS 系统上的 App,可以运行 Linux Shell,底层操作系统基于「 Alpine 

PS:Alpine 是一个超轻量级的 Linux 发行版,是一个由社区开发的 Linux 操作系统,该操作系统以安全为理念,面向 x86 路由器、防火墙、虚拟专用网、IP 电话盒及服务器而设计

项目地址:

https://github.com/ish-app/ish/

安装有 2 种方式,分别是:

  • App Store 搜索关键字「 iSH 」下载

  • Github 下载源码,使用 Xcode 编译安装

App 界面如下,从左往右,功能键包含:Tab 键、Ctrl 键、ESC 键、滚动键( Arrow 键)、软件设置、快速粘贴键、隐藏输入法

其中,

Tab 键、Ctrl 键、ESC 键和 PC 端使用方法一致

滚动键用于光标移动和历史命令切换( 通过向上、向下滑动来切换历史命令 )

设置中,可以对外观主题、文字样式、应用图标、文件管理进行查看设置

3. 更换源及安装依赖

由于默认的源在国外,下载依赖很慢,我们需要更换源

使用 vim 命令编辑文件 「 /etc/apk/repositories 」,删除默认的源,更换为阿里或者清华的源

# 编辑文件
vim /etc/apk/repositories

# 替换为国内的源
# 阿里源
https://mirrors.aliyun.com/alpine/v3.11/main
https://mirrors.aliyun.com/alpine/v3.11/community

编辑完成后,保存退出

下面就可以安装 Python 及常见依赖库了

3-1  安装 Python3

iSH 使用命令「 apk add app_name 」安装应用程序

# 安装python3
apk add python3

3-2  安装 pip

首先,我们使用「 wget 」命令下载 pip 文件,然后安装 pip

# 下载get-pip文件
wget https://bootstrap.pypa.io/get-pip.py

# 安装
python3 get-pip.pya

3-3  安装依赖包

以最常见的 requests 为例,直接使用 pip3 安装即可

# 安装依赖
pip3 install requests

需要注意的是,iSH 安装速度比较慢,需要耐心等待

4. 执行脚本

由于手机上编辑脚本效率太低,大部分时候我们都是在 PC 端编写完成,然后导入到 iSH 中运行

常见方案为 iSH + SSH + Git,为了演示方便,我这里使用「 Web Server for Chrome 」在 PC 端搭建了文件共享服务器,然后将脚本文件放置到共享目录

然后再 iSH 终端,使用 wget 命令直接下载脚本文件

最后,进入到脚本文件夹目录,运行脚本文件即可

# 下载脚本文件压缩包
wget http://192.168.2.121:8887/rsc.zip

# 解压文件
unzip rsc.zip

# 进入到文件夹目录
cd rsc/

# 运行脚本文件
python3 main_proj.py

5. 拓展一下

iSH 常见命令如下:

5-1  安装

apk add <name>
apk add vim
apk add python3

5-2  卸载

# 卸载应用
apk del <name>

5-3  搜索应用

# 搜索应用
# PS:可以先搜索,然后再安装
apk search <name>

5-4  更新包管理器

iSH 使用 Alpine 包管理器,使用下面的命令可以更新 Alpine 存储库列表

# 更新存储库列表
apk update

6. 总结

iSH 作为一款 iOS 端的应用,可以非常便捷地完成 Python 脚本的部署运行,当然 Shell 脚本也是支持的

另外,iSH 可以开启 SSH Server 供远程连接,这部分内容及 iSH 详细使用文档我已经写成 PDF 并上传到后台,回复关键字「 iSH 」获取完整内容

 

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 想了解EventLoop?这篇文章就够了

原文来自 python-parallel-programming-cookbook-cn

Python的Asyncio模块提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语。此模块的主要组件和概念包括:

  • 事件循环: 在Asyncio模块中,每一个进程都有一个事件循环。
  • 协程: 这是子程序的泛化概念。协程可以在执行期间暂停,这样就可以等待外部的处理(例如IO)完成之后,从之前暂停的地方恢复执行。
  • Futures: 定义了 Future 对象,和 concurrent.futures 模块一样,表示尚未完成的计算。
  • Tasks: 这是Asyncio的子类,用于封装和管理并行模式下的协程。

本节中重点讨论事件,事实上,异步编程的上下文中,事件无比重要。因为事件的本质就是异步。

1. 什么是事件循环

在计算系统中,可以产生事件的实体叫做事件源,能处理事件的实体叫做事件处理者。此外,还有一些第三方实体叫做事件循环。它的作用是管理所有的事件,在整个程序运行过程中不断循环执行,追踪事件发生的顺序将它们放到队列中,当主线程空闲的时候,调用相应的事件处理者处理事件。最后,我们可以通过下面的伪代码来理解事件循环::

while(1) {
  events = getEvents();
  for (e in events)
    processEvent(e);
}

所有的事件都在 while 循环中捕捉,然后经过事件处理者处理。事件处理的部分是系统唯一活跃的部分,当一个事件处理完成,流程继续处理下一个事件。

2. 准备工作

Asyncio提供了一下方法来管理事件循环:

  • loop = get_event_loop(): 得到当前上下文的事件循环。
  • loop.call_later(time_delay, callback, argument): 延后 time_delay 秒再执行 callback 方法。
  • loop.call_soon(callback, argument): 尽可能快调用 callbackcall_soon() 函数结束,主线程回到事件循环之后就会马上调用 callback 。
  • loop.time(): 以float类型返回当前事件循环的内部时间。
  • asyncio.set_event_loop(): 为当前上下文设置事件循环。
  • asyncio.new_event_loop(): 根据此策略创建一个新的事件循环并返回。
  • loop.run_forever(): 在调用 stop() 之前将一直运行。

3. 如何做…

下面的代码中,我们将展示如何使用Asyncio库提供的事件循环创建异步模式的应用。

import asyncio
import datetime
import time

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

def function_2(end_time, loop):
    print("function_2 called ")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()

def function_3(end_time, loop):
    print("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()

def function_4(end_time, loop):
    print("function_5 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
# loop.call_soon(function_4, end_loop, loop)
loop.run_forever()
loop.close()

运行结果如下::

python3 event.py
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called

在这个例子中,我们定义了三个异步的任务,相继执行,入下图所示的顺序。

首先,我们要得到这个事件循环::

loop = asyncio.get_event_loop()

然后我们通过 call_soon 方法调用了 function_1() 函数。

end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)

让我们来看一下 function_1() 的定义::

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

这个函数通过以下参数定义了应用的异步行为:

  • end_time: 定义了 function_1() 可以运行的最长时间,并通过 call_later 方法传入到 function_2() 中作为参数
  • loop: 之前通过 get_event_loop() 方法得到的事件循环

function_1() 的任务非常简单,只是打印出函数名字。当然,里面也可以写非常复杂的操作。

print("function_1 called")

任务执行结束之后,它将会比较 loop.time() +1s和设定的运行时间,如果没有超过,使用 call_later 在1秒之后执行 function_2() 。

if (loop.time() + 1.0) < end_time:
    loop.call_later(1, function_2, end_time, loop)
else:
    loop.stop()

function_2() 和 function_3() 的作用类似。

如果运行的时间超过了设定,事件循环终止。

loop.run_forever()
loop.close()

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 实现列表分列与字典分列及三个实例

本文讲解了列表和字典转化为pandas的列的多种方法及实战例子和教程。

1.问题来源

源于林胖发出的一道基础题:

2.解法

2.1 基础解法explode函数

这道题最简单的解法,相信大部分用过pandas的朋友都会,林胖也马上发出了自己的答案:

import pandas as pd

mydict = {'A': [1], 'B': [2, 3], 'C': [4, 5, 6]}
pd.DataFrame(mydict.items()).explode(1)

结果:

详解

mydict.items()是python基础字典的内容,它返回了这个字典键值对组成的元组列表:

mydict.items()

返回:

dict_items([('A', [1]), ('B', [2, 3]), ('C', [4, 5, 6])])

将这个内部是元组的可迭代对象传入DataFrame的构造函数中:

pd.DataFrame(mydict.items())

返回结果:

这是pandas最基础的开篇知识点使用可迭代对象构造DataFrame,列表的每个元素都是整个DataFrame对应的一行,而这个元素内部迭代出来的每个元素将构成DataFrame的某一列。

然后再看看这个explode函数,它是pandas 0.25版本才出现的函数,只有一个参数可以传入列名,然后该函数就可以把该列的列表每个元素扩展到多行上。

效果与hive使用lateral view+explode实现的效果几乎一致,类似于:

select a,b_i from df lateral view explode(b) tmp as b_i;

可以参考很早之前的一篇文章:https://blog.csdn.net/as604049322/article/details/105985770

2.2 没有exlode函数如何解决这个问题

但是,黄佬说版本太低没有这个函数,于是我给群友们出了一道题:

在黄佬的邀请下,一位经过我多次辅导的群友率先使用了循环法解题:

我觉得非常棒,但我也希望看到有人再用变形法实现一次。林胖和一位群友再次给出了简化版本的循环解法:

经过一番提示后,小五哥和林胖终于给出了变形法的解法:

非常不错,群友们终于独立的多思路解决了这个问题,真的要撒花呀!!!

下面我们详细分析一下,循环法和变形法的解法吧:

2.3 循环法解题

基本写法:

result = []
for k, vs in mydict.items():
    for v in vs:
        result.append((k, v))
pd.DataFrame(result)

本质上就是实现了一个笛卡尔积的拉平操作,将mydict.items这个可迭代对象的元组构造笛卡尔积并按照整体拉平。

上面的基本写法,应该99%以上的朋友都能看懂,但 林胖 的循环简化解法:

import itertools
result = []
for k, v in mydict.items():
    result.extend(itertools.product(k, v))
pd.DataFrame(result)

部分朋友可能没有看明白,这个就需要查询一下product方法的官方文档(https://docs.python.org/zh-cn/3.7/library/itertools.html?highlight=product#itertools.product):

product(*iterables, repeat=1) --> product object

参数:

  • iterables 为可迭代对象
  • 可选参数repeat 表示重复次数

用于生成可迭代对象输入的笛卡儿积,相当于生成器表达式中的嵌套循环。

例如:product(A, B) 中的元素A和B将共同构成可迭代元素[A, B]作为iterables传入和 ((x,y) for x in A for y in B) 返回结果一样。

返回示例:

  • product(‘ab’, range(3)) –> (‘a’,0) (‘a’,1) (‘a’,2) (‘b’,0) (‘b’,1) (‘b’,2)
  • product((0,1), (0,1), (0,1)) –> (0,0,0) (0,0,1) (0,1,0) (0,1,1) (1,0,0) …

也可以传入可选参数 repeat 表示重复的次数:例如,product(A, repeat=4)product(A, A, A, A) 的返回结果是一样的。


列表的extend方法是将可迭代对象的每个元素都添加到列表中,而append方法只能添加单个元素。

当然,我们还可以将整个for循环改写成列表生成式:

result = [(k, v) for k, vs in mydict.items() for v in vs]
pd.DataFrame(result)

也可以简化代码量。

2.4 变形法解题

df = pd.DataFrame(mydict.items(), columns=["a", "b"])
df

实现思路,上面的界面是下面最左边:

2.4.1 列表分列的2种方法

列表分列的思路:Pandas的Series对象调用apply方法单个元素返回的结果是Series时,这个Series的每个数据会作为Datafrem的每一列,索引会作为列名。

对Series进行列表分列

例如:

df["b"].apply(pd.Series)

结果:

不过这样会丢失原本的”a”列,我们可以先将”a”列设置为索引,再进行Series分列操作:

df.set_index("a")["b"].apply(pd.Series)

或者把结果设置成原本的”a”列为索引:

df["b"].apply(pd.Series).set_index(df["a"])

结果均为上述实现思路的第二步。

直接对Datafream进行列表分列

如果我们希望直接使用Datafream实现分列可以借助agg方法,因为agg方法是对每一列的Series对象操作:

df.agg({"a": lambda x: x, "b": pd.Series})

结果:

但这操作导致列多了一个级别,需要删除:

df.agg({"a": lambda x: x, "b": pd.Series}).droplevel(0, axis=1)

结果:

只要再执行set_index("a")

df.agg({"a": lambda x: x, "b": pd.Series}).droplevel(0, axis=1).set_index("a")

结果就会与实现思路的第二步结果一致。

2.4.2 将字典的键作为索引的2种读取方法

当然上面我只是为了给大家讲述分列的一些方法。对于这个例子,其实我们可以直接通过pd.DataFrame.from_dict方法orient参数传入’index’,直接获得第二步的结果(只是索引没有名称):

df = pd.DataFrame.from_dict(mydict, 'index')

或者分别传入data和索引index:

df = pd.DataFrame(data=mydict.values(), index=mydict.keys())

都能得到以下结果:

2.4.3 melt实现逆透视

说起逆透视我个人首先想到了melt方法,然后才想到melt方法实现的本质用到了stack方法。

为了避免索引丢失,我们首先还原索引为普通的列:

df = df.rename_axis(index="a").reset_index()
df

结果:

然后使用melt方法进行逆透视:

df.melt(id_vars='a', value_name='b')

结果:

然后删除第二列,再删除空值行,再将数值列转换为整数类型就搞定。

最终代码:

df = pd.DataFrame.from_dict(mydict, 'index')
df = df.melt(id_vars='a', value_name='b').drop(columns="variable").dropna()
df.b = df.b.astype("int")
df

成功得到结果:

2.4.4 stack实现逆透视

df = pd.DataFrame.from_dict(mydict, 'index')
df.stack()

结果:

A  0    1.0
B  0    2.0
   1    3.0
C  0    4.0
   1    5.0
   2    6.0
dtype: float64

结果返回了一个多级索引的Series,我们首先需要删除索引中多余的部分:

df.stack().droplevel(1)

结果:

A    1.0
B    2.0
B    3.0
C    4.0
C    5.0
C    6.0
dtype: float64

此时我们再还原索引到普通列:

df.stack().droplevel(1).reset_index()

再重新设置一下列名:

df.stack().droplevel(1).reset_index().set_axis(["a", "b"], axis=1)

最后重设一下B列的类型:

df.b = df.b.astype("int")

最终代码:

df = pd.DataFrame.from_dict(mydict, 'index')
df = df.stack().droplevel(1).reset_index().set_axis(["a", "b"], axis=1)
df.b = df.b.astype("int")
df

结果:

2.实际应用

这次我将分享三个实际案例,让大家看看列表分列的一些实际应用。

首先,我们先导包并设置Pandas显示参数:

import pandas as pd
pd.set_option("display.max_colwidth"100)

正则提取并分列

需求:

读取数据:

df = pd.read_excel("正则提取与分列.xlsm", usecols=[0])
df.head()

结果:

实现代码:

result = df.copy()
result["tmp"] = result["补回原因"].str.findall("([\d.]+[到至][\d.]+)")
result = result.agg({"补回原因"lambda x: x, "tmp": pd.Series}).droplevel(0, axis=1)
result.head()

结果:

分步解析:

df["tmp"] = df["补回原因"].str.findall("([\d.]+[到至][\d.]+)")
df.head(5)

结果:

这步使用正则提取出每个日期字符串,[\d.]+表示连续的数字或.用于匹配时间字符串,两个时间之间的连接字符可能是到或至。

然后我使用agg函数直接对Datafream分列:

df.agg({"补回原因"lambda x: x, "tmp": pd.Series})

结果:

由于列索引多了一级,所以需要删除:

df.agg({"补回原因"lambda x: x, "tmp": pd.Series}).droplevel(0, axis=1).head()

结果:

droplevel(0, axis=1)用于删除多级索引指定的级别,axis=0可以删除行索引,axis=1则可以删除列索引,第一参数表示删除级别0。当然如果列索引存在名称时还可以传入名称字符串,可参考官网文档:

df = pd.DataFrame([
...     [1234],
...     [5678],
...     [9101112]
... ]).set_index([01]).rename_axis(['a''b'])
>>> df.columns = pd.MultiIndex.from_tuples([
...    ('c''e'), ('d''f')
... ], names=['level_1''level_2'])
>>> df
level_1   c   d
level_2   e   f
a b
1 2      3   4
5 6      7   8
9 10    11  12
>>> df.droplevel('a')
level_1   c   d
level_2   e   f
b
2        3   4
6        7   8
10      11  12
>>> df.droplevel('level2', axis=1)
level_1   c   d
a b
1 2      3   4
5 6      7   8
9 10    11  12

分组聚合并分列

需求:

首先,读取数据:

df = pd.read_excel("分组聚合并分列.xlsx")
df

结果:

实现代码:

(
    df.groupby("姓名")["得分"]
    .apply(list)
    .apply(pd.Series)
    .fillna("")
    .rename(columns=lambda x: f"得分{x+1}")
    .reset_index()
    .astype({"得分1":"int8"})
)

结果:

分布解析:

首先将每个姓名的得分聚合成列表,并最终返回一个Series:

df.groupby("姓名")["得分"].apply(list)

结果:

姓名
孙四娘          [7, 28]
看见星光    [88, 28, 23]
看见月光    [69, 10, 87]
老祝          [51, 29]
马青梅             [99]
Name: 得分, dtype: object

当然,这步的标准写法应该是使用Series的内部方法:

df.groupby("姓名")["得分"].apply(lambda x:x.to_list())

使用Series内部方法的性能比python列表方法转换快一些。

作为一个Series就可以通过将每个列表元素转换为Series,从而最终返回一个分列的Datafream:

_.apply(pd.Series)

结果:

注意:_在ipython表示上一个输出返回的结果,jupyter还额外支持_num表示num编号单元格的输出。

_.fillna("")

结果:

fillna表示填充缺失值,传入””表示将缺失值填充为空字符串。

下面重命名一下列名:

_.rename(columns=lambda x: f"得分{x+1}")

结果:

然后还原索引:

_.reset_index()

结果:

发现结果中有一列,不是整数,所以还原成整数(总分100分,8位足够存储):

_.astype({"得分1":"int8"})

结果:

解析json字符串并字典分列

需求:

首先读取数据:

df = pd.read_excel("字典分列.xlsx")
df.head()

结果:

处理代码:

result = df.features.apply(eval).apply(pd.Series)
result["counts"] = df.counts
result

结果:

  储存条件 品牌 推荐理由 品种 食用方式 是否进口 特色服务 是否有机 counts
0 常温 NaN NaN NaN NaN NaN NaN NaN 33
1 冷藏 NaN NaN NaN NaN NaN NaN NaN 24
2 常温 禾煜 NaN NaN NaN NaN NaN NaN 22
3 常温 妙洁 NaN NaN NaN NaN NaN NaN 16
4 冷冻 NaN NaN NaN NaN NaN NaN NaN 14
2083 常温 乐事 够薄够脆 NaN NaN NaN NaN NaN 1
2084 冷藏 NaN 生态种植 黄瓜 NaN NaN NaN 有机 1
2085 冷藏 NaN 腥味较淡 鲫鱼 NaN NaN 免费宰杀 NaN 1
2086 冷藏 NaN 甜脆可口 佛手瓜 NaN NaN NaN NaN 1
2087 冷藏 叮咚日日鲜 全程可追溯 猪小排 NaN NaN NaN NaN 1

2088 rows × 9 columns

浅析:

df.features.apply(eval)用于将features列的每个json字符串解析为字典对象。

**.apply(pd.Series)则可以将每个字典对象转换成Series,则可以将该字典扩展到多列,并将原始的Series转换为Datafream。

result["counts"] = df.counts则将原始数据的counts列添加到结果列中。

本文转自快学Python,有部分增删。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Redis适合做队列吗?详解三种Redis队列的使用方式

作者:Magic Kaito

来源:水滴与银弹

我经常听到很多人讨论,关于「把 Redis 当作队列来用是否合适」的问题。

有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便。

也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。

究竟哪种方案更好呢?

这篇文章,我就和你聊一聊把 Redis 当作队列,究竟是否合适这个问题。

我会从简单到复杂,一步步带你梳理其中的细节,把这个问题真正的讲清楚。

看完这篇文章后,我希望你对这个问题你会有全新的认识。

在文章的最后,我还会告诉你关于「技术选型」的思路,文章有点长,希望你可以耐心读完。

从最简单的开始:List 队列

首先,我们先从最简单的场景开始讲起。

如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型。

因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。

如果把 List 当作队列,你可以这么来用。

生产者使用 LPUSH 发布消息:

127.0.0.1:6379> LPUSH queue msg1
(integer) 1
127.0.0.1:6379> LPUSH queue msg2
(integer) 2

消费者这一侧,使用 RPOP 拉取消息:

127.0.0.1:6379> RPOP queue
"msg1"
127.0.0.1:6379> RPOP queue
"msg2"

这个模型非常简单,也很容易理解。

但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。

127.0.0.1:6379> RPOP queue
(nil)   // 没消息了

而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:

while true:
    msg = redis.rpop("queue")
    // 没有消息,继续循环
    if msg == null:
        continue
    // 处理消息
    handle(msg)

如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。

怎么解决这个问题呢?

也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:

while true:
    msg = redis.rpop("queue")
    // 没有消息,休眠2s
    if msg == null:
        sleep(2)
        continue
    // 处理消息        
    handle(msg)

这就解决了 CPU 空转问题。

这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。

假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。

要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。

鱼和熊掌不可兼得。

那如何做,既能及时处理新消息,还能避免 CPU 空转呢?

Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?

幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。

现在,你可以这样来拉取消息了:

while true:
    // 没消息阻塞等待,0表示不设置超时时间
    msg = redis.brpop("queue"0)
    if msg == null:
        continue
    // 处理消息
    handle(msg)

使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。

这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。

注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?

我们一起来分析一下:

  1. 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
  2. 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了

第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景。

第二个问题就比较棘手了,因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。

这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。

针对这 2 个问题怎么解决呢?我们一个个来看。

发布/订阅模型:Pub/Sub

从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。

它正好可以解决前面提到的第一个问题:重复消费。

即多组生产者、消费者的场景,我们来看它是如何做的。

Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。

假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。

首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。

// 2个消费者 都订阅一个队列
127.0.0.1:6379> SUBSCRIBE queue
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "queue"
3) (integer) 1

此时,2 个消费者都会被阻塞住,等待新消息的到来。

之后,再启动一个生产者,发布一条消息。

127.0.0.1:6379> PUBLISH queue msg1
(integer) 1

这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

127.0.0.1:6379> SUBSCRIBE queue
// 收到新消息
1) "message"
2) "queue"
3) "msg1"

看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。

除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。

// 订阅符合规则的队列
127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "queue.*"
3) (integer) 1

这里的消费者,订阅了 queue.* 相关的队列消息。

之后,生产者分别向 queue.p1 和 queue.p2 发布消息。

127.0.0.1:6379> PUBLISH queue.p1 msg1
(integer) 1
127.0.0.1:6379> PUBLISH queue.p2 msg2
(integer) 1

这时再看消费者,它就可以接收到这 2 个生产者的消息了。

127.0.0.1:6379> PSUBSCRIBE queue.*
Reading messages... (press Ctrl-C to quit)
...
// 来自queue.p1的消息
1) "pmessage"
2) "queue.*"
3) "queue.p1"
4) "msg1"

// 来自queue.p2的消息
1) "pmessage"
2) "queue.*"
3) "queue.p2"
4) "msg2"

我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。

讲完了它的优点,那它有什么缺点呢?

其实,Pub/Sub 最大问题是:丢数据

如果发生以下场景,就有可能导致数据丢失:

  1. 消费者下线
  2. Redis 宕机
  3. 消息堆积

究竟是怎么回事?

这其实与 Pub/Sub 的实现方式有很大关系。

Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。

一个完整的发布、订阅消息处理流程是这样的:

  1. 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
  2. 生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它

看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。

这种设计方案,就导致了上面提到的那些问题。

例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。

如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。

所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。

另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。

也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?

当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。

如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。

但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失

这是怎么回事?

还是回到 Pub/Sub 的实现细节上来说。

每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。

当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。

之后,消费者不断地从缓冲区读取消息,处理消息。

但是,问题就出在这个缓冲区上。

因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。

如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。

这时消费者就会消费失败,也会丢失数据。

如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60。

它的参数含义如下:

  • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
  • 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线

Pub/Sub 的这一点特点,是与 List 作队列差异比较大的。

从这里你应该可以看出,List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型

List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。

但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。

当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。

好了,现在我们总结一下 Pub/Sub 的优缺点:

  1. 支持发布 / 订阅,支持多组生产者、消费者处理消息
  2. 消费者下线,数据会丢失
  3. 不支持数据持久化,Redis 宕机,数据也会丢失
  4. 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

有没有发现,除了第一个是优点之外,剩下的都是缺点。

所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很「鸡肋」。

也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。

目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。

我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?

其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。

好,现在我们重新梳理一下,我们在使用消息队列时的需求。

当我们在使用一个消息队列时,希望它的功能如下:

  • 支持阻塞等待拉取消息
  • 支持发布 / 订阅模式
  • 消费失败,可重新消费,消息不丢失
  • 实例宕机,消息不丢失,数据可持久化
  • 消息可堆积

Redis 除了 List 和 Pub/Sub 之外,还有符合这些要求的数据类型吗?

其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。

Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。

这个项目的定位,就是一个基于内存的分布式消息队列中间件。

但由于种种原因,这个项目一直不温不火。

终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream

下面我们就来看看,它能符合上面提到的这些要求吗?

趋于成熟的队列:Stream

我们来看 Stream 是如何解决上面这些问题的。

我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?

首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

  • XADD:发布消息
  • XREAD:读取消息

生产者发布 2 条消息:

// *表示让Redis自动生成消息ID
127.0.0.1:6379> XADD queue * name zhangsan
"1618469123380-0"
127.0.0.1:6379> XADD queue * name lisi
"1618469127777-0"

使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。

这个消息 ID 的格式是「时间戳-自增序号」。

消费者拉取消息:

// 从开头读取5条消息,0-0表示从开头读取
127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
1) 1) "queue"
   2) 1) 1) "1618469123380-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618469127777-0"
         2) 1) "name"
            2) "lisi"

如果想继续拉取消息,需要传入上一条消息的 ID:

127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
(nil)

没有消息,Redis 会返回 NULL。

以上就是 Stream 最简单的生产、消费。

这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。

下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

1) Stream 是否支持「阻塞式」拉取消息?

可以的,在读取消息时,只需要增加 BLOCK 参数即可。

// BLOCK 0 表示阻塞等待,不设置超时时间
127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0

这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

2) Stream 是否支持发布 / 订阅模式?

也没问题,Stream 通过以下命令完成发布订阅:

  • XGROUP:创建消费者组
  • XREADGROUP:在指定消费组下,开启消费者拉取消息

下面我们来看具体如何做?

首先,生产者依旧发布 2 条消息:

127.0.0.1:6379> XADD queue * name zhangsan
"1618470740565-0"
127.0.0.1:6379> XADD queue * name lisi
"1618470743793-0"

之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

// 创建消费者组1,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group1 0-0
OK
// 创建消费者组2,0-0表示从头拉取消息
127.0.0.1:6379> XGROUP CREATE queue group2 0-0
OK

消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

第一个消费组开始消费:

// group1的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

同样地,第二个消费组开始消费:

// group2的consumer开始消费,>表示拉取最新数据
127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
1) 1) "queue"
   2) 1) 1) "1618470740565-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618470743793-0"
         2) 1) "name"
            2) "lisi"

我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

这样一来,就达到了多组消费者「订阅」消费的目的。

3) 消息处理时异常,Stream 能否保证消息不丢失,重新消费?

除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。

当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

// group1下的 1618472043089-0 消息已处理完成
127.0.0.1:6379> XACK queue group1 1618472043089-0

如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。

待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

// 消费者重新上线,0-0表示重新拉取未ACK的消息
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
// 之前没消费成功的数据,依旧可以重新消费
1) 1) "queue"
   2) 1) 1) "1618472043089-0"
         2) 1) "name"
            2) "zhangsan"
      2) 1) "1618472045158-0"
         2) 1) "name"
            2) "lisi"

4) Stream 数据会写入到 RDB 和 AOF 做持久化吗?

Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

5) 消息堆积时,Stream 是怎么处理的?

其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

  1. 生产者限流:避免消费者处理不及时,导致持续积压
  2. 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

而 Redis 在实现 Stream 时,采用了第 2 个方案。

在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

// 队列长度最大10000
127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
"1618473015018-0"

当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。

除了以上介绍到的命令,Stream 还支持查看消息长度(XLEN)、查看消费者状态(XINFO)等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。

好了,通过以上介绍,我们可以看到,Redis 的 Stream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?

既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?

但是还「差一点」,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。

原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。

到这里,就不得不把 Redis 与专业的队列中间件做对比了。

下面我们就来看一下,Redis 在作队列时,到底还有哪些欠缺?

与专业的消息队列对比

其实,一个专业的消息队列,必须要做到两大块:

  1. 消息不丢
  2. 消息可堆积

前面我们讨论的重点,很大篇幅围绕的是第一点展开的。

这里我们换个角度,从一个消息队列的「使用模型」来分析一下,怎么做,才能保证数据不丢?

使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者

消息是否会发生丢失,其重点也就在于以下 3 个环节:

  1. 生产者会不会丢消息?
  2. 消费者会不会丢消息?
  3. 队列中间件会不会丢消息?

1) 生产者会不会丢消息?

当生产者在发布消息时,可能发生以下异常情况:

  1. 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败
  2. 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了

如果是情况 1,消息根本没发出去,那么重新发一次就好了。

如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。

生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。

也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。

但发现没有?这也意味着消息可能会重复发送。

是的,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃。

那消费者这边,就需要多做一些逻辑了。

对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。

从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。

所以,无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。

2) 消费者会不会丢消息?

这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?

要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。

这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。

无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。

所以,从这个角度来看,Redis 也是合格的。

3) 队列中间件会不会丢消息?

前面 2 个问题都比较好处理,只要客户端和服务端配合好,就能保证生产端、消费端都不丢消息。

但是,如果队列中间件本身就不可靠呢?

毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢。

在这个方面,Redis 其实没有达到要求。

Redis 在以下 2 个场景下,都会导致数据丢失。

  1. AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
  2. 主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)

基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性

所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

再来看那些专业的消息队列中间件是如何解决这个问题的?

像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。

但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。

最后,我们来看消息积压怎么办?

4) 消息积压怎么办?

因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。

综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

  1. Redis 本身可能会丢数据
  2. 面对消息积压,Redis 内存资源紧张

到这里,Redis 是否可以用作队列,我想这个答案你应该会比较清晰了。

如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。

而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。

如果你的业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么我建议你使用专业的消息队列中间件。

总结

好了,总结一下。这篇文章我们从「Redis 能否用作队列」这个角度出发,介绍了 List、Pub/Sub、Stream 在做队列的使用方式,以及它们各自的优劣。

之后又把 Redis 和专业的消息队列中间件做对比,发现 Redis 的不足之处。

最后,我们得出 Redis 做队列的合适场景。

这里我也列了一个表格,总结了它们各自的优缺点。

后记

最后,我想和你再聊一聊关于「技术方案选型」的问题。

你应该也看到了,这篇文章虽然始于 Redis,但并不止于 Redis。

我们在分析 Redis 细节时,一直在提出问题,然后寻找更好的解决方案,在文章最后,又聊到一个专业的消息队列应该怎么做。

其实,我们在讨论技术选型时,就是一个关于如何取舍的问题。

而这里我想传达给你的信息是,在面对技术选型时,不要不经过思考就觉得哪个方案好,哪个方案不好

你需要根据具体场景具体分析,这里我把这个分析过程分为 2 个层面:

  1. 业务功能角度
  2. 技术资源角度

这篇文章所讲到的内容,都是以业务功能角度出发做决策的。

但这里的第二点,从技术资源角度出发,其实也很重要。

技术资源的角度是说,你所处的公司环境、技术资源能否匹配这些技术方案

这个怎么解释呢?

简单来讲,就是你所在的公司、团队,是否有匹配的资源能 hold 住这些技术方案。

我们都知道 Kafka、RabbitMQ 是非常专业的消息中间件,但它们的部署和运维,相比于 Redis 来说,也会更复杂一些。

如果你在一个大公司,公司本身就有优秀的运维团队,那么使用这些中间件肯定没问题,因为有足够优秀的人能 hold 住这些中间件,公司也会投入人力和时间在这个方向上。

但如果你是在一个初创公司,业务正处在快速发展期,暂时没有能 hold 住这些中间件的团队和人,如果贸然使用这些组件,当发生故障时,排查问题也会变得很困难,甚至会阻碍业务的发展。

而这种情形下,如果公司的技术人员对于 Redis 都很熟,综合评估来看,Redis 也基本可以满足业务 90% 的需求,那当下选择 Redis 未必不是一个好的决策。

所以,做技术选型不只是技术问题,还与人、团队、管理、组织结构有关

也正是因为这些原因,当你在和别人讨论技术选型问题时,你会发现每个公司的做法都不相同。

毕竟每个公司所处的环境和文化不一样,做出的决策当然就会各有差异。

如果你不了解这其中的逻辑,那在做技术选型时,只会趋于表面现象,无法深入到问题根源。

而一旦你理解了这个逻辑,那么你在看待这个问题时,不仅对于技术会有更加深刻认识,对技术资源和人的把握,也会更加清晰。

希望你以后在做技术选型时,能够把这些因素也考虑在内,这对你的技术成长之路也是非常有帮助的。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

PyJNIus — 将Java的类转为Python的类

PyJNIus 是一个神奇的 Python 第三方模块。它能使用Java本地接口将Java类作为Python类访问的Python模块。

如果你需要在Python中使用Java 类,这个第三方模块是你最好的选择。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install pyjnius

2.快速开始

使用Jnius导入Java类特别简单,你只需要引入 autoclass 并引用你所需要的类即可:

>>> from jnius import autoclass
>>> autoclass('java.lang.System').out.println('Hello world')
Hello world

>>> Stack = autoclass('java.util.Stack')
>>> stack = Stack()
>>> stack.push('hello')
>>> stack.push('world')
>>> print(stack.pop())
world
>>> print(stack.pop())
hello

当你引入类后,你只需要按 Java 的函数操作即可,如上述代码中的 push 和 pop 函数。

最令人惊喜的是,你还能在安卓系统中利用这个模块使用Python调用Java类:

from time import sleep
from jnius import autoclass

Hardware = autoclass('org.renpy.android.Hardware')
print('DPI is', Hardware.getDPI())

Hardware.accelerometerEnable(True)
for x in xrange(20):
    print(Hardware.accelerometerReading())
    sleep(.1)

输出结果如下:

I/python  ( 5983): Android kivy bootstrap done. __name__ is __main__
I/python  ( 5983): Run user program, change dir and execute main.py
I/python  ( 5983): DPI is 160
I/python  ( 5983): [0.0, 0.0, 0.0]
I/python  ( 5983): [-0.0095768067985773087, 9.3852710723876953, 2.2218191623687744]
I/python  ( 5983): [-0.0095768067985773087, 9.3948478698730469, 2.2218191623687744]
I/python  ( 5983): [-0.0095768067985773087, 9.3948478698730469, 2.2026655673980713]
I/python  ( 5983): [-0.028730420395731926, 9.4044246673583984, 2.2122423648834229]
I/python  ( 5983): [-0.019153613597154617, 9.3852710723876953, 2.2026655673980713]
I/python  ( 5983): [-0.028730420395731926, 9.3852710723876953, 2.2122423648834229]
I/python  ( 5983): [-0.0095768067985773087, 9.3852710723876953, 2.1835119724273682]
I/python  ( 5983): [-0.0095768067985773087, 9.3756942749023438, 2.1835119724273682]
I/python  ( 5983): [0.019153613597154617, 9.3948478698730469, 2.2122423648834229]
I/python  ( 5983): [0.038307227194309235, 9.3852710723876953, 2.2218191623687744]
I/python  ( 5983): [-0.028730420395731926, 9.3948478698730469, 2.2026655673980713]
I/python  ( 5983): [-0.028730420395731926, 9.3852710723876953, 2.2122423648834229]
I/python  ( 5983): [-0.038307227194309235, 9.3756942749023438, 2.2026655673980713]
I/python  ( 5983): [0.3926490843296051, 9.3086557388305664, 1.3311761617660522]
I/python  ( 5983): [-0.10534487664699554, 9.4331550598144531, 2.1068975925445557]
I/python  ( 5983): [0.26815059781074524, 9.3469638824462891, 2.3463177680969238]
I/python  ( 5983): [-0.1149216815829277, 9.3852710723876953, 2.31758713722229]
I/python  ( 5983): [-0.038307227194309235, 9.41400146484375, 1.8674772977828979]
I/python  ( 5983): [0.13407529890537262, 9.4235782623291016, 2.2026655673980713]

为了能实现上述效果,你需要使用:python-for-android.

这是Android上Python应用程序的打包工具。您可以创建自己的Python发行版(包括所需的模块和依赖项),并将其与自己的代码捆绑在APK中。

详细教程可以见GitHub:
https://github.com/kivy/python-for-android

3.进阶使用

当您使用 autoclass 时,它将发现指定Java类的所有方法和字段并对其进行解析。如果你只想声明和使用所需的内容。可以这么弄:

from time import sleep
from jnius import MetaJavaClass, JavaClass, JavaMethod, JavaStaticMethod

class Hardware(JavaClass):
    __metaclass__ = MetaJavaClass
    __javaclass__ = 'org/renpy/android/Hardware'
    vibrate = JavaStaticMethod('(D)V')
    accelerometerEnable = JavaStaticMethod('(Z)V')
    accelerometerReading = JavaStaticMethod('()[F')
    getDPI = JavaStaticMethod('()I')

# 使用这个新类
print('DPI is', Hardware.getDPI())

Hardware.accelerometerEnable()
for x in xrange(20):
    print(Hardware.accelerometerReading())
    sleep(.1)

这种形式支持你只引入你想要使用的类,不会造成资源浪费,代码效率更高。

尤其是对于安卓系统有限的资源而言,推荐使用这种局部引入的方式。

当然,如果你是桌面系统(windows, macOS),资源相对充足,使用autoclass引入的方式是可以接受的。不过,在Windows上,确保 JAVA_HOME 指向你的Java安装路径,以便 PyJNIus 可以找到 jvm.dll.

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

一行代码实战,用Python玩转所有的 emoji

本文简介

还记得刚刚玩儿QQ、微信时,表情轰炸的场景吗?
  • 小时候,快乐是件很简单的事儿!
  • 长大后,简单是件很快乐的事儿!
随着时间的推移,有些表情被淘汰了,有的表情被保留了下来。慢慢地,它似乎可以代替我们说话人的表情和语气了,它好像成为了社交必备。你或许还看过,微信表情可以当作证据提交,这样的新闻。
“表情”很好玩,我们可以随时随地用它表达我们此时此刻的情感;

“表情”很烦人,有些表情确实让人挺尴尬。

就是玩儿,反正玩儿 也不犯罪,大家开心就好。今天黄同学就带大家讲述如何用Python玩转“表情”。

安装emoji库

emoji库,属于第三方库。在使用之前,我们需要提前安装和导入。

① emoji库的安装

pip install emoji -i https://pypi.tuna.tsinghua.edu.cn/simple/
当出现successfully表示安装成功。

②emoji库的导入

import emoji

玩儿起来

emoji中主要就两个函数,供大家玩耍,分别是:
  • emojize():根据 code 生成 emoji 表情;
  • demojize():将 emoji 表情解码为code;
注意哦:默认情况,你只能使用一部分表情。很多表情,需要加参数use_aliases=True后,才可以展示。
昨天没睡好,今天好困呀!
emoji.emojize(“昨天没睡好,今天:zzz:呀”)
结果如下:
笑逐颜开,好彩自然来!
emoji.emojize(“:smile:逐颜开,好彩自然来!”,use_aliases=True)
结果如下:
黄同学是白羊座!
emoji.emojize(“黄同学是:aries:”,use_aliases=True)
结果如下:
小明是AB型血型!
emoji.emojize(“小明是:ab:型血型!”,use_aliases=True)
结果如下:
晚上十一点了!
emoji.emojize(“晚上:clock11:了!”,use_aliases=True)
结果如下:
其实这个库有很多表情包了,部分截图如下:
这里直接给大家上官网吧,大家可以自行下去玩玩儿。https://pypi.org/project/emoji/
本文转自快学Python

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 优化提速的 8 个小技巧

 

   作者:张皓

   来源:https://zhuanlan.zhihu.com/p/143052860

Python 是一种脚本语言,相比 C/C++ 这样的编译语言,在效率和性能方面存在一些不足。但是,有很多时候,Python 的效率并没有想象中的那么夸张。本文对一些 Python 代码加速运行的技巧进行整理。

0. 代码优化原则

本文会介绍不少的 Python 代码加速运行的技巧。在深入代码优化细节之前,需要了解一些代码优化基本原则。

第一个基本原则是不要过早优化。很多人一开始写代码就奔着性能优化的目标,“让正确的程序更快要比让快速的程序正确容易得多”。因此,优化的前提是代码能正常工作。过早地进行优化可能会忽视对总体性能指标的把握,在得到全局结果前不要主次颠倒。

第二个基本原则是权衡优化的代价。优化是有代价的,想解决所有性能的问题是几乎不可能的。通常面临的选择是时间换空间或空间换时间。另外,开发代价也需要考虑。

第三个原则是不要优化那些无关紧要的部分。如果对代码的每一部分都去优化,这些修改会使代码难以阅读和理解。如果你的代码运行速度很慢,首先要找到代码运行慢的位置,通常是内部循环,专注于运行慢的地方进行优化。在其他地方,一点时间上的损失没有什么影响。

1. 避免全局变量

# 不推荐写法。代码耗时:26.8秒
import math

size = 10000
for x in range(size):
    for y in range(size):
        z = math.sqrt(x) + math.sqrt(y)

许多程序员刚开始会用 Python 语言写一些简单的脚本,当编写脚本时,通常习惯了直接将其写为全局变量,例如上面的代码。但是,由于全局变量和局部变量实现方式不同,定义在全局范围内的代码运行速度会比定义在函数中的慢不少。通过将脚本语句放入到函数中,通常可带来 15% – 30% 的速度提升。

# 推荐写法。代码耗时:20.6秒
import math

def main():  # 定义到函数中,以减少全部变量使用
    size = 10000
    for x in range(size):
        for y in range(size):
            z = math.sqrt(x) + math.sqrt(y)

main()

2. 避免.

2.1 避免模块和函数属性访问

# 不推荐写法。代码耗时:14.5秒
import math

def computeSqrt(size: int):
    result = []
    for i in range(size):
        result.append(math.sqrt(i))
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

每次使用.(属性访问操作符时)会触发特定的方法,如__getattribute__()__getattr__(),这些方法会进行字典操作,因此会带来额外的时间开销。通过from import语句,可以消除属性访问。

# 第一次优化写法。代码耗时:10.9秒
from math import sqrt

def computeSqrt(size: int):
    result = []
    for i in range(size):
        result.append(sqrt(i))  # 避免math.sqrt的使用
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

在第 1 节中我们讲到,局部变量的查找会比全局变量更快,因此对于频繁访问的变量sqrt,通过将其改为局部变量可以加速运行。

# 第二次优化写法。代码耗时:9.9秒
import math

def computeSqrt(size: int):
    result = []
    sqrt = math.sqrt  # 赋值给局部变量
    for i in range(size):
        result.append(sqrt(i))  # 避免math.sqrt的使用
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

除了math.sqrt外,computeSqrt函数中还有.的存在,那就是调用listappend方法。通过将该方法赋值给一个局部变量,可以彻底消除computeSqrt函数中for循环内部的.使用。

# 推荐写法。代码耗时:7.9秒
import math

def computeSqrt(size: int):
    result = []
    append = result.append
    sqrt = math.sqrt    # 赋值给局部变量
    for i in range(size):
        append(sqrt(i))  # 避免 result.append 和 math.sqrt 的使用
    return result

def main():
    size = 10000
    for _ in range(size):
        result = computeSqrt(size)

main()

2.2 避免类内属性访问

# 不推荐写法。代码耗时:10.4秒
import math
from typing import List

class DemoClass:
    def __init__(self, value: int):
        self._value = value
    
    def computeSqrt(self, size: int) -> List[float]:
        result = []
        append = result.append
        sqrt = math.sqrt
        for _ in range(size):
            append(sqrt(self._value))
        return result

def main():
    size = 10000
    for _ in range(size):
        demo_instance = DemoClass(size)
        result = demo_instance.computeSqrt(size)

main()

避免.的原则也适用于类内属性,访问self._value的速度会比访问一个局部变量更慢一些。通过将需要频繁访问的类内属性赋值给一个局部变量,可以提升代码运行速度。

# 推荐写法。代码耗时:8.0秒
import math
from typing import List

class DemoClass:
    def __init__(self, value: int):
        self._value = value
    
    def computeSqrt(self, size: int) -> List[float]:
        result = []
        append = result.append
        sqrt = math.sqrt
        value = self._value
        for _ in range(size):
            append(sqrt(value))  # 避免 self._value 的使用
        return result

def main():
    size = 10000
    for _ in range(size):
        demo_instance = DemoClass(size)
        demo_instance.computeSqrt(size)

main()

3. 避免不必要的抽象

# 不推荐写法,代码耗时:0.55秒
class DemoClass:
    def __init__(self, value: int):
        self.value = value

    @property
    def value(self) -> int:
        return self._value

    @value.setter
    def value(self, x: int):
        self._value = x

def main():
    size = 1000000
    for i in range(size):
        demo_instance = DemoClass(size)
        value = demo_instance.value
        demo_instance.value = i

main()

任何时候当你使用额外的处理层(比如装饰器、属性访问、描述器)去包装代码时,都会让代码变慢。大部分情况下,需要重新进行审视使用属性访问器的定义是否有必要,使用getter/setter函数对属性进行访问通常是 C/C++ 程序员遗留下来的代码风格。如果真的没有必要,就使用简单属性。

# 推荐写法,代码耗时:0.33秒
class DemoClass:
    def __init__(self, value: int):
        self.value = value  # 避免不必要的属性访问器

def main():
    size = 1000000
    for i in range(size):
        demo_instance = DemoClass(size)
        value = demo_instance.value
        demo_instance.value = i

main()

4. 避免数据复制

4.1 避免无意义的数据复制

# 不推荐写法,代码耗时:6.5秒
def main():
    size = 10000
    for _ in range(size):
        value = range(size)
        value_list = [x for x in value]
        square_list = [x * x for x in value_list]

main()

上面的代码中value_list完全没有必要,这会创建不必要的数据结构或复制。

# 推荐写法,代码耗时:4.8秒
def main():
    size = 10000
    for _ in range(size):
        value = range(size)
        square_list = [x * x for x in value]  # 避免无意义的复制

main()

另外一种情况是对 Python 的数据共享机制过于偏执,并没有很好地理解或信任 Python 的内存模型,滥用 copy.deepcopy()之类的函数。通常在这些代码中是可以去掉复制操作的。

4.2 交换值时不使用中间变量

# 不推荐写法,代码耗时:0.07秒
def main():
    size = 1000000
    for _ in range(size):
        a = 3
        b = 5
        temp = a
        a = b
        b = temp

main()

上面的代码在交换值时创建了一个临时变量temp,如果不借助中间变量,代码更为简洁、且运行速度更快。

# 推荐写法,代码耗时:0.06秒
def main():
    size = 1000000
    for _ in range(size):
        a = 3
        b = 5
        a, b = b, a  # 不借助中间变量

main()

4.3 字符串拼接用join而不是+

# 不推荐写法,代码耗时:2.6秒
import string
from typing import List

def concatString(string_list: List[str]) -> str:
    result = ''
    for str_i in string_list:
        result += str_i
    return result

def main():
    string_list = list(string.ascii_letters * 100)
    for _ in range(10000):
        result = concatString(string_list)

main()

当使用a + b拼接字符串时,由于 Python 中字符串是不可变对象,其会申请一块内存空间,将ab分别复制到该新申请的内存空间中。因此,如果要拼接 n 个字符串,会产生 n-1 个中间结果,每产生一个中间结果都需要申请和复制一次内存,严重影响运行效率。而使用join()拼接字符串时,会首先计算出需要申请的总的内存空间,然后一次性地申请所需内存,并将每个字符串元素复制到该内存中去。

# 推荐写法,代码耗时:0.3秒
import string
from typing import List

def concatString(string_list: List[str]) -> str:
    return ''.join(string_list)  # 使用 join 而不是 +

def main():
    string_list = list(string.ascii_letters * 100)
    for _ in range(10000):
        result = concatString(string_list)

main()

5. 利用if条件的短路特性

# 不推荐写法,代码耗时:0.05秒
from typing import List

def concatString(string_list: List[str]) -> str:
    abbreviations = {'cf.''e.g.''ex.''etc.''flg.''i.e.''Mr.''vs.'}
    abbr_count = 0
    result = ''
    for str_i in string_list:
        if str_i in abbreviations:
            result += str_i
    return result

def main():
    for _ in range(10000):
        string_list = ['Mr.''Hat''is''Chasing''the''black''cat''.']
        result = concatString(string_list)

main()

if 条件的短路特性是指对if a and b这样的语句, 当aFalse时将直接返回,不再计算b;对于if a or b这样的语句,当aTrue时将直接返回,不再计算b。因此, 为了节约运行时间,对于or语句,应该将值为True可能性比较高的变量写在or前,而and应该推后。

# 推荐写法,代码耗时:0.03秒
from typing import List

def concatString(string_list: List[str]) -> str:
    abbreviations = {'cf.''e.g.''ex.''etc.''flg.''i.e.''Mr.''vs.'}
    abbr_count = 0
    result = ''
    for str_i in string_list:
        if str_i[-1] == '.' and str_i in abbreviations:  # 利用 if 条件的短路特性
            result += str_i
    return result

def main():
    for _ in range(10000):
        string_list = ['Mr.''Hat''is''Chasing''the''black''cat''.']
        result = concatString(string_list)

main()

6. 循环优化

6.1 用for循环代替while循环

# 不推荐写法。代码耗时:6.7秒
def computeSum(size: int) -> int:
    sum_ = 0
    i = 0
    while i < size:
        sum_ += i
        i += 1
    return sum_

def main():
    size = 10000
    for _ in range(size):
        sum_ = computeSum(size)

main()

Python 的for循环比while循环快不少。

# 推荐写法。代码耗时:4.3秒
def computeSum(size: int) -> int:
    sum_ = 0
    for i in range(size):  # for 循环代替 while 循环
        sum_ += i
    return sum_

def main():
    size = 10000
    for _ in range(size):
        sum_ = computeSum(size)

main()

6.2 使用隐式for循环代替显式for循环

针对上面的例子,更进一步可以用隐式for循环来替代显式for循环

# 推荐写法。代码耗时:1.7秒
def computeSum(size: int) -> int:
    return sum(range(size))  # 隐式 for 循环代替显式 for 循环

def main():
    size = 10000
    for _ in range(size):
        sum = computeSum(size)

main()

6.3 减少内层for循环的计算

# 不推荐写法。代码耗时:12.8秒
import math

def main():
    size = 10000
    sqrt = math.sqrt
    for x in range(size):
        for y in range(size):
            z = sqrt(x) + sqrt(y)

main() 

上面的代码中sqrt(x)位于内侧for循环, 每次训练过程中都会重新计算一次,增加了时间开销。

# 推荐写法。代码耗时:7.0秒
import math

def main():
    size = 10000
    sqrt = math.sqrt
    for x in range(size):
        sqrt_x = sqrt(x)  # 减少内层 for 循环的计算
        for y in range(size):
            z = sqrt_x + sqrt(y)

main() 

7. 使用numba.jit

我们沿用上面介绍过的例子,在此基础上使用numba.jitnumba可以将 Python 函数 JIT 编译为机器码执行,大大提高代码运行速度。关于numba的更多信息见下面的主页:http://numba.pydata.org/numba.pydata.org

# 推荐写法。代码耗时:0.62秒
import numba

@numba.jit
def computeSum(size: float) -> int:
    sum = 0
    for i in range(size):
        sum += i
    return sum

def main():
    size = 10000
    for _ in range(size):
        sum = computeSum(size)

main()

8. 选择合适的数据结构

Python 内置的数据结构如str, tuple, list, set, dict底层都是 C 实现的,速度非常快,自己实现新的数据结构想在性能上达到内置的速度几乎是不可能的。

list类似于 C++ 中的std::vector,是一种动态数组。其会预分配一定内存空间,当预分配的内存空间用完,又继续向其中添加元素时,会申请一块更大的内存空间,然后将原有的所有元素都复制过去,之后销毁之前的内存空间,再插入新元素。

删除元素时操作类似,当已使用内存空间比预分配内存空间的一半还少时,会另外申请一块小内存,做一次元素复制,之后销毁原有大内存空间。

因此,如果有频繁的新增、删除操作,新增、删除的元素数量又很多时,list的效率不高。此时,应该考虑使用collections.dequecollections.deque是双端队列,同时具备栈和队列的特性,能够在两端进行 O(1) 复杂度的插入和删除操作。

list的查找操作也非常耗时。当需要在list频繁查找某些元素,或频繁有序访问这些元素时,可以使用bisect维护list对象有序并在其中进行二分查找,提升查找的效率。

另外一个常见需求是查找极小值或极大值,此时可以使用heapq模块将list转化为一个堆,使得获取最小值的时间复杂度是 O(1)

下面的网页给出了常用的 Python 数据结构的各项操作的时间复杂度:https://wiki.python.org/moin/TimeComplexity

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

PySnooper – 永远不要使用print进行调试

PySnooper 是一个非常方便的调试器。如果您正在试图弄清楚为什么您的Python代码没有按照您的预期去做,您会希望使用具有断点和监视功能的成熟Debug工具,但是许多Debug工具配置起来非常麻烦。

现在,有了PySnooper,您并不需要配置那么复杂的Debug工具,就能够完成对整个代码的分析。它能告诉您哪些代码正在运行,以及局部变量的值是什么。

其实,PySnooper就是替代了一行一行print的重复性工作,给你的代码一个pysnooper装饰器,它能自动识别到语句和变量并将其值print出来:

import pysnooper

@pysnooper.snoop()
def number_to_bits(number):
    if number:
        bits = []
        while number:
            number, remainder = divmod(number, 2)
            bits.insert(0, remainder)
        return bits
    else:
        return [0]

number_to_bits(6)

效果如下:

Source path:... 1.py
Starting var:.. number = 6
23:03:35.990701 call         4 def number_to_bits(number):
23:03:35.991699 line         5     if number:
23:03:35.991699 line         6         bits = []
New var:....... bits = []
23:03:35.991699 line         7         while number:
23:03:35.991699 line         8             number, remainder = divmod(number, 2)
Modified var:.. number = 3
New var:....... remainder = 0
23:03:35.991699 line         9             bits.insert(0, remainder)
Modified var:.. bits = [0]
23:03:36.004664 line         7         while number:
23:03:36.005661 line         8             number, remainder = divmod(number, 2)
Modified var:.. number = 1
Modified var:.. remainder = 1
23:03:36.005661 line         9             bits.insert(0, remainder)
Modified var:.. bits = [1, 0]
23:03:36.007657 line         7         while number:
23:03:36.007657 line         8             number, remainder = divmod(number, 2)
Modified var:.. number = 0
23:03:36.008655 line         9             bits.insert(0, remainder)
Modified var:.. bits = [1, 1, 0]
23:03:36.008655 line         7         while number:
23:03:36.009651 line        10         return bits
23:03:36.009651 return      10         return bits
Return value:.. [1, 1, 0]
Elapsed time: 00:00:00.020945

可以看到,它将每一行变量的值都输出到屏幕上,方便你调试代码。

仅仅需要写一行代码—使用装饰器就可以实现这个方便的调试功能,比起一行行写print,这可方便多了。

0.安装

使用这个模块,你只需要使用Pip安装PySnooper:

pip install pysnooper

接下来讲讲这个模块其他好用的功能:

1.支持日志文件

如果你觉得print到屏幕上不方便,还可以将其输出到log文件中,你只需要将装饰器那一行改为:

@pysnooper.snoop('/my/log/file.log')

2.读取局外变量或其他表达式

如果你想读取在装饰器作用范围以外的变量或者表达式的值,还可以使用watch参数:

@pysnooper.snoop(watch=('foo.bar', 'self.x["whatever"]'))

3.如果你不想用装饰器,也可以用上下文的形式调试

没错,装饰器有限定的使用条件,使用起来比较局限,因此pysnooper还支持使用 with 的上下文形式:

import pysnooper
import random

def foo():
    lst = []
    for i in range(10):
        lst.append(random.randrange(1, 1000))

    with pysnooper.snoop():
        lower = min(lst)
        upper = max(lst)
        mid = (lower + upper) / 2
        print(lower, mid, upper)

foo()

效果如下,只有上下文里的代码才会被调试出来:

New var:....... i = 9
New var:....... lst = [681, 267, 74, 832, 284, 678, ...]
09:37:35.881721 line        10         lower = min(lst)
New var:....... lower = 74
09:37:35.882137 line        11         upper = max(lst)
New var:....... upper = 832
09:37:35.882304 line        12         mid = (lower + upper) / 2
74 453.0 832
New var:....... mid = 453.0
09:37:35.882486 line        13         print(lower, mid, upper)
Elapsed time: 00:00:00.000344

当我们只需要调试部分代码的时候,这个上下文形式的调试方法非常方便。

此外,PySnooper还有许多更强大的用法,大家可以看他们的高级使用文档:

https://github.com/cool-RR/PySnooper/blob/master/ADVANCED_USAGE.md

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典