Confluent Avro Format

Format: Serialization Schema Format: Deserialization Schema

Avro Schema Registry (avro-confluent) 格式能让你读取被 io.confluent.kafka.serializers.KafkaAvroSerializer序列化的记录,以及可以写入成能被 io.confluent.kafka.serializers.KafkaAvroDeserializer反序列化的记录。

当以这种格式读取(反序列化)记录时,将根据记录中编码的 schema 版本 id 从配置的 Confluent Schema Registry 中获取 Avro writer schema ,而从 table schema 中推断出 reader schema。

当以这种格式写入(序列化)记录时,Avro schema 是从 table schema 中推断出来的,并会用来检索要与数据一起编码的 schema id。我们会在配置的 Confluent Schema Registry 中配置的 subject 下,检索 schema id。subject 通过 avro-confluent.schema-registry.subject 参数来制定。

Avro Schema Registry 格式只能与Apache Kafka SQL连接器结合使用。


In order to use the Avro Schema Registry format the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

Maven dependencySQL Client JAR

如何创建使用 Avro-Confluent 格式的表

以下是一个使用 Kafka 连接器和 Confluent Avro 格式创建表的示例。

  1. CREATE TABLE user_behavior (
  2. user_id BIGINT,
  3. item_id BIGINT,
  4. category_id BIGINT,
  5. behavior STRING,
  6. ts TIMESTAMP(3)
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'properties.bootstrap.servers' = 'localhost:9092',
  10. 'topic' = 'user_behavior'
  11. 'format' = 'avro-confluent',
  12. 'avro-confluent.schema-registry.url' = 'http://localhost:8081',
  13. 'avro-confluent.schema-registry.subject' = 'user_behavior'
  14. )

Format 参数

必选(none)String指定要使用的格式,这里应该是 ‘avro-confluent’.
必选(none)String用于获取/注册 schemas 的 Confluent Schema Registry 的URL
sink 必选(none)StringConfluent Schema Registry主题,用于在序列化期间注册此格式使用的 schema


目前 Apache Flink 都是从 table schema 去推断反序列化期间的 Avro reader schema 和序列化期间的 Avro writer schema。显式地定义 Avro schema 暂不支持。 Apache Avro Format中描述了 Flink 数据类型和 Avro 类型的对应关系。

除了此处列出的类型之外,Flink 还支持读取/写入可为空(nullable)的类型。 Flink 将可为空的类型映射到 Avro union(something, null), 其中 something 是从 Flink 类型转换的 Avro 类型。

您可以参考 Avro Specification 以获取有关 Avro 类型的更多信息。