springboot整合kafka实现批量消费
linux安装kafka:https://blog.csdn.net/qq_37936542/article/details/109453249kafka版本:kafka_2.12-2.6.0.tgz其中2.12是Scala版本,2.6.0是Kafka版本。导入kafka依赖<dependency><groupId>org.springframework.kafka<
·
linux安装kafka:https://blog.csdn.net/qq_37936542/article/details/109453249
kafka版本:kafka_2.12-2.6.0.tgz 其中2.12是Scala版本,2.6.0是Kafka版本。
导入kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.4.RELEASE</version>
</dependency>
配置yml
spring:
kafka:
topic: Itopic # topic名称
bootstrap-servers: 192.168.1.211:9092 # kafka地址
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default_consumer_group #群组ID
enable-auto-commit: false # 取消自动提交
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
ack-mode: MANUAL_IMMEDIATE # 手动提交
生产者
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/kafka")
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Value("${spring.kafka.topic}")
private String topic;
@GetMapping("/send")
public String test() {
kafkaTemplate.send(topic, "hello");
return "success";
}
}
单体消费
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Optional;
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}", errorHandler = "dealError", concurrency = "2")
public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
// 手动确认
ack.acknowledge();
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
kafkaMessage.ifPresent(u -> {
System.out.println("start consumer:" + u);
int i = 1 / 0;
});
}
/**
* 因为手动确认消费,若消费失败,记录重刷
*/
@Bean
public ConsumerAwareListenerErrorHandler dealError() {
return new ConsumerAwareListenerErrorHandler() {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
System.out.println("consumer error:" + e);
// TODO 将失败的记录保存到数据库,再用定时任务查询记录,并重刷数据
return null;
}
};
}
}
OK,很Easy,那么接下来我们来配置批量消费
配置containerFactory
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class KafkaListenerContainerFactory {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.enable-auto-commit}")
private Boolean autoCommit;
@Bean("containerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
// 设置并发量,小于或等于Topic的分区数
container.setConcurrency(2);
// 设置为批量监听
container.setBatchListener(true);
// 设置提交偏移量的方式
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return container;
}
private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>(8);
// kafka服务地址
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 设置是否自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
// 一次拉取消息数量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
// 序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
修改消费者代码
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;
@Component
public class KafkaConsumerOne {
@KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory")
public void consumer(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
ack.acknowledge();
records.forEach(record -> {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
kafkaMessage.ifPresent(u -> {
System.out.println(Thread.currentThread().getName() + ":" + u);
});
});
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)