Environment

OS version

[root@localhost kafka_2.13-2.8.0]# cat /etc/redhat-release 
CentOS Linux release 7.5.1804 (Core) 
[root@localhost kafka_2.13-2.8.0]# uname -r
3.10.0-862.el7.x86_64

glibc version

[root@localhost kafka_2.13-2.8.0]# rpm -qa|grep glibc
glibc-common-2.17-222.el7.x86_64
glibc-2.17-222.el7.x86_64

Kafka version

kafka_2.13-2.8.0

Configure ZooKeeper

Configuration on node 10.0.2.18

[root@localhost kafka_2.13-2.8.0]# cat /opt/kafka_2.13-2.8.0/config/zookeeper.properties
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
# 
#    http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=10.0.2.20:2888:3888
server.2=10.0.2.18:2889:3889
server.3=10.0.2.19:2890:3890
echo "2" > /tmp/zookeeper/myid

Configuration on node 10.0.2.19

[root@localhost kafka_2.13-2.8.0]# cat /opt/kafka_2.13-2.8.0/config/zookeeper.properties
# (Apache license header omitted; identical to the file above)
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=10.0.2.20:2888:3888
server.2=10.0.2.18:2889:3889
server.3=10.0.2.19:2890:3890
echo "3" > /tmp/zookeeper/myid

Configuration on node 10.0.2.20

[root@localhost kafka_2.13-2.8.0]# cat /opt/kafka_2.13-2.8.0/config/zookeeper.properties
# (Apache license header omitted; identical to the file above)
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
tickTime=2000
initLimit=5
syncLimit=2
server.1=10.0.2.20:2888:3888
server.2=10.0.2.18:2889:3889
server.3=10.0.2.19:2890:3890
echo "1" > /tmp/zookeeper/myid

Start the ZooKeeper cluster

Run on every node:

cd /opt/kafka_2.13-2.8.0
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
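
To confirm the quorum formed, each node can be queried with ZooKeeper's four-letter commands; a quick sketch (assumes nc is installed; srvr is whitelisted by default):

echo srvr | nc localhost 2181 | grep Mode
# expect Mode: leader on one node and Mode: follower on the other two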

Configure Kafka

Configuration on node 10.0.2.18

[root@localhost kafka_2.13-2.8.0]# cat /opt/kafka_2.13-2.8.0/config/server.properties
broker.id=2
listeners=PLAINTEXT://10.0.2.18:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.2.18:2181,10.0.2.19:2181,10.0.2.20:2181

zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0
delete.topic.enable=true

Configuration on node 10.0.2.19

[root@localhost kafka_2.13-2.8.0]# cat /opt/kafka_2.13-2.8.0/config/server.properties
broker.id=3
listeners=PLAINTEXT://10.0.2.19:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.2.18:2181,10.0.2.19:2181,10.0.2.20:2181

zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0
delete.topic.enable=true

Configuration on node 10.0.2.20

[root@localhost kafka_2.13-2.8.0]# cat /opt/kafka_2.13-2.8.0/config/server.properties
broker.id=1
listeners=PLAINTEXT://10.0.2.20:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

log.retention.hours=168

log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.2.18:2181,10.0.2.19:2181,10.0.2.20:2181

zookeeper.connection.timeout.ms=18000

group.initial.rebalance.delay.ms=0
delete.topic.enable=true

Start the Kafka cluster

Run on every node:

cd /opt/kafka_2.13-2.8.0
bin/kafka-server-start.sh -daemon config/server.properties
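
Each broker registers itself in ZooKeeper, so a quick way to confirm all three came up (a sketch, run from any node):

bin/zookeeper-shell.sh 10.0.2.18:2181 ls /brokers/ids
# expect [1, 2, 3]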

Prepare the Debezium packages

Download the packages

The following packages are required:

debezium-connector-oracle-1.6.0-20210616.001509-60-plugin.tar.gz
instantclient-basic-linux.x64-21.1.0.0.0.zip

Download debezium-connector-oracle from:
https://debezium.io/documentation/reference/install.html

The Instant Client is linked from the same page, which redirects to the download page:
https://www.oracle.com/database/technologies/instant-client/downloads.html

Extract the packages
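
A sketch of the extraction, assuming both packages were downloaded to /opt (the resulting directories are used in the next step):

cd /opt
tar -xzf debezium-connector-oracle-1.6.0-20210616.001509-60-plugin.tar.gz
unzip instantclient-basic-linux.x64-21.1.0.0.0.zip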

Copy the jars into place

cp /opt/debezium-connector-oracle/*.jar /opt/kafka_2.13-2.8.0/libs/
cp /opt/instantclient_21_1/*.jar /opt/kafka_2.13-2.8.0/libs/

Configure Oracle

Log in to the database

Switch to the oracle user:

su - oracle

Switch to the Oracle installation directory and log in to the database:

sqlplus / as sysdba

Enable archive logging

Archiving must be enabled with the database in MOUNT state, so shut down and restart to MOUNT:

SQL> shutdown immediate
Database closed.
Database dismounted.
ORACLE instance shut down.
SQL> startup mount
ORACLE instance started.

Total System Global Area 1603411968 bytes
Fixed Size            2213776 bytes
Variable Size          989857904 bytes
Database Buffers      603979776 bytes
Redo Buffers            7360512 bytes
Database mounted.

Put the database into archivelog mode:

SQL> alter database archivelog;
Database altered.

Check the archiving status:

SQL> archive log list
Database log mode           Archive Mode
Automatic archival           Enabled
Archive destination           /u01/app/oracle/archive_log
Oldest online log sequence     244
Next log sequence to archive   246
Current log sequence           246

Enable automatic archiving:

alter system archive log start;  

Enable force logging:

ALTER DATABASE FORCE LOGGING;

Open the database:

SQL> alter database open;

Database altered.

Confirm the database is in archivelog mode:

SQL> select log_mode from v$database;

LOG_MODE
------------------------------------
ARCHIVELOG

SQL> select archiver from v$instance;

ARCHIVER
---------------------
STARTED

Enable supplemental logging

Enable minimal supplemental logging:

SQL> alter database add supplemental log data ;
 
Database altered.

Enable all-column supplemental logging:

SQL> alter database add supplemental log data (all) columns; 
 
Database altered.

Confirm supplemental logging is enabled:

select SUPPLEMENTAL_LOG_DATA_MIN min,
       SUPPLEMENTAL_LOG_DATA_PK  pk,
       SUPPLEMENTAL_LOG_DATA_UI  ui,
       SUPPLEMENTAL_LOG_DATA_FK  fk,
       SUPPLEMENTAL_LOG_DATA_ALL "all"
  from v$database;

MIN       PK        UI        FK        all
--------- --------- --------- --------- ---------
YES       NO        NO        NO        YES

Create the Debezium user and grant privileges
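
The statements below assume the LOGMINER_TBS tablespace already exists; a minimal sketch to create it (the datafile path is an assumption, adjust it to your storage layout):

sqlplus / as sysdba <<'EOF'
CREATE TABLESPACE logminer_tbs DATAFILE '/u01/app/oracle/oradata/ORCL/logminer_tbs.dbf'
  SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
EOF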

CREATE USER c IDENTIFIED BY dbz DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs;
GRANT CREATE SESSION TO c;
GRANT SET CONTAINER TO c;
GRANT SELECT ON V_$DATABASE to c;
GRANT FLASHBACK ANY TABLE TO c;
GRANT SELECT ANY TABLE TO c;
GRANT SELECT_CATALOG_ROLE TO c;
GRANT EXECUTE_CATALOG_ROLE TO c;
GRANT SELECT ANY TRANSACTION TO c;
GRANT LOGMINING TO c;


GRANT CREATE TABLE TO c;
GRANT LOCK ANY TABLE TO c;
GRANT ALTER ANY TABLE TO c;
GRANT CREATE SEQUENCE TO c;

GRANT EXECUTE ON DBMS_LOGMNR TO c;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c;

GRANT SELECT ON V_$LOG TO c;
GRANT SELECT ON V_$LOG_HISTORY TO c;
GRANT SELECT ON V_$LOGMNR_LOGS TO c;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c;
GRANT SELECT ON V_$LOGFILE TO c;
GRANT SELECT ON V_$ARCHIVED_LOG TO c;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c;

Configure Kafka Connect

Note: Kafka Connect is configured in distributed mode.

cd /opt/kafka_2.13-2.8.0

Configuration on node 10.0.2.18

cat config/connect-distributed.properties
bootstrap.servers=10.0.2.18:9092,10.0.2.19:9092,10.0.2.20:9092
group.id=connect-cluster
#group.id=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000
rest.advertised.host.name=10.0.2.18
#rest.advertised.port=8083

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/debezium-connector-oracle/

Configuration on node 10.0.2.19

cat config/connect-distributed.properties
bootstrap.servers=10.0.2.18:9092,10.0.2.19:9092,10.0.2.20:9092
group.id=connect-cluster
#group.id=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000
rest.advertised.host.name=10.0.2.19
#rest.advertised.port=8083

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/debezium-connector-oracle/

Configuration on node 10.0.2.20

cat config/connect-distributed.properties
bootstrap.servers=10.0.2.18:9092,10.0.2.19:9092,10.0.2.20:9092
group.id=connect-cluster
#group.id=1
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false


offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3

config.storage.topic=connect-configs
config.storage.replication.factor=3

status.storage.topic=connect-status
status.storage.replication.factor=3

offset.flush.interval.ms=10000
rest.advertised.host.name=10.0.2.20
#rest.advertised.port=8083

offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/opt/debezium-connector-oracle/

Create the topics required at startup

bin/kafka-topics.sh --create --zookeeper 10.0.2.18:2181 --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper 10.0.2.18:2181 --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
bin/kafka-topics.sh --create --zookeeper 10.0.2.18:2181 --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
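
Verify they were created:

bin/kafka-topics.sh --list --zookeeper 10.0.2.18:2181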

Start Kafka Connect

Run on every node:

cd /opt/kafka_2.13-2.8.0
bin/connect-distributed.sh config/connect-distributed.properties
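
connect-distributed.sh runs in the foreground; it also accepts -daemon as its first argument to run in the background. Once a worker is up, confirm the REST API responds and the Oracle plugin was loaded (a sketch):

curl -s localhost:8083/connector-plugins | jq
# the output should include io.debezium.connector.oracle.OracleConnector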

Create the connector

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "server1",
    "database.hostname": "10.0.2.15",
    "database.port": "1521",
    "database.user": "c",
    "database.password": "dbz",
    "database.dbname": "ORCL",
    "database.history.kafka.bootstrap.servers": "10.0.2.20:9092,10.0.2.18:9092,10.0.2.19:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}'

List the connectors

[root@localhost kafka_2.13-2.8.0]# curl -s localhost:8083/connectors|jq
[
  "inventory-connector"
]

View connector details

[root@localhost kafka_2.13-2.8.0]# curl -s localhost:8083/connectors/inventory-connector|jq
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "database.user": "c",
    "database.dbname": "ORCL",
    "tasks.max": "1",
    "database.hostname": "10.0.2.15",
    "database.password": "dbz",
    "database.history.kafka.bootstrap.servers": "10.0.2.20:9092,10.0.2.18:9092,10.0.2.19:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "name": "inventory-connector",
    "database.server.name": "server1",
    "database.port": "1521"
  },
  "tasks": [
    {
      "connector": "inventory-connector",
      "task": 0
    }
  ],
  "type": "source"
}

Check the connector status

[root@localhost kafka_2.13-2.8.0]# curl -s localhost:8083/connectors/inventory-connector/status|jq
{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}

Verify the topics were created
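
Debezium topic names are prefixed with the database.server.name configured above, so the check is:

bin/kafka-topics.sh --list --bootstrap-server 10.0.2.18:9092 | grep server1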

Verify data synchronization

Check the data in the Oracle table:

SQL> conn test/test;
Connected.
SQL> select * from student;

no rows selected

Check the data in the corresponding Kafka topic:

bin/kafka-console-consumer.sh --bootstrap-server 10.0.2.20:9092 --topic server1.TEST.STUDENT --from-beginning


Verify that out-of-order inserts are synchronized

Insert rows into the Oracle table:

SQL> insert into student(sno,sname,ssex,sbirthday,sclass) values(108,'曾华','男',to_date('1977-09-01','yyyy-mm-dd'),95033);

1 row created.

SQL> commit;

Commit complete.
SQL> insert into student(sno,sname,ssex,sbirthday,sclass) values(105,'匡明','男',to_date('1975-10-02','yyyy-mm-dd'),95031);

1 row created.

SQL> commit;

Commit complete.

SQL> insert into student(sno,sname,ssex,sbirthday,sclass) values(107,'王丽','女',to_date('1976-01-23','yyyy-mm-dd'),95033);

1 row created.

SQL> commit;

Commit complete.

SQL> insert into student(sno,sname,ssex,sbirthday,sclass) values(109,'王芳','女',to_date('1975-02-10','yyyy-mm-dd'),95031);

1 row created.

SQL> commit;

Commit complete.

SQL> select * from student;

       SNO SNAME                      SSEX      SBIRTHDAY       SCLASS
---------- -------------------------- --------- --------------- ---------
       108 曾华                       男        01-SEP-77       95033
       105 匡明                       男        02-OCT-75       95031
       107 王丽                       女        23-JAN-76       95033
       109 王芳                       女        10-FEB-75       95031

Verify the rows were synchronized by consuming the topic as above.
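
Because the converters were configured with schemas.enable=false, each message is the bare Debezium change-event envelope, so the operation type and row image can be extracted directly; a sketch:

bin/kafka-console-consumer.sh --bootstrap-server 10.0.2.20:9092 --topic server1.TEST.STUDENT --from-beginning | jq -c '{op, after}'
# op is "c" for inserts ("r" for snapshot reads), "u" for updates, "d" for deletes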

Verify that UPDATEs are synchronized

SQL> UPDATE student SET SNAME='UPDATE' WHERE SNO='108';

1 row updated.

SQL> commit;

Commit complete.

SQL> select * from student;

       SNO SNAME                      SSEX      SBIRTHDAY       SCLASS
---------- -------------------------- --------- --------------- ---------
       108 UPDATE                     男        01-SEP-77       95033
       105 匡明                       男        02-OCT-75       95031
       107 王丽                       女        23-JAN-76       95033
       109 王芳                       女        10-FEB-75       95031

Consume the topic again to confirm the update event arrives.

Verify that DELETEs are synchronized

SQL> DELETE FROM student WHERE SNO='105';

1 row deleted.

SQL> commit;

Commit complete.

SQL> select * from student;

       SNO SNAME                      SSEX      SBIRTHDAY       SCLASS
---------- -------------------------- --------- --------------- ---------
       108 UPDATE                     男        01-SEP-77       95033
       107 王丽                       女        23-JAN-76       95033
       109 王芳                       女        10-FEB-75       95031

Consume the topic again to confirm the delete event arrives.

Verify that an ALTER adding a column is synchronized

SQL> ALTER TABLE student ADD (age integer default 22 not null);

Table altered.

SQL> commit;

Commit complete.
SQL> select * from student;

       SNO SNAME                      SSEX      SBIRTHDAY       SCLASS        AGE
---------- -------------------------- --------- --------------- --------- ---------
       108 UPDATE                     男        01-SEP-77       95033            22
       107 王丽                       女        23-JAN-76       95033            22
       109 王芳                       女        10-FEB-75       95031            22

At this point Kafka Connect reports an error (see Error handling below). After applying the fix, consume the topic again to confirm the schema change and the new column values are synchronized.

Error handling

Connector error

The connector task fails with an error saying that all-column supplemental logging is not enabled for the captured table.

Fix

1. As the error message suggests, enable all-column supplemental logging for the table it complains about:

SQL>  ALTER TABLE TEST_OGG.TEST_OGG ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

Table altered.

2. Or enable all-column supplemental logging for the whole database:

SQL> alter database add supplemental log data (all) columns; 

Database altered.

Confirm it is enabled:

select SUPPLEMENTAL_LOG_DATA_MIN min,
       SUPPLEMENTAL_LOG_DATA_PK  pk,
       SUPPLEMENTAL_LOG_DATA_UI  ui,
       SUPPLEMENTAL_LOG_DATA_FK  fk,
       SUPPLEMENTAL_LOG_DATA_ALL "all"
  from v$database;

MIN       PK        UI        FK        all
--------- --------- --------- --------- ---------
YES       NO        NO        NO        YES

Plugin load error

Kafka Connect fails to load the Debezium Oracle connector plugin.

Fix

Copy the jars under debezium-connector-oracle into Kafka's libs directory:

cp /opt/debezium-connector-oracle/*.jar /opt/kafka_2.13-2.8.0/libs/
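
After copying, restart the Connect worker on each node so the new classes are picked up; a sketch:

ps -ef | grep ConnectDistributed | grep -v grep | awk '{print $2}' | xargs -r kill
bin/connect-distributed.sh -daemon config/connect-distributed.properties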
