debezium mysql 创建订阅任务逻辑
用户通过kafka connect提供的REST接口,向debezium提交配置内容,debezium会根据提交的配置信息,启动debezium订阅任务。debezium创建mysql同步任务逻辑主要在MysqlConnectorTask.start方法内,由以下3个大步骤组成:1. 初始化和构建Readers2. Readers工作3. 数据获取之后初始化和构建Reade...
用户通过kafka connect
提供的REST
接口,向debezium
提交配置内容,debezium
会根据提交的配置信息,启动debezium
订阅任务。
debezium
创建mysql
同步任务逻辑主要在MysqlConnectorTask.start
方法内,由以下3
个大步骤组成:
1. 初始化和构建Readers
2. Readers
工作
3. 数据获取之后
初始化和构建Readers
初始化的整个过程主要在MySqlConnectorTask.start
方法内。
- 从
offset.storage.topic
获取offset
信息,记录上一次消费的位点。 - 判断
debezium
是否需要snapshot
; 如果需要snapshot
,会构建SnapshotReader
。 - 最后构建
BinlogReader
。
完成上述3
个步骤后,debezium
初始化完成,其构建的readers
准备开始工作。如果SnapshotReader
存在,它会先于BinlogReader
工作,在SnapshotReader
完成工作之前,BinlogReader
会一直处于阻塞状态。
Readers
工作
Readers
分为BinlogReader
和SnapshotReader
。
SnapshotReader
工作
SnapshotReader
通过调用doStart
方法后开始工作,正真运行时调用的方法是execute
,整个工作过程是在一个单独线程内进行。
其具体工作过程有一下几步:
1. 设置事务隔离级别,SET TRANSACTION ISOLATION LEVEL REPEATABLE READ
;
2. 对数据库设置全局只读锁,FLUSH TABLES WITH READ LOCK
,当然可以通过设置snapshot.locking.mode
,取消全局的只读锁。这一步很关键,会影响到库的数据插入。默认是表级别的只读锁;
3. 开始事务,START TRANSACTION WITH CONSISTENT SNAPSHOT
;
4. 从master
读取读取binlog
内容;
5. 获取数据库;
6. 根据第5
步获取的数据库,获取所有的库含有的表,同时对不需要的表进行过滤;
7. 如果第2
步没有加全局只读锁,则尝试加表级别的只读锁。和2
步骤里一样,可以通过设置不加只读锁;
8. 构建ddl
语句,对应每个表的schema
; 这一步非常重要;
9. 如果加锁模式为SnapshotLockingMode.MINIMAL
,并且目前只读锁为全局锁,则释放全局只读锁;
10. SNAPSHOT
模式读取数据;首先判断表数据量,根据数据大小设置链接参数,获取connection
; 执行select * from xxx
,获取表的数据;
11. 最后释放只读锁;
上述几个步骤后,debezium snapshot
完成。完成之后会调用SnapshotReader.doStop
方法,释放相关的资源,该方法内会调用MySqlConnectorTask.completeReaders
方法,用来启动下一个Reader
,也就是BinlogReader
。
BinlogReader
工作
BinlogReader
主要用于获取Mysql
的实时变更。它初始化时注册了很多处理器,用于处理不同的DML
。
BinlogReader
内部会注册监听器,用于监听mysql
的变换。当有数据变更时,它会收到一个事件。根据事件类型,使用不同的注册的处理器处理数据。
数据获取之后
Reader
用于获取数据,数据获取之后会被放入一个队列内,队列内的数据最后会被kafka connect
处理,发送相应的kafka topic
。
以上3
大步骤是debezium mysql connector
工作的核心逻辑。
更多推荐
所有评论(0)