Storage sink consumer program design

This document describes how to design and implement a program that consumes TiDB data changes.


Note

Currently, the storage sink cannot handle the DROP DATABASE DDL statement. Avoid executing this statement if possible. If you do need to execute DROP DATABASE, execute it manually in the downstream MySQL database.

TiCDC does not provide a standard implementation for consuming data from a storage service. This document describes a Golang-based example consumer program that reads data from a storage service and writes it to a MySQL-compatible downstream database. You can implement your own consumer by referring to the data formats and the example code provided in this document.

Golang example code

Consumer design

The following diagram shows the overall consumption workflow of the consumer:

TiCDC storage consumer overview

The following lists the components and function definitions in the consumption workflow, with comments describing what each of them does:

type StorageReader struct {
}

// Read the files from storage.
// Add newly added files and delete files that no longer exist in storage.
func (c *StorageReader) ReadFiles() {}

// Query newly added files and the latest checkpoint from storage. One file can only be returned once.
func (c *StorageReader) ExposeNewFiles() (int64, []string) { return 0, nil }

// ConsumerManager is responsible for assigning tasks to TableConsumer.
// Different consumers can consume data concurrently,
// but data of one table must be processed by the same TableConsumer.
type ConsumerManager struct {
    // StorageCheckpoint is recorded in the metadata file and can be fetched by calling `StorageReader.ExposeNewFiles()`.
    // It indicates that the data whose transaction commit time is less than this checkpoint has been stored in storage.
    StorageCheckpoint int64
    // Checkpoint indicates where the consumer has consumed.
    // ConsumerManager periodically collects TableConsumer.Checkpoint,
    // then Checkpoint is updated to the minimum value of all TableConsumer.Checkpoint.
    Checkpoint int64
    // tableFiles maps the schema name and table name to the corresponding TableConsumer.
    tableFiles map[string]map[string]*TableConsumer
}

// Query newly added files from StorageReader.
// For a newly created table, create a TableConsumer for the new table.
// If there are new files, send them to the corresponding TableConsumer.
func (c *ConsumerManager) Dispatch() {}

type TableConsumer struct {
    // Checkpoint indicates where this TableConsumer has consumed.
    // Its initial value is ConsumerManager.Checkpoint.
    // TableConsumer.Checkpoint is equal to TableVersionConsumer.Checkpoint.
    Checkpoint int64
    schema, table string
    // Table versions must be consumed sequentially in the order of the table version.
    verConsumers map[int64]*TableVersionConsumer
    currentVer, previousVer int64
}

// Send newly added files to the corresponding TableVersionConsumer.
// For any DDL, assign a TableVersionConsumer for the new table version.
func (tc *TableConsumer) Dispatch() {}

// If the DDL query is empty or its table version is less than TableConsumer.Checkpoint,
// - ignore this DDL, and consume the data under the table version.
// Otherwise,
// - execute the DDL first, and then consume the data under the table version.
// - For a dropped table, the TableConsumer recycles itself after the drop table DDL is executed.
func (tc *TableConsumer) ExecuteDDL() {}

type TableVersionConsumer struct {
    // Checkpoint indicates where this TableVersionConsumer has consumed.
    // Its initial value is TableConsumer.Checkpoint.
    Checkpoint int64
    schema, table string
    version int64
    // For the same table version, data in different partitions can be consumed concurrently.
    // partitionNum int64
    // Data files must be consumed sequentially in the order of the data file number.
    // fileSet maps the data file name to the corresponding consumer state.
    fileSet map[string]*TableVersionConsumer
    currentVersion int64
}

// If the data commit ts is less than TableConsumer.Checkpoint
// or greater than ConsumerManager.StorageCheckpoint,
// - ignore this data.
// Otherwise,
// - process this data and write it to MySQL.
func (tc *TableVersionConsumer) ExecuteDML() {}
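
These definitions can be driven by a simple polling loop. The following is a minimal sketch of such a loop, assuming the StorageReader and ConsumerManager types defined above; the run function name, the polling interval, and the wiring are illustrative assumptions rather than part of the TiCDC example program.

import "time"

// run drives the overall consumption workflow. It periodically refreshes the
// file list from storage and lets ConsumerManager dispatch the new files.
// All names here are illustrative assumptions.
func run(reader *StorageReader, manager *ConsumerManager) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for range ticker.C {
        // Pick up newly added files and drop files that no longer exist in storage.
        reader.ReadFiles()
        // Dispatch fetches the newly added files and the latest storage checkpoint
        // (via StorageReader.ExposeNewFiles) and assigns them to the per-table TableConsumers.
        manager.Dispatch()
    }
}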

Handle DDL events

For example, when the directory is traversed for the first time, its contents are as follows:

├── metadata
└── test
    ├── tbl_1
    │   └── 437752935075545091
    │       ├── CDC000001.json
    │       └── schema.json

At this point, the consumer first parses the table schema information in the schema.json file to get the DDL query statement, and then handles it in one of two ways:

  • If the DDL query is empty, or the TableVersion is less than the consumer checkpoint, skip the statement.
  • Otherwise, execute the obtained DDL statement in the downstream MySQL database.

Then the consumer starts to replicate the data file CDC000001.json.
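
The following is a minimal sketch of this decision logic. The TableSchema struct only keeps the schema.json fields needed here, and the handleDDL helper and its database/sql connection to the downstream MySQL are illustrative assumptions, not part of the TiCDC example program.

import (
    "database/sql"
    "encoding/json"
    "os"
)

// TableSchema mirrors the schema.json fields needed for DDL handling.
// The struct name is an illustrative assumption.
type TableSchema struct {
    Query        string `json:"Query"`
    TableVersion int64  `json:"TableVersion"`
}

// handleDDL reads schema.json and decides whether to execute its DDL query
// in the downstream MySQL database before consuming the data files.
func handleDDL(db *sql.DB, schemaPath string, checkpoint int64) error {
    data, err := os.ReadFile(schemaPath)
    if err != nil {
        return err
    }
    var s TableSchema
    if err := json.Unmarshal(data, &s); err != nil {
        return err
    }
    // Skip the DDL if the query is empty or the table version is less than the consumer checkpoint.
    if s.Query == "" || s.TableVersion < checkpoint {
        return nil
    }
    // Otherwise, execute the DDL first, then consume the data under this table version.
    _, err = db.Exec(s.Query)
    return err
}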

For example, in the following test/tbl_1/437752935075545091/schema.json file, the DDL query is not empty:

{
    "Table": "tbl_1",
    "Schema": "test",
    "Version": 1,
    "TableVersion": 437752935075545091,
    "Query": "create table tbl_1 (Id int primary key, LastName char(20), FirstName varchar(30), HireDate datetime, OfficeLocation Blob(20))",
    "TableColumns": [
        {
            "ColumnName": "Id",
            "ColumnType": "INT",
            "ColumnNullable": "false",
            "ColumnIsPk": "true"
        },
        {
            "ColumnName": "LastName",
            "ColumnType": "CHAR",
            "ColumnLength": "20"
        },
        {
            "ColumnName": "FirstName",
            "ColumnType": "VARCHAR",
            "ColumnLength": "30"
        },
        {
            "ColumnName": "HireDate",
            "ColumnType": "DATETIME"
        },
        {
            "ColumnName": "OfficeLocation",
            "ColumnType": "BLOB",
            "ColumnLength": "20"
        }
    ],
    "TableColumnsTotal": "5"
}

When the program traverses the directory again, it finds that a new version directory has been added for this table. The program first finishes consuming all files under the test/tbl_1/437752935075545091 directory, and then consumes the data under the new directory.

├── metadata
└── test
    ├── tbl_1
    │   ├── 437752935075545091
    │   │   ├── CDC000001.json
    │   │   └── schema.json
    │   └── 437752935075546092
    │       ├── CDC000001.json
    │       └── schema.json

The consumption logic is the same as above: first parse the table schema information in the schema.json file, get the DDL query statement and handle it according to the different cases, and then replicate the data file CDC000001.json.
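
To enforce this version ordering, the consumer can list the table version directories under a table's directory, sort them numerically, and only move on to a newer version after the previous one has been fully consumed. The following is a minimal sketch that assumes the files are visible on a local file system path; the sortedTableVersions helper is an illustrative assumption, and for cloud storage such as Amazon S3 the same listing would be done through the storage SDK instead.

import (
    "os"
    "sort"
    "strconv"
)

// sortedTableVersions returns the table version directories under tableDir
// (for example, test/tbl_1) in ascending numeric order.
func sortedTableVersions(tableDir string) ([]int64, error) {
    entries, err := os.ReadDir(tableDir)
    if err != nil {
        return nil, err
    }
    var versions []int64
    for _, e := range entries {
        if !e.IsDir() {
            continue
        }
        v, err := strconv.ParseInt(e.Name(), 10, 64)
        if err != nil {
            continue // skip directories that are not table version numbers
        }
        versions = append(versions, v)
    }
    sort.Slice(versions, func(i, j int) bool { return versions[i] < versions[j] })
    return versions, nil
}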

Handle DML events

After the DDL events are handled, the DML events can be processed under the {schema}/{table}/{table-version-separator}/ directory, according to the specific file format (CSV or Canal-JSON) and in the order of the data file numbers.

Because TiCDC provides at-least-once semantics, data might be sent repeatedly. Therefore, the consumer program needs to compare the commit ts of each data event with the consumer checkpoint, and deduplicate the events whose commit ts is less than the consumer checkpoint, as sketched below.
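
The following is a minimal sketch of this deduplication check. The RowEvent type and the shouldApply helper are illustrative names; only the comparison rule comes from the consumer design described above.

// RowEvent holds the commit ts decoded from a CSV or Canal-JSON data file.
type RowEvent struct {
    CommitTs int64
}

// shouldApply reports whether a row change should be written to the downstream.
// Events whose commit ts is less than the consumer checkpoint have already been
// applied and are skipped, which makes replay under at-least-once delivery idempotent.
// Events whose commit ts is greater than the storage checkpoint are not yet complete
// in storage and are also ignored for now.
func shouldApply(e RowEvent, consumerCheckpoint, storageCheckpoint int64) bool {
    if e.CommitTs < consumerCheckpoint {
        return false // duplicate: already consumed
    }
    if e.CommitTs > storageCheckpoint {
        return false // not yet covered by the storage checkpoint
    }
    return true
}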