分类目录归档:设计模式

Box 为你的字典添加点符号访问特性

正常情况下,我们想访问字典中的某个值,都是通过中括号访问,比如:

test_dict = {"test": {"imdb stars": 6.7, "length": 104}}

print(test_dict["test"]["imdb stars"])
# 104

而通过Box模块,我们可以扩展字典功能,使用点符号访问元素:

from box import Box

movie_box = Box({ "Robin Hood: Men in Tights": { "imdb stars": 6.7, "length": 104 } })

movie_box.Robin_Hood_Men_in_Tights.imdb_stars

# 6.7

另外,可以看到默认情况下转换后,字典键值中的空格被转化为了下划线。

下面具体介绍 Box 模块的使用方法。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install --upgrade python-box[all]

2.基本使用

我们可以像文章开头那样传入一个字典给 Box,生成一个Box对象;也可以直接使用参数赋值的方式生成一个Box对象:

from box import Box

my_box = Box(funny_movie='Hudson Hawk', best_movie='Kung Fu Panda')
my_box.funny_movie
# 'Hudson Hawk'

请记住,任何情况下,你往Box对象里添加字典或是数组,这些字典或数组都会被转变为Box对象:

my_box = Box({"team": {"red": {"leader": "Sarge", "members": []}}})
print(my_box.team.red.leader)
# Sarge

my_box.team.blue = {"leader": "Church", "members": []} 
print(repr(my_box.team.blue))
# <Box: {'leader': 'Church', 'members': []}>

访问列表中的 Box 对象也非常轻松:

my_box.team.red.members = [
    {"name": "Grif", "rank": "Minor Junior Private Negative First Class"},
    {"name": "Dick Simmons", "rank": "Captain"}
]

print(my_box.team.red.members[0].name)
# Grif

局限性

请注意,字典中有些默认方法,如:clear, copy, fromkeys, get, items, keys, pop, popitem, setdefault, to_dict, update, merge_update, values,当你的键值和这些方法名称冲突时,你无法使用点符号访问它们。

不过冲突时,你依然可以使用传统的字典取值访问它们,例如:

my_box['keys']

合并

要合并两个Box对象,你只需要通过 merge_update 方法:

from box import Box

box_1 = Box(val={'important_key': 1}) 
box_2 = Box(val={'less_important_key': 2})

box_1.merge_update(box_2)

print(box_1)
# {'val': {'important_key': 1, 'less_important_key': 2}}

当然,你也可以用传统的 update 方法:

from box import Box

box_1 = Box(val={'important_key': 1}) 
box_2 = Box(val={'less_important_key': 2})

box_1.update(box_2)

print(box_1)
# {'val': {'less_important_key': 2}}

转换为原始列表/字典

如果你需要把一个 Box 对象的字典转化为原始字典,.to_dict() 方法就可以帮你实现:

from box import Box

box_1 = Box(val={'important_key': 1}) 

print(box_1)
# {'val': {'less_important_key': 2}}
print(type(box_1))
# <class 'box.box.Box'>
print(type(box_1.to_dict()))
# <class 'dict'>

如果你需要把一个 Box 对象的列表转化为原始列表,你可以使用 .to_list() 方法:

from box import BoxList

my_boxlist = BoxList({'item': x} for x in range(10))
#  <BoxList: [<Box: {'item': 0}>, <Box: {'item': 1}>, ...

my_boxlist[5].item
# 5

print(type(my_boxlist.to_list()))
# <class 'list'>

3.导入导出功能

Box对象有一个很方便的功能,就是能够轻松地将Box对象导出为Json/yaml/csv/msgpack文件:

from box import BoxList

my_boxlist = BoxList({'item': x} for x in range(10))
#  <BoxList: [<Box: {'item': 0}>, <Box: {'item': 1}>, ...

my_boxlist.to_json(filename="test.json")
# 在当前文件夹下生成一个 test.json 文件

此外,还能接受 Json/yaml/csv/msgpack 文件导入:

new_box = Box.from_json(filename="films.json")

各种类型的文件对应的方法如下:

转换器方法描述
to_dict递归地将所有 Box(和 BoxList)对象转换回字典(和列表)
to_json将 Box 对象另存为 JSON 字符串或使用filename参数写入文件
to_yaml将 Box 对象另存为 YAML 字符串或使用filename参数写入文件
to_msgpack将 Box 对象另存为 msgpack 字节或使用filename参数写入文件
to_toml*将 Box 对象另存为 TOML 字符串或使用filename参数写入文件
to_csv**将 BoxList 对象另存为 CSV 字符串或使用filename参数写入文件
from_jsonClassmethod,从一个 JSON 文件或字符串创建一个 Box 对象(所有 Box 参数都可以传递)
from_yaml类方法,从 YAML 文件或字符串创建一个 Box 对象(所有 Box 参数都可以传递)
from_msgpackClassmethod,从msgpack文件或字节创建一个Box对象(所有Box参数都可以传递)
from_toml*Classmethod,从TOML文件或字符串创建一个Box对象(所有Box参数都可以传递)
from_csv**Classmethod,从一个CSV文件或字符串创建一个BoxList对象(可以传递所有BoxList参数)

* 不适用于 BoxList,仅适用于 Box ** 不适用于 Box,仅适用于 BoxList。

还有更多的特性,大家可以参考 Box 模块官方WIki:

https://github.com/cdgriffith/Box/wiki

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 函数耗时异常自动化监控实战教程

来源:文呓

本文内容包括Python性能可视化分析,逻辑优化,及根据不同的模型动态计算安全阈值,实现各个函数耗时及程序总耗时的自动化监控预警

在做Python性能分析优化的时候,可以借助cProfile生成性能数据文件,通过pstats获取详细耗时分布数据,结合gprof2dot脚本生成函数调用栈结构图做可视化分析,提高性能分析的效率。

接着从具体的耗时分布,先从占用大头的函数分析具体逻辑实现,逐步优化,同时保存pstats函数耗时平均值数据作为后续异常自动化监控的样本数据。

实现耗时自动化监控必须是可以根据算法动态调整安全阈值,而不是人工定死安全阈值范围,这样才可以实现异常监控的自循环和迭代校准。

一、性能数据函数耗时采集及可视化报表生成

1. 性能数据文件保存(cProfile)

首先是性能数据文件的保存,cProfile和profile提供了Python程序的确定性性能分析。profile是一组统计数据,用来描述程序的各个部分执行的频率和时间。在程序开始的时候调用enable开始性能数据采集,结束的时候调用dump_stats停止性能数据采集并保存性能数据到指定路径的文件。

import cProfile
# 程序开始的时候打开数据采集开关
pr = cProfile.Profile()
pr.enable()

# 在程序运行结束的时候dump性能数据到指定路径文件中,profliePath为保存文件的绝对路径参数
pr.dump_stats(profliePath)

2. 详细性能数据读取查看

保存性能数据到文件之后,可以用pstats读取文件中的数据,profile统计数据可以通过pstats模块格式化为报表。

import pstats 
# 读取性能数据 
pS = pstats.Stats(profliePath) 
# 根据函数自身累计耗时做排序 
pS.sort_stats('tottime') 
# 打印所有耗时函数信息 
pS.print_stats()
print_stats()输出示例:
79837 function calls (75565 primitive calls) in 37.311 seconds
Ordered by: internal time
ncalls  tottime  percall  cumtime  percall  filename:lineno(function)
 2050    30.167    0.015   30.167    0.015  {time.sleep}
   16     6.579    0.411    6.579    0.411  {select.select}
    1     0.142    0.142    0.142    0.142  {method 'do_handshake' of '_ssl._SSLSocket' objects}
  434     0.074    0.000    0.074    0.000  {method 'read' of '_ssl._SSLSocket' objects}
    1     0.062    0.062    0.062    0.062  {method 'connect' of '_socket.socket' objects}
   37     0.046    0.001    0.046    0.001  {posix.read}
   14     0.024    0.002    0.024    0.002  {posix.fork}

输出字段说明:

  • ncalls  函数被调用次数(只有一个数字时表示不存在递归,有斜杠分割数字时,后面的数字表示非递归调用的次数)
  • tottime  函数总计运行时间,不包括子函数调用时间
  • percall  函数运行一次的平均时间,等于tottime/ncalls
  • cumtime 函数总计运行时间,包括子函数调用时间
  • percall  函数运行一次的平均时间,等于cumtime/ncalls
  • filename:lineno(function) 函数所在的文件名,函数的行号,函数名或基础框架函数类

如果要获取print_stats()里面各个字段信息可以通过如下方式:

# func————filename:lineno(function)
# cc ———— call count,调用次数 
# nc ———— ncalls
# tt ———— tottime
# ct ———— cumtime
# callers ———— 调用堆栈数组,每项数据包括了func, (cc, nc, tt, ct) 字段
for index in range(len(pS.fcn_list)): 
    func = pS.fcn_list[index] 
    cc, nc, tt, ct, callers = pS.stats[func]  
    print cc, nc, tt, ct, func, callers
    for func, (cc, nc, tt, ct) in callers.iteritems():
        print func,cc, nc, tt, ct

二、生成函数调用栈结构图(gprof2dot)教程

gprof2dot脚本把gprof或callgrind分析获得的信息,转化成一个以DOT语言描述的程序调用有向图对象,再通过Graphviz将DOT有向图对象渲染成图片,这样就可以很直观地看出整个程序的调用栈,包括函数所在的类和行数、耗时占比、函数递归次数、以及被调用的次数。

先从GitHub上下载gprof2dot.py脚本到本地,和执行的程序的脚本文件放在同一目录下,当然要使用这个脚本还需要安装graphviz,使用brew命令安装,若安装过程中遇到异常,根据异常提示执行命令安装需要的工具

brew install graphviz

生成程序函数调用栈结构图的逻辑可以参考如下逻辑实现,具体根据自身需求做下修改。

import os
# 获取当前gprof2dot.py脚本路径
gprof2dotPath = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'gprof2dot.py')
# 函数调用栈结构图保存文件名路径,这边使用生成PNG图片结果
dumpProfPath = profliePath.replace("stats", "png")
dumpCmd = "python %s -f pstats %s | dot -Tpng -o %s" % (gprof2dotPath, profliePath, dumpProfPath)
os.popen(dumpCmd)

三、性能分析及优化实战

在生成函数调用栈结构图之后,就可以很容易的看出各个函数之间的调用关系,每个方块内包括的信息包括函数所在的类和行数耗时百分比被调用次数,如果这个函数存在递归的情况,方块边缘会有一个回旋的箭头标明递归的次数

从结构图里面找到耗时占用较多的部分,分析具体函数的实现逻辑,定位具体耗时的原因,优化的策略如下:

  • 去除多余的逻辑:去除冗余代码
  • 优化递归函数:加日志打印递归时候的各个参数,如果发现很多参数都是重复的,可以加缓存,避免多余的递归消耗。
  • 归并通用逻辑调用:一个函数多次调用同一个子函数获取参数,查看这个子函数的调用是否可以进行整合归并,避免多余的函数调用。
  • 通过上下文环境判断测试程序的初始化是否必要,非必要情况下不进行测试环境的重置操作。

四、耗时异常自动化监控

如果是通过历史的耗时数据计算得到平均值+固定浮动百分比的方式,来配置耗时安全阈值参数实施异常监控存在很大的问题,因为函数执行的耗时容易受设备和运行环境的影响,人工固定浮动百分比的方式维护性差,数据本身不可迭代自循环,总不能每次出现误报问题之后都去手动调整参数。

这边监控的维度包括两方面,一方面是程序各个函数执行耗时的平均值,另一方面是完整程序执行的总耗时,在前期先把这些历史耗时数据保存在数据库中,供后续自动化异常监控的实现提供样本数据。

要实现自动化阈值调整,需要借助常规的模型算法实现,这边只对耗时单个维度的异常做自动化监控实现。

根据原理,无监督异常检测模型一般可分为以下几类:

  • 基于统计和概率模型:主要是对数据的分布做出假设,并找出假设下所定义的“异常”;
  • 线性模型:主要思想是通过线性方法找到合适的低维子空间使得异常点在其中区别于正常点;
  • 基于距离:这种方法认为异常点距离正常点比较远,通过比较数据点之间的距离区分异常点;
  • 基于密度:由于数据分布不均匀,绝对距离无法衡量数据点之间相对远近时,用局部密度表示数据点的异常情况;
  • 基于聚类:将数据点聚类,不属于任何簇、距离最近的簇较远、稀疏聚类里的点认为是异常点;
  • 基于树:通过划分子空间构建树模型寻找异常点。

异常耗时数据是波动的一维数据,这边就直接采用基于统计和概率模型的方式,根据保存的历史数据判断数据是否符合正态分布

若符合正态分布则用 μ+3δ(平均值+3倍标准差)的方式计算得到安全阈值

若不符合正态分布,则用Turkey 箱型图方案 Q+1.5IQR 计算安全阈值。

根据实际测试来看,随着样本数据的增加,会出现前期符合正态分布的函数耗时曲线,随着样本数据的增加会变成不符合正态分布。

Python中用于判断数据是否符合正态分布的代码如下,当pvalue值大于0.05时为正态分布,dataList是耗时数组数据:

from scipy import stats
import numpy
percallMean = numpy.mean(dataList) # 计算均值
# percallVar = numpy.var(dataList) # 求方差
percallStd = numpy.std(dataList) # 计算标准差
kstestResult = stats.kstest(dataList, 'norm', (percallMean, percallStd))
# 当pvalue值大于0.05为正态分布
if kstestResult[1] > 0.05:
    pass

1. 正态分布数据方案

在统计学中,如果一个数据分布近似正态,那么大约 68% 的数据值会在均值的一个标准差范围内,大约 95% 会在两个标准差范围内,大约 99.7% 会在三个标准差范围内。因此,如果任何数据点超过标准差的 3 倍,那么这些点很有可能是异常值或离群点。即正态分布的安全阈值上限为:percallMean + 3 * percallStd

 

2. Turkey 箱型图方案

基于正态分布的 3σ 法则或 Z 分数方法的异常检测是以假定数据服从正态分布为前提的,但实际数据往往并不严格服从正态分布。应用这种方法于非正态分布数据中判断异常值,其有效性是有限的。Tukey 箱型图是一种用于反映原始数据分布的特征常用方法,也可用于异常点识别。在识别异常点时其主要依靠实际数据,因此有其自身的优越性。

箱型图为我们提供了识别异常值的一个标准:异常值被定义为小于 Q1-1.5IQR 或大于 Q+1.5IQR 的值。虽然这种标准有点任意性,但它来源于经验判断,经验表明它在处理需要特别注意的数据方面表现不错。

计算箱型图安全阈值Python实现逻辑如下:

import numpy
percallMean = numpy.mean(dataList)  # 计算均值
boxplotQ1 = numpy.percentile(dataList, 25)
boxplotQ2 = numpy.percentile(dataList, 75)
boxplotIQR = boxplotQ2 - boxplotQ1
upperLimit =  boxplotQ2 + 1.5 * boxplotIQR

在程序实现中就是,在一个程序或用例执行完毕之后,先拿历史数据判断是否符合正态分布,当然历史样本数据至少要达到20个才比较准确,小于20个的时候就继续收集数据,不做异常判断。根据正态分布模型或箱型图模型计算安全阈值参数,判断当前各个函数耗时平均值或用例总耗时是否超过阈值,超过则预警。

高斯模型和箱型图两种方式阈值范围对比

这边给出stats文件数据汇总解析之后,根据相应的模型绘制耗时曲线及阈值或正态曲线及阈值的代码实现,statFolder参数替换成自己stats文件所在文件夹即可。

# coding=utf-8
import os
import pstats
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
import traceback
from scipy import stats
import numpy

"""
汇总函数耗时平均值数据
"""
def dataSummary(array, fileName, fcn, percall):
    (funcPath, line, func) = fcn
    exists = False
    for item in array:
        if item["func"] == func and item["funcPath"] == funcPath and item["line"] == line:
            exists = True
            item["cost"].append({
                "percall": percall,
                "fileName": fileName
            })
    if not exists:
        array.append({
            "func": func,
            "funcPath": funcPath,
            "line": line,
            "cost": [{
                "percall": percall,
                "fileName": fileName
            }]
        })

"""
高斯函数计算Y值
"""
def gaussian(x, mu, delta):
    exp = numpy.exp(- numpy.power(x - mu, 2) / (2 * numpy.power(delta, 2)))
    c = 1 / (delta * numpy.sqrt(2 * numpy.pi))
    return c * exp

"""
读取汇总所有stats文件数据
"""
def readStatsFile(statFolder, filterData):
    for path, dir_list, file_list in os.walk(statFolder, "r"):
        for fileName in file_list:
            if fileName.find(".stats") > 0:
                fileAbsolutePath = os.path.join(path, fileName)
                pS = pstats.Stats(fileAbsolutePath)
             # 先对耗时数据从大到小进行排序
                pS.sort_stats('cumtime')
                # pS.print_stats()
                # 统计前100条耗时数据
                for index in range(100):
                    fcn = pS.fcn_list[index]
                    (funcPath, line, func) = fcn
                    # cc ———— call count,调用次数
                    # nc ———— ncalls,调用次数(只有一个数字时表示不存在递归;有斜杠分割数字时,后面的数字表示非递归调用的次数)
                    # tt ———— tottime,函数总计运行时间,除去函数中调用的子函数运行时间
                    # ct ———— cumtime,函数总计运行时间,含调用的子函数运行时间
                    cc, nc, tt, ct, callers = pS.stats[fcn]
                    # print fileName, func, cc, nc, tt, ct, callers
                    percall = ct / nc
                    # 只统计单次函数调用大于1毫秒的数据
                    if percall >= 0.001:
                        dataSummary(filterData, fileName, fcn, percall)

"""
绘制高斯函数曲线和安全阈值
"""
def drawGaussian(func, line, percallMean, threshold, percallList, dumpFolder):
    plt.title(func)
    plt.figure(figsize=(10, 8))
    for delta in [0.2, 0.5, 1]:
        gaussY = []
        gaussX = []
        for item in percallList:
            # 这边为了呈现正态曲线效果,减去平均值
            gaussX.append(item - percallMean)
            y = gaussian(item - percallMean, 0, delta)
            gaussY.append(y)
        plt.plot(gaussX, gaussY, label='sigma={}'.format(delta))
    # 绘制水位线
    plt.plot([threshold - percallMean, threshold - percallMean], [0, 5 * gaussian(percallMean, 0, 1)], color='red',
             linestyle="-", label="Threshold:" + str("%.5f" % threshold))
    plt.xlabel("Time(s)", fontsize=12)
    plt.legend()
    plt.tight_layout()
    # 可能不同类中包含相同的函数名,加上行数参数避免覆盖
    imagePath = dumpFolder + "cost_%s_%s.png" % (func, str(line))
    plt.savefig(imagePath)

"""
绘制耗时曲线和安全阈值
"""
def drawCurve(func, line, percallList, dumpFolder):
    boxplotQ1 = numpy.percentile(percallList, 25)
    boxplotQ2 = numpy.percentile(percallList, 75)
    boxplotIQR = boxplotQ2 - boxplotQ1
    upperLimit = boxplotQ2 + 1.5 * boxplotIQR
    # 不符合正态分布,绘制波动曲线
    timeArray = [i for i in range(len(percallList))]
    plt.title(dataItem["func"])
    plt.figure(figsize=(10, 8))
    # 绘制水位线
    plt.plot([0, len(percallList)], [upperLimit, upperLimit], color='red', linestyle="-",
             label="Threshold:" + str("%.5f" % upperLimit))
    plt.plot(timeArray, percallList, label=dataItem["func"] + "_" + str(dataItem["line"]))
    plt.ylabel("Time(s)", fontsize=12)
    plt.legend()
    plt.tight_layout()
    imagePath = dumpFolder + "cost_%s_%s.png" % (func, str(line))
    plt.savefig(imagePath)

if __name__ == "__main__":
    try:
        statFolder = "/Users/chenwenguan/Downloads/2aab7e17-a1b6-1253/"
        chartFolder = statFolder + "chart/"
        if not os.path.exists(chartFolder):
            os.mkdir(chartFolder)
        filterData = []
        readStatsFile(statFolder, filterData);
        for dataItem in filterData:
            percallList = map(lambda x: x["percall"], dataItem["cost"])
            func = dataItem["func"]
            line = dataItem["line"]
            # 样本个数大于20才进行绘制
            if len(percallList) > 20:
                percallMean = numpy.mean(percallList) # 计算均值
                # percallVar = numpy.var(percallMap) # 求方差
                percallStd = numpy.std(percallList)  # 计算标准差
                # pvalue值大于0.05为正太分布
                kstestResult = stats.kstest(percallList, 'norm', (percallMean, percallStd))
                print "percallStd:%s, pvalue:%s" % (percallStd, kstestResult[1])
                # 符合正态分布绘制分布曲线
                if kstestResult[1] > 0.05:
                    threshold = percallMean + 3 * percallStd
                    drawGaussian(func, line, percallMean, threshold, percallList, chartFolder)
                else:
                    drawCurve(func, line, percallList, chartFolder)
            else:
                pass
    except Exception:
        print 'exeption:' + traceback.format_exc()

两种耗时模型绘制的曲线效果图如下:

函数耗时高斯分布曲线及阈值效果示例

 

函数耗时曲线及Turkey箱型图阈值示例

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python替换字符串的新工具 FlashText!比正则快M倍以上!

FlashText算法是由Vikash Singh于2017年发表的大规模替换字符串算法,这个算法的时间复杂度仅由文本长度(N)决定,算法时间复杂度为O(N)。

而对于正则表达式的替换,算法时间复杂度还需要考虑被替换的关键词数量(M),因此时间复杂度为O(MxN)。

简而言之,基于FlashText算法的字符串替换比正则表达式替换快M倍以上,这个M是需要替换的关键词数量,关键词越多,FlashText算法的优势就越明显

下面就给大家介绍如何在 Python 中基于 flashtext 模块使用 FlashText 算法进行字符串查找和替换,如果觉得对你的项目团队很有帮助,请记得帮作者转发一下哦。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install flashtext

2.FlashText基本使用—替换字符串

提取关键词

一个最基本的提取关键词的例子如下:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
keyword_processor = KeywordProcessor()
# 2. 添加关键词
keyword_processor.add_keyword('Big Apple', 'New York')
keyword_processor.add_keyword('Bay Area')
# 3. 处理目标句子并提取相应关键词
keywords_found = keyword_processor.extract_keywords('I love Big Apple and Bay Area.')
# 4. 结果
print(keywords_found)
# ['New York', 'Bay Area']

其中 add_keyword 的第一个参数代表需要被查找的关键词,第二个参数是给这个关键词一个别名,如果找到了则以别名显示。

替换关键词

如果你想要替换关键词,只需要调用处理器的 replace_keywords 函数:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
keyword_processor = KeywordProcessor()
# 2. 添加关键词
keyword_processor.add_keyword('New Delhi', 'NCR region')
# 3. 替换关键词
new_sentence = keyword_processor.replace_keywords('I love Big Apple and new delhi.')
# 4. 结果
print(new_sentence)
# 'I love New York and NCR region.'

关键词大小写敏感

如果你需要精确提取,识别大小写字母,那么你可以在处理器初始化的时候设定 sensitive 参数:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器, 注意设置大小写敏感(case_sensitive)为TRUE
keyword_processor = KeywordProcessor(case_sensitive=True)
# 2. 添加关键词
keyword_processor.add_keyword('Big Apple', 'New York')
keyword_processor.add_keyword('Bay Area')
# 3. 处理目标句子并提取相应关键词
keywords_found = keyword_processor.extract_keywords('I love big Apple and Bay Area.')
# 4. 结果
print(keywords_found)
# ['Bay Area']

标记关键词位置

如果你需要获取关键词在句子中的位置,在 extract_keywords 的时候添加 span_info=True 参数即可:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
keyword_processor = KeywordProcessor()
# 2. 添加关键词
keyword_processor.add_keyword('Big Apple', 'New York')
keyword_processor.add_keyword('Bay Area')
# 3. 处理目标句子并提取相应关键词, 并标记关键词的起始、终止位置
keywords_found = keyword_processor.extract_keywords('I love big Apple and Bay Area.', span_info=True)
# 4. 结果
print(keywords_found)
# [('New York', 7, 16), ('Bay Area', 21, 29)]

获取目前所有的关键词

如果你需要获取当前已经添加的所有关键词,只需要调用处理器的get_all_keywords 函数:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
keyword_processor = KeywordProcessor()
# 2. 添加关键词
keyword_processor.add_keyword('j2ee', 'Java')
keyword_processor.add_keyword('colour', 'color')
# 3. 获取所有关键词
keyword_processor.get_all_keywords()
# output: {'colour': 'color', 'j2ee': 'Java'}

批量添加关键词

批量添加关键词有两种方法,一种是通过词典,一种是通过数组:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
keyword_processor = KeywordProcessor()
# 2. (第一种)通过字典批量添加关键词
keyword_dict = {
    "java": ["java_2e", "java programing"],
    "product management": ["PM", "product manager"]
}
keyword_processor.add_keywords_from_dict(keyword_dict)
# 2. (第二种)通过数组批量添加关键词
keyword_processor.add_keywords_from_list(["java", "python"])
# 3. 第一种的提取效果如下
keyword_processor.extract_keywords('I am a product manager for a java_2e platform')
# output ['product management', 'java']

单一或批量删除关键词

删除关键词也非常简单,和添加类似:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
keyword_processor = KeywordProcessor()
# 2. 通过字典批量添加关键词
keyword_dict = {
    "java": ["java_2e", "java programing"],
    "product management": ["PM", "product manager"]
}
keyword_processor.add_keywords_from_dict(keyword_dict)
# 3. 提取效果如下
print(keyword_processor.extract_keywords('I am a product manager for a java_2e platform'))
# ['product management', 'java']
# 4. 单个删除关键词
keyword_processor.remove_keyword('java_2e')
# 5. 批量删除关键词,也是可以通过词典或者数组的形式
keyword_processor.remove_keywords_from_dict({"product management": ["PM"]})
keyword_processor.remove_keywords_from_list(["java programing"])
# 6. 删除了java programing关键词后的效果如下
keyword_processor.extract_keywords('I am a product manager for a java_2e platform')
# ['product management']

3.FlashText 高级使用

支持额外信息

前面提到在添加关键词的时候第二个参数为其别名,其实你不仅可以指示别名,还可以将额外信息放到第二个参数中:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
kp = KeywordProcessor()
# 2. 添加关键词并附带额外信息
kp.add_keyword('Taj Mahal', ('Monument', 'Taj Mahal'))
kp.add_keyword('Delhi', ('Location', 'Delhi'))
# 3. 效果如下
kp.extract_keywords('Taj Mahal is in Delhi.')
# [('Monument', 'Taj Mahal'), ('Location', 'Delhi')]

这样,在提取关键词的时候,你还能拿到其他一些你想要在得到此关键词时输出的信息。

支持特殊单词边界

Flashtext 检测的单词边界一般局限于 \w [A-Za-z0-9_] 外的任意字符,但是如果你想添加某些特殊字符作为单词的一部分也是可以实现的:

from flashtext import KeywordProcessor
# 1. 初始化关键字处理器
keyword_processor = KeywordProcessor()
# 2. 添加关键词
keyword_processor.add_keyword('Big Apple')
# 3. 正常效果
print(keyword_processor.extract_keywords('I love Big Apple/Bay Area.'))
# ['Big Apple']
# 4. 将 '/' 作为单词一部分
keyword_processor.add_non_word_boundary('/')
# 5. 优化后的效果
print(keyword_processor.extract_keywords('I love Big Apple/Bay Area.'))
# []

4.结尾

个人认为这个模块已经满足我们的基本使用了,如果你有一些该模块提供的功能之外的使用需求,可以给 flashtext 贡献代码:
https://github.com/vi3k6i5/flashtext

附 FlashText 与正则相比 查询关键词 所花费的时间之比:

附 FlashText 与正则相比 替换关键词 所花费的时间之比:

这篇文章如果对你有帮助的话,记得转发一下哦。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 想了解EventLoop?这篇文章就够了

原文来自 python-parallel-programming-cookbook-cn

Python的Asyncio模块提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语。此模块的主要组件和概念包括:

  • 事件循环: 在Asyncio模块中,每一个进程都有一个事件循环。
  • 协程: 这是子程序的泛化概念。协程可以在执行期间暂停,这样就可以等待外部的处理(例如IO)完成之后,从之前暂停的地方恢复执行。
  • Futures: 定义了 Future 对象,和 concurrent.futures 模块一样,表示尚未完成的计算。
  • Tasks: 这是Asyncio的子类,用于封装和管理并行模式下的协程。

本节中重点讨论事件,事实上,异步编程的上下文中,事件无比重要。因为事件的本质就是异步。

1. 什么是事件循环

在计算系统中,可以产生事件的实体叫做事件源,能处理事件的实体叫做事件处理者。此外,还有一些第三方实体叫做事件循环。它的作用是管理所有的事件,在整个程序运行过程中不断循环执行,追踪事件发生的顺序将它们放到队列中,当主线程空闲的时候,调用相应的事件处理者处理事件。最后,我们可以通过下面的伪代码来理解事件循环::

while(1) {
  events = getEvents();
  for (e in events)
    processEvent(e);
}

所有的事件都在 while 循环中捕捉,然后经过事件处理者处理。事件处理的部分是系统唯一活跃的部分,当一个事件处理完成,流程继续处理下一个事件。

2. 准备工作

Asyncio提供了一下方法来管理事件循环:

  • loop = get_event_loop(): 得到当前上下文的事件循环。
  • loop.call_later(time_delay, callback, argument): 延后 time_delay 秒再执行 callback 方法。
  • loop.call_soon(callback, argument): 尽可能快调用 callbackcall_soon() 函数结束,主线程回到事件循环之后就会马上调用 callback 。
  • loop.time(): 以float类型返回当前事件循环的内部时间。
  • asyncio.set_event_loop(): 为当前上下文设置事件循环。
  • asyncio.new_event_loop(): 根据此策略创建一个新的事件循环并返回。
  • loop.run_forever(): 在调用 stop() 之前将一直运行。

3. 如何做…

下面的代码中,我们将展示如何使用Asyncio库提供的事件循环创建异步模式的应用。

import asyncio
import datetime
import time

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

def function_2(end_time, loop):
    print("function_2 called ")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()

def function_3(end_time, loop):
    print("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()

def function_4(end_time, loop):
    print("function_5 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()

loop = asyncio.get_event_loop()

end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
# loop.call_soon(function_4, end_loop, loop)
loop.run_forever()
loop.close()

运行结果如下::

python3 event.py
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called

在这个例子中,我们定义了三个异步的任务,相继执行,入下图所示的顺序。

首先,我们要得到这个事件循环::

loop = asyncio.get_event_loop()

然后我们通过 call_soon 方法调用了 function_1() 函数。

end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)

让我们来看一下 function_1() 的定义::

def function_1(end_time, loop):
    print("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()

这个函数通过以下参数定义了应用的异步行为:

  • end_time: 定义了 function_1() 可以运行的最长时间,并通过 call_later 方法传入到 function_2() 中作为参数
  • loop: 之前通过 get_event_loop() 方法得到的事件循环

function_1() 的任务非常简单,只是打印出函数名字。当然,里面也可以写非常复杂的操作。

print("function_1 called")

任务执行结束之后,它将会比较 loop.time() +1s和设定的运行时间,如果没有超过,使用 call_later 在1秒之后执行 function_2() 。

if (loop.time() + 1.0) < end_time:
    loop.call_later(1, function_2, end_time, loop)
else:
    loop.stop()

function_2() 和 function_3() 的作用类似。

如果运行的时间超过了设定,事件循环终止。

loop.run_forever()
loop.close()

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Jrnl — Python编写的超方便命令行笔记程序

Jrnl 是用Python编写的命令行笔记应用程序,用起来非常简单方便,特别适合需要快速记录文本信息的同学。

您可以使用它轻松创建,搜索和查看所有的笔记。笔记以人类可读的纯文本存储,当然也可以使用 AES加密进行加密

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

pip install jrnl

2.快速上手

要创建一个新的笔记,你只需要在终端这样输入:

jrnl now: 第一次使用. 我擦,这玩意儿真的好用吗?                                              

jrnl 是笔记开始的标志。now: 的标记会记录一个当前时间的时间戳,后接的第一句话到句号(.)为止是笔记的标题。句号后续的所有内容都是该笔记的内容。

第一次使用的时候,会让你指定笔记记录的位置和是否需要加密:

Path to your journal file (leave blank for C:\Users\83493\.local\share\jrnl\journal.txt):
Do you want to encrypt your journal? You can always change this later [y/N] n

一般默认即可,除非你需要做特殊的处理。

如果要查看刚刚编写的记录,可以这样查看到今日为止的所有笔记:

jrnl -to today

结果如下:

或者:

jrnl -n 1

-n 后接的是数字,能够查看最近n条笔记,比如最近一条笔记:

不错,要记点简单的东西的时候甚至不需要开文档编辑器,直接终端用jrnl记录即可。

下面是更多功能的说明。

3.基本使用

如果你在输入 jrnl 命令时后面不接时间,jrnl 会默认使用当前时间插入到笔记中。

不过有时候我们想记的笔记或者日记是多日之前甚至是几个月之前的,这时候jrnl也提供了许多强大的时间格式:

3.1 笔记的时间

jrnl 支持的时间格式如下:

  • at 6am
  • yesterday
  • last monday
  • sunday at noon
  • 2 march 2012
  • 7 apr
  • 5/20/1998 at 23:42
  • 2020-05-22T15:55-04:00

比如:

jrnl 2021-02-01: 2月初. 2月的第一天,祝大家2月万事如意,快快乐乐。

然后查看到今日为止的所有笔记如下:

PS G:\push> jrnl -to today
2021-02-01 09:00 Called in sick.
| Used the time to clean and spent 4h on writing my book.

2021-02-01 09:00 2月初. 
| 2月的第一天,祝大家2月万事如意,快快乐乐。

2021-02-02 00:21 第一次使用.
| 我擦,这玩意儿真的好用吗?

当然,不使用冒号也是可以记笔记的:

PS G:\push> jrnl 不用冒号也能记笔记吗?
[Entry added to default journal]

3.2 标签功能

jrnl 支持标签功能。默认标记符号为@(不用#号是因为它是保留字符)。

要使用标签,请在所需标记的文字前面加上@符号:

jrnl Had a wonderful day at the @beach with @Tom and @Anna.

尽管可以在标记条目时使用大写字母,但按标记搜索时不区分大小写。

条目中可以使用多个标签没有限制。

3.3 重点笔记

要将笔记标记为重点项,只需使用星号(*)对它进行“星标” :

jrnl last sunday *: Best day of my life.

如果你不想添加日期,则以下选项是等效的(确保*号后面没有空格):

jrnl *: Best day of my life.
jrnl *Best day of my life.
jrnl Best day of my life.*

3.4 查看和搜索

要查看到今天为止的所有条目,请输入:

jrnl -to today

jrnl提供了几个过滤命令,以单破折号(-)开头,可让您更方便地进行查找。例如 -n:

jrnl -n 10

列出最近的十个条目。更简洁的写法是 jrnl -10,这两者效果一致。

如果要查看从去年年初到今年三月底之前编写的所有条目,请输入

jrnl -from "last year" -to march

使用多个单词的过滤条件需要使用引号("")括起来。

要查看特定日期的条目,请使用-on

jrnl -on yesterday

-contains 命令显示包含该关键词的所有笔记,–edit 允许你编辑这些笔记。

jrnl -contains "dogs" --edit

不过编辑笔记之前,jrnl会提示你配置一个默认的编辑器(因为编辑功能需要打开编辑器):

按标签过滤

您可以按标签过滤笔记。例如:

jrnl @pinkie @WorldDomination

显示@pinkie@WorldDomination 的所有笔记。标签过滤器可以与其他过滤器结合使用:

jrnl -n 5 @pinkie -and @WorldDomination

显示包含  @pinkie 和 @worldDomination 的最近五个笔记。

要查看笔记中所有的标签,请输入:

jrnl --tags

查看所有重点笔记:

jrnl -starred

3.5 删除笔记

删除笔记非常简单,相当于搜索后加 –delete 参数进行删除。

PS G:\push> jrnl -contains "2月" --delete
Delete entry '2021-02-01 09:00 2月初.2月的第一天,祝大家2月万事如意,快快乐乐。'? [y/N] y

基本的使用就是这些,jrnl 还有一些高级用法,大家可以在官网参考使用:
https://jrnl.sh/en/stable/advanced/

综上所述,如果你有快速记录文本信息的需求,这个工具是你的不二之选。​

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 你可能从未听说过的5种隐藏技巧

1. …

没错,你没看错,就是 …

在Python中 … 代表着一个名为 Ellipsis 的对象。根据官方说明,它是一个特殊值,通常可以作为空函数的占位符,或是用于Numpy中的切片操作

如:

def my_awesome_function():
    ...

等同于:

def my_awesome_function():
    Ellipsis

当然,你也可以使用pass或者字符串作为占位符:

def my_awesome_function():
    pass
def my_awesome_function():
    "An empty, but also awesome function"

他们最终的效果都是相同的。

接下来讲讲 … 对象是如何在Numpy中体现出作用的,创建一个 3x3x3 的矩阵数组,然后获取所有最内层矩阵的第二列:

>>> import numpy as np
>>> array = np.arange(27).reshape(3, 3, 3)
>>> array
array([[[ 0,  1,  2],
        [ 3,  4,  5],
        [ 6,  7,  8]],

       [[ 9, 10, 11],
        [12, 13, 14],
        [15, 16, 17]],

       [[18, 19, 20],
        [21, 22, 23],
        [24, 25, 26]]])

为了获取最层矩阵的第二列,传统方法可能是这样的:

>>> array[:, :, 1] 
array([[ 1,  4,  7],
       [10, 13, 16],
       [19, 22, 25]])

如果你会用 … 对象,则是这样的:

>>> array[..., 1] 
array([[ 1,  4,  7],
       [10, 13, 16],
       [19, 22, 25]])

不过请注意, … 对象仅可用于Numpy,不适用于Python内置数组。

2.解压迭代对象

解压迭代对象是一个非常方便的特性:

>>> a, *b, c = range(1, 11)
>>> a
1
>>> c
10
>>> b
[2, 3, 4, 5, 6, 7, 8, 9]

或者是:

>>> a, b, c = range(3)
>>> a
0
>>> b
1
>>> c
2

同理,与其写这样的代码:

>>> lst = [1]
>>> a = lst[0]
>>> a
1
>>> (a, ) = lst
>>> a
1

你不如跟解压迭代对象一样,进行更优雅的赋值操作:

>>> lst = [1]
>>> [a] = lst
>>> a
1

虽然这看起来有点蠢,但就我个人来看,比前一种写法更优雅一些。

3.展开的艺术

数组展开有各种千奇百怪的姿势,比如说:

>>> l = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
>>> flattened = [elem for sublist in l for elem in sublist]
>>> flattened
[1, 2, 3, 4, 5, 6, 7, 8, 9]

如果你对reduce和lambda有一定了解,建议使用更优雅的方式:

>>> from functools import reduce
>>> reduce(lambda x,y: x+y, l)
[1, 2, 3, 4, 5, 6, 7, 8, 9]

reduce和lambda组合起来,就能针对 l 数组内的每个子数组做拼接操作。

当然,还有更神奇的方式:

>>> sum(l, [])
[1, 2, 3, 4, 5, 6, 7, 8, 9]
>>> # 其实相当于 [] + [1, 2, 3] + [4, 5, 6] + [7, 8, 9]

没错,这样对二维数组做sum操作,就能使二维数组内的每个元素做“加”法拼接起来。同样的道理,如果你对三位数组做sum操作,就能使其变为二维数组,再对二维数组做sum操作,就能展开为一维数组。

不过,虽然这个技巧很出色,并不推荐使用,因为可读性太差了。

4.下划线 _ 变量

每当你在Python解释器,IPython或Django Console中运行表达式时,Python都会将输出的值绑定到 _ 变量中:

>>> nums = [1, 3, 7]
>>> sum(nums)
11
>>> _
11
>>>

由于它是一个变量,你可以随时覆盖它,或像普通变量一样操作它:

>>> 9 + _
20
>>> a = _
>>> a
20

5.多种用途的else

很多人都不知道,else 可以被用于许多地方,除了典型的 if else, 我们还可以在循环和异常处理里用到它。

循环

如果需要判断循环里是否处理了某个逻辑,通常情况下会这么做:

found = False
a = 0

while a < 10:
    if a == 12:
        found = True
    a += 1
if not found:
    print("a was never found")

如果引入else,我们可以少用一个变量:

a = 0

while a < 10:
    if a == 12:
        break
    a += 1
else:
    print("a was never found")

异常处理

我们可以在 try … except … 中使用 else 编写未捕获到异常时的逻辑:

In [13]: try:
    ...:     {}['lala']
    ...: except KeyError:
    ...:     print("Key is missing")
    ...: else:
    ...:     print("Else here")
    ...: 
Key is missing

这样,如果程序没有异常,则会走else分支:

In [14]: try:
    ...:     {'lala': 'bla'}['lala']
    ...: except KeyError:
    ...:     print("Key is missing")
    ...: else:
    ...:     print("Else here")
    ...: 
Else here

如果你经常做异常处理,你就会知道这个技巧相当方便。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 简化函数调用的3种技巧

假设有一个函数,这个函数需要接收4个参数,并返回这4个参数的和:

def sum_four(a, b, c, d):
    return a + b + c + d

如果需要固定最后前三个参数,仅改变最后一个参数的值,这时候可能需要这么调用:

>>> a, b, c = 1, 2, 3

>>> sum_four(a=a, b=b, c=c, d=1)
7

>>> sum_four(a=a, b=b, c=c, d=2)
8

>>> sum_four(a=a, b=b, c=c, d=3)
9

>>> sum_four(a=a, b=b, c=c, d=4)
10

这样写实在是太丑了,如果用 Map 函数,是否能简化代码?

答案是肯定的,但是Map函数只能接受单一元素,如果你强行使用的话,它会报这样的错:

>>> list(map(sum_four, [(1, 2, 3, 4)]))
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: sum_four() missing 3 required positional arguments: 'b', 'c', and 'd'

怎么解决?

方案1: itertools.starmap

我们可以使用 itertools 的函数 starmap 替换Map.

它与Map不同,允许接受一个元组作为传入sum_four的参数。

>>> import itertools
>>> list(itertools.starmap(sum_four, [(1, 2, 3, 4)]))
[10]

非常棒,这样的话,上述问题就可以使用starmap函数解决:

>>> import itertools

>>> ds = [1, 2, 3, 4]

>>> items = ((a, b, c, d) for d in ds)

>>> list(items)
 [(1, 2, 3, 1), (1, 2, 3, 2), (1, 2, 3, 3), (1, 2, 3, 4)]

>>> list(itertools.starmap(sum_four, items))
 [7, 8, 9, 10]

请注意 items 是一个生成器,这是为了避免 items 过大导致内存消耗量过大。平时开发的时候注意这些细节,能够使你和普通的开发者拉开差距。

方案2: functools.partial

第二种解决方案是使用 partial 函数固定前三个参数。

根据文档,partial 将“冻结”函数的参数的某些部分,从而生成简化版的函数。

因此上述问题的解决方案就是:

>>> import functools
>>> partial_sum_four = functools.partial(sum_four, a, b, c)
>>> partial_sum_four(3)
9
>>> # 这样就可以使用map函数了:
>>> list(map(partial_sum_four, ds))
[7, 8, 9, 10]

方案3: itertools.repeat()

事实上,Map 函数是允许传递可迭代参数的,但是有一个有趣的特点,他会用每个可迭代对象里的项作为传入函数的不同参数。这样说可能太过于抽象了,来看看实际的例子:

>>> list(map(sum_four, [1, 1, 1, 1], [2, 2, 2, 2], [3, 3, 3, 3], [1,2,3,4]))
 [7, 8, 9, 10]

明白了吧,每次都使用了不同数组中对应下标的项传入函数进行计算。

这样,我们可以使用这个特点进行优化。

itertools.repeat() 函数能够根据参数产生一个迭代器,该迭代器一次又一次返回对象。不指定times参数,它将无限期运行。

而 Map 函数会在最短的可迭代对象被迭代完后,就会自动停止运行。

结合这两个特点,上述问题的解决方案就出来了:

>>> import itertools
>>> list(map(sum_four, itertools.repeat(a), itertools.repeat(b), itertools.repeat(c), ds))
 [7, 8, 9, 10]

这招还是非常巧妙的。缺点是能读懂的人不多。不过没关系,计算机世界中某些东西知道就好,你并不一定需要去使用它。

比如本文中的这几种解决方案,日常生活工作中一般用不到,所以你不需要死记硬背,但你需要知道【有这样的问题】和【有这些解决方案】,万一遇到了相似的场景,你就可以回忆起这篇文章并快速找到解决的方法。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!

给作者打赏,选择打赏金额
¥1¥5¥10¥20¥50¥100¥200 自定义

​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

关于Python3.9,你不可不知的4个新特性

1.词典联合运算符

这是我最喜欢的功能之一,语法非常优美。

在Python3.9,如果你有两个词典,现在可以用这些运算符进行合并和更新。

合并运算符 “|”:

还有update运算符|=,它会更新原始字典:

a = {1: 'a', 2: 'b', 3: 'c'}
b = {4: 'd', 5: 'e'}
a |= b
print(a)
{1: 'a', 2: 'b', 3: 'c', 4: 'd', 5: 'e'}

如果我们的词典共享一个key,那么将使用第二个词典中的value:

a = {1: 'a', 2: 'b', 3: 'c', 6: 'in both'}
b = {4: 'd', 5: 'e', 6: 'but different'}
print(a | b)
{1: 'a', 2: 'b', 3: 'c', 6: 'but different', 4: 'd', 5: 'e'}

使用可迭代对象进行字典更新

|=操作符的另一个很酷的特性是能够使用可迭代对象(例如列表或生成器)使用新的键值对更新字典:

a = {'a': 'one', 'b': 'two'}
b = ((i, i**2) for i in range(3))
a |= b
print(a)
{'a': 'one', 'b': 'two', 0: 0, 1: 1, 2: 4}

当然,如果你用|这样做,则会得到TypeError,因为它只能用于dict类型之间的联合。

2.字符串方法

removeprefix()和removesuffix()

str.removeprefix(substring: string) 是一个方法,接收一个substring参数,顾名思义,它将删除字符串对应的substring后缀,如果没有对应的后缀,返回原字符串。

str.removesuffix(substring: string) 是一个方法,接收一个substring参数,它将删除字符串的对应substring前缀,如果没有对应的前缀,返回原字符串。

当然,两个函数执行你可以通过使用string[len(prefix):]前缀和string[:-len(suffix)]后缀来实现。

这些是非常简单的操作,因此也是非常简单的功能,考虑到你可能经常执行这些操作,Python3.9 提供的这两个内置函数应该能让你非常爽。

3.新的数学函数

Python 3.9 的数学模块进行了不少的优化并添加了许多新功能。

比如以前gcd计算最大公因数的函数只能应用于2个数字,这就很蛋疼,我们必须使用 math.gcd(80, math.gcd(64, 152))来处理大于2个数字的情况。

现在 gcd 允许计算任意数量的数字。

import math

# Greatest common divisor
math.gcd(80, 64, 152)
# 8

Math模块中,第一个新增的功能是:

# 最小公倍数
math.lcm(4, 8, 5)
# 40

用于计算最小公倍数:math.lcm,与gcd一样,它允许可变数量的参数。

4.新解析器

这一个更改你可能看不见、摸不着,但它可能改变Python的未来。

以前Python使用 LL(1) 解析器,现在Python开始使用 PEG 解析器,官方认为,这个更改会使得他们更加方便地构建新功能。

因此,请期待Python 3.10,Python团队或许能给我们带来更多的惊喜!

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python 简单实用的日志装饰器

在写代码的时候,往往会漏掉日志这个关键因素,导致功能在使用的时候出错却无法溯源。

其实,只需要写一个非常简单的日志装饰器,我们就能大大提升排查问题的效率。

1.简陋版

写一个装饰器非常简单,因为本质上装饰器就是一个返回函数的“高阶”函数而已:

1.函数作为参数传递进装饰器。
2.装饰器内定义一个函数,处理作为参数传递进来的函数。
3.返回这个装饰器内定义的函数

import datetime


def log(func):
    """
    日志装饰器,简单记录函数的日志

    Args:
        func (function): 函数
    """
    def inner(*args):
        timestamp = str(datetime.datetime.now()).split(".")[0]
        res = func(*args)
        print(f"[{timestamp}] ({func.__name__}) {args} -> {res}")
        return res
    return inner

用一下试试看:

@log
def pluser(a, b):
    return a + b

pluser(1, 2)

效果如下:

虽然这样可以实现我们所需要的功能,但其实有很大的优化空间。

2.普通版

第一版代码中有一个显而易见的问题,装饰器内定义的处理函数不支持kwargs,而在装饰器中支持kwargs仅仅是举手之劳而已。

第二个问题是,生成时间戳的时候采用字符串截取的形式,这种形式过于粗暴。其实可以使用strftime做字符串转换。

修改如下:

import datetime


def log(func):
    """
    日志装饰器,简单记录函数的日志

    Args:
        func (function): 函数
    """
    def inner(*args, **kwargs):
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        res = func(*args, **kwargs)
        print(f"[{timestamp}] ({func.__name__}) {args} -> {res}")
        return res
    return inner

似乎优化地差不多了,不过依然存在改进空间。

3.优化版

在前两版代码中,我们使用print进行日志输出,其实这种处理日志的方式并不标准。

使用logging模块控制日志输出是一个更好地选择。

为了使用logging模块记录日志,我们需要先配置好logging相关的选项。

1.首先,生成一个日志记录器,并配置日志等级:

import logging

# 获取日志记录器,配置日志等级
logger = logging.getLogger(__name__)
logger.setLevel('DEBUG')

2.配置日志格式、增加handler控制输出流:

# 默认日志格式
formatter = logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s")
# 输出到控制台的handler
chlr = logging.StreamHandler()
# 配置默认日志格式
chlr.setFormatter(formatter)

此处可以设置handler所需要处理的日志等级,没有设置则默认使用logger自身的Level,即DEBUG等级。

3.最后,将此handler加入到日志记录器内:

# 日志记录器增加此handler
logger.addHandler(chlr)

logging 完整配置如下:

import logging

# 获取日志记录器,配置日志等级
logger = logging.getLogger(__name__)
logger.setLevel('DEBUG')

# 默认日志格式
formatter = logging.Formatter("%(asctime)s - [%(levelname)s] - %(message)s")
# 输出到控制台的handler
chlr = logging.StreamHandler()
# 配置默认日志格式
chlr.setFormatter(formatter)

# 日志记录器增加此handler
logger.addHandler(chlr)

使用的时候非常简单,就是把print换成logger.debug即可:

def log(func):
    """
    日志装饰器,简单记录函数的日志

    Args:
        func (function): 函数
    """
    def inner(*args, **kwargs):
        timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        res = func(*args, **kwargs)
        logger.debug(f"func: {func.__name__} {args} -> {res}")
        return res
    return inner

效果如下:

这样,一个比较完善的日志装饰器就完成了。

附常用的日志等级配置:

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

Kafka入门及Kafka-Python初体验

本文介绍了以下内容:

1.什么是Kafka?

2.为什么我们需要使用Kafka这样的消息系统及使用它的好处

3.如何将Kafka使用到我们的后端设计中。

译自https://timber.io/blog/hello-world-in-kafka-using-python/,有部分删改。

1.Kafka是什么、为什么我们需要它?

简而言之,Kafka是一个分布式消息系统。这是什么意思呢?

想象一下,你现在有一个简单的Web应用,其包含了网页前端客户端(Client)、服务端和数据库:

你需要记录所有发生在你的Web应用的事件,比如点击、请求、搜索等,以便后续进行计算和运营分析。

假设每个事件都由单独的APP完成,那么一个简单的解决方案就是将数据存储在数据库中,所有APP连接到数据库进行存储:

这看起来简单,但是其中还会出现许多问题:

1.点击、请求、搜索等事件会产生大量的数据到数据库中,这可能会导致插入事件存在延迟。

2.如果选择将高频数据存储在SQL或MongoDB等数据库中,很难再原有历史数据的基础上扩展数据库。

3.如果你需要用这些数据进行数据分析,你可能无法直接对数据库进行高频率的读取操作。

4.每个APP可以遵循自己的数据格式,这就意味着当你需要在不同的APP进行数据交换时,你需要进行数据格式的转换。

通过使用像Kafka这样的消息流系统,可以很好地解决这些问题,因为他们可以执行以下操作:

1.存储的大量数据可以被持久化、校验和复制,具备容错能力。

2.支持跨系统实时处理连续的数据流。

3.允许APP独立发布数据或数据流,并与使用它的APP无关。

那么它和传统数据库有何不同?

尽管Kafka可以持久化地存储数据,但它不是数据库。

Kafka不仅允许APP存储或提取连续的数据流,还支持实时处理。这与对被动数据执行CRUD操作或对传统数据库执行查询的方式不同。

听起来不错,那么Kafka是如何解决以上挑战的?

Kafka是一个分布式平台,是为规模而构建的,这意味着它可以处理高频率的读写和存储大量数据。它确保数据始终可靠。它还支持从故障中恢复的强大机制。

以下是为什么应该使用Kafka的一些关键因素:

1.1 简化后端架构

在Kafka的帮助下,我们前面的结构会变得简单一些:

1.2 通用数据管道

如上所示,Kafka充当多个APP和服务的通用数据管道,这给了我们两个好处:

1.数据是集成的,我们将来自不同系统的数据都存在一个地方,这使得Kafka成为真正的数据源。任何APP都可以将数据推送到该平台,然后由另一个APP提取数据。

2.Kafka使得应用程序之间交换数据变得容易。因为我们可以标准化数据格式,减少了数据格式的转换。

1.3 通用连接性

尽管Kafka允许你使用标准数据格式,但并不意味着你的APP就不需要数据转换了,它只是减少了我们转换数据的频率罢了。

此外,Kafka提供了一个叫 Kafka Connect 的框架允许我们维护遗留的老系统。

1.4 实时数据处理

类似于监控系统这样的实时APP,往往需要连续的数据流,这些数据需要被立即处理或尽量减少延迟处理。

Kafka的流式处理,使得处理引擎可以在很短的时间内(几毫米到几分钟)内取数、分析、以及响应。

2.Kafka入门

2.1 安装

安装Kafka是一个相当简单的过程。只需遵循以下给定步骤:

1.下载最新的1.1.0版本的Kafka

2.使用以下命令解压缩下载文件: tar -xzf kafka_2.11-1.1.0.tgz

3.cd到Kafka目录开始使用它: cd kafka_2.11-1.1.0

2.2 启动服务器

ZooKeeper是一个针对Kafka等分布式环境的集中管理工具,它为大型分布式系统提供配置服务、同步服务及命名注册表。

因此,我们需要先启动ZooKeeper服务器,然后再启动Kafka服务器。使用以下命令即可:

# Start ZooKeeper Server
bin/zookeeper-server-start.sh config/zookeeper.properties

# Start Kafka Server
bin/kafka-server-start.sh config/server.properties

2.3 Kafka 基本概念

我们快速介绍一下Kafka体系结构的核心概念:

1.Kafka在一个或多个服务器上作为集群运行。

2.Kafka将数据流存储在名为topics的类别中。每条数据均由键、值、时间戳组成。

3.Kafka使用发布-订阅模式。它允许某些APP充当producers(生产者),记录数据并将数据发布到Kafka topic中。

同样,它允许某些APP充当consumer(消费者)和订阅Kafka topic并处理由它产生的数据。

4.除了Prodcuer API 和 Consumer API,Kafka还为应用提供了一个 Streams API 作为流处理器。通过 Connector API 我们可以将Kafka连接到其他现有的应用程序和数据系统。

2.4 架构

如你所见,每个Kafka的 Topic 可以分为多个Partition(分区),可以使用broker(经纪人)在不同的计算机上复制这些 Topic,从而使消费者可以并行读取 Topic.

kafka的复制是针对分区的:

比如上图中有4个broker, 1个topic, 2个分区,复制因子是3。当producer发送一个消息的时候,它会选择一个分区,比如topic1-part1分区,将消息发送给这个分区的leader, broker2、broker3会拉取这个消息,一旦消息被拉取过来,slave会发送ack给master,这时候master才commit这个log。

因此,整个系统的容错级别极高。当系统正常运行时,对Topic的所有读取和写入都将通过leader,且leader会保证所有其他broker均被更新。

如果Broker失效了,系统会自动重新配置,此时副本也可以接管成为Leader.

2.5 创建Kafka Topic

让我们创建一个名为 sample,含有一个partition(分区)和一个replica(副本)的Kafka Topic:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic sample

列出所有的Kafka Topics,检查是否成功创建了sample Topic:

bin/kafka-topics.sh --list --zookeeper localhost:2181

describe topics 命令还可以获得特定Topic的详细信息:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic sample

2.6 创建生产者与消费者

这里是代码实战部分,利用Kafka-Python实现简单的生产者和消费者。

1.首先需要安装kafka-python:

pip install kafka-python

2.创建消费者(consumer.py)

from kafka import KafkaConsumer
consumer = KafkaConsumer('sample')
for message in consumer:
    print (message)

3.创建生产者(producer.py)

现在,有一个消费者正在订阅我们的消息流,因此我们要创建一个生产者,发布消息到Kafka:

from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('sample', b'Hello, World!')
producer.send('sample', key=b'message-two', value=b'This is Kafka-Python')

现在,你重新运行消费者(consumer.py),你就会接收到生产者发送过来的消息。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应红色字体的验证信息,进入互助群询问。


​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典