从flink社区看到 MongoDB -> Debezium -> Kafka 作为 Flink datasource 的方案,试着搭了一套。

MongoDB 使用副本+分片

Debezium 作为 Kafka Connect 的插件,首先配置 Kafka Connect,选择的是 connect-distribute 模式,这里踩到了一个坑, 因为我的集群 java 版本还在 7,所以 Kafka Connect load 不到插件, 要升级到 java8 。

如果要用于生产, Kafka Connect 需要的 3 个 topic( offsets, configs, status ) 需要认真配置 partition 和 replication。Kafka 集群如果有认证, 需要在 connect-distribute.properties 中也配置认证三兄弟( sasl.jaas.config, sasl.mechanism, security.protocol )

Kafka Connect 在分布式模式下,使用REST API交互,所以需要为 Debezium 插件准备一个 json 格式的配置文件

vi debe-connect.json

{
  "name": "test-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "configs/mongos1:mongos-port,configs/mongos2:mongos-port,configs/mongos3:mongos-port",
    "tasks.max" : "1",
    "mongodb.name": "mongo-test",
    "mongodb.user": "root",
    "mongodb.password": "123456",
    "collection.whitelist": "databaseName.collectionName",
    "connect.max.attempts": "12"
  }
}

name: connector 任务的名字,需要唯一

mongodb.hosts: configs 位置是副本集配置的名字

collection.whitelist: 指定监控的 collection 可以为单个,也可以使用正则匹配多个。对应的还有 database.whitelist/blacklist 具体阅读官网吧

mongodb.name: 字面意思,按照我的配置最终 Kafka 内 topic 名字会是 mongo-test.databaseName.collectionName

有认证的 MongoDB 需要配置用户名密码

提交 connector 命令

curl -i -X POST -H "Content-type:application/json" -H "Accept:application/json" -d @路径/debe-connector.json http://localhost:8083/connectors

查看已提交 connector 命令

curl http://localhost:8083/connectors

删除已提交 connector 命令

curl -i -X DELETE http://localhost:8083/connectors/connectorName

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐