Kafka再体验

一、Kafka的Java客户端

1. 生产者

1. 依赖
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.1</version>
</dependency>
2. 生产者发送消息的基本实现
//消息的发送方
public class MyProducer {

  private final static String TOPIC_NAME = "my-replicated-topic";

  public static void main(String[] args) throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
         
    //把发送的key从字符串序列化为字节数组
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    //把发送消息value从字符串序列化为字节数组
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    Producer<String, String> producer = new KafkaProducer<String, String>(props);
 
    Order order = new Order((long) i, i);
    
    ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
        , order.getOrderId().toString(), JSON.toJSONString(order));
    //等待消息发送成功的同步阻塞方法
    RecordMetadata metadata = producer.send(producerRecord).get();
    //=====阻塞=======
    System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());
   
  }
}
3. 发送消息到指定分区上
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
   , 0, order.getOrderId().toString(), JSON.toJSONString(order));
4. 未指定分区,则会通过业务key的hash运算,算出消息往哪个分区上发
//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
  , order.getOrderId().toString(), JSON.toJSONString(order));
5. 同步发送
//等待消息发送成功的同步阻塞方法
 RecordMetadata metadata = producer.send(producerRecord).get();
            System.out.println("同步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
                    + metadata.partition() + "|offset-" + metadata.offset());

在这里插入图片描述

6. 异步发送消息

直接执行下面的业务逻辑。可以提供callback,让broker异步的调用callback,告知生产者,消息发送的结果

//要发送5条消息
      Order order = new Order((long) i, i);
      //指定发送分区
            ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(TOPIC_NAME
                    , 0, order.getOrderId().toString(), JSON.toJSONString(order));
       //异步回调方式发送消息
      producer.send(producerRecord, new Callback() {
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            System.err.println("发送消息失败:" + exception.getStackTrace());

          }
          if (metadata != null) {
            System.out.println("异步方式发送消息结果:" + "topic-" + metadata.topic() + "|partition-"
              + metadata.partition() + "|offset-" + metadata.offset());
          }
        }
      });

8. 其他一些细节
  • 发送会默认会重试3次,每次间隔100ms
    发送的消息会先进入到本地缓冲区(32mb),kakfa会跑一个线程,该线程去缓冲区中取16k的数据,发送到kafka,如果到10毫秒数据没取满16k,也会发送一次。

二、消费者

1. 消费者消费消息的基本实现

public class MyConsumer {

  private final static String TOPIC_NAME = "my-replicated-topic";
  private final static String CONSUMER_GROUP_NAME = "testGroup";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.31.167.10:9092,10.31.167.10:9093,10.31.167.10:9094");
    // 消费分组名
    props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    //创建一个消费者的客户端
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    // 消费者订阅主题列表
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
  
    while (true) {
      /*
       * poll() API 是拉取消息的长轮询
       */
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> record : records) {
        System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
          record.offset(), record.key(), record.value());
      }
    }
  }
}

2. 自动提交offset

  • 自动: 自动提交会丢消息: 因为如果消费者还没消费完poll下来的消息就自动提交了偏移量,那么此时消费者挂了,于是下一个消费者会从已提交的offset的下一个位置开始消费消息。之前未被消费的消息就丢失掉了。
  • 消费者poll到消息后默认情况下,会自动向broker的_consumer_offsets主题提交当前主题-分区消费的偏移量。
// 是否自动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

3. 手动提交offset

  • 设置手动提交参数
  • 在消费完消息后进行手动提交
1. 手动同步提交
if (records.count() > 0) {
   // 手动同步提交offset,当前线程会阻塞直到offset提交成功
   // 一般使用同步提交,因为提交之后一般也没有什么逻辑代码了
   consumer.commitSync();
}
2. 手动异步提交
if (records.count() > 0) {
   // 手动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后面的程序逻辑
   consumer.commitAsync(new OffsetCommitCallback() {
      @Override
      public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                System.err.println("Commit failed for " + offsets);
                System.err.println("Commit failed exception: " + exception.getStackTrace());
            }
      }
   });
}

4. 消费者poll消息的过程

  1. 消费者建立了与broker之间的长连接,开始poll消息
    • 默认一次poll500条消息
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
  1. 可以根据消费速度的快慢来设置,因为如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能力过弱,将其踢出消费组。将分区分配给其他消费者。可以通过这个值进行设置:
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
  1. 如果每隔1s内没有poll到任何消息,则继续去poll消息,循环往复,直到poll到消息。如果超出了1s,则此次长轮询结束。
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
  1. 消费者发送心跳的时间间隔
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
  1. kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);

5. 指定分区消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

6. 消息回溯消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));

7. 指定offset消费

consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seek(new TopicPartition(TOPIC_NAME, 0), 10);

8. 从指定时间点消费

List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
    //从1小时前开始消费
    long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
    Map<TopicPartition, Long> map = new HashMap<>();
    for (PartitionInfo par : topicPartitions) {
        map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
    }
    Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
        TopicPartition key = entry.getKey();
        OffsetAndTimestamp value = entry.getValue();
        if (key == null || value == null) continue;
        Long offset = value.offset();
        System.out.println("partition-" + key.partition() + "|offset-" + offset);
        System.out.println();
        //根据消费里的timestamp确定offset
        if (value != null) {
            consumer.assign(Arrays.asList(key));
            consumer.seek(key, offset);
        }
    }

9. 新消费组的消费偏移量

  1. 当消费主题的是一个新的消费组,或者指定offset的消费方式,offset不存在,那么应该如何消费?
  2. latest(默认) :只消费自己启动之后发送到主题的消息
  3. earliest:第一次从头开始消费,以后按照消费offset记录继续消费,这个需要区别于consumer.seekToBeginning(每次都从头开始消费)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

二、Springboot中使用Kafka

1. 引入依赖

<dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
 </dependency>

2. 配置文件

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 172.16.253.21:9093
    producer: # 生产者
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 500
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: MANUAL_IMMEDIATE
  redis:
    host: 172.16.253.21

3. 消息生产者

发送消息到指定topic

@RestController
public class KafkaController {
  private final static String TOPIC_NAME = "my-replicated-topic";

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  @RequestMapping("/send")
  public void send() {
    kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");
  }
}

4. 消息消费者

  1. 设置消费组,消费指定topic
@KafkaListener(topics = "my-replicated-topic",groupId = "MyGroup1")
  public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    //手动提交offset
    ack.acknowledge();
  }
  1. 设置消费组、多topic、指定分区、指定偏移量消费及设置消费者个数。
  @KafkaListener(groupId = "testGroup", topicPartitions = {
      @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
      @TopicPartition(topic = "topic2", partitions = "0",
       partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
    },concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数,建议小于等于分区总数
  public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
    String value = record.value();
    System.out.println(value);
    System.out.println(record);
    //手动提交offset
    ack.acknowledge();
  }

四、Kafka集群Controller、Rebalance和HW

1. Controller

Kafka集群中的broker在zk中创建临时序号节点,序号最小的节点(最先创建的节点)将作为集群的controller,负责管理整个集群中的所有分区和副本的状态:

  1. 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本
  2. 当检测到某个分区的ISR集合发⽣变化时,由控制器负责通知所有broker更新其元数据信息
  3. 当使⽤kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责让新分区被其他节点感知到。

2. Rebalance机制

  • 前提是:消费者没有指明分区消费。
  • 当消费组里消费者和分区的关系发生变化,那么就会触发rebalance机制。
  • 这个机制会重新调整消费者消费哪个分区。在触发rebalance机制之前,消费者消费哪个分区有三种策略:
    1. range:通过公式来计算某个消费者消费哪个分区
    1. 轮询:大家轮着消费
    1. ticky:在触发了rebalance后,在消费者消费的原分区不变的基础上进行调整。

3. HW与LEO

  1. HW俗称⾼⽔位,HighWatermark的缩写,取⼀个partition对应的ISR中最⼩的LEO(log-end-offset)作为HW(木桶效应,最短板),consumer最多只能消费到HW所在的位置
  2. 每个replica都有HW,leader和follower各⾃负责更新⾃⼰的HW的状态
  3. 对于leader新写⼊的消息,consumer不能⽴刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW,此时消息才能被consumer消费
  4. 这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获取。

五、Kafka线上问题优化

1. 如何防止消息丢失

  1. 发送方:ack设置成all或者-1,把min.insync.replicas配置成分区备份数
  2. 消费方:把自动提交改成手动提交(消费完成之后在手动提交)

2. 如何防止消息的重复消费

保证消费消息的幂等性

  1. mysql中加业务id,并设置为主键,唯一
  2. 使用redis或者zk分布式锁

3. 如何做到顺序消费RocketMQ

  1. 发送方:确保消息是按顺序发送的,ack不能设置成0,关闭重试,使用同步发送,等待成功发送,再发送下一条。
  2. 消费方:消息是发送到一个分区中,只能有一个消费组的消费者来接收消息,因此会牺牲性能

4. 解决消息积压问题

消息积压会导致很多问题,比如磁盘被打满、生产端发消息导致kafka性能过慢,就容易出现服务雪崩,就需要有相应的手段:

  1. 提升一个消费者的消费能力:在一个消费者中启动多个线程,让多个线程同时消费
  2. 充分利用个服务器的CPU资源:若方案一不够,则启动多个消费者,部署在不同的服务器上
  3. 让一个消费者去把收到的消息发往另一个topic上,另一个topic设置多个分区和多个消费者,进行具体的业务消费

5. 延迟队列

延迟队列是为了解决任务推迟执行的问题,消息进入延迟队列之后暂时不能被消费,等超过了设定的时间才能被消费者进行消费

延迟队列的应用场景:在订单创建成功后如果超过30分钟没有付款,则需要取消订单,此时可用延时队列来实现

  1. 创建多个topic,每个topic表示延时的间隔
    • topic_5s: 延时5s执行的队列
    • topic_1m: 延时1分钟执行的队列
    • topic_30m: 延时30分钟执行的队列
  2. 消息生产者,发送消息到相应的topic,并带上消息的发送时间
  3. 消费者订阅相应的topic,消费时轮询消费整个topic中的消息
    • 消费者消费消息时判断消息的创建时间和当前时间是否超过30分钟(前提是订单没支付)
    • 如果是:去数据库中修改订单状态为已取消
    • 如果否︰记录当前消息的offset,并不再继续消费之后的消息。等待1分钟后,再次向kafka拉取该offset及之后的消息,继续进行判断,以此反复。

在这里插入图片描述

六、Kafka-eagle监控平台

安装Kafka-eagle

  • 官网下载压缩包

http://www.kafka-eagle.org/

  • 安装jdk,配置环境变量,也注意把KE_HOME也配好
  • 解压缩后修改配置文件 system-config.properties
# 配置zk
cluster1.zk.list=172.16.253.35:2181
# 配置mysql
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://172.16.253.22:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456
  • 进入到bin目录,为ke.sh增加可执行的权限
chmod +x ke.sh
  • 启动kafka-eagle
./ke.sh start
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐