# 公众号:Python 实用宝典
# Build an Annoy index of 1000 random 40-dimensional vectors and save it.
from annoy import AnnoyIndex
import random

f = 40  # vector dimensionality
t = AnnoyIndex(f, 'angular')  # index stores f-dimensional vectors (angular metric)
for i in range(1000):
    # Each item is a vector of f independent standard-normal samples.
    v = [random.gauss(0, 1) for z in range(f)]
    t.add_item(i, v)

t.build(10)  # 10 trees; at query time, more trees -> higher precision
t.save('test.ann')
# Reload the saved index from disk and query it.
from annoy import AnnoyIndex
f = 40  # must match the dimensionality the index was built with
u = AnnoyIndex(f, 'angular')
u.load('test.ann')  # load the previously saved index file
print(u.get_nns_by_item(1, 5))  # the 5 nearest neighbours of item 1
# [1, 607, 672, 780, 625]
其中,u.get_nns_by_item(i, n, search_k=-1, include_distances=False)返回第 i 个item的n个最近邻的item。在查询期间,它将检索多达search_k(默认n_trees * n)个点。如果设置include_distances为True,它将返回一个包含两个列表的元组:第二个列表中包含所有对应的距离。
%%cython
# Cython kernel: add 1 to each odd element, subtract 1 from each even one.
import numpy as np
cimport numpy as cnp

ctypedef cnp.int_t DTYPE_t

cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    """Return a new array where odd entries become x+1 and even entries x-1."""
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)
    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr
%%cython
# Same kernel with bounds-checking and negative-index wraparound disabled
# for additional speed.
import cython
import numpy as np
cimport numpy as cnp

ctypedef cnp.int_t DTYPE_t

@cython.boundscheck(False)  # skip per-access bounds checks
@cython.wraparound(False)   # disallow negative indexing
cpdef cnp.ndarray[DTYPE_t] _transform(cnp.ndarray[DTYPE_t] arr):
    """Return a new array where odd entries become x+1 and even entries x-1."""
    cdef:
        int i = 0
        int n = arr.shape[0]
        int x
        cnp.ndarray[DTYPE_t] new_arr = np.empty_like(arr)
    while i < n:
        x = arr[i]
        if x % 2:
            new_arr[i] = x + 1
        else:
            new_arr[i] = x - 1
        i += 1
    return new_arr
%load_ext Cython
%%cython
def f_plain(x):
    """Integrand: x * (x - 1)."""
    return x * (x - 1)

def integrate_f_plain(a, b, N):
    """Approximate the integral of f_plain over [a, b] using a
    left Riemann sum with N rectangles of width (b - a) / N."""
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx
6.46 s ± 41.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
看来仅仅加上 %%cython 头、不做其他改动,效率提升不大。
方法2,使用c type
%%cython
# Method 2: statically typed Cython version of the same integration.
cdef double f_typed(double x) except? -2:
    # -2 is the error sentinel for exception propagation from this cdef function.
    return x * (x - 1)

cpdef double integrate_f_typed(double a, double b, int N):
    """Left Riemann sum of f_typed over [a, b] with N rectangles."""
    cdef int i
    cdef double s, dx
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_typed(a + i * dx)
    return s * dx
345 ms ± 529 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
import numba
@numba.jit  # decorator must sit on its own line ("@numba.jitdef" was invalid)
def f_plain(x):
    """Integrand x * (x - 1), JIT-compiled by numba."""
    return x * (x - 1)
@numba.jit  # decorator must sit on its own line ("@numba.jitdef" was invalid)
def integrate_f_numba(a, b, N):
    """Left Riemann sum of f_plain over [a, b] with N rectangles (numba JIT)."""
    s = 0
    dx = (b - a) / N
    for i in range(N):
        s += f_plain(a + i * dx)
    return s * dx
@numba.jit  # decorator must sit on its own line ("@numba.jitdef" was invalid)
def apply_integrate_f_numba(col_a, col_b, col_N):
    """Element-wise integration: result[i] = integral of f_plain over
    [col_a[i], col_b[i]] with col_N[i] steps. All three columns must be
    the same length."""
    n = len(col_N)
    result = np.empty(n, dtype='float64')
    assert len(col_a) == len(col_b) == n
    for i in range(n):
        result[i] = integrate_f_numba(col_a[i], col_b[i], col_N[i])
    return result
def compute_numba(df):
    """Run the numba-accelerated integration over df's 'a', 'b' and 'N'
    columns and return the result as a Series aligned with df's index."""
    columns = (df[name].to_numpy() for name in ('a', 'b', 'N'))
    values = apply_integrate_f_numba(*columns)
    return pd.Series(values, index=df.index, name='result')
%timeit compute_numba(df)
6.44 ms ± 440 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
我们看到,使用numba,需要做的代码改动较小,效率提升幅度却很大!
3.4 进阶 并行化处理
并行化读取数据
在基础篇讲分块读取时,简单提了一下并行化处理,这里详细说下代码。
第一种思路,分块读取,多进程处理。
import pandas as pd
from multiprocessing import Pool
def process(df):
    """
    Per-chunk data processing (placeholder — fill in the real work here).
    """
    pass
# Initialise the chunked-CSV iterator object.
iterator = pd.read_csv('train.csv', chunksize=200000, compression='gzip',
                       skipinitialspace=True, encoding='utf-8')
# Depends on how many cores you want to utilise.
max_processors = 4  # reserve 4 cores for our script
pool = Pool(processes=max_processors)
f_list = []
for df in iterator:
    # Dispatch each chunk asynchronously to a worker process.
    f = pool.apply_async(process, [df])
    f_list.append(f)
    if len(f_list) >= max_processors:
        # Throttle: wait for the in-flight batch before reading more chunks.
        for f in f_list:
            f.get()
        del f_list[:]
# Drain any remaining tasks (the original dropped a partial final batch)
# and release the worker processes.
for f in f_list:
    f.get()
pool.close()
pool.join()
from multiprocessing import Pool
import pandas as pd
import os
def read_func(file_path):
    """Load a single headerless CSV file into a DataFrame."""
    frame = pd.read_csv(file_path, header=None)
    return frame
def read_file(n_files=66, processes=4):
    """Read the 'train_split%02d' shard files in parallel and concatenate
    them into one DataFrame.

    n_files: number of shard files to read (default matches the original 66).
    processes: number of worker processes for parallel reading.
    """
    file_list = ["train_split%02d" % i for i in range(n_files)]
    p = Pool(processes)
    try:
        res = p.map(read_func, file_list)
    finally:
        # Ensure workers are released even if a read fails.
        p.close()
        p.join()
    df = pd.concat(res, axis=0, ignore_index=True)
    return df

df = read_file()
# Recommended version (runs in ~0.33s): expose the attribute directly
# instead of going through unnecessary accessor methods.
class DemoClass:
    def __init__(self, value: int):
        self.value = value  # plain attribute — no getter/setter indirection

def main():
    size = 1000000
    for i in range(size):
        demo_instance = DemoClass(size)
        value = demo_instance.value       # direct read
        demo_instance.value = i           # direct write

main()
4. 避免数据复制
4.1 避免无意义的数据复制
# Not recommended (runs in ~6.5s): value_list is a pointless intermediate
# copy of the range made before squaring. Kept as-is — it is the article's
# deliberate bad example.
def main():
    size = 10000
    for _ in range(size):
        value = range(size)
        value_list = [x for x in value]          # needless copy
        square_list = [x * x for x in value_list]

main()
上面的代码中value_list完全没有必要,这会创建不必要的数据结构或复制。
# Recommended version (runs in ~4.8s): square the range directly, avoiding
# the meaningless intermediate copy.
def main():
    size = 10000
    for _ in range(size):
        value = range(size)
        square_list = [x * x for x in value]  # no intermediate list copy
# NOTE(review): this main() appears misplaced from the string-concatenation
# example in section 5 — it needs the `string` module imported and
# `concatString` defined (which only appears further below). Verify against
# the original article.
def main():
    string_list = list(string.ascii_letters * 100)
    for _ in range(10000):
        result = concatString(string_list)

main()
5. 利用if条件的短路特性
# Not recommended (runs in ~0.05s): every word pays for a set lookup.
from typing import List

def concatString(string_list: List[str]) -> str:
    """Concatenate the entries of string_list that are known abbreviations."""
    abbreviations = {'cf.', 'e.g.', 'ex.', 'etc.', 'flg.', 'i.e.', 'Mr.', 'vs.'}
    # (an unused abbr_count counter in the original has been removed)
    result = ''
    for str_i in string_list:
        if str_i in abbreviations:
            result += str_i
    return result

def main():
    for _ in range(10000):
        string_list = ['Mr.', 'Hat', 'is', 'Chasing', 'the', 'black', 'cat', '.']
        result = concatString(string_list)

main()
if 条件的短路特性是指对if a and b这样的语句, 当a为False时将直接返回,不再计算b;对于if a or b这样的语句,当a为True时将直接返回,不再计算b。因此, 为了节约运行时间,对于or语句,应该将值为True可能性比较高的变量写在or前,而and应该推后。
# Recommended version (runs in ~0.03s): test the cheap last-character check
# first so that `and` short-circuits past the set lookup for most words.
from typing import List

def concatString(string_list: List[str]) -> str:
    """Concatenate the entries of string_list that are known abbreviations."""
    abbreviations = {'cf.', 'e.g.', 'ex.', 'etc.', 'flg.', 'i.e.', 'Mr.', 'vs.'}
    # (an unused abbr_count counter in the original has been removed)
    result = ''
    for str_i in string_list:
        # Short-circuit: the set lookup only runs for words ending in '.'.
        # NOTE(review): str_i[-1] raises IndexError on an empty string —
        # the article's inputs are never empty.
        if str_i[-1] == '.' and str_i in abbreviations:
            result += str_i
    return result

def main():
    for _ in range(10000):
        string_list = ['Mr.', 'Hat', 'is', 'Chasing', 'the', 'black', 'cat', '.']
        result = concatString(string_list)

main()
6. 循环优化
6.1 用for循环代替while循环
# Not recommended (runs in ~6.7s): manual while-loop accumulation.
def computeSum(size: int) -> int:
    """Sum the integers 0..size-1 using an explicit while loop."""
    sum_ = 0
    i = 0
    while i < size:
        sum_ += i
        i += 1
    return sum_
def main():
    size = 10000
    for _ in range(size):
        sum_ = computeSum(size)

main()
Python 的for循环比while循环快不少。
# Recommended version (runs in ~4.3s): a for loop over range() is faster
# than manually incrementing a while-loop counter.
def computeSum(size: int) -> int:
    """Sum the integers 0..size-1 using a for loop."""
    sum_ = 0
    for i in range(size):  # for loop instead of while loop
        sum_ += i
    return sum_
def main():
    size = 10000
    for _ in range(size):
        sum_ = computeSum(size)

main()
6.2 使用隐式for循环代替显式for循环
针对上面的例子,更进一步可以用隐式for循环来替代显式for循环
# Recommended version (runs in ~1.7s): the implicit C-level loop inside
# sum(range(...)) replaces the explicit Python-level for loop.
def computeSum(size: int) -> int:
    """Sum the integers 0..size-1 via the built-in sum."""
    return sum(range(size))  # implicit loop instead of explicit for loop
def main():
    size = 10000
    for _ in range(size):
        # Renamed from `sum` to `sum_`: the original shadowed the builtin
        # and broke consistency with the earlier examples.
        sum_ = computeSum(size)

main()
6.3 减少内层for循环的计算
# Not recommended (runs in ~12.8s): sqrt(x) is recomputed in the inner loop
# even though it only depends on the outer variable x. Kept as-is — it is
# the article's deliberate bad example.
import math

def main():
    size = 10000
    sqrt = math.sqrt
    for x in range(size):
        for y in range(size):
            z = sqrt(x) + sqrt(y)  # sqrt(x) needlessly recomputed size times

main()
上面的代码中sqrt(x)位于内层for循环, 每次内层循环迭代时都会重新计算一次,增加了时间开销。
# Recommended version (runs in ~7.0s): hoist the loop-invariant sqrt(x)
# out of the inner loop.
import math

def main():
    size = 10000
    sqrt = math.sqrt
    for x in range(size):
        sqrt_x = sqrt(x)  # computed once per outer iteration
        for y in range(size):
            z = sqrt_x + sqrt(y)