// 消费线程
private Thread consumerThread;
// 线程名称,自己设置下
private String threadName;
private volatile boolean running = false ;
// 业务线程池,请根据自己的业务设置合理参数
private ExecutorService pool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2 ,
Runtime.getRuntime().availableProcessors() * 2 , 120 , TimeUnit.SECONDS,
blockingDeque, NamedThreadFactory.create(THREAD_POOL_NAME_PREFIX + threadName + "-" ),
new CallerRunsPolicy()
);;
public synchronized void start() {
// 此处省略 consumer 初始化的过程
consumer = null ;
consumer.subscribe(Lists.newArrayList( "topic-aaa" ));
running = true ;
consumerThread = new Thread(() -> {
while (running) {
try {
ConsumerRecords<String, String> records = consumer.poll( 500 );
for (ConsumerRecord<String, String> record : records) {
// 放入自己的业务线程池
pool.submit( new Worker(record, records));
}
} catch (WakeupException exception) {
// 捕获 WakeupException consumer 准备退出
LOG.info( "WakeupException 准备关闭consumer" );
// 跳出循环,关闭consumer
break ;
} catch (Exception exception) {
LOG.error( "error cause by " , exception);
}
}
consumer.close();
// 此处使用的 guava 的线程池关闭方法,也可以根据自己的需求去自定义实现线程池关闭逻辑,阻塞时间请根据业务情况自行设置
boolean isTerminated = MoreExecutors.shutdownAndAwaitTermination(pool, 1 , TimeUnit.SECONDS);
LOG.info( "consumer closed ,poll terminated:{}" , isTerminated);
});
consumerThread.setName(threadName);
consumerThread.start();
}
public synchronized void close() {
if (!running) {
LOG.info( "consumer 不是运行状态,不需要关闭,threadName[{}]" , threadName);
return ;
}
Stopwatch stopwatch = Stopwatch.createStarted();
running = false ;
consumer.wakeup();
try {
// 等待counsumer线程执行完成
consumerThread.join();
} catch (InterruptedException e) {
LOG.error( "consumerThread.join InterruptedException" , e);
}
LOG.info( "consumer线程关闭完成,threadName[{}],Spend time[{}]" , threadName, stopwatch);
}
|
所有评论(0)