Transforming a Leap Frame

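In both MLeap and Spark, transformers are a very useful abstraction for computing over data frames: a transformer reads one or more input columns and writes new output columns. Let's look at how to transform a leap frame with a simple transformer, StringIndexer, which maps each string in the a_string column to a numeric index given by that string's position in the indexer's list of labels.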

    import ml.combust.mleap.core.feature.StringIndexerModel
    import ml.combust.mleap.core.types.NodeShape
    import ml.combust.mleap.runtime.transformer.feature.StringIndexer

    // Create a StringIndexer that knows how to index the two strings
    // in our leap frame (`leapFrame` is assumed to already exist and
    // to contain a string column named "a_string")
    val stringIndexer = StringIndexer(
      shape = NodeShape().withStandardInput("a_string").withStandardOutput("a_string_index"),
      model = StringIndexerModel(Seq("Hello, MLeap!", "Another row")))

    // Transform our leap frame using the StringIndexer transformer,
    // keep only the index column and read each row's value as a Double
    val indices = (for (lf <- stringIndexer.transform(leapFrame);
                        lf2 <- lf.select("a_string_index")) yield {
      lf2.dataset.map(_.getDouble(0))
    }).get.toSeq

    // Make sure our indexer did its job
    assert(indices == Seq(0.0, 1.0))
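As a rough mental model of the indexing step, the following library-free Scala sketch reproduces the mapping that the assertion checks. It does not call MLeap: `StringIndexSketch` and `indexOf` are hypothetical names, and the contents of the a_string column are assumed to be the two labels in order. Throwing on an unknown label is this sketch's own choice, not a statement about how MLeap handles unseen values.

```scala
// Library-free sketch (NOT MLeap's StringIndexerModel): a label's index is
// its position in the list of labels the indexer was built with.
object StringIndexSketch {
  // Hypothetical helper: returns the index as a Double, as in the example,
  // and fails on labels the indexer does not know.
  def indexOf(labels: Seq[String], value: String): Double = {
    val i = labels.indexOf(value)
    require(i >= 0, s"unseen label: $value")
    i.toDouble
  }
}

// Same labels as StringIndexerModel(Seq("Hello, MLeap!", "Another row"))
val labels = Seq("Hello, MLeap!", "Another row")
// Assumed contents of the a_string column, one value per row
val aStringColumn = Seq("Hello, MLeap!", "Another row")

val indices = aStringColumn.map(v => StringIndexSketch.indexOf(labels, v))
println(indices) // List(0.0, 1.0)
```

Because the index is just a list position, reordering the labels passed to the model changes which string receives 0.0 and which receives 1.0.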

Transforming a Leap Frame with a Pipeline

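The example above is not especially interesting on its own. Leap frames and transformers show their real power when you combine them into a complete pipeline that runs from raw features all the way to a prediction algorithm. Let's build a pipeline that first uses a string indexer to produce indices, passes those indices to a one-hot encoder, assembles the resulting one-hot vector together with the a_double column into a feature vector, and then runs linear regression.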

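    import ml.combust.mleap.core.feature.{OneHotEncoderModel, VectorAssemblerModel}
    import ml.combust.mleap.core.regression.LinearRegressionModel
    import ml.combust.mleap.core.types.{NodeShape, ScalarShape, TensorShape}
    import ml.combust.mleap.runtime.transformer.{Pipeline, PipelineModel}
    import ml.combust.mleap.runtime.transformer.feature.{OneHotEncoder, VectorAssembler}
    import ml.combust.mleap.runtime.transformer.regression.LinearRegression
    import org.apache.spark.ml.linalg.Vectors

    // Create our one hot encoder: it turns the index (0 or 1) into a vector of size 2;
    // dropLast = false keeps a slot for every category.
    // (The OneHotEncoderModel constructor may differ between MLeap versions.)
    val oneHotEncoder = OneHotEncoder(
      shape = NodeShape.vector(1, 2,
        inputCol = "a_string_index",
        outputCol = "a_string_oh"),
      model = OneHotEncoderModel(2, dropLast = false))

    // Assemble the one-hot vector and the a_double column into a single
    // feature vector of size 3 for use by our linear regression
    val featureAssembler = VectorAssembler(
      shape = NodeShape().withInput("input0", "a_string_oh").
        withInput("input1", "a_double").withStandardOutput("features"),
      model = VectorAssemblerModel(Seq(TensorShape(2), ScalarShape())))

    // Create our linear regression
    // It has three coefficients: two for the size-2 one-hot vector
    // and one for a_double, plus an intercept of 23.5
    val linearRegression = LinearRegression(shape = NodeShape.regression(3),
      model = LinearRegressionModel(Vectors.dense(2.0, 3.0, 6.0), 23.5))

    // Create a pipeline from all of our transformers
    // (stringIndexer is the transformer from the previous example)
    val pipeline = Pipeline(
      shape = NodeShape(),
      model = PipelineModel(Seq(stringIndexer, oneHotEncoder, featureAssembler, linearRegression)))

    // Transform our leap frame using the pipeline
    val predictions = (for (lf <- pipeline.transform(leapFrame);
                            lf2 <- lf.select("prediction")) yield {
      lf2.dataset.map(_.getDouble(0))
    }).get.toSeq

    // Print our predictions
    // > 365.70000000000005
    // > 166.89999999999998
    println(predictions.mkString("\n"))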
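To make the printed numbers traceable, here is a library-free Scala sketch of the same arithmetic: string index, one-hot encoding of size 2 with no category dropped, assembly with a_double, then a dot product with the coefficients plus the intercept. It does not use MLeap, and `PipelineSketch`, `stringIndex`, `oneHot`, `assemble`, `predict` and `run` are hypothetical names. The a_double values 56.7 and 23.4 are an assumption: the leap frame is not defined in this section, and these are the inputs that, with coefficients (2.0, 3.0, 6.0) and intercept 23.5, reproduce the two printed predictions.

```scala
// Library-free sketch of the pipeline's arithmetic; none of these helpers are MLeap APIs.
object PipelineSketch {
  val labels = Seq("Hello, MLeap!", "Another row")
  val coefficients = Vector(2.0, 3.0, 6.0) // same values as the LinearRegressionModel above
  val intercept = 23.5

  // String index: position of the label in `labels`.
  def stringIndex(s: String): Int = {
    val i = labels.indexOf(s)
    require(i >= 0, s"unseen label: $s")
    i
  }

  // One-hot encode an index into a vector of length `size` (no category dropped).
  def oneHot(index: Int, size: Int): Vector[Double] =
    Vector.tabulate(size)(j => if (j == index) 1.0 else 0.0)

  // Assemble: append the scalar a_double to the one-hot vector.
  def assemble(oh: Vector[Double], aDouble: Double): Vector[Double] = oh :+ aDouble

  // Linear regression: dot(features, coefficients) + intercept.
  def predict(features: Vector[Double]): Double = {
    require(features.length == coefficients.length, "feature size mismatch")
    features.zip(coefficients).map { case (x, w) => x * w }.sum + intercept
  }

  def run(aString: String, aDouble: Double): Double =
    predict(assemble(oneHot(stringIndex(aString), 2), aDouble))
}

// Assumed rows: (a_string, a_double)
val rows = Seq(("Hello, MLeap!", 56.7), ("Another row", 23.4))
val predictions = rows.map { case (s, d) => PipelineSketch.run(s, d) }
predictions.foreach(println)
```

Because dropLast = false keeps both categories, the first two coefficients act as per-category offsets (2.0 for "Hello, MLeap!", 3.0 for "Another row") while 6.0 scales a_double: 2.0 + 6.0 × 56.7 + 23.5 = 365.7 and 3.0 + 6.0 × 23.4 + 23.5 = 166.9. The trailing digits in the printed output come from floating-point rounding.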

This task shows what MLeap is for: executing pipelines that we have trained with machine learning frameworks such as Spark, PySpark, scikit-learn or TensorFlow.