kafka-connect-distribute 模式,使用 debezium source 同步 MongoDB 集群到 Kafka
从flink社区看到 MongoDB ->Debezium-> Kafka 作为 Flinkdatasource 的方案,试着搭了一套。MongoDB 使用副本+分片Debezium 作为 Kafka Connect 的插件,首先配置 Kafka Connect,选择的是 connect-distribute 模式,这里踩到了一个坑, 因为我的集群 java 版本还在 7...
从flink社区看到 MongoDB -> Debezium -> Kafka 作为 Flink datasource 的方案,试着搭了一套。
MongoDB 使用副本+分片
Debezium 作为 Kafka Connect 的插件,首先配置 Kafka Connect,选择的是 connect-distribute 模式,这里踩到了一个坑, 因为我的集群 java 版本还在 7,所以 Kafka Connect load 不到插件, 要升级到 java8 。
如果要用于生产, Kafka Connect 需要的 3 个 topic( offsets, configs, status ) 需要认真配置 partition 和 replication。Kafka 集群如果有认证, 需要在 connect-distribute.properties 中也配置认证三兄弟( sasl.jaas.config, sasl.mechanism, security.protocol )
Kafka Connect 在分布式模式下,使用REST API交互,所以需要为 Debezium 插件准备一个 json 格式的配置文件
vi debe-connect.json
{
"name": "test-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"mongodb.hosts": "configs/mongos1:mongos-port,configs/mongos2:mongos-port,configs/mongos3:mongos-port",
"tasks.max" : "1",
"mongodb.name": "mongo-test",
"mongodb.user": "root",
"mongodb.password": "123456",
"collection.whitelist": "databaseName.collectionName",
"connect.max.attempts": "12"
}
}
name: connector 任务的名字,需要唯一
mongodb.hosts: configs 位置是副本集配置的名字
collection.whitelist: 指定监控的 collection 可以为单个,也可以使用正则匹配多个。对应的还有 database.whitelist/blacklist 具体阅读官网吧
mongodb.name: 字面意思,按照我的配置最终 Kafka 内 topic 名字会是 mongo-test.databaseName.collectionName
有认证的 MongoDB 需要配置用户名密码
提交 connector 命令
curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d @路径/debe-connector.json http://localhost:8083/connectors
查看已提交 connector 命令
curl http://localhost:8083/connectors
删除已提交 connector 命令
curl -i -X DELETE http://localhost:8083/connectors/connectorName
更多推荐
所有评论(0)