03-Debezium的载体Kafka Connect
什么是Kafka Connect正如前面的文章所说,Debezium提供的各种Connector都是实现了Kafka Connect的插件,运行于Kafka Connect的服务上。首先我们要知道,Kafka的特性,例如,topic的分区、I/O结合操作系统的页缓存(page cache)等,这些令Kafka具备了高吞吐量、低延时及高可用等优点。由于Kafka的优点,当需要实现CDC(Change
1 什么是Kafka Connect
正如前面的文章所说,Debezium提供的各种Connector都是Kafka Connect的插件,运行于Kafka Connect的服务上。
首先我们要知道,Kafka的特性,例如,topic的分区、I/O结合操作系统的页缓存(page cache)等,这些令Kafka具备了高吞吐量、低延时及高可用等优点。
由于Kafka的优点,当需要实现CDC(Changed Data Capture)
时,即捕获数据源
的变动并同步至目标数据源
,我们可以使用Kafka作为数据源
和目标数据源
之间的数据通道。
而作为开发人员只想专注于开发与数据源交互的代码,至于怎样开发Producer程序把捕获的数据发布到Kafka、怎样保证捕获数据的服务高可用,他们不希望多花精力考虑。这时,可以使用Kafka Connect把上下游的数据源与Kafka串联起来,而与数据源交互的业务代码则以Connector的形式运行在Kafka Connect上。
如上图所示,从捕获数据源变动情况
的Connector被称为Source Connector
,它们负责与数据源交互,把捕获到的记录放到一个集合里面(不一定是queue),然后,Kafka Connect会调用Connector
对应的Task
的poll
方法从集合中获取记录,并发送至Kafka。
还有负责把记录从Kafka拷贝至目标的Sink Connector
,但由于Debezium的Connector只实现了Source Connecto
r,下面只对Source Connector
作说明。
2 Kafka Connect的相关概念
2.1 Worker
在Kafka Connect的集群中,集群中的Kafka Connect进程实例被称为Worker。
2.2 Connector和Task
运行在Kafka Connect上的插件都包含SourceConnector
和SourceTask
抽象类的子类。
Connector
先看看SourceConnector的定义
SourceConnector
package org.apache.kafka.connect.source;
import org.apache.kafka.connect.connector.Connector;
/**
* SourceConnectors都继承该类,负责从数据源捕获数据变化情况
*/
public abstract class SourceConnector extends Connector {
@Override
protected SourceConnectorContext context() {
return (SourceConnectorContext) context;
}
}
Connector
package org.apache.kafka.connect.connector;
import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.components.Versioned;
import java.util.List;
import java.util.Map;
public abstract class Connector implements Versioned {
protected ConnectorContext context;
/**
* 初始化Connector,注入ConnectorContext对象
* @param Connector可使用ConnectorContext的对象和Kafka Connect交互
*/
public void initialize(ConnectorContext ctx) {
context = ctx;
}
/**
* 初始化Connector,注入ConnectorContext对象和task的配置信息
* @param ctx Connector可使用ConnectorContext的对象和Kafka Connect交互
* @param taskConfigs 任务的配置信息
*/
public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
context = ctx;
// Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
// are very different, but reduces the difficulty of implementing a Connector
}
/**
* 返回ConnectorContext对象
* @return ConnectorContext的对象
*/
protected ConnectorContext context() {
return context;
}
/**
* 开启Connector
* @param props 配置信息
*/
public abstract void start(Map<String, String> props);
/**
* 重新配置Connector,会做重启操作
* @param props 新的配置信息
*/
public void reconfigure(Map<String, String> props) {
stop();
start(props);
}
/**
* 指定SourceTask的实现子类.
*/
public abstract Class<? extends Task> taskClass();
/**
* 从当前的配置信息中提取Task所需的配置信息,返回的List包含多少个元素,就表示多少个Tasks
* @param maxTasks 一个Connector生成的Task数量
* @return Tasks的配置信息,例如List包含2个元素,就会有2个Task工作
*/
public abstract List<Map<String, String>> taskConfigs(int maxTasks);
/**
* 停止Connector
*/
public abstract void stop();
/**
* 校验Connector的配置信息.
* @param connectorConfigs Connector的配置信息
* @return 对connectorConfigs进行校验后,返回最终的配置信息
*/
public Config validate(Map<String, String> connectorConfigs) {
ConfigDef configDef = config();
if (null == configDef) {
throw new ConnectException(
String.format("%s.config() must return a ConfigDef that is not null.", this.getClass().getName())
);
}
List<ConfigValue> configValues = configDef.validate(connectorConfigs);
return new Config(configValues);
}
/**
* 定义该Connector的配置信息
* @return 该Connector的配置信息.
*/
public abstract ConfigDef config();
}
Connector
里面定义了很多方法,我们后面会结合PostgreSQL Connector去详细说明这些方法。
从上面的代码,我们可以看出,Connector
并没有直接跟数据源交互,它的主要任务是生成配置信息
及告诉Kafka Connect:请把我的作业拆分成n个Task
:
Config validate(Map<String, String> connectorConfigs)
: 校验创建Connector
时候传入的配置信息,并根据这些信息生成Config
对象。这个Config对象将被Kafka Connect使用。Class<? extends Task> taskClass()
:指定真正与数据源交互的Task的实现类。List<Map<String, String>> taskConfigs(int maxTasks)
:生成Connector
对应的Task的配置信息,例如,maxTasks
为3,表示整个作业由3个Task
共同完成,返回的List
应该也包含3个元素。
Task
下面看SourceTask的抽象类代码:
package org.apache.kafka.connect.source;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.List;
import java.util.Map;
/**
* SourceTask负责从数据源捕获数据供Kafka Connect发布至Kafka.
*/
public abstract class SourceTask implements Task {
protected SourceTaskContext context;
/**
* 初始化SourceTask.
*/
public void initialize(SourceTaskContext context) {
this.context = context;
}
/**
* 对Task进行一次性配置,并开始Task的工作.
* @param props Task的配置信息
*/
@Override
public abstract void start(Map<String, String> props);
/**
* 提取Task从数据源捕获的记录
*
* @return 捕获的记录
*/
public abstract List<SourceRecord> poll() throws InterruptedException;
/**
* 提交位移信息
*/
public void commit() throws InterruptedException {
// This space intentionally left blank.
}
/**
* 停止该Task的运行
*/
@Override
public abstract void stop();
@Deprecated
public void commitRecord(SourceRecord record) throws InterruptedException {
// This space intentionally left blank.
}
/**
* 当Kafka Connect成功发送poll()的记录到Kafka后,调用此方法回传已发送成功的记录,具体的实现可以从SourceRecord中提取位移信息
* @param record 已成功发送的记录
* @param metadata Kafka Broker的元数据
* @throws InterruptedException
*/
public void commitRecord(SourceRecord record, RecordMetadata metadata)
throws InterruptedException {
// by default, just call other method for backwards compatibility
commitRecord(record);
}
}
从上面的代码,我们可以看出,Task
才是真正与数据源和Kafka Connect交互的对象:
List<SourceRecord> poll()
:Task
把数据捕获后存进一个集合中,如queue
,Kafka Connect调用poll()
方法从集合中弹出记录发送到Kafka。commitRecord(SourceRecord record, RecordMetadata metadata)
:成功把记录发送到Kafka后,Kafka Connect调用commitRecord()
方法回传已被成功发送的记录对象
,一般的实现会从record获取位移信息
,注意,这个位移
不是Kafka消费的offset
,而是Task
读取数据源的进度信息,例如,PostgreSQL Connector获取的是WAL
的LSN
。commit()
: 一般这个方法会把commitRecord()
方法获取的记录的位移信息发布到Kafka进行持久化。
3 Kafka Connect的运行模式
Kafka Connect支持单机(standalone)
和分布式(distributed)
两种部署方式。
3.1 standalone
一个Kafka Connect的进程负责运行所有的Connector和Task。在生产环境并不建议使用。
3.2 distributed
distributed模式
为Kafka Connect提供了可扩展性
和自动容错
能力。多个拥有相同group.id
的Kafka Connect进程组成了集群,其中作为leader
的worker
负责Connector
和Task
的分配,保证Connector
和Task
平均分配到各worker上运行。
上图展示了,向一个拥有3个worker的Kafka Connect集群创建2个Connector,1个Connector有3个Task,另外一个Connector有2个Task,Connector和Task的分配情况。
4 集群管理
Kafka Connect 使用了Kafka的组管理协议
保证集群的高可用性
。集群的管理过程跟Kafka Consumer集群
的rebalance
相似,区别在Kafka Consumer
的leader
负责分配partition
,而Kafka Connect集群的leader
则负责分配Connector
和Task
。
以下情况都会导致Connector
和Task
重新分配(rebalance):
- 当Connector刚被创建的时候
- 当Connector被删除的时候
- 当有新的worker加入集群,它会想coordinator发送join request,从而触发rebalance。
- 当有worker正常退出集群,它会向coordinator发送leave request,从而触发rebalance。
- 当coordinator检测到有worker进程崩溃,会通过心跳告诉其他worker重新发送join request。
下面通过一个例子演示Kafka Connect是如何保证集群的高可用性
的。
Group1集群有3个workers运行着connector1和connector2,各有3个tasks。各worker跟被选为coordinator的broker通过心跳协议保持通讯。
现在worker3由于某些原因崩溃了
Coordinator在检测到worker3崩溃后,在响应worker1和worker2的心跳信息中,包含了要求它们重新入组的信息。
于是worker1和worker2重新发送入组请求,并在请求中带上各自正在负责的Connector和Task情况。
Coordinator收到worker1和worker2的入组信息后,把worker1选为leader,并告诉worker1已成功入组及worker1和worker2正在运行的Connector和Task情况;通知worker2已经入组。
worker1会对比当前Connector和Task运行情况,发现connector1-task3和connector2-task没有被运行,于是在sync group的请求中告诉Coordinator把connector1-task3分配给worker1,connector2-task3分配给worker2。
Coordinator收到leader的sync group请求后,提取请求中的分配结果,分别发给worker1和worker2。
最后worker1发觉connector1-task3未运行,于是就运行connector1-task3了。同样worker2发觉connector2-task3未运行,便运行它。
通过上面的例子,我们知道了,Kafka Connect是利用了Kafka的组管理协议保证集群的高可用性,这与Kafka Consumer集群分配Partition是非常类似的。在以后的章节,我们会尝试分析PostgreSQL Connector的代码,看看它是如何把PostgreSQL和Kafka Connect串联起来的。
更多推荐
所有评论(0)