spring 集成kafka消费者
本文主要是说明spring集成kafka的消费者功能和过程中遇到的各种小问题。环境:spring-kafka-2.1.11.RELEASE;spring-boot-2.0.8.RELEASEmain函数需要加上@EnableKafka,spring boot中几乎所有支持的模块都会有一个对应的Enable,例如schedule,spring cloud系列等,所以以后使用其他spring ...
本文主要是说明spring集成kafka的消费者功能和过程中遇到的各种小问题。
环境:spring-kafka-2.1.11.RELEASE;spring-boot-2.0.8.RELEASE
main函数需要加上@EnableKafka,spring boot中几乎所有支持的模块都会有一个对应的Enable,例如schedule,spring cloud系列等,所以以后使用其他spring boot的模块都是类似的。
@SpringBootApplication
@EnableScheduling
@EnableKafka
public class Application {
public static void main(String[] args) {
try {
SpringApplication.run(Application.class, args);
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer最简单的实现方式是直接使用注解标记consumer处理方法,这样只能每次消费一条数据,效率上有点低了。
@KafkaListener(id = "test-a", topics = {"topic1", "topic2"})
public void consumer(ConsumerRecord<?, ?> consumerRecord) {
try {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (kafkaMessage.isPresent()) {
Object message = consumerRecord.value();
}
} catch (Exception e) {
e.printStackTrace();
}
}
spring-kafka提供了多种接口方式:
public void consumer(ConsumerRecord<?, ?> record) {} //1
public void consumer(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment) {} //2
public void consumer(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {} //3
public void consumer(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {} //4
public void consumer(List<ConsumerRecord<?, ?>> records) {} //5
public void consumer(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment) {} //6
public void consumer(List<ConsumerRecord<?, ?>> records, Consumer<?, ?> consumer) {} //7
public void consumer(List<ConsumerRecord<?, ?>> records, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {} //8
这8个接口其实是成对的,第一个参数前四个是每次消费一个,后四个是每次批量消费,批量消费数据数量有kafka配置max.poll.records决定的。
需要注意的是如果使用批量消费需要设置factory的setBatchListener为true就可以,下面这个类只需要修改一下配置就可以直接使用。但是如果没有设置setBatchListener为true就用5号接口,接收到的数据每次仍然是一条,但是被分割之后放到list中了,例如:{"a":"1","b":"2","c":"2"},参数就可能分成{"a":"1" "b":"2" "c":"2"},三个元素,但是组合起来还是一个完整的消息。
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
String servers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
boolean autoCommit;
@Value("${spring.kafka.consumer.group-id}")
String groupId;
@Value("${spring.kafka.consumer.key-deserializer}")
String keyDeserializer;
@Value("${spring.kafka.consumer.value-deserializer}")
String valueDeserializer;
public KafkaConsumerConfig(){
log.info("kafka消费者配置加载...");
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());//consumer配置
factory.setConcurrency(5);//同步线程数
factory.setBatchListener(true);//开启批量消费
factory.getContainerProperties().setPollTimeout(30000);//监听超时时间
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory(consumerProperties());
}
@Bean
public Map<String, Object> consumerProperties() {
Map<String, Object> props= new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
return props;
}
}
在使用spring-kafka过程中也遇到一些奇怪问题
1、spring-boot自动shutdown问题,一般会有这个日志:
Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2145b572: startup date [Mon Mar 18 10:13:39 CST 2019]; root of context hierarchy
Unregistering JMX-exposed beans on shutdown
这是因为我忘记加nohup了,所以当客户端关闭之后就shutdown了,但是通过搜索也有很多其他的原因,可能是jar包冲突导致运行时shutdown了,需要把spring-boot自带的Tomcat删掉就可以。
2、在使用过程中一直监控程序使用内存的情况,从一开始的400M+,运行2天后大约稳定在6.3g,一开始以为spring-kafka有内存泄露呢,但是通过查看jvm里面对象的使用情况也没有发现自己写的代码中有什么异常,初步怀疑是spring-kafka自己内部在运行时做了很多数据缓存导致的,因为最多的内存使用是byte[]数组和char[]数组,还需要继续探索。
这个命令会导致程序挂起,暂停运行!!!!
jmap -histo -F pid > log
1: 865742 119811952 char[]
2: 655987 71963880 byte[]
3: 1178861 43733632 java.lang.Object[]
4: 418444 33327424 java.util.HashMap$Node[]
5: 674303 32366544 java.util.HashMap
6: 742833 23770656 java.util.HashMap$Node
7: 699492 16787808 java.util.ArrayList
8: 648159 15555816 java.lang.Long
9: 294649 14143152 java.nio.HeapByteBuffer
10: 245321 13737976 java.util.LinkedHashMap
11: 289588 11583520 java.util.HashMap$KeyIterator
12: 432808 10387392 org.apache.kafka.common.internals.PartitionStates$PartitionState
13: 423257 10158168 org.apache.kafka.common.protocol.types.Struct
更多推荐
所有评论(0)