title: Binlog Consumer Client 用户文档

Binlog Consumer Client 用户文档

目前 Drainer 提供了多种输出方式,包括 MySQL、TiDB、file 等。但是用户往往有一些自定义的需求,比如输出到 Elasticsearch、Hive 等,这些需求 Drainer 现在还没有实现,因此 Drainer 增加了输出到 Kafka 的功能,将 binlog 数据解析后按一定的格式再输出到 Kafka 中,用户编写代码从 Kafka 中读出数据再进行处理。

配置 Kafka Drainer

修改 Drainer 的配置文件,设置输出为 Kafka,相关配置如下:

  1. [syncer]
  2. db-type = "kafka"
  3. [syncer.to]
  4. # Kafka 地址
  5. kafka-addrs = "127.0.0.1:9092"
  6. # Kafka 版本号
  7. kafka-version = "0.8.2.0"

自定义开发

数据格式

首先需要了解 Drainer 写入到 Kafka 中的数据格式:

  1. // Column 保存列的数据,针对数据的类型,保存在对应的变量中
  2. message Column {
  3. // 数据是否为 null
  4. optional bool is_null = 1 [ default = false ];
  5. // 保存 int 类型的数据
  6. optional int64 int64_value = 2;
  7. // 保存 uint、enum, set 类型的数据
  8. optional uint64 uint64_value = 3;
  9. // 保存 float、double 类型的数据
  10. optional double double_value = 4;
  11. // 保存 bit、blob、binary、json 类型的数据
  12. optional bytes bytes_value = 5;
  13. // 保存 date、time、decimal、text、char 类型的数据
  14. optional string string_value = 6;
  15. }
  16. // ColumnInfo 保存列的信息,包括列名、类型、是否为主键
  17. message ColumnInfo {
  18. optional string name = 1 [ (gogoproto.nullable) = false ];
  19. // MySQL 中小写的列字段类型
  20. // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
  21. // numeric 类型:int bigint smallint tinyint float double decimal bit
  22. // string 类型:text longtext mediumtext char tinytext varchar
  23. // blob longblob mediumblob binary tinyblob varbinary
  24. // enum set
  25. // json 类型:json
  26. optional string mysql_type = 2 [ (gogoproto.nullable) = false ];
  27. optional bool is_primary_key = 3 [ (gogoproto.nullable) = false ];
  28. }
  29. // Row 保存一行的具体数据
  30. message Row { repeated Column columns = 1; }
  31. // MutationType 表示 DML 的类型
  32. enum MutationType {
  33. Insert = 0;
  34. Update = 1;
  35. Delete = 2;
  36. }
  37. // Table 包含一个表的数据变更
  38. message Table {
  39. optional string schema_name = 1;
  40. optional string table_name = 2;
  41. repeated ColumnInfo column_info = 3;
  42. repeated TableMutation mutations = 4;
  43. }
  44. // TableMutation 保存一行数据的变更
  45. message TableMutation {
  46. required MutationType type = 1;
  47. // 修改后的数据
  48. required Row row = 2;
  49. // 修改前的数据,只对 Update MutationType 有效
  50. optional Row change_row = 3;
  51. }
  52. // DMLData 保存一个事务所有的 DML 造成的数据变更
  53. message DMLData {
  54. // `tables` 包含事务中所有表的数据变更
  55. repeated Table tables = 1;
  56. }
  57. // DDLData 保存 DDL 的信息
  58. message DDLData {
  59. // 当前使用的数据库
  60. optional string schema_name = 1;
  61. // 相关表
  62. optional string table_name = 2;
  63. // `ddl_query` 是原始的 DDL 语句 query
  64. optional bytes ddl_query = 3;
  65. }
  66. // BinlogType 为 Binlog 的类型,分为 DML 和 DDL
  67. enum BinlogType {
  68. DML = 0; // Has `dml_data`
  69. DDL = 1; // Has `ddl_query`
  70. }
  71. // Binlog 保存一个事务所有的变更,Kafka 中保存的数据为该结构数据序列化后的结果
  72. message Binlog {
  73. optional BinlogType type = 1 [ (gogoproto.nullable) = false ];
  74. optional int64 commit_ts = 2 [ (gogoproto.nullable) = false ];
  75. optional DMLData dml_data = 3;
  76. optional DDLData ddl_data = 4;
  77. }

查看数据格式的具体定义,参见 binlog.proto

Driver

TiDB-Tools 项目提供了用于读取 Kafka 中 binlog 数据的 Driver,具有如下功能:

  • 读取 Kafka 的数据
  • 根据 commit ts 查找 binlog 在 kafka 中的储存位置

使用该 Driver 时,用户需要配置如下信息:

  • KafkaAddr:Kafka 集群的地址
  • CommitTS:从哪个 commit ts 开始读取 binlog
  • Offset:从 Kafka 哪个 offset 开始读取,如果设置了 CommitTS 就不用配置该参数
  • ClusterID:TiDB 集群的 cluster ID
  • Topic: Kafka Topic 名称,如果 Topic 名称为空,将会使用 drainer <ClusterID>_obinlog 中的默认名称

用户以包的形式引用 Driver 的代码即可使用,可以参考 Driver 中提供的示例代码来学习如何使用 Driver 以及 binlog 数据的解析,目前提供了两个例子:

  • 使用该 Driver 将数据同步到 MySQL,该示例包含将 binlog 转化为 SQL 的具体方法
  • 使用该 Driver 将数据打印出来

Driver 项目地址:Binlog Slave Driver

注意:

  • 示例代码仅仅用于示范如何使用 Driver,如果需要用于生产环境需要优化代码。
  • 目前仅提供了 golang 版本的 Driver 以及示例代码。如果需要使用其他语言,用户需要根据 binlog 的 proto 文件生成相应语言的代码文件,并自行开发程序读取 Kafka 中的 binlog 数据、解析数据、输出到下游。也欢迎用户优化 example 代码,以及提交其他语言的示例代码到 TiDB-Tools