6-7,使用spark-scala调用tensorflow2.0训练好的模型

本篇文章介绍在spark中调用训练好的tensorflow模型进行预测的方法。

本文内容的学习需要一定的spark和scala基础。

如果使用pyspark的话会比较简单,只需要在每个executor上用Python加载模型分别预测就可以了。

但工程上为了性能考虑,通常使用的是scala版本的spark。

本篇文章我们通过TensorFlow for Java 在spark中调用训练好的tensorflow模型。

利用spark的分布式计算能力,从而可以让训练好的tensorflow模型在成百上千的机器上分布式并行执行模型推断。

〇,spark-scala调用tensorflow模型概述

在spark(scala)中调用tensorflow模型进行预测需要完成以下几个步骤。

(1)准备protobuf模型文件

(2)创建spark(scala)项目,在项目中添加java版本的tensorflow对应的jar包依赖

(3)在spark(scala)项目中driver端加载tensorflow模型调试成功

(4)在spark(scala)项目中通过RDD在executor上加载tensorflow模型调试成功

(5) 在spark(scala)项目中通过DataFrame在executor上加载tensorflow模型调试成功

一,准备protobuf模型文件

我们使用tf.keras 训练一个简单的线性回归模型,并保存成protobuf文件。

  1. import tensorflow as tf
  2. from tensorflow.keras import models,layers,optimizers
  3. ## 样本数量
  4. n = 800
  5. ## 生成测试用数据集
  6. X = tf.random.uniform([n,2],minval=-10,maxval=10)
  7. w0 = tf.constant([[2.0],[-1.0]])
  8. b0 = tf.constant(3.0)
  9. Y = X@w0 + b0 + tf.random.normal([n,1],mean = 0.0,stddev= 2.0) # @表示矩阵乘法,增加正态扰动
  10. ## 建立模型
  11. tf.keras.backend.clear_session()
  12. inputs = layers.Input(shape = (2,),name ="inputs") #设置输入名字为inputs
  13. outputs = layers.Dense(1, name = "outputs")(inputs) #设置输出名字为outputs
  14. linear = models.Model(inputs = inputs,outputs = outputs)
  15. linear.summary()
  16. ## 使用fit方法进行训练
  17. linear.compile(optimizer="rmsprop",loss="mse",metrics=["mae"])
  18. linear.fit(X,Y,batch_size = 8,epochs = 100)
  19. tf.print("w = ",linear.layers[1].kernel)
  20. tf.print("b = ",linear.layers[1].bias)
  21. ## 将模型保存成pb格式文件
  22. export_path = "./data/linear_model/"
  23. version = "1" #后续可以通过版本号进行模型版本迭代与管理
  24. linear.save(export_path+version, save_format="tf")
  1. !ls {export_path+version}
  1. # 查看模型文件相关信息
  2. !saved_model_cli show --dir {export_path+str(version)} --all

模型文件信息中这些标红的部分都是后面有可能会用到的。

6-7,使用spark-scala调用tensorflow模型 - 图1

二,创建spark(scala)项目,在项目中添加java版本的tensorflow对应的jar包依赖

如果使用maven管理项目,需要添加如下 jar包依赖

  1. <!-- https://mvnrepository.com/artifact/org.tensorflow/tensorflow -->
  2. <dependency>
  3. <groupId>org.tensorflow</groupId>
  4. <artifactId>tensorflow</artifactId>
  5. <version>1.15.0</version>
  6. </dependency>

也可以从下面网址中直接下载 org.tensorflow.tensorflow的jar包

以及其依赖的org.tensorflow.libtensorflow 和 org.tensorflowlibtensorflow_jni的jar包 放到项目中。

https://mvnrepository.com/artifact/org.tensorflow/tensorflow/1.15.0

三, 在spark(scala)项目中driver端加载tensorflow模型调试成功

我们的示范代码在jupyter notebook中进行演示,需要安装toree以支持spark(scala)。

  1. import scala.collection.mutable.WrappedArray
  2. import org.{tensorflow=>tf}
  3. //注:load函数的第二个参数一般都是“serve”,可以从模型文件相关信息中找到
  4. val bundle = tf.SavedModelBundle
  5. .load("/Users/liangyun/CodeFiles/eat_tensorflow2_in_30_days/data/linear_model/1","serve")
  6. //注:在java版本的tensorflow中还是类似tensorflow1.0中静态计算图的模式,需要建立Session, 指定feed的数据和fetch的结果, 然后 run.
  7. //注:如果有多个数据需要喂入,可以连续使用多个feed方法
  8. //注:输入必须是float类型
  9. val sess = bundle.session()
  10. val x = tf.Tensor.create(Array(Array(1.0f,2.0f),Array(2.0f,3.0f)))
  11. val y = sess.runner().feed("serving_default_inputs:0", x)
  12. .fetch("StatefulPartitionedCall:0").run().get(0)
  13. val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
  14. y.copyTo(result)
  15. if(x != null) x.close()
  16. if(y != null) y.close()
  17. if(sess != null) sess.close()
  18. if(bundle != null) bundle.close()
  19. result

输出如下:

  1. Array(Array(3.019596), Array(3.9878292))

6-7,使用spark-scala调用tensorflow模型 - 图2

四,在spark(scala)项目中通过RDD在executor上加载tensorflow模型调试成功

下面我们通过广播机制将Driver端加载的TensorFlow模型传递到各个executor上,并在executor上分布式地调用模型进行推断。

  1. import org.apache.spark.sql.SparkSession
  2. import scala.collection.mutable.WrappedArray
  3. import org.{tensorflow=>tf}
  4. val spark = SparkSession
  5. .builder()
  6. .appName("TfRDD")
  7. .enableHiveSupport()
  8. .getOrCreate()
  9. val sc = spark.sparkContext
  10. //在Driver端加载模型
  11. val bundle = tf.SavedModelBundle
  12. .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")
  13. //利用广播将模型发送到executor上
  14. val broads = sc.broadcast(bundle)
  15. //构造数据集
  16. val rdd_data = sc.makeRDD(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(6.0f,7.0f),Array(8.0f,3.0f)))
  17. //通过mapPartitions调用模型进行批量推断
  18. val rdd_result = rdd_data.mapPartitions(iter => {
  19. val arr = iter.toArray
  20. val model = broads.value
  21. val sess = model.session()
  22. val x = tf.Tensor.create(arr)
  23. val y = sess.runner().feed("serving_default_inputs:0", x)
  24. .fetch("StatefulPartitionedCall:0").run().get(0)
  25. //将预测结果拷贝到相同shape的Float类型的Array中
  26. val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
  27. y.copyTo(result)
  28. result.iterator
  29. })
  30. rdd_result.take(5)
  31. bundle.close

输出如下:

  1. Array(Array(3.019596), Array(3.9264367), Array(7.8607616), Array(15.974984))

6-7,使用spark-scala调用tensorflow模型 - 图3

五, 在spark(scala)项目中通过DataFrame在executor上加载tensorflow模型调试成功

除了可以在Spark的RDD数据上调用tensorflow模型进行分布式推断,

我们也可以在DataFrame数据上调用tensorflow模型进行分布式推断。

主要思路是将推断方法注册成为一个sparkSQL函数。

  1. import org.apache.spark.sql.SparkSession
  2. import scala.collection.mutable.WrappedArray
  3. import org.{tensorflow=>tf}
  4. object TfDataFrame extends Serializable{
  5. def main(args:Array[String]):Unit = {
  6. val spark = SparkSession
  7. .builder()
  8. .appName("TfDataFrame")
  9. .enableHiveSupport()
  10. .getOrCreate()
  11. val sc = spark.sparkContext
  12. import spark.implicits._
  13. val bundle = tf.SavedModelBundle
  14. .load("/Users/liangyun/CodeFiles/master_tensorflow2_in_20_hours/data/linear_model/1","serve")
  15. val broads = sc.broadcast(bundle)
  16. //构造预测函数,并将其注册成sparkSQL的udf
  17. val tfpredict = (features:WrappedArray[Float]) => {
  18. val bund = broads.value
  19. val sess = bund.session()
  20. val x = tf.Tensor.create(Array(features.toArray))
  21. val y = sess.runner().feed("serving_default_inputs:0", x)
  22. .fetch("StatefulPartitionedCall:0").run().get(0)
  23. val result = Array.ofDim[Float](y.shape()(0).toInt,y.shape()(1).toInt)
  24. y.copyTo(result)
  25. val y_pred = result(0)(0)
  26. y_pred
  27. }
  28. spark.udf.register("tfpredict",tfpredict)
  29. //构造DataFrame数据集,将features放到一列中
  30. val dfdata = sc.parallelize(List(Array(1.0f,2.0f),Array(3.0f,5.0f),Array(7.0f,8.0f))).toDF("features")
  31. dfdata.show
  32. //调用sparkSQL预测函数,增加一个新的列作为y_preds
  33. val dfresult = dfdata.selectExpr("features","tfpredict(features) as y_preds")
  34. dfresult.show
  35. bundle.close
  36. }
  37. }
  1. TfDataFrame.main(Array())
  1. +----------+
  2. | features|
  3. +----------+
  4. |[1.0, 2.0]|
  5. |[3.0, 5.0]|
  6. |[7.0, 8.0]|
  7. +----------+
  8. +----------+---------+
  9. | features| y_preds|
  10. +----------+---------+
  11. |[1.0, 2.0]| 3.019596|
  12. |[3.0, 5.0]|3.9264367|
  13. |[7.0, 8.0]| 8.828995|
  14. +----------+---------+

以上我们分别在spark 的RDD数据结构和DataFrame数据结构上实现了调用一个tf.keras实现的线性回归模型进行分布式模型推断。

在本例基础上稍作修改则可以用spark调用训练好的各种复杂的神经网络模型进行分布式模型推断。

但实际上tensorflow并不仅仅适合实现神经网络,其底层的计算图语言可以表达各种数值计算过程。

利用其丰富的低阶API,我们可以在tensorflow2.0上实现任意机器学习模型,

结合tf.Module提供的便捷的封装功能,我们可以将训练好的任意机器学习模型导出成模型文件并在spark上分布式调用执行。

这无疑为我们的工程应用提供了巨大的想象空间。

如果对本书内容理解上有需要进一步和作者交流的地方,欢迎在公众号”Python与算法之美”下留言。作者时间和精力有限,会酌情予以回复。

也可以在公众号后台回复关键字:加群,加入读者交流群和大家讨论。

image.png