MNIST Demo

本教程会向你展示如何使用 MLeap 和 Bundle.ML 组件来导出一个 Spark ML Pipeline,并在完全不依赖 Spark Context 的前提下,使用 MLeap 来转换新数据。

我们会构建一个基于 MNIST 数据集训练,包含一个 Vector Assembler、一个 Binarizer、一个 PCA 以及一个 Random Forest Model,用于手写图像分类的 Pipeline。这个练习的目的不是为了训练得到一个最优模型,而是演示在 Spark 中训练一个 Pipeline 然后在 Spark 之外部署这个 Pipeline(数据处理 + 算法)是多么得简单。

本教程的代码分为两个部分:

  • Spark ML Pipeline 代码:原生 Spark 代码,用于训练 ML Pipeline,而后把它序列化成 Bundle.ML。
  • MLeap 代码:加载一个序列化后的 Bundle 到 MLeap,然后用其转换 Leap Frame。

开始之前,我们先来了解一些术语:

名词

  • Estimator:真正意义上的机器学习算法,基于 Data Frame 训练 Transformer 并产生一个模型。
  • 模型:在 Spark 里面,模型是代码和元数据,它基于训练过的算法对新数据进行评分。
  • Transformer:任何用于转换 Data Frame 的都被叫做 Transformer,对于训练一个 Estimator 来说 Transformer 不是必须的(例如一个 Binarizer)。
  • LeapFrame:一种 Data Frame 的数据结构,用于存储数据以及相关联的 Schema。

训练一个 Spark Pipeline

加载数据

  1. // Note that we are taking advantage of com.databricks:spark-csv package to load the data
  2. import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,IndexToString, Binarizer}
  3. import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
  4. import org.apache.spark.ml.evaluation.{MulticlassClassificationEvaluator}
  5. import org.apache.spark.ml.{Pipeline,PipelineModel}
  6. import org.apache.spark.ml.feature.PCA
  7. // MLeap/Bundle.ML Serialization Libraries
  8. import ml.combust.mleap.spark.SparkSupport._
  9. import resource._
  10. import ml.combust.bundle.BundleFile
  11. import org.apache.spark.ml.bundle.SparkBundleContext
  12. val datasetPath = "./mleap-demo/data/mnist/mnist_train.csv"
  13. var dataset = spark.sqlContext.read.format("com.databricks.spark.csv").
  14. option("header", "true").
  15. option("inferSchema", "true").
  16. load(datasetPath)
  17. val testDatasetPath = "./mleap-demo/data/mnist/mnist_test.csv"
  18. var test = spark.sqlContext.read.format("com.databricks.spark.csv").
  19. option("inferSchema", "true").
  20. option("header", "true").
  21. load(testDatasetPath)

你可以下载训练测试数据集(存放在 S3 上),当然你必须要修改成自己的 datasetPathtestDatasetPath

原始数据托管在 Yann LeCun 的网站上

构建 ML Data Pipeline

  1. // Define Dependent and Independent Features
  2. val predictionCol = "label"
  3. val labels = Seq("0","1","2","3","4","5","6","7","8","9")
  4. val pixelFeatures = (0 until 784).map(x => s"x$x").toArray
  5. val layers = Array[Int](pixelFeatures.length, 784, 800, labels.length)
  6. val vector_assembler = new VectorAssembler()
  7. .setInputCols(pixelFeatures)
  8. .setOutputCol("features")
  9. val stringIndexer = { new StringIndexer()
  10. .setInputCol(predictionCol)
  11. .setOutputCol("label_index")
  12. .fit(dataset)
  13. }
  14. val binarizer = new Binarizer()
  15. .setInputCol(vector_assembler.getOutputCol)
  16. .setThreshold(127.5)
  17. .setOutputCol("binarized_features")
  18. val pca = new PCA().
  19. setInputCol(binarizer.getOutputCol).
  20. setOutputCol("pcaFeatures").
  21. setK(10)
  22. val featurePipeline = new Pipeline().setStages(Array(vector_assembler, stringIndexer, binarizer, pca))
  23. // Transform the raw data with the feature pipeline and persist it
  24. val featureModel = featurePipeline.fit(dataset)
  25. val datasetWithFeatures = featureModel.transform(dataset)
  26. // Select only the data needed for training and persist it
  27. val datasetPcaFeaturesOnly = datasetWithFeatures.select(stringIndexer.getOutputCol, pca.getOutputCol)
  28. val datasetPcaFeaturesOnlyPersisted = datasetPcaFeaturesOnly.persist()

我们本想让 Pipeline 包含随机森林模型,但目前有一个 Bug (SPARK-16845) 让我们暂时没法这么做(这个问题会在 2.2.0 中得到修复)。

训练一个随机森林模型

  1. // You can optionally experiment with CrossValidator and MulticlassClassificationEvaluator to determine optimal
  2. // settings for the random forest
  3. val rf = new RandomForestClassifier().
  4. setFeaturesCol(pca.getOutputCol).
  5. setLabelCol(stringIndexer.getOutputCol).
  6. setPredictionCol("prediction").
  7. setProbabilityCol("probability").
  8. setRawPredictionCol("raw_prediction")
  9. val rfModel = rf.fit(datasetPcaFeaturesOnlyPersisted)

序列化 ML Data Pipeline 和 RF Model 为 Bundle.ML

  1. import org.apache.spark.ml.mleap.SparkUtil
  2. val pipeline = SparkUtil.createPipelineModel(uid = "pipeline", Array(featureModel, rfModel))
  3. val sbc = SparkBundleContext().withDataset(rfModel.transform(datasetWithFeatures))
  4. for(bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) {
  5. pipeline.writeBundle.save(bf)(sbc).get
  6. }

反序列化为 MLeap 和评分新数据

这一步的目的是展示如何反序列一个 bundle 然后使用它来对 Leap Frame 进行评分,而无需任何 Spark 依赖。你可以从我们的 S3 存储桶下载这个 mnist.json

  1. import ml.combust.mleap.runtime.MleapSupport._
  2. import ml.combust.mleap.runtime.MleapContext.defaultContext
  3. import java.io.File
  4. // load the Spark pipeline we saved in the previous section
  5. val mleapPipeline = (for(bf <- managed(BundleFile("jar:file:/tmp/mnist-spark-pipeline.zip"))) yield {
  6. bf.loadMleapBundle().get.root
  7. }).tried.get

从我们的 mleap-demo Git 仓库中加载一个样例 Leap Frame(data/mnist.json)。

  1. import ml.combust.mleap.runtime.serialization.FrameReader
  2. val s = scala.io.Source.fromURL("file:///./mleap-demo/mnist.json").mkString
  3. val bytes = s.getBytes("UTF-8")
  4. val frame = FrameReader("ml.combust.mleap.json").fromBytes(bytes)
  5. // transform the dataframe using our pipeline
  6. val frame2 = mleapPipeline.transform(frame).get
  7. val data = frame2.dataset

接下来你可以从这里拿到更多的示例和 Notebook。