为什么使用消息队列?

以用户下单购买商品的行为举例,在使用微服务架构时,我们需要调用多个服务。传统的调用方式是同步调用,这会存在一定的性能问题

使用消息队列可以实现异步的通信方式,相比于同步的通信⽅式,异步的⽅式可以让上游快速成功,极大提高系统的吞吐量。在分布式系统中,通过下游多个服务的分布式事务的保障,也能保障业务执行之后的最终⼀致性


Kafka 概述

1. 介绍

Kafka 是⼀个分布式的、⽀持分区的(partition)、多副本的 (replica),基于 zookeeper 协调的分布式消息系统,它最大的特性就是可以实时处理大量数据以满足各类需求场景:

  • 日志收集:使用 Kafka 收集各种服务的日志,并通过 kafka 以统一接口服务的方式开放给各种 consumer,例如 hadoop、Hbase、Solr 等
  • 消息系统:解耦和生产者和消费者、缓存消息等
  • 用户活动跟踪:Kafka 经常被用来记录 web 用户或者 app 用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘
  • 运营指标:Kafka 也经常用来记录运营监控数据,包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告
2. 相关术语
名称解释
Broker消息中间件处理节点,⼀个 Kafka 节点就是⼀个 broker,⼀个或者多个 Broker 可以组成⼀个 Kafka 集群
TopicKafka 根据 topic 对消息进行归类,发布到 Kafka 集群的每条消息都需要指定⼀个 topic
Producer消息生产者,向 Broker 发送消息的客户端
Consumer消息消费者,从 Broker 读取消息的客户端
ConsumerGroup每个 Consumer 属于⼀个特定的 Consumer Group,⼀条消息可以被多个不同的 Consumer Group 消费,但是⼀个 Consumer Group 中只能有⼀个 Consumer 能够消费该消息
Partition物理上的概念,⼀个 topic 可以分为多个 partition,每个 partition 内部消息是有序的
3. 安装

安装 Kafka 之前需要先安装 JDK 和 Zookeeper,在官网下载 Kafka 安装包:http://kafka.apache.org/downloads,直接解压即可

需要修改配置文件,进⼊到 config 目录内,修改 server.properties

# broker.id 属性在 kafka 集群中必须唯一
broker.id= 0
# kafka 部署的机器 ip 和提供服务的端口号
listeners=PLAINTEXT://192.168.65.60:9092
# kafka 的消息存储文件
log.dir=/usr/local/data/kafka-logs
# kafka 连接 zookeeper 的地址
zookeeper.connect= 192.168.65.60:2181

server.properties 核心配置详解:

PropertyDefaultDescription
broker.id0每个 broker 都可以用⼀个唯⼀的非负整数 id 进行标识,作为 broker 的 名字
log.dirs/tmp/kafka-logskafka 存放数据的路径,这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每当创建新 partition 时,都会选择在包含最少 partitions 的路径下进行
listenersPLAINTEXT://192.168.65.60:9092server 接受客户端连接的端⼝,ip 配置 kafka 本机 ip 即可
zookeeper.connectlocalhost:2181zooKeeper 连接字符串的格式为:hostname:port,此处 hostname 和 port 分别是 ZooKeeper 集群中某个节点的 host 和 port;zookeeper 如果是集群,连接⽅式为 hostname1:port1,hostname2:port2,hostname3:port3
log.retention.hours168每个日志文件删除之前保存的时间,默认数据保存时间对所有 topic 都⼀样
num.partitions1创建 topic 的默认分区数
default.replication.factor1⾃动创建 topic 的默认副本数量,建议设置为⼤于等于 2
min.insync.replicas1当 producer 设置 acks 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每⼀个 repica 的写数据都是成功的),如果这个数目没有达到,producer 发送消息会产生异常
delete.topic.enablefalse是否允许删除主题

进入到 bin 目录下,使用命令来启动

./kafka-server-start.sh -daemon../config/server.properties

验证是否启动成功:进入到 zk 中的节点看 id 是 0 的 broker 有没有存在(上线)

ls /brokers/ids/

实现消息的生产和消费

1. 主题 Topic

topic 可以实现消息的分类,不同消费者订阅不同的 topic

执行以下命令创建名为 test 的 topic,这个 topic 只有一个 partition,并且备份因子也设置为 1

./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 1 --partitions 1 --topic test

在较新版本(2.2 及更高版本)的 Kafka 不再需要 ZooKeeper 连接字符串,即不需要 --zookeeper localhost:2181。使用 Kafka Broker的 --bootstrap-server localhost:9092 来替代 --zookeeper localhost:2181

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic heima --partitions 1 --replication-factor 1

查看当前 kafka 内有哪些 topic

./kafka-topics.sh --list --zookeeper 172.16.253.35:2181

高版本命令如下

./kafka-topics.sh --list --bootstrap-server localhost:9092
2. 发送消息

把消息发送给 broker 中的某个 topic,打开⼀个 kafka 发送消息的客户端,然后开始⽤客户端向 kafka 服务器发送消息

kafka 自带了一个 producer 命令客户端,可以从本地文件中读取内容,或者我们也可以以命令行中直接输入内容,并将这些内容以消息的形式发送到 kafka 集群中。在默认情况下,每一个行会被当做成一个独立的消息

./kafka-console-producer.sh --broker-list 172.16.253.38:9092 --topic test
3. 消费消息

对于 consumer,kafka 同样也携带了一个命令行客户端,会将获取到内容在命令中进行输出,默认是消费最新的消息。使用 kafka 的消费者客户端,从指定 kafka 服务器的指定 topic 中消费消息

方式一:从最后一条消息的 偏移量+1 开始消费

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --topic test

方式二:从头开始消费

./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092 --from-beginning --topic test

消息的发送方会把消息发送到 broker 中,broker 会存储消息,消息是按照发送的顺序进行存储。因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后一个消息的下一个偏移量开始消费

4. 单播消息

一个消费组里只有一个消费者能消费到某一个 topic 中的消息,可以创建多个消费者,这些消费者在同一个消费组中

./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup --topic test
5. 多播消息

在一些业务场景中需要让一条消息被多个消费者消费,那么就可以使用多播模式。kafka 实现多播,只需要让不同的消费者处于不同的消费组即可

./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup1 --topic test

./kafka-console-consumer.sh --bootstrap-server 10.31.167.10:9092 --consumer-property group.id=testGroup2 --topic test
6. 查看消费组及信息
# 查看当前主题下有哪些消费组
./kafka-consumer-groups.sh --bootstrap-server 10.31.167.10:9092 --list
# 查看消费组中的具体信息:比如当前偏移量、最后一条消息的偏移量、堆积的消息数量
./kafka-consumer-groups.sh --bootstrap-server 172.16.253.38:9092 --describe --group testGroup

  • Currennt-offset:当前消费组的已消费偏移量
  • Log-end-offset:主题对应分区消息的结束偏移量(HW)
  • Lag:当前消费组未消费的消息数
7. 其他细节

  • 生产者将消息发送给 broker,broker 会将消息保存在本地的日志文件中

    /usr/local/kafka/data/kafka-logs/主题-分区/00000000.log
    
  • 消息的保存是有序的,通过 offset 偏移量来描述消息的有序性

  • 消费者消费消息时也是通过 offset 来描述当前要消费的那条消息的位置


主题与分区

主题 Topic 在 kafka 中是⼀个逻辑概念,kafka 通过 topic 将消息进行分类。不同的 topic 会被订阅该 topic 的消费者消费。但是有⼀个问题,如果说这个 topic 的消息非常多,消息是会被保存到 log 日志文件中的,这会出现文件过大的问题,因此,kafka 提出了 Partition 分区的概念

通过 partition 将⼀个 topic 中的消息分区来存储,这样的好处有多个:

  • 分区存储,可以解决存储文件过大的问题
  • 提供了读写的吞吐量:读和写可以同时在多个分区进⾏

为⼀个主题创建多个分区

./kafka-topics.sh --create --zookeeper localhost:2181 --partitions 2 --topic test1

通以下命令查看 topic 的分区信息

./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

分区的作用:

  • 可以分布式存储
  • 可以并行写

了解了 Partition,再补充一个 Kafka 细节:在消息日志文件中,kafka 内部创建了 __consumer_offsets 主题包含了 50 个分区。这个主题用来存放消费者某个主题的偏移量,每个消费者会把消费的主题的偏移量自主上报给 kafka 中的默认主题:consumer_offsets。因此 kafka 为了提升这个主题的并发性,默认设置了 50 个分区

  • 提交到哪个分区:通过 hash 函数:hash(consumerGroupId) % __consumer_offsets 主题的分区数
  • 提交到该主题中的内容是:key 是 consumerGroupId + topic + 分区号,value 就是当前 offset 的值
  • 文件中保存的消息,默认保存七天,七天到后消息会被删除

Kafka 集群

1. 搭建

创建三个 server.properties 文件

# 0 1 2
broker.id=2
# 9092 9093 9094
listeners=PLAINTEXT://192.168.65.60:9094
# kafka-logs kafka-logs-1 kafka-logs-2
log.dir=/usr/local/data/kafka-logs-2

通过命令启动三台 broker

./kafka-server-start.sh -daemon../config/server0.properties
./kafka-server-start.sh -daemon../config/server1.properties
./kafka-server-start.sh -daemon../config/server2.properties

搭建完后通过查看 zk 中的 /brokers/ids 看是否启动成功

2. 副本

下面的命令,在创建主题时,除了指明了主题的分区数以外,还指明了副本数,分别是:一个主题,两个分区、三个副本

./kafka-topics.sh --create --zookeeper 172.16.253.35:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic

通过查看主题信息,其中的关键数据:

  • replicas:当前副本所存在的 broker 节点

  • leader:副本里的概念

    • 每个 partition 都有一个 broker 作为 leader
    • 消息发送方要把消息发给哪个 broker,就看副本的 leader 是在哪个 broker 上面,副本里的 leader 专门用来接收消息
    • 接收到消息,其他 follower 通过 poll 的方式来同步数据
  • follower:leader 处理所有针对这个 partition 的读写请求,而 follower 被动复制 leader,不提供读写(主要是为了保证多副本数据与消费的一致性),如果 leader 所在的 broker 挂掉,那么就会进行新 leader 的选举

  • isr:可以同步的 broker 节点和已同步的 broker 节点,存放在 isr 集合中

3. broker、主题、分区、副本

Kafka 集群中由多个 broker 组成,⼀个 broker 存放⼀个 topic 的不同 partition 以及它们的副本

4. 集群消息的发送
./kafka-console-producer.sh --broker-list 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --topic my-replicated-topic
5. 集群消息的消费
./kafka-console-consumer.sh --bootstrap-server 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094 --from-beginning --topic my-replicated-topic
6. 分区消费组消费者的细节

  • ⼀个 partition 只能被⼀个消费组中的⼀个消费者消费,目的是为了保证消费的顺序性,但是多个 partion 的多个消费者的消费顺序性是得不到保证的
  • 一个消费者可以消费多个 partition,如果消费者挂了,那么会触发rebalance机制,由其他消费者来消费该分区
  • 消费组中消费者的数量不能比一个 topic 中的 partition 数量多,否则多出来的消费者消费不到消息

Java 中使用 Kafka

1. 生产者
1.1 引入依赖
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>
1.2 生产者发送消息
/**
 * 消息的发送方
 */
public class MyProducer {

    private final static String TOPIC_NAME = "my-replicated-topic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1.设置参数
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
        // 把发送的 key 从字符串序列化为字节数组
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 把发送消息 value 从字符串序列化为字节数组
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 2.创建⽣产消息的客户端,传⼊参数
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // 3.创建消息
        // key: 作⽤是决定了往哪个分区上发
        // value: 具体要发送的消息内容
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");
        // 4.发送消息,得到消息发送的元数据并输出
        RecordMetadata metadata = producer.send(producerRecord).get();
        System.out.println("同步⽅式发送消息结果:" + "topic-" + metadata.topic() + "|partition-" + metadata.partition() + "|offset-" + metadata.offset());
    }
}
1.3 发送消息到指定分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(), JSON.toJSONString(order));

如果未指定分区,则会通过业务 Key 的 hash 运算,得出要发送的分区,公式为:hash(key)%partitionNum

1.4 同步发送消息

⽣产者同步发消息,在收到 kafka 的 ack 告知发送成功之前将⼀直处于阻塞状态

// 等待消息发送成功的同步阻塞方法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" +metadata.offset());
1.5 异步发送消息

异步发送,⽣产者发送完消息后就可以执⾏之后的业务,broker 在收到消息后异步调用生产者提供的 callback 回调方法

// 指定发送分区
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME, 0 , order.getOrderId().toString(),JSON.toJSONString(order));
// 异步回调方式发送消息
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            System.err.println("发送消息失败:" +
                               exception.getStackTrace());
        }
        if (metadata != null) {
            System.out.println("异步方式发送消息结果:" + "topic-" +metadata.topic() + "|partition-"+ metadata.partition() + "|offset-" + metadata.offset());
        }
    }
});
1.6 生产者中的 ack 的配置

在同步发送的前提下,生产者在获得集群返回的 ack 之前会⼀直阻塞,那么集群什么时候返回 ack 呢?此时 ack 有三个配置:

  • acks = 0:表示 producer 不需要等待任何 broker 确认收到消息的回复,就可以继续发送下一条消息,性能最高,但最容易丢消息
  • acks = 1:至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失
  • acks = -1 或 all:需要等待 min.insync.replicas(默认为 1 ,推荐配置大于等于2)这个参数配置的副本个数都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证,一般是金融级别,或跟钱打交道的场景才会使用这种配置
props.put(ProducerConfig.ACKS_CONFIG, "1");
// 发送失败,默认会重试三次,每次间隔 100ms
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100)
1.7 消息发送的缓冲区

  • kafka 默认会创建⼀个消息缓冲区,用来存放要发送的消息,缓冲区是 32m

    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    
  • kafka 本地线程会在缓冲区中⼀次拉 16k 的数据,发送到 broker

    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    
  • 如果线程拉不到 16k 的数据,间隔 10ms 也会将已拉到的数据发到 broker

    props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
    
2. 消费者
2.1 消费消息
public class MySimpleConsumer {
    
    private final static String TOPIC_NAME = "my-replicated-topic";
    private final static String CONSUMER_GROUP_NAME = "testGroup";
    
    public static void main(String[] args) {
        
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
 		// 消费分组名
        props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        
 		// 1.创建⼀个消费者的客户端
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
 		// 2.消费者订阅主题列表
        consumer.subscribe(Arrays.asList(TOPIC_NAME));
 		while (true) {
            /*
             * 3. poll() API 是拉取消息的⻓轮询
             */
 			ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
 			for (ConsumerRecord<String, String> record : records) {
 				// 4.打印消息
                System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
            }
        }
    }
}
2.2 自动提交和手动提交 offset

无论是自动提交还是手动提交,都需要把所属的 消费组 + 消费的某个主题 + 消费的某个分区 + 消费的偏移量 提交到集群的 _consumer_offsets 主题里面

  • 自动提交:消费者 poll 消息下来以后自动提交 offset

    // 是否自动提交 offset,默认就是 true
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // 自动提交 offset 的间隔时间
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    

    注意:如果消费者还没消费完 poll 下来的消息就自动提交了偏移量,此时消费者挂了,于是下⼀个消费者会从已提交的 offset 的下⼀个位置开始消费消息,之前未被消费的消息就丢失掉了

  • 手动提交:需要把自动提交的配置改成 false

    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    

    手动提交又分成了两种:

    • 手动同步提交

      在消费完消息后调用同步提交的方法,当集群返回 ack 前⼀直阻塞,返回 ack 后表示提交成功,执行之后的逻辑

      while (true) {
          /*
           * poll() API 是拉取消息的⻓轮询
           */
       	ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),record.offset(), record.key(), record.value());
          }
       	// 所有的消息已消费完
       	if (records.count() > 0) { // 有消息
       		// ⼿动同步提交 offset, 当前线程会阻塞直到 offset 提交成功
       		// ⼀般使⽤同步提交, 因为提交之后⼀般也没有什么逻辑代码了
              consumer.commitSync(); // ====阻塞=== 提交成功
          }
      }
      
    • 手动异步提交

      在消息消费完后提交,不需要等到集群 ack,直接执行之后的逻辑,可以设置⼀个回调方法,供集群调用

      while (true) {
          /*
           * poll() API 是拉取消息的⻓轮询
           */
          ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
          }
       	// 所有的消息已消费完
       	if (records.count() > 0) {
       		// 手动异步提交 offset,当前线程提交 offset 不会阻塞,可以继续处理后⾯的程序逻辑
       		consumer.commitAsync(new OffsetCommitCallback() {
       			@Override
       			public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                      if (exception != null) {
       					System.err.println("Commit failed for " + offsets);
       					System.err.println("Commit failed exception: " + exception.getStackTrace());
                      }
                  }
              });
          }
      }
      
2.3 长轮询 poll 消息

消费者建立与 broker 之间的长连接,开始 poll 消息,默认⼀次 poll 五百条消息

// ⼀次 poll 最⼤拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500)

可以根据消费速度的快慢来设置,如果两次 poll 的时间超出了 30s 的时间间隔,kafka 会认为其消费能力过弱,将其踢出消费组,将分区分配给其他消费者

代码中设置了长轮询的时间是 1000 毫秒

while (true) {
    /*
     * poll() API 是拉取消息的⻓轮询
     */
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(), record.offset(), record.key(), record.value());
    }
}
  • 如果⼀次 poll 到 500 条,就直接执行 for 循环
  • 如果这⼀次没有 poll 到 500 条,且时间在1秒内,那么长轮询继续 poll,要么到 500 条,要么到 1s
  • 如果多次 poll 都没达到 500 条,且 1 秒时间到了,那么直接执行 for 循环
2.4 健康状态检查

消费者每隔 1s 向 Kafka 集群发送心跳,集群发现如果有超过 10s 没有续约的消费者,将被踢出消费组,触发该消费组的 rebalance 机制,将该分区交给消费组里的其他消费者进行消费

// consumer 给 broker 发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
// kafka 如果超过 10 秒没有收到消费者的⼼跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000)
2.5 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
2.6 消息回溯消费

也即从头开始消费消息

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME,
0)));
2.7 指定偏移量消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);
2.8 从指定时间点消费

根据时间,去所有的 partition 中确定该时间对应的 offset,然后去所有的 partition 中找到该 offset 之后的消息开始消费

List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
// 从一小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
    map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
    TopicPartition key = entry.getKey();
    OffsetAndTimestamp value = entry.getValue();
    if (key == null || value == null) {
    	continue;
    }
    // 根据消费⾥的 timestamp 确定 offset
    Long offset = value.offset();
    System.out.println("partition-" + key.partition() + "|offset-" + offset);
    if (value != null) {
        consumer.assign(Arrays.asList(key));
        consumer.seek(key, offset);
    }
}
2.9 新消费组的消费 offset 规则

新消费组中的消费者在启动以后,默认会从当前分区的最后⼀条消息的 offset+1 开始消费(消费新消息),可以通过以下的设置,让新的消费者第⼀次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)

  • Latest:默认的,消费新消息
  • earliest:第⼀次从头开始消费,之后开始消费新消息(最后消费的位置的偏移量 +1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

SpringBoot 中使用 Kafka

1. 引入依赖
<dependency>
	<groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. 编写配置文件
server:
	port: 8080
spring:
	kafka:
		bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094
 		producer: # ⽣产者
 			retries: 3 # 设置⼤于0的值,则客户端会将发送失败的记录重新发送
 			batch-size: 16384
 			buffer-memory: 33554432
 			acks: 1
 			# 指定消息key和消息体的编解码⽅式
 			key-serializer: org.apache.kafka.common.serialization.StringSerializer
 			value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
        	group-id: default-group
        	enable-auto-commit: false
        	auto-offset-reset: earliest
        	key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        	value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        	max-poll-records: 500
 		listener:
 			# 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
 			# RECORD
 			# 当每⼀批 poll() 的数据被消费者监听器(ListenerConsumer)处理之后提交
 			# BATCH
 			# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
 			# TIME
 			# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
 			# COUNT
 			# TIME | COUNT 有⼀个条件满⾜时提交
 			# COUNT_TIME
 			# 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
 			# MANUAL
 			# 手动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
 			# MANUAL_IMMEDIATE
 			ack-mode: MANUAL_IMMEDIATE
 redis:
 	host: 172.16.253.21
3. 编写生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/msg")
public class MyKafkaController {

    private final static String TOPIC_NAME = "my-replicated-topic";

    @Autowired
 	private KafkaTemplate<String,String> kafkaTemplate;
    
    @RequestMapping("/send")
 	public String sendMessage(){
        kafkaTemplate.send(TOPIC_NAME,0,"key","this is a message!");
        return "send success!";
    }
}
4. 编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {
    
    @KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
    public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        // 手动提交offset
        ack.acknowledge();
    }
}

配置消费主题、分区和偏移量

@KafkaListener(groupId = "testGroup", topicPartitions = {
	@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
	@TopicPartition(topic = "topic2", partitions = "0",
                    partitionOffsets = 	@PartitionOffset(partition = "1", initialOffset = "100"))},concurrency = "3") // concurrency 就是同组下的消费者个数,就是并发消费数,建议⼩于等于分区总数
public void listenGroupPro(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
 	// 手动提交offset
    ack.acknowledge();
}

Kafka 集群 Controller、Rebalance、HW

1. Controller

Kafka 集群中的 broker 在 zookeeper 中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的 controller,负责管理整个集群中的所有分区和副本的状态:

  • 当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本,选举的规则是从 isr 集合中最左边获取
  • 当集群中有 broker 新增或减少,controller 会同步信息给其他 broker
  • 当集群中有分区新增或减少,controller 会同步信息给其他 broker
2. Rebalance

如果消费者没有指明分区消费,那么当消费组里消费者和分区的关系发生变化,就会触发 rebalance 机制,重新调整消费者该消费哪个分区

在触发 rebalance 机制之前,消费者消费哪个分区有三种分配策略:

  • range:通过公式来计算某个消费者消费哪个分区,公式为:前面的消费者是 (分区总数/消费者数量)+1,之后的消费者是 分区总数/消费者数量
  • 轮询:大家轮着来
  • sticky:粘合策略,如果需要 rebalance,会在之前已分配的基础上调整,不会改变之前的分配情况。如果这个策略没有开,那么就要全部重新分配,所以建议开启
3. HW 和 LEO

LEO 是某个副本最后消息的消息位置(log-end-offset),HW 是已完成同步的位置。消息在写入 broker 时,且每个 broker 完成这条消息的同步后,HW 才会变化。在这之前,消费者是消费不到这条消息的,在同步完成之后,HW 更新之后,消费者才能消费到这条消息,这样的目的是防止消息的丢失


Kafka 线上问题优化

1. 防止消息丢失

生产者:

  1. 使用同步发送
  2. 把 ack 设成 1 或者 all,并且设置同步的分区数 >= 2

消费者:

  1. 把自动提交改成手动提交
2. 防止重复消费

如果生产者发送完消息后,却因为网络抖动,没有收到 ack,但实际上 broker 已经收到了。此时生产者会进行重试,于是 broker 就会收到多条相同的消息,而造成消费者的重复消费

解决方案:

  • 生产者关闭重试,但会造成丢消息,不建议
  • 消费者解决非幂等性消费问题,所谓非幂等性,就是用户对于同一操作发起的一次请求或者多次请求的结果是一致的,可以用唯一主键或分布式锁来实现
3. 保证消息的顺序消费

生产者:使用同步发送,ack 设置成非 0 的值

消费者:主题只能设置⼀个分区,消费组中只能有⼀个消费者

4. 解决消息积压

所谓消息积压,就是消息的消费者的消费速度远赶不上生产者的生产消息的速度,导致 kafka 中有大量的数据没有被消费。随着没有被消费的数据堆积越多,消费者寻址的性能会越来越差,最后导致整个 kafka 对外提供的服务的性能很差,从而造成其他服务也访问速度变慢,造成服务雪崩

解决方案:

  • 在这个消费者中,使用多线程,充分利用机器的性能消费消息
  • 通过业务的架构设计,提升业务层面消费的性能
  • 创建多个消费组,多个消费者,部署到其他机器上,⼀起消费,提高消费者的消费速度
  • 创建⼀个消费者,该消费者在 kafka 另建⼀个主题,配上多个分区,多个分区再配上多个消费者。该消费者将 poll 下来的消息,不进行消费,直接转发到新建的主题上。此时,新的主题的多个分区的多个消费者就开始⼀起消费了

5. 实现延时队列

假设一个应用场景:订单创建后,超过 30 分钟没有支付,则需要取消订单,这种场景可以通过延时队列来实现,实现方案如下:

  • 在 Kafka 创建相应的主题,比如该主题的超时时间定为 30 分钟
  • 消费者消费该主题的消息(轮询)
  • 消费者消费消息时,判断消息的创建时间和当前时间是否超过 30 分钟(前提是订单没有完成支付)
    • 超过:数据库修改订单状态为已取消
    • 没有超过:记录当前消息的 offset,并不再继续消费之后的消息。等待 1 分钟后,再次从 Kafka 拉取该 offset 及之后的消息,继续判断,以此反复

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐