1 什么是Kafka Connect

正如前面的文章所说,Debezium提供的各种Connector都是Kafka Connect的插件,运行于Kafka Connect的服务上。

首先我们要知道,Kafka的特性,例如,topic的分区、I/O结合操作系统的页缓存(page cache)等,这些令Kafka具备了高吞吐量、低延时及高可用等优点。

由于Kafka的优点,当需要实现CDC(Changed Data Capture)时,即捕获数据源的变动并同步至目标数据源,我们可以使用Kafka作为数据源目标数据源之间的数据通道。

而作为开发人员只想专注于开发与数据源交互的代码,至于怎样开发Producer程序把捕获的数据发布到Kafka、怎样保证捕获数据的服务高可用,他们不希望多花精力考虑。这时,可以使用Kafka Connect把上下游的数据源与Kafka串联起来,而与数据源交互的业务代码则以Connector的形式运行在Kafka Connect上。

在这里插入图片描述
如上图所示,从捕获数据源变动情况的Connector被称为Source Connector,它们负责与数据源交互,把捕获到的记录放到一个集合里面(不一定是queue),然后,Kafka Connect会调用Connector对应的Taskpoll方法从集合中获取记录,并发送至Kafka。

还有负责把记录从Kafka拷贝至目标的Sink Connector,但由于Debezium的Connector只实现了Source Connector,下面只对Source Connector作说明。

2 Kafka Connect的相关概念

2.1 Worker

在Kafka Connect的集群中,集群中的Kafka Connect进程实例被称为Worker。

2.2 Connector和Task

运行在Kafka Connect上的插件都包含SourceConnectorSourceTask抽象类的子类。

Connector

先看看SourceConnector的定义

SourceConnector

package org.apache.kafka.connect.source;

import org.apache.kafka.connect.connector.Connector;

/**
 * SourceConnectors都继承该类,负责从数据源捕获数据变化情况
 */
public abstract class SourceConnector extends Connector {

    @Override
    protected SourceConnectorContext context() {
        return (SourceConnectorContext) context;
    }
}

Connector

package org.apache.kafka.connect.connector;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.components.Versioned;

import java.util.List;
import java.util.Map;

public abstract class Connector implements Versioned {
    protected ConnectorContext context;

    /**
     * 初始化Connector,注入ConnectorContext对象
     * @param Connector可使用ConnectorContext的对象和Kafka Connect交互
     */
    public void initialize(ConnectorContext ctx) {
        context = ctx;
    }

    /**
     * 初始化Connector,注入ConnectorContext对象和task的配置信息
     * @param ctx Connector可使用ConnectorContext的对象和Kafka Connect交互
     * @param taskConfigs 任务的配置信息
     */
    public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
        context = ctx;
        // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
        // are very different, but reduces the difficulty of implementing a Connector
    }

    /**
     * 返回ConnectorContext对象
     * @return ConnectorContext的对象
     */
    protected ConnectorContext context() {
        return context;
    }

    /**
     * 开启Connector
     * @param props 配置信息
     */
    public abstract void start(Map<String, String> props);

    /**
     * 重新配置Connector,会做重启操作
     * @param props 新的配置信息
     */
    public void reconfigure(Map<String, String> props) {
        stop();
        start(props);
    }

    /**
     * 指定SourceTask的实现子类.
     */
    public abstract Class<? extends Task> taskClass();

    /**
     * 从当前的配置信息中提取Task所需的配置信息,返回的List包含多少个元素,就表示多少个Tasks
     * @param maxTasks 一个Connector生成的Task数量
     * @return Tasks的配置信息,例如List包含2个元素,就会有2个Task工作
     */
    public abstract List<Map<String, String>> taskConfigs(int maxTasks);

    /**
     * 停止Connector
     */
    public abstract void stop();

    /**
     * 校验Connector的配置信息.
     * @param connectorConfigs Connector的配置信息
     * @return 对connectorConfigs进行校验后,返回最终的配置信息
     */
    public Config validate(Map<String, String> connectorConfigs) {
        ConfigDef configDef = config();
        if (null == configDef) {
            throw new ConnectException(
                String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
            );
        }
        List<ConfigValue> configValues = configDef.validate(connectorConfigs);
        return new Config(configValues);
    }

    /**
     * 定义该Connector的配置信息
     * @return 该Connector的配置信息.
     */
    public abstract ConfigDef config();
}

Connector里面定义了很多方法,我们后面会结合PostgreSQL Connector去详细说明这些方法。

从上面的代码,我们可以看出,Connector并没有直接跟数据源交互,它的主要任务是生成配置信息及告诉Kafka Connect:请把我的作业拆分成n个Task

  • Config validate(Map<String, String> connectorConfigs): 校验创建Connector时候传入的配置信息,并根据这些信息生成Config对象。这个Config对象将被Kafka Connect使用。
  • Class<? extends Task> taskClass():指定真正与数据源交互的Task的实现类。
  • List<Map<String, String>> taskConfigs(int maxTasks):生成Connector对应的Task的配置信息,例如,maxTasks为3,表示整个作业由3个Task共同完成,返回的List应该也包含3个元素。

Task

下面看SourceTask的抽象类代码:

package org.apache.kafka.connect.source;

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.List;
import java.util.Map;

/**
 * SourceTask负责从数据源捕获数据供Kafka Connect发布至Kafka.
 */
public abstract class SourceTask implements Task {

    protected SourceTaskContext context;

    /**
     * 初始化SourceTask.
     */
    public void initialize(SourceTaskContext context) {
        this.context = context;
    }

    /**
     * 对Task进行一次性配置,并开始Task的工作.
     * @param props Task的配置信息
     */
    @Override
    public abstract void start(Map<String, String> props);

    /**
     * 提取Task从数据源捕获的记录
     *
     * @return 捕获的记录
     */
    public abstract List<SourceRecord> poll() throws InterruptedException;

    /**
     * 提交位移信息
     */
    public void commit() throws InterruptedException {
        // This space intentionally left blank.
    }

    /**
     * 停止该Task的运行
     */
    @Override
    public abstract void stop();

    @Deprecated
    public void commitRecord(SourceRecord record) throws InterruptedException {
        // This space intentionally left blank.
    }

    /**
     * 当Kafka Connect成功发送poll()的记录到Kafka后,调用此方法回传已发送成功的记录,具体的实现可以从SourceRecord中提取位移信息
     * @param record 已成功发送的记录
     * @param metadata Kafka Broker的元数据
     * @throws InterruptedException
     */
    public void commitRecord(SourceRecord record, RecordMetadata metadata)
            throws InterruptedException {
        // by default, just call other method for backwards compatibility
        commitRecord(record);
    }
}

从上面的代码,我们可以看出,Task才是真正与数据源和Kafka Connect交互的对象:

  • List<SourceRecord> poll()Task把数据捕获后存进一个集合中,如queue,Kafka Connect调用poll()方法从集合中弹出记录发送到Kafka。
  • commitRecord(SourceRecord record, RecordMetadata metadata):成功把记录发送到Kafka后,Kafka Connect调用 commitRecord()方法回传已被成功发送的记录对象,一般的实现会从record获取位移信息,注意,这个位移不是Kafka消费的offset,而是Task读取数据源的进度信息,例如,PostgreSQL Connector获取的是WALLSN
  • commit(): 一般这个方法会把commitRecord()方法获取的记录的位移信息发布到Kafka进行持久化。

3 Kafka Connect的运行模式

Kafka Connect支持单机(standalone)分布式(distributed)两种部署方式。

3.1 standalone

一个Kafka Connect的进程负责运行所有的Connector和Task。在生产环境并不建议使用。

3.2 distributed

distributed模式为Kafka Connect提供了可扩展性自动容错能力。多个拥有相同group.id的Kafka Connect进程组成了集群,其中作为leaderworker负责ConnectorTask的分配,保证ConnectorTask平均分配到各worker上运行。
在这里插入图片描述
上图展示了,向一个拥有3个worker的Kafka Connect集群创建2个Connector,1个Connector有3个Task,另外一个Connector有2个Task,Connector和Task的分配情况。

4 集群管理

Kafka Connect 使用了Kafka的组管理协议保证集群的高可用性。集群的管理过程跟Kafka Consumer集群rebalance相似,区别在Kafka Consumerleader负责分配partition,而Kafka Connect集群的leader则负责分配ConnectorTask

以下情况都会导致ConnectorTask重新分配(rebalance):

  • 当Connector刚被创建的时候
  • 当Connector被删除的时候
  • 当有新的worker加入集群,它会想coordinator发送join request,从而触发rebalance。
  • 当有worker正常退出集群,它会向coordinator发送leave request,从而触发rebalance。
  • 当coordinator检测到有worker进程崩溃,会通过心跳告诉其他worker重新发送join request。

下面通过一个例子演示Kafka Connect是如何保证集群的高可用性的。

Group1集群有3个workers运行着connector1和connector2,各有3个tasks。各worker跟被选为coordinator的broker通过心跳协议保持通讯。

在这里插入图片描述

现在worker3由于某些原因崩溃了

在这里插入图片描述

Coordinator在检测到worker3崩溃后,在响应worker1和worker2的心跳信息中,包含了要求它们重新入组的信息。

在这里插入图片描述

于是worker1和worker2重新发送入组请求,并在请求中带上各自正在负责的Connector和Task情况。

在这里插入图片描述

Coordinator收到worker1和worker2的入组信息后,把worker1选为leader,并告诉worker1已成功入组及worker1和worker2正在运行的Connector和Task情况;通知worker2已经入组。

在这里插入图片描述

worker1会对比当前Connector和Task运行情况,发现connector1-task3和connector2-task没有被运行,于是在sync group的请求中告诉Coordinator把connector1-task3分配给worker1,connector2-task3分配给worker2。

在这里插入图片描述

Coordinator收到leader的sync group请求后,提取请求中的分配结果,分别发给worker1和worker2。

在这里插入图片描述

最后worker1发觉connector1-task3未运行,于是就运行connector1-task3了。同样worker2发觉connector2-task3未运行,便运行它。

在这里插入图片描述

通过上面的例子,我们知道了,Kafka Connect是利用了Kafka的组管理协议保证集群的高可用性,这与Kafka Consumer集群分配Partition是非常类似的。在以后的章节,我们会尝试分析PostgreSQL Connector的代码,看看它是如何把PostgreSQL和Kafka Connect串联起来的。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐