kafka(15) Kafka Connect
概念Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。逻辑图Kafka Connect 特性如下:Kafka 连接器的通用框架:Kafka Connect 标准化了其他数据系统与Kafka的集成,从而简化了连接器的开发,部署和管理支持分布式模式和单机模
官网地址:Apache Kafka
概念
Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。
逻辑图
Kafka Connect 特性如下:
- Kafka 连接器的通用框架:Kafka Connect 标准化了其他数据系统与Kafka的集成,从而简化了连接器的开发,部署和管理
- 支持分布式模式和单机模式部署
- Rest API:通过简单的Rest API管理连接器
- 偏移量管理:针对Source和Sink都有相应的偏移量(Offset)管理方案,程序员无须关心Offset 的提交
- 分布式模式可扩展的,支持故障转移
Connectors
连接器,分为两种 Source(从源数据库拉取pull数据写入Kafka),Sink(从Kafka消费数据写入push目标数据)
连接器其实并不参与实际的数据copy,连接器负责管理Task。连接器中定义了对应Task的类型,对外提供配置选项(用户创建连接器时需要提供对应的配置信息)。并且连接器还可以决定启动多少个Task线程。
用户可以通过Rest API 启停连接器,查看连接器状态
Confluent 已经提供了许多成熟的连接器,官网地址:Confluent Connector Portfolio
Task
实际进行数据传输的单元,和连接器一样同样分为 Source和Sink
Task的配置和状态存储在Kafka的Topic中,config.storage.topic
和status.storage.topic
。我们可以随时启动,停止任务,以提供弹性、可扩展的数据管道
Worker
刚刚我们讲的Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程,Worker 分为两种模式,单机模式和分布式模式
单机模式:比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志,通常来说更多的是使用分布式模式
分布式模式:为Kafka Connect提供了可扩展和故障转移。相同group.id
的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)
当使用Worker集群时,创建连接器,或者连接器Task数量变动时,都会触发Rebalance 以保证集群各个Worker节点负载均衡。但是当Task 进入Fail状态的时候并不会触发 Rebalance,只能通过Rest Api 对Task进行重启。
Converters
Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换
默认支持以下Converter
- AvroConverter
io.confluent.connect.avro.AvroConverter
: 需要使用 Schema Registry - ProtobufConverter
io.confluent.connect.protobuf.ProtobufConverter
: 需要使用 Schema Registry - JsonSchemaConverter
io.confluent.connect.json.JsonSchemaConverter
: 需要使用 Schema Registry - JsonConverter
org.apache.kafka.connect.json.JsonConverter
(无需 Schema Registry): 转换为json结构 - StringConverter
org.apache.kafka.connect.storage.StringConverter
: 简单的字符串格式 - ByteArrayConverter
org.apache.kafka.connect.converters.ByteArrayConverter
: 不做任何转换
Converters 与 Connector 是解耦的,下图展示了在Kafka Connect中,Converter 在何时进行数据转换
Transforms
连接器可以通过配置Transform 实现对单个消息(对应代码中的Record)的转换和修改,可以配置多个Transform 组成一个链。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决
Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行。
快速上手
使用步骤:
1. 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties
2.启动kafka
bin/kafka-server-start.sh config/server.properties
3.准备数据库信息MySQL
新建一个test_user表
CREATE TABLE `test_user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ;
并创建之后随便造几条模拟测试数据。
准备连接器,这里我是自己写了一个简单的连接器😄
4.下载连接器到本地 并修改配置并启动Worker
vi connect-standalone.properties
5.配置一个connect 的配置文件,如我在
/Users/liuchao58/software/kafka-3.0.0/kafka_2.13-3.0.0/config目录下
vi connector1.properties
内容为:
name=example-source
connector.class=com.github.taven.source.ExampleSourceConnector
tasks.max=1
database.url=jdbc:mysql://localhost:3306/springboot?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true
database.username=test
database.password=1
database.tables=test_user
6.向Worker 发送请求,创建连接器
bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties
7.最后确认数据是否写入Kafka(Source 过程是否成功)
首先查看一下Worker中的运行状态,如果Task的state = RUNNING,代表Task没有抛出任何异常,平稳运行
curl -X GET localhost:8083/connectors/example-source/status
查看kafka 中Topic 是否创建
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
查看topic中数据,此时说明MySQL数据已经成功写入Kafka
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning
--------------------------
启动 Sink
如通过REST API 执行http请求
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
"name" : "example-sink",
"config" : {
"connector.class" : "com.github.taven.sink.ExampleSinkConnector",
"topics" : "test_user, test_order",
"tasks.max" : "1",
"redis.host" : "127.0.0.1",
"redis.port" : "6379",
"redis.password" : "",
"redis.database" : "0"
}
}'
查看Sink Connector Status
curl -X GET localhost:8083/connectors/example-sink/status
在确认了Sink Status 为RUNNING 后,可以确认下Redis中是否有数据
发现和mysql库里存入的数据一致。
证明sink 通过kafka 传递给redis是成功的。
同时查看Sink Offset消费情况
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink
上图代表 test_user
topic 两条数据已经全部消费
更多推荐
所有评论(0)