kafka-connect-oracle集群模式
目的实现oracle的数据增量同步实现方式使用开源组件,读取oracle的redolog,然后把增量数据推送到kafka。组件地址:https://github.com/erdemcer/kafka-connect-oracle源码介绍了单机版的使用方式,本文介绍集群版的使用方式。前提启用oracle的redolog,详细步骤可参考:https://blog.csdn.net/sunny05296
目的
实现oracle的数据增量同步
实现方式
使用开源组件,读取oracle的redolog,然后把增量数据推送到kafka。
组件地址:https://github.com/erdemcer/kafka-connect-oracle
源码介绍了单机版的使用方式,本文介绍集群版的使用方式。
前提
- 启用oracle的redolog,详细步骤可参考:https://blog.csdn.net/sunny05296/article/details/81130042
- 搭建kafka集群
详细步骤
- 下载 https://github.com/erdemcer/kafka-connect-oracle 的源码,使用maven打包:
kafka-connect-oracle-1.0.jar - 将上一步打的包放到kafka集群所有机器的安装目录的libs目录下,例如我的kafka安装目录是 /data/kafka,我就把jar包放到 /data/kafka/libs 目录。
- 启动kafka集群
- 所有机器开启集群connector:
进入kafka的bin目录,执行:
nohup ./connect-distributed.sh …/config/connect-distributed.properties >> kafka-conn-oracle.log & - 在其中一台机器创建并执行以下脚本:
#!/bin/bash
curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
"name": "oracle-logminer-connector",
"config": {
"connector.class": "com.ecer.kafka.connect.oracle.OracleSourceConnector",
"db.name.alias": "test",
"tasks.max": "1",
"topic": "your-topic",
"db.name": "orcl",
"db.hostname": "127.0.0.1",
"db.port": "1591",
"db.user": "userName",
"db.user.password": "password",
"db.fetch.size": "1",
"table.whitelist": "需要增量的表名,例如:tb.demo",
"parse.dml.data": "true",
"reset.offset": "true"
}
}'
-
查看运行状态:
ps:kafka-connector的默认端口是8083http://localhost:8083/connectors/oracle-logminer-connector/status
下图表示正常运行中
更多kafka-connector的api
GET /connectors – 返回所有正在运行的connector名
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息
GET /connectors/{name}/config – 获取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
更多推荐
所有评论(0)