四, Kafka消费者

5.1 消费方式(Pull)

kafka 的consumer 采用pull(拉)模式从broker中拉取数据.

  • push(推) 很难适应消费速率不同的消费者, 因为在这种方式中, 消费发送速率是由broker决定的, 他的目标是尽可能以最快速度传递消息, 但是这样很容造成consumer来不及处理消息, 典型的表现是拒绝服务以及网络阻塞, 而pull模式则可以根据consumer 的消费能力以适当的速率消费信息.
  • pull(拉) 不足之处是, 如果kafka没有数据, 消费者可能会陷入循环中, 一直返回空数据. 针对这一点, Kafka 的消费者在消费数据时会传入一个时长参数timeout, 如果当前没有数据可供消费, consumer 会等待一段时间之后再返回, 这段时间即为timeout.

5.2 消费者的工作流程(待补充)

在这里插入图片描述

5.3 消费者API

5.3.1 消费者组原理

消费者组(ConsumerGroup, CG), 由多个Consumer组成; 形成一个消费者组的条件是, 所有消费者的groupid相同;
消费者组的特点:

  1. 消费者组内每个消费者负责消费不同分区的数据, 一个分区只能由一个组内消费者消费;
  2. 消费者组之间互不影响. 所有的消费者属于某个消费者组, 即消费者组是逻辑上的一个订阅者;

在这里插入图片描述
在这里插入图片描述

5.3.2 消费者组初始化流程(待补充)

在这里插入图片描述

Q: 消费者的再平衡问题

  1. 再平衡的两个条件
    答: 心跳超时(45s以上未收到心跳), 该消费者被移除, 触发再平衡; 或者是消费者处理的时间过长(5分钟以上), 也会触发再平衡;
5.3.3 消费者详细消费流程(待补充)

在这里插入图片描述

5.3.4 消费者重要参数

在这里插入图片描述

在这里插入图片描述

2.4.2 消费者分区分配策略(重要)

一个consumer group中有多个consumer, 一个topic有多个partition, 所以必然会涉及到partition的分配问题, 及确定哪个partition由哪个consumer来消费.

Kafka进行消费者分区分配器的契机:
在这里插入图片描述


  • 对于给consumer分配分区, kafka有三种策略, RoundRobinAssignor(轮询), RangeAssignorStickyAssignor

Kafka提供了消费者客户端参数partition.assignment.strategy来设置消费与订阅主题之间的分区分配策略.
默认情况下, 此参数的值为org.apache.kafka.clients.consumer.RangeAssignor, 即采用RangeAssignor分配策略.
除此之外, Kafka还提供了另外两种分配策略, RoundRobinAssignor 和 StickyAssignor.
消费者客户端参数partition.assignment.strategy可以配置多个分配策略, 彼此之间以逗号分隔.

1. RangeAssignor 配置策略====>(将主题分区按照跨度平均分配给订阅了这些主题的消费者, 跨度=消费者总数/分区总数)
  • RangeAssignor分配策略的原理是按照消费者总数/分区总数来获得一个跨度, 然后将分区按照跨度进行平均分配, 以保证分区尽可能均匀的分配给所有的消费者.

[具体的分配策略]

  1. 对于每一个主题, RangeAssignor策略会将消费组内所有订阅这个主题的消费者按照名称字典顺序排列, 然后为每个消费者划分固定的分区范围, 如果不够平均分配, 那么字典顺序靠前的消费者会被多分配一个分区.
    • 假如, n=分区数/消费者数量, m=分区数%消费者, 那么前m个消费者每个分配n+1个分区, 后面的(消费者数量-m)个消费者每个分配还是n个分区.
    • (m是均分分区后剩下的余数, 即 分区数==消费者*n + m 个, 前m个消费者分配了n+1个, 剩下的(消费者-m)个只能分配n个)

[举个栗子]

  • eg1. 假设消费者组有2个消费者c0, c1, 它俩订阅了2个主题t0,t1, 这俩topic每个主题都有四个分区, 分别是t0p1, t0p2, t0p3, t0p4 和 t1p1, t1p2, t1p3, t1p4.

    • 我们按照上面的说法进行分配, 一番计算, 2消费者按字典排序(t0, t1), 4个分区, 每个消费者倆分区, 我们按照字典顺序循环, 一次给一个topic分配2个分区.
      • 分配结果为:
      • 消费者c0: t0p1, top2, t1p1, t1p2
      • 消费者c1: t0p3, top4, t1p3, t1p4
    • 对于上面这个分配结果, 我们先上面分配俩分区, 再下面分配俩, 然后上面, 然后下面, 正好平均分配.
  • eg2. 要是不均匀分配呢? 假设2个消费者c0, c1, 订阅了2个主题t0, t1, 每个主题都只有3个分区, 分别是为: t0p0, t0p1, t0p2, 和 t1p0, t1p1, t1p2.

    • 3/2=1余1, 所以n=1, m=1. 两个消费者中有一个会比另外一个多分配
      • 分配结果为
      • 消费者c0: t0p0,t0p1, t1p1, t1p1
      • 消费者c1: t0p2,t1p2

上面例子没看懂, 看看下面这张图:
在这里插入图片描述

可以看到, 使用RangeAssignor分配策略的话, 会出分配不均匀的现象, 如果将类似的情形扩大, 则有可能出现部分消费者过载的情况.

2. RoundRobinAssignor 配置策略====>(所有主题的所有分区,所有消费者分别字典排序, 轮询方式逐个分配)
  • Kafka默认策略.
  • RoundRobin分配策略的原理是将消费者组内所有消费者和消费者订阅的所有主题的所有分区分别按照字典顺序排序, 然后通过轮询方式逐个将分区一次分配给每个消费者.
  • 轮询策略对应的partition.assignment.strategy参数值为org.apache.kafka.clients.consumer.RoundRobinAssignor

[举个栗子]

在这里插入图片描述

在这里插入图片描述

3. StickeyAssignor 配置策略====>(初次分配时, 按照订阅关系轮询分配, 订阅关系发生变化时, 只把发生变化的部分按照现有的订阅关系继续轮询分配)

在这里插入图片描述
请添加图片描述
请添加图片描述
请添加图片描述

4. 自定义配置策略====>()

5.4 Offset的维护

  • 由于consumer在消费过程中可能会出现断电宕机等故障, cosumer恢复后, 需要从故障前的位置继续消费, 所以consumer需要实时记录消费到了那个offset, 以便故障恢复后继续消费.

请添加图片描述

0.9版本kafka之前 Offset的存储(位于zk中), 即上面灰色框图的内容, 在0.9版本之后, 由于offset存储在kafka集群的topic中, 所以灰色框图也就不复存在啦.


2.4.3.1 消费者组消费过程中Offset的记录
  • 在zk中. 消费者组节点的记录:
    • 请添加图片描述

消费者组消费一个topic 的过程(拿bigdata02消费bigdata01往topic offsetTest生产的消息为例)

  • offsetTest主题有两个分区, 三个副本
  • 请添加图片描述
  1. 准备: 创建新的topic, 并打开生产者示例, 准备生产消息, 并打开消费者消费相应的主题,

请添加图片描述

请添加图片描述

  1. 打开zookeeper客户端. 查看'/consumers/topic/newBD/console-consumer-对应消费者组id/newBD/offsets', 可以看到, newBD有两个分区0号,1号, 因为我们还没消费, 所以查看0号分区的值为0.

请添加图片描述

  1. 第一次生产和消费:
  • 可以看到, 消费者组消费的newBD的0号分区变为了1
    请添加图片描述
  1. 第二次生产和消费
  • 可以看到. 消费者组消费的newBD的1号分区变为了1
    请添加图片描述
  1. 第三次生产和消费
  • 可以看到. 消费者组消费的newBD的0号分区变为了2
    请添加图片描述

…如此循环往复. offset不断递增.

why so?
现在只有一个主题bigdata, 两个分区, 0和1号,

  1. 生产的时候没有指定分区号和key,就是轮询着往里生产数据(这里用到生产者的消息分配策略)
  2. 然后消费的时候, 因为是1个主题2个分区, 1个消费者 ,所以消费数据也是轮询的(其实是RangeAssignor退化为了RoundRobinAssignor), 先去0号分区消费一条数据, 此时zk的消费者组–bigdata主题–0号分区的 offset为1, 然后又消费了一条数据, 注意此时消费的应该是这一主题下1号分区下的一条数据(轮询嘛, 雨露均沾), 所以zk的 消费者组–bigdata主题–1号分区-- offset为1, 依次往复下去. (这里用到消费者的消息分配策略)
  1. Kafka集群采用了分布式存储和处理(集群)
  • kafka集群中把topic的各个分区分布存储在多台主机上, 打破了单机的IO和处理能力的限制, 提高了读写并行度; 此外, kafka进一步提高定位和读写效率, kafka还会对每个分区存储的文件进行分片处理, 将每个partiton对应的log数据文件分为若干个分片, 这些分片由同名的两个文件(.log和.index)文件构成,(以当前分片存储的第一条消息的offset命名), .log文件的大小固定(由config/server.properties/log.segment.bytes 参数决定), 用来存储生产者生产的消息, .index文件是索引文件, 用来记录offset和当前offset下每条消息开始的位置.
  1. 顺序读写(单机)
  • 在往各个分区的.log文件中存储数据时, kafka规定生产者顺序添加数据, 即将数据不断的追加到.log文件的末尾, 这种添加数据的方式符合磁盘读取数据的最佳方式(按照磁道顺序读写数据, 而不是跨磁道随机读写, 省去大量磁头寻址时间).
  1. 零拷贝技术(单机)

2.5 Zookeeper 在kafka集群中的作用?

参见本文

  1. 零拷贝技术(单机)

2.5 Zookeeper 在kafka集群中的作用?

参见本文

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐