Avro Serialization

Technical Information

A system that wants to use Avro serialization needs to complete two steps:

  • Deploy a Schema Registry instance

  • Use these properties to configure the Kafka Connect instance:

  key.converter=io.confluent.connect.avro.AvroConverter
  key.converter.schema.registry.url=http://localhost:8081
  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url=http://localhost:8081
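
These worker-level settings can also be overridden for an individual connector by adding the same converter keys to that connector's configuration. A minimal sketch using the Connect REST API (the connector name, file path, and topic are illustrative, and the stock FileStreamSource connector stands in for a real one):

  curl -X PUT -H "Content-Type: application/json" \
      http://localhost:8083/connectors/avro-file-source/config \
      -d '{
        "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
        "file": "/tmp/test.txt",
        "topic": "avro-test",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081"
      }'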

Note: In addition to setting the key/value converters, it is strongly recommended to set the internal key/value converters to the JSON converter, which makes the stored configuration and offsets easier to analyze. Using the Avro converter for the internal converters is not currently possible due to a known issue.

  internal.key.converter=org.apache.kafka.connect.json.JsonConverter
  internal.value.converter=org.apache.kafka.connect.json.JsonConverter
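
Taken together, the Avro-related portion of a distributed worker configuration might look like the following minimal sketch (bootstrap.servers, group.id, and the storage topic names are illustrative placeholders):

  # connect-distributed.properties (excerpt)
  bootstrap.servers=localhost:9092
  group.id=connect-cluster
  config.storage.topic=my_connect_configs
  offset.storage.topic=my_connect_offsets

  key.converter=io.confluent.connect.avro.AvroConverter
  key.converter.schema.registry.url=http://localhost:8081
  value.converter=io.confluent.connect.avro.AvroConverter
  value.converter.schema.registry.url=http://localhost:8081

  internal.key.converter=org.apache.kafka.connect.json.JsonConverter
  internal.value.converter=org.apache.kafka.connect.json.JsonConverter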

Debezium Docker Images

See the MySQL and the Avro message format tutorial example for a quickstart.

Deploy a Schema Registry instance:

  docker run -it --rm --name schema-registry \
    --link zookeeper \
    -e SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181 \
    -e SCHEMA_REGISTRY_HOST_NAME=schema-registry \
    -e SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081 \
    -p 8081:8081 confluentinc/cp-schema-registry
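
To check that the registry is running, you can query its REST API from the host (assuming the port mapping above; the subject list stays empty until something registers a schema):

  curl -s http://localhost:8081/subjects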

Run a Kafka Connect image configured to use Avro:

  docker run -it --rm --name connect \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    --link mysql:mysql \
    --link schema-registry:schema-registry \
    -e GROUP_ID=1 \
    -e CONFIG_STORAGE_TOPIC=my_connect_configs \
    -e OFFSET_STORAGE_TOPIC=my_connect_offsets \
    -e KEY_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter \
    -e INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    -e INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter \
    -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081 \
    -p 8083:8083 debezium/connect:1.0
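
With the worker up, a Debezium connector can be registered through the Connect REST API on port 8083. A sketch using the values from the Debezium MySQL tutorial (the database credentials, server id/name, and topic names are the tutorial's defaults, not requirements):

  curl -X POST -H "Content-Type: application/json" \
      http://localhost:8083/connectors -d '{
    "name": "inventory-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "debezium",
      "database.password": "dbz",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.whitelist": "inventory",
      "database.history.kafka.bootstrap.servers": "kafka:9092",
      "database.history.kafka.topic": "schema-changes.inventory"
    }
  }'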

Run a console consumer that reads new Avro messages from the db.myschema.mytable topic and decodes them to JSON:

  docker run -it --rm --name avro-consumer \
    --link zookeeper:zookeeper \
    --link kafka:kafka \
    --link mysql:mysql \
    --link schema-registry:schema-registry \
    debezium/connect:1.0 \
    /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --property print.key=true \
    --formatter io.confluent.kafka.formatter.AvroMessageFormatter \
    --property schema.registry.url=http://schema-registry:8081 \
    --topic db.myschema.mytable
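
Once the connector has produced messages, the Avro schema registered for the topic's values can also be inspected directly (subjects follow the default <topic>-value naming convention):

  curl -s http://localhost:8081/subjects/db.myschema.mytable-value/versions/latest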

Naming

As stated in the Avro documentation, names must adhere to the following rules:

  • Start with [A-Za-z_]

  • Subsequently contain only [A-Za-z0-9_] characters

Debezium uses the column's name as the basis for the Avro field name. This can lead to problems during serialization if the column name does not also adhere to the Avro naming rules above. Debezium provides a configuration option, sanitize.field.names, that can be set to true if you have columns that do not adhere to these rules, allowing those fields to be serialized without having to actually modify your schema.
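
The option is part of the connector configuration; in the properties form used for standalone mode it is a single line:

  sanitize.field.names=true

With this enabled, a column named, say, 1st_name, which violates the first rule, is mapped to a valid Avro field name instead of causing serialization to fail.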

Getting More Information

This post from the Debezium blog describes the concepts of serializers, converters, and so on, and discusses the advantages of using Avro.

For first steps with Kafka Connect, there’s a helpful quickstart in Confluent’s documentation.