Kafka Connect

Kafka Connect是一个可扩展的、可靠的在kafka和其他系统之间流传输的数据工具,通过运行连接器,实现与外部系统交互的自定义逻辑,从其他来源的数据或将数据从Kafka导出到其他系统。下面使用连接器运行Kafka Connect,将数据从文件导入Kafka主题并将数据从Kafka主题导出到文件。

准备消息文件

切换到目录$KAFKA HOME,将消息写入文件test.txt

$ echo -e "foo\nbar" > test.txt

启动独立模式

接下来,我们将启动以独立模式connect-standalone.sh运行的两个连接器。 我们提供三个配置文件作为参数。 第一个始终是Kafka Connect流程的配置,包含常见配置,例如要连接的Kafka代理和数据的序列化格式。 其余配置文件均指定要创建的连接器。 这些文件包括唯一的连接器名称,要实例化的连接器类以及连接器所需的任何其他配置。

$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Kafka附带的这些示例配置文件使用您之前启动的默认本地群集配置并创建两个连接器:第一个是source连接器,它从输入文件读取行并生成每个Kafka主题,第二个是sink连接器 从Kafka主题读取消息并将每个消息作为输出文件中的一行生成。当Kafka Connect进程启动之后,源连接器应该开始从test.txt读取行并生成主题connect-test,并且接收器连接器应该开始从主题connect-test读取消息并将它们写入文件测试sink.txt

数据测试

$ cat test.sink.txt
foo
bar
$ echo Another line>> test.txt #写入一条消息
$ cat test.sink.txt
foo
bar
Another line

通过检查输出文件test.sink.txt的内容来验证数据已通过整个管道进行传递,继续将数据添加到文件中,再次查看。

消费验证

由于数据传入基于connect-test,所以我们应该可以运行控制台消费者来查看主题中的数据:

$ bin / kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}

后记

以上使我联想到应用产生的物理日志文件,可以通过它进行日志聚合收集,通过管道将它们放在日志处理中心,比如:文件服务器,HDFS。

  • 当然日志处理有非常成熟的高性能日志系统Scribe或Flume以及ELK日志分析平台。
  • 相比之下,Kafka同样提供了出色的性能,比如:副本高可用,端到端低延迟
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐