以下给出一个参考示例,但具体的某些细节的处理还需要根据自己的业务自行抉择

实现思路,通过 consumer.wakeup() 方法让  poll 方法抛出异常,然后捕获异常,退出

《kafka权威指南》 第 64 页也有给出示例,有兴趣的可以找来看看

// 消费线程

private Thread consumerThread;

// 线程名称,自己设置下

private String threadName;

private volatile boolean running = false;

// 业务线程池,请根据自己的业务设置合理参数

private ExecutorService pool = new ThreadPoolExecutor(

    Runtime.getRuntime().availableProcessors() * 2,

    Runtime.getRuntime().availableProcessors() * 2120, TimeUnit.SECONDS,

    blockingDeque, NamedThreadFactory.create(THREAD_POOL_NAME_PREFIX + threadName + "-"),

    new CallerRunsPolicy()

);;

public synchronized void start() {

  // 此处省略 consumer 初始化的过程

  consumer = null;

  consumer.subscribe(Lists.newArrayList("topic-aaa"));

  running = true;

  consumerThread = new Thread(() -> {

    while (running) {

      try {

        ConsumerRecords<String, String> records = consumer.poll(500);

        for (ConsumerRecord<String, String> record : records) {

          // 放入自己的业务线程池

          pool.submit(new Worker(record, records));

        }

      catch (WakeupException exception) {

        // 捕获 WakeupException consumer 准备退出

        LOG.info("WakeupException 准备关闭consumer");

        // 跳出循环,关闭consumer

        break;

      catch (Exception exception) {

        LOG.error("error cause by ", exception);

      }

    }

    consumer.close();

    // 此处使用的 guava 的线程池关闭方法,也可以根据自己的需求去自定义实现线程池关闭逻辑,阻塞时间请根据业务情况自行设置

    boolean isTerminated = MoreExecutors.shutdownAndAwaitTermination(pool, 1, TimeUnit.SECONDS);

    LOG.info("consumer closed ,poll terminated:{}", isTerminated);

  });

  consumerThread.setName(threadName);

  consumerThread.start();

}

public synchronized void close() {

  if (!running) {

    LOG.info("consumer 不是运行状态,不需要关闭,threadName[{}]", threadName);

    return;

  }

  Stopwatch stopwatch = Stopwatch.createStarted();

  running = false;

  consumer.wakeup();

  try {

    // 等待counsumer线程执行完成

    consumerThread.join();

  catch (InterruptedException e) {

    LOG.error("consumerThread.join InterruptedException", e);

  }

  LOG.info("consumer线程关闭完成,threadName[{}],Spend time[{}]", threadName, stopwatch);

}

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐