Data Preparation

DataFrame Metadata

Column metadata is one of the most useful and least known features of the Spark Dataset. Note that all the features described below, although not private, are part of the developer API and as such can be unstable or even removed in minor versions.

Metadata in ML pipelines

Although metadata is widely used by ML Pipelines to indicate variable types and levels, the whole process is usually completely transparent and at least partially hidden from the end user, so let's look at a simple pipeline and see what happens behind the scenes.

We’ll start with a simple dataset:

    val df = Seq(
      (0.0, "x", 2.0),
      (1.0, "y", 3.0),
      (2.0, "x", -1.0)
    ).toDF("label", "x1", "x2")

and the following pipeline:

    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

    val stages = Array(
      new StringIndexer().setInputCol("x1").setOutputCol("x1_"),
      new VectorAssembler().setInputCols(Array("x1_", "x2")).setOutputCol("features")
    )

    val model = new Pipeline().setStages(stages).fit(df)

Now we can extract the stages and transform the data step by step:

    val dfs = model.stages.scanLeft(df)((df, stage) => stage.transform(df))

and see what is going on at each stage:

  1. Our initial dataset has no metadata:

        dfs(0).schema.map(_.metadata)
        // Seq[org.apache.spark.sql.types.Metadata] = List({}, {}, {})
  2. After transforming with StringIndexerModel we can see indexer-specific metadata:

        dfs(1).schema.last.metadata
        // org.apache.spark.sql.types.Metadata =
        // {"ml_attr":{"vals":["x","y"],"type":"nominal","name":"x1_"}}

     It is important to note that this information is stored locally, so it is worth keeping that in mind when the number of unique values is large.

  3. Finally, the metadata for the assembled feature vector:

        dfs(2).schema.last.metadata
        // org.apache.spark.sql.types.Metadata = {"ml_attr":{"attrs":{
        //   "numeric":[{"idx":1,"name":"x2"}],
        //   "nominal":[{"vals":["x","y"],"idx":0,"name":"x1_"}]
        // },"num_attrs":2}}

     Metadata from the upstream stages is picked up by the assembler and used to describe the vector indices; it can also be recovered programmatically, as shown below.
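
If you prefer not to parse the JSON by hand, the attributes can be read back from the column's StructField. A minimal sketch (the exact rendering of the result may differ between Spark versions):

    import org.apache.spark.ml.attribute.AttributeGroup

    // Rebuild the attribute group from the assembled column's schema field
    val attrGroup = AttributeGroup.fromStructField(dfs(2).schema("features"))
    attrGroup.attributes.map(_.map(_.name).toList)
    // e.g. Some(List(Some(x1_), Some(x2)))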

Let’s check if metadata is actually used in practice:

    import org.apache.spark.ml.classification.DecisionTreeClassifier

    new DecisionTreeClassifier().setLabelCol("label").fit(dfs.last).toDebugString
    // String =
    // "DecisionTreeClassificationModel (uid=dtc_72c1c370aa00) of depth 2 with 5 nodes
    //   If (feature 0 in {0.0})
    //    If (feature 1 <= -1.0)
    //     Predict: 2.0
    //    Else (feature 1 > -1.0)
    //     Predict: 1.0
    //   Else (feature 0 not in {0.0})
    //    Predict: 0.0
    // "

Note: Prior to Spark 2.0.0 the label column would require indexing as well.

As you can see, nominal and numeric features are recognized and used in different ways, which is exactly what we would expect.

Setting ML attributes manually

Scala

So far so good, but what if you work with data which has already been preprocessed? In cases like this Spark provides a set of utilities designed to create ML-compliant metadata. Let's get familiar with the whole process by building metadata equivalent to the one generated by the ML pipeline we used before.

We’ll need a NominalAttribute:

    import org.apache.spark.ml.attribute.NominalAttribute

    val firstAttr = NominalAttribute.defaultAttr.withValues("x", "y").withName("x1_")

and a NumericAttribute:

    import org.apache.spark.ml.attribute.NumericAttribute

    val secondAttr = NumericAttribute.defaultAttr.withName("x2")

Numeric attributes also provide a number of methods which can be used to store basic descriptive statistics such as mean, standard deviation, minimum or maximum, as sketched below.
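
A minimal sketch of those setters, with illustrative statistics for the x2 column (the exact values are not needed for the rest of the walkthrough):

    // Optional descriptive statistics can be attached to a numeric attribute
    val describedAttr = NumericAttribute.defaultAttr
      .withName("x2")
      .withMin(-1.0)
      .withMax(3.0)
      .withMean(4.0 / 3)
      .withStd(2.08)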

Finally we combine the attributes using an AttributeGroup and convert it to a Metadata object:

    import org.apache.spark.ml.attribute.AttributeGroup

    val featuresMetadata = new AttributeGroup("features", Array(firstAttr, secondAttr)).toMetadata

All that is left is a quick sanity check:

    featuresMetadata == dfs.last.schema.last.metadata
    // Boolean = true

The generated metadata can be applied using the Column.as method:

    // Note: We use the old MLlib API here only to show VectorUDT usage.
    import org.apache.spark.mllib.linalg.Vectors

    val records = Seq(
      (0.0, Vectors.dense(Array(0.0, 2.0))),
      (1.0, Vectors.dense(Array(1.0, 3.0))),
      (2.0, Vectors.dense(Array(0.0, -1.0)))
    )

    records.toDF("label", "features")
      .withColumn("features", $"features".as("features", featuresMetadata))

or added to the schema when creating a DataFrame:

    import org.apache.spark.sql.types._
    import org.apache.spark.mllib.linalg.VectorUDT

    val schema = StructType(Seq(
      StructField("label", DoubleType, false),
      StructField("features", new VectorUDT(), false, featuresMetadata)
    ))
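
For completeness, a quick sketch (reusing the records sequence defined above) of applying this schema with spark.createDataFrame; the metadata ends up on the features column just as before:

    import org.apache.spark.sql.Row

    // Convert the tuples to Rows so they match the explicit schema
    val rows = records.map { case (label, features) => Row(label, features) }
    val dfWithMeta = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)

    dfWithMeta.schema.last.metadata
    // {"ml_attr":{"attrs":{...},"num_attrs":2}}
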
Python

Unlike Scala, Python doesn't provide any helpers and metadata is simply represented as a standard Python dict.

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, VectorAssembler

    df = sc.parallelize((
        (0.0, "x", 2.0),
        (1.0, "y", 3.0),
        (2.0, "x", -1.0)
    )).toDF(["label", "x1", "x2"])

    model = Pipeline(stages=[
        StringIndexer(inputCol="x1", outputCol="x1_"),
        VectorAssembler(inputCols=["x1_", "x2"], outputCol="features"),
    ]).fit(df)

    model.transform(df).schema[-1].metadata
    ## {'ml_attr': {'attrs': {'nominal': [{'idx': 0,
    ##                                     'name': 'x1_',
    ##                                     'vals': ['x', 'y']}],
    ##                        'numeric': [{'idx': 1, 'name': 'x2'}]},
    ##              'num_attrs': 2}}

Before Spark 2.2, PySpark didn't support attaching metadata to a single column. It is possible, though, to use a method similar to this one:

    import json

    from pyspark import SparkContext
    from pyspark.sql import Column
    from pyspark.sql.functions import col

    def withMeta(self, alias, meta):
        sc = SparkContext._active_spark_context
        jmeta = sc._gateway.jvm.org.apache.spark.sql.types.Metadata
        return Column(getattr(self._jc, "as")(alias, jmeta.fromJson(json.dumps(meta))))

    Column.withMeta = withMeta

    meta = {"ml_attr": {"name": "label_with_meta",
                        "type": "nominal",
                        "vals": ["0.0", "1.0", "2.0"]}}

    df_with_meta = df.withColumn("label_with_meta", col("label").withMeta("", meta))
    df_with_meta.schema[-1].metadata == meta
    ## True

Spark 2.2 added support for setting metadata with Column.alias:

    df_with_meta = df.withColumn("label_with_meta", col("label").alias("label", metadata=meta))
    df_with_meta.schema[-1].metadata == meta
    ## True

Setting custom column metadata

Arguably the true power of metadata shows itself when it is used outside the restricted ML environment. It is possible to attach an arbitrary JSON document to each column and use it for provenance tracking, storing diagnostic information, or performing different data enrichment tasks.

A Metadata object can be created from a JSON string:

    import org.apache.spark.sql.types.Metadata

    Metadata.fromJson("""{"foo": "bar"}""")
    // org.apache.spark.sql.types.Metadata = {"foo":"bar"}

or constructed using MetadataBuilder:

    import org.apache.spark.sql.types.MetadataBuilder

    new MetadataBuilder().putString("foo", "bar").build
    // org.apache.spark.sql.types.Metadata = {"foo":"bar"}
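
Values stored this way can be read back from the schema. A short sketch reusing the key set above (the column and DataFrame here are arbitrary examples):

    val customMeta = new MetadataBuilder().putString("foo", "bar").build
    val annotated = Seq((1L, "foo"), (2L, "bar")).toDF("id", "txt")
      .withColumn("id", $"id".as("id", customMeta))

    annotated.schema("id").metadata.contains("foo")
    // Boolean = true
    annotated.schema("id").metadata.getString("foo")
    // String = bar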

Moreover, it can be attached to Parquet files and loaded back later:

  1. Seq((1L, "foo"), (2L, "bar"))
  2. .toDF("id", "txt")
  3. .withColumn("id", $"id".as("", Metadata.fromJson("""{"foo": "bar"}""")))
  4. .write.parquet("/tmp/foo")
  5. spark.read.parquet("/tmp/foo").schema.headOption.map(_.metadata)
  6. // Option[org.apache.spark.sql.types.Metadata] = Some({"foo":"bar"})

Accessing Metadata Directly

Metadata can also be accessed directly using Parquet tools:

    import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsScalaMapConverter}

    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.hadoop.conf.Configuration

    val conf = spark.sparkContext.hadoopConfiguration

    def getFooters(conf: Configuration, path: String) = {
      val fs = FileSystem.get(conf)
      val footers = ParquetFileReader.readAllFootersInParallel(conf, fs.getFileStatus(new Path(path)))
      footers
    }

    def getFileMetadata(conf: Configuration, path: String) = {
      getFooters(conf, path)
        .asScala.map(_.getParquetMetadata.getFileMetaData.getKeyValueMetaData.asScala)
    }

    getFileMetadata(conf, "/tmp/foo").headOption
    // Option[scala.collection.mutable.Map[String,String]] =
    //   Some(Map(org.apache.spark.sql.parquet.row.metadata ->
    //     {"type":"struct","fields":[
    //       {"name":"id","type":"long","nullable":false,"metadata":{"foo":"bar"}},
    //       {"name":"txt","type":"string","nullable":true,"metadata":{}}]}))

We can also use the extracted footers to write a standalone metadata file when needed:

    import org.apache.parquet.hadoop.ParquetFileWriter

    def createMetadata(conf: Configuration, path: String) = {
      val footers = getFooters(conf, path)
      ParquetFileWriter.writeMetadataFile(conf, new Path(path), footers)
    }
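
A quick usage sketch, assuming the /tmp/foo directory written above; note that Parquet summary files have been deprecated in recent parquet-mr releases, so results may vary with the version bundled with Spark:

    createMetadata(conf, "/tmp/foo")

    // List the summary files created next to the data files
    FileSystem.get(conf).listStatus(new Path("/tmp/foo"))
      .map(_.getPath.getName).filter(_.startsWith("_"))
    // e.g. Array(_SUCCESS, _common_metadata, _metadata)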