点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(正在更新…)

章节内容

上节我们完成了如下内容:

  • 生产者阶段丢失
  • 生产者Broker阶段丢失
  • 消费者丢失消息

在这里插入图片描述

延时队列

基本概念

两个Follower副本都已经拉取到了Leader副本的最新位置,此时又向Leader副本发送拉取请求,而Leader副本并没有新的消息写入,那么此时Leader副本应该如何处理问题呢?可以直接返回空的拉取结果给Follower副本,不过在Leader副本一直没有新消息写入的情况下,Follower副本会一直发送拉取请求,并且总收到空的拉取结果,消耗资源。
在这里插入图片描述
Kafka在处理拉取请求时,会先读取一次日志文件,如果收集不到足够多(fetchMinBytes),由参数(fetch.min.bytes配置,默认为1),那么就会创建一个延时拉取的操作(DelyedFetch)以等待拉取足够数量的消息。当延时拉取操作执行时,会再读取一次日志,然后将拉去结果返回给Follower副本。

延时操作不只是拉取消息时特有的操作,在Kafka中也有多种演示操作,比如延时数据删除、延时生产等。
对于延时生产而言,如果在使用生产者客户端发送消息的时候将acks设置为-1,那么意味着需要等待ISR集合中所有副本都确认收到消息之后才能正确的响应结果,或者捕获超时异常。
在这里插入图片描述
假设某个分区有3个副本,Leader、Follower1和Follower2,他们都在分区的ISR集合中。不考虑ISR变动的情况,Kafka在收到客户端的生产请求之后,将消息3和合消息4写入Leader副本的本地日志文件。

由于客户端设置了acks=-1,那么需要等到Follower1和Follower2两个副本都收到消息3和消息4才能告知客户端正确的接收了所发送的消息。如果在一定时间内,Follower1副本和Follower2副本没有能够完全拉取到消息3和消息4,那么就需要返回超时异常给客户端。生产请求的超时时间由参数:request.timeout.ms配置,默认值为3000,即30秒。
在这里插入图片描述
那么这里等待消息3和消息4写入Follower1副本和Follower2副本,并返回相应的响应结果给客户端的动作是由谁来执行的?在将消息写入Leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作DelayedProduce,用来处理消息正常写入所有副本或超时情况,以返回相应的响应结果给客户端。

延时操作

延时操作需要延时反应响应的结果,首先它必须有一个超时时间(delayMs),如果在这个超时时间内没有完成既定的任务,那么就需要强制完成以返回响应结果给客户端。其次,延时操作不同与定时操作,定时操作是指在特定时间之后执行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的触发。

延时生产操作

它的外部事件是所要写入消息的某个分区HW(HighWatermark)发生增长。也就是说,随着Follower副本不断的与Leader副本进行消息同步,进而促使HW进一步增长,HW每增长一次都会检测是否能够完成次延时生产操作,如果可以就执行以此返回响应结果给客户端,如果在超时时间内始终无法完成,则强制执行。

延迟拉取操作

是由超时触发或外部事件触发而被执行的,超时触发很好理解,就要等到超时时间之后触发第二次读取日志文件的操作。外部事件触发就稍微复杂了一些,因为拉取请求不单单由Follower副本发起,也可以由消费者客户端发起,两种情况所对应的外部事件也不同的。如果是Follower副本的延时拉取,它的外部事件就是消息追加到了Leader副本的本地日志文件中,如果是消费者客户端的延时拉取,它的外部事件可以简单的理解为HW的增长。

实现方式

  • 时间轮实现延时队列:TimeWheel,size,每个单元格的事件,每个单元格都代表一个时间,size*每个单元格的时间就是一个周期。

重试队列

基本概念

Kafka没有重试机制不支持消息重试,也没有死信队列,因此使用Kafka做消息队列时,需要自己实现消息重试的功能。

如何实现

创建新的Kafka主题作为重试队列:

  • 创建一个topic作为重试topic,用于接收等待重试的消息
  • 普通topic消费者设置等待重试消息的下一个重试topic
  • 从重试topic获取等待重试消息存储到redis的zset中,并以下一次消费时间排序
  • 定时任务从Redis获取到达消费事件的消息,并把消息发送到对应的topic
  • 同一个消息重试次数过多则不再重试

代码实现

新建项目

由于重复了很多次,且没什么技术难度,这里跳过。
我们新建一个Maven项目。

POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>springboot-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

修改配置

spring:
  application:
    name: kafka-test
  kafka:
    bootstrap-servers: h121.wzk.icu:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
  redis:
    database: 0
    host: h121.wzk.icu
    port: 6379
    password:
    lettuce:
      pool:
        min-idle: 8
        max-idle: 500
        max-active: 2000
        max-wait: 10000
    timeout: 5000

server:
  port: 8085

启动类

package icu.wzk;


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StartApp {

    public static void main(String[] args) {
        SpringApplication.run(StartApp.class, args);
    }

}

AppConfig

package icu.wzk.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class AppConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置连接⼯⼚
        template.setConnectionFactory(factory);
        return template;
    }

}

KafkaController

package icu.wzk.controller;


import icu.wzk.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;

@RestController
public class KafkaController {

    @Resource
    private KafkaService kafkaService;

    private static final String TOPIC = "tp_demo_retry_01";

    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, message);
        String result = kafkaService.sendMessage(record);
        return result;
    }

}

KafkaService

package icu.wzk.service;


import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        System.out.println("发送消息: " + returnResult);
        return returnResult;
    }

}

ConsumerService

package icu.wzk.service;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ConsumerListener {

    @Resource
    private KafkaRetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "tp_demo_retry_01", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        try {
            // 业务处理
            System.out.println("消费的消息: " + record);
            index ++;
            if (index % 2 == 0) {
                throw new Exception("重发消息");
            }
        } catch (Exception e) {
            // 消息重试
            kafkaRetryService.consumerLater(record);
        }
    }

}

KafkaRetryService

package icu.wzk.service;

import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;


@Service
public class KafkaRetryService {

    /**
     * 消费失败后下一次消息的延迟时间(秒)
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {
            10, 30, 1 * 60, 2 * 60, 5 * 60, 10 * 60, 30 * 60,
            1* 60 * 60, 2 * 60 * 60
    };

    private static final String RETRY_TOPIC = "tp_demo_retry_02";

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record) {
        // 获取消息的已重试次数
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        if (null == nextConsumerTime) {
            return;
        }
        // 组织消息
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());
        // 转换字符串
        String value = JSON.toJSONString(retryRecord);
        // 发到重试队列
        kafkaTemplate.send(RETRY_TOPIC, null, value);
    }

    /**
     * 获取消息已经重试的次数
     */
    private int getRetryTimes(ConsumerRecord record) {
        int retryTimes = -1;
        for (Header header :record.headers()) {
            if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer byteBuffer = ByteBuffer.wrap(header.value());
                retryTimes = byteBuffer.getInt();
            }
        }
        retryTimes ++;
        return retryTimes;
    }

    /**
     * 获取等待重试的下一次消息时间
     */
    private Date getNextConsumerTime(int retryTimes) {
        // 重试次数超过上限 不再重试
        if (RETRY_INTERVAL_SECONDS.length < retryTimes) {
            return null;
        }
        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }

}

RetryListener

package icu.wzk.service;


import com.alibaba.fastjson.JSON;
import icu.wzk.model.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    private String topic = "tp_demo_retry_01";

    @KafkaListener(topics = "tp_demo_retry_02", groupId = "wzkicu")
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println("需要重试的消息: " + record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);
        // 防止待重试消息太多撑爆redis 可以将重试消息按下一次重试时间分开存储到不同介质中
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
    }

    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        System.out.println("retry redis");
        long currentTime = System.currentTimeMillis();
        // 时间倒序获取
        Set<ZSetOperations.TypedTuple<Object>> typeTuples = redisTemplate
                .opsForZSet()
                .reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // 移除取出的消息
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        for (ZSetOperations.TypedTuple<Object> tuple : typeTuples) {
            String key = tuple.getValue().toString();
            String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
            RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
            ProducerRecord record = retryRecord.parse();
            ProducerRecord recordReal = new ProducerRecord(
                    topic, record.partition(), record.timestamp(),
                    record.key(), record.value(), record.headers()
            );
            kafkaTemplate.send(recordReal);
        }
    }

}

RetryRecord

package icu.wzk.model;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;


public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;
    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public ProducerRecord parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));
        ProducerRecord sendRecord = new ProducerRecord(topic, partition, timestamp, key, value, headers);
        return sendRecord;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getNextTime() {
        return nextTime;
    }

    public void setNextTime(Long nextTime) {
        this.nextTime = nextTime;
    }
}

测试结果

Postman

在这里插入图片描述

控制台

在这里插入图片描述

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐