kafka最全组件详解
(1)producer:消息生产者,发布消息到 kafka 集群的终端或服务。(2)broker:kafka 集群中包含的服务器。(kafka实例)(3)topic:每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。(4)partition:a)partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单...
(1)producer:消息生产者,发布消息到 kafka 集群的终端或服务。
(2)broker:kafka 集群中包含的服务器。(kafka实例)
(3)topic:每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
(4)partition:
a)partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。( 消费者和生产者能操作的最小单元是分区,也就是不可能只消费一条数据)
b)kafka分区内有序,整体不一定有序。(业务需要整体有序时,可以用相同的key,这样会hash到同一分区中,此业务的数据都在一个分区中了,那么整体也就有序了)
c)如果consumer比partition多,是浪费,因为kafka的设计是在一个partition上是不允许并发的,consumer数不要大于partition数
(5)consumer:从 kafka 集群中消费消息的终端或服务。除了生产者分区是分布式,消费者也是分布式,这样减小IO压力。
(6)Consumer Group (CG):
a)这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
b)消费者组是逻辑概念,只是一个标记而已,具体的修改在config的server.properties中的设置int类型的broker_id;
c)同一个消费者组里面不能是同时消费者消费消息,只能有一个消费者去消费,第二,同一个消费者组里面是不会重复消费消息的,第三,同一个消费者组的一个消费者不是以一条一条数据为单元的,是以分区为单元,就相当于消费者和分区建立某种socket,进行传输数据,所以,一旦建立这个关系,这个分区的内容只能是由这个消费者消费。
d)使用场景:( 同一组和不同组的使用情景) 生产环境一般要求消费者消费的数据一样且多个,比如一个写到hdfs,一个放到spark计算,这样就得要求不同的消费者在不同的消费者组里。
(7)replica:partition 的副本,保障 partition 的高可用。
a)副本是正本的拷贝。在kafka中,正本和副本都称之为副本(Repalica),但存在leader和follower之分。活跃的称之为leader,其他的是follower。每个分区的数据都会有多份副本,以此来保证Kafka的高可用。 topic下会划分多个partition,每个partition都有自己的replica,其中只有一个是leader replica,其余的是follower replica。
b)消息进来的时候会先存入leader replica,然后从leader replica复制到follower replica。只有复制全部完成时,consumer才可以消费此条消息。这是为了确保意外发生时,数据可以恢复。consumer的消费也是从leader replica读取的。 由此可见,leader replica做了大量的工作。所以如果不同partition的leader replica在kafka集群的broker上分布不均匀,就会造成负载不均衡。
c) kafka通过轮询算法保证leader replica是均匀分布在多个broker上。
可以看到每个partition的leader replica均匀的分布在三个broker上,follower replica也是均匀分布的。
关于Replica,有如下知识点:
1、Replica均匀分配在Broker上,同一个partition的replica不会在同一个borker上
2、同一个partition的Replica数量不能多于broker数量。多个replica为了数据安全,一台server存多个replica没有意义。server挂掉,上面的副本都要挂掉。
3、分区的leader replica均衡分布在broker上。此时集群的负载是均衡的。这就叫做分区平衡。
d)分区平衡:
1、AR: assigned replicas,已分配的副本。每个partition都有自己的AR列表,里面存储着这个partition最初分配的所有replica。注意AR列表不会变化,除非增加分区。
2、PR(优先replica):AR列表中的第一个replica就是优先replica,而且永远是优先replica。最初,优先replica和leader replica是同一个replica。
3、ISR:in sync replicas,同步副本。每个partition都有自己的ISR列表。ISR是会根据同步情况动态变化的。
最初ISR列表和AR列表是一致的,但由于某个节点死掉,或者某个节点的follower replica落后leader replica太多,那么该节点就会被从ISR列 表中移除。此时,ISR和AR就不再一致
一个拥有3个replica的partition,最初是下图的样子。 AR和ISR保持一致,并且初始时刻,优先副本和leader副本都指向replica 0.
接下来,replica 0所在的机器下线了,那么情况会变成如下图所示:
可以看到replica 0已经从ISR中移除掉了。同时,由于重新选举,leader副本变成了replica 1,而优先副本还是replica 0。优先副本是不会改变的。
由于最初时,leader副本在broker均匀分布,分区是平衡的。但此时,由于此partition的leader副本换成了另外一个,所以此时分区平衡已经被破坏。
replica 0所在的机器修复了,又重新上线,情况如下图:
可以看到replica 0重新回到ISR列表中,不过此时他没能恢复leader的身份。只能作为follower当一名小弟.
此时分区依旧是不平衡的。但是不会永远不平衡。
kafka会定时触发分区平衡操作,也可以主动触发分区平衡。这就是所谓的分区平衡操作,操作完后如下图。
可以看到此时leader副本通过选举,会重新变回来replica 0,因为replica 0是优先副本,其实优先的含义就是选择leader时被优先选择。这样整个分区又回到了初始状态,而初始时,leader副本是均匀分布的。此时已经分区平衡了。
由此可见,分区平衡操作就是使leader副本和优先副本保持一致的操作。可以把优先副本理解为分区的平衡状态位,平衡操作就是让leader副本归位。
e)副本管理器
副本机制使得kafka整个集群中,只要有一个代理存活,就可以保证集群正常运行。这大大提高了Kafka的可靠性和稳定性。
Kafka中代理的存活,需要满足以下两个条件:
存活的节点要维持和zookeeper的session连接,通过zookeeper的心跳机制实现
Follower副本要与leader副本保持同步,不能落后太多。
满足以上条件的节点在ISR中,一旦宕机,或者中断时间太长,Leader就会把同步副本从ISR中踢出。
所有节点中,leader节点负责接收客户端的读写操作,follower节点从leader复制数据。
副本管理器负责对副本管理。由于副本是分区的副本,所以对副本的管理体现在对分区的管理。
LEO和HW
LEO是Log End Offset缩写。表示每个分区副本的最后一条消息的位置,也就是说每个副本都有LEO。
HW是Hight Watermark缩写,他是一个分区所有副本中,最小的那个LEO。
分区test-0有三个副本,每个副本的LEO就是自己最后一条消息的offset。可以看到最小的LEO是Replica2的,等于3,也就是说HW=3。这代表offset=4的消息还没有被所有副本复制,是无法被消费的。而offset<=3的数据已经被所有副本复制,是可以被消费的。
(8)leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
(9)follower:replica 中的一个角色,从 leader 中复制数据。
(10)controller:kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。(选举和故障转移)
假设此集群有三个broker,同时启动。 控制器也叫leader broker。
(一)3个broker从zookeeper获取/controller临时节点信息。/controller存储的是选举出来的leader信息。此举是为了确认是否已经存在leader。
(二)如果还没有选举出leader,那么此节点是不存在的,返回-1。如果返回的不是-1,而是leader的json数据,那么说明已经有leader存在,选举结束。
(三)三个broker发现返回-1,了解到目前没有leader,于是均会触发向临时节点/controller写入自己的信息。最先写入的就会成为leader。
(四)假设broker 0的速度最快,他先写入了/controller节点,那么他就成为了leader。而broker1、broker2很不幸,因为晚了一步,他们在写/controller的过程中会抛出ZkNodeExistsException,也就是zk告诉他们,此节点已经存在了。
经过以上四步,broker 0成功写入/controller节点,其它broker写入失败了,所以broker 0成功当选leader。
此外zk中还有controller_epoch节点,存储了leader的变更次数,初始值为0,以后leader每变一次,该值+1。所有向控制器发起的请求,都会携带此值。如果控制器和自己内存中比较,请求值小,说明kafka集群已经发生了新的选举,此请求过期,此请求无效。如果请求值大于控制器内存的值,说明已经有新的控制器当选了,自己已经退位,请求无效。kafka通过controller_epoch保证集群控制器的唯一性及操作的一致性。
(11)zookeeper:
a)kafka 通过 zookeeper 来存储集群的 meta 信息。
b)Kafka使用zookeeper作为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一起。
c)保存kafka的集群状态信息的,包括每个broker,为什么?,因为zk和broker建立监听,一旦有一个broker宕机了,另一个备份就可以变为领导,
第二,zk保存消费者的消费信息,为什么要保存?就是为了消费者下一次再次消费可以得知offset这个偏移量,consumer信息高版本在本地维护(0.9版本以上).
d) 同时借助zookeeper,kafka能够生产者、消费者和broker在内的所以组件在无状态的情况下,建立起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。
(12)offset: 给分区中的消息提供了一个顺序ID号,称之为偏移量。因此,为了唯一地识别分区中的每条消息,我们使用这些偏移量。
更多推荐
所有评论(0)