实验环境

准备三台虚拟机

三台机器均预装CentOS7 操作系统。分别配置机器名 worker1,worker2,worker3。

vi /etc/hosts

192.168.75.61 worker1
192.168.75.62 worker2
192.168.75.63 worker3


firewall-cmd --state   查看防火墙状态
systemctl stop firewalld.service   关闭防火墙
systemctl disable firewalld 禁止开机自启



都需要安装jdk

[root@localhost ~]# mkdir -p /usr/local/soft/jdk

[root@localhost ~]# tar -zxvf jdk-8u201-linux-x64.tar.gz -C /usr/local/soft/jdk

[root@localhost ~]# vim /etc/profile
export JAVA_HOME=/usr/local/soft/jdk/jdk1.8.0_201
export JRE_HOME=/usr/local/soft/jdk/jdk1.8.0_201/jre
export CLASS_PATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

[root@localhost jdk1.8.0_201]# source /etc/profile
[root@localhost jdk1.8.0_201]# java -version



下载kafka3.4.0

前面的2.13是开发kafka的scala语言的版本,后面的3.4.0是kafka应用的版本。

Scala是一种运行于JVM虚拟机之上的语言。在运行时,只需要安装JDK就可以了,选哪个Scala版本没有区别。但是如果要调试源码,就必须选择对应的Scala版本。因为Scala语言的版本并不是向后兼容的。

另外,在选择kafka版本时,建议先去kafka的官网看下发布日志,了解一下各个版本的特性。 https://kafka.apache.org/downloads。




在这里插入图片描述



下载Zookeeper 3.6.2版本

在这里插入图片描述



kafka的安装程序中自带了Zookeeper,可以在kafka的安装包的libs目录下查看到zookeeper的客户端jar包。但是,通常情况下,为了让应用更好维护,我们会使用单独部署的Zookeeper,而不使用kafka自带的Zookeeper。

下载完成后,将这两个工具包上传到三台服务器上,解压后,分别放到/usr/local/soft/kafka和/usr/local/soft/zookeeper目录下。最好是并将部署目录下的bin目录路径配置到path环境变量中。

单机服务

启动

解压kafka

[root@localhost ~]# mkdir /usr/local/soft/kafka/

[root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/

# 最好是设置一下环境变量
[root@worker1 ~]# vi ./.bash_profile 
export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin
[root@worker1 ~]# source ./.bash_profile

[root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0

直接执行

# 启动kafka自带的zookeeper服务
# 从nohup.out中可以看到zookeeper默认会在2181端口启动。通过jps指令看到一个QuorumPeerMain进程,确定服务启动成功。
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &

# 启动kafka服务
# 也可以加上-daemon表示后台启动    bin/kafka-server-start.sh -daemon config/server.properties
# 服务会默认在9092端口启动。
nohup bin/kafka-server-start.sh config/server.properties &
[root@localhost kafka_2.13-3.4.0]# jps
2402 Kafka
2850 Jps
2044 QuorumPeerMain



停止服务

[root@worker1 ~]# kafka-server-stop.sh 
[root@worker1 ~]# zookeeper-server-stop.sh 
[root@worker1 ~]# jps
21269 Jps



简单收发消息

Kafka的基础工作机制是消息发送者可以将消息发送到kafka上指定的topic,而消息消费者,可以从指定的topic上消费消息。

在这里插入图片描述



首先,可以使用Kafka提供的客户端脚本创建Topic

#创建Topic
[root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Created topic test.

#查看Topic   --describe参数
[root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --topic test --describe --bootstrap-server localhost:9092
Topic: test	TopicId: XSQWNCb3SbGbDEleLlBvtQ	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0



然后,启动一个消息发送者端。往一个名为test的Topic发送消息。

当命令行出现 > 符号后,随意输入一些字符。Ctrl+C 退出命令行。这样就完成了往kafka发消息的操作。

[root@localhost kafka_2.13-3.4.0]# bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1
>2
>3
>4
>

如果不提前创建Topic,那么在第一次往一个之前不存在的Topic发送消息时,消息也能正常发送,只是会抛出LEADER_NOT_AVAILABLE警告。

[oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>123
12[2021-03-05 14:00:23,347] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
3[2021-03-05 14:00:23,479] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2021-03-05 14:00:23,589] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)
>>123

这是因为Broker端在创建完主题后,会显示通知Clients端LEADER_NOT_AVAILABLE异常。Clients端接收到异常后,就会主动去更新元数据,获取新创建的主题信息。



然后启动一个消息消费端,从名为test的Topic上接收消息。

[root@localhost kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092

在这里插入图片描述



其他消费模式

指定消费进度

上方消费者只能接收到我新发送的5和6这两条消息,之前发送的4条消息就没有接收到。

如果想要消费之前发送的消息,可以通过添加--from-beginning参数指定。

[root@worker1 kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
1
2
3
4
5
6



如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。

这表示从第0号Partition上的第5条消息开始读起。

bin/kafka-console-consumer.sh --topic test --partition 0 --offset 4 --bootstrap-server localhost:9092
5
6

topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。

消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略



分组消费

一个partition中的消息,只会被一个消费者组中的某一个消费者消费。和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的

kafka-console-consumer.sh脚本中,可以通过--consumer-property group.id=testGroup来指定所属的消费者组

# --consumer-property可以指定多个key-value  其中group.id只是其中一个
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092



我们现在可以启动三个消费者,来验证一下分组消费机制:

# 同一个消费者组
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
# 另一个消费者组
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup2 --bootstrap-server localhost:9092

在这里插入图片描述



查看消费者组的偏移量

接下来,还可以使用kafka-consumer-groups.sh观测消费者组的情况。包括他们的消费进度。

# 添加-group 和 --describe两个参数
[root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092

Consumer group 'testGroup' has no active members. # 当前消费者组无消费者,这是因为我此时停止了所有的消费者进程

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID     HOST            CLIENT-ID
testGroup       test            0          8               8               0            -               -               -
  • GROUP表示哪一个消费者组
  • TOPIC是topic名
  • PARTITION 一个topic可以存在多个partition,则标识是第0号partition
  • CURRENT-OFFSET表示当前消费者组消费到了topic为test、第0号partition的消费消息offset偏移量
  • LOG-END-OFFSET 表示生产者往当前partition中生产的消息总数
  • LAG 表示 还未消费的数量

比如我此时使用消息生产者再发送几条消息后再查看

# 生产三条消息
[root@worker1 ~]# kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1  
>2
>3
>

# 此时 LOG-END-OFFSET就变为了11   而LAG就变为了3
[root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092

Consumer group 'testGroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID     HOST            CLIENT-ID
testGroup       test            0          8               11              3               -               -               -

虽然业务上是通过Topic来分发消息的,但是实际上,消息是保存在Partition这样一个数据结构上的。



理解Kakfa的消息传递机制

kafka的消息生产者和消息消费者是通过topic这样一个逻辑概念来进行业务沟通的,单实际上所有的消息是存在Partition这样一个数据结构中的

在这里插入图片描述



概念:

  • 客户端client:消息生产者和消息消费者都是Kafka的Client

  • 消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。

  • 服务端Broker:一个Kafka服务就是一个Broker

  • 话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题

  • 分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。



集群服务

为什么要使用集群

  • 创建多个partition,多个partition分布在不同的broker上,减少一个broker的压力
  • 为每个partition准备多个备份,保证数据不丢失。多个partition会尽可能的分布在不同的broker上。多个partition是通过zookeeper来选举出一个Leader 处理Client的读写请求,其他备份partition就是Follower角色,只做数据备份和参与重新选举Leader

集群环境下就不会再使用kafka自带的zookeeper了,会单独部署zookeeper服务。

在这里插入图片描述



部署Zookeeper集群

zookeeper是使用的zab协议,选举需要半数以上的同意,所以部署的节点数需要单数。

[root@worker1 ~]# tar -zxf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/soft/zookeeper/
[root@worker1 ~]# cd /usr/local/soft/zookeeper/apache-zookeeper-3.6.2-bin

然后进入conf目录,修改配置文件。在conf目录中,提供了一个zoo_sample.cfg文件,这是一个示例文件。我们只需要将这个文件复制一份zoo.cfg,修改下其中的关键配置就可以了。其中比较关键的修改参数如下:

[root@worker1 apache-zookeeper-3.6.2-bin]# cd conf/
[root@worker1 conf]# cp zoo_sample.cfg zoo.cfg
[root@worker1 conf]# vi zoo.cfg 

#Zookeeper的本地数据目录,默认是/tmp/zookeeper。这是Linux的临时目录,随时会被删掉。
dataDir=/app/zookeeper/data
#Zookeeper的服务端口
clientPort=2181
#集群节点配置  2888集群内部数据传输   3888集群内部进行选举
server.1=192.168.75.61:2888:3888
server.2=192.168.75.62:2888:3888
server.3=192.168.75.63:2888:3888

其中,clientPort 2181是对客户端开放的服务端口。

集群配置部分, server.x这个x就是节点在集群中的myid。后面的2888端口是集群内部数据传输使用的端口。3888是集群内部进行选举使用的端口。



启动服务

# --config 指定config目录,它会自己去读取目录下的zoo.cfg文件
[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start 
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... FAILED TO START

发现启动失败了,查看vi ./logs/zookeeper-root-server-worker1.out保存信息,提示是没有myid文件

在这里插入图片描述



因为我们上方的配置文件中dataDir=/app/zookeeper/data,那么我们就在这个目录下创建一个myid文件,并指定一个集群环境下唯一id

# 在这个文件中输入一个1   其他节点就输入2  3  保证各节点不重复即可
vi /app/zookeeper/data/myid
1

再启动服务

[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start 
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED



启动完成后,使用jps指令可以看到一个QuorumPeerMain进程就表示服务启动成功。

[root@worker1 apache-zookeeper-3.6.2-bin]# jps
8438 Jps
8395 QuorumPeerMain



三台机器都启动完成后,可以查看下集群状态。

这其中Mode 为leader就是主节点,follower就是从节点。

[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /app/zookeeper/zookeeper-3.5.8/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader   



部署Kafka集群

kafka服务并不需要进行选举,因此也没有奇数台服务的建议。每台服务器上都执行下面的命令

解压kafka

[root@localhost ~]# mkdir /usr/local/soft/kafka/

[root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/

# 最好是设置一下环境变量
[root@worker1 ~]# vi ./.bash_profile 
export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin
[root@worker1 ~]# source ./.bash_profile

[root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0



然后进入config目录,修改server.properties。这个配置文件里面的配置项非常多,下面列出几个要重点关注的配置。

[root@worker1 kafka_2.13-3.4.0]# mkdir -p /app/kafka/logs
[root@worker1 kafka_2.13-3.4.0]# vi config/server.properties
#broker 的全局唯一编号,不能重复,只能是数字。 所以一个集群下的kafka实例就需要使用不同的id
broker.id=1
#数据文件地址。同样默认是给的/tmp目录。
log.dirs=/app/kafka/logs
# 默认的每个Topic的分区数1,可以修改,我这里就修改为了2
num.partitions=2
#zookeeper的服务地址,因为我配置了域名映射,所以这里就使用使用的worker1 worker2 worker3
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
#可以选择指定zookeeper上的基础节点。
#zookeeper.connect=worker1:2181,worker2:2181,worker3:2181/kafka

broker.id需要每个服务器上不一样,分发到其他服务器上时,要注意修改一下。

多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。

一些核心的配置内如下所示

PropertyDefaultDescription
broker.id0broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为一的即可。
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。
listenersPLAINTEXT://127.0.0.1:9092server接受客户端连接的端口,ip配置kafka本机ip即可
zookeeper.connectlocalhost:2181zookeeper连接地址。hostname:port。如果是Zookeeper集群,用逗号连接。
log.retention.hours168每个日志文件删除之前保存的时间。
num.partitions1创建topic的默认分区数
default.replication.factor1自动创建topic的默认副本数量
min.insync.replicas1当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常
delete.topic.enablefalse是否允许删除主题



接下来就可以启动kafka服务了。启动服务时需要指定配置文件。

# -daemon表示后台启动
[root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@worker1 kafka_2.13-3.4.0]# jps
8819 Kafka
8884 Jps
8395 QuorumPeerMain



理解服务端的Topic、Partition和Broker

创建一个topic,指定该topic的partition数量为4,每组partition的副本集个数为2,

这里的副本是指一个Leader一个Follower,并不是指一个Leader两个Follower

# --partitions 参数表示分区数   --replication-factor 表示每个分区的副本集个数
kafka-topics.sh --create --topic disTopic --partitions 4 --replication-factor 2 --bootstrap-server worker1:9092



列出所有的topic

[root@worker1 ~]# kafka-topics.sh --list --bootstrap-server worker1:9092
disTopic



查看列表情况

# --describe查看Topic信息。
[root@worker1 ~]# kafka-topics.sh --describe --topic disTopic --bootstrap-server worker1:9092
Topic: disTopic	TopicId: rDUdZBO7RH2GNPgdRXk7Tw	PartitionCount: 4	ReplicationFactor: 2	Configs: 
	Topic: disTopic	Partition: 0	Leader: 3	Replicas: 3,1	Isr: 3,1
	Topic: disTopic	Partition: 1	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: disTopic	Partition: 2	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: disTopic	Partition: 3	Leader: 3	Replicas: 3,2	Isr: 3,2
	
	
# 这里有4个partition   就拿partition0来举例,其中partition0这个副本集中的Leader在BrokerId为3的机器上,其中两个副本分别在brokerId1 BrokerId3上
  • Partiton参数列出了四个partition,后面带有分区编号,用来标识这些分区。

  • Leader表示这一组partition的Leader节点是哪一个,这个后面的数字是启动kafka服务时配置文件中指定的broker.id数

    kafka的每一组partition都会选举一个Leader,Leader会处理Client的读写请求,但实际上Leader可能会让某个副本去响应Client的请求。

  • Replicas表示这个Partition的复制集是分配到哪些Broker上的。但是Replicas列出的只是一个逻辑上的分配情况,并不关心数据实际上是不是按照这个分配,甚至有些节点服务挂了,这一列也还会显示

  • Isr表示Partition的实际分配情况,它是Replicas列的一个子集,只列出当前存活,能正常同步数据Broker节点



我们在配置文件中指定的log.dirs=/app/kafka/logs 配置

在这里插入图片描述



一个Broker上的一个partition就对应着一个目录,而这个Partition上的所有消息,就保存在这个对应的目录当中。

在这里插入图片描述



在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生

而消费者消费消息的offset其实就是每个消息在partition上的偏移量



总结

在这里插入图片描述

  1. Topic是逻辑上的概念,producer和consumer通过Topic进行业务沟通
  2. Topic并不存储数据,数据是保存在Topic下的多组Partition中的,消息会尽量平均的分发在各组Partition中,每组Partition保存了Topic下的一部分消息
  3. 每组Partition包含一个Leader和多个Follower,每组Partition的个数成为备份因子replica factor
  4. producer将消息发送到Partition上,consumer通过partition上的offset记录自己所属组在当前partition上消费消息的消费进度
  5. producer将消息发送到Topic,kafka会推送给所有订阅了该topic的消费者组进行处理。但是每个消费者组内部只会有一个消费者实例处理这一条消息
  6. kafka的Broker通过zookeeper组成集群,然后在这些Broker中,需要选举产生一个担任Controller角色的Broker。这个Controller的主要任务就是负责Topic的分配以及后续管理工作。这也是通过ZooKeeper选举的。



Kraft集群

在Kafka的config目录下,提供了一个kraft的文件夹,在这里面就是Kraft协议的参考配置文件。在这个文件夹中有三个配置文件,broker.properties,controller.properties,server.properties,分别给出了Kraft中三种不同角色的示例配置。

  • broker.properties: 数据节点
  • controller.properties: Controller控制节点
  • server.properties: 即可以是数据节点,又可以是Controller控制节点。

这里同样列出几个比较关键的配置项,按照自己的环境进行定制即可。

[root@worker1 kafka_2.13-3.4.0]# vi config/kraft/server.properties
#配置当前节点的角色。Controller相当于Zookeeper的功能,负责集群管理。Broker提供具体的消息转发服务。
process.roles=broker,controller
#配置当前节点的id。与普通集群一样,要求集群内每个节点的ID不能重复。
node.id=1
#配置集群的投票节点。其中@前面的是node.id的值,后面是节点的地址和端口,这个端口跟客户端访问的端口是不一样的。通常将集群内的所有Controllor节点都配置进去。
controller.quorum.voters=1@192.168.75.61:9093,2@192.168.75.62:9093,3@192.168.75.63:9093
#Broker对客户端暴露的服务地址。基于PLAINTEXT协议。一般写本机ip
advertised.listeners=PLAINTEXT://worker1:9092
#Controller服务协议的别名。默认就是CONTROLLER
controller.listener.names=CONTROLLER
#配置监听服务。不同的服务可以绑定不同的接口。这种配置方式在端口前面是省略了一个主机IP的,主机IP默认是使用的java.net.InetAddress.getCanonicalHostName()
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#数据文件地址。默认配置在/tmp目录下。
log.dirs=/app/kafka/kraft-log
#topic默认的partition分区数。
num.partitions=2



将配置文件分发,并修改每个服务器上的node.idcontroller.quorum.voterslog.dirsnum.partitions属性。

由于Kafka的Kraft集群对数据格式有另外的要求,所以在启动Kraft集群前,还需要对日志目录进行格式化。

# 先使用kafka提供的脚本生成一个uuid
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh random-uuid
NT1Y5KgOTQ63SPppgsfhqA

# -t 集群ID,三个服务器上使用同一个集群ID。 加上上方的uuid  -c 指定配置文件
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh format -t NT1Y5KgOTQ63SPppgsfhqA -c config/kraft/server.properties 




在这里插入图片描述



接下来就可以指定配置文件,启动Kafka的服务了。 例如,在Worker1上,启动Broker和Controller服务。

[root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/kraft/server.properties 
[root@worker1 kafka_2.13-3.4.0]# jps
12960 Kafka
13028 Jps

等三个服务都启动完成后,就可以像普通集群一样去创建Topic,并维护Topic的信息了。



相关概念

topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。

消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略



一个partition中的消息,只会被一个消费者组中的某一个消费者消费,和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的



消息副本,每个消费者组都有一个消息副本,而一个消费者组下的多个消费者共用一个消息副本,也就是一条消息只能被一个消费者组下的一个消费者消费。



概念:

  • 客户端client:消息生产者和消息消费者都是Kafka的Client

  • 消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。

  • 服务端Broker:一个Kafka服务就是一个Broker

  • 话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题

  • 分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。

在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生

而消费者消费消息的offset其实就是每个消息在partition上的偏移量

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐