Basic Demo

This basic demo will guide you through using Spark to build and export an ML pipeline to an MLeap Bundle and later use it to transform a dataframe using the MLeap Runtime.

Build and Export an MLeap Bundle

In this section we will programmatically create a simple Spark ML pipeline, then export it to an MLeap Bundle. Our pipeline is very simple: it performs string indexing on a categorical feature, then runs the result through a binarizer to force the result to a 1 or 0. This pipeline has no real-world purpose, but it illustrates how easy it is to create MLeap Bundles from Spark ML pipelines.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.feature.{Binarizer, StringIndexer}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import resource._

val datasetName = "./mleap-docs/assets/spark-demo.csv"

val dataframe: DataFrame = spark.sqlContext.read.format("csv")
  .option("header", true)
  .load(datasetName)
  .withColumn("test_double", col("test_double").cast("double"))

// Use out-of-the-box Spark transformers like you normally would
val stringIndexer = new StringIndexer().
  setInputCol("test_string").
  setOutputCol("test_index")

val binarizer = new Binarizer().
  setThreshold(0.5).
  setInputCol("test_double").
  setOutputCol("test_bin")

val pipelineEstimator = new Pipeline()
  .setStages(Array(stringIndexer, binarizer))

val pipeline = pipelineEstimator.fit(dataframe)

// then serialize pipeline; withDataset supplies the transformed dataset
// so the bundle can record the schema of each stage
val sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))
for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
  pipeline.writeBundle.save(bf)(sbc).get
}
```

The dataset used for training can be found here.

NOTE: right-click and choose “Save As…”; GitBook prevents directly clicking on the link.
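
Before fitting the pipeline, it can be worth confirming that the CSV loaded with the columns and types the demo expects. This is an optional sanity check (not part of the original demo) using standard Spark APIs, assuming the same `dataframe` defined above:

```scala
// optional sanity check: the demo expects a string column "test_string"
// and a double column "test_double" (cast above from the raw CSV)
dataframe.printSchema()
dataframe.select("test_string", "test_double").show(5)
```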

Import an MLeap Bundle

In this section we will load the MLeap Bundle from the first section into the MLeap Runtime. We will then use the MLeap Runtime transformer to transform a leap frame.

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// load the Spark pipeline we saved in the previous section
val bundle = (for(bundleFile <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
  bundleFile.loadMleapBundle().get
}).opt.get

// create a simple LeapFrame to transform
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import ml.combust.mleap.core.types._

// MLeap makes extensive use of monadic types like Try
val schema = StructType(StructField("test_string", ScalarType.String),
  StructField("test_double", ScalarType.Double)).get
val data = Seq(Row("hello", 0.6), Row("MLeap", 0.2))
val frame = DefaultLeapFrame(schema, data)

// transform the dataframe using our pipeline
val mleapPipeline = bundle.root
val frame2 = mleapPipeline.transform(frame).get
val data2 = frame2.dataset

// get data from the transformed rows and make some assertions
assert(data2(0).getDouble(2) == 1.0) // string indexer output
assert(data2(0).getDouble(3) == 1.0) // binarizer output

// the second row
assert(data2(1).getDouble(2) == 2.0)
assert(data2(1).getDouble(3) == 0.0)
```
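
As the comments above note, MLeap makes extensive use of monadic types like `Try`, and the demo calls `.get` for brevity, which throws on failure. In real code you may prefer to handle failures explicitly; a minimal sketch, assuming the same `mleapPipeline` and `frame` as above:

```scala
import scala.util.{Failure, Success}

// transform returns a scala.util.Try, so a failed transform can be
// handled explicitly instead of throwing via .get
mleapPipeline.transform(frame) match {
  case Success(transformed) => println(transformed.dataset)
  case Failure(error)       => println(s"transform failed: ${error.getMessage}")
}
```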

That’s it! This is a very simple example. Most likely you will not be manually constructing Spark ML pipelines as we have done here, but rather you will be using estimators and pipelines together to train on your data and produce useful models. For a more advanced example, see our MNIST Demo.