Oracle CDC 连接器

下载flink-sql-connector-oracle-cdc-2.2.1.jar放在<FLINK_HOME>/lib/.

注: flink-sql-connector-oracle-cdc-XXX-SNAPSHOT 版本为开发分支对应的代码。用户需要下载源码并编译相应的jar。用户使用发布版本,如flink-sql-connector-oracle-cdc-2.2.1.jar,发布版本会在Maven中央仓库中获取。

对于非 CDB 数据库

  1. 启用日志归档

    (1.1)。以 DBA 身份连接到数据库

    ORACLE_SID=SID
    export ORACLE_SID
    sqlplus /nolog
      CONNECT sys/password AS SYSDBA

    (1.2)。启用日志归档

    alter system set db_recovery_file_dest_size = 10G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;

    注意:

    • 启用日志归档需要重启数据库,尝试时注意

    • 归档日志会占用大量磁盘空间,所以考虑定期清理过期日志

    (1.3)。查看日志归档是否开启

    -- Should now "Database log mode: Archive Mode"
    archive log list;

    注意:

    必须为捕获的表或数据库启用补充日志记录,以便数据更改捕获更改的数据库行之前的状态。下面说明如何在表/数据库级别进行配置。

    -- Enable supplemental logging for a specific table:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    
    -- Enable supplemental logging for database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  2. 创建具有权限的 Oracle 用户

    (2.1)。创建表空间

    sqlplus sys/password@host:port/SID AS SYSDBA;
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit;

    (2.2)。创建用户并授予权限

    sqlplus sys/password@host:port/SID AS SYSDBA;
      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
      GRANT CREATE SESSION TO flinkuser;
      GRANT SET CONTAINER TO flinkuser;
      GRANT SELECT ON V_$DATABASE to flinkuser;
      GRANT FLASHBACK ANY TABLE TO flinkuser;
      GRANT SELECT ANY TABLE TO flinkuser;
      GRANT SELECT_CATALOG_ROLE TO flinkuser;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
      GRANT SELECT ANY TRANSACTION TO flinkuser;
      GRANT LOGMINING TO flinkuser;
    
      GRANT CREATE TABLE TO flinkuser;
      GRANT LOCK ANY TABLE TO flinkuser;
      GRANT ALTER ANY TABLE TO flinkuser;
      GRANT CREATE SEQUENCE TO flinkuser;
    
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
    
      GRANT SELECT ON V_$LOG TO flinkuser;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
      GRANT SELECT ON V_$LOGFILE TO flinkuser;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
      exit;

对于 CDB 数据库

总的来说,配置CDB数据库的步骤与非CDB数据库非常相似,但命令可能有所不同。

  1. 启用日志归档

    ORACLE_SID=ORCLCDB
    export ORACLE_SID
    sqlplus /nolog
      CONNECT sys/password AS SYSDBA
      alter system set db_recovery_file_dest_size = 10G;
      -- should exist
      alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
      shutdown immediate
      startup mount
      alter database archivelog;
      alter database open;
      -- Should show "Database log mode: Archive Mode"
      archive log list
      exit;

    注意: 还可以使用以下命令启用补充日志记录:

    -- Enable supplemental logging for a specific table:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    
    -- Enable supplemental logging for database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  2. 创建具有权限的 Oracle 用户

    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit
    sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
      CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
      exit
    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
      CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
      GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
      GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
      GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
      GRANT LOGMINING TO flinkuser CONTAINER=ALL;
      GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
      GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
      GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
    
      GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
      GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
    
      GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
      GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
      exit

查看有关设置 Oracle的更多信息

如何创建 Oracle CDC 表

Oracle CDC 表可以定义如下:

-- register an Oracle table 'products' in Flink SQL
Flink SQL> CREATE TABLE products (
     ID INT NOT NULL,
     NAME STRING,
     DESCRIPTION STRING,
     WEIGHT DECIMAL(10, 3),
     PRIMARY KEY(id) NOT ENFORCED
     ) WITH (
     'connector' = 'oracle-cdc',
     'hostname' = 'localhost',
     'port' = '1521',
     'username' = 'flinkuser',
     'password' = 'flinkpw',
     'database-name' = 'XE',
     'schema-name' = 'inventory',
     'table-name' = 'products');
  
-- read snapshot and binlogs from products table
Flink SQL> SELECT * FROM products;

注意: 当使用 CDB + PDB 模型时,需要在 Flink DDL 中添加一个额外的选项来指定要连接的 PDB 的名称。'debezium.database.pdb.name' = 'xxx'

连接器选项

选项 必需的 默认 类型 描述
连接器 必需的 (没有任何) 细绳 指定要使用的连接器,这里应该是'oracle-cdc'
主机名 必需的 (没有任何) 细绳 Oracle 数据库服务器的 IP 地址或主机名。
用户名 必需的 (没有任何) String 连接到 Oracle 数据库服务器时要使用的 Oracle 数据库的名称。
密码 必需的 (没有任何) String 连接到 Oracle 数据库服务器时使用的密码。
数据库名称 必需的 (没有任何) String 要监视的 Oracle 服务器的数据库名称。
架构名称 必需的 (没有任何) String 要监视的 Oracle 数据库的架构名称。
表名 必需的 (没有任何) String 要监视的 Oracle 数据库的表名。
港口 选修的 1521 Integer Oracle 数据库服务器的整数端口号。
扫描启动模式 选修的 initial String Oracle CDC 消费者的可选启动模式,有效枚举为“initial”和“latest-offset”。有关更多详细信息,请参阅启动阅读位置部分。
debezium.* 选修的 (没有任何) String 将 Debezium 的属性传递给 Debezium 嵌入式引擎,该引擎用于从 Oracle 服务器捕获数据更改。例如:'debezium.snapshot.mode' = 'never'。查看有关Debezium 的 Oracle Connector 属性的更多信息

局限性

在扫描数据库表的快照时,由于没有可恢复的位置,我们无法执行检查点。为了不执行检查点,Oracle CDC 源将保持检查点等待超时。超时检查点将被识别为失败检查点,默认情况下,这将触发 Flink 作业的故障转移。所以如果数据库表比较大,建议在 Flink 中添加如下配置,避免超时检查点引起的 failover:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

启动阅读位置

配置选项scan.startup.mode指定 Oracle CDC 消费者的启动模式。有效的枚举是:

  • initial(default): 首次启动时对监控的数据库表进行初始快照,并继续读取最新的binlog。

  • latest-offset: 永远不要在第一次启动时对监视的数据库表执行快照,只需从连接器启动以来的更改中读取。

注意:scan.startup.modeoption 的机制依赖于 Debezium 的snapshot.mode配置。所以请不要一起使用它们。如果在表 DDL 中指定了scan.startup.mode和选项,它可能会不起作用。debezium.snapshot.modescan.startup.mode

Flink CDC SQL Oracle To Kafka To ES实例:

public class CP_LOGIN_INFO {
    private static final Logger log = LoggerFactory.getLogger(CP_LOGIN_INFO.class);
    public static void main(String[] args) throws Exception {

        EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() //构建环境
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);  //设置流的并行

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); //流表环境创造
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);



        log.info("This message contains {} placeholders. {}", 2, "Yippie");


        //  log.info("-----------------> start");  // 打印日志

//配置检查点
        env.enableCheckpointing(180000); // 开启checkpoint 每5000ms 一次
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);// 确认 checkpoints 之间的时间会进行 500 ms
        env.getCheckpointConfig().setCheckpointTimeout(600000); //设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//设置有且仅有一次模式 目前支持 EXACTLY_ONCE/AT_LEAST_ONCE
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);// 设置并发checkpoint的数目
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink-checkpoints/oracle/CP_LOGIN_INFO");  // 这个是存放到hdfs目录下
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 开启在 job 中止后仍然保留的 externalized checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints(); // 开启实验性的 unaligned checkpoints



        String sourceDDL ="CREATE TABLE Oracle_Source (\n" +
                "     ID DECIMAL(12,0), \n" +
                "     USER_CODE STRING, \n" +
                "     LOGIN_TIME STRING, \n" +
                "     OVER_TIME STRING, \n" +
                "     TOKEN STRING, \n" +
                "     INSERT_TIME_HIS STRING, \n" +
                "     UPDATE_TIME_HIS STRING, \n" +
                "     VERSION STRING, \n" +
                "     PRIMARY KEY (ID) NOT ENFORCED \n" +
                "     ) WITH (\n" +
                "     'connector' = 'oracle-cdc',\n" +
                "     'hostname' = 'Oracle_IP地址',\n" +
                "     'port' = '1521',\n" +
                "     'username' = 'userxxx',\n" +
                "     'password' = 'pwdxxx',\n" +
                "     'database-name' = 'ORCL',\n" +
                "     'schema-name' = 'Oracle的schema-name',\n" +           // 注意这里要大写
                "     'table-name' = 'CP_LOGIN_INFO',\n" +
                "     'debezium.log.mining.strategy'='online_catalog',\n" +
                "     'debezium.log.mining.batch.size.max'='30000000',\n" +
                "     'debezium.log.mining.batch.size.min'='10000',\n" +
                "     'debezium.log.mining.batch.size.default'='2000000'\n" +
                "     )";

        // 创建一张用于输出的表
        String KafkasinkDDL = "CREATE TABLE SinkKafka (\n" +
                "     ID DECIMAL(12,0), \n" +
                "     USER_CODE STRING, \n" +
                "     LOGIN_TIME BIGINT, \n" +
                "     OVER_TIME BIGINT, \n" +
                "     TOKEN STRING, \n" +
                "     INSERT_TIME_HIS BIGINT, \n" +
                "     UPDATE_TIME_HIS BIGINT, \n" +
                "     VERSION STRING, \n" +
                "     PRIMARY KEY (ID) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'kafka',\n" +
                " 'topic' = 'CP_LOGIN_INFO',\n" +
                " 'properties.bootstrap.servers' =  'kafka_IP_A:9092,kafka_IP_B:9092,kafka_IP_C:9092',\n" +  //一定要加且加在index的签名
                " 'properties.group.id' = 'test-consumer-group',\n" +
                " 'scan.startup.mode' = 'earliest-offset',\n" +
                " 'format' = 'debezium-json', \n" +
                " 'debezium-json.ignore-parse-errors'='true' \n" +
                ")";

        String transformSQL =
                " INSERT INTO SinkKafka  SELECT ID,\n" +
                        "USER_CODE,\n" +
                        "(CAST(LOGIN_TIME AS BIGINT) - 8 * 60 * 60 * 1000 ) as LOGIN_TIME,\n" +
                        "(CAST(OVER_TIME  AS BIGINT) - 8 * 60 * 60 * 1000 ) as OVER_TIME,\n" +
                        "TOKEN,\n" +
                        "(CAST(INSERT_TIME_HIS AS BIGINT) - 8 * 60 * 60 * 1000 ) as INSERT_TIME_HIS,\n" +
                        "(CAST(UPDATE_TIME_HIS AS BIGINT)  - 8 * 60 * 60 * 1000 ) as UPDATE_TIME_HIS,\n" +

                        "VERSION FROM  Oracle_Source " ;

        String sinkDDL = "CREATE TABLE SinkES (\n" +
                "     ID DECIMAL(12,0), \n" +
                "     USER_CODE STRING, \n" +
                "     LOGIN_TIME BIGINT, \n" +
                "     OVER_TIME BIGINT, \n" +
                "     TOKEN STRING, \n" +
                "     INSERT_TIME_HIS BIGINT, \n" +
                "     UPDATE_TIME_HIS BIGINT, \n" +
                "     VERSION STRING, \n" +
                "     PRIMARY KEY (ID) NOT ENFORCED \n" +
                ") WITH (\n" +
                " 'connector' = 'elasticsearch-7',\n" +
                " 'hosts' = 'http://ES_IP地址:9200',\n" +
                " 'format' = 'json',\n" +  //一定要加且加在index的签名
                " 'index' = 'cp_login_info_testES',\n" +
                " 'username' = 'userxxx',\n" +
                " 'password' = 'pwdxxx',\n" +
                " 'failure-handler' = 'ignore',\n" +
                " 'sink.flush-on-checkpoint' = 'true' ,\n"+
                " 'sink.bulk-flush.max-actions' = '20000' ,\n"+
                " 'sink.bulk-flush.max-size' = '2mb' ,\n"+
                " 'sink.bulk-flush.interval' = '1000ms' ,\n"+
                " 'sink.bulk-flush.backoff.strategy' = 'CONSTANT',\n"+
                " 'sink.bulk-flush.backoff.max-retries' = '3',\n"+
                " 'connection.max-retry-timeout' = '3153600000000',\n"+
                " 'sink.bulk-flush.backoff.delay' = '100ms'\n"+
                ")";

        String transformSQL1 =
                " INSERT INTO SinkES  SELECT ID,\n" +
                        "USER_CODE,\n" +
                        "LOGIN_TIME,\n" +
                        "OVER_TIME,\n" +
                        "TOKEN,\n" +
                        "INSERT_TIME_HIS,\n" +
                        "UPDATE_TIME_HIS,\n" +
                        "VERSION FROM  SinkKafka " ;

        //执行source表ddl
        tableEnv.executeSql(sourceDDL);


        //执行sink表ddl
        tableEnv.executeSql(KafkasinkDDL);
        tableEnv.executeSql(sinkDDL);

        //执行逻辑sql语句
        TableResult tableResult = tableEnv.executeSql(transformSQL);
        TableResult tableResult1 = tableEnv.executeSql(transformSQL1);
        tableResult.print();
        tableResult1.print();
        env.execute();
    }
}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐