Custom Transformers

Every transformer in MLeap can be considered a custom transformer. The only difference between the transformer and bundle integration code you write and what we write is that ours gets included in the release jars. We welcome transformer additions to the MLeap project, so please make a PR.

There are plenty of examples in the MLeap source code for how to write your own transformers and make them serializable to/from Spark and MLeap.

Let’s go through a simple example of writing a custom transformer that maps an input string to a double using a Map[String, Double] to store the data needed for transformation. We will call our custom transformer: StringMap. This is a transformer that is included in the MLeap source code, and you can view it here: StringMapModel.scala.

Overview

A brief overview of the steps involved:

  1. Build our core model logic that can be shared between Spark and MLeap
  2. Build the MLeap transformer
  3. Build the Spark transformer
  4. Build bundle serialization for MLeap
  5. Build bundle serialization for Spark
  6. Configure the MLeap Bundle registries with the MLeap and Spark custom transformers

Core Model

The core model is the logic needed to transform the input data. It has no dependencies on Spark or MLeap. In the case of our StringMapModel, it is a class that knows how to map one string to a double. Let’s see what this looks like in Scala.

StringMapModel.scala

```scala
import ml.combust.mleap.core.Model
import ml.combust.mleap.core.types.{ScalarType, StructType}

case class StringMapModel(labels: Map[String, Double]) extends Model {
  def apply(label: String): Double = labels(label)

  override def inputSchema: StructType = StructType("input" -> ScalarType.String).get
  override def outputSchema: StructType = StructType("output" -> ScalarType.Double).get
}
```

The case class has a set of labels that it knows how to map to doubles. This is very similar to a StringIndexerModel, except that the values of the strings are arbitrary and not in sequence.
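
Standing alone, the core model is just a dictionary lookup. A quick sketch of its behavior (the map contents here are made up for illustration):

```scala
import ml.combust.mleap.core.feature.StringMapModel

// hypothetical label-to-value mapping
val model = StringMapModel(Map("square" -> 1.0, "circle" -> 2.0))

model("circle") // returns 2.0
// an unseen label, e.g. model("triangle"), throws NoSuchElementException,
// since apply is a plain Map lookup as defined above
```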

MLeap Transformer

The MLeap transformer is the piece of code that knows how to execute your core model against a leap frame. All MLeap transformers inherit from a base class: ml.combust.mleap.runtime.transformer.Transformer. For our example StringMap transformer, we can use a utility base class for simple input/output transformers called ml.combust.mleap.runtime.transformer.SimpleTransformer. This base class takes care of a small amount of boilerplate for any transformer that has exactly one input and one output column.

Here is the Scala code for the MLeap transformer.

StringMap.scala

```scala
import ml.combust.mleap.core.feature.StringMapModel
import ml.combust.mleap.core.types.NodeShape
import ml.combust.mleap.runtime.function.UserDefinedFunction
import ml.combust.mleap.runtime.transformer.{SimpleTransformer, Transformer}

case class StringMap(override val uid: String = Transformer.uniqueName("string_map"),
                     override val shape: NodeShape,
                     override val model: StringMapModel) extends SimpleTransformer {
  override val exec: UserDefinedFunction = (label: String) => model(label)
}
```

Note the UserDefinedFunction exec. This is an MLeap UserDefinedFunction that gets created from a Scala function using reflection. UserDefinedFunctions are the primary way that MLeap allows us to transform LeapFrames. The NodeShape shape defines the inputCol and outputCol for this transformer.
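
To see how the pieces fit together, here is a hedged sketch of running the transformer against a leap frame by hand. The frame and row classes assume a recent MLeap runtime, and the column names and map contents are illustrative:

```scala
import ml.combust.mleap.core.feature.StringMapModel
import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

// wire the standard input/output ports to concrete column names
val stringMap = StringMap(
  shape = NodeShape().withStandardInput("input").withStandardOutput("output"),
  model = StringMapModel(Map("square" -> 1.0, "circle" -> 2.0)))

val schema = StructType(StructField("input", ScalarType.String)).get
val frame = DefaultLeapFrame(schema, Seq(Row("square"), Row("circle")))

// transform returns a Try; the result has the "output" column appended
val result = stringMap.transform(frame).get
```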

Spark Transformer

The Spark transformer knows how to execute the core model against a Spark DataFrame. All Spark transformers inherit from org.apache.spark.ml.Transformer. If you have ever written a custom Spark transformer before, this process will be very familiar.

Here is what a custom Spark transformer looks like in Scala.

StringMap.scala

```scala
import ml.combust.mleap.core.feature.StringMapModel
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types._

class StringMap(override val uid: String,
                val model: StringMapModel) extends Transformer
  with HasInputCol
  with HasOutputCol {
  def this(model: StringMapModel) = this(uid = Identifiable.randomUID("string_map"), model = model)

  def setInputCol(value: String): this.type = set(inputCol, value)
  def setOutputCol(value: String): this.type = set(outputCol, value)

  @org.apache.spark.annotation.Since("2.0.0")
  override def transform(dataset: Dataset[_]): DataFrame = {
    val stringMapUdf = udf {
      (label: String) => model(label)
    }

    dataset.withColumn($(outputCol), stringMapUdf(dataset($(inputCol))))
  }

  override def copy(extra: ParamMap): Transformer = copyValues(new StringMap(uid, model), extra)

  @DeveloperApi
  override def transformSchema(schema: StructType): StructType = {
    require(schema($(inputCol)).dataType.isInstanceOf[StringType],
      s"Input column must be of type StringType but got ${schema($(inputCol)).dataType}")
    val inputFields = schema.fields
    require(!inputFields.exists(_.name == $(outputCol)),
      s"Output column ${$(outputCol)} already exists.")

    StructType(schema.fields :+ StructField($(outputCol), DoubleType))
  }
}
```
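
Using it then looks like any other Spark transformer; a minimal sketch (the SparkSession, column names, and data are assumptions for illustration):

```scala
import ml.combust.mleap.core.feature.StringMapModel
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

val df = Seq("square", "circle").toDF("shape")

val stringMap = new StringMap(StringMapModel(Map("square" -> 1.0, "circle" -> 2.0)))
  .setInputCol("shape")
  .setOutputCol("shape_value")

stringMap.transform(df).show()
// +------+-----------+
// | shape|shape_value|
// +------+-----------+
// |square|        1.0|
// |circle|        2.0|
// +------+-----------+
```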

MLeap Serialization

We need to define how to serialize/deserialize our model to/from an MLeap Bundle. In order to do this, we implement ml.combust.mleap.bundle.ops.MleapOp and ml.combust.bundle.op.OpModel for our MLeap transformer and core model, respectively. These type classes are all we need to define bundle serialization.

Here is what the serialization code looks like for our MLeap transformerin Scala.

NOTE: The code below looks long, but most of it is auto-generated by the IDE.

StringMapOp.scala

```scala
import ml.combust.bundle.BundleContext
import ml.combust.bundle.dsl._
import ml.combust.bundle.op.OpModel
import ml.combust.mleap.bundle.ops.MleapOp
import ml.combust.mleap.core.feature.StringMapModel
import ml.combust.mleap.runtime.MleapContext
import ml.combust.mleap.runtime.transformer.feature.StringMap

class StringMapOp extends MleapOp[StringMap, StringMapModel] {
  override val Model: OpModel[MleapContext, StringMapModel] = new OpModel[MleapContext, StringMapModel] {
    // the class of the model is needed for when we go to serialize JVM objects
    override val klazz: Class[StringMapModel] = classOf[StringMapModel]

    // a unique name for our op: "string_map"
    override def opName: String = Bundle.BuiltinOps.feature.string_map

    override def store(model: Model, obj: StringMapModel)
                      (implicit context: BundleContext[MleapContext]): Model = {
      // unzip our label map so we can store the labels and the values
      // as two parallel arrays; we do this because MLeap Bundles do
      // not support storing data as a map
      val (labels, values) = obj.labels.toSeq.unzip

      // add the labels and values to the Bundle model that
      // will be serialized to our MLeap Bundle
      model.withValue("labels", Value.stringList(labels)).
        withValue("values", Value.doubleList(values))
    }

    override def load(model: Model)
                     (implicit context: BundleContext[MleapContext]): StringMapModel = {
      // retrieve our list of labels
      val labels = model.value("labels").getStringList

      // retrieve our list of values
      val values = model.value("values").getDoubleList

      // reconstruct the model using the parallel labels and values
      StringMapModel(labels.zip(values).toMap)
    }
  }

  // the core model that is used by the transformer
  override def model(node: StringMap): StringMapModel = node.model
}
```

We will need to register StringMapOp with the MLeap bundle registry at runtime to let MLeap know about it. We go over the registry later in this article.
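
Once StringMapOp is registered, the transformer round-trips through an MLeap Bundle like any built-in op. A sketch assuming MLeap's MleapSupport helpers and scala-arm's managed resources, with stringMap being an instance like the one built earlier:

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.runtime.MleapSupport._
import resource._

// serialize the MLeap transformer to a bundle zip
for (bf <- managed(BundleFile("jar:file:/tmp/string-map.zip"))) {
  stringMap.writeBundle.save(bf).get
}

// deserialize it back; StringMapOp reconstructs the core model
val loaded = (for (bf <- managed(BundleFile("jar:file:/tmp/string-map.zip"))) yield {
  bf.loadMleapBundle().get.root
}).tried.get
```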

Spark Serialization

We also need to define how to serialize/deserialize the custom Spark transformer to/from an MLeap Bundle. This is very similar to the process we took for the MLeap transformer above. This time we will be implementing both ml.combust.bundle.op.OpNode and ml.combust.bundle.op.OpModel.

Here is what the serialization code looks like for StringMap in Scala.

StringMapOp.scala

```scala
import ml.combust.bundle.BundleContext
import ml.combust.bundle.dsl._
import ml.combust.bundle.op.{OpModel, OpNode}
import ml.combust.mleap.core.feature.StringMapModel
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.mleap.feature.StringMap

class StringMapOp extends OpNode[SparkBundleContext, StringMap, StringMapModel] {
  override val Model: OpModel[SparkBundleContext, StringMapModel] = new OpModel[SparkBundleContext, StringMapModel] {
    // the class of the model is needed for when we go to serialize JVM objects
    override val klazz: Class[StringMapModel] = classOf[StringMapModel]

    // a unique name for our op: "string_map"
    // this should be the same as for the MLeap transformer serialization
    override def opName: String = Bundle.BuiltinOps.feature.string_map

    override def store(model: Model, obj: StringMapModel)
                      (implicit context: BundleContext[SparkBundleContext]): Model = {
      // unzip our label map so we can store the labels and the values
      // as two parallel arrays; we do this because MLeap Bundles do
      // not support storing data as a map
      val (labels, values) = obj.labels.toSeq.unzip

      // add the labels and values to the Bundle model that
      // will be serialized to our MLeap Bundle
      model.withValue("labels", Value.stringList(labels)).
        withValue("values", Value.doubleList(values))
    }

    override def load(model: Model)
                     (implicit context: BundleContext[SparkBundleContext]): StringMapModel = {
      // retrieve our list of labels
      val labels = model.value("labels").getStringList

      // retrieve our list of values
      val values = model.value("values").getDoubleList

      // reconstruct the model using the parallel labels and values
      StringMapModel(labels.zip(values).toMap)
    }
  }

  override val klazz: Class[StringMap] = classOf[StringMap]

  override def name(node: StringMap): String = node.uid

  override def model(node: StringMap): StringMapModel = node.model

  override def load(node: Node, model: StringMapModel)
                   (implicit context: BundleContext[SparkBundleContext]): StringMap = {
    new StringMap(uid = node.name, model = model).
      setInputCol(node.shape.standardInput.name).
      setOutputCol(node.shape.standardOutput.name)
  }

  override def shape(node: StringMap)(implicit context: BundleContext[SparkBundleContext]): NodeShape =
    NodeShape().withStandardIO(node.getInputCol, node.getOutputCol)
}
```

We will need to register this with the MLeap registry as well, so that MLeap knows how to serialize this Spark transformer.
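
With the op in the Spark registry, a fitted Spark transformer or pipeline containing StringMap exports the same way as any other. A sketch assuming MLeap's SparkSupport helpers, reusing the stringMap and df values from the Spark example above:

```scala
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.bundle.SparkBundleContext
import resource._

// the context carries a transformed dataset so MLeap can record the schema
val sbc = SparkBundleContext().withDataset(stringMap.transform(df))

for (bf <- managed(BundleFile("jar:file:/tmp/string-map-spark.zip"))) {
  stringMap.writeBundle.save(bf)(sbc).get
}
```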

MLeap Bundle Registries

A registry contains all of the custom transformers and types for a given execution engine. In this case, we support the MLeap and Spark execution engines for the StringMap transformer, so we will have to configure both the Spark and MLeap registries to know how to serialize/deserialize their respective transformers.

MLeap uses Typesafe Config to configure registries. By default, MLeap ships with registries configured for the Spark runtime and the MLeap runtime. You can take a look at each of them here:

  1. MLeap Registry
  2. Spark Registry

By default, the MLeap runtime uses the configuration at ml.combust.mleap.registry.default. Spark uses the configuration at ml.combust.mleap.spark.registry.default.
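
If you need a registry handle directly, for example to build a custom context, it can be loaded by its config path. A hedged sketch, assuming ml.combust.bundle.BundleRegistry exposes a config-path constructor:

```scala
import ml.combust.bundle.BundleRegistry

// assumption: BundleRegistry(path) reads the ops listed under that
// path in the merged Typesafe Config on the classpath
val mleapRegistry = BundleRegistry("ml.combust.mleap.registry.default")
val sparkRegistry = BundleRegistry("ml.combust.mleap.spark.registry.default")
```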

MLeap Registry

In order to add the custom transformer to the default MLeap registry, we will add a reference.conf file to our own project that looks like this:

```
// make a list of all your custom transformers
// the list contains the fully-qualified class names of the
// OpNode implementations for your transformers
my.domain.mleap.ops = ["my.domain.mleap.ops.StringMapOp"]

// add the custom transformers we have defined to the default MLeap registry
ml.combust.mleap.registry.default.ops += "my.domain.mleap.ops"
```

Spark Registry

In order to add the custom transformer to the default Spark registry, we will add a reference.conf file to our own project that looks like this:

```
// make a list of all your custom transformers
// the list contains the fully-qualified class names of the
// OpNode implementations for your transformers
my.domain.mleap.spark.ops = ["my.domain.spark.ops.StringMapOp"]

// add the custom transformer ops we have defined to the default Spark registries
ml.combust.mleap.spark.registry.v20.ops += "my.domain.mleap.spark.ops"
ml.combust.mleap.spark.registry.v21.ops += "my.domain.mleap.spark.ops"
ml.combust.mleap.spark.registry.v22.ops += "my.domain.mleap.spark.ops"
```