背景:

   对kafka消息进行监听,生产者发了消息,但是消费端没有接到消息,监听代码

消费端,kafka配置

spring.kafka.bootstrap-servers=kafka.cestc.dmp:9591

spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="Kafka#Cestc2021";

spring.kafka.properties.security.protocol=SASL_PLAINTEXT

spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256

#=============== provider =======================

spring.kafka.producer.retries=0

# 每次批量发送消息的数量

spring.kafka.producer.batch-size=16384

spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================

# 指定默认消费者group id

spring.kafka.consumer.group-id=dq

spring.kafka.consumer.auto-offset-reset=latest

spring.kafka.consumer.enable-auto-commit=true

spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

@KafkaListener(groupId = "${spring.kafka.consumer.group-id:dq}",topics = {"t_dq_rwzt_topic"})
public ReturnT<String> listenKafka2(String records, Acknowledgment ack) {

}

offset explorer发现生产者发送了消息,offset是0

问题解决:

后来查看生产者kafka配置,发现他们的enable-auto-commit是false:

spring.kafka.consumer.enable-auto-commit=false

修改kafka配置

spring.kafka.consumer.enable-auto-commit=false

# 在侦听器容器中运行的线程数

spring.kafka.listener.concurrency=5

# listner负责ack,每调用commit方法,立即向服务器提交

spring.kafka.listener.ack-mode=manual_immediate

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐