基于阿里云的DTS封装

最近公司需要应用订阅阿里云RDS相关的binlog,基于阿里云提供的案例subscribe_exampleale演化而来,重构成了SpringBoot、并且升级了相应的jar包,避免了很多版本上面带来的坑,还新增集成了新的客户端kafka、后续会考虑redis等等。

另外还将binlog进行统一格式化封装。

后续还会加强的功能点:

  1. 消费位点的更新(目前位点是基于kafka同步,可以做到容灾切换)
  2. 消息流转的监控
    1. 消费情况
      • 延迟
      • 丢失
      • 重复消费
    2. 业务数据处理情况
      1. 操作类型

由于DTS集成的kafka是基于单分区的,所以同一时刻只能有一个消费者消费(也就是自身自带的消费者),在某些情况可能会产生消息堆积,导致消费延迟(如果业务直接基于dts单个消费者开发的话),这里只是将消费过来的消息进行格式化转换之后,直接放入一个全新的kafka,交给业务方自己去消费。避免binlog消费能力下降。

项目结构

  • config : 配置类
  • listener: 所有binlog消费到的消息都会被监听处理并且消费
  • sender: 消息发送
  • DtsServerApplication: 启动类

其他的基本和案例保持一致

代码流程介绍

DtsServerApplication : 启动整个应用

StartCallback: Spring的容器构建完成之后回调该方法,启动dts的运行环境。(dts-kafka的构建,以及一些位点线程、找对应消费者的流程初始化

以上环境基本初始化完成。

RecordConsumerListener : binlog回调类。

AbstractEventProcess: 事件处理类,包括关注事件、消息格式化、业务处理、消息发送等等

  • DDLEventProcess : 针对表的DDL操作进行回调处理
  • DataEventProcess: 针对表数据的增删改查回调处理

binlog格式化模版

{
"changeFieldList":[                                // 修改的字段名
        "updated"
    ],
    "databaseName":"databseName",                  // 数据库名称
    "fieldDataMap":{
        "id":{
            "dataType":"java.lang.Integer",        // 数据类型
            "field":"id",                          // 字段名称
            "oldValue":"502941",                   // 老的字段
            "value":"502941"                       // 当前字段
        },
        "updated":{
            "dataType":"java.util.Date",
            "field":"updated",
            "oldValue":"2020-10-16 11:30:06",
            "value":"2020-10-16 11:35:06"
        }
    },
    "operation":"UPDATE",                         // 操作类型 UPDATE、INSERT、DELETE等等
    "sourceTimestamp":1602819306,                 // 触发时间戳
    "tableName":"table_info"                      // 表名
}

如果有需要可以在这个基础上进行二次开发,节省更多时间。

项目地址

后续还会将一些dts使用心得分享处理,会持续更新.

欢迎大家一起交流

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐