Canal Format

Changelog-Data-Capture Format Format: Deserialization Schema

Canal 是一个 CDC(ChangeLog Data Capture,变更日志数据捕获)工具,可以实时地将 MySQL 变更传输到其他系统。Canal 为变更日志提供了统一的数据格式,并支持使用 JSON 或 protobuf 序列化消息(Canal 默认使用 protobuf)。

Flink 支持将 Canal 的 JSON 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等。

注意:未来会支持 Canal protobuf 类型消息的解析以及输出 Canal 格式的消息。

依赖

为了设置 Canal 格式,下表提供了使用自动化构建工具(例如:Maven 或 SBT)项目和使用绑定 SQL JAR 包的 SQL Client 所需的依赖信息。

Maven 依赖SQL Client JAR
flink-json内置

注意:有关如何部署 Canal 以将变更日志同步到消息队列,请参阅 Canal 文档

如何使用 Canal Format

Canal 为变更日志提供了统一的格式,下面是一个从 MySQL 库 products 表中捕获更新操作的简单示例:

  1. {
  2. "data": [
  3. {
  4. "id": "111",
  5. "name": "scooter",
  6. "description": "Big 2-wheel scooter",
  7. "weight": "5.18"
  8. }
  9. ],
  10. "database": "inventory",
  11. "es": 1589373560000,
  12. "id": 9,
  13. "isDdl": false,
  14. "mysqlType": {
  15. "id": "INTEGER",
  16. "name": "VARCHAR(255)",
  17. "description": "VARCHAR(512)",
  18. "weight": "FLOAT"
  19. },
  20. "old": [
  21. {
  22. "weight": "5.15"
  23. }
  24. ],
  25. "pkNames": [
  26. "id"
  27. ],
  28. "sql": "",
  29. "sqlType": {
  30. "id": 4,
  31. "name": 12,
  32. "description": 12,
  33. "weight": 7
  34. },
  35. "table": "products",
  36. "ts": 1589373560798,
  37. "type": "UPDATE"
  38. }

注意:有关各个字段的含义,请参阅 Canal 文档

MySQL products 表有4列(idnamedescriptionweight)。上面的 JSON 消息是 products 表上的一个更新事件,表示 id = 111 的行数据上 weight 字段值从5.15变更成为 5.18。假设消息已经同步到了一个 Kafka 主题:products_binlog,那么就可以使用以下DDL来从这个主题消费消息并解析变更事件。

  1. CREATE TABLE topic_products (
  2. -- 元数据与 MySQL "products" 表完全相同
  3. id BIGINT,
  4. name STRING,
  5. description STRING,
  6. weight DECIMAL(10, 2)
  7. ) WITH (
  8. 'connector' = 'kafka',
  9. 'topic' = 'products_binlog',
  10. 'properties.bootstrap.servers' = 'localhost:9092',
  11. 'properties.group.id' = 'testGroup',
  12. 'format' = 'canal-json' -- 使用 canal-json 格式
  13. )

将 Kafka 主题注册成 Flink 表之后,就可以将 Canal 消息用作变更日志源。

  1. -- 关于MySQL "products" 表的实时物化视图
  2. -- 计算相同产品的最新平均重量
  3. SELECT name, AVG(weight) FROM topic_products GROUP BY name;
  4. -- MySQL "products" 表的所有数据和增量更改同步到
  5. -- Elasticsearch "products" 索引以供将来搜索
  6. INSERT INTO elasticsearch_products
  7. SELECT * FROM topic_products;

Format 参数

选项要求默认类型描述
format
必填(none)String指定要使用的格式,此处应为 ‘canal-json’.
canal-json.ignore-parse-errors
选填falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
canal-json.timestamp-format.standard
选填‘SQL’String指定输入和输出时间戳格式。 当前支持的值是 ‘SQL’‘ISO-8601’:
  • 选项 ‘SQL’ 将解析 “yyyy-MM-dd HH:mm:ss.s{precision}” 格式的输入时间戳,例如 ‘2020-12-30 12:13:14.123’,并以相同格式输出时间戳。
  • 选项 ‘ISO-8601’ 将解析 “yyyy-MM-ddTHH:mm:ss.s{precision}” 格式的输入时间戳,例如 ‘2020-12-30T12:13:14.123’,并以相同的格式输出时间戳。

数据类型映射

目前,Canal Format 使用 JSON Format 进行反序列化。 有关数据类型映射的更多详细信息,请参阅 JSON Format 文档