1、环境介绍
操作系统:centos 7.9
jdk版本:11.0.12
kafka版本:2.8.0
Debezium版本:1.6(debezium-connector-oracle-1.6.1.Final-plugin.tar.gz)
oracle版本:19c

kafka connect 分布式部署
https://blog.csdn.net/zyj81092211/article/details/119647591
kafka connector 配置 Debezium
https://blog.csdn.net/zyj81092211/article/details/119840744

2、安装oracle
https://blog.csdn.net/zyj81092211/article/details/120082828

3、设置oracle
(1)创建目录

mkdir /u01/app/oracle/oradata/recovery_area

(2)连接oracle

sqlplus / as sysdba

(3)进行设置

alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;

在这里插入图片描述

(4)打开pdb

alter pluggable database orders open;

在这里插入图片描述
(5)、连接至pdb ORDERS

alter session set container=ORDERS;

(6)、创建用户

create user debezium identified by Smtgbk_123;
grant dba to debezium;

(7)、重新链接数据库

sqlplus debezium/Smtgbk_123@localhost/orders

(8)、创建测试表

CREATE TABLE customers (
  id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
  first_name VARCHAR2(255) NOT NULL,
  last_name VARCHAR2(255) NOT NULL,
  email VARCHAR2(255) NOT NULL UNIQUE
);

(9)、开启补充日志
开启表级补充日志

ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

开启数据库级别日志补充,在CDB中,执行以下命令
在这里插入图片描述

ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

(10)、为连接器创建用户
a、使用管理员重新连接数据库

sqlplus / as sysdba

b、创建根容器表空间
在这里插入图片描述

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

c、创建PDB表空间
切换至PDB

alter session set container=ORDERS;

在这里插入图片描述

CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/ORDERS/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

d、创建连接器的 LogMiner 用户

 sqlplus / as sysdba

在这里插入图片描述

CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;

4、获取 Oracle JDBC 驱动程序
下载地址
https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html
在这里插入图片描述
将软件包中ojdbc8.jar的上传到kafka connector集群所有节点libs文件夹内
在这里插入图片描述
在这里插入图片描述
杀死kafka 进程重启

kafka-server-start.sh -daemon /data/kafka-connect/config/server.properties
connect-distributed.sh -daemon /data/kafka-connect/config/connect-distributed.properties

5、Debezium Oracle 连接器配置
官方示例:
在这里插入图片描述
在这里插入图片描述

curl -H "Content-Type: application/json" -X POST -d  '{
    "name": "source-oracle202", 
    "config": {
     "connector.class" : "io.debezium.connector.oracle.OracleConnector",
     "tasks.max" : "1",
     "database.server.name" : "oracle202",
     "database.hostname" : "10.99.99.202",
     "database.port" : "1521",
     "database.user" : "c##dbzuser",
     "database.password" : "dbz",
     "database.dbname" : "CDB19C",
     "database.pdb.name" : "ORDERS",
     "database.history.kafka.bootstrap.servers" : "kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092",
     "database.history.kafka.topic": "schema-changes.orders"
  }
}' http://kafkac01.wtown.com:8083/connectors/

查看状态正常
在这里插入图片描述
查看topic
在这里插入图片描述

6、测试
插入数据

INSERT INTO "DEBEZIUM"."CUSTOMERS" ("ID", "FIRST_NAME", "LAST_NAME", "EMAIL") VALUES ('1', 'zhang', 'san', 'zhagnsan@163.com');

查看topic中多了一个oracle202.DEBEZIUM.CUSTOMERS
在这里插入图片描述
消费oracle202.DEBEZIUM.CUSTOMERS

kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic  oracle202.DEBEZIUM.CUSTOMERS --from-beginning

刚才插入的数据已经抓取到
在这里插入图片描述

注:创建non cdb数据库参考官方文档
官方文档:
https://debezium.io/documentation/reference/1.6/connectors/oracle.html

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐