[debezium source code analysis] How MySqlConnectorTask starts up and polls for data
The poll method of MySqlConnectorTask fetches the captured data, and Kafka Connect then stores that data in Kafka.
The start method
We begin with MySqlConnectorTask.start. Here is part of its code:
...
this.taskContext.start();
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
...
As shown, start() calls taskContext.start(), which calls MySqlSchema.start(), which in turn calls DatabaseHistory.start(). The default DatabaseHistory implementation is KafkaDatabaseHistory, and KafkaDatabaseHistory.start() initializes a KafkaProducer instance.
Continuing with MySqlConnectorTask.start:
// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
if (offsets != null) {
    // Set the position in our source info ...
    source.setOffset(offsets);
    logger.info("Found existing offset: {}", offsets);
    // Before anything else, recover the database history to the specified binlog coordinates ...
    taskContext.loadHistory(source);
    if (source.isSnapshotInEffect()) {
        // The last offset was an incomplete snapshot that we cannot recover from...
        if (taskContext.isSnapshotNeverAllowed()) {
            // No snapshots are allowed
            String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
                    + "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
            throw new ConnectException(msg);
        }
        // Otherwise, restart a new snapshot ...
        startWithSnapshot = true;
        logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
    } else {
        // No snapshot was in effect, so we should just start reading from the binlog ...
        startWithSnapshot = false;
        // But check to see if the server still has those binlog coordinates ...
        if (!isBinlogAvailable()) {
            if (!taskContext.isSnapshotAllowedWhenNeeded()) {
                String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer "
                        + "available on the server. Reconfigure the connector to use a snapshot when needed.";
                throw new ConnectException(msg);
            }
            startWithSnapshot = true;
        }
    }
} else {
    // We have no recorded offsets ...
    if (taskContext.isSnapshotNeverAllowed()) {
        // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
        // full history of the database.
        logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
        source.setBinlogStartPoint("", 0L); // start from the beginning of the binlog
        taskContext.initializeHistory();
        // Look to see what the first available binlog file is called, and whether it looks like binlog files have
        // been purged. If so, then output a warning ...
        String earliestBinlogFilename = earliestBinlogFilename();
        if (earliestBinlogFilename == null) {
            logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
        } else if (!earliestBinlogFilename.endsWith("00001")) {
            logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
        }
    } else {
        // We are allowed to use snapshots, and that is the best way to start ...
        startWithSnapshot = true;
        // The snapshot will determine if GTIDs are set
        logger.info("Found no existing offset, so preparing to perform a snapshot");
        // The snapshot will also initialize history ...
    }
}
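First look at the offsets != null branch. After the saved offset is restored and the database history is loaded, the code checks source.isSnapshotInEffect(), that is, whether the previous run stopped in the middle of a snapshot. If it did, a ConnectException is thrown when the connector is configured to never take snapshots; otherwise startWithSnapshot is set to true and a new snapshot is started. If no snapshot was in progress, startWithSnapshot is set to false and the task resumes from the binlog. If the recorded binlog position is no longer available on the server, the outcome depends on the configuration: when snapshots are not allowed "when needed", an exception is thrown; when they are allowed, startWithSnapshot is set to true.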
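Now look at the offsets == null branch. A null offsets value means that no offset has been recorded for this partition, typically because no debezium task with the same name has run before. If the user's configuration does not allow snapshots, consumption starts from the beginning of the binlog (empty file name, position 0) and the database history is initialized. The code then calls earliestBinlogFilename() to get the name of the earliest binlog file, and logs a warning if no binlog exists or if that name does not end in 00001, which suggests that some binlogs may have been purged.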
protected String earliestBinlogFilename() {
    // Accumulate the available binlog filenames ...
    List<String> logNames = new ArrayList<>();
    try {
        logger.info("Checking all known binlogs from MySQL");
        taskContext.jdbc().query("SHOW BINARY LOGS", rs -> {
            while (rs.next()) {
                logNames.add(rs.getString(1));
            }
        });
    } catch (SQLException e) {
        throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
    }
    if (logNames.isEmpty()) return null;
    return logNames.get(0);
}
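The two warnings that follow the call to earliestBinlogFilename() can be tried out in isolation. The following is a minimal sketch, not Debezium code: the class BinlogPurgeCheck and its methods are hypothetical names, and the plain list of file names stands in for the rows that SHOW BINARY LOGS would return.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Debezium: mirrors the two warnings that
// MySqlConnectorTask.start logs after calling earliestBinlogFilename().
class BinlogPurgeCheck {

    // Returns the first file name, or null when the list is empty,
    // in the same way as earliestBinlogFilename() in the article.
    static String earliest(List<String> logNames) {
        return logNames.isEmpty() ? null : logNames.get(0);
    }

    // Classifies the situation with the same two tests the start method uses.
    static String describe(List<String> logNames) {
        String earliest = earliest(logNames);
        if (earliest == null) {
            return "NO_BINLOG";        // corresponds to the "No binlog appears to be available" warning
        }
        if (!earliest.endsWith("00001")) {
            return "POSSIBLY_PURGED";  // corresponds to the "purged some binlogs" warning
        }
        return "OK";
    }

    public static void main(String[] args) {
        System.out.println(describe(Collections.emptyList()));
        System.out.println(describe(Arrays.asList("mysql-bin.000001", "mysql-bin.000002")));
        System.out.println(describe(Arrays.asList("mysql-bin.000007", "mysql-bin.000008")));
    }
}
```

The suffix test is only a heuristic. A file name such as mysql-bin.100001 also ends in 00001, so the check can miss a purge, which is consistent with the source logging a warning rather than failing.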
If the user's configuration allows snapshots, startWithSnapshot is set to true.
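The branch logic of start can be condensed into a single pure function, which makes the possible outcomes easier to see. This is a sketch under stated assumptions: the class SnapshotDecision and its parameter names are hypothetical, the boolean arguments stand in for the calls made in the original code (isSnapshotInEffect(), isBinlogAvailable(), isSnapshotNeverAllowed(), isSnapshotAllowedWhenNeeded()), and IllegalStateException replaces ConnectException so that the example needs no Kafka Connect dependency.

```java
// Hypothetical, self-contained restatement of the branch logic in
// MySqlConnectorTask.start; none of these names exist in Debezium.
class SnapshotDecision {

    // Returns true when the task should begin with a snapshot and false when
    // it should begin with the binlog. Throws when the configuration forbids
    // the only workable option.
    static boolean startWithSnapshot(boolean hasOffsets,
                                     boolean snapshotInEffect,
                                     boolean binlogAvailable,
                                     boolean snapshotNeverAllowed,
                                     boolean snapshotAllowedWhenNeeded) {
        if (hasOffsets) {
            if (snapshotInEffect) {
                // The previous run stopped in the middle of a snapshot.
                if (snapshotNeverAllowed) {
                    throw new IllegalStateException("incomplete snapshot, but snapshots are never allowed");
                }
                return true;
            }
            // The previous run was reading the binlog.
            if (!binlogAvailable) {
                if (!snapshotAllowedWhenNeeded) {
                    throw new IllegalStateException("binlog position is no longer available");
                }
                return true;
            }
            return false;
        }
        // No recorded offsets: take a snapshot unless the configuration forbids it.
        return !snapshotNeverAllowed;
    }

    public static void main(String[] args) {
        // Existing offsets, binlog still available: resume from the binlog.
        System.out.println(startWithSnapshot(true, false, true, false, true));
        // No offsets and snapshots allowed: start with a snapshot.
        System.out.println(startWithSnapshot(false, false, true, false, true));
    }
}
```

When there are no offsets and snapshots are never allowed, the function returns false; in the original code this is the case that additionally sets the binlog start point to ("", 0L) and initializes the history.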
Here is the rest of MySqlConnectorTask.start:
// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();
// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
    // We're supposed to start with a snapshot, so set that up ...
    SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
    snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
    if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
    readers.add(snapshotReader);
    if (taskContext.isInitialSnapshotOnly()) {
        logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
        readers.add(new BlockingReader("blocker"));
        readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
    } else {
        if (!rowBinlogEnabled) {
            throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
                    + "required for this connector to work properly. Change the MySQL configuration to use a "
                    + "row-level binlog and restart the connector.");
        }
        readers.add(binlogReader);
    }
} else {
    if (!rowBinlogEnabled) {
        throw new ConnectException(
                "The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
    }
    // We're going to start by reading the binlog ...
    readers.add(binlogReader);
}
// And finally initialize and start the chain of readers ...
this.readers.initialize();
this.readers.start();
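First, the roles of the two main readers: 1) BinlogReader reads the ongoing changes from the binlog; 2) SnapshotReader reads a full snapshot of the existing data.
If startWithSnapshot is true and the connector is not in initial_only mode, a SnapshotReader and then a BinlogReader are added to readers. If startWithSnapshot is true and the mode is initial_only, a SnapshotReader and a BlockingReader are added instead; the latter blocks the task after the snapshot, because the user asked for a snapshot only. If startWithSnapshot is false, only the BinlogReader is added. Whenever a BinlogReader is about to be added, the code first checks that the server uses a row-level binlog and throws a ConnectException if it does not.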
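The three possible reader chains can be listed with a small function. This is a sketch only: ReaderChainPlan and plan are hypothetical names, the strings "snapshot", "blocker" and "binlog" are the reader names passed to the constructors in the code above, and IllegalStateException stands in for ConnectException.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Debezium: returns the names of the readers
// that MySqlConnectorTask.start would add to the chain, in order.
class ReaderChainPlan {

    static List<String> plan(boolean startWithSnapshot,
                             boolean initialSnapshotOnly,
                             boolean rowBinlogEnabled) {
        List<String> readers = new ArrayList<>();
        if (startWithSnapshot) {
            readers.add("snapshot");
            if (initialSnapshotOnly) {
                // Snapshot-only mode: block afterwards and never read the binlog,
                // so the row-level binlog check is skipped, as in the original code.
                readers.add("blocker");
                return readers;
            }
        }
        // Every remaining path ends with the binlog reader, which needs a row-level binlog.
        if (!rowBinlogEnabled) {
            throw new IllegalStateException("row-level binlog is required");
        }
        readers.add("binlog");
        return readers;
    }

    public static void main(String[] args) {
        System.out.println(plan(true, false, true));
        System.out.println(plan(true, true, false));
        System.out.println(plan(false, false, true));
    }
}
```

Note that the snapshot-only path does not require a row-level binlog, since that chain never reaches the BinlogReader.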
The poll method
Here is the code of MySqlConnectorTask.poll:
@Override
public List<SourceRecord> poll() throws InterruptedException {
    Reader currentReader = readers;
    if (currentReader == null) {
        return null;
    }
    PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
    try {
        logger.trace("Polling for events");
        return currentReader.poll();
    } finally {
        prevLoggingContext.restore();
    }
}
currentReader.poll() in turn calls AbstractReader.poll:
@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Before we do anything else, determine if there was a failure and throw that exception ...
    failureException = this.failure.get();
    if (failureException != null) {
        // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
        // will then explicitly stop the connector task. Most likely, however, the reader that threw
        // the exception will have already stopped itself and will generate no additional records.
        // Regardless, there may be records on the queue that will never be consumed.
        throw failureException;
    }
    // this reader has been stopped before it reached the success or failed end state, so clean up and abort
    if (!running.get()) {
        cleanupResources();
        throw new InterruptedException("Reader was stopped while polling");
    }
    logger.trace("Polling for next batch of records");
    List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
    while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
        // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
        metronome.pause();
        // Check for failure after waking up ...
        failureException = this.failure.get();
        if (failureException != null) throw failureException;
    }
    if (batch.isEmpty() && success.get() && records.isEmpty()) {
        // We found no records but the operation completed successfully, so we're done
        this.running.set(false);
        cleanupResources();
        return null;
    }
    pollComplete(batch);
    logger.trace("Completed batch of {} records", batch.size());
    return batch;
}
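Note the batch variable: it is the list of fetched records that poll eventually returns. It is filled from records, a BlockingQueue, through drainTo, at most maxBatchSize records at a time. The records queue itself is filled in enqueueRecord: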
protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}
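This method is called by SnapshotReader and BinlogReader, the two readers that do the actual work of collecting and parsing data. At this point, the process by which Kafka Connect calls poll to pull data is clear.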
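The handoff between enqueueRecord and poll is a producer/consumer pattern on a bounded BlockingQueue. The following is a simplified sketch of that pattern, not the Debezium implementation: QueueBackedReader and markCompleted are hypothetical names, String replaces SourceRecord, Thread.sleep stands in for metronome.pause(), and the failure handling and resource cleanup of the original are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified model of the queue handoff in AbstractReader.
class QueueBackedReader {
    private final BlockingQueue<String> records;
    private final int maxBatchSize;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicBoolean success = new AtomicBoolean(false);

    QueueBackedReader(int maxQueueSize, int maxBatchSize) {
        this.records = new LinkedBlockingQueue<>(maxQueueSize);
        this.maxBatchSize = maxBatchSize;
    }

    // Producer side: blocks while the queue is full, which applies back-pressure.
    void enqueueRecord(String record) throws InterruptedException {
        if (record != null) {
            records.put(record);
        }
    }

    // Producer side: signals that no more records will be enqueued.
    void markCompleted() {
        success.set(true);
    }

    // Consumer side: returns up to maxBatchSize records, or null once the
    // producer has completed and the queue is empty.
    List<String> poll() throws InterruptedException {
        List<String> batch = new ArrayList<>(maxBatchSize);
        while (running.get() && records.drainTo(batch, maxBatchSize) == 0 && !success.get()) {
            Thread.sleep(10); // nothing available yet, wait a little and retry
        }
        if (batch.isEmpty() && success.get() && records.isEmpty()) {
            running.set(false);
            return null;
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        QueueBackedReader reader = new QueueBackedReader(16, 2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    reader.enqueueRecord("event-" + i);
                }
                reader.markCompleted();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        List<String> batch;
        while ((batch = reader.poll()) != null) {
            System.out.println(batch); // batch boundaries depend on thread timing
        }
        producer.join();
    }
}
```

Because drainTo never blocks, the consumer decides how long to wait between attempts, while put on the bounded queue keeps a fast producer from running arbitrarily far ahead of the consumer.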