// 消费线程 private Thread consumerThread; // 线程名称,自己设置下 private String threadName; private volatile boolean running = false ; // 业务线程池,请根据自己的业务设置合理参数 private ExecutorService pool = new ThreadPoolExecutor( Runtime.getRuntime().availableProcessors() * 2 , Runtime.getRuntime().availableProcessors() * 2 , 120 , TimeUnit.SECONDS, blockingDeque, NamedThreadFactory.create(THREAD_POOL_NAME_PREFIX + threadName + "-" ), new CallerRunsPolicy() );; public synchronized void start() { // 此处省略 consumer 初始化的过程 consumer = null ; consumer.subscribe(Lists.newArrayList( "topic-aaa" )); running = true ; consumerThread = new Thread(() -> { while (running) { try { ConsumerRecords<String, String> records = consumer.poll( 500 ); for (ConsumerRecord<String, String> record : records) { // 放入自己的业务线程池 pool.submit( new Worker(record, records)); } } catch (WakeupException exception) { // 捕获 WakeupException consumer 准备退出 LOG.info( "WakeupException 准备关闭consumer" ); // 跳出循环,关闭consumer break ; } catch (Exception exception) { LOG.error( "error cause by " , exception); } } consumer.close(); // 此处使用的 guava 的线程池关闭方法,也可以根据自己的需求去自定义实现线程池关闭逻辑,阻塞时间请根据业务情况自行设置 boolean isTerminated = MoreExecutors.shutdownAndAwaitTermination(pool, 1 , TimeUnit.SECONDS); LOG.info( "consumer closed ,poll terminated:{}" , isTerminated); }); consumerThread.setName(threadName); consumerThread.start(); } public synchronized void close() { if (!running) { LOG.info( "consumer 不是运行状态,不需要关闭,threadName[{}]" , threadName); return ; } Stopwatch stopwatch = Stopwatch.createStarted(); running = false ; consumer.wakeup(); try { // 等待counsumer线程执行完成 consumerThread.join(); } catch (InterruptedException e) { LOG.error( "consumerThread.join InterruptedException" , e); } LOG.info( "consumer线程关闭完成,threadName[{}],Spend time[{}]" , threadName, stopwatch); } |
所有评论(0)