Kafka重复消费以及消费线程安全关闭的解决方案
Kafka重复消费以及消费线程安全关闭的解决方案
背景和原因分析
Kafka消费程序每次重启都会出现重复消费的情况,考虑是在kill掉程序的时候,有部分消费完的数据没有提交offsect。
props.setProperty("enable.auto.commit", "true");
此处表明自动提交,即延迟提交(poll的时候会根据配置的自动提交时间间隔去进行检测并提交)。当kill掉程序的时候,可能消费完的数据还没有到达提交的时间点程序就被kill掉了。
重复消费解决方案:
关闭自动提交,采用异步提交+同步提交的方式来提交offsect。
// 关闭自动提交
props.setProperty("enable.auto.commit", "false");
// 消费逻辑
try {
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, byte[]> record : records) {
// 具体业务逻辑
}
consumer.commitAsync();
}
System.out.println("while end.");
} catch (Exception e) {
System.err.println("consume error..." + e.getMessage());
} finally {
try {
consumer.commitSync();
System.out.println("commit sync suc.");
} catch (Exception e) {
System.err.println("commit sync error." + e.getMessage());
} finally {
consumer.close();
System.out.println("close.");
}
}
这样还不够,当kill掉程序的时候,会发现并没有走到finally中。说明线程非正常停止。
线程安全关闭解决方案:
1.使用线程池来运行线程
2.在实例销毁前使用结束标志手动停止线程
3.使用CountDownLatch等待线程停止
第一步:定义线程池
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
int cpuCoreNum = Runtime.getRuntime().availableProcessors();
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(cpuCoreNum);
threadPoolTaskExecutor.setMaxPoolSize(cpuCoreNum * 2);
threadPoolTaskExecutor.setQueueCapacity(2000);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setThreadNamePrefix("global_thread_pool_task_executor");
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.setAwaitTerminationSeconds(10);// 确保该值是线程池中各个线程阻塞的最大时长
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
此处两个配置参数至关重要
setWaitForTasksToCompleteOnShutdown(true)表示等待正在进行和排队的任务完成。
threadPoolTaskExecutor.setAwaitTerminationSeconds(10)虽然我们已经配置为等待正在进行和排队的任务完成,但Spring仍然会继续关闭容器的其余部分。这可能会释放任务执行器所需的资源,并导致任务失败。配置这个最大等待时间可以确保在指定的时间段内,容器级别的关闭过程将被阻止。
等待时间设置多少具体看线程池中业务线程最大耗时来定。
如果不停止线程,就会超过线程池的等待时间。通过以下WARN日志可以发现,在停止线程池的时候仍然存在业务线程没有停掉的情况,所以还需要定义一个标志来手动停止线程。
WARN 11472 --- [extShutdownHook] o.s.s.concurrent.ThreadPoolTaskExecutor : Timed out while waiting for executor 'threadPoolTaskExecutor' to terminate
第二步:定义结束标志,并在对象销毁前停止线程
// 线程中断标志
public volatile boolean running = true;
while (running) {
...
}
然后再实现DisposableBean接口中的destroy方法,在实例销毁之前将running置为false停止线程
@Override
public void destroy() throws Exception {
this.running = false; // 循环并非立即停止,而是等到当前执行的循环体执行结束才会停止,所以这个地方的等待时间需要与线程池中的setAwaitTerminationSeconds参数相对应
}
当destroy方法运行结束,系统就会销毁掉当前实例,接着就会开始销毁当前实例的依赖(没有被其它实例所引用的话),而此时需要注意的是线程其实并没有运行结束。所以问题出现了:线程还在运行中,而运行所需要的资源(比如Redis连接资源)被提前关闭了,就会导致异常出现。所以在将running置为false之后还需要使用CountDwonLatch等待线程结束,再接着销毁其它依赖。
此处省略第三步,直接上完整的样例代码:
@Component
public class ConsumerClosedSafely implements CommandLineRunner, DisposableBean {
private volatile boolean running = true;
private final CountDownLatch latch = new CountDownLatch(1);
private final String[] topics = new String[]{"test"};
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
public void consume() throws Exception{
Properties props = new Properties();
//TODO 其它属性
props.setProperty("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topics));
// 消费逻辑
try {
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
//TODO 具体业务逻辑
}
consumer.commitAsync();
}
System.out.println("while end.");
} catch (Exception e) {
System.err.println("consume error..." + e.getMessage());
} finally {
try {
consumer.commitSync();
System.out.println("commit sync suc.");
} catch (Exception e) {
System.err.println("commit sync error." + e.getMessage());
} finally {
consumer.close();
System.out.println("close.");
// 计数器减一
latch.countDown();
System.out.println("latch count down .");
}
}
}
@Override
public void run(String... args) throws Exception {
Runnable r = ()->{
try {
consume();
} catch (Exception e) {
System.exit(1);
}
};
threadPoolTaskExecutor.execute(r);
}
@Override
public void destroy() throws Exception {
// 终止循环
this.running= false;
// 等待运行结束
latch.await();
}
}
更多推荐
所有评论(0)