kafka消费数据成功后在将服务器的数据删除(自动提交offset)
自动提交offset第一步 配置文件修改将#–> 设置自动提交offset设置为falseenable-auto-commit: falsespring:profiles:active: localkafka:consumer:# 指定kafka server的地址,集群配多个,中间,逗号隔开bootstrap-servers: 175.24.42.150:9092#如果'enable.au
·
自动提交offset
第一步 配置文件修改
将
#–> 设置自动提交offset 设置为false
enable-auto-commit: false
spring:
profiles:
active: local
kafka:
consumer:
# 指定kafka server的地址,集群配多个,中间,逗号隔开
bootstrap-servers: 175.24.42.150:9092
#如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
auto-commit-interval: 100
# smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
auto-offset-reset: earliest
#--> 设置自动提交offset
enable-auto-commit: false
# 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
group-id: nashConsumerGroup
# 指定消息key和消息体的编解码方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
第二步 写手动方法
@Bean
/**
* 写在接收kafka消息的 同一个类中就行了
* 原先拉去数据是单个json 现在会是多个 list 我设置factory.setConcurrency(1)为一跳,
* 但返回数据还是list。这里的 1条是你提交1次的数据。你一次提交了20条也是一个json里的所有
* 算做一条数据。
*/
public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory consumerFactory){
ConcurrentKafkaListenerContainerFactory<Integer,String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 消费几条数据
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(1500);
//设置为批量消费,每个批次数量在Kafka配置参数中设置
factory.setBatchListener(true);
//设置手动提交ackMode
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
第三步 手动提交
/**
* 消费 HIS 送检、送手术、标本数据
* @param record
* containerFactory="batchFactory" 关联到刚刚那个方法
* ack.acknowledge(); 告知kafka消费成功
*/
@KafkaListener(topics = "his_hospital_1", containerFactory="batchFactory")
public void consumerHisData(List<ConsumerRecord<?, ?>> record, Acknowledgment ack){
try{
logConsumerOffsetService.logOffset(record.get(0));
// 获取接收的数据
orderService.save(record.get(0).value().toString());
// 告知kafka消费成功
ack.acknowledge();
}catch (Exception e){
String logInfo = String.format("\"consumerHisData error,data:%s. /n", record.get(0).value());
sysErrorLogService.saveExceptionLog(e,logInfo);
}
}
更多推荐
已为社区贡献3条内容
所有评论(0)