基于阿里云DTS数据订阅binlog应用实践功能开发
基于阿里云的DTS封装最近公司需要应用订阅阿里云RDS相关的binlog,基于阿里云提供的案例subscribe_exampleale演化而来,重构成了SpringBoot、并且升级了相应的jar包,避免了很多版本上面带来的坑,还新增集成了新的客户端kafka、后续会考虑redis等等。另外还将binlog进行统一格式化封装。后续还会加强的功能点:消费位点的更新(目前位点是基于kafka同步,可以
基于阿里云的DTS封装
最近公司需要应用订阅阿里云RDS相关的binlog,基于阿里云提供的案例subscribe_exampleale演化而来,重构成了SpringBoot、并且升级了相应的jar包,避免了很多版本上面带来的坑,还新增集成了新的客户端kafka、后续会考虑redis等等。
另外还将binlog进行统一格式化封装。
后续还会加强的功能点:
- 消费位点的更新(目前位点是基于kafka同步,可以做到容灾切换)
- 消息流转的监控
- 消费情况
- 延迟
- 丢失
- 重复消费
- 业务数据处理情况
- 表
- 操作类型
- 消费情况
由于DTS集成的kafka是基于单分区的,所以同一时刻只能有一个消费者消费(也就是自身自带的消费者),在某些情况可能会产生消息堆积,导致消费延迟(如果业务直接基于dts单个消费者开发的话),这里只是将消费过来的消息进行格式化转换之后,直接放入一个全新的kafka,交给业务方自己去消费。避免binlog消费能力下降。
项目结构
- config : 配置类
- listener: 所有binlog消费到的消息都会被监听处理并且消费
- sender: 消息发送
- DtsServerApplication: 启动类
其他的基本和案例保持一致
代码流程介绍
DtsServerApplication
: 启动整个应用
StartCallback
: Spring的容器构建完成之后回调该方法,启动dts的运行环境。(dts-kafka的构建,以及一些位点线程、找对应消费者的流程初始化
以上环境基本初始化完成。
RecordConsumerListener
: binlog回调类。
AbstractEventProcess
: 事件处理类,包括关注事件、消息格式化、业务处理、消息发送等等
DDLEventProcess
: 针对表的DDL操作进行回调处理DataEventProcess
: 针对表数据的增删改查回调处理
binlog格式化模版
{
"changeFieldList":[ // 修改的字段名
"updated"
],
"databaseName":"databseName", // 数据库名称
"fieldDataMap":{
"id":{
"dataType":"java.lang.Integer", // 数据类型
"field":"id", // 字段名称
"oldValue":"502941", // 老的字段
"value":"502941" // 当前字段
},
"updated":{
"dataType":"java.util.Date",
"field":"updated",
"oldValue":"2020-10-16 11:30:06",
"value":"2020-10-16 11:35:06"
}
},
"operation":"UPDATE", // 操作类型 UPDATE、INSERT、DELETE等等
"sourceTimestamp":1602819306, // 触发时间戳
"tableName":"table_info" // 表名
}
如果有需要可以在这个基础上进行二次开发,节省更多时间。
后续还会将一些dts使用心得分享处理,会持续更新.
欢迎大家一起交流
更多推荐
所有评论(0)