自定义 Transformer

MLeap 中的每一个内置 Transformer 都可以被视为自定义 Transformer。你所写的和我们所写的 Transformer / Bundle 集成代码(Bundle Integration Code)的唯一区别在于,我们的代码被内置到发布的 Jar 包中。我们欢迎用户通过提交 PR 来为 MLeap 项目添砖加瓦。

我们的 MLeap 源码中有大量的例子用来示范如何编写自己的 Transformer,以及如何实现 Transformer 在 Spark 和 MLeap 之间的序列化转换。

让我们通过一个简单的例子来完整过一遍自定义 Transformer 的过程,这个例子使用了一个包含我们所有待转换数据的 Map[String, Double] 对象,来将输入的字符串通映射成浮点值。

我们把这个自定义 Transformer 命名为 StringMap。你可以从 StringMapModel.scala 获取到它的源代码。

概述

简单概述所需步骤:

  1. 构建我们的核心模型逻辑(Core Model Logic)用于在 Spark 和 MLeap 之间共享数据。
  2. 构建 MLeap Transformer
  3. 构建 Spark Transformer
  4. 为 MLeap 编写 Bundle 序列化器
  5. 为 Spark 编写 Bundle 序列化器
  6. 注册 MLeap Transformer 和 Spark Transformer 到 MLeap Bundle 注册表(Registry)中

核心模型

核心模型包含转换输入数据的相关逻辑。核心模型不包含任何对 Spark 和 MLeap 的依赖。在我们 StringMapModel 的例子中,核心模型是一个类,它知道如何将字符串映射到浮点值。让我们看看它的 Scala 实现。

StringMapModel.scala

  1. case class StringMapModel(labels: Map[String, Double]) extends Model {
  2. def apply(label: String): Double = labels(label)
  3. override def inputSchema: StructType = StructType("input" -> ScalarType.String).get
  4. override def outputSchema: StructType = StructType("output" -> ScalarType.Double).get
  5. }

这个 case class 包含了一个 labels 成员,存储字符串映射到浮点值的映射关系。它非常类似于 StringIndexerModel,但是它的字符串是随意且无序的。

MLeap Transformer

MLeap Transformer 包含将核心模型逻辑应用到 Leap Frame 的代码段。所有的 MLeap Transformer 继承自基类 ml.combust.mleap.runtime.transformer.Transformer。我们在 StringMap 例子中使用了一个工具基类 ml.combust.mleap.runtime.transformer.SimpleTransformer 来实现简单的输入输出数据转换。这个基类可以作为任何仅包含单条输入和单条输出的 Transformer 的样板类。

以下是 MLeap Transformer 的 Scala 源码。

StringMap.scala

  1. import ml.combust.mleap.core.feature.StringMapModel
  2. import ml.combust.mleap.core.types.NodeShape
  3. import ml.combust.mleap.runtime.function.UserDefinedFunction
  4. import ml.combust.mleap.runtime.transformer.{SimpleTransformer, Transformer}
  5. case class StringMap(override val uid: String = Transformer.uniqueName("string_map"),
  6. override val shape: NodeShape,
  7. override val model: StringMapModel) extends SimpleTransformer {
  8. override val exec: UserDefinedFunction = (label: String) => model(label)

需要注意 exec 这个 UserDefinedFunction 实例。这是一个从 Scala 方法反射创建得到的 MLeap 用户定义方法。用户自定义方法是 MLeap 允许转换 Leap Frame 的主要手段。NodeShape 类的实例 shape 定义了 Transformer 的输入字段和输出字段。

Spark Transformer

Spark Transformer 知道如何将核心模型逻辑应用到 Spark DataFrame 中。所有的 Spark Transformer 都继承自 org.apache.spark.ml.Transformer。如果你之前曾实现过自定义 Spark Transformer,其过程非常相似。

以下是 Spark Transformer 的 Scala 源码:

StringMap.scala

  1. import ml.combust.mleap.core.feature.StringMapModel
  2. import org.apache.spark.annotation.DeveloperApi
  3. import org.apache.spark.ml.Transformer
  4. import org.apache.spark.ml.param.ParamMap
  5. import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
  6. import org.apache.spark.ml.util.Identifiable
  7. import org.apache.spark.sql.functions._
  8. import org.apache.spark.sql.{DataFrame, Dataset}
  9. import org.apache.spark.sql.types._
  10. class StringMap(override val uid: String,
  11. val model: StringMapModel) extends Transformer
  12. with HasInputCol
  13. with HasOutputCol {
  14. def this(model: StringMapModel) = this(uid = Identifiable.randomUID("string_map"), model = model)
  15. def setInputCol(value: String): this.type = set(inputCol, value)
  16. def setOutputCol(value: String): this.type = set(outputCol, value)
  17. @org.apache.spark.annotation.Since("2.0.0")
  18. override def transform(dataset: Dataset[_]): DataFrame = {
  19. val stringMapUdf = udf {
  20. (label: String) => model(label)
  21. }
  22. dataset.withColumn($(outputCol), stringMapUdf(dataset($(inputCol))))
  23. }
  24. override def copy(extra: ParamMap): Transformer = copyValues(new StringMap(uid, model), extra)
  25. @DeveloperApi
  26. override def transformSchema(schema: StructType): StructType = {
  27. require(schema($(inputCol)).dataType.isInstanceOf[StringType],
  28. s"Input column must be of type StringType but got ${schema($(inputCol)).dataType}")
  29. val inputFields = schema.fields
  30. require(!inputFields.exists(_.name == $(outputCol)),
  31. s"Output column ${$(outputCol)} already exists.")
  32. StructType(schema.fields :+ StructField($(outputCol), DoubleType))
  33. }
  34. }

MLeap 序列化

我们需要定义如何序列化我们的模型到 MLeap Bundle,以及如何从 MLeap Bundle 反序列化得到我们的模型。为了达到这个目的,我们需要为 MLeap Transformer 和核心模型实现 ml.combust.mleap.bundle.ops.MleapOp and ml.combust.bundle.op.OpModel 两个抽象类。这两个类是我们定义 bundle 序列化所需要的所有类。

以下是我们实现 MLeap Transformer 序列化过程的 Scala 源码。

注意:下面的代码看上去很长,其实大多是都是 IDE 自动生成的。

StringMapOp.scala

  1. import ml.combust.bundle.BundleContext
  2. import ml.combust.bundle.dsl._
  3. import ml.combust.bundle.op.OpModel
  4. import ml.combust.mleap.bundle.ops.MleapOp
  5. import ml.combust.mleap.core.feature.StringMapModel
  6. import ml.combust.mleap.runtime.MleapContext
  7. import ml.combust.mleap.runtime.transformer.feature.StringMap
  8. class StringMapOp extends MleapOp[StringMap, StringMapModel] {
  9. override val Model: OpModel[MleapContext, StringMapModel] = new OpModel[MleapContext, StringMapModel] {
  10. // the class of the model is needed for when we go to serialize JVM objects
  11. override val klazz: Class[StringMapModel] = classOf[StringMapModel]
  12. // a unique name for our op: "string_map"
  13. override def opName: String = Bundle.BuiltinOps.feature.string_map
  14. override def store(model: Model, obj: StringMapModel)
  15. (implicit context: BundleContext[MleapContext]): Model = {
  16. // unzip our label map so we can store the label and the value
  17. // as two parallel arrays, we do this because MLeap Bundles do
  18. // not support storing data as a map
  19. val (labels, values) = obj.labels.toSeq.unzip
  20. // add the labels and values to the Bundle model that
  21. // will be serialized to our MLeap bundle
  22. model.withValue("labels", Value.stringList(labels)).
  23. withValue("values", Value.doubleList(values))
  24. }
  25. override def load(model: Model)
  26. (implicit context: BundleContext[MleapContext]): StringMapModel = {
  27. // retrieve our list of labels
  28. val labels = model.value("labels").getStringList
  29. // retrieve our list of values
  30. val values = model.value("values").getDoubleList
  31. // reconstruct the model using the parallel labels and values
  32. StringMapModel(labels.zip(values).toMap)
  33. }
  34. }
  35. // the core model that is used by the transformer
  36. override def model(node: StringMap): StringMapModel = node.model
  37. }

我们还需要注册 StringMapOp 到 MLeap Bundle 注册表中,以让 MLeap 在运行时知道它的存在。我们会在本文稍后来讨论注册表。

Spark 序列化

我们同样需要定义如何序列化 / 反序列化我们的 Spark 模型。其与我们处理 MLeap Transformer 的过程非常相似。我们需要再次实现 ml.combust.bundle.op.OpNodeml.combust.bundle.op.OpModel 两个类。

我们的序列化 Scala 代码如下。

StringMapOp.scala

  1. import ml.combust.bundle.BundleContext
  2. import ml.combust.bundle.dsl._
  3. import ml.combust.bundle.op.{OpModel, OpNode}
  4. import ml.combust.mleap.core.feature.StringMapModel
  5. import ml.combust.mleap.runtime.MleapContext
  6. import org.apache.spark.ml.bundle.SparkBundleContext
  7. import org.apache.spark.ml.mleap.feature.StringMap
  8. class StringMapOp extends OpNode[SparkBundleContext, StringMap, StringMapModel] {
  9. override val Model: OpModel[SparkBundleContext, StringMapModel] = new OpModel[SparkBundleContext, StringMapModel] {
  10. // the class of the model is needed for when we go to serialize JVM objects
  11. override val klazz: Class[StringMapModel] = classOf[StringMapModel]
  12. // a unique name for our op: "string_map"
  13. // this should be the same as for the MLeap transformer serialization
  14. override def opName: String = Bundle.BuiltinOps.feature.string_map
  15. override def store(model: Model, obj: StringMapModel)
  16. (implicit context: BundleContext[SparkBundleContext]): Model = {
  17. // unzip our label map so we can store the label and the value
  18. // as two parallel arrays, we do this because MLeap Bundles do
  19. // not support storing data as a map
  20. val (labels, values) = obj.labels.toSeq.unzip
  21. // add the labels and values to the Bundle model that
  22. // will be serialized to our MLeap bundle
  23. model.withValue("labels", Value.stringList(labels)).
  24. withValue("values", Value.doubleList(values))
  25. }
  26. override def load(model: Model)
  27. (implicit context: BundleContext[SparkBundleContext]): StringMapModel = {
  28. // retrieve our list of labels
  29. val labels = model.value("labels").getStringList
  30. // retrieve our list of values
  31. val values = model.value("values").getDoubleList
  32. // reconstruct the model using the parallel labels and values
  33. StringMapModel(labels.zip(values).toMap)
  34. }
  35. }
  36. override val klazz: Class[StringMap] = classOf[StringMap]
  37. override def name(node: StringMap): String = node.uid
  38. override def model(node: StringMap): StringMapModel = node.model
  39. override def load(node: Node, model: StringMapModel)
  40. (implicit context: BundleContext[SparkBundleContext]): StringMap = {
  41. new StringMap(uid = node.name, model = model).
  42. setInputCol(node.shape.standardInput.name).
  43. setOutputCol(node.shape.standardOutput.name)
  44. }
  45. override def shape(node: StringMap)(implicit context: BundleContext[SparkBundleContext]): NodeShape =
  46. NodeShape().withStandardIO(node.getInputCol, node.getOutputCol)
  47. }

我们同样需要注册这个类到 MLeap 注册表中,从而让 MLeap 知道如何序列化 Spark Transformer。

MLeap Bundle 注册

一个注册表包含了所有的自定义 Transformers,以及给定的执行引擎的类型。在我们的例子中,我们提供了 StringMap 的 MLeap / Spark 执行引擎,因为我们必须要分别配置 Spark 和 MLeap 的注册表,以让 MLeap 知道如何序列化两者各自的 Transformers。

MLeap 使用 Typesafe Config 来配置这些注册表。默认情况下,MLeap 自带了 Spark Runtime 和 MLeap Runtime 的注册表配置。你可以通过如下链接来查阅对应的配置文件:

  1. MLeap 注册表
  2. Spark 注册表

默认情况下,MLeap Runtime 使用 ml.combust.mleap.registry.default 中的配置,Spark 使用 ml.combust.mleap.spark.registry.default 中的配置。

MLeap 注册表

为了添加自定义 Transformer 到默认的 MLeap 注册表中,我们需要添加一个 reference.conf 到我们的文件中,它看起来像这样:

  1. // make a list of all your custom transformers
  2. // the list contains the fully-qualified class names of the
  3. // OpNode implementations for your transformers
  4. my.domain.mleap.ops = ["my.domain.mleap.ops.StringMapOp"]
  5. // include the custom transformers we have defined to the default MLeap registry
  6. ml.combust.mleap.registry.default.ops += "my.domain.mleap.ops"

Spark 注册表

为了添加自定义 Transformer 到默认的 Spark 注册表中,我们需要添加一个 reference.conf 到我们的文件中,它看起来像这样:

  1. // make a list of all your custom transformers
  2. // the list contains the fully-qualified class names of the
  3. // OpNode implementations for your transformers
  4. my.domain.mleap.spark.ops = ["my.domain.spark.ops.StringMapOp"]
  5. // include the custom transformers ops we have defined to the default Spark registries
  6. ml.combust.mleap.spark.registry.v20.ops += my.domain.mleap.spark.ops
  7. ml.combust.mleap.spark.registry.v21.ops += my.domain.mleap.spark.ops
  8. ml.combust.mleap.spark.registry.v22.ops += my.domain.mleap.spark.ops