Oracle-CDC

概述

Oracle Extract 节点允许从 Oracle 数据库中读取快照数据和增量数据。本文档介绍如何设置 Oracle Extract 节点以对 Oracle 数据库运行 SQL 查询。

支持的版本

Extract 节点版本Driver
Oracle-CDCOracle: 11, 12, 19Oracle Driver: 19.3.0.0

依赖

为了设置 Oracle Extract 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-oracle-cdc</artifactId>
  4. <version>1.5.0-SNAPSHOT</version>
  5. </dependency>

连接 Oracle 数据库还需要 Oracle 驱动程序依赖项。请下载ojdbc8-19.3.0.0.jar 并将其放入 FLINK_HOME/lib/

设置 Oracle

你必须为 Oracle 数据库启用日志归档,并定义一个对 Debezium Oracle 连接器监控的所有数据库具有适当权限的 Oracle 用户。

对于非 CDB 数据库

  • 启用日志归档

    (1.1). 以 DBA 身份连接到数据库

    1. ORACLE_SID=SID
    2. export ORACLE_SID
    3. sqlplus /nolog
    4. CONNECT sys/password AS SYSDBA

    (1.2). 启用日志归档

    1. alter system set db_recovery_file_dest_size = 10G;
    2. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    3. shutdown immediate;
    4. startup mount;
    5. alter database archivelog;
    6. alter database open;

    Note:

    • Enable log archiving requires database restart, pay attention when try to do it
    • The archived logs will occupy a large amount of disk space, so consider clean the expired logs the periodically

    (1.3). 检查是否启用了日志归档

    1. -- Should now "Database log mode: Archive Mode"
    2. archive log list;

    注意:

    必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。 下面说明了如何在表/数据库级别进行配置。

    1. -- 为特定表启用补充日志记录:
    2. ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    1. -- 为数据库启用补充日志记录:
    2. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  • 创建具有权限的 Oracle 用户

    (2.1). 创建表空间

    1. sqlplus sys/password@host:port/SID AS SYSDBA;
    2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    3. exit;

    (2.2). 创建用户并授予权限

    1. sqlplus sys/password@host:port/SID AS SYSDBA;
    2. CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    3. GRANT CREATE SESSION TO flinkuser;
    4. GRANT SET CONTAINER TO flinkuser;
    5. GRANT SELECT ON V_$DATABASE to flinkuser;
    6. GRANT FLASHBACK ANY TABLE TO flinkuser;
    7. GRANT SELECT ANY TABLE TO flinkuser;
    8. GRANT SELECT_CATALOG_ROLE TO flinkuser;
    9. GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    10. GRANT SELECT ANY TRANSACTION TO flinkuser;
    11. GRANT LOGMINING TO flinkuser;
    12. GRANT CREATE TABLE TO flinkuser;
    13. GRANT LOCK ANY TABLE TO flinkuser;
    14. GRANT ALTER ANY TABLE TO flinkuser;
    15. GRANT CREATE SEQUENCE TO flinkuser;
    16. GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    17. GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    18. GRANT SELECT ON V_$LOG TO flinkuser;
    19. GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    20. GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    21. GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    22. GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    23. GRANT SELECT ON V_$LOGFILE TO flinkuser;
    24. GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    25. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
    26. exit;

对于 CDB 数据库

总的来说,配置 CDB 数据库的步骤与非 CDB 数据库非常相似,但命令可能会有所不同。

  • 启用日志归档

    1. ORACLE_SID=ORCLCDB
    2. export ORACLE_SID
    3. sqlplus /nolog
    4. CONNECT sys/password AS SYSDBA
    5. alter system set db_recovery_file_dest_size = 10G;
    6. -- should exist
    7. alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    8. shutdown immediate
    9. startup mount
    10. alter database archivelog;
    11. alter database open;
    12. -- Should show "Database log mode: Archive Mode"
    13. archive log list
    14. exit;

    注意: 您还可以使用以下命令启用补充日志记录:

    1. -- Enable supplemental logging for a specific table:
    2. ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    3. -- Enable supplemental logging for database
    4. ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  • 创建具有权限的 Oracle 用户

    1. sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
    2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    3. exit
    1. sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
    2. CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    3. exit
    1. sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
    2. CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
    3. GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
    4. GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
    5. GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
    6. GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
    7. GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
    8. GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    9. GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    10. GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
    11. GRANT LOGMINING TO flinkuser CONTAINER=ALL;
    12. GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
    13. -- 如果设置 scan.increative.snapshot.enabled=true (默认值),则不需要执行这条语句
    14. GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
    15. GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
    16. GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
    17. GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
    18. GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
    19. GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
    20. GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
    21. GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
    22. GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
    23. GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
    24. GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
    25. GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
    26. exit

查看更多关于 设置 Oracle

如何创建 Oracle Extract 节点

SQL API 用法

Oracle Extract 节点可以定义如下:

  1. -- 创建 an Oracle Extract 节点 'products' in Flink SQL
  2. Flink SQL> CREATE TABLE products (
  3. ID INT NOT NULL,
  4. NAME STRING,
  5. DESCRIPTION STRING,
  6. WEIGHT DECIMAL(10, 3),
  7. PRIMARY KEY(id) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'oracle-cdc-inlong',
  10. 'hostname' = 'localhost',
  11. 'port' = '1521',
  12. 'username' = 'flinkuser',
  13. 'password' = 'flinkpw',
  14. 'database-name' = 'XE',
  15. 'schema-name' = 'inlong',
  16. 'table-name' = 'user');
  17. Flink SQL> SELECT * FROM products;

注意:

当使用 CDB + PDB 模型时,您需要在 Flink DDL 中添加一个额外的选项 'debezium.database.pdb.name' = 'xxx' 来指定要连接的 PDB 的名称。

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

Oracle Extact 节点参数

选项是否必须默认类型描述
connector必选(none)String指定要使用的连接器,这里应该是 ‘oracle-cdc-inlong’
hostname必选(none)StringOracle 数据库服务器的 IP 地址或主机名。
username必选(none)String连接到 Oracle 数据库服务器时要使用的 Oracle 数据库的名称。
password必选(none)String连接到 Oracle 数据库服务器时使用的密码。
database-name必选(none)String要监视的 Oracle 服务器的数据库名称。
schema-name必选(none)String要监视的 Oracle 数据库的 Schema 名称。
table-name必选(none)String要监视的 Oracle 数据库的表名。格式为<schema_name>.<table_name>
port可选1521IntegerOracle 数据库服务器的整数端口号。
scan.startup.mode可选initialStringOracle CDC 消费者的可选启动模式,有效枚举为”initial”和”latest-offset”。 请参阅启动阅读位置部分了解更多详细信息。
debezium.*可选(none)String将 Debezium 的属性整合到用于从 Oracle 服务器捕获数据更改的 Debezium Embedded Engine。 例如:‘debezium.snapshot.mode’ = ‘never’。 详细了解 Debezium 的 Oracle 连接器属性
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]。
source.multiple.enable可选falseBoolean是否开启多模式、表同步功能,如果为 ‘true’,Oracle Extract Node 则将表的物理字段压缩成 ‘canal-json’ 格式的特殊元字段 ‘data_canal’。
scan.incremental.snapshot.enabled可选trueBoolean增量快照是一种读取表快照的新机制。与旧的快照机制相比,增量快照具有许多优点,包括:(1)在快照读取期间 Source 可以是并行的,(2)Source 可以在快照读取过程中执行 Chunk 粒度中的检查点,(3)Source 不需要在快照读取之前获取 ROW SHARE MODE 锁。
scan.incremental.snapshot.chunk.size可选8096Integer表快照的块大小(行数),读取表的快照时,表的快照被分成多个块。
scan.snapshot.fetch.size可选1024Integer读取表快照时每次轮询的最大获取大小。
connect.max-retries可选3Integer连接器应重试以建立 Oracle 数据库服务器连接的最大重试次数。
chunk-meta.group.size可选1000IntegerChunk meta 组大小,如果 meta 大小超过组大小,则 meta 将被分成多个组。
connect.timeout可选30sDuration连接器在尝试连接到 Oracle 数据库服务器后在超时之前应等待的最长时间。
chunk-key.even-distribution.factor.lower-bound可选0.05dDoubleChunk Key 分布因子的下限。分布系数用于确定表格是否均匀分布。当数据分布均匀时,表块将均匀地使用计算优化,当数据分布不均匀时,将进行拆分查询。分布因子可以通过(MAX(id)-MIN(id)+1)/rowCount计算。
chunk-key.even-distribution.factor.upper-bound可选1000.0dDoubleChunk Key 分布因子的上限。分布系数用于确定表格是否均匀分布。当数据分布均匀时,表块将均匀地使用计算优化,当数据分布不均匀时,将进行拆分查询。分布因子可以通过(MAX(id)-MIN(id)+1)/rowCount计算。
connection.pool.size可选20Integer连接池大小。

局限性

在扫描表的快照期间无法执行 checkpoint

在扫描数据库表的快照时,由于没有可恢复的位置,我们无法执行检查点。为了不执行检查点,Oracle CDC 源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发 Flink 作业的故障转移。所以如果数据库表很大,建议添加以下 Flink 配置,避免因为超时检查点而导致故障转移:

  1. execution.checkpointing.interval: 10min
  2. execution.checkpointing.tolerable-failed-checkpoints: 100
  3. restart-strategy: fixed-delay
  4. restart-strategy.fixed-delay.attempts: 2147483647

可用的元数据字段

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

字段名称数据类型描述
table_nameSTRING NOT NULL该行所属的表名。
schema_nameSTRING NOT NULL该行所属的模式名称。
database_nameSTRING NOT NULL该行所属的数据库名称。
op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。
如果记录从表的快照而不是change流中读取,则该值始终为0。
meta.table_nameSTRING NOT NULL该行所属的表名。
meta.schema_nameSTRING NOT NULL该行所属的模式名称。
meta.database_nameSTRING NOT NULL该行所属的数据库名称。
meta.op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。
如果记录从表的快照而不是change流中读取,则该值始终为0。
meta.op_typeSTRING数据库操作的类型,如 INSERT/DELETE 等。
meta.data_canalSTRING/BYTEScanal-json 格式化的行的数据只有在 source.multiple.enable 选项为 ‘true’ 时才存在。
meta.is_ddlBOOLEAN是否是 DDL 语句。
meta.tsTIMESTAMP_LTZ(3) NOT NULL接收和处理行的当前时间。
meta.sql_typeMAP将 Sql_type 表字段映射到 Java 数据类型 Id。
meta.oracle_typeMAP表的结构。
meta.pk_namesARRAY表的主键名称。

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

  1. CREATE TABLE products (
  2. db_name STRING METADATA FROM 'database_name' VIRTUAL,
  3. schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
  4. table_name STRING METADATA FROM 'table_name' VIRTUAL,
  5. op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
  6. meta_db_name STRING METADATA FROM 'meta.database_name' VIRTUAL,
  7. meta_schema_name STRING METADATA FROM 'meta.schema_name' VIRTUAL,
  8. meta_table_name STRING METADATA FROM 'meta.table_name' VIRTUAL,
  9. meat_op_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL,
  10. meta_op_type STRING METADATA FROM 'meta.op_type' VIRTUAL,
  11. meta_data_canal STRING METADATA FROM 'meta.data_canal' VIRTUAL,
  12. meta_is_ddl BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL,
  13. meta_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL,
  14. meta_sql_type MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL,
  15. meat_oracle_type MAP<STRING, STRING> METADATA FROM 'meta.oracle_type' VIRTUAL,
  16. meta_pk_names ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL
  17. ID INT NOT NULL,
  18. NAME STRING,
  19. DESCRIPTION STRING,
  20. WEIGHT DECIMAL(10, 3),
  21. PRIMARY KEY(id) NOT ENFORCED
  22. ) WITH (
  23. 'connector' = 'oracle-cdc-inlong',
  24. 'hostname' = 'localhost',
  25. 'port' = '1521',
  26. 'username' = 'flinkuser',
  27. 'password' = 'flinkpw',
  28. 'database-name' = 'XE',
  29. 'schema-name' = 'inventory',
  30. 'table-name' = 'inventory.products'
  31. );

注意:Oracle 方言是区分大小写的,如果字段名没有被引用,它会将字段名转换为大写,Flink SQL 不会转换字段名。因此对于 oracle 数据库中的物理列,我们在 Flink SQL 中定义 oracle-cdc 表时应该使用其在 Oracle 中转换后的字段名称。

特征

Exactly-Once 处理

Oracle Extract 节点是一个 Flink Source 连接器,它将首先读取数据库快照,然后通过exactly-once 处理继续读取更改事件,即使发生故障。请阅读 连接器的工作原理

启动读取位置

配置选项 scan.startup.mode 指定 Oracle Extract 节点消费者的启动模式。有效的枚举是:

  • initial (默认): 首次启动时对被监控的数据库表进行初始快照,并继续读取最新的 Binlog。
  • latest-offset: 永远不要在第一次启动时对受监控的数据库表执行快照,只需从自连接器启动以来的更改。

注意: scan.startup.mode 选项的机制依赖于 Debezium 的snapshot.mode 配置。所以请不要一起使用它们。如果您在 DDL 表中同时指定了 scan.startup.modedebezium.snapshot.mode 选项,可能会导致 scan.startup.mode 不起作用。

单线程读取

Oracle Extract 节点不能并行读取,因为只有一个任务可以接收更改事件。

整库、多模式、表同步

Oracle Extract 节点支持整库、多模式、多表同步。开启该功能后,Oracel Extract 节点会将表的物理字段压缩成 ‘canal-json’ 格式的特殊元字段 ‘data_canal’。

配置参数:

参数是否必须默认值数据类型描述
source.multiple.enableoptionalfalseString指定‘source.multiple.enable’ = ‘true’参数开启整库、多模式、多表同步功能
schema-namerequired(none)String要监视的 Oracle 数据库的 Schema 名称。如果要捕获多个模式,可以使用逗号分割它们。例如:‘schema-name’ = ‘SCHEMA1,SCHEMA2’
table-namerequired(none)String要监视的 Oracle 数据库的表名。如果要捕获多个表,可以使用逗号分割它们。例如:‘table-name’ = ‘SCHEMA1.TB.*, SCHEMA2.TB1’

CREATE TABLE 示例演示该功能语法:

  1. CREATE TABLE node(
  2. data STRING METADATA FROM 'meta.data_canal' VIRTUAL)
  3. WITH (
  4. 'connector' = 'oracle-cdc-inlong',
  5. 'hostname' = 'localhost',
  6. 'port' = '1521',
  7. 'username' = 'flinkuser',
  8. 'password' = 'flinkpw',
  9. 'database-name' = 'XE',
  10. 'schema-name' = 'inventory',
  11. 'table-name' = 'inventory..*',
  12. 'source.multiple.enable' = 'true'
  13. )

数据类型映射

Oracle typeFlink SQL type
NUMBER(p, s <= 0), p - s < 3TINYINT
NUMBER(p, s <= 0), p - s < 5SMALLINT
NUMBER(p, s <= 0), p - s < 10INT
NUMBER(p, s <= 0), p - s < 19BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38STRING
FLOAT
BINARY_FLOAT
FLOAT
DOUBLE PRECISION
BINARY_DOUBLE
DOUBLE
NUMBER(1)BOOLEAN
DATE
TIMESTAMP [(p)]
TIMESTAMP [(p)][WITHOUT TIMEZONE]
TIMESTAMP [(p)] WITH TIME ZONETIMESTAMP [(p)] WITH TIME ZONE
TIMESTAMP [(p)] WITH LOCAL TIME ZONETIMESTAMP_LTZ [(p)]
CHAR(n)
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
NCLOB
XMLType
STRING
BLOB
ROWID
BYTES
INTERVAL DAY TO SECOND
INTERVAL YEAR TO MONTH
BIGINT