kafka-connect-oracle 连接器安装部署
由于业务需求,需要从Oracle数据库中获取某表的修改日志信息。这里讲解一下在GitHub中kafka-connect-oracle 连接器的部署。1、GitHub中下载项目kafka-connect-oracle2、添加配置a.数据库必须处于archivelog模式:sqlplus / as sysdba//sqlplus sys/sys as sysdba;SQL>shutdown im
由于业务需求,需要从Oracle数据库中获取某表的修改日志信息。这里讲解一下在GitHub中kafka-connect-oracle 连接器的部署。
1、GitHub中下载项目
2、添加配置
a.数据库必须处于archivelog模式:
sqlplus / as sysdba //sqlplus sys/sys as sysdba;
SQL>shutdown immediate
SQL>startup mount
SQL>alter database archivelog;
SQL>alter database open;
b.启用补充日志记录:
sqlplus / as sysdba
SQL>alter database add supplemental log data (all) columns;
c.为了成功执行连接器,必须使用特权Oracle用户启动连接器。如果给定的用户具有DBA角色,则可以跳过此步骤。否则,需要执行以下脚本来创建特权用户:
create role logmnr_role;
grant create session to logmnr_role;
grant execute_catalog_role,select any transaction ,select any dictionary to logmnr_role;
create user kafka identified by kafkapass;
grant logmnr_role to kafka;
alter user kafka quota unlimited on users;
3、修改配置文件
配置文件在此路径下:kafka-connect-oracle-master.config.OracleSourceConnector.properties
可以看到其中的默认配置信息:
其中参数介绍:
名称 |
类型 | 描述 |
name | String | 连接器名称 |
connector.class | String | 此连接器的Java类的名称。 |
db.name.alias | String | 数据库的标识符名称(例如Test,Dev,Prod)或用于标识数据库的特定名称。此名称将用作主题和架构名称的标头。 |
task.max | Integer | 创建的最大任务数。此连接器使用单个任务。 |
topic | String | 消息将被写入的主题的名称。如果设置了一个值,则所有消息都将被写入此声明的主题(如果未设置),则将为每个数据库表动态创建一个主题。 |
db.name | String | 要连接的数据库的服务名称或sid。通常使用数据库服务名称。 |
db.hostname | String | Oracle数据库服务器的IP地址或主机名。 |
db.port | Integer | Oracle数据库服务器的端口号。 |
db.user | String | 用于连接到数据库以启动和执行logminer的数据库用户的名称。该用户必须提供上述必要的特权。 |
db.user.password | String | 数据库用户密码。 |
db.fetch.size | Integer | 此配置属性设置Oracle行提取大小值。 |
table.whitelist | String | 用逗号分隔的将要捕获的数据库模式或表名称的列表。 |
parse.dml.data | Boolean | 如果为true,则捕获的sql DML语句将解析为字段和值;如果为false,则仅发布sql DML语句。 |
reset.offset | Boolean | 如果为true,则在连接器启动时将偏移值设置为数据库的当前SCN。如果为false,则连接器将从上一个偏移值开始。 |
start.scn | Long | 如果设置为,则将偏移值设置为该指定值,并且logminer将从此SCN启动。如果连接器希望从所需的SCN启动,则可以使用此属性。 |
multitenant | Boolean | 如果为true,则启用多租户支持。如果为false,将使用单实例配置。 |
table.blacklist | String | 用逗号分隔的数据库模式或表名列表,将不会捕获。 |
修改前(默认):
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=test
tasks.max=1
topic=cdctest
db.name=testdb
db.hostname=10.1.X.X
db.port=1521
db.user=kminer
db.user.password=kminerpass
db.fetch.size=1
table.whitelist=TEST.*,TEST2.TABLE2
parse.dml.data=true
reset.offset=true
start.scn=
multitenant=false
修改后:
name=oracle-logminer-connector
connector.class=com.ecer.kafka.connect.oracle.OracleSourceConnector
db.name.alias=orcl
tasks.max=1
topic=student
db.name=orcl
db.hostname=192.168.129.156
db.port=1521
db.user=kafka
db.user.password=kafkapass
db.fetch.size=1
table.whitelist=orcl.student
table.blacklist=
parse.dml.data=true
reset.offset=false
multitenant=false
4、保存后建立与运行
A.右键项目名后 Run as->Maven clean,然后再Run as->Maven install。
B.产生jar包:kafka-connect-oracle-1.0.jar 和lib中的ojdbc7.jar 复制到KAFKA_HOME / lib文件夹中。
a.先拷贝到一个指定的路径下:/root/kafka---oracle
b.在kafka的conf.dist路径下找到connect-distributed.properties文件并修改其中的plugin.path:
plugin.path=/root/kafka---oracle
也建议一些连接器,转换器的所需的jar包可以放在这里指定路径
eg:如果CONFLUENT平台用于kafka集群,请将ojdbc7.jar和kafka-connect-oracle-1.0.jar复制到$ CONFLUENT_HOME / share / java / kafka-connect-jdbc文件夹。
C.将config / OracleSourceConnector.properties文件复制到$ KAFKA_HOME / config中。我这里的kafka是CDH中安装的,所以路径是:/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/etc/kafka/conf.dist
D.尽量不改动其中的文件,复制connect-distributed.properties为connect-distributed-Oracle.properties 并对其修改
其中配置信息:
bootstrap.servers=localhost:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets
offset.storage.replication.factor=1
config.storage.topic=connect-configs
config.storage.replication.factor=1
status.storage.topic=connect-status
status.storage.replication.factor=1
offset.flush.interval.ms=10000
plugin.path=/root/kafka---oracle
E.启动连接器
CDH中kafka的connect-distributed.sh存在目录:/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/lib/kafka/bin
CDH中kafka的connect-distributed.properties存在目录:/opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4/etc/kafka/conf.dist
> cd /opt/cloudera/parcels/KAFKA-4.1.0-1.4.1.0.p0.4
> ./lib/kafka/bin/connect-distributed.sh ./etc/kafka/conf.dist/connect-distributed-Oracle.properties ./etc/kafka/conf.dist/OracleSourceConnector.properties
可能会报错:
…………
log4j:ERROR Could not read configuration file from URL [file:./lib/kafka/bin/../config/connect-log4j.properties].
java.io.FileNotFoundException: ./lib/kafka/bin/../config/connect-log4j.properties (No such file or directory)
…………
解决办法在此博客:使用kafka报错log4j:ERROR Could not read configuration file from URL
注意:这里的kafka是CDH创建的,所以后面设计到的sh或者properties不选择connect-standalone,而是connect-distributed!!!主要的区别就是 connect-standalone 是对单节点的kafka进行操作,而connect-distributed是对集群中的kafka进行操作。
F.执行成功
更多推荐
所有评论(0)