目的

实现oracle的数据增量同步

实现方式

使用开源组件,读取oracle的redolog,然后把增量数据推送到kafka。
组件地址:https://github.com/erdemcer/kafka-connect-oracle

源码介绍了单机版的使用方式,本文介绍集群版的使用方式。


前提

  1. 启用oracle的redolog,详细步骤可参考:https://blog.csdn.net/sunny05296/article/details/81130042
  2. 搭建kafka集群

详细步骤

  1. 下载 https://github.com/erdemcer/kafka-connect-oracle 的源码,使用maven打包:
    kafka-connect-oracle-1.0.jar
  2. 将上一步打的包放到kafka集群所有机器的安装目录的libs目录下,例如我的kafka安装目录是 /data/kafka,我就把jar包放到 /data/kafka/libs 目录。
  3. 启动kafka集群
  4. 所有机器开启集群connector:
    进入kafka的bin目录,执行:
    nohup ./connect-distributed.sh …/config/connect-distributed.properties >> kafka-conn-oracle.log &
  5. 在其中一台机器创建并执行以下脚本:
#!/bin/bash

curl -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
	"name": "oracle-logminer-connector",
	"config": {
		"connector.class": "com.ecer.kafka.connect.oracle.OracleSourceConnector",
		"db.name.alias": "test",
		"tasks.max": "1",
		"topic": "your-topic",
		"db.name": "orcl",
		"db.hostname": "127.0.0.1",
		"db.port": "1591",
		"db.user": "userName",
		"db.user.password": "password",
		"db.fetch.size": "1",
		"table.whitelist": "需要增量的表名,例如:tb.demo",
		"parse.dml.data": "true",
		"reset.offset": "true"
	}
}'



  1. 查看运行状态:
    ps:kafka-connector的默认端口是8083

    http://localhost:8083/connectors/oracle-logminer-connector/status
    下图表示正常运行中 在这里插入图片描述

更多kafka-connector的api

GET /connectors – 返回所有正在运行的connector名
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息
GET /connectors/{name}/config – 获取指定connector的配置信息
PUT /connectors/{name}/config – 更新指定connector的配置信息
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐