介绍

  1. Kafka是分布式发布-订阅消息系统,最初由LinkedIn公司开发,之后成为之后成为Apache基金会的一部分,由Scala和Java编写。Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。

它与传统系统相比,有以下不同:

  1. 它被设计为一个分布式系统,易于向外扩展;
  2. 它同时为发布和订阅提供高吞吐量;
  3. 它支持多订阅者,当失败时能自动平衡消费者;
  4. 它将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。

基础概念

  1. Broker:Kafka集群包含一个或多个服务器,这些服务器就是Broker
  2. Topic:每条发布到Kafka集群的消息都必须有一个Topic
  3. Partition:是物理概念上的分区,为了提供系统吞吐率,在物理上每个Topic会分成一个或多个Partition,每个Partition对应一个文件夹
  4. Producer:消息产生者,负责生产消息并发送到Kafka Broker
  5. Consumer:消息消费者,向kafka broker读取消息并处理的客户端。
  6. Consumer Group:每个Consumer属于一个特定的组,组可以用来实现一条消息被组内多个成员消费等功能。

安装Kafka

  1. kafka_2.12-2.1.1.tgz
  2. zookeeper-3.4.13.tar.gz

一、虽然kafka内置了zookeeper,但是建议大家还是先安装zookeeper,因为生产环境时集群时,一般都是独立zookeeper:

  1. 1、命令:tar -zvxf zookeeper-3.4.13.tar.gz 解压zookeeper,当前目录下多了一个zookeeper-3.4.13的目录,zookeeper文件都在其中,目录中内容如下图;
1f8345f0e81c7f8166ec8807bd75a3f0.png
  1. 2、命令:mkdir /tmp/zoodata 创建一个目录zoodata用来存储zookeeper数据;
  2. 3、进入conf目录,命令:cp zoo_sample.cfg zoo.cfg 拷贝一个zoo.cfg文件;
  3. 4、vim zoo.cfg 修改其中的dataDir,修改为 dataDir=/tmp/zoodata ;
  4. 5、./zkServer.sh start 启动;
  5. 6、./zkServer.sh status 查看启动状态;如下图启动成功;
447fa7983854c946b4dd35cab21ec7e6.png
  1. 7、Zookeeper默认端口2181

二、配置启动kafka:

  1. 1、命令:tar -zvxf kafka_2.12-2.1.1.tar.gz 解压kafka,解压后目录结构如下图:
270fbc2777f7dbce70038363ac5065af.png
  1. 2、命令:vim config/server.properties 修改内容如下:
  2. broker.id=1
  3. log.dirs=data/kafka-logs
  4. 3、命令:bin/kafka-server-start.sh -daemon config/server.properties 启动Kafka;(-daemon代表着以后台模式运行kafka)
  5. 4、Kafka默认端口为9092,可以使用命令:netstat -anlpt | grep 9092 或者 lsof -i:9092 来查看9092端口占用情况;
57ef6120b218293ba87abb12daf41eff.png

创建Topic

一、创建Topic:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test2

19e5cd9ccbfb2b9e9bfe1cef7ed72f10.png
  1. --zookeeper //此处为为zookeeper监听的地址
  2. --replication-factor 1 //复制两份
  3. --partitions 1 //创建1个分区
  4. --topic //主题为test2
  5. 其中有一个JDK警告:“OpenJDK 64-Bit Server VM warning”,这里我们不用管,因为用的是默认的OpenJDK,说是一个参数问题,如果将Zookeeper端口改为12181,Kafka端口改为19092,好像这个警告就没有了,具体原因不太清楚,大家有兴趣可以在这里看看:参考1,参考2

二、查看Topic:bin/kafka-topics.sh --list --zookeeper localhost:2181

ac54d99e928e039e8e7a837ef92700a9.png

三、查看对应Topic描述信息:bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

004b5692e363d840978556c6ac020e25.png
  1. 第一行给出了所有分区的摘要,每个附加行给出了关于一个分区的信息。 由于我们只有一个分区,所以只有一行。
  2. Leader: 是负责给定分区的所有读取和写入的节点。 每个节点将成为分区随机选择部分的领导者。
  3. Replicas: 是复制此分区日志的节点列表,无论它们是否是领导者,或者即使他们当前处于活动状态。
  4. Isr: 是一组“同步”副本。这是复制品列表的子集,当前活着并被引导到领导者。

四、删除Topic:bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test2

生产消息/消费消息

一、生产消息命令:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 后会让输入消息,如下图:

82dd58f8616269c70bfb95c3daa5c66a.png
  1. 回车后,能继续输入,此时命令行处于阻塞状态,Ctrl+C退出;

二、消费消息命令:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning ,如下图:

a9ebf267237dbd10be7e37569c5882fa.png
  1. --from-beginning:是否从头消费;
  2. Ctrl+C退出;
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐