目录

1. 消费者与消费组

2. 一个完整的消费逻辑

3. 参数配置

3.1 四个必配参数

3.2 反序列化器(必配)

3.3 拦截器(非必配)

4. 创建消费者实例

5. 订阅主题与分区

5.1 订阅

5.2 取消订阅

6. 拉取消息并消费

7. 提交消费位移

8. 关闭资源


1. 消费者与消费组

消费者:并非逻辑概念,它是实际的应用实例,它可以是一个线程,也可以是一个进程。同一个消费组内的消费者既可以部署在同一台机器上,也可以部署在不同机器上。

消费组:逻辑概念,它将旗下的消费者归为一类,每个消费者只隶属于一个消费组。每个消费组都有一个固定的名称,消费者在进行消费前需要指定其所属消费组的名称。这个可以通过消费者客户端参数group.id来配置,默认值为空字符串。

对于消息中间件而言,一般有两种消息投递模式:点对点模式和发布/订阅模式。

点对点模式(P2P,Point-to-Point):基于队列,生产者发送消息到队列,消费者从队列中接收消息。

发布/订阅模式(Pub/Sub):消息发布者将消息发布到一个内容节点(即主题Topic),而消息订阅者则从主题中订阅消息。主题可以认为是消息传递的中介,它使得消息的订阅者和发布者互相保持独立,不需要进行接触即可保证消息的传递。(这让我想到了疫情期间咱们订外卖订蔬菜的无接触配送机制)  发布/订阅模式在消息的一对多广播时采用。

而Kafka同时支持两种消息投递模式,正是得益于消费者与消费组模型的契合:

  • 如果所有的消费者都隶属于同一个消费组,那么所有的消息都会被均衡地投递给每个消费者,即每条消息只会被一个消费者处理,这就相当于点对点模式的应用。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者,即每条消息会被所有的消费者处理,这就相当于发布/订阅模式的应用。

2. 一个完整的消费逻辑

对于Kafka消费者客户端开发而言,一个正常的消费逻辑应该包含以下几个步骤:

(1)配置消费者客户端参数及创建相应的消费者实例;

(2)订阅主题;

(3)拉取消息并消费;

(4)提交消费位移;

(5)关闭消费者实例。

示例代码如下:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class Consumer {
    private static final String brokerList = "localhost:9092";
    private static final String topic = "topic-demo";
    private static final String groupId = "group.demo";

    private static Properties initConfig() {
        Properties properties = new Properties();
        properties.put("key.deserializer", StringDeserializer.class.getName());
        properties.put("value.deserializer", StringDeserializer.class.getName());
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) {
        //1.初始化参数配置
        Properties properties = initConfig();
        //2.创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        //3.订阅主题
        consumer.subscribe(Collections.singletonList(topic));
        while(true) {
            // 4.拉取消息并消费
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

GAV坐标如下:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.0</version>
        </dependency>

3. 参数配置

3.1 四个必配参数

  • bootstrap.servers :该参数同时也是生产者客户端的必配参数。它是用来指定连接Kafka集群所需的broker地址清单。具体的内容格式为host1:port1,host2:port2,可以设置一个或多个地址,中间以逗号隔开,此参数的默认值为""。注意这里并非需要所有的broker地址,消费者会从给定的broker地址里查找到其他broker的信息。不过建议至少设置两个以上的broker地址信息,当其中任意一个宕机时,消费者仍然可以连接到Kafka集群上。
  • group.id :消费者隶属的消费组的名称,默认值为 ""。如果设置为空,则会报异常:Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupldException: The configured groupld is invalid。一般而言,这个参数需要设置成具有一定的业务意义的名称。
  • key.deserializervalue.deserializer :与生产者客户端KafkaProducer 中的key.serializer和value.serializer参数对应。消费者从broker端获取的消息格式都是字节数组(byte[])类型,所以需要执行相应的反序列化操作才能还原成原有的对象格式。这两个参数分别用来指定消息中key 和 value所需反序列化操作的反序列化器,这两个参数无默认值。注意这里必须填写反序列化器类的全限定名,比如示例中的org .apache.kafka.common.serialization.StringDeserializer, 单单指定StringDeserializer是错误的。

3.2 反序列化器(必配)

        Kafka所提供的反序列化器有 ByteBufferDeserializer、ByteArrayDeserializer、 BytesDeserializer、 DoubleDeserializer、 FloatDeserializer、 IntegerDeserializer、 LongDeserializer、 ShortDeserializer、StringDeserializer, 它们分别用于ByteBuffer、ByteArray、Bytes、Double、Float、 Integer、Long、 Short 及 String类型的反序列化,这些反序列化器也都实现了Deserializer接口。

        如无特殊需要,不建议使用自定义的序列化器或反序列化器。因为这样会增加生产者与消费者之间的耦合度,在系统升级换代的时候很容易出错。 自定义的类型有一个不得不面对的问题就是KafkaProducer和KafkaConsumer之间的序列化和反序列化的兼容性。

3.3 拦截器(非必配)

        与生产者拦截器相对应,消费者也有自己的拦截器。消费者拦截器主要在消费到消息或在提交消费位移时进行一些定制化的操作。消费者拦截器需要自定义实现org.apache.kafka.clients.consumer.Consumerlnterceptor接口。

        ConsumerInterceptor接口包含3个方法:

        KafkaConsumer会在poll()方法返回之前调用拦截器的onConsume()方法来对消息进行相应的定制化操作,比如修改返回的消息内容、按照某种规则过滤消息(可能会减少poll()方法返回的消息的个数)。 如果onConsume()方法中抛出异常,那么会被捕获并记录到日志中,但是异常不会再向上传递。

        KafkaConsumer会在提交完消费位移之后调用拦截器的onCommit()方法,可以使用这个方法来记录跟踪所提交的位移信息,比如当消费者使用commitSync的无参方法时,我们不知道提交的消费位移的具体细节,而使用拦截器的onCommit()方法却可以做到这一点。 

4. 创建消费者实例

        Kafka提供了4个构造方法,如下:

5. 订阅主题与分区

5.1 订阅

一个消费者可以订阅一个或多个主题。

(1)subscribe()方法

        Kafka提供了四个subscribe()方法的重载来订阅主题,如下:

  • subscribe(Collection<String>) : 以集合的形式,让消费者可以订阅多个主题;
  • subscribe(Collection<String>, ConsumerRebalanceListener) : 暂且略过
  • subscribe(Pattern) : 以正则表达式的形式订阅特定模式的主题;
  • subscribe(Pattern, ConsumerRebalanceListener) : 暂且略过

(2)  assign()方法

        消费者不仅可以通过subscribe()方法订阅主题,还可以通过assign()方法直接订阅某些主题的特定分区。

5.2 取消订阅

        KafkaConsumer 中的 unsubscribe() 方法来取消主题的订阅。这个方法既可以取消通过subscribe方法的订阅,也可以取消assign方法的订阅。

        如果将subscribe(Collection)或 assign(Collection)中的集合参数设置为空集合,其作用等同于unsubscribe()方法。

6. 拉取消息并消费

        消息的消费一般有两种模式:推模式和 拉模式。推模式是服务端主动将消息推送给消费者,拉模式是消费者主动向服务端发起请求来拉取消息。

        Kafka中的消费是基于拉模式的。消费者所要做的就是重复地调用poll()方法,poll()方法返回的是所订阅的主题(分区)上的一组消息。

  • poll(Duration)

该方法的具体定义为:public ConsumerRecords<K,V> poll(final Duration timeout)

其中timeout参数是用来控制poll()方法的阻塞时间,在消费者的缓冲区里没有可用数据时会发生阻塞。该参数可以通过Duration中的 ofMillis()、ofSeconds()、ofMinutes()、ofHours()等多种不同的方法指定不同的时间单位。timeout的设置取决于程序对响应速度的要求,比如需要多长时间内将控制权交给执行轮询的应用线程。可直接将timeout设置为0,这样poll()方法会立刻返回,而不管是否已经拉取到了消息。如果应用线程唯一的工作就是消费Kafka中的消息,则可以将这个参数设置为最大值Long.MAX_VALUE。

  • poll(long) :已废弃,不予讨论
  • poll(Timer, boolean) :暂且略过 

7. 提交消费位移

(1)消费位移概念梳理

        对于 Kafka 中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中对应的位置。对于消费者而言,它也有一个 offset 的概念,消费者使用 offset 来表示消费到分区中某个消息所在的位置。

        对于消息在分区中的位置,offset 称为 “偏移量” ;对于消费者消费到的位置,将 offset 称为“位移” ,有时候也会更明确地称之为 “消费位移”。

(2)消费位移持久化

        在每次调用 poll()方法时,它返回的是还没有被消费过的消息集。要做到这一点,就需要记录上一次消费时的消费位移,并且这个消费位移必须做持久化保存,而不是单单保存在内存中。

        消费位移必须持久化的原因:

  • 如果不持久化,只保存在内存中,那当消费者重启后就无法知晓之前的消费位移了。
  • 当有新消费者加入后,必然要执行再均衡的动作,对于同一分区而言,它可能在再均衡动作之后分配给新消费者。如果不持久化保存消费位移,那么这个新消费者也无法知晓之前的消费位移。

(3)位移提交可能造成的问题

对于位移提交的具体时机的把握很有讲究,弄不好的话,可能会造成重复消费和消息丢失的现象。

  • 重复消费 :
  • 消息丢失 :

(4)位移提交方式

在Kafka中有两种位移提交方式:自动提交和手动提交。

  • 自动提交

        默认的位移提交方式。这个由消费者客户端参数 enable.auto.commit 配置,默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms 配置,默认值为5秒,此参数生效的前提是 enable. auto.commit 参数为 true。

        在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

        自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

  • 手动提交

        开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为 false。示例如下:

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 

        手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和 commitAsync()两种类型的方法。

8. 关闭资源

        上面的示例使用一个while循环来包裹住 poll()方法及相应的消费逻辑,如何优雅地退出这个循环也很有考究。有以下两种方式可以参考:

  • 使用 while(isRunning.get()) 的方式取代 while(true),这样就可以通过在其他地方设定 isRunning.set(false) 来退出 while 循环。
  • 调用KafkaConsumer的 wakeup() 方法。wakeup() 方法是 KafkaConsumer 中唯一可以从其他线程里安全调用的方法。调用 wakeup() 方法后可以退出 poll() 的逻辑,并抛出 WakeupException 的异常,我们不需要处理 WakeupException 的异常,它只是一种跳出循环的方式。

        跳出循环以后一定要显式地执行关闭动作以释放运行过程中占用的各种系统资源,包括内存资源、 Socket 连接等。KafkaConsumer 提供了 close()方法来实现关闭,close()方法有四种重载方法。如下所示:

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐