MySQL-CDC

概述

MySQL Extract 节点允许从 MySQL 数据库中读取快照数据和增量数据。本文档介绍如何设置 MySQL Extract 节点以对 MySQL 数据库运行 SQL 查询。

支持的版本

Extract 节点版本Driver
MySQL-CDCMySQL: 5.6, 5.7, 8.0.x
RDS MySQL: 5.6, 5.7, 8.0.x
PolarDB MySQL: 5.6, 5.7, 8.0.x
Aurora MySQL: 5.6, 5.7, 8.0.x
MariaDB: 10.x
PolarDB X: 2.0.1
JDBC Driver: 8.0.21

依赖

为了设置 MySQL Extract 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-mysql-cdc</artifactId>
  4. <version>1.2.0-incubating</version>
  5. </dependency>

连接 MySQL 数据库还需要 MySQL 驱动程序依赖项。请下载mysql-connector-java-8.0.21.jar 并将其放入 FLINK_HOME/lib/

设置 MySQL 服务器

你必须定义一个对 Debezium MySQL 连接器监控的所有数据库具有适当权限的 MySQL 用户。

  • 创建 MySQL 用户:
  1. mysql> CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
  • 向用户授予所需的权限:
  1. mysql> GRANT SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

注意: 启用 scan.incremental.snapshot.enabled 时不再需要 RELOAD 权限(默认启用)。

  • 刷新用户的权限:
  1. mysql> FLUSH PRIVILEGES;

查看更多关于权限说明

注意事项

为每个 Reader 设置一个不同的 SERVER ID

每一个读取 Binlog 的 MySQL 数据库客户端都应该有一个唯一的 Id,称为 Server Id。 MySQL 服务器将使用此 Id 来维护网络连接和 Binlog 位置。因此,如果不同的作业共享相同的服务器 Id,可能会导致从错误的 Binlog 位置读取。 因此,建议通过 [SQL Hints](https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/hints. html), 例如假设源并行度为 4,那么我们可以使用 SELECT * FROM source_table /*+ OPTIONS('server-id'='5401-5404') */ ; 为 4 个 Source Reader 中的每一个分配唯一的服务器 Id。

设置 MySQL 会话超时

当为大型数据库制作初始一致快照时,您建立的连接可能会在读取表时超时。您可以通过在 MySQL 配置文件中配置 interactive_timeoutwait_timeout 来防止这种行为。

  • interactive_timeout:服务器在关闭交互式连接之前等待其活动的秒数。请参阅 MySQL 文档
  • wait_timeout:服务器在关闭非交互式连接之前等待其活动的秒数。请参阅 MySQL 文档

如何创建一个 MySQL Extract 节点

SQL API 用法

下面这个例子展示了如何用 Flink SQL 创建一个 MySQL Extract 节点:

  1. -- 设置 Checkpoint 3000 毫秒
  2. Flink SQL> SET 'execution.checkpointing.interval' = '3s';
  3. Flink SQL> CREATE TABLE mysql_extract_node (
  4. order_id INT,
  5. order_date TIMESTAMP(0),
  6. customer_name STRING,
  7. price DECIMAL(10, 5),
  8. product_id INT,
  9. order_status BOOLEAN,
  10. PRIMARY KEY(order_id) NOT ENFORCED
  11. ) WITH (
  12. 'connector' = 'mysql-cdc-inlong',
  13. 'hostname' = 'YourHostname',
  14. 'port' = '3306',
  15. 'username' = 'YourUsername',
  16. 'password' = 'YourPassword',
  17. 'database-name' = 'YourDatabaseName',
  18. 'table-name' = 'YourTableName');
  19. Flink SQL> SELECT * FROM mysql_extract_node;

InLong Dashboard 用法

  • Choose the BINLOG Data Source MySQL BINLOG

  • Configure the MySQL Source MySQL SOURCE

InLong Manager Client 用法

TODO: 将在未来支持此功能。

MySQL Extract 节点参数

参数是否必须默认值数据类型描述
connectorrequired(none)String指定要使用的连接器,这里应该是 ‘mysql-cdc-inlong’
hostnamerequired(none)StringMySQL 数据库服务器的 IP 地址或主机名。
usernamerequired(none)String连接到 MySQL 数据库服务器时要使用的 MySQL 用户名称。
passwordrequired(none)String连接到 MySQL 数据库服务器时使用的密码。
database-namerequired(none)String要监控的 MySQL 服务器的数据库名称。 database-name 还支持正则表达式来监控多个表是否匹配正则表达式。
table-namerequired(none)String要监控的 MySQL 数据库的表名。 table-name 还支持正则表达式来监控多个表是否匹配正则表达式。
portoptional3306IntegerMySQL 数据库服务器的整数端口号。
server-idoptional(none)Integer此数据库客户端的数字 Id 或数字 Id 范围,数字 Id 语法类似于 ‘5400’, 数字 Id 范围语法类似于“5400-5408”,启用“scan.incremental.snapshot.enabled”时建议使用数字 Id 范围语法。 在 MySQL 集群中所有当前运行的数据库进程中,每个 Id 都必须是唯一的。此连接器加入 MySQL 集群 作为另一台服务器(具有此唯一 Id),因此它可以读取 Binlog。默认情况下,会生成一个介于 5400 和 6400 之间的随机数, 尽管我们建议设置一个明确的值。
scan.incremental.snapshot.enabledoptionaltrueBoolean增量快照是一种读取表快照的新机制。与旧的快照机制相比, 增量快照有很多优点,包括: (1) 快照读取时 Source 可以并行, (2) Source 可以在快照读取时在 Chunk 粒度上进行检查点, (3) Source 在读快照前不需要获取全局读锁(FLUSH TABLES WITH READ LOCK)。 如果您希望源代码并行运行,每个并行阅读器都应该有一个唯一的服务器 ID,所以 ‘server-id’ 必须是 ‘5400-6400’ 这样的范围,并且范围必须大于并行度。 请参阅增量快照阅读部分了解更多详细信息。
scan.incremental.snapshot.chunk.sizeoptional8096Integer表快照的块大小(行数),读取表的快照时,表的快照被分成多个块。
scan.snapshot.fetch.sizeoptional1024Integer读取表快照时每次轮询的最大获取大小。
scan.startup.modeoptionalinitialStringMySQL CDC 消费者的可选启动模式,有效枚举为”initial” 和”latest-offset”。 请参阅启动阅读位置部分了解更多详细信息。
server-time-zoneoptionalUTCString数据库服务器中的会话时区,例如”Asia/Shanghai”。 它控制 MYSQL 中的 TIMESTAMP 类型如何转换为 STRING。 查看更多这里
debezium.min.row. count.to.stream.resultoptional1000Integer在快照操作期间,连接器将查询每个包含的表,以便为该表中的所有行生成读取事件。此参数确定 MySQL 连接是否会将表的所有结果拉入内存(速度很快但需要大量内存),或者是否将结果改为流式传输(可能较慢,但适用于非常大的表)。该值指定在连接器流式传输结果之前表必须包含的最小行数,默认为 1,000。将此参数设置为’0’以跳过所有表大小检查并始终在快照期间流式传输所有结果。
connect.timeoutoptional30sDuration连接器在尝试连接到 MySQL 数据库服务器后在超时之前应等待的最长时间。
connect.max-retriesoptional3Integer连接器应重试以建立 MySQL 数据库服务器连接的最大重试次数。
connection.pool.sizeoptional20Integer连接池大小。
jdbc.properties.optional20String传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,例如 ‘jdbc.properties.useSSL’ = ‘false’。
heartbeat.intervaloptional30sDuration发送心跳事件的时间间隔,用于跟踪最新可用的 Binlog 偏移量。
append-modeoptionalfalseBoolean是否仅支持 Append,如果为 ‘true’,MySQL Extract Node 会将所有 Upsert 流转换为 Append 流,以支持不支持 Upsert 流的下游场景。
migrate-alloptionalfalseBoolean是否是全库迁移场景,如果为 ‘true’,MySQL Extract Node 则将表的物理字段和其他元字段压缩成 ‘canal-json’ 格式的特殊元字段 ‘data’。
debezium.optional(none)String将 Debezium 的属性传递给用于从 MySQL 服务器捕获数据更改的 Debezium Embedded Engine。 例如:‘debezium.snapshot.mode’ = ‘never’。 详细了解 Debezium 的 MySQL 连接器属性。

可用的元数据字段

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

字段名称数据类型描述
meta.table_nameSTRING NOT NULL该行所属的表名。
meta.database_nameSTRING NOT NULL该行所属的数据库名称。
meta.op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。
如果记录是从表的快照而不是binlog中读取的,则该值始终为0。
meta.op_typeSTRING数据库操作的类型,如 INSERT/DELETE 等。
meta.dataSTRINGcanal-json 格式化的行的数据只有在 migrate-all 选项为 ‘true’ 时才存在。
meta.is_ddlBOOLEAN是否是 DDL 语句。
meta.tsTIMESTAMP_LTZ(3) NOT NULL接收和处理行的当前时间。
meta.sql_typeMAP将 Sql_type 表字段映射到 Java 数据类型 Id。
meta.mysql_typeMAP表的结构。
meta.pk_namesARRAY表的主键名称。
meta.batch_idBIGINTBinlog的批次id。
meta.update_beforeARRAY该行更新前的数据。

扩展的 CREATE TABLE 示例演示了使用这些元数据字段的语法:

  1. CREATE TABLE `mysql_extract_node` (
  2. `id` INT,
  3. `name` STRING,
  4. `database_name` string METADATA FROM 'meta.database_name',
  5. `table_name` string METADATA FROM 'meta.table_name',
  6. `op_ts` timestamp(3) METADATA FROM 'meta.op_ts',
  7. `op_type` string METADATA FROM 'meta.op_type',
  8. `batch_id` bigint METADATA FROM 'meta.batch_id',
  9. `is_ddl` boolean METADATA FROM 'meta.is_ddl',
  10. `update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'meta.update_before',
  11. `mysql_type` MAP<STRING, STRING> METADATA FROM 'meta.mysql_type',
  12. `pk_names` ARRAY<STRING> METADATA FROM 'meta.pk_names',
  13. `data` STRING METADATA FROM 'meta.data',
  14. `sql_type` MAP<STRING, INT> METADATA FROM 'meta.sql_type',
  15. `ingestion_ts` TIMESTAMP(3) METADATA FROM 'meta.ts',
  16. PRIMARY KEY (`id`) NOT ENFORCED
  17. ) WITH (
  18. 'connector' = 'mysql-cdc-inlong',
  19. 'hostname' = 'YourHostname',
  20. 'migrate-all' = 'true',
  21. 'port' = '3306',
  22. 'username' = 'YourUsername',
  23. 'password' = 'YourPassword',
  24. 'database-name' = 'YourDatabase',
  25. 'table-name' = 'YourTable'
  26. );

数据类型映射

MySQL typeFlink SQL typeNOTE
TINYINTTINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT
MEDIUMINT
SMALLINT UNSIGNED
INT
BIGINT
INT UNSIGNED
BIGINT
BIGINT UNSIGNEDDECIMAL(20, 0)
REAL
FLOAT
FLOAT
DOUBLEDOUBLE
NUMERIC(p, s)
DECIMAL(p, s)
where p <= 38
DECIMAL(p, s)
NUMERIC(p, s)
DECIMAL(p, s)
where 38 < p <= 65
STRINGThe precision for DECIMAL data type is up to 65 in MySQL, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss.
BOOLEAN
TINYINT(1)
BIT(1)
BOOLEAN
DATEDATE
TIME [(p)]TIME [(p)]
TIMESTAMP [(p)]
DATETIME [(p)]
TIMESTAMP [(p)]
CHAR(n)CHAR(n)
VARCHAR(n)VARCHAR(n)
BIT(n)BINARY(⌈n/8⌉)
BINARY(n)BINARY(n)
VARBINARY(N)VARBINARY(N)
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
STRING
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BYTESCurrently, for BLOB data type in MySQL, only the blob whose length isn’t greater than 2,147,483,647(2 ** 31 - 1) is supported.
YEARINT
ENUMSTRING
JSONSTRINGThe JSON data type will be converted into STRING with JSON format in Flink.
SETARRAY<STRING>As the SET data type in MySQL is a string object that can have zero or more values, it should always be mapped to an array of string