这3种数组内的字典去重方式 你会几种?

你知道吗?如果数组是字典组成的,直接对数组内的字典采用set的方式进行去重,会报错:

test = [{"a": 1}, {"a": 1}, {"a": 3}, {"b": 4}]
test = list(set(test))
>>>TypeError: unhashable type: 'dict'

因为使用set去重的前提是该对象为不可变对象,而字典是可变对象,因此无法使用该方法去重。

那么怎么解决这个问题呢?有三个办法。

1.使用reduce方法

reduce() 函数会对参数序列中元素进行累积。

比如:

from functools import reduce
>>>def add(x, y) :            # 两数相加
...    return x + y
... 
>>>reduce(add, [1,2,3,4,5])   # 计算列表和:1+2+3+4+5
15

上述写法也能用lambda函数简化为:

from functools import reduce
>>> reduce(lambda x, y: x+y, [1,2,3,4,5])  # 使用 lambda 匿名函数
15

因此,我们自己编写一个函数进行数组内的字典去重:

from functools import reduce

data = [{"a": 1}, {"a": 1}, {"a": 3}, {"b": 4}]
result = []
def unduplicate(result, data):
    if data not in result:
        result = result + [data]
    return result

for i in data:
    result = unduplicate(result, i)

>>> result
>>> [{'a': 1}, {'a': 3}, {'b': 4}]

稍显复杂,如果使用reduce函数和lambda函数,代码能简化很多:

def delete_duplicate(data):
    func = lambda x, y: x + [y] if y not in x else x
    data = reduce(func, [[], ] + data)
    return data

>>> delete_duplicate(data)
>>> [{'a': 1}, {'a': 3}, {'b': 4}]

当然, 我也能一行写完这个功能:

data = reduce(lambda x, y: x + [y] if y not in x else x, [[], ] + data)

只不过有可能会被打死在工位上,所以不建议这么干。

2.奇怪的技巧

就如文章开头提到的,字典之所以不能用set去重,是因为它是可变对象。

但是…如果我们把它变成不可变对象呢?

data = [{"a": 1}, {"a": 1}, {"a": 3}, {"b": 4}]
def delete_duplicate(data):
    immutable_dict = set([str(item) for item in data])
    data = [eval(i) for i in immutable_dict]
    return data
>>> delete_duplicate(data)
>>> [{'a': 1}, {'a': 3}, {'b': 4}]

没错,这能成。

1.遍历字典,将每个子项变成字符串存放到数组中,再通过set函数去重。

2.通过eval函数,将去重后的数组里的每个子项重新转化回字典。

如此Python,怎能不好玩?

3.高效的方式

上面讲了两种骚操作,其实都不太建议在实际工作中使用。

一个原因是真的太骚了,怕被打趴在工位上。

另一个原因是,它们在应对较大数据量的时候,性能不太行。

下面是最正统的方式:

data = [dict(t) for t in set([tuple(d.items()) for d in data])]
>>>data
>>>[{'a': 1}, {'b': 2}]

其实和第二种方式一样,是将数组内的每个字典转成元组,也就是不可变对象,再使用set进行去重。去重完毕后再使用dict函数将元组重新组成字典对。

但是,这种方法对于字典内还有字典的数据结构是不适用的,因此对于字典对里还有字典情况的去重,比如:

data2 = [{"a": {"b": "c"}}, {"a": {"b": "c"}}]

这种情况我建议使用第二种方式去重:

data2 = [{"a": {"b": "c"}}, {"a": {"b": "c"}}]
def delete_duplicate_str(data):
    immutable_dict = set([str(item) for item in data])
    data = [eval(i) for i in immutable_dict]
    return data
print(delete_duplicate_str(data2))

>>> [{'a': {'b': 'c'}}]

怎么样,这三种方式你都学会了吗?如果觉得有收获的话记得收藏一下。以后遇到类似的去重场景时可以拿出阅读一下。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

为什么小黄鸭调试法在中国行不通

许多程序员都有向他人请教代码问题或解释代码的经历,而在解释的过程中,程序员可能就发觉了问题的解决方案,一边阐述代码的意图,一遍观察代码实际上产生的行为并调试,一旦有任何不协调的地方就能够迅速地发现并解决问题。

小黄鸭调试法就是这样,工作的时候,在电脑旁边放一只小黄鸭,耐心地跟它讲解每一行代码的逻辑和意义,以此来激发灵感和发现矛盾。

可惜,在当今中国的编码环境中,由于产品迭代周期极短,需求量大、代码量大,程序员根本没有时间在产品上线前实施小黄鸭调试。

而这样就引发了一个问题,代码没有经过小黄鸭调试,可能存在许多隐藏的Bug,而在程序员修复这些隐藏的Bugs的时候,可能会引入新的Bug…

更糟糕的是,如果修Bug的人不是当初写Bug的那个人,对整个代码逻辑和功能逻辑并不一定非常了解,那么有可能写出比原有Bug更加严重的Bug…

久而久之就形成了负反馈,产品功能越来越多,程序员越来越多,代码越写越糟糕,最终达到一种状态:能用就行

那么为什么?小黄鸭调试法在中国行不通的本质原因是什么?

1.“时间就是金钱,效率就是生命”

房价太高,结婚要买房,娃儿要读书,不急不行。上头急、组长急、产品急、开发急,大家都急,急急如厕令,厕所都得装个计时器,哪有时间跟你只鸭子折腾。

2.代码写得好,加班少,涨薪真的没你份儿

大部分开发者的目的都不是写出最优质的代码,而是赚最多的钱。为了钱而写代码,编程不过赚钱的工具,功能写得越多、钱就越多。

Bug多一点无所谓,毕竟老子写的功能比Bug多得多,那你小黄鸭调试法还有啥用,不是在拖累我赚钱的速度吗?

3.各种编程营销组织带坏编程风气

如果你是编程教育的细分领域,教量化投资、SaaS编程等技术含量高的课程也就算了。问题是一个教Python基础的课程起这样的标题是何居心?

总而言之,小黄鸭调试法的行不通,是因为中国程序员大量的时间(包括加班时间),花在了一些原本可以避免的Bug和可有可无的需求上。

从微观上看,是对KPI的焦虑、是对生活的担忧。

从宏观上看,是公司违法成本低、甚至根本不需要成本。

租下来的办公楼,多开几个小时灯就能多出一个新功能,成本近乎于零,没人用不亏,有人用则赚爆,何乐而不为?

但是许多人没想过的是,这样的生产成果,终究会化为垃圾,甚至对于整个产品而言单纯只是拖油瓶。真正长久不衰的功能,往往需要精心打造,而非急功近利。

在当今中国的编码环境中,我看不到有进行任何科学规划的团队,看不到有精心打造软件产品的公司。

就长期而言,我对中国的软件业持看空态度。为了急功近利的眼前效益,程序员生产的代码质量自然变得极差,就如同前面提到的负反馈效应,越往后,质量只会越来越差。

代码世界里的那些令人喷饭的注释

代码里的注释经常能让人嘀笑皆非,有些人喜欢在里边“搞事情”,另外有些人非常擅长写幽默搞笑的注释内容,还有些人无奈地在注释里告警后人…

下面就给大家展示一些国外程序员们写的注释。

注意:看的时候严禁喝水或进食。

1、亲爱的代码维护人员:

当您尝试优化这段代码但发现这是一个极端错误的决定的时候,请修改下面的计时器,以便警示后人。

总计浪费在这段代码的时间 = 16 小时

2、真的很有问题

3、谨以此代码献给我的妻子达琳,感谢她一直支持我,还有我三个孩子和一只狗。

4、神奇代码,请勿改动

5、喝醉啦,迟些再弄

6、你可能会认为你读得懂以下的代码。但是你不会懂的,相信我吧。

要是你尝试玩弄这段代码的话,你将会在无尽的通宵中不断地咒骂自己为什么会认为自己聪明到可以优化这段代码。

好了,现在请关闭这个文件去玩点别的吧。

7、程序员1(于 2002 年 6 月 7 日):在登陆界面临时加入一些调试代码

程序员2(于 2007 年 5 月 22 日):临你个屁啊

8、反正这个办法就修复了问题,我也不知道为什么会这样

9、要理解什么是递归的话,请参考本文件的底部

(在文件的底部)

要理解什么是递归的话,请参考本文件的顶部

10、狂插两下; //痛啊

11、亲爱的未来的我自己,请原谅我。

我有着难以表达的歉意。

12、我不对以下代码负责。

是他们逼我写的,是违背我意愿的。

13、疯了吗?欢迎来到斯巴达。

14、要是你能修正这个问题的话,我会送给你两个七十二岁的处女

15、没有注释留给你,难写的代码必定难读

16、IE 浏览器的 Hack (在这里先假设 IE 是浏览器)

17、有待修正。 修正什么啊?

18、要是再让我看到这种代码,我会带着枪来上班的

19、有只龙在这里……

20、在你阅读以下代码时,你要先搞懂为什么我在这样做。

我想读取一个根节点下面所有的子节点,以便控制根节点不会显示在选择框上。但那个傻逼的 DBA 找了一些某些傻逼的借口不让我用索引去读取这些数据,而要求我用他们傻逼的迭代器。所以有了以下代码。

21、当我写这段代码的时候,只有老天和我自己知道我在做什么。

现在,只剩老天知道了。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

自定义MySQL数据流 — Python 量化投资实战教程(9)

前面八篇量化投资实战教程,我们所使用到的数据仅仅只有收盘价、成交量等普通指标,如果我们有其他的指标需要进行回测怎么办?

此外,前面使用的数据源都是基于csv文件的,我们能否从数据库(比如MySQL)中直接提取数据作为回测的数据源呢?

​事实上,backtrader虽然没有直接提供接口给我们做这样的优化,但是我们可以通过继承DataBase基类重写DataFeed实现目的。下面就给大家演示一下如何从MySQL中提取数据并增加换手率指标进行回测。

本文完整源代码和数据均在开源代码仓库中:
https://github.com/Ckend/pythondict-quant

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

在终端输入以下命令安装我们所需要的依赖模块:

pip install backtrader
pip install numpy
pip install matplotlib

看到 Successfully installed xxx 则说明安装成功。

2.自定义DataFeed

何为DataFeed?DataFeed 即 backtrader 中的“数据源”,任何数据进入策略回测前都要通过DataFeed,而DataFeed中会对数据进行处理,使得策略可以高效地进行计算。

而我们今天要做的,就是增加一个基于MySQL的DataFeed,使得整个流程变得更加自动化。

首先,需要定义一个类,使其继承与backtrader的数据基类 DataBase:

from backtrader.feed import DataBase
from backtrader import date2num


class MySQLData(DataBase):
    pass

如果需要从外部传入所需股票数据的代码和其一定范围内的K线数据,需要提前定义params. 此外,如果你有除了datetime(时间)open(开盘价)close(收盘价)high(最高价)low(最低价)volume(成交量) 之外的指标。需要提前定义lines,如下所示:

class MySQLData(DataBase):
    params = (
        ('fromdate', datetime.datetime.min),
        ('todate', datetime.datetime.max),
        ('ts_code', ''),
    )
    lines = (
        "turnover",
        "turnover_rate"
    )

可见在lines中,我增加了两个指标:turnover(成交额)turnover_rate(换手率)

接下来,编写一个函数根据params参数从MySQL中获取数据:

    def load_data_from_db(self, table, ts_code, start_time, end_time):
        """
        从MySQL加载指定数据
        Args:
            table (str): 表名
            ts_code (str): 股票代码
            start_time (str): 起始时间
            end_time (str): 终止时间
        return:
            data (List): 数据集
        """
        db = pymysql.connect(
            host="localhost",
            user="root",
            password="12345678",
            db="golden_stone",
            port=3306
        )

        cur = db.cursor()
        sql = cur.execute(
            f"SELECT * FROM {table} WHERE trade_time >= '{start_time}' and trade_time < '{end_time}'"
            f"and ts_code = '{ts_code}' order by trade_time asc"
        )
        data = cur.fetchall()
        db.close()
        return iter(list(data))

代码本身没有什么可说的,记得替换你本地的mysql配置,值得注意的是最后一行,拿到mysql数据后需要转化为迭代器。

在类初始化的时候,需要定义相关的数据存放变量并调用上述函数获取数据:

    def __init__(self, *args, **kwargs):
        self.result = []
        self.empty = False

    def start(self):
        self.result = self.load_data_from_db("stock_normalk", self.p.ts_code, self.p.fromdate, self.p.todate)

接下来到了关键的步骤,在 `cerebro.adddata(data)` 的时候,cerebro会遍历Datafeed的所有数据,此时会调用_load函数, 因此我们需要在这里,将数据库中提取的每列数据对应到lines上:

    def _load(self):
        if self.empty:
            return False
        try:
            one_row = next(self.result)
        except StopIteration:
            return False
        self.lines.datetime[0] = date2num(one_row[3])
        self.lines.open[0] = float(one_row[4])
        self.lines.high[0] = float(one_row[5])
        self.lines.low[0] = float(one_row[6])
        self.lines.close[0] = float(one_row[7])
        self.lines.volume[0] = float(one_row[8])
        self.lines.turnover[0] = float(one_row[9])
        self.lines.turnover_rate[0] = float(one_row[12])
        return True

如果你完整地看完了我的上述分析,那么理解下面整个DataFeed,甚至自己写一个DataFeed,是非常容易的。

# Python 实用宝典
# 自定义数据流 — Python 量化投资实战教程(9)
import datetime
import traceback
import pymysql

from backtrader.feed import DataBase
from backtrader import date2num


class MySQLData(DataBase):
    params = (
        ('fromdate', datetime.datetime.min),
        ('todate', datetime.datetime.max),
        ('ts_code', ''),
    )
    lines = (
        "turnover",
        "turnover_rate"
    )

    def load_data_from_db(self, table, ts_code, start_time, end_time):
        """
        从MySQL加载指定数据
        Args:
            table (str): 表名
            ts_code (str): 股票代码
            start_time (str): 起始时间
            end_time (str): 终止时间
        return:
            data (List): 数据集
        """
        db = pymysql.connect(
            host="localhost",
            user="root",
            password="12345678",
            db="golden_stone",
            port=3306
        )

        cur = db.cursor()
        sql = cur.execute(
            f"SELECT * FROM {table} WHERE trade_time >= '{start_time}' and trade_time < '{end_time}'"
            f"and ts_code = '{ts_code}' order by trade_time asc"
        )
        data = cur.fetchall()
        db.close()
        return iter(list(data))

    def __init__(self, *args, **kwargs):
        self.result = []
        self.empty = False

    def start(self):
        self.result = self.load_data_from_db("stock_normalk", self.p.ts_code, self.p.fromdate, self.p.todate)

    def _load(self):
        if self.empty:
            return False
        try:
            one_row = next(self.result)
        except StopIteration:
            return False
        self.lines.datetime[0] = date2num(one_row[3])
        self.lines.open[0] = float(one_row[4])
        self.lines.high[0] = float(one_row[5])
        self.lines.low[0] = float(one_row[6])
        self.lines.close[0] = float(one_row[7])
        self.lines.volume[0] = float(one_row[8])
        self.lines.turnover[0] = float(one_row[9])
        self.lines.turnover_rate[0] = float(one_row[12])
        return True

3.使用自定义数据流进行回测

接下来,让我们尝试使用这个自定义数据流输入数据,采用第二章的macd策略辅助增加换手率指标进行回测。

这里当然需要你先读懂第二章的内容,如果有点忘记了,可以回头阅读一下,非常简单:

Python 量化投资实战教程(2) —MACD策略

首先,在回测模块及next函数中,引入换手率指标:

class TestStrategy(bt.Strategy):
    def log(self, txt, dt=None):
        ''' Logging function fot this strategy'''
        dt = dt or self.datas[0].datetime.date(0)
        print('%s, %s' % (dt.isoformat(), txt))

    @staticmethod
    def percent(today, yesterday):
        return float(today - yesterday) / today

    def __init__(self):
        self.dataclose = self.datas[0].close
        self.volume = self.datas[0].volume
        # 新的变更:引入换手率指标
        self.turnover_rate = self.datas[0].turnover_rate

next函数买入时增加判断换手率必须小于3%的条件:

    # Python 实用宝典
    def next(self):
        self.log('Close, %.2f' % self.dataclose[0])
        if self.order:
            return

        if not self.position:
            condition1 = self.macd[-1] - self.signal[-1]
            condition2 = self.macd[0] - self.signal[0]
            # 增加判断换手率小于3%的条件
            if condition1 < 0 and condition2 > 0 and self.turnover_rate[0] < 3:
                self.log('BUY CREATE, %.2f' % self.dataclose[0])
                self.order = self.buy()

        else:
            condition = (self.dataclose[0] - self.bar_executed_close) / self.dataclose[0]
            if condition > 0.1 or condition < -0.1:
                self.log('SELL CREATE, %.2f' % self.dataclose[0])
                self.order = self.sell()

最后,引入我们刚刚编写完成的MySQLData Feed,传入相关参数读取股票为603520.SH的数据流,取2017年1月1日至2020年4月12日的数据,并调用回测函数:

if __name__ == '__main__':
    cerebro = bt.Cerebro()

    cerebro.addstrategy(TestStrategy)

    # 加载数据到模型中
    data = MySQLData(
        ts_code="sh603520",
        fromdate=datetime.datetime(2017, 1, 1),
        todate=datetime.datetime(2020, 4, 12),
    )
    cerebro.adddata(data)

    cerebro.broker.setcash(10000)

    cerebro.addsizer(bt.sizers.FixedSize, stake=100)

    cerebro.broker.setcommission(commission=0.005)

    print('Starting Portfolio Value: %.2f' % cerebro.broker.getvalue())

    cerebro.run()

    print('Final Portfolio Value: %.2f' % cerebro.broker.getvalue())

    cerebro.plot()

效果如下:

与原来相比,加入换手率指标后,利润率有一定的提高,不过就如我们之前所说的,单一股票维度的回测是不准确的,如果大家有兴趣,可以将第三章:

Python 量化投资实战教程(3) —A股回测MACD策略

改造一下并加入换手率指标进行回测,看看这个指标是否真的有正效益。

本文完整源代码和数据均在开源代码仓库中:
https://github.com/Ckend/pythondict-quant

如果你访问不了github,也可以在公众号后台回复 量化投资9 下载相关代码。

欢迎在公众号后台回复:加群,回答相应红字验证信息,进入互助群交流。

我们的文章到此就结束啦,如果你喜欢今天的Python 实战教程,请持续关注Python实用宝典。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 (pythondict.com)
不只是一个宝典
欢迎关注公众号:Python实用宝典

Python编写的超帅数独可视化解题器

数独相信大家都玩过,它被称为“聪明人的游戏”,在很多人眼里:

会玩数独=高智商

为什么?因为数独能够培养观察力,提高反应力: 数独的练习能够锻炼手眼脑的协调性、提高手脑并用的能力,锻炼大脑的思维灵活度,全面提高反应力。

非常适合孩子在成长过程中锻炼大脑,适合成年人在生活中激活思维。

不过当我们遇到不会解的数独怎么办?答案是,用Python算出来!

基于 Pygame-Sudoku-Solver 这个开源项目,可视化解决数独问题变得极其简单。

1.准备

开始之前,你要确保Python和pip已经成功安装在电脑上,如果没有,请访问这篇文章:超详细Python安装指南 进行安装。

(可选1) 如果你用Python的目的是数据分析,可以直接安装Anaconda:Python数据分析与挖掘好帮手—Anaconda,它内置了Python和pip.

(可选2) 此外,推荐大家用VSCode编辑器来编写小型Python项目:Python 编程的最好搭档—VSCode 详细指南

Windows环境下打开Cmd(开始—运行—CMD),苹果系统环境下请打开Terminal(command+空格输入Terminal),输入命令安装依赖:

1.在终端输入以下命令下载该开源库

git clone https://github.com/tymscar/Pygame-Sudoku-Solver.git

2.使用cd命令进入该文件夹,并安装依赖:

cd Pygame-Sudoku-Solver
pip install -r requirements.txt

接下来,可以试试运行该项目了:

python solver.py

此时会出现一个空白3*3的九宫格

2.怎么解题

这个开源项目的解题方法如下:

1.输入题目数字 — 你只需要点击空白区域,此时会回显绿色方块,输入数字,如果数字合法则会填入框内,如果不合法则会闪现红色。

2.当你将数独题目里的所有数字填写完毕,单击空格键即可开始运算:

此图像的alt属性为空;文件名为2020110915311471.gif

而且,细心的作者还帮大家准备了夜晚模式,单击“d”键可切换到夜晚模式:

3.原理

所有的解题源代码都放在了solver.py文件中,大家可以在里面看到整个解题过程。

作者没有写任何注释,但是代码逻辑思路是清晰的,比如核心判断逻辑,Cell类里的 isValid, 用于判断某个值 (what变量) 放进某个 Cell 里是否合法:

此处,lineV.cells 表示数组中每一列组成的cell;lineH.cells即每一行组成的cell;box.cells即每个子九宫格。他们都有一个共同的特点:其中不能出现重复的值。

因此你会看到如果某个值存在于这些cells当中,isValid直接返回False,表明其不应该出现在这个位置。

如果你的网络较差,git clone拿不到代码,可以在公众号后台回复:数独 下载源代码。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Python 自动化,Helium 凭什么取代 Selenium?

来自AirPython哥的分享。

1. Helium 是什么?

Helium 是一款 Web 端自动化开源框架,全称是:Selenium-Python-Helium,从名字上就可以看出,Helium 似乎和 Selenium 息息相关

确实,Helium 针对 Selenium 进行了封装,它屏蔽了 Selenium 很多实现细节,提供了更加简洁直观的 API,更方便我们进行 Web 端的自动化

官方表示,要实现同样的功能,Helium 相比 Selenium 要少 30% – 50% 的代码

目前,Helium 仅支持 Chrome 和 FireFox

2.优缺点

Helium 主要包含下面 6 个优点:

  • Helium 自带 WebDriver,不需要下载、配置浏览器驱动
  • 内嵌页面 iframe 页面元素直接操作,不需要使用 switch_to.frame() 切换 iframe
  • 窗体管理更方便,可以直接使用窗口标题或部分标题内容来切换窗体
  • 隐式等待,针对某个元素执行点击操作,Selenium 如果元素没有出现,脚本会执行失败;而 Helium 默认最多等待 10s,等待元素出现后立马执行点击操作
  • 显式等待,Helium 提供更加优雅的 API 来等待页面元素出现
  • API 更简洁直观,代码量少

Helium 主要缺点,体现在:

  • 由于封装,屏蔽了很多细节,所以它不合适二次开发
  • 目前仅支持 Chrome 和 FireFox 浏览器
  • 版本更新慢、遗留 Bug 及文档少

3.准备一下

切换到对应的虚拟环境下,通过 pip 命令安装依赖即可

# 安装依赖
pip3 install helium

接着,我们在 IDE 中,使用 helium.__all__ 打印出它包含的属性及方法

我们发现,Helium 包含的操作动作、控件对象、键盘操作关键字基本覆盖了大部分的自动化操作场景

4.Selenium VS Helium

是骡子是马,拉出来溜溜 ~

接下来,我们以登录 126 邮箱为例,来比较 Selenium 和 Helium

1、传统 Selenium 实现

首先,我们需要下载并配置 WebDriver,然后实例化 WebDriver 对象,打开邮箱登录的主页面

from selenium import webdriver

# 实例化Driver
driver = webdriver.Chrome()
# 隐式等待10s
driver.implicitly_wait(10)
# 打开主页面
driver.get(home_url)

通过观察网页元素,发现输入框区域被包裹在 iframe 内嵌页面中

所以,我们需要使用 switch_to.frame() 函数切换到对应的 iframe,才能操作 iframe 内部的元素

from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait

# 显示等待打开主页面
wait = WebDriverWait(driver, 10, 0.5)

# 切换到对应的iframe,否则无法操作内部元素
wait.until(
    EC.frame_to_be_available_and_switch_to_it(driver.find_element_by_xpath('//iframe[contains(@id,"x-URS-iframe")]')))

接着,使用 Selenium API( 这里以 Xpath 为例 )查找用户名和密码输入框、登录按钮;输入用户名和密码,模拟登录操作

# 用户名输入框
element_input = wait.until(EC.visibility_of(driver.find_element_by_xpath('//input[@name="email"]')))
element_input.clear()
element_input.send_keys(username)

# 密码输入框
element_password = wait.until(EC.visibility_of(driver.find_element_by_xpath('//input[@name="password"]')))
element_password.clear()
element_password.send_keys(password)

# 登录按钮
wait.until(EC.element_to_be_clickable((By.XPATH, '//a[@id="dologin"]'))).click()

最后,通过某个页面元素是否出现来判断是否登录成功

# 找一个登录成功的页面元素
# 通过元素属性+元素值来唯一定位元素
result = True
try:
    element_recy_email = wait.until(EC.element_to_be_clickable((By.XPATH, '//span[@class="oz0" and contains(text(),"收 信")]')))
    if element_recy_email:
        result = True
    else:
        result = False
except Exception as e:
    result = False

print("邮箱登陆成功" if result else "邮箱登录失败")

2、Helium 实现

接下来,我们通过 Helium 的方式来实现这一操作

首先,我们只需要 2 行代码即可以打开主页

from helium import *

# 打开主页
driver = start_chrome(home_url)

# 等待元素加载完成
wait_until(Text("你的专业电子邮局").exists)

然后,通过内置 TextField 控件对象及预设文本内容,使用 write 动作输入用户名和密码

# 不需要切换iframe,直接输入
write(username,TextField('邮箱帐号或手机号码'))
write(password,TextField('输入密码'))

值得一提的是,Helium 不需要切换 iframe,可以直接操作内嵌页面元素,简直不要太方便!

# 模拟点击Enter键登录
press(ENTER)

通过 Helium 内置的 wait_until 方法 + 控件对象,可以显式等待元素出现,默认最长时间为 10s比如,这里等待登录完成的主页面加载完成,收件箱可以点击,执行一次点击操作

wait_until(Text('收 信').exists)

# 点击收件箱
click(Text('收 信'))

最后,调用 kill_browser() 方法关闭浏览器,结束自动化操作

# 退出
sleep(10)

# 关闭浏览器
kill_browser()

需要指出的是,Helium 使用 start_chrome() 方法返回的对象实际上就是 WebDriver 对象,可以结合 Selenium API 一起使用

5.最后

通过上面的对比发现,Helium 相比 Selenium 使用似乎更方便,但是它不适用于一些复杂的页面

因此,在实际自动化项目中,建议搭配 Selenium 和 Helium 使用,简单的页面使用 Helium,复杂的页面切换到 Selenium

如果你觉得文章还不错,请大家 点赞、分享、留言下,因为这将是我持续输出更多优质文章的最强动力!

Python制作国际空间站实时跟踪器

Open Notify是一个开源项目,旨在为NASA的一些出色数据提供简单的编程接口。

open-notify.org 的作者做了一些工作,以获取原始数据并将其转换为与太空和航天器有关的API

本文将通过这个接口,获取得到国际空间站的位置,并实时地绘制到地图上:

感谢cr0sis/Real-time-International-space-station-tracker

为了实现本文的目标,你得先安装ISS_Info:

pip install ISS-Info

下面分步骤讲解整套绘制流程

1.地图初始化

为了实时展示国际空间站的路径,需要使用turtle绘制曲线,因此可以创建一个turtle画布,将背景设为地球:

  
import ISS_Info
import turtle
import time
import json
import urllib.request

screen = turtle.Screen()
screen.setup(720,360)
screen.setworldcoordinates(-180,-90,180,90)
screen.bgpic("map.png")
screen.bgcolor("black")
screen.register_shape("isss.gif")
screen.title("Real time ISS tracker")

iss = turtle.Turtle()
iss.shape("isss.gif")

2.获取空间站的人数

如果能知道空间站上的宇航员人数,我们就能更加准确的跟踪国际空间站。幸运的是open-notify确实提供了这样的接口。

为了获取人数信息,我们必须向:
http://api.open-notify.org/astros.json
请求拿到数据,并将相应的宇航员名字写在左上角:

astronauts = turtle.Turtle()
astronauts.penup()
astronauts.color('black')
astronauts.goto(-178,86)
astronauts.hideturtle()
url = "http://api.open-notify.org/astros.json"
response = urllib.request.urlopen(url)
result = json.loads(response.read())
print("There are currently " + str(result["number"]) + " astronauts in space:")
print("")
astronauts.write("People in space: " + str(result["number"]), font=style)
astronauts.sety(astronauts.ycor() - 5)

people = result["people"]

for p in people:
    print(p["name"] + " on: " + p["craft"])
    astronauts.write(p["name"] + " on: " + p["craft"], font=style)
    astronauts.sety(astronauts.ycor() - 5)

3.绘制空间站位置

为了能够绘制空间站的实时位置,我们需要请求拿到空间站的位置信息。请求的接口是:
http://api.open-notify.org/iss-now.json

不过作者将其封装成了一个函数,我们直接调用 iss_current_loc 即可,循环获取国际空间站位置:

while True:  
    location = ISS_Info.iss_current_loc()
    lat = location['iss_position']['latitude']
    lon = location['iss_position']['longitude']
    print("Position: \n latitude: {}, longitude: {}".format(lat,lon))
    pos = iss.pos() 
    posx = iss.xcor()
    if iss.xcor() >= (179.1):           ### Stop drawing at the right edge of  
        iss.penup()                     ### the screen to avoid a 
        iss.goto(float(lon),float(lat)) ### horizontal wrap round line
        time.sleep(5)
    else:
      iss.goto(float(lon),float(lat))
      iss.pendown()
      time.sleep(5)

我们还可以标出自己目前所处的位置,以查看和国际空间站的距离及空间站经过你上空的时间点(UTC)。

# 深圳
lat = 112.5118928
lon = 23.8534489

prediction = turtle.Turtle()
prediction.penup()
prediction.color('yellow')
prediction.goto(lat, lon)
prediction.dot(5)
prediction.hideturtle()

url = 'http://api.open-notify.org/iss-pass.json?lat=' +str(lat-90) + '&lon=' + str(lon)
response = urllib.request.urlopen(url)
result = json.loads(response.read())

over = result ['response'][1]['risetime']

prediction.write(time.ctime(over), font=style) 

不过这里值得注意的是,iss-pass.json这个接口的纬度计算必须在-90到90之内,因此深圳的纬度需要减去90.

最终效果如下:

在Python实用宝典公众号后台回复“国际空间站”或者“ISS”即可获得本文完整源代码哦。

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Python 导出word所有图片并转化格式

作者:叶庭云

日常工作中,你是否遇到过这样的场景,领导发来一份 Word 文档,要求你将文档中的图片存储到一个文件夹内,并且还要将图片都改成 .jpg 或者 .png,你会怎么办?

你是不是一边内心崩溃,一边开始一张张的 另存为

今天,我就教你两招省时省力的方法,不管文档中有几张甚到几百张图片,你都可以快速保存下来。

一、分析

图片在文档的应用已经是十分普遍的现象了,在Word文档中插入合适的图片无疑会让我们的文档变得更美观。

先来回想一下,我们平常是如何在Word中插入图片的?

  • 在本地电脑中事先存储好需要的图片素材,然后插入到Word中

其实,第二种方法有一个弊端在于图片只存在 Word 中,如果我们需要将它们保存到本地电脑中以供日后使用,最常用的方法是单击鼠标右键,选择 另存为图片,然后选择路径进行保存。

这种方法在只需要处理少数几张图片时还算适用,一旦图片数量增多,处理工作就会变得繁琐且容易出错。

那么,我们怎样可以将这些图片批量保存呢?

二、提取出 Word 文档里的图片

解决方法就是:更改文件格式。直接将 Word 文档的后缀名改成 “.rar” (“.zip”也是可以的)的压缩格式。打开压缩文件,点击【word】-【media】,文档中使用的图片就出现在这里,只需要选中解压出来即可。

用于测试的 Word 文档如下:

操作方法如下:

点击查看,选择详细信息,勾上文件扩展名。

直接将 Word 文档的后缀名改成 “.rar” (“.zip”也是可以的)的压缩格式。

打开压缩文件,点击【word】-【media】,文档中使用的图片就出现在这里,只需要选中解压出来即可。

三、利用 python 批量转换格式

# -*- coding: UTF-8 -*-
"""
@File    :test_01.py
@Author  :叶庭云
@CSDN    :https://yetingyun.blog.csdn.net/
"""
# 导入os模块
import os

# 不存在 jpg图片 这个文件夹  创建
if not os.path.exists('jpg图片'):
    os.mkdir('jpg图片')


path = r'.\jpg图片'
# 列出 media 文件夹下所有图片
files = os.listdir(r'.\media')

for item in files:
    # 拼接出media 文件夹下所有图片路径
    file_1 = '.\media' + '/' + item
    # 读取图片数据
    with open(file_1, 'rb') as f:
        con = f.read()
    # 重新写入  以 .jpg 格式 并保存到jog图片文件夹
    file_name = path + '/' + item.split('.')[0] + '.jpg'
    with open(file_name, 'wb') as f:
        f.write(con)

运行效果如下:

程序运行,嗖的一下,图片格式都转换成了 .jpg 并保存到新的文件夹里。

作者:叶庭云

CSDN:https://blog.csdn.net/fyfugoyfa

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Django RestFramework 请求流程、解析器、序列化器分析

本文重点在于讲解什么是REST规范及Django RestFramework中的APIview请求流程,这个流程包括源代码分析和后续的解析器组件及序列化器组件。

1.什么是REST

编程是数据结构和算法的结合,而在Web类型的App中,我们对于数据的操作请求是通过url来承载的,本文详细介绍了REST规范和CBV请求流程。

编程是数据结构和算法的结合,小程序如简单的计算器,我们输入初始数据,经过计算,得到最终的数据,这个过程中,初始数据和结果数据都是数据,而计算过程是我们所说的广义上的算法。

大程序,如一个智能扫地机器人,我们可以设置打扫的距离,左右摆动的幅度来打扫房间,这里面打扫的举例,摆动幅度,都是数据,而打扫的过程是较为复杂的算法过程,总之,也是算法,即程序的实现方式。

另外,我们还可以设置打扫时间等等初始数据。

总之一句话,编程即数据结构和算法的结合。简单的程序可能不需要跟用户交互数据,但是现代的应用程序几乎都需要跟用户进行交互,不分应用程序类型,不管是CS型还是BS型的程序都是如此,而Python最擅长的Web App即BS型的程序,就是通过url和http来跟用户进行数据交互的,通过url和http请求,用户可以操作服务器端的程序,主要操作分为:增、删、改、查几类

引入

在开始之前,我们回顾一下咱们之前写过的图书管理系统项目,请仔细回想一下,对于该项目的设计,我们大概是以下面这种方式来实现的

传统url设计风格
  • url各式各样,设计混乱
DRF

理论上来说,这种方式完全可以实现我们的需求,但是一旦项目丰富起来,随着数据量增加,随着各个业务系统之间的逻辑关系不断的复杂,url会越来越复杂,理论上来说,不管是什么类型、什么名称的url都能指向具体的业务逻辑(视图函数),从而实现业务需求,但是如果没有明确的规范,因每个人的思维方式不一样、命名方式不一样而导致的url非常的乱,不方便项目的后期维护和扩展。

  • 对于请求处理成功或者失败的返回信息没有明确的响应信息规范,返回给客户端的信息往往都是很随意的

以上这些情况的出现,导致了很多的问题,让互联网的世界变得杂乱不堪,日益复杂且臃肿。

因此http协议创始人警告我们这些凡人们正在错误的使用http协议,除了警告,他还发表了一篇博客,大概意思就是教大家如何正确使用http协议,如何正确定义url,这就是REST(Representational State Transfer),不需要管这几个英文单词代表什么意思,只需要记住下面一句话:

  • 用url唯一定位资源,用Http请求方式(GET, POST, DELETE, PUT)描述用户行为

根据这句话,我们重新定义图书管理系统中的url

RESTful Api设计风格
DRF

可以看到,url非常简洁优雅,不包含任何操作,不包含任何动词,简简单单,用来描述服务器中的资源而已,服务器根据用户的请求方式对资源进行各种操作。而对数据的操作,最常见的就是CRUD(创建,读取,更新,删除),通过不同的请求方式,就足够描述这些操作方式了。

如果不够用,Http还有其他的请求方式呢!比如:PATCH,OPTIONS,HEAD, TRACE, CONNECT。

REST定义返回结果
DRF

每一种请求方式的返回结果不同。

REST定义错误信息
{
    "error": "Invalid API key"
}

通过一个字典,返回错误信息。

这就是REST,上图中的url就是根据REST规范进行设计的RESTful api。

因此REST是一种软件架构设计风格,不是标准,也不是具体的技术实现,只是提供了一组设计原则和约束条件。

它是目前最流行的 API 设计规范,用于 Web 数据接口的设计。2000年,由Roy Fielding在他的博士论文中提出,Roy Fielding是HTTP规范的主要编写者之一。

那么,我们所要讲的Django RestFramework与rest有什么关系呢?

其实,DRF(Django RestFramework)是一套基于Django开发的、帮助我们更好的设计符合REST规范的Web应用的一个Django App,所以,本质上,它是一个Django App。

2.为什么使用DRF

从概念就可以看出,有了这样的一个App,能够帮助我们更好的设计符合RESTful规范的Web应用,实际上,没有它,我们也能自己设计符合规范的Web应用。下面的代码演示如何手动实现符合RESTful规范的Web应用。

class CoursesView(View):
    def get(self, request):
        courses = list()

        for item in Courses.objects.all():
            course = {
                "title": item.title,
                "price": item.price,
                "publish_date": item.publish_date,
                "publish_id": item.publish_id
            }

            courses.append(course)

        return HttpResponse(json.dumps(courses, ensure_ascii=False))

如上代码所示,我们获取所有的课程数据,并根据REST规范,将所有资源的通过对象列表返回给用户。

可见,就算没有DRF我们也能够设计出符合RESTful规范的接口甚至是整个Web App,但是,如果所有的接口都自定义,难免会出现重复代码,为了提高工作效率,我们建议使用优秀的工具。

DRF就是这样一个优秀的工具,另外,它不仅仅能够帮助我们快速的设计符合REST规范的接口,还提供诸如认证、权限等等其他的强大功能。

什么时候使用DRF?

前面提到,REST是目前最流行的 API 设计规范,如果使用Django开发你的Web应用,那么请尽量使用DRF,如果使用的是Flask,可以使用Flask-RESTful。

3.Django View请求流程

首先安装Django,然后安装DRF:

pip install django
pip install djangorestframework

安装完成之后,我们就可以开始使用DRF框架来实现咱们的Web应用了,本篇文章包括以下知识点:

  • APIView
  • 解析器组件
  • 序列化组件

介绍DRF,必须要介绍APIView,它是重中之重,是下面所有组件的基础,因为所有的请求都是通过它来分发的,至于它究竟是如何分发请求的呢?

想要弄明白这个问题,我们就必须剖析它的源码,而想要剖析DRF APIView的源码,我们需要首先剖析django中views.View类的源码,为什么使用视图类调用as_view()之后,我们的请求就能够被不同的函数处理呢?

DRF

源码中最后会通过getattr在self中查找request.method.lower(),也就是get、post或者delete这些方法中的一个,那么,self是谁,就是至关重要的一点,前面讲到过,谁调用类中的方法,self就指向谁,此时,一层层往回找,我们会发现,self = cls(**initkwargs),self就是我们视图类的实例化对象,所以,dispatch函数肯定会到该视图类中找对应的方法(get或者post)。

接下来是提问时间,请问如果有如下函数,不修改函数内容的情况下,如何给函数新增一个统计执行时间的功能:

def outer(func):
    def inner(*args, **kwargs):
        import time
        start_time = time.time()
        ret = func(*args, **kwargs)
        end_time = time.time()
        print("This function elapsed %s" % str(end_time - start_time))
        return ret
    return inner


@outer
def add(x, y):
    return x + y

这是函数,如果是类呢?面向对象编程,如何扩展你的程序,比如有如下代码:

class Person(object):
    def show(self):
        print("Person's show method executed!")
        

class MyPerson(Person):
    def show(self):
        print("MyPerson's show method executed")
        super().show()
        

mp = MyPerson()
mp.show()

这就是面向对象的程序扩展,现在大家是否对面向对象有了更加深刻的认识呢?接下来给大家十分钟时间,消化一下上面两个概念,然后请思考,那么假设你是Django RestFramework的开发者,你想自定制一些自己想法,如何实现。

好了,相信大家都已经有了自己的想法,接下来,我们一起来分析一下,Django RestFramework的APIView是如何对Django框架的View进行功能扩展的。

from django.shortcuts import HttpResponse

import json

from .models import Courses

# 引入APIView
from rest_framework.views import APIView
# Create your views here.


class CoursesView(APIView):  # 继承APIView而不是原来的View
    def get(self, request):
        courses = list()

        for item in Courses.objects.all():
            course = {
                "title": item.title,
                "description": item.description
            }

            courses.append(course)

        return HttpResponse(json.dumps(courses, ensure_ascii=False))
DRF

以上就是Django RestFramework APIView的请求处理流程,我们可以通过重写dispatch()方法或者重写as_view()方法来自定制自己的想法。

那么,Django RestFramework到底自定制了哪些内容呢?在本文的最开始,我们已经介绍过了,就是那些组件,比如解析器组件、序列化组件、权限、频率组件等。

Ajax发送Json数据给服务器

接下来,我们就开始介绍Django RestFramework中的这些组件,首先,最基本的,就是解析器组件,在介绍解析器组件之前,我提一个问题,请大家思考,如何发送Json格式的数据给后端服务器?

好了,时间到,请看下面的代码,通过ajax请求,我们可以发送json格式的数据到后端:

<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <title>Title</title>
  <script src="https://cdn.bootcss.com/jquery/3.3.1/jquery.min.js"></script>
  <script src="/static/jquery-1.10.2.min.js"></script>
</head>
<body>
  <form action="" method="post" enctype="application/x-www-form-urlencoded">
    {% csrf_token %}
    用户名: <input type="text" name="username"/>
    密码:  <input type="password" name="password"/>
    提交:  <input type="submit" value="提交"/>
  </form>

  <hr>
  <button class="btn">点击发送Ajax请求</button>

  <script>
    $(".btn").click(function () {
      $.ajax({
        url: '',
        type: 'post',
        contentType: 'application/json',
        data: JSON.stringify({
          username: "alex",
          password: 123
        }
        ),
        success: function (data) {
          console.log(data);
        }
      })
    })

  </script>

</body>
</html>

通过上文的知识点复习我们已经知道,Content-Type用来定义发送数据的编码协议,所以,在上面的代码中,我们指定Content-Type为application/json,即可将我们的Json数据发送到后端,那么后端如何获取呢?

服务器对Json数据的处理方式

按照之前的方式,我们使用request.POST, 如果打印该值,会发现是一个空对象:request post <QueryDict: {}>,该现象证明Django并不能处理请求协议为application/json编码协议的数据,我们可以去看看request源码,可以看到下面这一段:

if self.content_type == 'multipart/form-data':
    if hasattr(self, '_body'):
        # Use already read data
        data = BytesIO(self._body)
    else:
        data = self
    try:
        self._post, self._files = self.parse_file_upload(self.META, data)
    except MultiPartParserError:
        # An error occurred while parsing POST data. Since when
        # formatting the error the request handler might access
        # self.POST, set self._post and self._file to prevent
        # attempts to parse POST data again.
        # Mark that an error occurred. This allows self.__repr__ to
        # be explicit about it instead of simply representing an
        # empty POST
        self._mark_post_parse_error()
        raise
elif self.content_type == 'application/x-www-form-urlencoded':
    self._post, self._files = QueryDict(self.body, encoding=self._encoding), MultiValueDict()
else:
    self._post, self._files = QueryDict(encoding=self._encoding), MultiValueDict()

可见Django原生解析器并不处理application/json编码协议的数据请求,好了,有了这样的认识之后,咱们就可以开始正式介绍DRF的解析器了,解析器,顾名思义,就是用来解析数据的请求的。

虽然Django的原生解析器不支持application/json编码协议,但是我们可以通过拿到原始的请求数据(request.body)来手动处理application/json请求,虽然这种方式不方便,也并不推荐,请看如下代码:

class LoginView(View):
    def get(self, request):
        return render(request, 'classbasedview/login.html')

    def post(self, request):
        print(request.POST)  # <QueryDict: {}>
        print(request.body)  # b'{"username":"alex","password":123}'
        data = request.body.decode('utf-8')
        dict_data = json.loads(data)

        username = dict_data['username']
        password = dict_data['password']

        return HttpResponse(json.dumps(dict_data))

通过上面的代码,我们可以通过request.body手动处理application/json请求,不过,如上文所说,并不推荐。

4.DRF 解析器组件

首先,来看看解析器组件的使用,稍后我们一起剖析其源码:

from django.http import JsonResponse

from rest_framework.views import APIView
from rest_framework.parsers import JSONParser, FormParser
# Create your views here.


class LoginView(APIView):
    parser_classes = [FormParser]

    def get(self, request):
        return render(request, 'parserver/login.html')

    def post(self, request):
        # request是被drf封装的新对象,基于django的request
        # request.data是一个property,用于对数据进行校验
        # request.data最后会找到self.parser_classes中的解析器
        # 来实现对数据进行解析
        
        print(request.data)  # {'username': 'alex', 'password': 123}

        return JsonResponse({"status_code": 200, "code": "OK"})

使用方式非常简单,分为如下两步:

  • from rest_framework.views import APIView
  • 继承APIView
  • 直接使用request.data就可以获取Json数据

如果你只需要解析Json数据,不允许任何其他类型的数据请求,可以这样做:

  • from rest_framework.parsers import JsonParser
  • 给视图类定义一个parser_classes变量,值为列表类型[JsonParser]
  • 如果parser_classes = [], 那就不处理任何数据类型的请求了

问题来了,这么神奇的功能,DRF是如何做的?因为昨天讲到Django原生无法处理application/json协议的请求,所以拿json解析来举例,请同学们思考一个问题,如果是你,你会在什么地方加入新的Json解析功能?

首先,需要明确一点,我们肯定需要在request对象上做文章,为什么呢?

因为只有有了用户请求,我们的解析才有意义,没有请求,就没有解析,更没有处理请求的逻辑,所以,我们需要弄明白,在整个流程中,request对象是什么时候才出现的,是在绑定url和处理视图之间的映射关系的时候吗?我们来看看源码:

@classonlymethod
def as_view(cls, **initkwargs):
    """Main entry point for a request-response process."""
    for key in initkwargs:
        if key in cls.http_method_names:
            raise TypeError("You tried to pass in the %s method name as a "
                            "keyword argument to %s(). Don't do that."
                            % (key, cls.__name__))
            if not hasattr(cls, key):
                raise TypeError("%s() received an invalid keyword %r. as_view "
                                "only accepts arguments that are already "
                                "attributes of the class." % (cls.__name__, key))

def view(request, *args, **kwargs):
    self = cls(**initkwargs)
    if hasattr(self, 'get') and not hasattr(self, 'head'):
        self.head = self.get
        self.request = request
        self.args = args
        self.kwargs = kwargs
        return self.dispatch(request, *args, **kwargs)
    view.view_class = cls
    view.view_initkwargs = initkwargs

    # take name and docstring from class
    update_wrapper(view, cls, updated=())

    # and possible attributes set by decorators
    # like csrf_exempt from dispatch
    update_wrapper(view, cls.dispatch, assigned=())
    return view

看到了吗?在执行view函数的时候,那么什么时候执行view函数呢?当然是请求到来,根据url查找映射表,找到视图函数,然后执行view函数并传入request对象,所以,如果是我,我可以在这个视图函数里面加入处理application/json的功能:

@classonlymethod
def as_view(cls, **initkwargs):
    """Main entry point for a request-response process."""
    for key in initkwargs:
        if key in cls.http_method_names:
            raise TypeError("You tried to pass in the %s method name as a "
                            "keyword argument to %s(). Don't do that."
                            % (key, cls.__name__))
            if not hasattr(cls, key):
                raise TypeError("%s() received an invalid keyword %r. as_view "
                                "only accepts arguments that are already "
                                "attributes of the class." % (cls.__name__, key))

def view(request, *args, **kwargs):
    if request.content_type == "application/json":
        import json
        return HttpResponse(json.dumps({"error": "Unsupport content type!"}))

    self = cls(**initkwargs)
    if hasattr(self, 'get') and not hasattr(self, 'head'):
        self.head = self.get
        self.request = request
        self.args = args
        self.kwargs = kwargs
        return self.dispatch(request, *args, **kwargs)
    view.view_class = cls
    view.view_initkwargs = initkwargs

    # take name and docstring from class
    update_wrapper(view, cls, updated=())

    # and possible attributes set by decorators
    # like csrf_exempt from dispatch
    update_wrapper(view, cls.dispatch, assigned=())
    return view

看到了吧,然后我们试试发送json请求,看看返回结果如何?是不是非常神奇?事实上,你可以在这里,也可以在这之后的任何地方进行功能的添加。

那么,DRF是如何做的呢?我们在使用的时候只是继承了APIView,然后直接使用request.data,所以,我斗胆猜测,功能肯定是在APIView中定义的,废话,具体在哪个地方呢?

接下来,我们一起来分析一下DRF解析器源码,看看DRF在什么地方加入了这个功能。

parser

上图详细描述了整个过程,最重要的就是重新定义的request对象,和parser_classes变量,也就是我们在上面使用的类变量。好了,通过分析源码,验证了我们的猜测。

5.序列化组件

首先我们要学会使用序列化组件。定义几个 model:

from django.db import models

# Create your models here.


class Publish(models.Model):
    nid = models.AutoField(primary_key=True)
    name = models.CharField(max_length=32)
    city = models.CharField(max_length=32)
    email = models.EmailField()

    def __str__(self):
        return self.name


class Author(models.Model):
    nid = models.AutoField(primary_key=True)
    name = models.CharField(max_length=32)
    age = models.IntegerField()

    def __str__(self):
        return self.name


class Book(models.Model):
    title = models.CharField(max_length=32)
    publishDate = models.DateField()
    price = models.DecimalField(max_digits=5, decimal_places=2)
    publish = models.ForeignKey(to="Publish", to_field="nid", on_delete=models.CASCADE)
    authors = models.ManyToManyField(to="Author")

    def __str__(self):
        return self.title

通过序列化组件进行GET接口设计

设计url,本次我们只设计GET和POST两种接口:

from django.urls import re_path

from serializers import views

urlpatterns = [
    re_path(r'books/$', views.BookView.as_view())
]

我们新建一个名为app_serializers.py的模块,将所有的序列化的使用集中在这个模块里面,对程序进行解耦:

# -*- coding: utf-8 -*-
from rest_framework import serializers

from .models import Book


class BookSerializer(serializers.Serializer):
    title = serializers.CharField(max_length=128)
    publish_date = serializers.DateTimeField()
    price = serializers.DecimalField(max_digits=5, decimal_places=2)
    publish = serializers.CharField(max_length=32)
    authors = serializers.CharField(max_length=32)

接着,使用序列化组件,开始写视图类:

# -*- coding: utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response

# 当前app中的模块
from .models import Book
from .app_serializer import BookSerializer

# Create your views here.

class BookView(APIView):
    def get(self, request):
        origin_books = Book.objects.all()
        serialized_books = BookSerializer(origin_books, many=True)

        return Response(serialized_books.data)

如此简单,我们就已经,通过序列化组件定义了一个符合标准的接口,定义好model和url后,使用序列化组件的步骤如下:

  • 导入序列化组件:from rest_framework import serializers
  • 定义序列化类,继承serializers.Serializer(建议单独创建一个专用的模块用来存放所有的序列化类):class BookSerializer(serializers.Serializer):pass
  • 定义需要返回的字段(字段类型可以与model中的类型不一致,参数也可以调整),字段名称必须与model中的一致
  • 在GET接口逻辑中,获取QuerySet
  • 开始序列化:将QuerySet作业第一个参数传给序列化类,many默认为False,如果返回的数据是一个列表嵌套字典的多个对象集合,需要改为many=True
  • 返回:将序列化对象的data属性返回即可

上面的接口逻辑中,我们使用了Response对象,它是DRF重新封装的响应对象。该对象在返回响应数据时会判断客户端类型(浏览器或POSTMAN),如果是浏览器,它会以web页面的形式返回,如果是POSTMAN这类工具,就直接返回Json类型的数据。

此外,序列化类中的字段名也可以与model中的不一致,但是需要使用source参数来告诉组件原始的字段名,如下:

class BookSerializer(serializers.Serializer):
    BookTitle = serializers.CharField(max_length=128, source="title")
    publishDate = serializers.DateTimeField()
    price = serializers.DecimalField(max_digits=5, decimal_places=2)
    # source也可以用于ForeignKey字段
    publish = serializers.CharField(max_length=32, source="publish.name")
    authors = serializers.CharField(max_length=32)

下面是通过POSTMAN请求该接口后的返回数据,大家可以看到,除ManyToManyField字段不是我们想要的外,其他的都没有任何问题:

[
    {
        "title": "Python入门",
        "publishDate": null,
        "price": "119.00",
        "publish": "浙江大学出版社",
        "authors": "serializers.Author.None"
    },
    {
        "title": "Python进阶",
        "publishDate": null,
        "price": "128.00",
        "publish": "清华大学出版社",
        "authors": "serializers.Author.None"
    }
]

那么,多对多字段如何处理呢?如果将source参数定义为”authors.all”,那么取出来的结果将是一个QuerySet,对于前端来说,这样的数据并不是特别友好,我们可以使用如下方式:

class BookSerializer(serializers.Serializer):
    title = serializers.CharField(max_length=32)
    price = serializers.DecimalField(max_digits=5, decimal_places=2)
    publishDate = serializers.DateField()
    publish = serializers.CharField()
    publish_name = serializers.CharField(max_length=32, read_only=True, source='publish.name')
    publish_email = serializers.CharField(max_length=32, read_only=True, source='publish.email')
    # authors = serializers.CharField(max_length=32, source='authors.all')
    authors_list = serializers.SerializerMethodField()

    def get_authors_list(self, authors_obj):
        authors = list()
        for author in authors_obj.authors.all():
            authors.append(author.name)

        return authors

请注意,get_必须与字段名称一致,否则会报错。

通过序列化组件进行POST接口设计

接下来,我们设计POST接口,根据接口规范,我们不需要新增url,只需要在视图类中定义一个POST方法即可,序列化类不需要修改,如下:

# -*- coding: utf-8 -*-
from rest_framework.views import APIView
from rest_framework.response import Response

# 当前app中的模块
from .models import Book
from .app_serializer import BookSerializer

# Create your views here.


class BookView(APIView):
    def get(self, request):
        origin_books = Book.objects.all()
        serialized_books = BookSerializer(origin_books, many=True)

        return Response(serialized_books.data)

    def post(self, request):
        verified_data = BookSerializer(data=request.data)

        if verified_data.is_valid():
            book = verified_data.save()
            # 可写字段通过序列化添加成功之后需要手动添加只读字段
            authors = Author.objects.filter(nid__in=request.data['authors'])
            book.authors.add(*authors)

            return Response(verified_data.data)
        else:
            return Response(verified_data.errors)

POST接口的实现方式,如下:

  • url定义:需要为post新增url,因为根据规范,url定位资源,http请求方式定义用户行为
  • 定义post方法:在视图类中定义post方法
  • 开始序列化:通过我们上面定义的序列化类,创建一个序列化对象,传入参数data=request.data(application/json)数据
  • 校验数据:通过实例对象的is_valid()方法,对请求数据的合法性进行校验
  • 保存数据:调用save()方法,将数据插入数据库
  • 插入数据到多对多关系表:如果有多对多字段,手动插入数据到多对多关系表
  • 返回:将插入的对象返回

请注意,因为多对多关系字段是我们自定义的,而且必须这样定义,返回的数据才有意义,而用户插入数据的时候,serializers.Serializer没有实现create,我们必须手动插入数据,就像这样:

# 第二步, 创建一个序列化类,字段类型不一定要跟models的字段一致
class BookSerializer(serializers.Serializer):
    # nid = serializers.CharField(max_length=32)
    title = serializers.CharField(max_length=128)
    price = serializers.DecimalField(max_digits=5, decimal_places=2)
    publish = serializers.CharField()
    # 外键字段, 显示__str__方法的返回值
    publish_name = serializers.CharField(max_length=32, read_only=True, source='publish.name')
    publish_city = serializers.CharField(max_length=32, read_only=True, source='publish.city')
    # authors = serializers.CharField(max_length=32) # book_obj.authors.all()

    # 多对多字段需要自己手动获取数据,SerializerMethodField()
    authors_list = serializers.SerializerMethodField()

    def get_authors_list(self, book_obj):
        author_list = list()

        for author in book_obj.authors.all():
            author_list.append(author.name)

        return author_list

    def create(self, validated_data):
        # {'title': 'Python666', 'price': Decimal('66.00'), 'publish': '2'}
        validated_data['publish_id'] = validated_data.pop('publish')
        book = Book.objects.create(**validated_data)

        return book

    def update(self, instance, validated_data):
        # 更新数据会调用该方法
        instance.title = validated_data.get('title', instance.title)
        instance.publishDate = validated_data.get('publishDate', instance.publishDate)
        instance.price = validated_data.get('price', instance.price)
        instance.publish_id = validated_data.get('publish', instance.publish.nid)

        instance.save()

        return instance

这样就会非常复杂化程序,如果我希望序列化类自动插入数据呢?

这是问题一:如何让序列化类自动插入数据?

另外问题二:如果字段很多,那么显然,写序列化类也会变成一种负担,有没有更加简单的方式呢?

答案是肯定的,我们可以这样做:

class BookSerializer(serializers.ModelSerializer):
    class Meta:
        model = Book

        fields = ('title',
                  'price',
                  'publish',
                  'authors',
                  'author_list',
                  'publish_name',
                  'publish_city'
                  )
        extra_kwargs = {
            'publish': {'write_only': True},
            'authors': {'write_only': True}
        }

    publish_name = serializers.CharField(max_length=32, read_only=True, source='publish.name')
    publish_city = serializers.CharField(max_length=32, read_only=True, source='publish.city')

    author_list = serializers.SerializerMethodField()

    def get_author_list(self, book_obj):
        # 拿到queryset开始循环 [{}, {}, {}, {}]
        authors = list()

        for author in book_obj.authors.all():
            authors.append(author.name)

        return authors

步骤如下:

  • 继承ModelSerializer:不再继承Serializer
  • 添加extra_kwargs类变量:extra_kwargs = {‘publish’: {‘write_only’: True}}

使用ModelSerializer完美的解决了上面两个问题。好了,这就是今天的全部内容。

参考资料:

https://pizzali.github.io/2018/12/07/DRF%E4%B9%8BREST%E8%A7%84%E8%8C%83%E4%BB%8B%E7%BB%8D%E5%8F%8AView%E8%AF%B7%E6%B1%82%E6%B5%81%E7%A8%8B%E5%88%86%E6%9E%90/

https://pizzali.github.io/2018/12/07/DRF%E4%B9%8B%E8%A7%A3%E6%9E%90%E5%99%A8%E7%BB%84%E4%BB%B6%E5%8F%8A%E5%BA%8F%E5%88%97%E5%8C%96%E7%BB%84%E4%BB%B6/

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

Django Celery 异步与定时任务实战教程

Django与Celery是基于Python进行Web后端开发的核心搭配,在运营开发(即面向企业内部)的场景中非常常见。

下面是基于Django的Celery异步任务和定时任务的实战教程,大家觉得有用的话点个赞/在看吧!

1.配置Django Celery

配置celery主要有几点:

1. 在settings.py的同级目录下,创建celery.py文件(名字自己随意取),这个文件主要是用来生成celery的实例app.

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'NBAsite.settings')

app = Celery('NBAsite',broker='redis://localhost:6379/0',backend='redis://localhost')
app.config_from_object('django.conf:settings',namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

我们将 celery 实例的 broker 和 backend 都设为了redis.

其中 broker 的意思是“经纪人”,像股票经纪人一样,是用于促成“交易”的,Celery中它的职责就是给 worker 推送任务。

而backend的职责是存放执行信息和结果,这些数据需要被持久化存于数据库。但为了简化问题,我们将其与broker一样放置于redis当中。

2. 需要你在自己已经创建的app(不是celery的app,而是django项目的app)目录下面,创建task.py文件(这个文件名只能是这个)

因为Celery会统一从每个app下面的tasks里面监听任务。

3. 编写tasks.py的任务

看一下tasks内部的任务如何写:

from __future__ import absolute_import, unicode_literals
from NBAsite.celery import app
from celery import shared_task
import time

@shared_task
def waste_time():
    time.sleep(3)
    return "Run function 'waste_time' finished."

任务的目标是延迟3秒后,返回一个语句。

4. init.py中的设置

这个是非常关键的一点,如何让django在启动的时候,也把celery给启动了呢?
答案是在项目的init文件内,导入celery的app

from __future__ import absolute_import, unicode_literals
import pymysql

from .celery import app as celery_app

pymysql.install_as_MySQLdb()
__all__ = ('celery_app',)

2.Django 其他配置

为了能够触发该异步任务,我们接下来配置一些常规文件,views和url,首先是views函数:

from .tasks import waste_time

def test_c(request):
    result = waste_time.delay().get()
    return JsonResponse({'status':'successful'})

然后是url:

path('test_c', test_c, name='test_c'),

3.进行测试

首先,运行django项目

python manage.py runserver

这样,django项目和celery的app就被一起启动了,但是这个时候是无法执行这个task的,因为worker没有被启动,我们可以试一下:

访问http://127.0.0.1:8000/stats/test_c 会得到以下报错:

正确的姿势是怎么样的?需要先激活worker,然后再访问API:

celery -A NBAsite worker -l info

从上图下方的log信息里可以看到,在延迟了3秒后,任务启动并返回字符串,而在页面上,也可以看到成功返回。

需要注意的是,如果你修改了tasks的内容,是需要重启celery才能生效的,最简单的方法就是重启django项目。

这样,我们就完成了简单的异步任务的配置和使用。

4.定时任务配置

在异步任务中,我们只用到了worker,而在定时任务中,还要用到celery的beat调度器。

首先来看下如何配置定时任务,或者说如何配置这个调度器。

还是在celery.py里面进行配置:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'NBAsite.settings')

app = Celery('NBAsite',broker = 'redis://localhost:6379/0',backend='redis://localhost')

app.config_from_object('django.conf:settings',namespace='CELERY')

app.conf.beat_schedule ={
        'autosc':{                           
            'task':'stats.tasks.auto_sc',    
            'schedule':crontab(hour=20,minute=47),   
        },
}
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

重点是增加了app.conf.beat_schedule这个定时任务配置,指定了 stats 文件夹下 tasks.py 中的auto_sc函数,定时于20:47分执行。

5.具体任务页面tasks

增加一个对应要做定时任务的task

@shared_task
def auto_sc():
    print ('sc test?')
    return 'halo'

6.运行命令和结果

命令的话可以将激活worker和激活beat合并在一起,如下:

celery -A NBAsite worker -B -l info

不过,windows不被允许这么使用,因此在windows环境下,你需要同时打开worker和beater:

celery -A NBAsite worker -l info
celery -A NBAsite beat -l info

看上图下方的log可知定时任务被成功执行。

参考资料:
https://www.jianshu.com/p/173070bcdfaf
https://www.jianshu.com/p/ee32074a10de

我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。

有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。

原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!


​Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典

Pandas 性能优化

有趣好用的Python教程