SpringBoot是一个在JavaEE开发中非常常用的组件,可用于kafka的生产者,也可以用于SpringBoot的消费者
在这里插入图片描述
1. 导入jar包

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置kafka

spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.0.120:9092 # 连接kafka
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key序列化
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value序列化
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # key反序列化
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value反序列化
      enable-auto-commit: true # 自动提交
#      group-id: testTopicGroup # 指定消费者组,也可以在方法注解中加组信息

3. 生产者代码

@Component
public class KafkaProducer {
    // 引入kafka模板
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Value("${test.topic}")
    private String topic;
    @Value("${test.key}")
    private String key;

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Scheduled(cron = "0/2 * * * * ?") // 通过定时任务循环发送
    public void send() {
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            CompletableFuture.runAsync(() -> {
                kafkaTemplate.send(topic, key, finalI + "=>" + date());
            }, pool);
        }
    }

    private String date() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        return format.format(new Date());
    }
}

4. 消费者代码

@Configuration
public class KafkaConsumer {

    @KafkaListener(topics = "${test.topic}", groupId = "testTopicGroup")
    public void listener(ConsumerRecord<String, String> record) {
        int partition = record.partition();
        String key = record.key();
        String topic = record.topic();
        String value = record.value();
        System.out.println(date() + ":   分区:" + partition + "  |  主体:" + topic + "  |  键:" + key + "  |  值:" + value);
    }

    private String date() {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return format.format(new Date());
    }
}

源码地址:https://gitee.com/peachtec/hxz-study

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐