Consuming Kafka Data with Spring Boot
This article introduces two different ways of consuming Kafka data: the listener mode (via @KafkaListener) and the subscribe mode (via KafkaConsumer.subscribe).
1. Configuration file
spring:
  kafka:
    bootstrap-servers: 192.168.1.16:9092
    # Consumer
    consumer:
      group-id: alarmService
      max-poll-records: 10 # maximum number of records returned by a single poll
      enable-auto-commit: false
      auto-commit-interval: 1000ms
      properties:
        max.poll.interval.ms: 360000
        session.timeout.ms: 150000
        # Kafka username/password settings; remove the following entries if SASL is not enabled
        # SASL authentication mechanism
        sasl.mechanism: PLAIN
        # Security protocol
        security.protocol: SASL_PLAINTEXT
        # JAAS account and password
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    # listener:
    #   type: batch
    #   concurrency: 6
    # Producer
    producer:
      retries: 0 # if set to a value greater than 0, the client resends records whose send failed
      batch-size: 16384 # when multiple records go to the same partition, the producer batches them into fewer requests, improving client and broker performance; this sets the default batch size in bytes (16384 is the default)
      buffer-memory: 33554432 # total bytes of memory the producer uses to buffer records waiting to be sent to the broker (33554432 is the default)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer class
      properties:
        session.timeout.ms: 15000
        sasl.mechanism: PLAIN
        security.protocol: SASL_PLAINTEXT
        sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="username" password="password";
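The producer settings above are picked up by Spring Boot's auto-configured KafkaTemplate. Below is a minimal sketch of sending a message with it; the class name DemoProducer and the way the topic and payload are passed in are illustrative assumptions, not part of the original project.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

// Minimal sketch (assumption): sends a JSON string using the producer settings from the YAML above
@Component
public class DemoProducer {

    // Auto-configured by Spring Boot from spring.kafka.producer.*
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String topic, String jsonMessage) {
        // Asynchronous send; no key is given, so records are spread across partitions
        kafkaTemplate.send(topic, jsonMessage);
    }
}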
2. Subscribe mode
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private String autoCommit;
@Value("${spring.kafka.consumer.auto-commit-interval}")
private String interval;
@Value("${spring.kafka.consumer.key-deserializer}")
private String key;
@Value("${spring.kafka.consumer.value-deserializer}")
private String value;
@Value("${spring.kafka.consumer.properties.security.protocol}")
private String securityProtocol;
@Value("${spring.kafka.consumer.properties.sasl.mechanism}")
private String SASLMechanism;
@Value("${spring.kafka.consumer.properties.sasl.jaas.config}")
private String SASLJaasConfig;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String offsetReset;
@Value("${spring.kafka.consumer.max-poll-records}")
private Integer records;
@Value("${spring.kafka.consumer.properties.session.timeout.ms}")
private Integer timeout;
@Value("${spring.kafka.consumer.properties.max.poll.interval.ms}")
private Integer pollInterval;
/**
 * Subscribe-mode consumer
 * @param topic   topic to consume
 * @param groupId consumer group.id
 */
public void kafkaConsumer(String topic, String groupId) {
    Properties props = new Properties();
    // Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Consumer group: consumers with the same group.id belong to the same group
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, key);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offsetReset);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, timeout);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, pollInterval);
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
    // Username/password (SASL) authentication parameters
    props.put("security.protocol", securityProtocol);
    props.put("sasl.mechanism", SASLMechanism);
    props.put("sasl.jaas.config", SASLJaasConfig);
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    // Subscribe to the topic
    consumer.subscribe(Arrays.asList(topic));
    try {
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));
            for (ConsumerRecord<String, String> record : consumerRecords) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                Optional<?> kafkaMessage = Optional.ofNullable(record.value());
                if (kafkaMessage.isPresent()) {
                    Object message = kafkaMessage.get();
                    JSONObject json = JSON.parseObject(message.toString());
                    // Processing logic
                    // Synchronous commit: the current thread blocks until the offset commit succeeds
                    consumer.commitSync();
                }
            }
        }
    } finally {
        consumer.close();
    }
}
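Because poll() loops forever, the subscribe-mode consumer above is usually started on its own thread when the application boots. Below is a minimal sketch, assuming the method above lives in a bean called SubscribeConsumer; the topic name "alarm-topic" is a placeholder, and the group id "alarmService" matches the configuration in section 1.

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

// Minimal sketch (assumption): start the polling loop on a dedicated thread at startup
@Component
public class KafkaConsumerStarter implements ApplicationRunner {

    // Assumed bean holding the kafkaConsumer(topic, groupId) method shown above
    @Autowired
    private SubscribeConsumer subscribeConsumer;

    @Override
    public void run(ApplicationArguments args) {
        // "alarm-topic" is a placeholder topic; "alarmService" is the group id from section 1
        new Thread(() -> subscribeConsumer.kafkaConsumer("alarm-topic", "alarmService"),
                "kafka-subscribe-consumer").start();
    }
}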
3. Listener mode
@KafkaListener(topicPattern = "#{'${spring.kafka.consumer.topics}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void kafkaConsumer(ConsumerRecord<?, ?> record) {
    System.out.println("--------------kafka----------------");
    // Fetch the community IDs
    List<String> communityIds = communityBaseinfoMapper.getCommunityBaseinfoCommunityId();
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        JSONObject json = JSON.parseObject(message.toString());
        ApplyAccess papplyAccess = json.toJavaObject(ApplyAccess.class);
        String communityId = papplyAccess.getCommunityId();
        if (communityIds.contains(communityId)) {
            // Persist the data
            String idCard = papplyAccess.getIdCard().replace("*", "");
            peopleBaseinfoService.savePapplyAccess(papplyAccess, idCard);
        }
    }
}
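If the commented-out listener settings from section 1 are enabled (spring.kafka.listener.type: batch, concurrency: 6), a listener method can receive an entire poll's worth of records at once. A minimal sketch, assuming the same topics property placeholder as above and purely illustrative handling:

// Minimal sketch (assumption): batch listener, only valid when spring.kafka.listener.type=batch
@KafkaListener(topics = "#{'${spring.kafka.consumer.topics}'}", groupId = "#{'${spring.kafka.consumer.group-id}'}")
public void kafkaBatchConsumer(List<ConsumerRecord<?, ?>> records) {
    for (ConsumerRecord<?, ?> record : records) {
        // Illustrative handling only: print each record's position and payload
        System.out.printf("partition = %d, offset = %d, value = %s%n",
                record.partition(), record.offset(), record.value());
    }
}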
4. Configuring Kafka to consume historical data from the beginning
For a consumer to re-consume the full data of a topic from the beginning, two conditions must be met (spring-kafka), as shown in the sketch after this list:
(1) Use a brand-new group.id, i.e. one that has never been used by any consumer before;
(2) Set the auto.offset.reset parameter to earliest.
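A minimal configuration sketch that satisfies both conditions; the group id alarmServiceReplay is a made-up name standing in for any group.id that has never committed an offset:

spring:
  kafka:
    consumer:
      group-id: alarmServiceReplay   # (1) a brand-new group.id with no committed offsets
      auto-offset-reset: earliest    # (2) start from the earliest offset when none is committed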
5. Notes on the auto.offset.reset setting
The meaning of auto.offset.reset (note that the exact parameters can differ between versions; refer to the official documentation):
- earliest: if a partition has a committed offset, consume from that offset; if there is no committed offset, consume from the beginning of the partition.
- latest: if a partition has a committed offset, consume from that offset; if there is no committed offset, consume only data newly produced to the partition.
- none: if every partition of the topic has a committed offset, consume from after those offsets; if any partition has no committed offset, throw an exception.
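The none option turns a missing committed offset into an explicit error instead of silently picking a starting position. A minimal sketch, reusing the props built in section 2 and assuming the group has no committed offsets:

// Minimal sketch (assumption): with auto.offset.reset=none, poll() fails fast when
// any subscribed partition has no committed offset for this group
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList(topic));
    consumer.poll(Duration.ofMillis(1000)); // throws NoOffsetForPartitionException
}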