Kafka 实战使用、单机搭建、集群搭建、Kraft集群搭建
Topic是逻辑上的概念,producer和consumer通过Topic进行业务沟通Topic并不存储数据,数据是保存在Topic下的多组Partition中的,消息会尽量平均的分发在各组Partition中,每组Partition保存了Topic下的一部分消息每组Partition包含一个Leader和多个Follower,每组Partition的个数成为备份因子replica factor。
文章目录
实验环境
准备三台虚拟机
三台机器均预装CentOS7 操作系统。分别配置机器名 worker1,worker2,worker3。
vi /etc/hosts
192.168.75.61 worker1
192.168.75.62 worker2
192.168.75.63 worker3
firewall-cmd --state 查看防火墙状态
systemctl stop firewalld.service 关闭防火墙
systemctl disable firewalld 禁止开机自启
都需要安装jdk
[root@localhost ~]# mkdir -p /usr/local/soft/jdk
[root@localhost ~]# tar -zxvf jdk-8u201-linux-x64.tar.gz -C /usr/local/soft/jdk
[root@localhost ~]# vim /etc/profile
export JAVA_HOME=/usr/local/soft/jdk/jdk1.8.0_201
export JRE_HOME=/usr/local/soft/jdk/jdk1.8.0_201/jre
export CLASS_PATH=.:$JAVA_HOME/lib:$JRE_HOME/lib
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
[root@localhost jdk1.8.0_201]# source /etc/profile
[root@localhost jdk1.8.0_201]# java -version
前面的2.13是开发kafka的scala语言的版本,后面的3.4.0是kafka应用的版本。
Scala是一种运行于JVM虚拟机之上的语言。在运行时,只需要安装JDK就可以了,选哪个Scala版本没有区别。但是如果要调试源码,就必须选择对应的Scala版本。因为Scala语言的版本并不是向后兼容的。
另外,在选择kafka版本时,建议先去kafka的官网看下发布日志,了解一下各个版本的特性。 https://kafka.apache.org/downloads。
kafka的安装程序中自带了Zookeeper,可以在kafka的安装包的libs目录下查看到zookeeper的客户端jar包。但是,通常情况下,为了让应用更好维护,我们会使用单独部署的Zookeeper,而不使用kafka自带的Zookeeper。
下载完成后,将这两个工具包上传到三台服务器上,解压后,分别放到/usr/local/soft/kafka和/usr/local/soft/zookeeper目录下。最好是并将部署目录下的bin目录路径配置到path环境变量中。
单机服务
启动
解压kafka
[root@localhost ~]# mkdir /usr/local/soft/kafka/
[root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/
# 最好是设置一下环境变量
[root@worker1 ~]# vi ./.bash_profile
export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin
[root@worker1 ~]# source ./.bash_profile
[root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0
直接执行
# 启动kafka自带的zookeeper服务
# 从nohup.out中可以看到zookeeper默认会在2181端口启动。通过jps指令看到一个QuorumPeerMain进程,确定服务启动成功。
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动kafka服务
# 也可以加上-daemon表示后台启动 bin/kafka-server-start.sh -daemon config/server.properties
# 服务会默认在9092端口启动。
nohup bin/kafka-server-start.sh config/server.properties &
[root@localhost kafka_2.13-3.4.0]# jps
2402 Kafka
2850 Jps
2044 QuorumPeerMain
停止服务
[root@worker1 ~]# kafka-server-stop.sh
[root@worker1 ~]# zookeeper-server-stop.sh
[root@worker1 ~]# jps
21269 Jps
简单收发消息
Kafka的基础工作机制是消息发送者可以将消息发送到kafka上指定的topic,而消息消费者,可以从指定的topic上消费消息。
首先,可以使用Kafka提供的客户端脚本创建Topic
#创建Topic
[root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9092
Created topic test.
#查看Topic --describe参数
[root@localhost kafka_2.13-3.4.0]# bin/kafka-topics.sh --topic test --describe --bootstrap-server localhost:9092
Topic: test TopicId: XSQWNCb3SbGbDEleLlBvtQ PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
然后,启动一个消息发送者端。往一个名为test的Topic发送消息。
当命令行出现 > 符号后,随意输入一些字符。Ctrl+C 退出命令行。这样就完成了往kafka发消息的操作。
[root@localhost kafka_2.13-3.4.0]# bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1
>2
>3
>4
>
如果不提前创建Topic,那么在第一次往一个之前不存在的Topic发送消息时,消息也能正常发送,只是会抛出LEADER_NOT_AVAILABLE警告。
[oper@worker1 kafka_2.13-3.2.0]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test >123 12[2021-03-05 14:00:23,347] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) 3[2021-03-05 14:00:23,479] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) [2021-03-05 14:00:23,589] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {test=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient) >>123
这是因为Broker端在创建完主题后,会显示通知Clients端LEADER_NOT_AVAILABLE异常。Clients端接收到异常后,就会主动去更新元数据,获取新创建的主题信息。
然后启动一个消息消费端,从名为test的Topic上接收消息。
[root@localhost kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --bootstrap-server localhost:9092
其他消费模式
指定消费进度
上方消费者只能接收到我新发送的5和6这两条消息,之前发送的4条消息就没有接收到。
如果想要消费之前发送的消息,可以通过添加--from-beginning
参数指定。
[root@worker1 kafka_2.13-3.4.0]# bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
1
2
3
4
5
6
如果需要更精确的消费消息,甚至可以指定从哪一条消息开始消费。
这表示从第0号Partition上的第5条消息开始读起。
bin/kafka-console-consumer.sh --topic test --partition 0 --offset 4 --bootstrap-server localhost:9092
5
6
topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。
消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略
分组消费
一个partition中的消息,只会被一个消费者组中的某一个消费者消费。和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的
在kafka-console-consumer.sh
脚本中,可以通过--consumer-property group.id=testGroup
来指定所属的消费者组
# --consumer-property可以指定多个key-value 其中group.id只是其中一个
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
我们现在可以启动三个消费者,来验证一下分组消费机制:
# 同一个消费者组
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup --bootstrap-server localhost:9092
# 另一个消费者组
kafka-console-consumer.sh --topic test --consumer-property group.id=testGroup2 --bootstrap-server localhost:9092
查看消费者组的偏移量
接下来,还可以使用kafka-consumer-groups.sh
观测消费者组的情况。包括他们的消费进度。
# 添加-group 和 --describe两个参数
[root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092
Consumer group 'testGroup' has no active members. # 当前消费者组无消费者,这是因为我此时停止了所有的消费者进程
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testGroup test 0 8 8 0 - - -
- GROUP表示哪一个消费者组
- TOPIC是topic名
- PARTITION 一个topic可以存在多个partition,则标识是第0号partition
- CURRENT-OFFSET表示当前消费者组消费到了topic为test、第0号partition的消费消息offset偏移量
- LOG-END-OFFSET 表示生产者往当前partition中生产的消息总数
- LAG 表示 还未消费的数量
比如我此时使用消息生产者再发送几条消息后再查看
# 生产三条消息
[root@worker1 ~]# kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
>1
>2
>3
>
# 此时 LOG-END-OFFSET就变为了11 而LAG就变为了3
[root@worker1 ~]# kafka-consumer-groups.sh --group testGroup --describe --bootstrap-server localhost:9092
Consumer group 'testGroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testGroup test 0 8 11 3 - - -
虽然业务上是通过Topic来分发消息的,但是实际上,消息是保存在Partition这样一个数据结构上的。
理解Kakfa的消息传递机制
kafka的消息生产者和消息消费者是通过topic这样一个逻辑概念来进行业务沟通的,单实际上所有的消息是存在Partition这样一个数据结构中的
概念:
-
客户端client:消息生产者和消息消费者都是Kafka的Client
-
消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。
-
服务端Broker:一个Kafka服务就是一个Broker
-
话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题
-
分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。
集群服务
为什么要使用集群
- 创建多个partition,多个partition分布在不同的broker上,减少一个broker的压力
- 为每个partition准备多个备份,保证数据不丢失。多个partition会尽可能的分布在不同的broker上。多个partition是通过zookeeper来选举出一个Leader 处理Client的读写请求,其他备份partition就是Follower角色,只做数据备份和参与重新选举Leader
集群环境下就不会再使用kafka自带的zookeeper了,会单独部署zookeeper服务。
部署Zookeeper集群
zookeeper是使用的zab协议,选举需要半数以上的同意,所以部署的节点数需要单数。
[root@worker1 ~]# tar -zxf apache-zookeeper-3.6.2-bin.tar.gz -C /usr/local/soft/zookeeper/
[root@worker1 ~]# cd /usr/local/soft/zookeeper/apache-zookeeper-3.6.2-bin
然后进入conf目录,修改配置文件。在conf目录中,提供了一个zoo_sample.cfg文件,这是一个示例文件。我们只需要将这个文件复制一份zoo.cfg,修改下其中的关键配置就可以了。其中比较关键的修改参数如下:
[root@worker1 apache-zookeeper-3.6.2-bin]# cd conf/
[root@worker1 conf]# cp zoo_sample.cfg zoo.cfg
[root@worker1 conf]# vi zoo.cfg
#Zookeeper的本地数据目录,默认是/tmp/zookeeper。这是Linux的临时目录,随时会被删掉。
dataDir=/app/zookeeper/data
#Zookeeper的服务端口
clientPort=2181
#集群节点配置 2888集群内部数据传输 3888集群内部进行选举
server.1=192.168.75.61:2888:3888
server.2=192.168.75.62:2888:3888
server.3=192.168.75.63:2888:3888
其中,clientPort 2181是对客户端开放的服务端口。
集群配置部分, server.x这个x就是节点在集群中的myid。后面的2888端口是集群内部数据传输使用的端口。3888是集群内部进行选举使用的端口。
启动服务
# --config 指定config目录,它会自己去读取目录下的zoo.cfg文件
[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... FAILED TO START
发现启动失败了,查看vi ./logs/zookeeper-root-server-worker1.out
保存信息,提示是没有myid文件
因为我们上方的配置文件中dataDir=/app/zookeeper/data
,那么我们就在这个目录下创建一个myid文件,并指定一个集群环境下唯一id
# 在这个文件中输入一个1 其他节点就输入2 3 保证各节点不重复即可
vi /app/zookeeper/data/myid
1
再启动服务
[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh --config conf start
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
启动完成后,使用jps指令可以看到一个QuorumPeerMain进程就表示服务启动成功。
[root@worker1 apache-zookeeper-3.6.2-bin]# jps
8438 Jps
8395 QuorumPeerMain
三台机器都启动完成后,可以查看下集群状态。
这其中Mode 为leader就是主节点,follower就是从节点。
[root@worker1 apache-zookeeper-3.6.2-bin]# bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /app/zookeeper/zookeeper-3.5.8/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
部署Kafka集群
kafka服务并不需要进行选举,因此也没有奇数台服务的建议。每台服务器上都执行下面的命令
解压kafka
[root@localhost ~]# mkdir /usr/local/soft/kafka/
[root@localhost ~]# tar -zxvf kafka_2.13-3.4.0.tgz -C /usr/local/soft/kafka/
# 最好是设置一下环境变量
[root@worker1 ~]# vi ./.bash_profile
export KAFKA_HOME=/usr/local/soft/kafka/kafka_2.13-3.4.0
export PATH=$PATH:$KAFKA_HOME/bin
[root@worker1 ~]# source ./.bash_profile
[root@localhost ~]# cd /usr/local/soft/kafka/kafka_2.13-3.4.0
然后进入config目录,修改server.properties。这个配置文件里面的配置项非常多,下面列出几个要重点关注的配置。
[root@worker1 kafka_2.13-3.4.0]# mkdir -p /app/kafka/logs
[root@worker1 kafka_2.13-3.4.0]# vi config/server.properties
#broker 的全局唯一编号,不能重复,只能是数字。 所以一个集群下的kafka实例就需要使用不同的id
broker.id=1
#数据文件地址。同样默认是给的/tmp目录。
log.dirs=/app/kafka/logs
# 默认的每个Topic的分区数1,可以修改,我这里就修改为了2
num.partitions=2
#zookeeper的服务地址,因为我配置了域名映射,所以这里就使用使用的worker1 worker2 worker3
zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
#可以选择指定zookeeper上的基础节点。
#zookeeper.connect=worker1:2181,worker2:2181,worker3:2181/kafka
broker.id需要每个服务器上不一样,分发到其他服务器上时,要注意修改一下。
多个Kafka服务注册到同一个zookeeper集群上的节点,会自动组成集群。
一些核心的配置内如下所示
Property | Default | Description |
---|---|---|
broker.id | 0 | broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯每个broker都可以用一个唯一的非负整数id进行标识;这个id可以作为一的即可。 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯一的,可以是多个,路径之间只需要使用逗号分隔即可;每当创建新partition时,都会选择在包含最少partitions的路径下进行。 |
listeners | PLAINTEXT://127.0.0.1:9092 | server接受客户端连接的端口,ip配置kafka本机ip即可 |
zookeeper.connect | localhost:2181 | zookeeper连接地址。hostname:port。如果是Zookeeper集群,用逗号连接。 |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间。 |
num.partitions | 1 | 创建topic的默认分区数 |
default.replication.factor | 1 | 自动创建topic的默认副本数量 |
min.insync.replicas | 1 | 当producer设置acks为-1时,min.insync.replicas指定replicas的最小数目(必须确认每一个repica的写数据都是成功的),如果这个数目没有达到,producer发送消息会产生异常 |
delete.topic.enable | false | 是否允许删除主题 |
接下来就可以启动kafka服务了。启动服务时需要指定配置文件。
# -daemon表示后台启动
[root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/server.properties
[root@worker1 kafka_2.13-3.4.0]# jps
8819 Kafka
8884 Jps
8395 QuorumPeerMain
理解服务端的Topic、Partition和Broker
创建一个topic,指定该topic的partition数量为4,每组partition的副本集个数为2,
这里的副本是指一个Leader一个Follower,并不是指一个Leader两个Follower
# --partitions 参数表示分区数 --replication-factor 表示每个分区的副本集个数
kafka-topics.sh --create --topic disTopic --partitions 4 --replication-factor 2 --bootstrap-server worker1:9092
列出所有的topic
[root@worker1 ~]# kafka-topics.sh --list --bootstrap-server worker1:9092
disTopic
查看列表情况
# --describe查看Topic信息。
[root@worker1 ~]# kafka-topics.sh --describe --topic disTopic --bootstrap-server worker1:9092
Topic: disTopic TopicId: rDUdZBO7RH2GNPgdRXk7Tw PartitionCount: 4 ReplicationFactor: 2 Configs:
Topic: disTopic Partition: 0 Leader: 3 Replicas: 3,1 Isr: 3,1
Topic: disTopic Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: disTopic Partition: 2 Leader: 2 Replicas: 2,3 Isr: 2,3
Topic: disTopic Partition: 3 Leader: 3 Replicas: 3,2 Isr: 3,2
# 这里有4个partition 就拿partition0来举例,其中partition0这个副本集中的Leader在BrokerId为3的机器上,其中两个副本分别在brokerId1 BrokerId3上
-
Partiton参数列出了四个partition,后面带有分区编号,用来标识这些分区。
-
Leader表示这一组partition的Leader节点是哪一个,这个后面的数字是启动kafka服务时配置文件中指定的broker.id数
kafka的每一组partition都会选举一个Leader,Leader会处理Client的读写请求,但实际上Leader可能会让某个副本去响应Client的请求。
-
Replicas表示这个Partition的复制集是分配到哪些Broker上的。但是Replicas列出的只是一个逻辑上的分配情况,并不关心数据实际上是不是按照这个分配,甚至有些节点服务挂了,这一列也还会显示
-
Isr表示Partition的实际分配情况,它是Replicas列的一个子集,只列出当前存活,能正常同步数据Broker节点
我们在配置文件中指定的log.dirs=/app/kafka/logs
配置
一个Broker上的一个partition就对应着一个目录,而这个Partition上的所有消息,就保存在这个对应的目录当中。
在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生
而消费者消费消息的offset其实就是每个消息在partition上的偏移量
总结
- Topic是逻辑上的概念,producer和consumer通过Topic进行业务沟通
- Topic并不存储数据,数据是保存在Topic下的多组Partition中的,消息会尽量平均的分发在各组Partition中,每组Partition保存了Topic下的一部分消息
- 每组Partition包含一个Leader和多个Follower,每组Partition的个数成为备份因子replica factor
- producer将消息发送到Partition上,consumer通过partition上的offset记录自己所属组在当前partition上消费消息的消费进度
- producer将消息发送到Topic,kafka会推送给所有订阅了该topic的消费者组进行处理。但是每个消费者组内部只会有一个消费者实例处理这一条消息
- kafka的Broker通过zookeeper组成集群,然后在这些Broker中,需要选举产生一个担任Controller角色的Broker。这个Controller的主要任务就是负责Topic的分配以及后续管理工作。这也是通过ZooKeeper选举的。
Kraft集群
在Kafka的config目录下,提供了一个kraft的文件夹,在这里面就是Kraft协议的参考配置文件。在这个文件夹中有三个配置文件,broker.properties,controller.properties,server.properties,分别给出了Kraft中三种不同角色的示例配置。
- broker.properties: 数据节点
- controller.properties: Controller控制节点
- server.properties: 即可以是数据节点,又可以是Controller控制节点。
这里同样列出几个比较关键的配置项,按照自己的环境进行定制即可。
[root@worker1 kafka_2.13-3.4.0]# vi config/kraft/server.properties
#配置当前节点的角色。Controller相当于Zookeeper的功能,负责集群管理。Broker提供具体的消息转发服务。
process.roles=broker,controller
#配置当前节点的id。与普通集群一样,要求集群内每个节点的ID不能重复。
node.id=1
#配置集群的投票节点。其中@前面的是node.id的值,后面是节点的地址和端口,这个端口跟客户端访问的端口是不一样的。通常将集群内的所有Controllor节点都配置进去。
controller.quorum.voters=1@192.168.75.61:9093,2@192.168.75.62:9093,3@192.168.75.63:9093
#Broker对客户端暴露的服务地址。基于PLAINTEXT协议。一般写本机ip
advertised.listeners=PLAINTEXT://worker1:9092
#Controller服务协议的别名。默认就是CONTROLLER
controller.listener.names=CONTROLLER
#配置监听服务。不同的服务可以绑定不同的接口。这种配置方式在端口前面是省略了一个主机IP的,主机IP默认是使用的java.net.InetAddress.getCanonicalHostName()
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
#数据文件地址。默认配置在/tmp目录下。
log.dirs=/app/kafka/kraft-log
#topic默认的partition分区数。
num.partitions=2
将配置文件分发,并修改每个服务器上的node.id
、controller.quorum.voters
、log.dirs
、num.partitions
属性。
由于Kafka的Kraft集群对数据格式有另外的要求,所以在启动Kraft集群前,还需要对日志目录进行格式化。
# 先使用kafka提供的脚本生成一个uuid
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh random-uuid
NT1Y5KgOTQ63SPppgsfhqA
# -t 集群ID,三个服务器上使用同一个集群ID。 加上上方的uuid -c 指定配置文件
[oper@worker1 kafka_2.13-3.4.0]$ bin/kafka-storage.sh format -t NT1Y5KgOTQ63SPppgsfhqA -c config/kraft/server.properties
接下来就可以指定配置文件,启动Kafka的服务了。 例如,在Worker1上,启动Broker和Controller服务。
[root@worker1 kafka_2.13-3.4.0]# bin/kafka-server-start.sh -daemon config/kraft/server.properties
[root@worker1 kafka_2.13-3.4.0]# jps
12960 Kafka
13028 Jps
等三个服务都启动完成后,就可以像普通集群一样去创建Topic,并维护Topic的信息了。
相关概念
topic是逻辑上的概念,一个topic可以有多个partition,消息真正是保存在partition中的,可以理解为partition就是真实的queue。
消息生成者就只管指定某个topic,往topic发送消息即可,kafka内部会把消息分发到该topic内的partition中,如果存在多个partition就会有相应的负载均衡策略
一个partition中的消息,只会被一个消费者组中的某一个消费者消费,和RocketMQ一样。不同的消费者组可以对消息有不同的处理逻辑,而同一个消费者组的目的是减少消息消费压力,做水平扩容的
消息副本,每个消费者组都有一个消息副本,而一个消费者组下的多个消费者共用一个消息副本,也就是一条消息只能被一个消费者组下的一个消费者消费。
概念:
-
客户端client:消息生产者和消息消费者都是Kafka的Client
-
消费者组:每个消费者可以指定一个所属的消费者组,每一条消息会被多个感兴趣的消费者组消费,但一条消息只会被一个消费者组中的一个消费者消费。
-
服务端Broker:一个Kafka服务就是一个Broker
-
话题Topic:逻辑概念,一个Topic被认为是业务含义相同的一组消息。客户端通过绑定Topic来生产或消费自己感兴趣的话题
-
分区Partition:Topic是逻辑概念,而Partition是实际存储消息的组件。每一个Partition就是一个queue。
在Kafka中,Topic是逻辑上的概念,数据实际上是保存在Topic下的Partition中的,Partition就是数据存储的物理单元,而Broker是Partition的物理载体,这些Partition会尽量均匀的分配在不同的Broker上,避免一个Broker宕机整个Partition副本集都不能用的情况发生
而消费者消费消息的offset其实就是每个消息在partition上的偏移量
更多推荐
所有评论(0)