[debezium 源码分析] MySqlConnectorTask 启动和拉取数据过程分析
MySqlConnectorTask的poll方法会获取,并将这些数据存入kafka内。start 方法现在先分析MySqlConnectorTask.start方法,下面是一部分代码...this.taskContext.start();boolean startWithSnapshot = false;boolean snapshotEventsAreInserts ...
MySqlConnectorTask的poll方法会获取,并将这些数据存入kafka内。
start 方法
现在先分析MySqlConnectorTask.start方法,下面是一部分代码
...
this.taskContext.start();
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
...
可以看到start()方法里调用了taskContext.start()方法,后者会调用MysqlSchema.start()方法,MysqlSchema.start()方法内调用了DatabaseHistory.start()方法,默认使用的DatabaseHistory实现是KafkaDatabaseHistory;KafkaDatabaseHistory.start()方法里初始化了一个KafkaProducer实例。
继续分析MySqlConnectorTask.start方法
// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
if (offsets != null) {
// Set the position in our source info ...
source.setOffset(offsets);
logger.info("Found existing offset: {}", offsets);
// Before anything else, recover the database history to the specified binlog coordinates ...
taskContext.loadHistory(source);
if (source.isSnapshotInEffect()) {
// The last offset was an incomplete snapshot that we cannot recover from...
if (taskContext.isSnapshotNeverAllowed()) {
// No snapshots are allowed
String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
+ "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
throw new ConnectException(msg);
}
// Otherwise, restart a new snapshot ...
startWithSnapshot = true;
logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
} else {
// No snapshot was in effect, so we should just start reading from the binlog ...
startWithSnapshot = false;
// But check to see if the server still has those binlog coordinates ...
if (!isBinlogAvailable()) {
if (!taskContext.isSnapshotAllowedWhenNeeded()) {
String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer "
+ "available on the server. Reconfigure the connector to use a snapshot when needed.";
throw new ConnectException(msg);
}
startWithSnapshot = true;
}
}
} else {
// We have no recorded offsets ...
if (taskContext.isSnapshotNeverAllowed()) {
// We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
// full history of the database.
logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
taskContext.initializeHistory();
// Look to see what the first available binlog file is called, and whether it looks like binlog files have
// been purged. If so, then output a warning ...
String earliestBinlogFilename = earliestBinlogFilename();
if (earliestBinlogFilename == null) {
logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
} else if (!earliestBinlogFilename.endsWith("00001")) {
logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
}
} else {
// We are allowed to use snapshots, and that is the best way to start ...
startWithSnapshot = true;
// The snapshot will determine if GTIDs are set
logger.info("Found no existing offset, so preparing to perform a snapshot");
// The snapshot will also initialize history ...
}
}
首先看offset != null的内容,如果允许snapshot,就设置startWithSnapshot为true;否则设为false,但是如果这个时候binlog,不可用并且设值必要时刻不可用snapshot,那么就会抛出异常;如果设置为必要时刻可以snapshot,那么就startWithSnapshot为true;
现在看offsets == null部分的内容,offset为null表示从之前不存在同名的debezium订阅任务;如果用户上传的配置信息里不允许snapshot,那么从binlog为0开始的位置进行消费, 之后会调用earliestBinlogFilename()方法,获取最早的binlog日志名。
protected String earliestBinlogFilename() {
// Accumulate the available binlog filenames ...
List<String> logNames = new ArrayList<>();
try {
logger.info("Checking all known binlogs from MySQL");
taskContext.jdbc().query("SHOW BINARY LOGS", rs -> {
while (rs.next()) {
logNames.add(rs.getString(1));
}
});
} catch (SQLException e) {
throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
}
if (logNames.isEmpty()) return null;
return logNames.get(0);
}
如果用户允许snapshot,将startWithSnapshot设置为true;
下面是剩下的MySqlConnectorTask.start内容
// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();
// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
// We're supposed to start with a snapshot, so set that up ...
SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
readers.add(snapshotReader);
if (taskContext.isInitialSnapshotOnly()) {
logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
readers.add(new BlockingReader("blocker"));
readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
} else {
if (!rowBinlogEnabled) {
throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
+ "required for this connector to work properly. Change the MySQL configuration to use a "
+ "row-level binlog and restart the connector.");
}
readers.add(binlogReader);
}
} else {
if (!rowBinlogEnabled) {
throw new ConnectException(
"The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
}
// We're going to start by reading the binlog ...
readers.add(binlogReader);
}
// And finally initialize and start the chain of readers ...
this.readers.initialize();
this.readers.start();
先介绍2个Reader的功能: 1) BinlogReader订阅最新的binlog数据;2) SnapshotReader订阅全量数据;
如果startWithSnapshot为true并且不是initial_only模式,就会向readers里添加BinlogReader和SnapshotReader实例,否则如果只是startWithSnapshot为true,那么会添加SnapshotReader和BlockingReader实例,后者会阻塞订阅任务的运行(因为用户配置的为initial_only模式);如果为false就只添加BinlogReader;
poll 方法
下面是MysqlConnectorTask.poll代码
@Override
public List<SourceRecord> poll() throws InterruptedException {
Reader currentReader = readers;
if (currentReader == null) {
return null;
}
PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
try {
logger.trace("Polling for events");
return currentReader.poll();
} finally {
prevLoggingContext.restore();
}
}
currentReader.poll()方法里会调用AbstractReader.poll方法:
@Override
public List<SourceRecord> poll() throws InterruptedException {
// Before we do anything else, determine if there was a failure and throw that exception ...
failureException = this.failure.get();
if (failureException != null) {
// In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
// will then explicitly stop the connector task. Most likely, however, the reader that threw
// the exception will have already stopped itself and will generate no additional records.
// Regardless, there may be records on the queue that will never be consumed.
throw failureException;
}
// this reader has been stopped before it reached the success or failed end state, so clean up and abort
if (!running.get()) {
cleanupResources();
throw new InterruptedException( "Reader was stopped while polling" );
}
logger.trace("Polling for next batch of records");
List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
// No records are available even though the snapshot has not yet completed, so sleep for a bit ...
metronome.pause();
// Check for failure after waking up ...
failureException = this.failure.get();
if (failureException != null) throw failureException;
}
if (batch.isEmpty() && success.get() && records.isEmpty()) {
// We found no records but the operation completed successfully, so we're done
this.running.set(false);
cleanupResources();
return null;
}
pollComplete(batch);
logger.trace("Completed batch of {} records", batch.size());
return batch;
}
注意这里的batch,它是最后会返回的拉取到的数据,这个list由records这个BlockingQueue来填充。records在enqueueRecord被填充数据。
protected void enqueueRecord(SourceRecord record) throws InterruptedException {
if (record != null) {
if (logger.isTraceEnabled()) {
logger.trace("Enqueuing source record: {}", record);
}
this.records.put(record);
}
}
这个方法会被SnapshotReader或BinlogReader调用;,这2个Reader主要用来收集解析数据。目前为止,kafka connect调用poll拉取数据的过程已经清楚了。
更多推荐


所有评论(0)