Links to everything that needs to be downloaded

https://download.csdn.net/download/u010978399/21733452

1. Download zookeeper-3.4.10

https://blog.csdn.net/She_lock/article/details/80435176?utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7Edefault-8.control

2. Download kafka_2.13-2.8.0

Kafka installation reference:

https://blog.csdn.net/weixin_39984161/article/details/91971731?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522161959594516780262520102%2522%252C%2522scm%2522%253A%252220140713.130102334..%2522%257D&request_id=161959594516780262520102&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduend~default-1-91971731.pc_search_result_before_js&utm_term=linux%E5%AE%89%E8%A3%85kafka

3. Download the Kafka connectors. Version 1.6 or later is recommended, since it can capture DDL changes.

debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.6.0.Final/debezium-connector-mysql-1.6.0.Final-plugin.tar.gz

debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/1.6.0.Final/debezium-connector-postgres-1.6.0.Final-plugin.tar.gz

debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

https://repo1.maven.org/maven2/io/debezium/debezium-connector-oracle/1.6.0.Final/debezium-connector-oracle-1.6.0.Final-plugin.tar.gz

4. Install debezium-connector-oracle

4.1 Download debezium-connector-oracle-1.6.0.Final-plugin.tar.gz and extract it on your server; my install directory is /home/debezium/.

4.2 Copy all of the jars in the debezium-connector-oracle directory into ${KAFKA_HOME}/libs.

4.3 For Oracle you also need to download the Instant Client and copy its jars into ${KAFKA_HOME}/libs.

Client download link:

https://download.oracle.com/otn_software/linux/instantclient/211000/instantclient-basic-linux.x64-21.1.0.0.0.zip
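The install steps in section 4 can be sketched as a short script. This is a sketch only: it assumes both archives have already been downloaded into the current directory, and the Instant Client directory name (`instantclient_21_1`) may differ for other versions.

```shell
# Sketch of section 4; assumes the two archives are in the current directory.
KAFKA_HOME=/home/kafka/kafka_2.13-2.8.0

# 4.1: extract the Debezium Oracle plugin into /home/debezium/
mkdir -p /home/debezium
tar -xzf debezium-connector-oracle-1.6.0.Final-plugin.tar.gz -C /home/debezium/

# 4.2: copy the plugin jars into Kafka's libs directory
cp /home/debezium/debezium-connector-oracle/*.jar "$KAFKA_HOME/libs/"

# 4.3: unpack the Oracle Instant Client and copy its jars as well
# (directory name is an assumption based on the 21.1 archive)
unzip -o instantclient-basic-linux.x64-21.1.0.0.0.zip -d /home/oracle-client
cp /home/oracle-client/instantclient_21_1/*.jar "$KAFKA_HOME/libs/"
```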

5. Modify the Kafka Connect configuration. The cluster-style (distributed) configuration is used here, even though Kafka itself is not actually deployed as a cluster.

Kafka install directory: /home/kafka/kafka_2.13-2.8.0/

For standalone deployment, edit [connect-standalone.properties]
For distributed deployment, edit [connect-distributed.properties]

bootstrap.servers=192.168.1.121:9092
plugin.path=/home/debezium/debezium-connector-oracle

group.id=amirdebezium
# Converters (serialization format) for record keys and values written to Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# The three internal topics used by Kafka Connect
config.storage.topic=amir-connect-configs
offset.storage.topic=amir-connect-offsets
status.storage.topic=amir-connect-statuses

config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

offset.flush.interval.ms=10000
rest.advertised.host.name=192.168.1.121

cleanup.policy=compact
rest.host.name=192.168.1.121
rest.port=8083

6. Start ZooKeeper, Kafka, and the connect-distributed environment

6.1 Go to the ZooKeeper directory

Start ZooKeeper:

sh zkServer.sh start

Stop ZooKeeper:

sh zkServer.sh stop

6.2 Go to the Kafka directory

Start Kafka:

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-start.sh    /home/kafka/kafka_2.13-2.8.0/config/server.properties &

Stop Kafka (the stop script takes no config file and needs no &):

/home/kafka/kafka_2.13-2.8.0/bin/kafka-server-stop.sh

6.3 Start connect-distributed with the log4j environment configured

Load the environment:

export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:/home/kafka/kafka_2.13-2.8.0/config/connect-log4j.properties

Start:

./bin/connect-distributed.sh /home/kafka/kafka_2.13-2.8.0/config/connect-distributed.properties &

The trailing & is essential: it puts the process in the background, so the service keeps running after you close the terminal session.
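Putting section 6 together, a minimal start-everything script might look like the following sketch. The ZooKeeper path is an assumption (adjust it to your install), and nohup is an extra safeguard beyond the trailing &, so the processes also survive logout.

```shell
# Sketch: start ZooKeeper, Kafka, and Kafka Connect in order.
KAFKA_HOME=/home/kafka/kafka_2.13-2.8.0
ZK_HOME=/home/zookeeper/zookeeper-3.4.10   # assumed ZooKeeper install path

sh "$ZK_HOME/bin/zkServer.sh" start
sleep 5    # give ZooKeeper a moment before Kafka connects

nohup "$KAFKA_HOME/bin/kafka-server-start.sh" \
    "$KAFKA_HOME/config/server.properties" > /tmp/kafka.log 2>&1 &
sleep 10   # let the broker come up before starting Connect

export KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:$KAFKA_HOME/config/connect-log4j.properties
nohup "$KAFKA_HOME/bin/connect-distributed.sh" \
    "$KAFKA_HOME/config/connect-distributed.properties" > /tmp/connect.log 2>&1 &
```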

7. Submit the Oracle connector to start monitoring the Oracle database

Run the following command directly in the Linux shell:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://172.16.50.22:8085/connectors/ -d '
{
  "name": "debezium-oracle",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "XE",
    "database.hostname": "172.16.50.239",
    "database.port": "1521",
    "database.user": "amir",
    "database.password": "amir",
    "database.dbname": "XE",
    "database.schema": "MSCDW",
    "database.connection.adapter": "logminer",
    "database.tablename.case.insensitive": "true",
    "table.include.list": "MSCDW.*",
    "snapshot.mode": "initial",
    "schema.include.list": "MSCDW",
    "database.history.kafka.bootstrap.servers": "172.16.50.22:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'

8. List the created Kafka connectors

URL:

172.16.50.22:8085/connectors

9. Check the status of a created Kafka connector

URL:

172.16.50.22:8085/connectors/debezium-oracle/status

Here debezium-oracle is the connector name returned by the previous step.

10. View the configuration of a created Kafka connector

URL:

172.16.50.22:8085/connectors/debezium-oracle/config
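The three endpoints above can also be queried from the command line with curl (host and port as configured earlier):

```shell
# List all connectors
curl -s http://172.16.50.22:8085/connectors

# Status of the debezium-oracle connector
curl -s http://172.16.50.22:8085/connectors/debezium-oracle/status

# Its configuration
curl -s http://172.16.50.22:8085/connectors/debezium-oracle/config

# To remove a connector again (standard Connect REST API, not shown above):
# curl -X DELETE http://172.16.50.22:8085/connectors/debezium-oracle
```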

11. Inspect the topics in Kafka

Once the environment is up, a topic is created for each captured table by default; I used the Kafka Tool GUI to view them. Note that the topic in my screenshot was XE.SCOTT.DEPT rather than XE.MSCDW.CONFIG: following the steps above it should be under MSCDW, but I forgot to capture this part while writing the document, and when I added it later the connector happened to be configured against the SCOTT schema, so I did not bother to redo it.

12. Create tables in the Flink SQL client and test

CREATE TABLE sinkMysqlConfigTable
(
ID STRING,
CRON STRING
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://IP:3306/admin',
'connector.table' = 'config',
'connector.username' = 'root',
'connector.password' = 'dhcc@2020',
'connector.write.flush.max-rows' = '1'
);

CREATE TABLE createOracleConfigTable (
id STRING,
cron STRING
) WITH (
'connector' = 'kafka',
'topic' = 'XE.MSCDW.CONFIG',
'properties.bootstrap.servers' = '172.16.50.22:9092',
'debezium-json.schema-include' = 'false',
'properties.group.id' = 'amirdebezium',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'debezium-json'
);
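To actually test the pipeline, the two tables above can be wired together with an INSERT ... SELECT that streams CDC rows from the Kafka-backed table into the MySQL sink. A sketch that writes the statement to a file and submits it via the SQL client; the FLINK_HOME path and the use of `sql-client.sh -f` are assumptions about your Flink install:

```shell
# Write the sync statement to a file.
cat > /tmp/cdc_sync.sql <<'EOF'
INSERT INTO sinkMysqlConfigTable
SELECT id, cron FROM createOracleConfigTable;
EOF

# Submit it with the Flink SQL client (adjust FLINK_HOME to your install):
# $FLINK_HOME/bin/sql-client.sh -f /tmp/cdc_sync.sql
```

After submitting, update a row in the Oracle CONFIG table and check that the change appears in MySQL.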

Appendix: enabling Oracle archive logging

# Adjust as required, or subsequent steps will fail

alter system set db_recovery_file_dest_size=5G;

Enable Oracle archive log mode:
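The standard commands for switching the database into archive log mode are sketched below; this requires SYSDBA privileges and briefly restarts the database.

```shell
# Switch the database into archive log mode (standard Oracle procedure, run as SYSDBA).
sqlplus / as sysdba <<'EOF'
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
archive log list;
EOF
```

The final `archive log list` should report "Database log mode: Archive Mode".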

# Enable supplemental logging for all columns

alter database add supplemental log data (all) columns;

Create a new tablespace and the dbzuser account, then grant it the required privileges

CREATE TABLESPACE LOGMINER_TBS DATAFILE '/home/oracle/app/oracle/oradata/amir/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER dbzuser IDENTIFIED BY dbz DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO dbzuser;
GRANT SELECT ON V_$DATABASE TO dbzuser;
GRANT FLASHBACK ANY TABLE TO dbzuser;
GRANT SELECT ANY TABLE TO dbzuser;
GRANT SELECT_CATALOG_ROLE TO dbzuser;
GRANT EXECUTE_CATALOG_ROLE TO dbzuser;
GRANT SELECT ANY TRANSACTION TO dbzuser;
GRANT SELECT ANY DICTIONARY TO dbzuser;

GRANT CREATE TABLE TO dbzuser;
GRANT ALTER ANY TABLE TO dbzuser;
GRANT LOCK ANY TABLE TO dbzuser;
GRANT CREATE SEQUENCE TO dbzuser;

GRANT EXECUTE ON DBMS_LOGMNR TO dbzuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO dbzuser;
GRANT SELECT ON V_$LOGMNR_LOGS to dbzuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO dbzuser;
GRANT SELECT ON V_$LOGFILE TO dbzuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO dbzuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO dbzuser;

The following is optional for now. The official documentation lists it as a requirement, but I have not yet worked out what it is for.

CREATE USER debezium IDENTIFIED BY dbz DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS;
GRANT CONNECT TO debezium;
GRANT CREATE SESSION TO debezium;
GRANT CREATE TABLE TO debezium;
GRANT CREATE SEQUENCE to debezium;
ALTER USER debezium QUOTA 100M on users;

Kafka commands for listing topics and viewing message contents

1. List topics (from the Kafka directory):

bin/kafka-topics.sh --list --zookeeper localhost:2181

2. View the contents of a topic:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topicName --from-beginning