TiCDC Canal-JSON Protocol

Canal-JSON 是由 Alibaba Canal 定义的一种数据交换格式协议。通过本文,你可以了解 TiCDC 对 Canal-JSON 数据格式的实现,包括 TiDB 扩展字段、Canal-JSON 数据格式定义,以及和官方实现进行对比等相关内容。

使用 Canal-JSON

当使用 MQ (Message Queue) 作为下游 Sink 时,你可以在 sink-uri 中指定使用 Canal-JSON,TiCDC 将以 Event 为基本单位封装构造 Canal-JSON Message,向下游发送 TiDB 的数据变更事件。

Event 分为三类:

  • DDL Event:代表 DDL 变更记录,在上游成功执行 DDL 语句后发出,DDL Event 会被发送到索引为 0 的 MQ Partition。
  • DML Event:代表一行数据变更记录,在行变更发生时该类 Event 被发出,包含变更后该行的相关信息。
  • WATERMARK Event:代表一个特殊的时间点,表示在这个时间点前收到的 Event 是完整的。仅适用于 TiDB 扩展字段,当你在 sink-uri 中设置 enable-tidb-extension=true 时生效。

使用 Canal-JSON 时的配置样例如下所示:

  1. cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-canal-json" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json"

TiDB 扩展字段

Canal-JSON 协议本是为 MySQL 设计的,其中并不包含 TiDB 专有的 CommitTS 事务唯一标识等重要字段。为了解决这个问题,TiCDC 在 Canal-JSON 协议格式中附加了 TiDB 扩展字段。在 sink-uri 中设置 enable-tidb-extensiontrue(默认为 false)后,TiCDC 生成 Canal-JSON 消息时的行为如下:

  • TiCDC 发送的 DML Event 和 DDL Event 类型消息中,将会含有一个名为 _tidb 的字段。
  • TiCDC 将会发送 WATERMARK Event 消息。·

配置样例如下所示:

  1. cdc cli changefeed create --pd=http://127.0.0.1:2379 --changefeed-id="kafka-canal-json-enable-tidb-extension" --sink-uri="kafka://127.0.0.1:9092/topic-name?kafka-version=2.4.0&protocol=canal-json&enable-tidb-extension=true"

Message 格式定义

下面介绍 DDL Event、DML Event 和 WATERMARK Event 的格式定义,以及消费端的数据解析。

DDL Event

TiCDC 会把一个 DDL Event 编码成如下 Canal-JSON 格式:

  1. {
  2. "id": 0,
  3. "database": "test",
  4. "table": "",
  5. "pkNames": null,
  6. "isDdl": true,
  7. "type": "QUERY",
  8. "es": 1639633094670,
  9. "ts": 1639633095489,
  10. "sql": "drop database if exists test",
  11. "sqlType": null,
  12. "mysqlType": null,
  13. "data": null,
  14. "old": null,
  15. "_tidb": { // TiDB 的扩展字段
  16. "commitTs": 163963309467037594
  17. }
  18. }

以上 JSON 数据的字段解释如下:

字段 类型 说明
id Number TiCDC 默认值为 0
database String Row 所在的 Database 的名字
table String Row 所在的 Table 的名字
pkNames Array 组成 primary key 的所有列的名字
isDdl Bool 该条消息是否为 DDL 事件
type String Canal-JSON 定义的事件类型
es Number 产生该条消息的事件发生时的 13 位(毫秒级)时间戳
ts Number TiCDC 生成该条消息时的 13 位(毫秒级)时间戳
sql String 当 isDdl 为 true 时,记录对应的 DDL 语句
sqlType Object 当 isDdl 为 false 时,记录每一列数据类型在 Java 中的类型表示
mysqlType object 当 isDdl 为 false 时,记录每一列数据类型在 MySQL 中的类型表示
data Object 当 isDdl 为 false 时,记录每一列的名字及其数据值
old Object 仅当该条消息由 Update 类型事件产生时,记录每一列的名字,和 Update 之前的数据值
_tidb Object TiDB 扩展字段,仅当 enable-tidb-extension 为 true 时才会存在。其中的 commitTs 值为造成 Row 变更的事务的 TSO

DML Event

对于一行 DML 数据变更事件,TiCDC 会将其编码成如下形式:

  1. {
  2. "id": 0,
  3. "database": "test",
  4. "table": "tp_int",
  5. "pkNames": [
  6. "id"
  7. ],
  8. "isDdl": false,
  9. "type": "INSERT",
  10. "es": 1639633141221,
  11. "ts": 1639633142960,
  12. "sql": "",
  13. "sqlType": {
  14. "c_bigint": -5,
  15. "c_int": 4,
  16. "c_mediumint": 4,
  17. "c_smallint": 5,
  18. "c_tinyint": -6,
  19. "id": 4
  20. },
  21. "mysqlType": {
  22. "c_bigint": "bigint",
  23. "c_int": "int",
  24. "c_mediumint": "mediumint",
  25. "c_smallint": "smallint",
  26. "c_tinyint": "tinyint",
  27. "id": "int"
  28. },
  29. "data": [
  30. {
  31. "c_bigint": "9223372036854775807",
  32. "c_int": "2147483647",
  33. "c_mediumint": "8388607",
  34. "c_smallint": "32767",
  35. "c_tinyint": "127",
  36. "id": "2"
  37. }
  38. ],
  39. "old": null,
  40. "_tidb": { // TiDB 的扩展字段
  41. "commitTs": 163963314122145239
  42. }
  43. }

WATERMARK Event

仅当 enable-tidb-extensiontrue 时,TiCDC 才会发送 WATERMARK Event,其 type 字段值为 TIDB_WATERMARK。该类型事件具有 _tidb 字段,当前只含有 watermarkTs,其值为该 Event 发送时的 TSO。

当你收到一个该类型的事件,所有 commitTs 小于 watermarkTs 的事件均已发送完毕。因为 TiCDC 提供 At Least Once 语义,可能出现重复发送数据的情况。如果后续收到有 commitTs 小于 watermarkTs 的事件,可以忽略。

WATERMARK Event 的示例如下:

  1. {
  2. "id": 0,
  3. "database": "",
  4. "table": "",
  5. "pkNames": null,
  6. "isDdl": false,
  7. "type": "TIDB_WATERMARK",
  8. "es": 1640007049196,
  9. "ts": 1640007050284,
  10. "sql": "",
  11. "sqlType": null,
  12. "mysqlType": null,
  13. "data": null,
  14. "old": null,
  15. "_tidb": { // TiDB 的扩展字段
  16. "watermarkTs": 429918007904436226
  17. }
  18. }

消费端数据解析

从上面的示例中可知,Canal-JSON 具有统一的数据格式,针对不同的事件类型,有不同的字段填充规则。消费者可以使用统一的方法对该 JSON 格式的数据进行解析,然后通过判断字段值的方式,来确定具体事件类型:

  • isDdl 为 true 时,该消息含有一条 DDL Event。
  • isDdl 为 false 时,需要对 type 字段加以判断。如果 typeTIDB_WATERMARK,可得知其为 WATERMARK Event,否则就是 DML Event。

字段说明

Canal-JSON 格式会在 mysqlType 字段和 sqlType 字段中记录对应的数据类型。

MySQL Type 字段

Canal-JSON 格式会在 mysqlType 字段中记录每一列的 MySQL Type 的字符串表示。相关详情可以参考 TiDB Data Types

SQL Type 字段

Canal-JSON 格式会在 sqlType 字段中记录每一列的 Java SQL Type,即每条数据在 JDBC 中对应的数据类型,其值可以通过 MySQL Type 和具体数据值计算得到。具体对应关系如下:

MySQL Type Java SQL Type Code
Boolean -6
Float 7
Double 8
Decimal 3
Char 1
Varchar 12
Binary 2004
Varbinary 2004
Tinytext 2005
Text 2005
Mediumtext 2005
Longtext 2005
Tinyblob 2004
Blob 2004
Mediumblob 2004
Longblob 2004
Date 91
Datetime 93
Timestamp 93
Time 92
Year 12
Enum 4
Set -7
Bit -7
JSON 12

整数类型

你需要考虑整数类型是否有 Unsigned 约束,以及当前取值大小,分别对应不同的 Java SQL Type Code。如下表所示。

MySQL Type String Value Range Java SQL Type Code
tinyint [-128, 127] -6
tinyint unsigned [0, 127] -6
tinyint unsigned [128, 255] 5
smallint [-32768, 32767] 5
smallint unsigned [0, 32767] 5
smallint unsigned [32768, 65535] 4
mediumint [-8388608, 8388607] 4
mediumint unsigned [0, 8388607] 4
mediumint unsigned [8388608, 16777215] 4
int [-2147483648, 2147483647] 4
int unsigned [0, 2147483647] 4
int unsigned [2147483648, 4294967295] -5
bigint [-9223372036854775808, 9223372036854775807] -5
bigint unsigned [0, 9223372036854775807] -5
bigint unsigned [9223372036854775808, 18446744073709551615] 3

TiCDC 涉及的 Java SQL Type 及其 Code 映射关系如下表所示。

Java SQL Type Java SQL Type Code
CHAR 1
DECIMAL 3
INTEGER 4
SMALLINT 5
REAL 7
DOUBLE 8
VARCHAR 12
DATE 91
TIME 92
TIMESTAMP 93
BLOB 2004
CLOB 2005
BIGINT -5
TINYINT -6
Bit -7

想要了解 Java SQL Type 的更多信息,请参考 Java SQL Class Types

TiCDC Canal-JSON 和 Canal 官方实现对比

TiCDC 对 Canal-JSON 数据格式的实现,包括 Update 类型事件和 mysqlType 字段,和官方有些许不同。主要差异见下表。

差异点 TiCDC Canal
Update 类型事件 old 字段包含所有列数据 old 字段仅包含被修改的列数据
mysqlType 字段 对于含有参数的类型,没有类型参数信息 对于含有参数的类型,会包含完整的参数信息

Update 类型事件

对于 Update 类型事件,Canal 官方实现中,old 字段仅包含被修改的列数据,而 TiCDC 的实现则包含所有列数据。

假设在上游 TiDB 按顺序执行如下 SQL 语句:

  1. create table tp_int
  2. (
  3. id int auto_increment,
  4. c_tinyint tinyint null,
  5. c_smallint smallint null,
  6. c_mediumint mediumint null,
  7. c_int int null,
  8. c_bigint bigint null,
  9. constraint pk
  10. primary key (id)
  11. );
  12. insert into tp_int(c_tinyint, c_smallint, c_mediumint, c_int, c_bigint)
  13. values (127, 32767, 8388607, 2147483647, 9223372036854775807);
  14. update tp_int set c_int = 0, c_tinyint = 0 where c_smallint = 32767;

对于 update 语句,TiCDC 将会输出一条 typeUPDATE 的事件消息,如下所示。该 update 语句仅对 c_intc_tinyint 两列进行了修改。输出事件消息的 old 字段,则包含所有列数据。

  1. {
  2. "id": 0,
  3. ...
  4. "type": "UPDATE",
  5. ...
  6. "sqlType": {
  7. ...
  8. },
  9. "mysqlType": {
  10. ...
  11. },
  12. "data": [
  13. {
  14. "c_bigint": "9223372036854775807",
  15. "c_int": "0",
  16. "c_mediumint": "8388607",
  17. "c_smallint": "32767",
  18. "c_tinyint": "0",
  19. "id": "2"
  20. }
  21. ],
  22. "old": [ // TiCDC 输出事件消息的 `old` 字段,则包含所有列数据。
  23. {
  24. "c_bigint": "9223372036854775807",
  25. "c_int": "2147483647", // 修改的列
  26. "c_mediumint": "8388607",
  27. "c_smallint": "32767",
  28. "c_tinyint": "127", // 修改的列
  29. "id": "2"
  30. }
  31. ]
  32. }

官方 Canal 输出事件消息的 old 字段仅包含被修改的列数据。示例如下。

  1. {
  2. "id": 0,
  3. ...
  4. "type": "UPDATE",
  5. ...
  6. "sqlType": {
  7. ...
  8. },
  9. "mysqlType": {
  10. ...
  11. },
  12. "data": [
  13. {
  14. "c_bigint": "9223372036854775807",
  15. "c_int": "0",
  16. "c_mediumint": "8388607",
  17. "c_smallint": "32767",
  18. "c_tinyint": "0",
  19. "id": "2"
  20. }
  21. ],
  22. "old": [ // Canal 输出事件消息的 `old` 字段,仅包含被修改的列的数据。
  23. {
  24. "c_int": "2147483647", // 修改的列
  25. "c_tinyint": "127", // 修改的列
  26. }
  27. ]
  28. }

mysqlType 字段

对于 mysqlType 字段,Canal 官方实现中,对于含有参数的类型,会包含完整的参数信息,TiCDC 实现则没有类型参数信息。

在下面示例的表定义 SQL 语句中,如 decimal / char / varchar / enum 等类型,都含有参数。对比 TiCDC 和 Canal 官方实现分别生成的 Canal-JSON 格式数据可知,在 mysqlType 字段中的数据,TiCDC 实现只包含基本 MySQL Type。如果业务需要类型参数信息,需要你自行通过其他方式实现。

假设在上游数据库按顺序执行如下 SQL 语句:

  1. create table t (
  2. id int auto_increment,
  3. c_decimal decimal(10, 4) null,
  4. c_char char(16) null,
  5. c_varchar varchar(16) null,
  6. c_binary binary(16) null,
  7. c_varbinary varbinary(16) null,
  8. c_enum enum('a','b','c') null,
  9. c_set set('a','b','c') null,
  10. c_bit bit(64) null,
  11. constraint pk
  12. primary key (id)
  13. );
  14. insert into t (c_decimal, c_char, c_varchar, c_binary, c_varbinary, c_enum, c_set, c_bit)
  15. values (123.456, "abc", "abc", "abc", "abc", 'a', 'a,b', b'1000001');

TiCDC 输出内容如下:

  1. {
  2. "id": 0,
  3. ...
  4. "isDdl": false,
  5. "sqlType": {
  6. ...
  7. },
  8. "mysqlType": {
  9. "c_binary": "binary",
  10. "c_bit": "bit",
  11. "c_char": "char",
  12. "c_decimal": "decimal",
  13. "c_enum": "enum",
  14. "c_set": "set",
  15. "c_varbinary": "varbinary",
  16. "c_varchar": "varchar",
  17. "id": "int"
  18. },
  19. "data": [
  20. {
  21. ...
  22. }
  23. ],
  24. "old": null,
  25. }

Canal 官方实现输出内容如下:

  1. {
  2. "id": 0,
  3. ...
  4. "isDdl": false,
  5. "sqlType": {
  6. ...
  7. },
  8. "mysqlType": {
  9. "c_binary": "binary(16)",
  10. "c_bit": "bit(64)",
  11. "c_char": "char(16)",
  12. "c_decimal": "decimal(10, 4)",
  13. "c_enum": "enum('a','b','c')",
  14. "c_set": "set('a','b','c')",
  15. "c_varbinary": "varbinary(16)",
  16. "c_varchar": "varchar(16)",
  17. "id": "int"
  18. },
  19. "data": [
  20. {
  21. ...
  22. }
  23. ],
  24. "old": null,
  25. }

TiCDC Canal-JSON 改动说明

Delete 类型事件中 Old 字段的变化说明

TiCDC 实现的 Canal-JSON 格式,v5.4.0 及以后版本的实现,和之前的有些许不同,具体如下:

  • Delete 类型事件,Old 字段的内容发生了变化。

如下是一个 DELETE 事件的数据内容,在 v5.4.0 前的实现中,”old” 的内容和 “data” 相同,在 v5.4.0 及之后的实现中,”old” 将被设为 null。你可以通过 “data” 字段获取到被删除的数据。

  1. {
  2. "id": 0,
  3. "database": "test",
  4. ...
  5. "type": "DELETE",
  6. ...
  7. "sqlType": {
  8. ...
  9. },
  10. "mysqlType": {
  11. ...
  12. },
  13. "data": [
  14. {
  15. "c_bigint": "9223372036854775807",
  16. "c_int": "0",
  17. "c_mediumint": "8388607",
  18. "c_smallint": "32767",
  19. "c_tinyint": "0",
  20. "id": "2"
  21. }
  22. ],
  23. "old": null,
  24. // 以下示例是 v5.4.0 之前的实现,`old` 内容等同于 `data` 内容
  25. "old": [
  26. {
  27. "c_bigint": "9223372036854775807",
  28. "c_int": "0",
  29. "c_mediumint": "8388607",
  30. "c_smallint": "32767",
  31. "c_tinyint": "0",
  32. "id": "2"
  33. }
  34. ]
  35. }