Protobuf

This Apache Druid extension enables Druid to ingest and understand the Protobuf data format. Make sure to include druid-protobuf-extensions as an extension.

The druid-protobuf-extensions extension provides a Protobuf parser for stream ingestion. See the corresponding documentation for details.
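Loading the extension is typically done through the extensions load list in Druid's common runtime properties. A minimal sketch, assuming you also use the Kafka indexing service; the exact path to common.runtime.properties depends on your deployment:

```properties
# Sketch only: merge these names into your existing load list.
druid.extensions.loadList=["druid-kafka-indexing-service", "druid-protobuf-extensions"]
```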

Example: Load Protobuf messages from Kafka

This example demonstrates how to load Protobuf messages from Kafka. Please read the Load from Kafka tutorial first. This example will use the same “metrics” dataset.

Files used in this example are found at ./examples/quickstart/protobuf in your Druid directory.

  • We will use the Kafka Indexing Service.
  • The Kafka broker host is localhost:9092.
  • The Kafka topic is metrics_pb instead of metrics (a sketch for creating it follows this list).
  • The datasource name is metrics-kafka-pb instead of metrics-kafka to avoid confusion.
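If the metrics_pb topic does not exist yet, you can create it with the standard Kafka tooling. A minimal sketch, assuming a local Kafka installation with ZooKeeper on localhost:2181, run from the Kafka directory:

```bash
# Create the topic used in this example (adjust partitions/replication as needed).
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 --topic metrics_pb
```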

Here is an example metrics record in JSON:

```json
{
  "unit": "milliseconds",
  "http_method": "GET",
  "value": 44,
  "timestamp": "2017-04-06T02:36:22Z",
  "http_code": "200",
  "page": "/",
  "metricType": "request/latency",
  "server": "www1.example.com"
}
```
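The records emitted by the quickstart's bin/generate-example-metrics script (used in the producer step below) have this shape. A quick way to inspect one record, assuming you run the command from your Druid directory:

```bash
./bin/generate-example-metrics | head -n 1
```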

Proto file

The proto file should look like this. Save it as metrics.proto.

```protobuf
syntax = "proto3";
message Metrics {
  string unit = 1;
  string http_method = 2;
  int32 value = 3;
  string timestamp = 4;
  string http_code = 5;
  string page = 6;
  string metricType = 7;
  string server = 8;
}
```

Descriptor file

Use the protoc Protobuf compiler to generate the descriptor file. Save the metrics.desc file either on the classpath or at a location reachable by URL. In this example the descriptor file was saved at /tmp/metrics.desc.

```bash
protoc -o /tmp/metrics.desc metrics.proto
```

Supervisor spec JSON

Below is the complete Supervisor spec JSON to be submitted to the Overlord. Please make sure these keys are properly configured for successful ingestion.

  • descriptor for the descriptor file URL.
  • protoMessageType from the proto definition.
  • parseSpec format must be json.
  • topic to subscribe to. The topic is “metrics_pb” instead of “metrics”.
  • bootstrap.servers is the Kafka broker host.
```json
{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "metrics-kafka-pb",
    "parser": {
      "type": "protobuf",
      "descriptor": "file:///tmp/metrics.desc",
      "protoMessageType": "Metrics",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "unit",
            "http_method",
            "http_code",
            "page",
            "metricType",
            "server"
          ],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "metrics_pb",
    "consumerProperties": {
      "bootstrap.servers": "localhost:9092"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H"
  }
}
```
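Once the spec is saved to a file, it can be submitted to the Overlord over HTTP. A minimal sketch, assuming the Overlord listens on the default localhost:8090 and the spec is saved as supervisor-spec.json (both are assumptions to adjust for your setup):

```bash
# Submit the supervisor spec to the Overlord's supervisor endpoint.
curl -X POST -H 'Content-Type: application/json' \
  -d @supervisor-spec.json \
  http://localhost:8090/druid/indexer/v1/supervisor
```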

Kafka Producer

Here is the sample script that publishes the metrics to Kafka in Protobuf format.

1. Run protoc again, this time with the Python binding option. This command generates the metrics_pb2.py file.

```bash
protoc -o metrics.desc metrics.proto --python_out=.
```

2. Create the Kafka producer script.

This script requires the protobuf and kafka-python modules.
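If they are not installed yet, both are available from PyPI; for example:

```bash
pip install protobuf kafka-python
```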

```python
#!/usr/bin/env python

import sys
import json

from kafka import KafkaProducer
from metrics_pb2 import Metrics

producer = KafkaProducer(bootstrap_servers='localhost:9092')
topic = 'metrics_pb'
metrics = Metrics()

# Read one JSON record per line from stdin, copy its fields into the
# Protobuf Metrics message, and publish the serialized bytes to Kafka.
for row in sys.stdin:
    d = json.loads(row)
    for k, v in d.items():
        setattr(metrics, k, v)
    pb = metrics.SerializeToString()
    producer.send(topic, pb)

# Make sure all buffered messages are delivered before the script exits.
producer.flush()
```
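Save the script as pb_publisher.py (the name used in the next step) and make it executable so it can be used in a pipeline:

```bash
chmod +x pb_publisher.py
```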
3. Run the producer:

```bash
./bin/generate-example-metrics | ./pb_publisher.py
```

4. Test:

```bash
kafka-console-consumer --zookeeper localhost --topic metrics_pb
```

It should print messages like this:

```
millisecondsGETR"2017-04-06T03:23:56Z*2002/list:request/latencyBwww1.example.com
```
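To confirm that Druid is ingesting the data, one option is a Druid SQL query against the new datasource. A sketch, assuming the Broker runs on the default localhost:8082 and Druid SQL is enabled in your deployment:

```bash
# Query the first few ingested rows from the metrics-kafka-pb datasource.
curl -X POST -H 'Content-Type: application/json' \
  -d '{"query": "SELECT * FROM \"metrics-kafka-pb\" LIMIT 5"}' \
  http://localhost:8082/druid/v2/sql
```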