背景和原因分析

Kafka消费程序每次重启都会出现重复消费的情况,考虑是在kill掉程序的时候,有部分消费完的数据没有提交offsect。

props.setProperty("enable.auto.commit", "true");

此处表明自动提交,即延迟提交(poll的时候会根据配置的自动提交时间间隔去进行检测并提交)。当kill掉程序的时候,可能消费完的数据还没有到达提交的时间点程序就被kill掉了。

重复消费解决方案:

关闭自动提交,采用异步提交+同步提交的方式来提交offsect。

// 关闭自动提交
props.setProperty("enable.auto.commit", "false");
// 消费逻辑
try {
    while (true) {
        ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, byte[]> record : records) {
            // 具体业务逻辑
        }
        consumer.commitAsync();
    }
    System.out.println("while end.");
} catch (Exception e) {
    System.err.println("consume error..." + e.getMessage());
} finally {
    try {
        consumer.commitSync();
        System.out.println("commit sync suc.");
    } catch (Exception e) {
        System.err.println("commit sync error." + e.getMessage());
    } finally {
        consumer.close();
        System.out.println("close.");
    }
}

这样还不够,当kill掉程序的时候,会发现并没有走到finally中。说明线程非正常停止。

线程安全关闭解决方案:

1.使用线程池来运行线程
2.在实例销毁前使用结束标志手动停止线程
3.使用CountDownLatch等待线程停止

第一步:定义线程池

@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
    int cpuCoreNum = Runtime.getRuntime().availableProcessors();
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
    threadPoolTaskExecutor.setCorePoolSize(cpuCoreNum);
    threadPoolTaskExecutor.setMaxPoolSize(cpuCoreNum * 2);
    threadPoolTaskExecutor.setQueueCapacity(2000);
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    threadPoolTaskExecutor.setThreadNamePrefix("global_thread_pool_task_executor");
    threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
    threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
    threadPoolTaskExecutor.setAwaitTerminationSeconds(10);// 确保该值是线程池中各个线程阻塞的最大时长
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;
}

此处两个配置参数至关重要
setWaitForTasksToCompleteOnShutdown(true)表示等待正在进行和排队的任务完成。
threadPoolTaskExecutor.setAwaitTerminationSeconds(10)虽然我们已经配置为等待正在进行和排队的任务完成,但Spring仍然会继续关闭容器的其余部分。这可能会释放任务执行器所需的资源,并导致任务失败。配置这个最大等待时间可以确保在指定的时间段内,容器级别的关闭过程将被阻止。
等待时间设置多少具体看线程池中业务线程最大耗时来定。
如果不停止线程,就会超过线程池的等待时间。通过以下WARN日志可以发现,在停止线程池的时候仍然存在业务线程没有停掉的情况,所以还需要定义一个标志来手动停止线程。

WARN 11472 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor  : Timed out while waiting for executor 'threadPoolTaskExecutor' to terminate

第二步:定义结束标志,并在对象销毁前停止线程

// 线程中断标志
public volatile boolean running = true;
while (running) {
	...
}

然后再实现DisposableBean接口中的destroy方法,在实例销毁之前将running置为false停止线程

@Override
public void destroy() throws Exception {
    this.running = false; // 循环并非立即停止,而是等到当前执行的循环体执行结束才会停止,所以这个地方的等待时间需要与线程池中的setAwaitTerminationSeconds参数相对应
}

当destroy方法运行结束,系统就会销毁掉当前实例,接着就会开始销毁当前实例的依赖(没有被其它实例所引用的话),而此时需要注意的是线程其实并没有运行结束。所以问题出现了:线程还在运行中,而运行所需要的资源(比如Redis连接资源)被提前关闭了,就会导致异常出现。所以在将running置为false之后还需要使用CountDwonLatch等待线程结束,再接着销毁其它依赖。
此处省略第三步,直接上完整的样例代码:

@Component
public class ConsumerClosedSafely implements CommandLineRunner, DisposableBean {

    private volatile boolean running = true;
    private final CountDownLatch latch = new CountDownLatch(1);
    private final String[] topics = new String[]{"test"};

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    public void consume() throws Exception{
        Properties props = new Properties();
        //TODO 其它属性
        props.setProperty("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topics));
        // 消费逻辑
        try {
            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    //TODO 具体业务逻辑
                }
                consumer.commitAsync();
            }
            System.out.println("while end.");
        } catch (Exception e) {
            System.err.println("consume error..." + e.getMessage());
        } finally {
            try {
                consumer.commitSync();
                System.out.println("commit sync suc.");
            } catch (Exception e) {
                System.err.println("commit sync error." + e.getMessage());
            } finally {
                consumer.close();
                System.out.println("close.");
                // 计数器减一
                latch.countDown();
                System.out.println("latch count down .");
            }
        }
    }

    @Override
    public void run(String... args) throws Exception {
        Runnable r = ()->{
            try {
                consume();
            } catch (Exception e) {
                System.exit(1);
            }
        };
        threadPoolTaskExecutor.execute(r);
    }

    @Override
    public void destroy() throws Exception {
        // 终止循环
        this.running= false;
        // 等待运行结束
        latch.await();
    }
}
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐