虽然深度学习日益盛行,但目前spark还不支持深度学习算法。虽然也有相关库sparktorch能够将spark和pytorch结合起来,但是使用发现并非那么好用,而且此库目前活跃度较低,不方便debug。因此,本地训练深度学习模型并部署到spark中是一种有效的利用深度学习进行大规模预测的方法。
将pytorch模型嵌入部署到spark中进行大规模预测主要包括三步:
1.利用spark进行特征工程预处理,以保证训练集和测试集特征处理一致;
第一二步都比较简单,这里省去。主要对第三步进行说明。
模型分发(broadcast)分两种情况,第一种是简单可通过nn.Sequential
定义的模型。对于这种情况可以,模型可以直接用。如下:
# 生成测试数据 from sklearn.datasets import make_classification X, y = make_classification(n_samples=50000, n_features=100, random_state=0) df = pd.DataFrame(X) df['label'] = np.random.randint(2,size=(50000)) df1 = spark.createDataFrame(df) df1 = df1.withColumn('features', Func.array([col(f"{i}") for i in range(0, 100)])).repartition(1000) # 创建模型并进行预测 %spark2_1.pyspark import torch.nn as nn network = nn.Sequential( nn.Linear(100, 2560), nn.ReLU(), nn.Linear(2560, 2560), nn.ReLU(), nn.Linear(2560, 2) #nn.Softmax(dim=1) ) class network(nn.Module): def __init__(self): super(network, self).__init__() self.l1 = nn.Linear(100, 2560) self.l2 = nn.Linear(2560, 2560) self.l3 = nn.Linear(2560, 2) def forward(self, x): x = self.l1(x) x = self.l2(x) x = self.l3(x) return x net = network() bc_model_state = spark.sparkContext.broadcast(net.state_dict()) def get_model_for_eval(): # Broadcast the model state_dict net.load_state_dict(bc_model_state.value) net.eval() return net def one_row_predict(x): model = get_model_for_eval() t = torch.tensor(x, dtype=torch.float32) t = model(t).cpu().detach().numpy() #prediction = model(t).cpu().detach().item() # return prediction return list([float(i) for i in t]) one_row_udf = udf(one_row_predict, ArrayType(FloatType())) df1 = df1.withColumn('pred_one_row', one_row_udf(col('features')))
在上面我们定义了一个简单模型,然后将其直接分发进行预测(这里省去了模型训练过程)。
但是当我们想使用一个比较复杂的模型来进行预测时(简单来讲就是不能使用 nn.Sequential
改写),使用上面的方法则会报错。
这时候需要将模型写入一个文件中,假设模型文件的路径为/export/models/item2vec.py
, 使用pyspark中的addFile对其进行分发,然后import导入模型。
假设我们的模型文件/export/models/item2vec.py
如下:
class Item2vec(nn.Module): def __init__(self, cv_dict, csr_cols): super(Item2vec, self).__init__() pass def forward(self, x): pass def predict(self, x): pass
假设模型已经训练好,现在要使用训练好的模型进行大规模预测:
from pyspark import SparkFiles sc.addFile('/export/models/item2vec.py') import sys sys.path.append('/export/models/') from item2vec import Item2vec # model 表示训练好的模型 bc_model_state = sc.broadcast(model.state_dict()) net = Item2vec(cv_dict, csr_cols) def get_model_for_eval_demo(): # Broadcast the model state_dict net.load_state_dict(bc_model_state.value) net.eval() return net
上面的操作已经将模型分发(broadcast)出去,接下来就可以进行预测了。
预测这里介绍两种方式:一种是使用 udf + withColumn
, 另一种则是使用 rdd + mapPartitions
。
由于这里使用的是 pyspark 2.1,还没有pandas udf,因此使用 udf + withColumn
时只能一行一行的预测,运行速度上来说是比不上 rdd + mapPartitions
。
对于pyspark 2.3以后的版本多了pandas udf后则可以使用batch predict了,具体可以参考
https://docs.databricks.com/static/notebooks/deep-learning/pytorch-images.html
udf + withColumn 的方式
# udf + withColumn 的方式 def one_row_predict_demo(x) x = torch.tensor(x, dtype=torch.float) _, prob = bc_model.predict(x) return round(float(prob[0]), 4) one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType()) one_row_predict_demo_udf = udf(one_row_predict_demo, DoubleType()) df = demo.withColumn('demo_prob', one_row_predict_demo_udf('features'))
rdd + map 方式
def one_row_predict_map(rdds): bc_model = get_model_for_eval_demo() for row in rdds: x = torch.tensor(row.x, dtype=torch.float) _, prob = bc_model.predict(x) yield (row['id'], round(float(prob[0]), 4)) df = demo.rdd.mapPartitions(one_row_predict_map).toDF(['id', 'pred_prob'])
2. 效率优化(1)——mapPartition
上面的方法已经可以使得我们将训练好的深度学习模型部署到spark进行大规模预测了,但是其速度是非常慢的。通过在 mapPartitions
中进行一些处理,我们可以对预测进行加速:
# 代码源自 https://github.com/SaeedNajafi/infer-pytorch-pyspark def basic_row_handler(row): return row def predict_map(index, partition, ml_task, batch_size=16, row_preprocessor=basic_row_handler, row_postprocessor=basic_row_handler): # local model loading within each executor model = LocalPredictor(ml_task=ml_task, batch_size=batch_size, partition_index=index) batch = [] count = 0 for row in partition: row_dict = row.asDict() # apply preprocessor on each row. row_dict_prep = row_preprocessor(row_dict) batch.append(row_dict_prep) count += 1 if count == batch_size: # predict the ml and apply the postprocessor. for ret_row in model.predict(batch): # ml prediction ret_row_post = row_postprocessor(ret_row) if ret_row_post is not None: yield Row(**ret_row_post) batch = [] count = 0 # Flush remaining rows in the batches. if count != 0: for ret_row in model.predict(batch): # ml prediction ret_row_post = row_postprocessor(ret_row) if ret_row_post is not None: yield Row(**ret_row_post) batch = [] count = 0
上面的代码可以看作是在mapPartitions中进行了“延迟”预测——即先将一个partition中的多行数据进行处理然后合并为一个batch进行一起预测,这样能大大的提升运行效率。一个比较极端的情况是每个partition仅进行一次预测。
3. 效率优化(2)——pandas_udf
pandas_udf在udf的基础上进行了进一步的优化,利用pandas_udf程序运行效率更高。在这里我们可以借助于pandas_udf
提升我们程序的运行效率:
# Enable Arrow support. spark.conf.set("spark.sql.execution.arrow.enabled", "true") spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "64") sc.addFile('get_model.py') from get_model import get_model model_path = '/path/to/model.pt' data_path = '/path/to/data' # model 表示训练好的模型 model = torch.load(model_path) bc_model_state = sc.broadcast(model.state_dict()) def get_model_for_eval(): # Broadcast the model state_dict model = get_model() model.load_state_dict(bc_model_state.value) model.eval() return model # model = torch.load(model_path) # model = sc.broadcast(model) @pandas_udf(FloatType()) def predict_batch_udf(arr: pd.Series) -> pd.Series: model = get_model_for_eval() # model.to(device) arr = np.vstack(arr.map(lambda x: eval(x)).values) arr = torch.tensor(arr).long() with torch.no_grad(): predictions = list(model(arr).cpu().numpy()) return pd.Series(predictions) # 预测 data = data.withColumn('predictions', predict_batch_udf('features'))
作者:井底蛙蛙呱呱呱
链接:https://www.jianshu.com/p/fc60c967c8b8
我们的文章到此就结束啦,如果你喜欢今天的 Python 教程,请持续关注Python实用宝典。
有任何问题,可以在公众号后台回复:加群,回答相应验证信息,进入互助群询问。
原创不易,希望你能在下面点个赞和在看支持我继续创作,谢谢!
Python实用宝典 ( pythondict.com )
不只是一个宝典
欢迎关注公众号:Python实用宝典