官网地址:Apache Kafka

概念

Kafka Connect 是一款可扩展并且可靠地在 Apache Kafka 和其他系统之间进行数据传输的工具。 可以很简单的定义 connectors(连接器) 将大量数据迁入、迁出Kafka。

逻辑图

Kafka Connect 特性如下:

  • Kafka 连接器的通用框架:Kafka Connect 标准化了其他数据系统与Kafka的集成,从而简化了连接器的开发,部署和管理
  • 支持分布式模式和单机模式部署
  • Rest API:通过简单的Rest API管理连接器
  • 偏移量管理:针对Source和Sink都有相应的偏移量(Offset)管理方案,程序员无须关心Offset 的提交
  • 分布式模式可扩展的,支持故障转移

Connectors

连接器,分为两种 Source(从源数据库拉取pull数据写入Kafka),Sink(从Kafka消费数据写入push目标数据)

连接器其实并不参与实际的数据copy,连接器负责管理Task。连接器中定义了对应Task的类型,对外提供配置选项(用户创建连接器时需要提供对应的配置信息)。并且连接器还可以决定启动多少个Task线程。

用户可以通过Rest API 启停连接器,查看连接器状态

Confluent 已经提供了许多成熟的连接器,官网地址:Confluent Connector Portfolio

Task

实际进行数据传输的单元,和连接器一样同样分为 Source和Sink

Task的配置和状态存储在Kafka的Topic中,config.storage.topicstatus.storage.topic。我们可以随时启动,停止任务,以提供弹性、可扩展的数据管道

Worker

刚刚我们讲的Connectors 和Task 属于逻辑单元,而Worker 是实际运行逻辑单元的进程,Worker 分为两种模式,单机模式和分布式模式

单机模式:比较简单,但是功能也受限,只有一些特殊的场景会使用到,例如收集主机的日志,通常来说更多的是使用分布式模式

分布式模式:为Kafka Connect提供了可扩展和故障转移。相同group.id的Worker,会自动组成集群。当新增Worker,或者有Worker挂掉时,集群会自动协调分配所有的Connector 和 Task(这个过程称为Rebalance)

当使用Worker集群时,创建连接器,或者连接器Task数量变动时,都会触发Rebalance 以保证集群各个Worker节点负载均衡。但是当Task 进入Fail状态的时候并不会触发 Rebalance,只能通过Rest Api 对Task进行重启。

Converters

Kafka Connect 通过 Converter 将数据在Kafka(字节数组)与Task(Object)之间进行转换

默认支持以下Converter

  • AvroConverter io.confluent.connect.avro.AvroConverter: 需要使用 Schema Registry
  • ProtobufConverter io.confluent.connect.protobuf.ProtobufConverter: 需要使用 Schema Registry
  • JsonSchemaConverter io.confluent.connect.json.JsonSchemaConverter: 需要使用 Schema Registry
  • JsonConverter org.apache.kafka.connect.json.JsonConverter (无需 Schema Registry): 转换为json结构
  • StringConverter org.apache.kafka.connect.storage.StringConverter: 简单的字符串格式
  • ByteArrayConverter org.apache.kafka.connect.converters.ByteArrayConverter: 不做任何转换

Converters 与 Connector 是解耦的,下图展示了在Kafka Connect中,Converter 在何时进行数据转换

Transforms

连接器可以通过配置Transform 实现对单个消息(对应代码中的Record)的转换和修改,可以配置多个Transform 组成一个。例如让所有消息的topic加一个前缀、sink无法消费source 写入的数据格式,这些场景都可以使用Transform 解决

Transform 如果配置在Source 则在Task之后执行,如果配置在Sink 则在Task之前执行。

 快速上手

使用步骤:

1. 启动zookeeper

bin/zookeeper-server-start.sh  config/zookeeper.properties

2.启动kafka

bin/kafka-server-start.sh config/server.properties

3.准备数据库信息MySQL

新建一个test_user表

CREATE TABLE `test_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ;

并创建之后随便造几条模拟测试数据。

 准备连接器,这里我是自己写了一个简单的连接器😄

  jar 包代码地址: https://github.com/TavenYin/kafka-connect-example/blob/master/release/kafka-connector-example-bin.jar

4.下载连接器到本地 并修改配置并启动Worker

 vi connect-standalone.properties

5.配置一个connect 的配置文件,如我在

/Users/liuchao58/software/kafka-3.0.0/kafka_2.13-3.0.0/config目录下

 vi connector1.properties

内容为:

name=example-source
connector.class=com.github.taven.source.ExampleSourceConnector
tasks.max=1
database.url=jdbc:mysql://localhost:3306/springboot?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=UTC&rewriteBatchedStatements=true
database.username=test
database.password=1
database.tables=test_user

 6.向Worker 发送请求,创建连接器

bin/connect-standalone.sh config/connect-standalone.properties config/connector1.properties

7.最后确认数据是否写入Kafka(Source 过程是否成功)

首先查看一下Worker中的运行状态,如果Task的state = RUNNING,代表Task没有抛出任何异常,平稳运行

curl -X GET localhost:8083/connectors/example-source/status

 查看kafka 中Topic 是否创建

bin/kafka-topics.sh --list --bootstrap-server localhost:9092

查看topic中数据,此时说明MySQL数据已经成功写入Kafka

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_user --from-beginning

--------------------------

启动 Sink 

如通过REST API 执行http请求

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{
  "name" : "example-sink",
  "config" : {
    "connector.class" : "com.github.taven.sink.ExampleSinkConnector",
    "topics" : "test_user, test_order",
    "tasks.max" : "1",
    "redis.host" : "127.0.0.1",
    "redis.port" : "6379",
    "redis.password" : "",
    "redis.database" : "0"
  }
}'

查看Sink Connector Status

curl -X GET localhost:8083/connectors/example-sink/status

在确认了Sink Status 为RUNNING 后,可以确认下Redis中是否有数据

发现和mysql库里存入的数据一致。 

证明sink  通过kafka 传递给redis是成功的。

同时查看Sink Offset消费情况

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group connect-example-sink

上图代表 test_user topic 两条数据已经全部消费 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐