Debezium Format

Changelog-Data-Capture Format Format: Deserialization Schema

Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。

Flink 支持将 Debezium JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等。

注意: 支持解析 Debezium Avro 消息和输出 Debezium 消息已经规划在路线图上了。

依赖

为了设置 Debezium Format,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 SQL JAR 包的 SQL Client 的两个项目的依赖项信息。

Maven 依赖SQL Client JAR
flink-json内置

注意: 请参考 Debezium 文档,了解如何设置 Debezium Kafka Connect 用来将变更日志同步到 Kafka 主题。

如何使用 Debezium Format

Debezium 为变更日志提供了统一的格式,这是一个从 MySQL product 表捕获的更新操作的简单示例:

  1. {
  2. "before": {
  3. "id": 111,
  4. "name": "scooter",
  5. "description": "Big 2-wheel scooter",
  6. "weight": 5.18
  7. },
  8. "after": {
  9. "id": 111,
  10. "name": "scooter",
  11. "description": "Big 2-wheel scooter",
  12. "weight": 5.15
  13. },
  14. "source": {...},
  15. "op": "u",
  16. "ts_ms": 1589362330904,
  17. "transaction": null
  18. }

注意: 请参考 Debezium 文档,了解每个字段的含义。

MySQL 产品表有4列(idnamedescriptionweight)。上面的 JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。假设此消息已同步到 Kafka 主题 products_binlog,则可以使用以下 DDL 来使用此主题并解析更改事件。

  1. CREATE TABLE topic_products (
  2. -- schema MySQL products 表完全相同
  3. id BIGINT,
  4. name STRING,
  5. description STRING,
  6. weight DECIMAL(10, 2)
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'products_binlog',
  10. 'properties.bootstrap.servers' = 'localhost:9092',
  11. 'properties.group.id' = 'testGroup',
  12. 'format' = 'debezium-json' -- 使用 debezium-json 作为 format
  13. )

在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 'value.converter.schemas.enable',用来在消息体中包含 schema 信息。然后,Debezium JSON 消息可能如下所示:

  1. {
  2. "schema": {...},
  3. "payload": {
  4. "before": {
  5. "id": 111,
  6. "name": "scooter",
  7. "description": "Big 2-wheel scooter",
  8. "weight": 5.18
  9. },
  10. "after": {
  11. "id": 111,
  12. "name": "scooter",
  13. "description": "Big 2-wheel scooter",
  14. "weight": 5.15
  15. },
  16. "source": {...},
  17. "op": "u",
  18. "ts_ms": 1589362330904,
  19. "transaction": null
  20. }
  21. }

为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项 'debezium-json.schema-include' = 'true'(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。

在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源。

  1. -- MySQL "products" 的实时物化视图
  2. -- 计算相同产品的最新平均重量
  3. SELECT name, AVG(weight) FROM topic_products GROUP BY name;
  4. -- MySQL "products" 表的所有数据和增量更改同步到
  5. -- Elasticsearch "products" 索引,供将来查找
  6. INSERT INTO elasticsearch_products
  7. SELECT * FROM topic_products;

Format 参数

参数是否必选默认值类型描述
format
必选(none)String指定要使用的格式,此处应为 ‘debezium-json’
debezium-json.schema-include
可选falseBoolean设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 ‘value.converter.schemas.enable’ 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。
debezium-json.ignore-parse-errors
可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
debezium-json.timestamp-format.standard
可选‘SQL’String声明输入和输出的时间戳格式。当前支持的格式为‘SQL’ 以及 ‘ISO-8601’
  • 可选参数 ‘SQL’ 将会以 “yyyy-MM-dd HH:mm:ss.s{precision}” 的格式解析时间戳, 例如 ‘2020-12-30 12:13:14.123’,且会以相同的格式输出。
  • 可选参数 ‘ISO-8601’ 将会以 “yyyy-MM-ddTHH:mm:ss.s{precision}” 的格式解析输入时间戳, 例如 ‘2020-12-30T12:13:14.123’ ,且会以相同的格式输出。

数据类型映射

目前,Debezium Format 使用 JSON Format 进行反序列化。有关数据类型映射的更多详细信息,请参考 JSON Format 文档