Notes focused on using Debezium Connect

Tool selection

  1. MongoDB 4.4.3
  2. Capture tool: Debezium Connect 1.8.0
  3. Kafka 2.7.0 (Scala 2.13 build, image wurstmeister/kafka:2.13-2.7.0)

Reference setup

MongoDB | Compose file

version: '3.0'

services:
  mongo:
    image: mongo:4.4.3
    container_name: mongo
    command: mongod --replSet cdcReplset --bind_ip_all
    environment:
      - MONGO_INITDB_ROOT_USERNAME=root
      - MONGO_INITDB_ROOT_PASSWORD=123456
    ports:
      - 27017:27017
    networks:
      product:
        aliases:
          - docker.mongo

networks:
  product:
    external: true

Kafka | Compose file (external ZooKeeper)

version: '3.0'

services:
  kafka:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka
    ports:
      - 9092:9092
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=docker.zookeeper.node1:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.31.233:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    networks:
      product:
        aliases:
          - docker.kafka

networks:
  product:
    external: true

Debezium Connect | Compose file

version: '3.0'

services:
  connect:
    image: debezium/connect:1.8.0.Final
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - BOOTSTRAP_SERVERS=docker.kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    networks:
      product:
        aliases:
          - docker.debezium

networks:
  product:
    external: true

Configuration steps

Initialize the MongoDB replica set from the mongo shell (this setup has only one member):

rs.initiate({_id: "cdcReplset", members: [{_id: 0, host: "docker.mongo:27017"}]})
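The replica set name shows up three times in this setup: the --replSet flag in the compose file, the _id passed to rs.initiate(), and the prefix of the Debezium mongodb.hosts value used later (cdcReplset/docker.mongo:27017). The Python sketch below (standard library only; all helper names are hypothetical) builds the rs.initiate() document, checks it against the mongod command, and derives the mongodb.hosts string from it. Joining several members with commas is an assumption based on mongodb.hosts being a host list.

```python
# Hypothetical helpers; the values mirror the compose file and rs.initiate() call above.


def replica_set_config(name, hosts):
    """Build the document passed to rs.initiate(); member _ids start at 0."""
    return {
        "_id": name,
        "members": [{"_id": i, "host": h} for i, h in enumerate(hosts)],
    }


def debezium_hosts(config):
    """Format the config as a mongodb.hosts value: <replicaSetName>/<host>[,<host>...]."""
    hosts = ",".join(m["host"] for m in config["members"])
    return f"{config['_id']}/{hosts}"


def check_repl_set_name(mongod_command, config):
    """Return True if the --replSet argument in the mongod command equals config _id."""
    args = mongod_command.split()
    if "--replSet" not in args:
        return False
    idx = args.index("--replSet")
    return idx + 1 < len(args) and args[idx + 1] == config["_id"]


if __name__ == "__main__":
    cfg = replica_set_config("cdcReplset", ["docker.mongo:27017"])
    print(cfg)
    print(debezium_hosts(cfg))
    print(check_repl_set_name("mongod --replSet cdcReplset --bind_ip_all", cfg))
```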

Add a Debezium Connect connector
Example from the official documentation:

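{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "mongodb.hosts": "rs0/192.168.99.100:27017",
    "mongodb.name": "fullfillment",
    "collection.include.list": "inventory[.]*"
  }
}
  1. name: the name of the connector when we register it with the Kafka Connect service.
  2. connector.class: the name of the MongoDB connector class.
  3. mongodb.hosts: the host address used to connect to the MongoDB replica set.
  4. mongodb.name: the logical name of the MongoDB replica set. It forms a namespace for the generated events and is used in the names of all Kafka topics the connector writes to, in the Kafka Connect schema names, and in the namespaces of the corresponding Avro schemas when the Avro converter is used.
  5. collection.include.list: a list of regular expressions matching the namespaces (for example, <dbName>.<collectionName>) of all collections to be monitored. This is optional.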
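To see how mongodb.name and collection.include.list interact, here is a small Python sketch (hypothetical helper names). It assumes each comma-separated pattern must match the whole <dbName>.<collectionName> string (anchored matching, as the Debezium docs describe for include lists), and that each captured collection is written to the topic <mongodb.name>.<dbName>.<collectionName>.

```python
import re


def is_captured(include_list, namespace):
    """Return True if any include-list regex matches the whole db.collection name.

    Assumption: anchored (full-string) matching, so a substring match is not enough.
    Naive split: a pattern that itself contains a comma (e.g. {1,3}) would break.
    """
    patterns = [p.strip() for p in include_list.split(",") if p.strip()]
    return any(re.fullmatch(p, namespace) for p in patterns)


def topic_name(logical_name, namespace):
    """Topic for one collection: <mongodb.name>.<dbName>.<collectionName>."""
    return f"{logical_name}.{namespace}"


if __name__ == "__main__":
    ns = "cdc_database.cdc_collection"
    if is_captured("cdc_database.cdc_collection", ns):
        print(topic_name("cdc_topic_prefix", ns))
```

Note that an unescaped "." in a pattern matches any character; writing cdc_database\.cdc_collection or [.] (as in the official example) matches only a literal dot.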

Reference command (the trailing backslash keeps the -d payload on the same shell command)

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ \
-d '{
    "name": "cdc_database-connector",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.hosts": "cdcReplset/docker.mongo:27017",
        "mongodb.user": "root",
        "mongodb.password": "123456",
        "mongodb.name": "cdc_topic_prefix",
        "collection.include.list": "cdc_database.cdc_collection",
        "decimal.handling.mode": "string",
        "inconsistent.schema.handling.mode": "warn"
    }
}'
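The same registration can be scripted. Below is a minimal sketch using only Python's standard library against the Kafka Connect REST API: POST /connectors creates the connector and GET /connectors/<name>/status reports its state. The function names and the http://localhost:8083 base URL (taken from the compose port mapping) are assumptions of this sketch.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumed: Kafka Connect REST port published by the compose file


def mongo_connector_payload(name, hosts, user, password, logical_name, include_list):
    """Build the JSON body for POST /connectors with the same fields as the curl command."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
            "mongodb.hosts": hosts,
            "mongodb.user": user,
            "mongodb.password": password,
            "mongodb.name": logical_name,
            "collection.include.list": include_list,
            "decimal.handling.mode": "string",
            "inconsistent.schema.handling.mode": "warn",
        },
    }


def register_connector(payload, base_url=CONNECT_URL):
    """POST the connector config; return the HTTP status and the decoded JSON reply."""
    req = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req, timeout=10) as resp:
        return resp.status, json.loads(resp.read().decode("utf-8"))


def connector_state(name, base_url=CONNECT_URL):
    """Read GET /connectors/<name>/status and return the connector's state string."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))["connector"]["state"]
```

For example, call register_connector(mongo_connector_payload("cdc_database-connector", "cdcReplset/docker.mongo:27017", "root", "123456", "cdc_topic_prefix", "cdc_database.cdc_collection")), then poll connector_state("cdc_database-connector") until it returns RUNNING. Registering a name that already exists makes Kafka Connect answer 409 Conflict, which urlopen raises as urllib.error.HTTPError.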

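After running it, wait a moment and then check the Kafka topics. With the configuration above, changes to cdc_database.cdc_collection go to the topic cdc_topic_prefix.cdc_database.cdc_collection (<mongodb.name>.<dbName>.<collectionName>).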
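When reading that topic, note that the MongoDB connector puts the document inside the event as a JSON string. The sketch below assumes JsonConverter output (the usual default for the debezium/connect image), where with schemas enabled the event sits under "payload", and unpacks "after" with a second json.loads. The op field is c, u, d, or r (snapshot read). The sketch only reads "after", so deletes (and update events, depending on the capture mode) come back without a document. SAMPLE_EVENT is hand-written and trimmed, not captured from a real run.

```python
import json

# Hypothetical, hand-trimmed event in the shape described above (not real connector output).
SAMPLE_EVENT = json.dumps({
    "schema": {},
    "payload": {
        "after": json.dumps({"_id": {"$oid": "61e0c3a2f1d2a3b4c5d6e7f8"}, "name": "demo"}),
        "patch": None,
        "op": "c",
        "ts_ms": 1642000000000,
    },
})


def parse_mongo_change_event(raw_value):
    """Return (op, document) for one Kafka message value produced by the connector."""
    if raw_value is None:
        # Tombstone (null value) that follows a delete event
        return None, None
    envelope = json.loads(raw_value)
    # With schemas enabled the event is wrapped in "payload"; without, it is top-level
    payload = envelope["payload"] if "payload" in envelope else envelope
    after = payload.get("after")
    # "after" carries the MongoDB document as an extended-JSON string
    document = json.loads(after) if isinstance(after, str) else after
    return payload.get("op"), document


if __name__ == "__main__":
    op, doc = parse_mongo_change_event(SAMPLE_EVENT)
    print(op, doc["name"])
```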

Appendix: reference request body for capturing MySQL with Debezium Connect
POST /connectors

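{
    "name": "dev-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "192.168.31.78",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "123456",
        "database.server.id": "184054",
        "database.server.name": "dev",
        "database.include.list": "ruoyi_db",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.dev",
        "include.schema.changes": "true",
        "decimal.handling.mode": "string",
        "binary.handling.mode": "base64",
        "snapshot.mode": "initial",
        "snapshot.locking.mode": "none"
    }
}

The last two settings, explained (JSON allows no comments, so the notes live here):
  1. snapshot.mode: the default, initial, takes a snapshot before streaming; schema_only skips the data snapshot (only the table structures are captured) and then captures only binlog changes made after the connector starts.
  2. snapshot.locking.mode: controls locking while the snapshot reads the database structure. none takes no locks, so no schema changes may happen during the snapshot phase. Table data needs no locks either, because MySQL's snapshot read runs under REPEATABLE READ.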
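For the MySQL connector the topic layout follows the same pattern with database.server.name as the prefix: row changes go to <database.server.name>.<database>.<table>, and with include.schema.changes set to true the DDL events go to a topic named after database.server.name itself. The database.history.kafka.topic (dbhistory.dev) is the connector's internal schema history and is not meant for consumers. The sketch below lists the expected topics; the function name and the table names are hypothetical.

```python
def mysql_topics(server_name, database, tables, include_schema_changes=True):
    """List the Kafka topics the MySQL connector config above is expected to write.

    Data topics follow <database.server.name>.<database>.<table>; when
    include.schema.changes is true, DDL events go to <database.server.name>.
    The internal history topic (database.history.kafka.topic) is not included.
    """
    topics = [f"{server_name}.{database}.{table}" for table in tables]
    if include_schema_changes:
        topics.append(server_name)
    return topics


if __name__ == "__main__":
    # sys_user / sys_dept are hypothetical table names inside ruoyi_db
    for topic in mysql_topics("dev", "ruoyi_db", ["sys_user", "sys_dept"]):
        print(topic)
```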