Protobuf Data Source Guide

Since the Spark 3.4.0 release, Spark SQL provides built-in support for reading and writing protobuf data.

Deploying

The spark-protobuf module is external and not included in spark-submit or spark-shell by default.

As with any Spark application, spark-submit is used to launch your application. spark-protobuf_2.12 and its dependencies can be added directly to spark-submit using --packages, for example:

  ./bin/spark-submit --packages org.apache.spark:spark-protobuf_2.12:3.4.0 ...

For experimenting on spark-shell, you can also use --packages to add org.apache.spark:spark-protobuf_2.12 and its dependencies directly:

  ./bin/spark-shell --packages org.apache.spark:spark-protobuf_2.12:3.4.0 ...

See Application Submission Guide for more details about submitting applications with external dependencies.

to_protobuf() and from_protobuf()

The spark-protobuf package provides the function to_protobuf() to encode a column as binary in protobuf format, and from_protobuf() to decode protobuf binary data into a column. Both functions transform one column into another, and the input/output SQL data type can be a complex type or a primitive type.

Using protobuf messages as columns is useful when reading from or writing to a streaming source like Kafka. Each Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.

  • If the “value” field that contains your data is in protobuf, you could use from_protobuf() to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a different sink.
  • to_protobuf() can be used to turn structs into protobuf messages. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.

The Spark SQL schema is generated based on the protobuf descriptor file or protobuf class passed to from_protobuf and to_protobuf. The specified protobuf class or descriptor file must match the data; otherwise, the behavior is undefined: it may fail or return arbitrary results.

Scala:

  import org.apache.spark.sql.protobuf.functions._

  // `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor
  // file, or via the protobuf message class name.
  // The given input .proto protobuf schema:
  // syntax = "proto3"
  // message AppEvent {
  //   string name = 1;
  //   int64 id = 2;
  //   string context = 3;
  // }

  val df = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load()

  // 1. Decode the Protobuf data of schema `AppEvent` into a struct;
  // 2. Filter by column `name`;
  // 3. Encode the column `event` in Protobuf format.
  // The protoc command can be used to generate a protobuf descriptor file for a given .proto file.
  val output = df
    .select(from_protobuf($"value", "AppEvent", descriptorFilePath) as "event")
    .where("event.name == \"alice\"")
    .select(to_protobuf($"event", "AppEvent", descriptorFilePath) as "event")

  val query = output
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic2")
    .start()

  // Alternatively, you can decode and encode the SQL columns into protobuf format using the
  // protobuf class name. The specified Protobuf class must match the data, otherwise the behavior
  // is undefined: it may fail or return arbitrary results. To avoid conflicts, the jar file
  // containing the 'com.google.protobuf.*' classes should be shaded. An example of shading can be
  // found at https://github.com/rangadi/shaded-protobuf-classes.
  var output = df
    .select(from_protobuf($"value", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as "event")
    .where("event.name == \"alice\"")

  output.printSchema()
  // root
  //  |-- event: struct (nullable = true)
  //  |    |-- name: string (nullable = true)
  //  |    |-- id: long (nullable = true)
  //  |    |-- context: string (nullable = true)

  output = output.select(
    to_protobuf($"event", "org.sparkproject.spark_protobuf.protobuf.AppEvent") as "event")

  val query = output
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic2")
    .start()
Java:

  import static org.apache.spark.sql.functions.col;
  import static org.apache.spark.sql.protobuf.functions.*;

  // `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor
  // file, or via the protobuf message class name.
  // The given input .proto protobuf schema:
  // syntax = "proto3"
  // message AppEvent {
  //   string name = 1;
  //   int64 id = 2;
  //   string context = 3;
  // }

  Dataset<Row> df = spark
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "topic1")
    .load();

  // 1. Decode the Protobuf data of schema `AppEvent` into a struct;
  // 2. Filter by column `name`;
  // 3. Encode the column `event` in Protobuf format.
  // The protoc command can be used to generate a protobuf descriptor file for a given .proto file.
  Dataset<Row> output = df
    .select(from_protobuf(col("value"), "AppEvent", descriptorFilePath).as("event"))
    .where("event.name == \"alice\"")
    .select(to_protobuf(col("event"), "AppEvent", descriptorFilePath).as("event"));

  // Alternatively, you can decode and encode the SQL columns into protobuf format using the
  // protobuf class name. The specified Protobuf class must match the data, otherwise the behavior
  // is undefined: it may fail or return arbitrary results. To avoid conflicts, the jar file
  // containing the 'com.google.protobuf.*' classes should be shaded. An example of shading can be
  // found at https://github.com/rangadi/shaded-protobuf-classes.
  output = df
    .select(
      from_protobuf(col("value"),
        "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"))
    .where("event.name == \"alice\"");

  output.printSchema();
  // root
  //  |-- event: struct (nullable = true)
  //  |    |-- name: string (nullable = true)
  //  |    |-- id: long (nullable = true)
  //  |    |-- context: string (nullable = true)

  output = output.select(
    to_protobuf(col("event"),
      "org.sparkproject.spark_protobuf.protobuf.AppEvent").as("event"));

  StreamingQuery query = output
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "topic2")
    .start();
Python:

  from pyspark.sql.protobuf.functions import from_protobuf, to_protobuf

  # `from_protobuf` and `to_protobuf` provide two schema choices: via the protobuf descriptor
  # file, or via the protobuf message class name.
  # The given input .proto protobuf schema:
  # syntax = "proto3"
  # message AppEvent {
  #   string name = 1;
  #   int64 id = 2;
  #   string context = 3;
  # }

  df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

  # 1. Decode the Protobuf data of schema `AppEvent` into a struct;
  # 2. Filter by column `name`;
  # 3. Encode the column `event` in Protobuf format.
  # The protoc command can be used to generate a protobuf descriptor file for a given .proto file.
  output = df \
    .select(from_protobuf("value", "AppEvent", descriptorFilePath).alias("event")) \
    .where('event.name == "alice"') \
    .select(to_protobuf("event", "AppEvent", descriptorFilePath).alias("event"))

  # Alternatively, you can decode and encode the SQL columns into protobuf format using the
  # protobuf class name. The specified Protobuf class must match the data, otherwise the behavior
  # is undefined: it may fail or return arbitrary results. To avoid conflicts, the jar file
  # containing the 'com.google.protobuf.*' classes should be shaded. An example of shading can be
  # found at https://github.com/rangadi/shaded-protobuf-classes.
  output = df \
    .select(from_protobuf("value", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event")) \
    .where('event.name == "alice"')

  output.printSchema()
  # root
  #  |-- event: struct (nullable = true)
  #  |    |-- name: string (nullable = true)
  #  |    |-- id: long (nullable = true)
  #  |    |-- context: string (nullable = true)

  output = output \
    .select(to_protobuf("event", "org.sparkproject.spark_protobuf.protobuf.AppEvent").alias("event"))

  query = output \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic2") \
    .start()

Supported types for Protobuf -> Spark SQL conversion

Currently Spark supports reading protobuf scalar types, enum types, nested types, and map types under Protobuf messages. In addition to these types, spark-protobuf also introduces support for Protobuf OneOf fields, which allow you to handle messages that can have multiple possible sets of fields, but only one set present at a time. This is useful when the data you are working with is not always in the same format, and you need to handle messages with different sets of fields without encountering errors.

Protobuf type | Spark SQL type
------------- | --------------
boolean       | BooleanType
int           | IntegerType
long          | LongType
float         | FloatType
double        | DoubleType
string        | StringType
enum          | StringType
bytes         | BinaryType
Message       | StructType
repeated      | ArrayType
map           | MapType
OneOf         | Struct
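As a sketch of the OneOf mapping above (the message, field names, and values here are hypothetical, not from this guide): each OneOf branch surfaces as a nullable field in the decoded struct, and for any given row at most one branch is non-null.

```python
# Hypothetical sketch of how a OneOf field looks after decoding.
# Suppose a .proto (an assumption, not part of this guide) declares:
#
#   message Alert {
#     oneof payload {
#       string text = 1;
#       int64 code = 2;
#     }
#   }
#
# Every OneOf branch becomes a nullable field; only the branch that was
# actually set in the message carries a value.
decoded_rows = [
    {"text": "disk full", "code": None},  # the `text` branch was set
    {"text": None, "code": 507},          # the `code` branch was set
]

for row in decoded_rows:
    non_null = [name for name, value in row.items() if value is not None]
    assert len(non_null) <= 1  # OneOf guarantees a single active branch
```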

It also supports reading the following Protobuf types, Timestamp and Duration:

Protobuf logical type | Protobuf schema                        | Spark SQL type
--------------------- | -------------------------------------- | --------------
duration              | MessageType{seconds: Long, nanos: Int} | DayTimeIntervalType
timestamp             | MessageType{seconds: Long, nanos: Int} | TimestampType
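To make the (seconds, nanos) encoding concrete, here is a plain-Python illustration (not Spark code) of how such a payload corresponds to a point in time or an interval; Spark's TimestampType and DayTimeIntervalType have microsecond precision, so the nanos component is truncated to microseconds.

```python
from datetime import datetime, timedelta, timezone

def timestamp_to_datetime(seconds: int, nanos: int) -> datetime:
    # A protobuf Timestamp counts seconds/nanos since the Unix epoch (UTC).
    # Truncate nanos to microseconds, matching TimestampType's precision.
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
        microseconds=nanos // 1000)

def duration_to_timedelta(seconds: int, nanos: int) -> timedelta:
    # A protobuf Duration is a signed span of seconds/nanos, again truncated
    # to microseconds for DayTimeIntervalType.
    return timedelta(seconds=seconds, microseconds=nanos // 1000)

print(timestamp_to_datetime(1_700_000_000, 500_000_000))  # 2023-11-14 22:13:20.500000+00:00
print(duration_to_timedelta(90, 250_000_000))             # 0:01:30.250000
```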

Supported types for Spark SQL -> Protobuf conversion

Spark supports writing all Spark SQL types into Protobuf. For most types, the mapping from Spark types to Protobuf types is straightforward (e.g. IntegerType gets converted to int).

Spark SQL type | Protobuf type
-------------- | -------------
BooleanType    | boolean
IntegerType    | int
LongType       | long
FloatType      | float
DoubleType     | double
StringType     | string
StringType     | enum
BinaryType     | bytes
StructType     | message
ArrayType      | repeated
MapType        | map

Handling circular references in protobuf fields

One common issue that can arise when working with Protobuf data is the presence of circular references. In Protobuf, a circular reference occurs when a field refers back to itself or to another field that refers back to the original field. This can cause issues when parsing the data, as it can result in infinite loops or other unexpected behavior. To address this issue, the latest version of spark-protobuf introduces the ability to check for circular references through field types. This allows users to use the recursive.fields.max.depth option to specify the maximum number of levels of recursion to allow when parsing the schema. By default, spark-protobuf does not permit recursive fields, setting recursive.fields.max.depth to -1. However, you can set this option to a value from 0 to 10 if needed.

Setting recursive.fields.max.depth to 0 drops all recursive fields, setting it to 1 allows one level of recursion, and setting it to 2 allows two. A recursive.fields.max.depth value greater than 10 is not allowed, as it can lead to performance issues and even stack overflows.

The SQL schema for the protobuf message below will vary based on the value of recursive.fields.max.depth.

  syntax = "proto3";
  message Person {
    string name = 1;
    Person bff = 2;
  }

  // The protobuf schema defined above would be converted into Spark SQL columns with the following
  // structure, based on the `recursive.fields.max.depth` value:
  // 0: struct<name: string, bff: null>
  // 1: struct<name: string, bff: struct<name: string, bff: null>>
  // 2: struct<name: string, bff: struct<name: string, bff: struct<name: string, bff: null>>> ...
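As a sketch of how the option is passed when decoding the Person message above (the descriptor path and column names are assumptions; the Spark call is shown in comments because it requires a running SparkSession):

```python
# from_protobuf() accepts an options dict; `recursive.fields.max.depth`
# controls how many levels of the recursive `bff` field are kept.
options = {"recursive.fields.max.depth": "2"}

# With a SparkSession and a descriptor file available, this would look like:
#
#   from pyspark.sql.protobuf.functions import from_protobuf
#   parsed = df.select(
#       from_protobuf("value", "Person", descriptor_file_path, options).alias("person"))
#
# The default of -1 disallows recursive fields entirely; values above 10
# are rejected.
assert -1 <= int(options["recursive.fields.max.depth"]) <= 10
```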