KafkaConsumer多线程优化
优化目的:KafkaConsumer是以单线程模式运行,为了提升consumer的消费能力,多线程是一个很好的选择。KafkaConsumer和KafkaProducer不同,后者是线程安全的,因此我们鼓励用户在多个线程中共享一个KafkaProducer实例,这样通常都要比每个线程维护一个KafkaProducer实例效率要高。但对于KafkaConsumer而言,它不是线程安全的,所以实现多.
优化目的:
KafkaConsumer是以单线程模式运行,为了提升consumer的消费能力,多线程是一个很好的选择。KafkaConsumer和KafkaProducer不同,后者是线程安全的,因此我们鼓励用户在多个线程中共享一个KafkaProducer实例,这样通常都要比每个线程维护一个KafkaProducer实例效率要高。但对于KafkaConsumer而言,它不是线程安全的,所以实现多线程时通常有两种实现方法
方案1 每个线程内部维护自己的consumer对象
缺点:
更多的TCP连接开销(每个线程都要维护若干个TCP连接)
consumer数受限于topic分区数,扩展性差
频繁请求导致吞吐量下降
线程自己处理消费到的消息可能会导致超时,从而造成rebalance
方案2 Consumer内部维护自己的worker线程池
缺点:
实现较复杂
线程池容量规划
限流策略
优雅关闭
作者采用第二种方式进行实现,在实现过程中发了一些需要注意的细节,所以写下本文做记录
Consumer V1.0
private ExecutorService scheduler;
{
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("base-consumer-pool-%d").build();
scheduler = new ThreadPoolExecutor(4, 8, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(MAX_QUEUE_SIZE), threadFactory,
new ThreadPoolExecutor.AbortPolicy());
}
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
scheduler.execute(() -> {
//do something
});
}
}
problem
当下游业务吞吐量,低于consumer的吞吐量时,消息会在线程池中积压,最终导致线程池满,抛出RejectedException
当我们设计线程池时,初衷是为了提升consumer的消费能力,当系统处理能力低于kafkaConsumer消费能力时,我们希望积压的消息保留在kafka中,而不是被worker线程池rejected
解决方案:自旋限流
这里借鉴了自旋锁的思路
自旋锁(spin lock)
是一种非阻塞锁,也就是说,如果某线程需要获取锁,但该锁已经被其他线程占用时,该线程不会被挂起,而是在不断的消耗CPU的时间,不停的试图获取锁,从而减少线程切换开销
自旋限流 (spin limit)
在消费线程中,若线程队列大于限流阈值,则挂起当前线程,休眠后,再次验证队列长度,直到队列长度小于阈值Consumer V1.1
private ExecutorService scheduler;
private BlockingQueue<Runnable> workQueue;
private static final int MAX_QUEUE_SIZE = 1024;
{
workQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
scheduler = new ThreadPoolExecutor(4, 8, 0L, TimeUnit.MILLISECONDS,
workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
}
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
//自旋阻塞consumer, 让消费不了的消息保存在kafka中, 而不是被worker线程池rejected
while (workQueue.size() >= MAX_QUEUE_SIZE) {
Thread.sleep(100);
}
scheduler.execute(() -> {
//do something
});
}
}
problem
当线程队列到达阈值后,自旋阻塞了consumer的消费过程,线程池没有用上maximumPoolSize属性
解决这个问题需要清楚CorePoolSize、MaximumPoolSize和workQueue的含义
线程池处理策略如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
Consumer V1.2
private ExecutorService scheduler;
private BlockingQueue<Runnable> workQueue;
private static final int MAX_QUEUE_SIZE = 1024;
private static final int MAX_POOL_SIZE = 8;
{
workQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);
scheduler = new ThreadPoolExecutor(4, MAX_POOL_SIZE, 0L, TimeUnit.MILLISECONDS,
workQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy());
}
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
//验证队列长队的同时校验活跃线程数
while (workQueue.size() >= MAX_QUEUE_SIZE &&
MAX_POOL_SIZE - ((ThreadPoolExecutor) scheduler).getActiveCount() <= 0 ){
Thread.sleep(100);
}
scheduler.execute(() -> { //do something });
}
}
线程池优雅关闭
Runtime.getRuntime().addShutdownHook()
implements DisposableBean
进程需要使用kill -15进行关闭
@Override
public void destroy() throws Exception {
this.scheduler.shutdown();
awaitTermination(this.scheduler);
}
private void awaitTermination(ExecutorService executor) {
try{
executor.awaitTermination(30, TimeUnit.SECONDS)
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
更多推荐
所有评论(0)