感觉还是看着图(来源于某机构)来讲运行机制以及各个组件比较直观
这里写图片描述
图中从左到右能比较明显的看到三部分:对应生产者,目的地,消费者,生成者生成消息发送到目的地,消费者则从目的地主动拉取消息。
Producer:生产者,只负责数据生产,生产者的代码可以集成到任务系统中。
数据的分发策略由producer决定,默认是defaultPartition Utils.abs(key.hashCode) % numPartitions
Broker:当前服务器上的Kafka进程,俗称拉皮条。只管数据存储,不管是谁生产,不管是谁消费,一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
在集群中每个broker都有一个唯一brokerid,不得重复。
Topic:目标发送的目的地,这是一个逻辑上的概念,落到磁盘上是一个partition的目录。partition的目录中有多个segment组合(index,log)
一个Topic对应多个partition[0,1,2,3],一个partition对应多个segment组合。一个segment有默认的大小是1G。
每个partition可以设置多个副本(replication-factor 1),会从所有的副本中选取一个leader出来。所有读写操作都是通过leader来进行的。
特别强调,和mysql中主从有区别,mysql做主从是为了读写分离,在kafka中读写操作都是leader。
ConsumerGroup:数据消费者组,ConsumerGroup可以有多个,每个ConsumerGroup消费的数据都是一样的。
可以把多个consumer线程划分为一个组,组里面所有成员共同消费一个topic的数据,组员之间不能重复消费。
Offset:kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka
数据发送到目的地(topic)上是发送到哪几个partition上的呢?然后副本数量是多少呢?
这个是由自己创建topic的时候自己指定的,比如

[root@mini1 bin]# ./kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic orderMq
Created topic "orderMq".

创建了名为orderMq的topic,指定了发送到3个partition上,每个partition有2个副本。如图所示,使用第一个生产者发送关于订单的信息到3台机器(对应3个kafka,也就是3个broker)对应的4个partition上。接着消费者1去消费订单信息,实际是一个消费者组,里面应该有4个消费者(consumer)去分别消费orderMq这个topic对应的4个partition上的消息(具体消息的分发下面介绍)。

比如我这里机器是三台机器,mini1,mini2,mini3。两个生产者向orderMq发送消息,一个消费者从orderMq拉取消息

[root@mini1 bin]# kafka-console-producer.sh --broker-list mini1:9092 --topic orderMq
[2017-11-21 22:36:56,683] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hello tom
hi jerry
spring ^H
hhaah
xixi
nini
ni
sss
ffffff
fffwewww
rrghhhhh
ddd
[root@mini2 ~]#  kafka-console-producer.sh --broker-list mini1:9092 --topic orderMq
[2017-11-15 01:41:36,777] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hahahh
dddd
[root@mini1 bin]# kafka-console-consumer.sh --zookeeper mini1:2181 -from-beginning --topic orderMq
hello tom
hi jerry
spring 
hhaah
xixi
nini
ni
sss
ffffff
fffwewww
rrghhhhh
hahahh
dddd
ddd

查看partition(三台机器一一查看)

[root@mini3 kafka-logs]# ll
总用量 32
-rw-r--r--. 1 root root    4 11月 21 22:34 cleaner-offset-checkpoint
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-2
drwxr-xr-x. 2 root root 4096 11月 21 23:13 payment-0
drwxr-xr-x. 2 root root 4096 11月 21 23:13 payment-1
drwxr-xr-x. 2 root root 4096 11月 21 23:13 payment-3
-rw-r--r--. 1 root root   64 11月 22 05:23 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root   64 11月 22 05:23 replication-offset-checkpoint
[root@mini2 kafka-logs]# ll
总用量 28
-rw-r--r--. 1 root root    4 11月 14 18:53 cleaner-offset-checkpoint
drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-1
drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-2
drwxr-xr-x. 2 root root 4096 11月 14 19:32 payment-0
drwxr-xr-x. 2 root root 4096 11月 14 19:32 payment-2
-rw-r--r--. 1 root root   52 11月 15 01:47 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root   52 11月 15 01:47 replication-offset-checkpoint
[root@mini1 kafka]# cd kafka-logs/
[root@mini1 kafka-logs]# ll
总用量 32
-rw-r--r--. 1 root root    4 11月 21 22:34 cleaner-offset-checkpoint
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-1
drwxr-xr-x. 2 root root 4096 11月 21 23:13 payment-1
drwxr-xr-x. 2 root root 4096 11月 21 23:13 payment-2
drwxr-xr-x. 2 root root 4096 11月 21 23:13 payment-3
-rw-r--r--. 1 root root   64 11月 22 03:38 recovery-point-offset-checkpoint
-rw-r--r--. 1 root root   64 11月 22 03:39 replication-offset-checkpoint

orderMq对应的partition分别为orderMq-0,orderMq-1,orderMq-2。并且有2个副本(算自身在内)

[root@mini3 kafka-logs]# cd orderMq-0
[root@mini3 orderMq-0]# ll
总用量 4
-rw-r--r--. 1 root root 10485760 1121 22:31 00000000000000000000.index
-rw-r--r--. 1 root root      219 1122 05:22 00000000000000000000.log

其中.log文件保存的是生产者发送的消息
图中的蓝色框里面是问题。以下逐一回答(图中也已给出)

1、消息的分发
Producer客户端负责消息的分发
kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含”集群中存活的servers列表”/”partitions leader列表”等信息;
当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;
消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”,事实上,消息被路由到哪个partition上由producer客户端决定;
比如可以采用”random”“key-hash”“轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的。
在producer端的配置文件中,开发者可以指定partition路由的方式。

2、分组策略
通过指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希(defaultPartition Utils.abs(key.hashCode) % numPartitions)到对应分区配置文件中使用#partitioner.class=kafka.producer.DefaultPartitioner上文中的key是producer在发送数据时传入的,produer.send(KeyedMessage(topic,myPartitionKey,messageContent))

3、如何保证消息的完全生产
kafka如何保证数据的完全生产
ack机制:broker表示发来的数据已确认接收无误,表示数据已经保存到磁盘。
0:不等待broker返回确认消息
1:等待topic中某个partition leader保存成功的状态反馈
-1:等待topic中某个partition 所有副本都保存成功的状态反馈
可以在生产者配置文件中进行配置request.required.acks=0

4、每个partition的数据如何保存到磁盘上(broker如何保存数据)
在理论环境下,broker按照顺序读写的机制,可以每秒保存600M的数据。主要通过pagecache机制,尽可能的利用当前物理机器上的空闲内存来做缓存。
当前topic所属的broker,必定有一个该topic的partition,partition是一个磁盘目录。partition的目录中有多个segment组合(index,log)默认是1G。

5、partition数量和broker数量关系
一台物理机对应一个broker,一个broker上可以有多个partition,但一个partition只属于一个broker。换成是topic数量和broker数量关系也是一样。那么partition如何分布在broker上呢?
比如创建orderMq这个topic的时候,指定5个partition,1个副本。
那么保存的时候三台机器上的分布如下
mini1
orderMq-1
orderMq-4
mini2
orderMq-2
mini3
orderMq-0
orderMq-3
遵循以下规则

int i = 0
list{kafka01,kafka02,kafka03}

for(int i=0;i<5;i++){
    brIndex = i%broker;
    hostName = list.get(brIndex)
}

至于kafka的独特之处,负载均衡与保证消费顺序在后面讲解。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐