1.1 Add a listener to the web.xml file, as follows:

<listener>
    <listener-class>com.sinosoft.lis.listener.ConsumerThreadListener</listener-class>
</listener>
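For context, a minimal sketch of where this element sits in the deployment descriptor; the web-app root element and the version shown here are illustrative assumptions, not taken from the project:

<?xml version="1.0" encoding="UTF-8"?>
<web-app xmlns="http://java.sun.com/xml/ns/javaee"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
                             http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
         version="2.5">

    <!-- Starts the Kafka consumer when the web application is deployed -->
    <listener>
        <listener-class>com.sinosoft.lis.listener.ConsumerThreadListener</listener-class>
    </listener>

    <!-- servlets, filters, and other configuration -->

</web-app>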

1.2 Write a listener class that implements the ServletContextListener interface

package com.sinosoft.lis.listener;

import com.sinosoft.lis.services.addquasicust.ConsumerThread;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class ConsumerThreadListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        // Start the Kafka consumer thread as soon as the web application is deployed
        ConsumerThread consumerThread = new ConsumerThread();
        consumerThread.start();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // Left empty in this example; a possible graceful shutdown is sketched after 1.3
    }
}

1.3 When the service starts, contextInitialized() runs automatically and kicks off the Kafka consumer. The ConsumerThread class is as follows:

package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerThread {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerThread() {
        // Build the Kafka consumer configuration
        Properties properties = buildKafkaProperty();
        this.consumer = new KafkaConsumer<>(properties);
        // Topic name (placeholder; replace with the actual topic)
        this.topic = "topic-name";
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start() {
        try {
            int threadCoreNumber = 3;
            int threadMaxNumber = 5;
            // Create the worker thread pool
            executor = new ThreadPoolExecutor(threadCoreNumber, threadMaxNumber, 1L, TimeUnit.MINUTES,
                    new ArrayBlockingQueue<Runnable>(500), new ThreadPoolExecutor.CallerRunsPolicy());
            // Poll Kafka on a separate thread; otherwise the while (true) loop would block contextInitialized() and the web app would never finish starting
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        // Fetch a batch of messages, waiting up to 1000 ms per poll
                        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
                        for (ConsumerRecord<String, String> item : consumerRecords) {
                            // Hand each record to the business handler
                            executor.submit(new ConsumerThreadHandler(item));
                        }
                    }
                }
            });
            thread.start();
        } catch (Exception e) {
            // If startup fails, surface the error and release the thread pool
            e.printStackTrace();
            if (executor != null) {
                executor.shutdown();
            }
        }
    }
    // Kafka consumer configuration
    private static Properties buildKafkaProperty() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
        props.put("group.id", "storm");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
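The contextDestroyed() hook in 1.2 is left empty. One way to stop the consumer cleanly on undeploy is sketched below; the running flag, the shutdown() method, and the WakeupException handling are additions for illustration, not part of the original code:

    // Hypothetical additions inside ConsumerThread for a graceful shutdown.
    // consumer.wakeup() makes a blocked poll() throw WakeupException, ending the loop.
    private final java.util.concurrent.atomic.AtomicBoolean running =
            new java.util.concurrent.atomic.AtomicBoolean(true);

    public void shutdown() {
        running.set(false);
        consumer.wakeup();          // interrupt a blocking poll()
        if (executor != null) {
            executor.shutdown();    // let in-flight handlers finish
        }
    }

    // The polling loop in start() would then become:
    // try {
    //     while (running.get()) {
    //         ConsumerRecords<String, String> records = consumer.poll(1000);
    //         for (ConsumerRecord<String, String> item : records) {
    //             executor.submit(new ConsumerThreadHandler(item));
    //         }
    //     }
    // } catch (org.apache.kafka.common.errors.WakeupException e) {
    //     // expected during shutdown
    // } finally {
    //     consumer.close();
    // }

With this in place, ConsumerThreadListener could keep the ConsumerThread instance in a field and call consumerThread.shutdown() inside contextDestroyed().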

1.4 The business-logic handler class must implement the Runnable interface

package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ConsumerThreadHandler implements Runnable {

    private final ConsumerRecord<String, String> consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord<String, String> consumerRecord) {
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        try {
            // Business logic goes here; this example simply prints the message value
            System.out.println(consumerRecord.value());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

1.5 Producer code, for testing

package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
public class ProducerTest {
    public void newTest() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        // Send a few test messages to the same topic the consumer subscribes to
        for (int i = 0; i <= 10; i++) {
            String r = "test-" + i;
            producer.send(new ProducerRecord<String, String>("topic-name", r));
        }
        producer.close();
    }

    public static void main(String[] args) {
        ProducerTest producerTest = new ProducerTest();
        producerTest.newTest();

    }
}
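For testing it can also help to confirm that each record was actually accepted by the broker. A minimal sketch using the callback form of send() from kafka-clients; the log text is illustrative:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

// Inside newTest(), the send() call can take a Callback that reports the result:
producer.send(new ProducerRecord<String, String>("topic-name", r), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // The record was not written (e.g. broker unreachable or request timed out)
            exception.printStackTrace();
        } else {
            System.out.println("sent to " + metadata.topic()
                    + " partition " + metadata.partition()
                    + " offset " + metadata.offset());
        }
    }
});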

Additionally, the required Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

Note that the kafka-clients version should match the Kafka broker version; otherwise errors may occur.
