kafka消费者监控,程序启动时运行
摘要:在 web.xml 文件中添加 listener,如下:<listener><listener-class>com.sinosoft.lis.listener.ConsumerThreadListener</listener-class></listener>
1.1 在 web.xml 文件中添加 listener,如下:
<listener>
<listener-class>com.sinosoft.lis.listener.ConsumerThreadListener</listener-class>
</listener>
1.2 编写监控类,实现ServletContextListener 接口
package com.sinosoft.lis.listener;
import com.sinosoft.lis.services.addquasicust.ConsumerThread;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
/**
 * Servlet context listener that launches the Kafka consumer when the web
 * application starts.
 */
public class ConsumerThreadListener implements ServletContextListener {

    /**
     * Creates a {@link ConsumerThread} and starts it once the servlet context
     * has been initialized.
     *
     * @param sce the context event supplied by the container (not used)
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        new ConsumerThread().start();
    }

    /**
     * Takes no action when the servlet context is destroyed.
     *
     * @param sce the context event supplied by the container (not used)
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // nothing to do
    }
}
1.3服务启动,自动执行contextInitialized()的代码,即kafka相关内容,ConsumerThread 内容如下:
package com.sinosoft.lis.services.addquasicust;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Polls a Kafka topic on a dedicated background thread and hands every record
 * to a worker pool, where it is processed by a {@link ConsumerThreadHandler}.
 *
 * <p>Lifecycle: construct, call {@link #start()} once, and call {@link #stop()}
 * when the application shuts down (for example from a
 * {@code ServletContextListener.contextDestroyed} callback) so that the
 * consumer is closed and the worker pool is released.
 *
 * <p>The Kafka consumer is only used by the polling thread. The single
 * exception is {@code wakeup()}, which {@link #stop()} calls from another
 * thread to interrupt a blocked {@code poll}.
 */
public class ConsumerThread {
    /** Maximum time, in milliseconds, a single poll waits for records. */
    private static final long POLL_TIMEOUT_MS = 1000L;
    /** Worker pool sizing. */
    private static final int CORE_POOL_SIZE = 3;
    private static final int MAX_POOL_SIZE = 5;
    private static final int QUEUE_CAPACITY = 500;
    /** Maximum time, in milliseconds, stop() waits for the polling thread to finish. */
    private static final long STOP_TIMEOUT_MS = 5000L;

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    /** Set by start(), cleared by stop(); read by the polling thread on every iteration. */
    private volatile boolean running;
    private ExecutorService executor;
    private Thread pollingThread;

    /**
     * Creates the Kafka consumer from the built-in configuration and subscribes
     * it to the topic. No records are fetched until {@link #start()} is called.
     */
    public ConsumerThread() {
        // Load the Kafka client configuration.
        Properties properties = buildKafkaProperty();
        this.consumer = new KafkaConsumer<>(properties);
        // Topic name.
        this.topic = "topic名称";
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    /**
     * Creates the worker pool and starts the background polling thread.
     *
     * <p>Polling runs on its own thread because the poll loop never returns on
     * its own; running it on the caller's thread would block application
     * startup. If start-up fails, the pool is released and the error is
     * reported on standard error; this method does not throw.
     */
    public void start() {
        try {
            // When the queue is full, CallerRunsPolicy makes the polling thread run the
            // task itself, which throttles polling instead of dropping records.
            executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 1L, TimeUnit.MINUTES,
                    new ArrayBlockingQueue<Runnable>(QUEUE_CAPACITY),
                    new ThreadPoolExecutor.CallerRunsPolicy());
            running = true;
            pollingThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    pollLoop();
                }
            }, "kafka-consumer-poller");
            pollingThread.start();
        } catch (Exception e) {
            running = false;
            pollingThread = null;
            shutdownExecutor();
            System.err.println("Failed to start Kafka consumer polling: " + e);
        }
    }

    /**
     * Stops polling, waits briefly for the polling thread to finish and close
     * the consumer, then shuts down the worker pool. Tasks already queued are
     * still allowed to complete. Safe to call when {@link #start()} was never
     * called.
     */
    public void stop() {
        running = false;
        Thread poller = pollingThread;
        if (poller != null) {
            // Interrupts a poll() that is currently blocked so the loop can exit promptly.
            consumer.wakeup();
            try {
                poller.join(STOP_TIMEOUT_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        // Shut the pool down only after the poller has had a chance to submit its last batch.
        shutdownExecutor();
    }

    /** Fetches records until stopped and submits each one to the worker pool; always closes the consumer. */
    private void pollLoop() {
        try {
            while (running) {
                // Wait up to POLL_TIMEOUT_MS for the next batch of records.
                ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> item : consumerRecords) {
                    // Hand the record to the business-logic handler.
                    executor.submit(new ConsumerThreadHandler(item));
                }
            }
        } catch (RuntimeException e) {
            // stop() calls wakeup(), which makes a blocked poll() throw; that is the
            // expected exit path. Anything thrown while still running is a real failure.
            if (running) {
                throw e;
            }
        } finally {
            consumer.close();
        }
    }

    /** Shuts down the worker pool if it was created. */
    private void shutdownExecutor() {
        ExecutorService pool = executor;
        if (pool != null) {
            pool.shutdown();
        }
    }

    /**
     * Builds the Kafka consumer configuration.
     *
     * <p>NOTE(review): offsets are auto-committed every second, independently of
     * whether the handlers have finished; records still queued in the worker
     * pool when the process dies would presumably not be redelivered. Confirm
     * this is acceptable for the business flow.
     *
     * @return the consumer properties
     */
    private static Properties buildKafkaProperty() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
        props.put("group.id", "storm");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
1.4业务逻辑处理类需实现Runnable 接口
package com.sinosoft.lis.services.addquasicust;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.lang.reflect.Method;
public class ConsumerThreadHandler implements Runnable {
private ConsumerRecord consumerRecord;
public ConsumerThreadHandler(ConsumerRecord consumerRecord) {
this.consumerRecord = consumerRecord;
}
@Override
public void run() {
try {
System.out.println(consumerRecord.value());
} catch (Exception e) {
e.printStackTrace();
}
}
1.5生产者相关代码,用于测试
package com.sinosoft.lis.services.addquasicust;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerTest {
public void newTest() {
Properties props = new Properties();
props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 10; i++) {
String r = “测试”+i;
producer.send(new ProducerRecord<String, String>("topic名称", r));
}
producer.close();
}
public static void main(String[] args) {
ProducerTest producerTest = new ProducerTest();
producerTest.newTest();
}
}
另:相关jar包
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
注意该jar包要与kafka服务器版本保持一致,否则有可能会发生错误
更多推荐
已为社区贡献1条内容
所有评论(0)