# Python 实用宝典
# https://pythondict.com
def handle_argv(self, prog_name, argv, command=None):
    """Parse command-line arguments from ``argv`` and dispatch
    to :meth:`run`.

    :param prog_name: The program name (``argv[0]``).
    :param argv: Command arguments.

    Exits with an error message if :attr:`supports_args` is disabled
    and ``argv`` contains positional arguments.
    """
    options, args = self.prepare_args(
        *self.parse_options(prog_name, argv, command))
    return self(*args, **options)
The Command class's __call__ method:
# Python 实用宝典
# https://pythondict.com
def __call__(self, *args, **kwargs):
    random.seed()  # maybe we were forked.
    self.verify_args(args)
    try:
        ret = self.run(*args, **kwargs)
        return ret if ret is not None else EX_OK
    except self.UsageError as exc:
        self.on_usage_error(exc)
        return exc.status
    except self.Error as exc:
        self.on_error(exc)
        return exc.status
# Python 实用宝典
# https://pythondict.com
def start(self, embedded_process=False, drift=-0.010):
    info('beat: Starting...')
    # Log the maximum tick interval
    debug('beat: Ticking with max interval->%s',
          humanize_seconds(self.scheduler.max_interval))
    # Notify every handler registered for this signal
    signals.beat_init.send(sender=self)
    if embedded_process:
        signals.beat_embedded_init.send(sender=self)
        platforms.set_process_title('celery beat')

    try:
        while not self._is_shutdown.is_set():
            # Call scheduler.tick() to see how long until the next task is due
            interval = self.scheduler.tick()
            interval = interval + drift if interval else interval
            # If there is still time left before the next task
            if interval and interval > 0:
                debug('beat: Waking up %s.',
                      humanize_seconds(interval, prefix='in '))
                # Sleep until then
                time.sleep(interval)
                if self.scheduler.should_sync():
                    self.scheduler._do_sync()
    except (KeyboardInterrupt, SystemExit):
        self._is_shutdown.set()
    finally:
        self.sync()
The key method here is self.scheduler.tick():
# Python 实用宝典
# https://pythondict.com
def tick(self):
    """Run a tick, that is one iteration of the scheduler.

    Executes all due tasks.
    """
    remaining_times = []
    try:
        # Iterate over every periodic task entry in the schedule
        for entry in values(self.schedule):
            # Time remaining until this entry is next due
            next_time_to_run = self.maybe_due(entry, self.publisher)
            if next_time_to_run:
                remaining_times.append(next_time_to_run)
    except RuntimeError:
        pass
    return min(remaining_times + [self.max_interval])
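As a worked illustration (numbers invented for clarity): if maybe_due reports that the nearest entries are due in 12 and 45 seconds and max_interval is 300 seconds, tick() returns 12, so the beat loop above sleeps roughly 12 seconds (adjusted by drift) before checking the schedule again.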
%%cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)
    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr
%%cython
import cython
import numpy as np
cimport numpy as cnp
ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)
@cython.wraparound(False)
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)
    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr
%load_ext Cython
%%cython
def f_plain(x):
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx
6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
It seems that simply adding the %%cython header does not improve performance much.
Method 2: use C types.
%%cython
cdef double f_typed(double x) except? -2:
    return x * (x - 1)

cpdef double integrate_f_typed(double a, double b, int N):
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx
345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
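Method 3: use Numba (JIT compilation).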
import numba
import numpy as np
import pandas as pd

@numba.jit
def f_plain(x):
    return x * (x - 1)

@numba.jit
def integrate_f_numba(a, b, N):
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx

@numba.jit
def apply_integrate_f_numba(col_a, col_b, col_N):
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result

def compute_numba(df):
    result = apply_integrate_f_numba(df['a'].to_numpy(),
                                     df['b'].to_numpy(),
                                     df['N'].to_numpy())
    return pd.Series(result, index=df.index, name='result')
%timeit compute_numba(df)
6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
As you can see, Numba requires only small code changes, yet the speedup is substantial!
3.4 Advanced: Parallel Processing
Reading data in parallel
The basics section briefly mentioned parallel processing when covering chunked reading; here is the code in detail.
The first approach: read the file in chunks and process the chunks with multiple processes.
import pandas as pd
from multiprocessing import Pool

def process(df):
    """
    Process one chunk of data.
    """
    pass

# initialise the iterator object
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
                       skipinitialspace=True, encoding='utf-8')
# depends on how many cores you want to utilise
max_processors = 4  # reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # process each chunk asynchronously
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        for f in f_list:
            f.get()
        del f_list[:]
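The second approach: split the large file into several smaller files in advance, then read them in parallel with a process pool and concatenate the results.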
from multiprocessing import Pool
import pandas as pd
import os

def read_func(file_path):
    df = pd.read_csv(file_path, header=None)
    return df

def read_file():
    file_list = ["train_split%02d" % i for i in range(66)]
    p = Pool(4)
    res = p.map(read_func, file_list)
    p.close()
    p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df

df = read_file()
# HELP mysql_slave_status_slave_sql_running Generic metric from SHOW SLAVE STATUS.
# TYPE mysql_slave_status_slave_sql_running untyped
mysql_slave_status_slave_sql_running{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 1
2. Replication lag behind the master:
The output of SHOW SLAVE STATUS also contains a key field, Seconds_Behind_Master. It measures the delay between the slave's SQL thread and its IO thread: in a MySQL replication setup, the slave first pulls the binlog from the master to a local relay log (via the IO thread) and then replays it with the SQL thread, and Seconds_Behind_Master reflects the portion of the local relay log that has not yet been executed. So once everything in the relay log pulled to the slave (which is really just the binlog, conventionally called the relay log on the slave) has been executed, SHOW SLAVE STATUS reports 0.
# HELP mysql_slave_status_seconds_behind_master Generic metric from SHOW SLAVE STATUS.
# TYPE mysql_slave_status_seconds_behind_master untyped
mysql_slave_status_seconds_behind_master{channel_name="",connection_name="",master_host="172.16.1.1",master_uuid=""} 0
MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Questions"; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | Questions | 15071 | +---------------+-------+
# HELP mysql_global_status_questions Generic metric from SHOW GLOBAL STATUS.
# TYPE mysql_global_status_questions untyped
mysql_global_status_questions 13253
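Since Questions grows monotonically, a per-second query rate (QPS) is usually derived from it with PromQL's rate(); a minimal sketch, not from the original article, with an arbitrarily chosen 5-minute window:

rate(mysql_global_status_questions[5m])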
MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Com_insert"; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | Com_insert | 10578 | +---------------+-------+
# HELP mysql_global_status_commands_total Total number of executed MySQL commands. # TYPE mysql_global_status_commands_total counter mysql_global_status_commands_total{command="create_trigger"} 0 mysql_global_status_commands_total{command="create_udf"} 0 mysql_global_status_commands_total{command="create_user"} 1 mysql_global_status_commands_total{command="create_view"} 0 mysql_global_status_commands_total{command="dealloc_sql"} 0 mysql_global_status_commands_total{command="delete"} 3369 mysql_global_status_commands_total{command="delete_multi"} 0
MariaDB [(none)]> SHOW VARIABLES LIKE 'long_query_time'; +-----------------+-----------+ | Variable_name | Value | +-----------------+-----------+ | long_query_time | 10.000000 | +-----------------+-----------+ 1 row in set (0.00 sec)
Of course, we can also change this threshold:
MariaDB [(none)]> SET GLOBAL long_query_time = 5;
Query OK, 0 rows affected (0.00 sec)
We can then query the number of Slow_queries on the MySQL instance via SQL:
MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Slow_queries"; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | Slow_queries | 0 | +---------------+-------+ 1 row in set (0.00 sec)
# HELP mysql_global_status_slow_queries Generic metric from SHOW GLOBAL STATUS. # TYPE mysql_global_status_slow_queries untyped mysql_global_status_slow_queries0
MariaDB [(none)]> SHOW VARIABLES LIKE 'max_connections'; +-----------------+-------+ | Variable_name | Value | +-----------------+-------+ | max_connections | 151 | +-----------------+-------+
Of course, we can raise this value by editing the configuration file. Its counterpart is the current number of connections: once current connections exceed the configured maximum, we get the familiar "Too many connections" error. Let's check the current connection count:
MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_connected"; +-------------------+-------+ | Variable_name | Value | +-------------------+-------+ | Threads_connected | 41 | +-------------------+-------
MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Threads_running"; +-----------------+-------+ | Variable_name | Value | +-----------------+-------+ | Threads_running | 10 | +-----------------+-------+
# HELP mysql_global_variables_max_connections Generic gauge metric from SHOW GLOBAL VARIABLES. # TYPE mysql_global_variables_max_connections gauge mysql_global_variables_max_connections151
This is the maximum number of connections.
# HELP mysql_global_status_threads_connected Generic metric from SHOW GLOBAL STATUS.
# TYPE mysql_global_status_threads_connected untyped
mysql_global_status_threads_connected 41
This is the current number of connections.
# HELP mysql_global_status_threads_running Generic metric from SHOW GLOBAL STATUS.
# TYPE mysql_global_status_threads_running untyped
mysql_global_status_threads_running 1
This is the number of currently active (running) connections.
# HELP mysql_global_status_aborted_connects Generic metric from SHOW GLOBAL STATUS.
# TYPE mysql_global_status_aborted_connects untyped
mysql_global_status_aborted_connects 31
This is the cumulative number of failed connection attempts.
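To see how close the instance is to its connection limit, the series above can be combined; a minimal PromQL sketch (not from the original article) for the fraction of max_connections currently in use:

mysql_global_status_threads_connected / mysql_global_variables_max_connections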
# HELP mysql_global_status_connection_errors_total Total number of MySQL connection errors.
# TYPE mysql_global_status_connection_errors_total counter
mysql_global_status_connection_errors_total{error="internal"} 0          # errors caused inside the server, e.g. memory or disk
mysql_global_status_connection_errors_total{error="max_connections"} 0   # errors caused by exceeding max_connections
MariaDB [(none)]> show global variables like 'innodb_buffer_pool_size';
+-------------------------+-----------+
| Variable_name           | Value     |
+-------------------------+-----------+
| innodb_buffer_pool_size | 134217728 |
+-------------------------+-----------+

# HELP mysql_global_variables_innodb_buffer_pool_size Generic gauge metric from SHOW GLOBAL VARIABLES.
# TYPE mysql_global_variables_innodb_buffer_pool_size gauge
mysql_global_variables_innodb_buffer_pool_size 1.34217728e+08

MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_read_requests";
+----------------------------------+-------+
| Variable_name                    | Value |
+----------------------------------+-------+
| Innodb_buffer_pool_read_requests | 38465 |
+----------------------------------+-------+

# HELP mysql_global_status_innodb_buffer_pool_read_requests Generic metric from SHOW GLOBAL STATUS.
# TYPE mysql_global_status_innodb_buffer_pool_read_requests untyped
mysql_global_status_innodb_buffer_pool_read_requests 2.7711547168e+10

MariaDB [(none)]> SHOW GLOBAL STATUS LIKE "Innodb_buffer_pool_reads";
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| Innodb_buffer_pool_reads | 138   |
+--------------------------+-------+
1 row in set (0.00 sec)

# HELP mysql_global_status_innodb_buffer_pool_reads Generic metric from SHOW GLOBAL STATUS.
# TYPE mysql_global_status_innodb_buffer_pool_reads untyped
mysql_global_status_innodb_buffer_pool_reads 138
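Innodb_buffer_pool_read_requests counts logical read requests, while Innodb_buffer_pool_reads counts requests that missed the buffer pool and had to hit disk, so the buffer-pool hit ratio can be approximated with a PromQL sketch like the following (not from the original article; the 5-minute window is an arbitrary assumption):

1 - rate(mysql_global_status_innodb_buffer_pool_reads[5m]) / rate(mysql_global_status_innodb_buffer_pool_read_requests[5m])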
groups:
- name: MySQL-rules
  rules:
  - alert: MySQL Status
    expr: up == 0
    for: 5s
    labels:
      severity: warning
    annotations:
      summary: "{{$labels.instance}}: MySQL has stopped !!!"
      description: "Checks whether the MySQL instance is up"

  - alert: MySQL Slave IO Thread Status
    expr: mysql_slave_status_slave_io_running == 0
    for: 5s
    labels:
      severity: warning
    annotations:
      summary: "{{$labels.instance}}: MySQL Slave IO Thread has stopped !!!"
      description: "Checks whether the replication IO thread is running"

  - alert: MySQL Slave SQL Thread Status
    expr: mysql_slave_status_slave_sql_running == 0
    for: 5s
    labels:
      severity: warning
    annotations:
      summary: "{{$labels.instance}}: MySQL Slave SQL Thread has stopped !!!"
      description: "Checks whether the replication SQL thread is running"

  - alert: MySQL Slave Delay Status
    expr: mysql_slave_status_sql_delay > 30
    for: 5s
    labels:
      severity: warning
    annotations:
      summary: "{{$labels.instance}}: MySQL Slave Delay has more than 30s !!!"
      description: "Checks the replication delay on the slave"

  - alert: Mysql_Too_Many_slow_queries
    expr: rate(mysql_global_status_slow_queries[5m]) > 3
    for: 2m
    labels:
      severity: warning
    annotations:
      summary: "{{$labels.instance}}: Too many slow queries, please investigate"
      description: "{{$labels.instance}}: Mysql slow_queries is more than 3 per second (current value: {{ $value }})"
You can check the Python version in your code to make sure your users aren't running the script with an incompatible version. The check looks like this:

import sys

if not sys.version_info > (2, 7):
    # berate your user for running a 10-year-old python version
    sys.exit("This script requires Python 2.7 or newer")
elif not sys.version_info >= (3, 5):
    # kindly tell your user (s)he needs to upgrade
    # because you're using 3.5 features
    sys.exit("This script requires Python 3.5 or newer")
For the full list of magic commands, see https://ipython.readthedocs.io/en/stable/interactive/magics.html. Another very handy feature is referencing the output of a previous command: In and Out are real objects, so you can use the output of the third command as Out[3]. IPython can be installed with:

pip3 install ipython
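A quick illustration of reusing previous output inside an IPython session (the values here are just an example):

In [1]: 2 ** 10
Out[1]: 1024

In [2]: Out[1] + 1  # reuse the result of the first command
Out[2]: 1025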
4. Python programming tip – list comprehensions
You can use list comprehensions to avoid the tedium of filling a list with a loop. The basic syntax is:

[ expression for item in list if conditional ]

A basic example: fill a list with a sequence of numbers:

mylist = [i for i in range(10)]
print(mylist)
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Because you can use an expression, you can also do some arithmetic:

squares = [x**2 for x in range(10)]
print(squares)
# [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

You can even call an external function:

def some_function(a):
    return (a + 5) / 2

my_formula = [some_function(i) for i in range(10)]
print(my_formula)
# [2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0]

Finally, you can use 'if' to filter the list. In the following example we keep only the numbers divisible by 2:

filtered = [i for i in range(20) if i % 2 == 0]
print(filtered)
# [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Some people love emoji, while others loathe them. For the record: emoji can be very useful when analyzing social media data. First, install the emoji module:

pip3 install emoji

Once installed, you can use it like this:

import emoji

result = emoji.emojize('Python is :thumbs_up:')
print(result)  # 'Python is 👍'

# You can also reverse this:
result = emoji.demojize('Python is 👍')
print(result)  # 'Python is :thumbs_up:'
# Convert a string representation of
# a number into a list of ints.
list_of_ints = list(map(int, "1234567"))
print(list_of_ints)
# [1, 2, 3, 4, 5, 6, 7]

# And since a string can be treated like a
# list of letters, you can also get the
# unique letters from a string this way:
print(set("aaabbbcccdddeeefff"))
# {'a', 'b', 'c', 'd', 'e', 'f'}
Although you can wrap multi-line strings in triple quotes, that approach isn't ideal: everything between the triple quotes becomes part of the string, including the code's formatting, as shown below. I prefer the second method, which joins multiple strings together while keeping the code tidy; the only drawback is that you have to put in the newlines explicitly.

s1 = """Multi line strings can be put
        between triple quotes. It's not ideal
        when formatting your code though"""

print(s1)
# Multi line strings can be put
#         between triple quotes. It's not ideal
#         when formatting your code though

s2 = ("You can also concatenate multiple\n" +
      "strings this way, but you'll have to\n"
      "explicitly put in the newlines")

print(s2)
# You can also concatenate multiple
# strings this way, but you'll have to
# explicitly put in the newlines
24. Python programming tip – the ternary operator for conditional assignment
This keeps the code compact while remaining readable:

[on_true] if [expression] else [on_false]

For example:

y = 2
x = "Success!" if (y == 2) else "Failed!"
from colorama import init, Fore, Back, Style

init()  # needed on Windows so the ANSI colour codes are translated

print(Fore.RED + 'some red text')
print(Back.GREEN + 'and with a green background')
print(Style.DIM + 'and in dim text')
print(Style.RESET_ALL)
print('back to normal now')
row = {}
for line in re.split("[\n ]*\n[\n ]*", movie_info_txt):
    line = line.strip()
    arr = line.split(": ", maxsplit=1)
    if len(arr) != 2:
        continue
    k, v = arr
    row[k] = v
row
import requests
from lxml import etree
import pandas as pd
import re
from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED

def fetch_content(url):
    print(url)
    headers = {
        "Accept-Encoding": "Gzip",  # use gzip compression so responses transfer faster
        "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"
    }
    r = requests.get(url, headers=headers)
    return r.text

url = "https://movie.douban.com/cinema/later/shenzhen/"
init_page = fetch_content(url)
html = etree.HTML(init_page)
all_movies = html.xpath("//div[@id='showing-soon']/div")
result = []
for e in all_movies:
    # imgurl, = e.xpath(".//img/@src")
    name, = e.xpath(".//div[@class='intro']/h3/a/text()")
    url, = e.xpath(".//div[@class='intro']/h3/a/@href")
    # date, movie_type, pos = e.xpath(".//div[@class='intro']/ul/li[@class='dt']/text()")
    like_num, = e.xpath(
        ".//div[@class='intro']/ul/li[@class='dt last']/span/text()")
    result.append((name, int(like_num[:like_num.find("人")]), url))
main_df = pd.DataFrame(result, columns=["影名", "想看人数", "url"])

max_workers = main_df.shape[0]
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    future_tasks = [executor.submit(fetch_content, url) for url in main_df.url]
    wait(future_tasks, return_when=ALL_COMPLETED)
    pages = [future.result() for future in future_tasks]

result = []
for url, html_text in zip(main_df.url, pages):
    html = etree.HTML(html_text)
    row = {}
    for line in re.split("[\n ]*\n[\n ]*",
                         "".join(html.xpath("//div[@id='info']//text()")).strip()):
        line = line.strip()
        arr = line.split(": ", maxsplit=1)
        if len(arr) != 2:
            continue
        k, v = arr
        row[k] = v
    row["url"] = url
    result.append(row)
detail_df = pd.DataFrame(result)
df = main_df.merge(detail_df, on="url")
df.drop(columns=["url"], inplace=True)
df.sort_values("想看人数", ascending=False, inplace=True)
df.to_csv("shenzhen_movie2.csv", index=False)
df