Debezium 抽取oracle数据
1、环境介绍操作系统:centos 7.9jdk版本:11.0.12kafka版本:2.8.0Debezium版本:1.6(debezium-connector-oracle-1.6.1.Final-plugin.tar.gz)oracle版本:19c2、安装oraclehttps://blog.csdn.net/zyj81092211/article/details/1200828283、设置o
1、环境介绍
操作系统:centos 7.9
jdk版本:11.0.12
kafka版本:2.8.0
Debezium版本:1.6(debezium-connector-oracle-1.6.1.Final-plugin.tar.gz)
oracle版本:19c
kafka connect 分布式部署
https://blog.csdn.net/zyj81092211/article/details/119647591
kafka connector 配置 Debezium
https://blog.csdn.net/zyj81092211/article/details/119840744
2、安装oracle
https://blog.csdn.net/zyj81092211/article/details/120082828
3、设置oracle
(1)创建目录
mkdir /u01/app/oracle/oradata/recovery_area
(2)连接oracle
sqlplus / as sysdba
(3)进行设置
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/u01/app/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;
(4)打开pdb
alter pluggable database orders open;
(5)、连接至pdb ORDERS
alter session set container=ORDERS;
(6)、创建用户
create user debezium identified by Smtgbk_123;
grant dba to debezium;
(7)、重新链接数据库
sqlplus debezium/Smtgbk_123@localhost/orders
(8)、创建测试表
CREATE TABLE customers (
id NUMBER(9) GENERATED BY DEFAULT ON NULL AS IDENTITY (START WITH 1001) NOT NULL PRIMARY KEY,
first_name VARCHAR2(255) NOT NULL,
last_name VARCHAR2(255) NOT NULL,
email VARCHAR2(255) NOT NULL UNIQUE
);
(9)、开启补充日志
开启表级补充日志
ALTER TABLE customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
开启数据库级别日志补充,在CDB中,执行以下命令
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
(10)、为连接器创建用户
a、使用管理员重新连接数据库
sqlplus / as sysdba
b、创建根容器表空间
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
c、创建PDB表空间
切换至PDB
alter session set container=ORDERS;
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/CDB19C/ORDERS/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
d、创建连接器的 LogMiner 用户
sqlplus / as sysdba
CREATE USER c##dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO c##dbzuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##dbzuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##dbzuser CONTAINER=ALL;
GRANT LOGMINING TO c##dbzuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##dbzuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT ALTER ANY TABLE TO c##dbzuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##dbzuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##dbzuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##dbzuser CONTAINER=ALL;
4、获取 Oracle JDBC 驱动程序
下载地址
https://www.oracle.com/database/technologies/instant-client/linux-x86-64-downloads.html
将软件包中ojdbc8.jar的上传到kafka connector集群所有节点libs文件夹内
杀死kafka 进程重启
kafka-server-start.sh -daemon /data/kafka-connect/config/server.properties
connect-distributed.sh -daemon /data/kafka-connect/config/connect-distributed.properties
5、Debezium Oracle 连接器配置
官方示例:
curl -H "Content-Type: application/json" -X POST -d '{
"name": "source-oracle202",
"config": {
"connector.class" : "io.debezium.connector.oracle.OracleConnector",
"tasks.max" : "1",
"database.server.name" : "oracle202",
"database.hostname" : "10.99.99.202",
"database.port" : "1521",
"database.user" : "c##dbzuser",
"database.password" : "dbz",
"database.dbname" : "CDB19C",
"database.pdb.name" : "ORDERS",
"database.history.kafka.bootstrap.servers" : "kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092",
"database.history.kafka.topic": "schema-changes.orders"
}
}' http://kafkac01.wtown.com:8083/connectors/
查看状态正常
查看topic
6、测试
插入数据
INSERT INTO "DEBEZIUM"."CUSTOMERS" ("ID", "FIRST_NAME", "LAST_NAME", "EMAIL") VALUES ('1', 'zhang', 'san', 'zhagnsan@163.com');
查看topic中多了一个oracle202.DEBEZIUM.CUSTOMERS
消费oracle202.DEBEZIUM.CUSTOMERS
kafka-console-consumer.sh --bootstrap-server kafkac01.wtown.com:9092,kafkac02.wtown.com:9092,kafkac03.wtown.com:9092 --topic oracle202.DEBEZIUM.CUSTOMERS --from-beginning
刚才插入的数据已经抓取到
注:创建non cdb数据库参考官方文档
官方文档:
https://debezium.io/documentation/reference/1.6/connectors/oracle.html
更多推荐
所有评论(0)