1. Problem Description

Business requirements call for changing a Kafka consumer's subscribed topics without stopping the consumer. Before implementing that, here is a normal Kafka produce/consume demo; it is the template I have always used.

二、代码模板

2.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>tech.kpretty</groupId>
    <artifactId>kafka-example</artifactId>
    <version>1.0</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.0</version>
        </dependency>
        <!-- logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.36</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.17.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- packaging plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>tech.kpretty.Application</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

2.2 Utility Class

package tech.kpretty.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class AllUtils {
    // default root directory for config files
    public static final String DEFAULT_BASE_CONF_PATH = System.getProperty("user.dir") + File.separator + "conf" + File.separator;
    // default consumer config file
    public static final String DEFAULT_CONSUMER_CONF_PATH = DEFAULT_BASE_CONF_PATH + "mqs.sdk.consumer.properties";
    // default producer config file
    public static final String DEFAULT_PRODUCER_CONF_PATH = DEFAULT_BASE_CONF_PATH + "mqs.sdk.producer.properties";
    // other project settings; public so that AutoLoadTopic (section 3) can locate the file
    public static final String DEFAULT_CONF_PATH = DEFAULT_BASE_CONF_PATH + "conf.properties";

    private static final Logger LOG = LoggerFactory.getLogger(AllUtils.class);

    private static final Properties properties = new Properties();

    static {
        try (FileInputStream in = new FileInputStream(DEFAULT_CONF_PATH)) {
            properties.load(in);
        } catch (IOException e) {
            LOG.error("Failed to read the project config file, error code -2, cause:", e);
            System.exit(-2);
        }
    }

    public static List<String> getConsumerTopics() {
        return Arrays.asList(properties.getProperty("consumer.topics").split(","));
    }

    public static String getProducerTopic() {
        return properties.getProperty("producer.topic");
    }
}
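For reference, a conf.properties that AllUtils can parse might look like the following; the topic names are hypothetical placeholders:

# topics for the consumer to subscribe to, comma-separated (placeholder names)
consumer.topics=topic-a,topic-b
# topic the producer writes to (placeholder name)
producer.topic=topic-c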

2.3 Producer

package tech.kpretty.producer;

import tech.kpretty.util.AllUtils;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;

/**
 * @author wjun
 * @date 2022/4/8 11:37 am
 */
public class ServerProducer implements Runnable {
    private final KafkaProducer<String, String> producer;
    private volatile boolean isRunning = true;
    private static final Logger LOG = LoggerFactory.getLogger(ServerProducer.class);


    public ServerProducer(String path) {
        Properties properties = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            properties.load(in);
        } catch (IOException e) {
            LOG.error("Failed to read the producer config file, error code -3, cause:", e);
            System.exit(-3);
        }
        producer = new KafkaProducer<>(properties);
        // register a shutdown hook to catch Ctrl+C
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    public ServerProducer() {
        this(AllUtils.DEFAULT_PRODUCER_CONF_PATH);
    }

    @Override
    public void run() {
        while (isRunning) {
            send();
        }
    }

    public void send() {
        // TODO: replace with your own send logic
        send(UUID.randomUUID().toString());
    }

    public void send(String message) {
        send(message, null);
    }

    public void send(String message, String key) {
        send(message, key, null);
    }

    public void send(String message, String key, Integer partitionNum) {
        ProducerRecord<String, String> record = new ProducerRecord<>(AllUtils.getProducerTopic(), partitionNum, key, message);
        producer.send(record, (metadata, exception) -> {
            if (null == exception) {
                LOG.info("消息发送成功{},元数据 {}", record, metadata);
            } else {
                LOG.error("消息发送失败,错误原因", exception);
            }
        });
    }

    public void close() {
        isRunning = false;
        // KafkaProducer is thread-safe, so closing it here (from the shutdown-hook
        // thread) is allowed; close() also flushes any buffered records
        producer.close();
    }
}

2.4 Consumer

package tech.kpretty.consumer;

import tech.kpretty.util.AllUtils;
import org.apache.kafka.clients.consumer.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author wjun
 * @date 2022/4/8 11:37 am
 */
public class ServerConsumer implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private volatile boolean isRunning = true;
    // true while a receive() call is in flight
    private volatile boolean flag;

    private static final Logger LOG = LoggerFactory.getLogger(ServerConsumer.class);


    public ServerConsumer(String path) {
        Properties properties = new Properties();
        try (FileInputStream in = new FileInputStream(path)) {
            properties.load(in);
        } catch (IOException e) {
            LOG.error("Failed to read the consumer config file, error code -1, cause:", e);
            System.exit(-1);
        }
        consumer = new KafkaConsumer<>(properties);
        // subscribe to the topics
        consumer.subscribe(AllUtils.getConsumerTopics());
        // register a shutdown hook to catch Ctrl+C
        Runtime.getRuntime().addShutdownHook(new Thread(this::close));
    }

    public ServerConsumer() {
        this(AllUtils.DEFAULT_CONSUMER_CONF_PATH);
    }

    @Override
    public void run() {
        while (isRunning) {
            receive();
        }
    }

    public void receive() {
        flag = true;
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
        if (!records.isEmpty()) {
            for (ConsumerRecord<String, String> record : records) {
                // TODO: business logic goes here
                LOG.info("mqs message:{}", record);
            }
            // manual offset commits are recommended;
            // to use auto-commit instead, remove enable.auto.commit from the consumer config and comment out the call below
            commitOffset();
        }
        flag = false;
    }

    /**
     * Commit offsets asynchronously
     */
    private void commitOffset() {
        consumer.commitAsync((offsets, exception) -> {
            if (null == exception) {
                LOG.info("偏移量提交成功,元数据:{}", offsets);
            } else {
                LOG.error("偏移量提交失败,错误信息:", exception);
            }
        });
    }

    /**
     * Shut the consumer down safely
     */
    public void close() {
        isRunning = false;
        // wait until any in-flight receive() has finished
        do {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while (flag);
        // commit synchronously one last time before closing
        consumer.commitSync();
        consumer.close();
    }
}

2.5 Main Class

package tech.kpretty;

import tech.kpretty.consumer.ServerConsumer;
import tech.kpretty.producer.ServerProducer;

/**
 * @author wjun
 * @date 2022/4/8 11:29 am
 */
public class Application {
    public static void main(String[] args) {
        // start the consumer
        ServerConsumer consumer = new ServerConsumer();
        new Thread(consumer).start();

        // start the producer
        ServerProducer producer = new ServerProducer();
        new Thread(producer).start();
    }
}

2.6 Log Configuration

log4j.rootLogger=INFO,console,daily
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Target=System.out
log4j.appender.console.Threshold=DEBUG
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=[%5p][%d{yyyy-MM-dd HH:mm:ss}][%10t][%c]%m%n

#log4j.appender.file=org.apache.log4j.RollingFileAppender
#log4j.appender.file.File=./log/xxx.log
#log4j.appender.file.MaxFileSize=100mb
#log4j.appender.file.Threshold=DEBUG
#log4j.appender.file.MaxBackupIndex=100
#log4j.appender.file.layout=org.apache.log4j.PatternLayout
#log4j.appender.file.layout.ConversionPattern=[%5p][%d{yyyy-MM-dd HH:mm:ss}][%10t][%c]%m%n

log4j.appender.daily=org.apache.log4j.DailyRollingFileAppender
log4j.appender.daily.File=./log/xxx.log
log4j.appender.daily.DatePattern='.'yyyy-MM-dd
log4j.appender.daily.layout=org.apache.log4j.PatternLayout
log4j.appender.daily.layout.ConversionPattern=[%5p][%d{yyyy-MM-dd HH:mm:ss}][%10t][%c]%m%n

At this point I would consider this a fairly standard Kafka example demo; feel free to take it and use it as-is. You only need to create a conf directory under the project root, with the producer config in mqs.sdk.producer.properties, the consumer config in mqs.sdk.consumer.properties, and the remaining project settings in conf.properties.
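For completeness, here are minimal sketches of the two Kafka client property files; the broker address and group id are placeholders, and the keys are standard kafka-clients settings. mqs.sdk.consumer.properties:

bootstrap.servers=localhost:9092
group.id=example-group
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# keep manual commits, matching commitOffset() in ServerConsumer
enable.auto.commit=false

And mqs.sdk.producer.properties:

bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer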

3. Implementing the Requirement

The simplest implementation is to re-subscribe to the topics before or after every poll, like this:

public void receive() {
  flag = true;
  ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
  if (!records.isEmpty()) {
    for (ConsumerRecord<String, String> record : records) {
      // TODO: business logic goes here
      LOG.info("mqs message:{}", record);
    }
    // manual offset commits are recommended;
    // to use auto-commit instead, remove enable.auto.commit from the consumer config and comment out the call below
    commitOffset();
  }
  // re-subscribe to the topics
  consumer.subscribe(AllUtils.getConsumerTopics());
  flag = false;  
}

In practice, though, the subscription rarely changes this often, and re-subscribing on every poll adds unnecessary overhead (a new subscription triggers a re-join request to the group). So we would rather change the subscription only when the project's config file actually changes. The approach is now clear: add a thread that watches the config file and calls the consumer's subscribe when the file is modified. To avoid disturbing the consumer, it is best not to put this logic in the business code.

We need to add a re-subscribe method to the consumer for the new thread to call:

public void update(List<String> topics) {
  consumer.subscribe(topics);
  LOG.info("配置信息已更新");
  LOG.info("任务重新启动");
}

Here I detect changes by checking the config file's last-modified time. The code is as follows:

package tech.kpretty.util;

import tech.kpretty.consumer.ServerConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
 * @author wjun
 * @date 2022/4/11 17:29
 */
public class AutoLoadTopic implements Runnable {
    private final ServerConsumer consumer;
    private final Properties properties;
    private final File file;
    private long lastModifiedTime;
    private static final Logger LOG = LoggerFactory.getLogger(AutoLoadTopic.class);

    public AutoLoadTopic(ServerConsumer consumer) {
        this.consumer = consumer;
        this.properties = new Properties();
        this.file = new File(AllUtils.DEFAULT_CONF_PATH);
        this.lastModifiedTime = file.lastModified();
    }

    @Override
    public void run() {
        while (true) {
            // read the file's current last-modified time
            long currentLastModifiedTime = file.lastModified();
            // if it is later than the last one we saw, the config file has been updated
            if (currentLastModifiedTime > lastModifiedTime) {
                // update the subscription
                consumer.update(getTopics());
                // record the new modified time
                lastModifiedTime = currentLastModifiedTime;
            }
            }

            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private List<String> getTopics() {
        // reload the properties from scratch
        properties.clear();
        try (FileInputStream in = new FileInputStream(file)) {
            properties.load(in);
        } catch (IOException e) {
            LOG.error("Fatal error while reloading the configuration, error code -5, cause:", e);
            System.exit(-5);
        }
        return Arrays.asList(properties.getProperty("consumer.topics").split(","));
    }
}
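A quick design note: polling file.lastModified() once a second is simple and portable. As an alternative sketch (my addition, not part of the original code), the JDK's WatchService can block until the OS reports a change in the conf directory:

package tech.kpretty.util;

import java.io.IOException;
import java.nio.file.*;

public class WatchConfExample {
    public static void main(String[] args) throws IOException, InterruptedException {
        Path confDir = Paths.get(System.getProperty("user.dir"), "conf");
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            // a WatchService watches directories, not individual files
            confDir.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
            while (true) {
                WatchKey key = watcher.take(); // blocks until an event arrives
                for (WatchEvent<?> event : key.pollEvents()) {
                    Path changed = (Path) event.context();
                    if (changed.endsWith("conf.properties")) {
                        // this is where consumer.update(getTopics()) would be called
                        System.out.println("conf.properties modified");
                    }
                }
                key.reset(); // the key must be reset to keep receiving events
            }
        }
    }
}

The rest of this article sticks with the polling version above.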

Then simply start the AutoLoadTopic thread in Application:

public class Application {
    public static void main(String[] args) {
        // start the consumer
        ServerConsumer consumer = new ServerConsumer();
        new Thread(consumer).start();

        // start the producer
        //ServerProducer producer = new ServerProducer();
        //new Thread(producer).start();

        // start the dynamic-config watcher thread
        AutoLoadTopic autoLoadTopic = new AutoLoadTopic(consumer);
        new Thread(autoLoadTopic).start();
    }
}

This looks perfect, but the consumer is guaranteed to throw an error! KafkaConsumer is not thread-safe and does not allow concurrent access from multiple threads, as its source code shows:

private void acquire() {
  long threadId = Thread.currentThread().getId();
  if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
    throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
  refcount.incrementAndGet();
}

This acquire() method is called by many of the consumer's commonly used methods, such as subscribe, poll, assignment, and commitAsync. So when re-subscribing we must make sure the consumer is in an "idle" state. Borrowing the spirit of CAS, we modify part of the code:

// set to true while a subscription update is in progress
private volatile boolean isUpdate = false;

@Override
public void run() {
  while (isRunning) {
    if (!isUpdate) {
      receive();
    }
  }
}

While an update is pending, the loop skips receive and spins idly for a while. But isUpdate may be set while the consumer is right in the middle of processing records, so the update must wait until the in-flight receive finishes before touching the consumer. The update logic therefore becomes:

public void update(List<String> topics) {
  isUpdate = true;
  do {
    try {
      TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  } while (flag);
  consumer.subscribe(topics);
  LOG.info("配置信息已更新");
  isUpdate = false;
  LOG.info("任务重新启动");
}

The consumer flips flag as it consumes: flag is set to true when a receive starts and back to false when it finishes, so the do-while above simply drains any in-flight receive (strictly speaking the one-second sleep is optional; it just avoids a tight spin). Once the configuration is updated, remember to set isUpdate back to false so the consumer resumes polling. With that, the requirement is implemented.
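To see the coordination in isolation, here is a minimal, Kafka-free sketch of the same flag/isUpdate handshake, with a sleep standing in for poll-plus-processing; the class and method names are illustrative only. Note that, just like the original, it has a narrow check-then-act window between the worker reading isUpdate and setting flag; the grace-period sleep before checking flag makes hitting it unlikely in practice.

import java.util.concurrent.TimeUnit;

public class HandshakeDemo {
    private volatile boolean isRunning = true;
    private volatile boolean isUpdate = false; // set by the updater to pause the worker
    private volatile boolean flag = false;     // true only while a unit of work is in flight

    public void workerLoop() throws InterruptedException {
        while (isRunning) {
            if (!isUpdate) {
                flag = true;
                TimeUnit.MILLISECONDS.sleep(200); // stands in for poll() + record handling
                flag = false;
            }
        }
    }

    public void update() throws InterruptedException {
        isUpdate = true;                      // ask the worker not to start new work
        do {
            TimeUnit.MILLISECONDS.sleep(100); // grace period, like the original's 1s sleep
        } while (flag);                       // wait for in-flight work to drain
        System.out.println("safe window: re-subscribe would happen here");
        isUpdate = false;                     // let the worker resume
    }

    public static void main(String[] args) throws Exception {
        HandshakeDemo demo = new HandshakeDemo();
        Thread worker = new Thread(() -> {
            try {
                demo.workerLoop();
            } catch (InterruptedException ignored) {
            }
        });
        worker.start();
        TimeUnit.SECONDS.sleep(1);
        demo.update();          // pauses the worker, performs the "update", resumes it
        demo.isRunning = false; // shut the demo down
        worker.join();
    }
}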
