I. Importing the required JAR dependencies (Maven)

The project is built with Maven:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>springboot-kafka</artifactId>

    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

II. Setup steps

1. Add the configuration file

The application.properties configuration is as follows:

spring.application.name=springboot-kafka-02
server.port=8080
# Broker address used to establish the initial connection
spring.kafka.bootstrap-servers=192.168.42.21:9092
# Serializer classes for the producer's keys and values
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Default batch size in bytes (16 KB)
spring.kafka.producer.batch-size=16384
# 32 MB total send buffer
spring.kafka.producer.buffer-memory=33554432
# Deserializer classes for the consumer's keys and values
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer group id
spring.kafka.consumer.group-id=spring-kafka-consumer-02
# Whether consumer offsets are committed automatically
spring.kafka.consumer.enable-auto-commit=true
# Commit offsets to the broker every 100 ms
spring.kafka.consumer.auto-commit-interval=100
# If no committed offset exists for this consumer group, start from the earliest offset
spring.kafka.consumer.auto-offset-reset=earliest
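
These spring.kafka.* keys map directly onto the native Kafka client configs. If you later need settings the starter does not expose, one option (a minimal sketch, assuming the auto-configured KafkaProperties bean; the class name is ours, not from the original project) is to build the producer factory yourself:

package com.kafka.learn.config;

import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerFactoryConfig {

    /**
     * Start from the spring.kafka.* properties, then override individual
     * settings programmatically. Defining this bean makes the Spring Boot
     * auto-configured ProducerFactory back off.
     */
    @Bean
    public ProducerFactory<Integer, String> producerFactory(KafkaProperties properties) {
        Map<String, Object> config = properties.buildProducerProperties();
        // Example override: wait up to 5 ms so more records can be batched together
        config.put("linger.ms", 5);
        return new DefaultKafkaProducerFactory<>(config);
    }
}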

2. Add the project package structure

3. Add the Spring Boot application main class

package com.kafka.learn;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}

4. Add the Kafka configuration class

package com.kafka.learn.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class KafkaConfig {

    /**
     * The first parameter is the topic name, the second is the number of
     * partitions, and the third is the topic's replication factor.
     * With a single broker, topic creation fails if the replication factor
     * is greater than 1: Kafka's replication only works in a cluster.
     * @return the topic to create on startup
     */
    @Bean
    public NewTopic topic1() {
        return new NewTopic("topic1", 5, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("topic2", 3, (short) 1);
    }
}
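
Each NewTopic bean is picked up by the auto-configured KafkaAdmin, which creates the topic on startup if it does not already exist. As a side note, spring-kafka 2.3+ also provides a fluent TopicBuilder; a sketch of the same topic1 declaration (an equivalent alternative, not an addition; the class name is ours):

package com.kafka.learn.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicBuilderConfig {

    // Equivalent to new NewTopic("topic1", 5, (short) 1)
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1")
                .partitions(5)
                .replicas(1)
                .build();
    }
}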

5. Add the Kafka message producer

Add the message-producing endpoints in the controller layer, covering both synchronous and asynchronous sending.

Endpoint design for the synchronous Kafka message producer:

package com.kafka.learn.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.*;

import java.util.concurrent.ExecutionException;


@RestController
@RequestMapping("/kafka/sync")
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping(value = "/send/{message}", method = RequestMethod.GET)
    public String sendSync(@PathVariable("message") String message) {
        ListenableFuture<SendResult<Integer, String>> future = template.send(new ProducerRecord<>(
                "topic-spring-02",
                0,
                1,
                message
        ));

        try {
            // Block until the broker acknowledges the send
            SendResult<Integer, String> result = future.get();

            System.out.println(result.getRecordMetadata().topic() + "\t"
                    + result.getRecordMetadata().partition() + "\t"
                    + result.getRecordMetadata().offset());

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        return "success";
    }

    @RequestMapping(value = "/{topic}/send", method = RequestMethod.GET)
    public void sendMessageToTopic(@PathVariable("topic") String topic,
                                   @RequestParam(value = "partition", defaultValue = "0") int partition) {
        System.out.println("Sending message to Kafka: " + topic);
        template.send(topic, partition, partition, "你好,kafka");
    }
}
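
In a real service you would normally bound the wait rather than block the request thread indefinitely. A sketch (the 10-second timeout and the extra imports, java.util.concurrent.TimeUnit and java.util.concurrent.TimeoutException, are our assumptions, not part of the original):

            // Wait at most 10 seconds for the broker's acknowledgement,
            // then handle TimeoutException alongside the other exceptions
            SendResult<Integer, String> result = future.get(10, TimeUnit.SECONDS);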

Endpoint design for the asynchronous Kafka message producer:

package com.kafka.learn.controller;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
@RequestMapping("/kafka/async")
public class KafkaAsyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("/send/{message}")
    public String asyncSend(@PathVariable("message") String message) {

        ProducerRecord<Integer, String> record = new ProducerRecord<>(
                "topic-spring-02",
                0,
                3,
                message
        );

        ListenableFuture<SendResult<Integer, String>> future = template.send(record);

        // Register a callback and handle the broker's response asynchronously
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                System.out.println("发送成功:" +
                        result.getRecordMetadata().topic() + "\t"
                        + result.getRecordMetadata().partition() + "\t"
                        + result.getRecordMetadata().offset());
            }
        });
        return "success";
    }
}
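
When most messages go to a single topic, KafkaTemplate#sendDefault is a convenient variant. A hypothetical extra endpoint for this controller (it assumes spring.kafka.template.default-topic=topic-spring-02 is added to application.properties, which the original config does not set):

    @RequestMapping("/sendDefault/{message}")
    public String sendToDefaultTopic(@PathVariable("message") String message) {
        // key = 1, value = message, topic = the configured default topic
        template.sendDefault(1, message);
        return "success";
    }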

6. Add the Kafka message consumer

package com.kafka.learn.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;


@Component
public class MyConsumer {

    @KafkaListener(topics = "topic-spring-02")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        // Print topic, partition, offset, key and value for each record
        if (record != null) {
            System.out.println(
                    record.topic() + "\t"
                            + record.partition() + "\t"
                            + record.offset() + "\t"
                            + record.key() + "\t"
                            + record.value());
        }
    }


    /**
     * id : the identifier of this listener container
     * topicPartitions : configures which topics and partitions to listen to.
     * This listener covers two topics, topic1 and topic2: for topic1 it only
     * receives messages from partitions 0 and 3; for topic2 it receives
     * messages from partition 0, plus partition 1 starting at offset 4.
     *
     * @param record the received record
     */
    @KafkaListener(id = "listen01",
    topicPartitions = {
            @TopicPartition(topic = "topic1", partitions = { "0", "3" }),
            @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "4"))
    })
    public void listen(ConsumerRecord<?, ?> record) {
        System.out.println("topic" + record.topic());
        System.out.println("key:" + record.key());
        System.out.println("value:"+record.value());
    }

}
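
spring-kafka can also inject the payload and individual headers straight into the handler method. A minimal sketch of an alternative listener signature (not part of the original project; since it would share the default group id, it would split partitions with onMessage rather than duplicate it):

import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;

    @KafkaListener(topics = "topic-spring-02")
    public void onPayload(@Payload String message,
                          @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                          @Header(KafkaHeaders.OFFSET) long offset) {
        System.out.println(message + "\t" + partition + "\t" + offset);
    }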

III. Test results

Testing synchronous sending: endpoint --- http://localhost:8080/kafka/sync/send/你好,kafka

Backend output:

2021-11-12 10:54:20.143  INFO 20160 --- [nio-8080-exec-2] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2021-11-12 10:54:20.144  INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2021-11-12 10:54:20.150  INFO 20160 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet        : Completed initialization in 6 ms
2021-11-12 10:54:20.186  INFO 20160 --- [nio-8080-exec-2] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
	acks = 1
	batch.size = 16384
	bootstrap.servers = [192.168.42.21:9092]
	buffer.memory = 33554432
	client.dns.lookup = default
	client.id = 
	compression.type = none
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 120000
	enable.idempotence = false
	interceptor.classes = []
	key.serializer = class org.apache.kafka.common.serialization.IntegerSerializer
	linger.ms = 0
	max.block.ms = 60000
	max.in.flight.requests.per.connection = 5
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
	receive.buffer.bytes = 32768
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 30000
	retries = 2147483647
	retry.backoff.ms = 100
	sasl.client.callback.handler.class = null
	sasl.jaas.config = null
	sasl.kerberos.kinit.cmd = /usr/bin/kinit
	sasl.kerberos.min.time.before.relogin = 60000
	sasl.kerberos.service.name = null
	sasl.kerberos.ticket.renew.jitter = 0.05
	sasl.kerberos.ticket.renew.window.factor = 0.8
	sasl.login.callback.handler.class = null
	sasl.login.class = null
	sasl.login.refresh.buffer.seconds = 300
	sasl.login.refresh.min.period.seconds = 60
	sasl.login.refresh.window.factor = 0.8
	sasl.login.refresh.window.jitter = 0.05
	sasl.mechanism = GSSAPI
	security.protocol = PLAINTEXT
	send.buffer.bytes = 131072
	ssl.cipher.suites = null
	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
	ssl.endpoint.identification.algorithm = https
	ssl.key.password = null
	ssl.keymanager.algorithm = SunX509
	ssl.keystore.location = null
	ssl.keystore.password = null
	ssl.keystore.type = JKS
	ssl.protocol = TLS
	ssl.provider = null
	ssl.secure.random.implementation = null
	ssl.trustmanager.algorithm = PKIX
	ssl.truststore.location = null
	ssl.truststore.password = null
	ssl.truststore.type = JKS
	transaction.timeout.ms = 60000
	transactional.id = null
	value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2021-11-12 10:54:20.203  INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2021-11-12 10:54:20.203  INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2021-11-12 10:54:20.203  INFO 20160 --- [nio-8080-exec-2] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1636685660203
2021-11-12 10:54:20.213  INFO 20160 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: BNUXbtH2RYmLyhUrarFpBw
topic-spring-02	0	1	1	你好,kafka
topic-spring-02	0	1

Testing asynchronous sending: endpoint --- http://localhost:8080/kafka/async/send/你好async,kafka

Backend output:

Sending message to Kafka: topic1
topic:topic1
key:3
value:你好,kafka

Testing a send to a partition outside those the consumer listens to: http://localhost:8080/kafka/sync/topic1/send?partition=4

Backend output (no consumption log, since the listener only covers partitions 0 and 3 of topic1):

Sending message to Kafka: topic1

Testing a synchronous send to a partition the listener does subscribe to (topic2, partition 0):

        endpoint --- http://localhost:8080/kafka/sync/topic2/send?partition=0

Backend output:

Sending message to Kafka: topic2
topic:topic2
key:0
value:你好,kafka

Summary

  Integrating Kafka with Spring Boot is simple: most of the configuration has already been set up by the Spring Kafka project, and we only need to supply a few required parameters to complete the integration. If you need business-level constraints on producing and consuming, however, you will have to add your own wrapper layer, for example a custom serialization scheme, or a producer that only accepts your own wrapped message object to restrict what may be sent.
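
As a hedged illustration of the custom-serialization idea (the Message POJO and all class names here are hypothetical, not from this project), a producer value serializer could look like the sketch below, registered via spring.kafka.producer.value-serializer=com.kafka.learn.serializer.MessageSerializer:

package com.kafka.learn.serializer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

// Message is a hypothetical payload POJO used only for illustration
public class MessageSerializer implements Serializer<Message> {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, Message data) {
        try {
            // Serialize the payload to JSON bytes; a null payload stays null
            return data == null ? null : mapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Failed to serialize Message for topic " + topic, e);
        }
    }
}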

  This simple integration uses a single-broker configuration. For a cluster deployment you will also need to review the Kafka-related parameters and tune them to your own business requirements.
