The poll method of MySqlConnectorTask fetches data, and that data is then stored in Kafka.

The start method

Let's first analyze MySqlConnectorTask.start. Here is part of its code:

...
this.taskContext.start();
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
...

As shown, start() calls taskContext.start(), which calls MySqlSchema.start(), which in turn calls DatabaseHistory.start(). The default DatabaseHistory implementation is KafkaDatabaseHistory, and KafkaDatabaseHistory.start() initializes a KafkaProducer instance.

Continuing with MySqlConnectorTask.start:

// Get the offsets for our partition ...
boolean startWithSnapshot = false;
boolean snapshotEventsAreInserts = true;
final SourceInfo source = taskContext.source();
Map<String, ?> offsets = context.offsetStorageReader().offset(taskContext.source().partition());
if (offsets != null) {
    // Set the position in our source info ...
    source.setOffset(offsets);
    logger.info("Found existing offset: {}", offsets);

    // Before anything else, recover the database history to the specified binlog coordinates ...
    taskContext.loadHistory(source);

    if (source.isSnapshotInEffect()) {
        // The last offset was an incomplete snapshot that we cannot recover from...
        if (taskContext.isSnapshotNeverAllowed()) {
            // No snapshots are allowed
            String msg = "The connector previously stopped while taking a snapshot, but now the connector is configured "
                    + "to never allow snapshots. Reconfigure the connector to use snapshots initially or when needed.";
            throw new ConnectException(msg);
        }
        // Otherwise, restart a new snapshot ...
        startWithSnapshot = true;
        logger.info("Prior execution was an incomplete snapshot, so starting new snapshot");
    } else {
        // No snapshot was in effect, so we should just start reading from the binlog ...
        startWithSnapshot = false;

        // But check to see if the server still has those binlog coordinates ...
        if (!isBinlogAvailable()) {
            if (!taskContext.isSnapshotAllowedWhenNeeded()) {
                String msg = "The connector is trying to read binlog starting at " + source + ", but this is no longer "
                        + "available on the server. Reconfigure the connector to use a snapshot when needed.";
                throw new ConnectException(msg);
            }
            startWithSnapshot = true;
        }
    }

} else {
    // We have no recorded offsets ...
    if (taskContext.isSnapshotNeverAllowed()) {
        // We're not allowed to take a snapshot, so instead we have to assume that the binlog contains the
        // full history of the database.
        logger.info("Found no existing offset and snapshots disallowed, so starting at beginning of binlog");
        source.setBinlogStartPoint("", 0L);// start from the beginning of the binlog
        taskContext.initializeHistory();

        // Look to see what the first available binlog file is called, and whether it looks like binlog files have
        // been purged. If so, then output a warning ...
        String earliestBinlogFilename = earliestBinlogFilename();
        if (earliestBinlogFilename == null) {
            logger.warn("No binlog appears to be available. Ensure that the MySQL row-level binlog is enabled.");
        } else if (!earliestBinlogFilename.endsWith("00001")) {
            logger.warn("It is possible the server has purged some binlogs. If this is the case, then using snapshot mode may be required.");
        }
    } else {
        // We are allowed to use snapshots, and that is the best way to start ...
        startWithSnapshot = true;
        // The snapshot will determine if GTIDs are set
        logger.info("Found no existing offset, so preparing to perform a snapshot");
        // The snapshot will also initialize history ...
    }
}

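First, the offsets != null branch. If the recorded offset shows that a snapshot was still in progress (source.isSnapshotInEffect()), the task throws an exception when the connector is configured to never allow snapshots; otherwise it sets startWithSnapshot to true and starts a new snapshot. If no snapshot was in progress, startWithSnapshot is set to false. However, if the recorded binlog position is no longer available on the server, the task throws an exception when snapshots are not allowed "when needed"; if they are allowed when needed, startWithSnapshot is set to true.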

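Now the offsets == null branch. A null offset means that no offsets have been recorded, i.e. no Debezium connector task with the same name has run before. If the user-supplied configuration never allows snapshots, consumption starts from the beginning of the binlog (position 0). earliestBinlogFilename() is then called to get the name of the earliest binlog file, and a warning is logged if no binlog is found or if that name does not end in 00001, which suggests that some binlogs may have been purged.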

protected String earliestBinlogFilename() {
    // Accumulate the available binlog filenames ...
    List<String> logNames = new ArrayList<>();
    try {
        logger.info("Checking all known binlogs from MySQL");
        taskContext.jdbc().query("SHOW BINARY LOGS", rs -> {
            while (rs.next()) {
                logNames.add(rs.getString(1));
            }
        });
    } catch (SQLException e) {
        throw new ConnectException("Unexpected error while connecting to MySQL and looking for binary logs: ", e);
    }

    if (logNames.isEmpty()) return null;
    return logNames.get(0);
}
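The check that start performs on the returned filename can be isolated as a small pure function. The following is a minimal sketch, not Debezium code: the class BinlogNameCheck, the method warningFor, and the shortened warning strings are hypothetical, and only the endsWith("00001") test and the "no binlog" case come from the snippet above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper that mirrors the two warnings logged in start().
final class BinlogNameCheck {

    // Returns a warning message, or an empty Optional when the earliest file looks like the first one.
    static Optional<String> warningFor(List<String> logNames) {
        if (logNames.isEmpty()) {
            // Corresponds to earliestBinlogFilename() returning null.
            return Optional.of("No binlog appears to be available");
        }
        String earliest = logNames.get(0);
        if (!earliest.endsWith("00001")) {
            // The first file is missing, so older binlogs were probably purged.
            return Optional.of("The server may have purged some binlogs");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(warningFor(Arrays.asList("mysql-bin.000001", "mysql-bin.000002")));
        System.out.println(warningFor(Arrays.asList("mysql-bin.000007")));
    }
}
```

The file names used in main are made-up examples. The check is only a heuristic, which is why the original code logs a warning instead of failing.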

If the user allows snapshots, startWithSnapshot is set to true.
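The two branches above amount to a small decision table. The sketch below reduces them to a pure function so each case can be checked in isolation. It is an illustration under assumptions, not the connector's code: StartMode, StartupDecision and decide are hypothetical names, and the boolean parameters stand in for offsets != null, source.isSnapshotInEffect(), isBinlogAvailable(), taskContext.isSnapshotNeverAllowed() and taskContext.isSnapshotAllowedWhenNeeded().

```java
// Hypothetical reduction of the startup decision in MySqlConnectorTask.start.
enum StartMode { SNAPSHOT, BINLOG, FAIL }

final class StartupDecision {

    static StartMode decide(boolean hasOffsets,
                            boolean snapshotInEffect,
                            boolean binlogAvailable,
                            boolean snapshotNeverAllowed,
                            boolean snapshotAllowedWhenNeeded) {
        if (hasOffsets) {
            if (snapshotInEffect) {
                // The previous run stopped mid-snapshot: redo it unless snapshots are forbidden.
                return snapshotNeverAllowed ? StartMode.FAIL : StartMode.SNAPSHOT;
            }
            if (binlogAvailable) {
                // Normal restart: resume from the recorded binlog coordinates.
                return StartMode.BINLOG;
            }
            // The recorded coordinates are gone from the server.
            return snapshotAllowedWhenNeeded ? StartMode.SNAPSHOT : StartMode.FAIL;
        }
        // No recorded offsets: snapshot if permitted, else read the binlog from its beginning.
        return snapshotNeverAllowed ? StartMode.BINLOG : StartMode.SNAPSHOT;
    }

    public static void main(String[] args) {
        // First start with snapshots permitted.
        System.out.println(decide(false, false, false, false, false));
        // Restart after the recorded binlog file was purged, snapshots allowed when needed.
        System.out.println(decide(true, false, false, false, true));
    }
}
```

FAIL corresponds to the places where the original code throws a ConnectException.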

The rest of MySqlConnectorTask.start is as follows:

// Check whether the row-level binlog is enabled ...
final boolean rowBinlogEnabled = isRowBinlogEnabled();

// Set up the readers, with a callback to `completeReaders` so that we know when it is finished ...
readers = new ChainedReader();
readers.uponCompletion(this::completeReaders);
BinlogReader binlogReader = new BinlogReader("binlog", taskContext);
if (startWithSnapshot) {
    // We're supposed to start with a snapshot, so set that up ...
    SnapshotReader snapshotReader = new SnapshotReader("snapshot", taskContext);
    snapshotReader.useMinimalBlocking(taskContext.useMinimalSnapshotLocking());
    if (snapshotEventsAreInserts) snapshotReader.generateInsertEvents();
    readers.add(snapshotReader);

    if (taskContext.isInitialSnapshotOnly()) {
        logger.warn("This connector will only perform a snapshot, and will stop after that completes.");
        readers.add(new BlockingReader("blocker"));
        readers.uponCompletion("Connector configured to only perform snapshot, and snapshot completed successfully. Connector will terminate.");
    } else {
        if (!rowBinlogEnabled) {
            throw new ConnectException("The MySQL server is not configured to use a row-level binlog, which is "
                    + "required for this connector to work properly. Change the MySQL configuration to use a "
                    + "row-level binlog and restart the connector.");
        }
        readers.add(binlogReader);
    }
} else {
    if (!rowBinlogEnabled) {
        throw new ConnectException(
                "The MySQL server does not appear to be using a row-level binlog, which is required for this connector to work properly. Enable this mode and restart the connector.");
    }
    // We're going to start by reading the binlog ...
    readers.add(binlogReader);
}

// And finally initialize and start the chain of readers ...
this.readers.initialize();
this.readers.start();

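First, what the two readers do: 1) BinlogReader subscribes to the latest binlog data; 2) SnapshotReader reads the full existing data set.
If startWithSnapshot is true and the mode is not initial_only, both a SnapshotReader and a BinlogReader are added to readers. If startWithSnapshot is true and the mode is initial_only, a SnapshotReader and a BlockingReader are added; the latter blocks the task from going any further, because the user asked for a snapshot only. If startWithSnapshot is false, only a BinlogReader is added. Whenever a BinlogReader is added, the row-level binlog must be enabled, otherwise a ConnectException is thrown.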
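The assembly rules for the reader chain can be sketched as a function that returns the reader names in order. This is an illustration under assumptions: ReaderChainSketch and readersFor are hypothetical, the strings reuse the names passed to the reader constructors above, and IllegalStateException stands in for the ConnectException thrown by the real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of how start() fills the ChainedReader.
final class ReaderChainSketch {

    static List<String> readersFor(boolean startWithSnapshot,
                                   boolean initialSnapshotOnly,
                                   boolean rowBinlogEnabled) {
        List<String> chain = new ArrayList<>();
        if (startWithSnapshot) {
            chain.add("snapshot");
            if (initialSnapshotOnly) {
                // Snapshot-only mode: the blocker keeps the chain from moving on to the binlog.
                chain.add("blocker");
                return chain;
            }
        }
        // Every remaining path ends with the binlog reader, which needs row-level binlogs.
        if (!rowBinlogEnabled) {
            throw new IllegalStateException("row-level binlog is required to read the binlog");
        }
        chain.add("binlog");
        return chain;
    }

    public static void main(String[] args) {
        System.out.println(readersFor(true, false, true));
        System.out.println(readersFor(true, true, false));
        System.out.println(readersFor(false, false, true));
    }
}
```

Note that in snapshot-only mode the row-level binlog check is never reached, which matches the original code.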

The poll method

Here is the code of MySqlConnectorTask.poll:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    Reader currentReader = readers;
    if (currentReader == null) {
        return null;
    }
    PreviousContext prevLoggingContext = this.taskContext.configureLoggingContext("task");
    try {
        logger.trace("Polling for events");
        return currentReader.poll();
    } finally {
        prevLoggingContext.restore();
    }
}

currentReader.poll() ends up calling AbstractReader.poll:

@Override
public List<SourceRecord> poll() throws InterruptedException {
    // Before we do anything else, determine if there was a failure and throw that exception ...
    failureException = this.failure.get();
    if (failureException != null) {
        // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
        // will then explicitly stop the connector task. Most likely, however, the reader that threw
        // the exception will have already stopped itself and will generate no additional records.
        // Regardless, there may be records on the queue that will never be consumed.
        throw failureException;
    }

    // this reader has been stopped before it reached the success or failed end state, so clean up and abort
    if (!running.get()) {
        cleanupResources();
        throw new InterruptedException( "Reader was stopped while polling" );
    }

    logger.trace("Polling for next batch of records");
    List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
    while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
        // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
        metronome.pause();

        // Check for failure after waking up ...
        failureException = this.failure.get();
        if (failureException != null) throw failureException;
    }

    if (batch.isEmpty() && success.get() && records.isEmpty()) {
        // We found no records but the operation completed successfully, so we're done
        this.running.set(false);
        cleanupResources();
        return null;
    }
    pollComplete(batch);
    logger.trace("Completed batch of {} records", batch.size());
    return batch;
}

Note the batch here: it is the list of fetched records that is eventually returned. The list is filled from records, a BlockingQueue, and records in turn is filled by enqueueRecord.

protected void enqueueRecord(SourceRecord record) throws InterruptedException {
    if (record != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Enqueuing source record: {}", record);
        }
        this.records.put(record);
    }
}

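This method is called by SnapshotReader and BinlogReader; these two readers are mainly responsible for collecting and parsing the data. At this point, the way Kafka Connect calls poll to fetch data is clear.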
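The hand-off between the readers and poll is a producer/consumer pattern built on a bounded BlockingQueue. The sketch below shows only that mechanism, under assumptions: QueueHandoffSketch, enqueue and pollOnce are hypothetical names, String stands in for SourceRecord, and the waiting loop with the metronome is left out, so pollOnce drains once and returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical, simplified model of the records queue shared by readers and poll().
final class QueueHandoffSketch {
    private final BlockingQueue<String> records;
    private final int maxBatchSize;

    QueueHandoffSketch(int capacity, int maxBatchSize) {
        this.records = new LinkedBlockingQueue<>(capacity);
        this.maxBatchSize = maxBatchSize;
    }

    // Producer side: put() blocks while the queue is full, which throttles the reader.
    void enqueue(String record) {
        if (record == null) return;
        try {
            records.put(record);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the real method propagates the exception instead.
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Consumer side: move at most maxBatchSize queued records into a fresh list.
    List<String> pollOnce() {
        List<String> batch = new ArrayList<>(maxBatchSize);
        records.drainTo(batch, maxBatchSize);
        return batch;
    }

    public static void main(String[] args) {
        QueueHandoffSketch queue = new QueueHandoffSketch(10, 3);
        for (int i = 0; i < 5; i++) {
            queue.enqueue("r" + i);
        }
        System.out.println(queue.pollOnce());
        System.out.println(queue.pollOnce());
    }
}
```

Because drainTo takes a maximum count, a single poll never returns more than maxBatchSize records, however far ahead the reader has run.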
