Overview: this article shows how to integrate Kafka with Spring Boot, consume Kafka messages one record at a time, consume Kafka messages in batches, and use EmbeddedKafka for unit tests that do not require an external kafka-server.

 

Development environment

Spring Boot: 2.2.6.RELEASE

kafka-server: kafka_2.12-2.3.0

spring-kafka: 2.3.7.RELEASE

JDK: 1.8

 

I. Single-record consumption

1. Create a new project



2. Edit pom.xml and add the Kafka dependencies

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

3. Out of personal habit I rename application.properties to application.yml; add the following configuration:

server:
  port: 8080
spring:
  application:
    name: kafka-batch-demo
  kafka:
    # Kafka broker address(es); multiple brokers can be listed, separated by commas
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # Default consumer group id
      group-id: kafka-batch-demo-group
      # earliest: if a committed offset exists for a partition, resume from it; otherwise consume from the beginning
      # latest: if a committed offset exists for a partition, resume from it; otherwise consume only newly produced records
      # none: resume from the committed offsets; if any partition has no committed offset, throw an exception
      auto-offset-reset: latest
      # Key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # Key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Producer buffer memory, in bytes
      buffer-memory: 524288
    listener:
      missing-topics-fatal: false
    template:
      default-topic: kafka-batch-demo-topic
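
Note that missing-topics-fatal: false only keeps the listener container from failing at startup when the topic does not exist yet. If you want the application itself to create the topic, a minimal sketch of a topic-declaring bean (the class name KafkaTopicConfig is made up for illustration; Spring Boot's auto-configured KafkaAdmin picks up NewTopic beans):

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    /**
     * Declare the demo topic as a bean; KafkaAdmin creates it on the broker
     * at startup if it does not exist yet.
     */
    @Bean
    public NewTopic kafkaBatchDemoTopic() {
        return TopicBuilder.name(Const.KAFKA_BATCH_DEMO_TOPIC)
                .partitions(1)
                .replicas(1)
                .build();
    }
}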

4. Write the test code

(1) Const.java

package com.mmc.kafka.demo.kafka;

/**
 * @author 
 * @date 2020/5/7 10:54 AM
 */
public interface Const {

    String KAFKA_BATCH_DEMO_TOPIC = "kafka-batch-demo-topic";
}

(2) KafkaSender.java

package com.mmc.kafka.demo.kafka;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @author 
 * @date 2019/11/4 3:04 PM
 */
@Slf4j
@Component
public class KafkaSender {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a message to Kafka.
     */
    public void sendMessage(String topic, String message) {
        log.info("sendMessage to kafka, topic=[{}], message=[{}]", topic, message);
        kafkaTemplate.send(topic, message);
    }

    /**
     * Send a message to Kafka with a partition key.
     */
    public void sendMessage(String topic, String partitionKey, String message) {
        log.info("sendMessage to kafka, topic=[{}], partitionKey=[{}], message=[{}]", topic, partitionKey, message);
        kafkaTemplate.send(topic, partitionKey, message);
    }

}
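
KafkaTemplate.send() is asynchronous and returns a ListenableFuture, so the two methods above are fire-and-forget. If you need to confirm delivery, a minimal sketch of a callback variant that could be added to KafkaSender (the method name sendMessageWithCallback is only for illustration):

    public void sendMessageWithCallback(String topic, String message) {
        kafkaTemplate.send(topic, message).addCallback(
                // invoked once the broker has acknowledged the record
                result -> log.info("sendMessage ok, topic=[{}], offset=[{}]",
                        topic, result.getRecordMetadata().offset()),
                // invoked if sending failed, e.g. the broker is unreachable
                ex -> log.error("sendMessage failed, topic=[{}]", topic, ex));
    }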

(3) KafkaReceiver.java

package com.mmc.kafka.demo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author 
 * @date 2019/11/4 3:08 PM
 */
@Slf4j
@Component
public class KafkaReceiver {

    @KafkaListener(id = "kafka-single-demo", topics = Const.KAFKA_BATCH_DEMO_TOPIC)
    public void receiveMessage(ConsumerRecord<String, String> record) {

        if (null == record || StringUtils.isEmpty(record.value())) {
            log.warn("KafkaReceiver record is null or record.value is empty.");
            return;
        }

        String reqJson = record.value();
        log.info("KafkaReceiver {}", reqJson);
    }
}

(4) The unit test class

package com.mmc.kafka.demo.kafka;

import java.io.IOException;

import javax.annotation.Resource;

import lombok.extern.slf4j.Slf4j;
import org.junit.FixMethodOrder;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.MethodSorters;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Joey
 * @date 2020/5/7 10:41 AM
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@Slf4j
public class KafkaSenderTest implements Const {

    @Resource
    private KafkaSender kafkaSender;

    @Test
    public void sendMessage() throws IOException {

        String json = "hello";

        for (int i = 0; i < 10; i++) {
            kafkaSender.sendMessage(KAFKA_BATCH_DEMO_TOPIC, json);
        }

        // Block here so the Spring context (and the @KafkaListener consumer)
        // stays alive long enough to receive the messages.
        System.in.read();
    }
}

5. Run the unit test KafkaSenderTest.java; the listener output looks like this:


2020-05-07 11:10:13.266 [kafka-single-demo-0-C-1] INFO  o.s.kafka.listener.KafkaMessageListenerContainer - kafka-single-demo: partitions assigned: [kafka-batch-demo-topic-0]
2020-05-07 11:10:13.303 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello
2020-05-07 11:10:13.304 [kafka-single-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaReceiver - KafkaReceiver hello

6. Complete project structure

II. Batch consumption

1. Modify application.yml, adding spring.kafka.consumer.max-poll-records, spring.kafka.producer.batch-size and spring.kafka.listener.type:

server:
  port: 8080
spring:
  application:
    name: kafka-batch-demo
  kafka:
    # Kafka broker address(es); multiple brokers can be listed, separated by commas
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # Default consumer group id
      group-id: kafka-batch-demo-group
      # earliest: if a committed offset exists for a partition, resume from it; otherwise consume from the beginning
      # latest: if a committed offset exists for a partition, resume from it; otherwise consume only newly produced records
      # none: resume from the committed offsets; if any partition has no committed offset, throw an exception
      auto-offset-reset: latest
      # Key/value deserializers
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Maximum number of records returned by a single poll()
      max-poll-records: 50
    producer:
      # Key/value serializers
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Producer buffer memory, in bytes
      buffer-memory: 524288
      # Producer batch size, in bytes (records are accumulated into batches before being sent)
      batch-size: 65536
    listener:
      missing-topics-fatal: false
      # Batch mode: the @KafkaListener method receives a List of records per poll
      type: batch
    template:
      default-topic: kafka-batch-demo-topic
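
For reference, setting listener.type: batch is roughly equivalent to declaring a batch-enabled listener container factory yourself. A minimal sketch of that alternative (the class name KafkaBatchConfig is illustrative; you would use either this bean or the yml property, not both):

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

@Configuration
public class KafkaBatchConfig {

    /**
     * Overrides the auto-configured factory and marks it as a batch listener,
     * so @KafkaListener methods receive a whole poll() result at once.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true);
        return factory;
    }
}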




2. Write the test code KafkaBatchReceiver.java

package com.mmc.kafka.demo.kafka;

import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author 
 * @date 2020/5/7 11:39 AM
 */
@Slf4j
@Component
public class KafkaBatchReceiver {

    @KafkaListener(id = "kafka-batch-demo", topics = Const.KAFKA_BATCH_DEMO_TOPIC)
    public void receive(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            receiveMessage(record);
        }
    }

    private void receiveMessage(ConsumerRecord<String, String> record) {

        if (null == record || StringUtils.isEmpty(record.value())) {
            log.warn("BatchKafkaReceiver record is null or record.value is empty.");
            return;
        }

        String reqJson = record.value();
        log.info("BatchKafkaReceiver {}", reqJson);
    }
}
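
With listener.type set to batch, the listener method can also take the decoded payloads directly instead of ConsumerRecord objects. A minimal sketch of that variant (the listener id kafka-batch-payload-demo is made up; it would replace the receive() method above, since two listeners in the same group would split the partitions between them):

    @KafkaListener(id = "kafka-batch-payload-demo", topics = Const.KAFKA_BATCH_DEMO_TOPIC)
    public void receivePayloads(List<String> messages) {
        // In batch mode spring-kafka hands over all records returned by one poll() at once
        log.info("BatchKafkaReceiver received {} messages in one batch", messages.size());
    }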

3. Run the unit test class KafkaSenderTest.java again; the batch listener output looks like this:

2020-05-07 11:48:37.986 [kafka-batch-demo-0-C-1] INFO  o.s.kafka.listener.KafkaMessageListenerContainer - kafka-batch-demo: partitions assigned: [kafka-batch-demo-topic-0]
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello
2020-05-07 11:48:38.013 [kafka-batch-demo-0-C-1] INFO  com.mmc.kafka.demo.kafka.KafkaBatchReceiver - BatchKafkaReceiver hello

4. Complete project structure

The next article will show how to test @KafkaListener without starting an external kafka-server.
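
As a preview, a minimal sketch of the idea, assuming the spring-kafka-test dependency is on the test classpath (the class name and broker settings below are illustrative, not the final code from the next article):

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
// Start an in-process broker on 9092 so the bootstrap-servers in application.yml still works
@EmbeddedKafka(partitions = 1, topics = Const.KAFKA_BATCH_DEMO_TOPIC,
        brokerProperties = {"listeners=PLAINTEXT://127.0.0.1:9092", "port=9092"})
public class EmbeddedKafkaSenderTest {

    @Resource
    private KafkaSender kafkaSender;

    @Test
    public void sendMessage() {
        // The embedded broker is started by spring-kafka-test, so no external kafka-server is needed
        kafkaSender.sendMessage(Const.KAFKA_BATCH_DEMO_TOPIC, "hello");
    }
}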

 

III. About referrals

Are you frustrated that your skills are solid but you never get interview opportunities at big companies, that social-recruitment processes are slow and opaque, or that interviews just don't go your way? Follow the official account to get referral opportunities at major companies; plenty of good positions are waiting, and you are welcome to apply.
