Installing Kafka on Linux: https://blog.csdn.net/qq_37936542/article/details/109453249

Kafka version: kafka_2.12-2.6.0.tgz, where 2.12 is the Scala version and 2.6.0 is the Kafka version.

Import the Kafka dependency

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.3.4.RELEASE</version>
</dependency>

Configure the yml

spring:
  kafka:
    topic: Itopic  # topic name (custom property, read via @Value in the code below)
    bootstrap-servers: 192.168.1.211:9092   # Kafka broker address
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer  # serializers
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default_consumer_group # consumer group ID
      enable-auto-commit: false  # disable auto commit
      auto-commit-interval: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      ack-mode: MANUAL_IMMEDIATE # manual acknowledgment
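
If the topic does not exist yet, you can create it from the application at startup. A minimal sketch that lets Spring Boot's auto-configured KafkaAdmin create it: the partition count of 2 is an assumption chosen to match the listener concurrency used later, and replication factor 1 suits a single-broker setup.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaTopicConfig {

    // Creates Itopic on startup if it does not exist yet.
    // 2 partitions / replication factor 1 are illustrative values.
    @Bean
    public NewTopic iTopic() {
        return new NewTopic("Itopic", 2, (short) 1);
    }
}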

Producer

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${spring.kafka.topic}")
    private String topic;

    @GetMapping("/send")
    public String test() {
        kafkaTemplate.send(topic, "hello");
        return "success";
    }
}
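
A GET to /kafka/send now publishes "hello" to Itopic. KafkaTemplate.send() is asynchronous, so the controller returns before the broker acknowledges anything; if you want to log the delivery result, here is a hedged sketch of an extra endpoint using the ListenableFuture that send() returns in spring-kafka 2.3.x (the /sendWithCallback name and log messages are illustrative, not from the original article).

    @GetMapping("/sendWithCallback")
    public String sendWithCallback() {
        // Log the partition/offset on success, or the failure reason otherwise.
        kafkaTemplate.send(topic, "hello").addCallback(
                result -> System.out.println("sent to partition " + result.getRecordMetadata().partition()
                        + " at offset " + result.getRecordMetadata().offset()),
                ex -> System.out.println("send failed: " + ex.getMessage()));
        return "success";
    }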

 

Single-record consumption

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.util.Optional;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}", errorHandler = "dealError", concurrency = "2")
    public void consumer(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        // Acknowledge manually first: the offset is committed even if processing fails below,
        // so failed records are replayed via the error handler rather than redelivered by Kafka.
        ack.acknowledge();
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        kafkaMessage.ifPresent(u -> {
            System.out.println("start consumer:" + u);
            // Simulate a processing failure to trigger the error handler.
            int i = 1 / 0;
        });
    }

    /**
     * Because the offset is acknowledged before the business logic runs, Kafka will not
     * redeliver a failed record; this error handler is where failed records get captured
     * so they can be re-processed later.
     */
    @Bean
    public ConsumerAwareListenerErrorHandler dealError() {
        return new ConsumerAwareListenerErrorHandler() {
            @Override
            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
                System.out.println("consumer error:" + e);
                // TODO save the failed record to the database, then have a scheduled job query those records and re-process them
                return null;
            }
        };
    }
}
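
A hedged sketch of one way to move toward that TODO: grab the failed payload from the Message and persist it. The failedRecordRepository call is a hypothetical DAO and is shown commented out; only the handler signature and message.getPayload() come from the spring-kafka API.

    @Bean
    public ConsumerAwareListenerErrorHandler dealError() {
        return (message, e, consumer) -> {
            // The payload of the failed message; persist it so a scheduled job can re-process it.
            Object failedPayload = message.getPayload();
            System.out.println("consumer error:" + e.getMessage() + ", payload=" + failedPayload);
            // failedRecordRepository.save(String.valueOf(failedPayload)); // hypothetical DAO call
            return null;
        };
    }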

OK, that was easy. Next, let's configure batch consumption.

Configure the containerFactory

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;

@Component
public class KafkaListenerContainerFactory {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Bean("containerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        // Concurrency: should be less than or equal to the topic's partition count
        container.setConcurrency(2);
        // Enable batch listening
        container.setBatchListener(true);
        // Offset commit mode: manual, committed immediately on acknowledge
        container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return container;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>(8);
        // Kafka broker address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Whether offsets are committed automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // Maximum number of records returned by a single poll
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
        // Deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
}
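
Note that max.poll.records only caps how many records a single poll can return; how full the batches actually are also depends on how much data the broker has buffered. If batches come back smaller than expected, two standard consumer properties are worth experimenting with. The lines below would go inside consumerProps() above; the values are illustrative, not part of the original configuration.

        // Broker waits until at least ~1 KB is buffered, or at most 500 ms, before answering a fetch.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);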

Modify the consumer code to use this factory. Note that consumerProps() above does not set group.id; the groupId attribute on @KafkaListener below supplies it.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import java.util.List;
import java.util.Optional;

@Component
public class KafkaConsumerOne {

    @KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory")
    public void consumer(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        // Acknowledge the whole batch up front, the same strategy as the single-record listener.
        ack.acknowledge();
        records.forEach(record -> {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            kafkaMessage.ifPresent(u -> {
                System.out.println(Thread.currentThread().getName() + ":" + u);
            });
        });
    }

}
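
To watch the batch listener and its two threads in action, push a burst of messages through the producer. The /sendBatch endpoint below is an illustrative addition to the KafkaProducer controller from earlier, not part of the original code.

    @GetMapping("/sendBatch")
    public String sendBatch() {
        // Send enough messages that a poll (max.poll.records = 10) can return a full batch.
        for (int i = 0; i < 20; i++) {
            kafkaTemplate.send(topic, "message-" + i);
        }
        return "success";
    }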

 
