spring-boot 2.3.x 整合kafka


本地项目的基础环境

环境版本
jdk1.8.0_201
maven3.6.0
Spring-boot2.3.3.RELEASE

1、查看springboot的官网,对kafka的定义

《spring官网 kafka》

Spring for Apache Kafka(Spring Kafka)项目将核心的Spring概念应用到基于Kafka的消息传递解决方案的开发中。它提供了一个“模板”,作为发送消息的高级抽象。它还支持带有@kafkalistener注释和“监听器容器”的消息驱动POJO。这些库促进依赖注入和声明性的使用。在所有这些情况下,您将看到与Spring框架中的JMS支持和SpringAMQP中的rabbitmq支持的相似之处。

大概就是这么个意思吧;特性的话,也说了,提供模板类,消费的监听,生产的事务,测试

  • KafkaTemplate
  • KafkaMessageListenerContainer
  • @KafkaListener
  • KafkaTransactionManager
  • spring-kafka-test

2、kafka的安装(docker形式)

这里使用docker,做一个快速的单机版本安装,需要更详细的其他形式的安装,可以查看其他的相关资料;

2.1、下载zookeeper的镜像

docker pull wurstmeister/zookeeper

2.2、启动运行zookeeper镜像的实例

docker run -d --name zookeeper_01 -p 2181:2181 -t wurstmeister/zookeeper

这里只是暴露了2181的端口,如果是集群也可以继续开放28883888端口

28883888,分别为数据同步通讯端口以及选举通讯端口;

2.3、下载kafka的镜像

docker pull wurstmeister/kafka

2.4、启动运行kafka的镜像实例

docker run  -d --name kafka_01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.2.41:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.2.41:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka

KAFKA_BROKER_ID:集群的时候,broker的编号,唯一

KAFKA_ZOOKEEPER_CONNECT:zookeeper的ip:端口,最好不要写loclhost什么的,物理机跟镜像还是有区别的,其他机器,也会访问不到

KAFKA_ADVERTISED_LISTENERS:kafka的监听的地址,注册入zookeeper的地址,也要写物理机映射的ip和端口

KAFKA_LISTENERS:0.0.0.0表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效,也就是说如果你没有配置advertised.listeners

2.5、查看启动的容器

docker ps

3、构建一个kafka的项目

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.3.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.badger</groupId>
    <artifactId>badger-spring-boot-kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    <name>badger-spring-boot-kafka</name>
</project>

.1、定义yml配置文件,关于kafka的参数的详细定义,可以参考

《kafka-producer参数详细》《kafka–consumer参数》

spring:
  kafka:
    bootstrap-servers:
    - localhost:9092
    consumer:
      group-id: test
      enable-auto-commit: true
      auto-commit-interval: 1000
      auto-offset-reset: earliest
      client-id: test

org.springframework.boot.autoconfigure.kafka.KafkaProperties.class具体更可以参看配置类

3.2、定义生产者代码,通过定时器,不断写入数据

@Component
@EnableScheduling
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * 定时任务
     */
    @Scheduled(cron = "0/1 * * * * ?")
    public void send() {
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("badger_test",
                UUID.randomUUID().toString());
        future.addCallback(o -> System.out.println("send-消息发送成功"), throwable -> System.out.println("消息发送失败:"));
    }
}

3.3、定义消费端代码

@Component
public class KafkaConsumer {

    @KafkaListener(topics = { "badger_test" })
    public void consumer(ConsumerRecord<String, Object> consumerRecord) {
        System.out.println("消息消费--》" + consumerRecord.value().getClass().getName() + "数据 -->" + consumerRecord.value());
    }
}

3.4、主启动类

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(KafkaApplication.class, args);
    }
}

4、测试演示

直接启动springboot的启动类就可以了

5、自动装配

最后,关于springboot对kafka的自动装配参看org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class

@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class,
		KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {

大概可以自己看下,自动注入了哪些类,需要定制化,重写就可以了

特别注意下

@Import({ KafkaAnnotationDrivenConfiguration.class,KafkaStreamsAnnotationDrivenConfiguration.class })

KafkaAnnotationDrivenConfiguration这个类

@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {

有的说,需要加@EnableKafka注解,才能开启kafka的功能,这个类,就是不用开启注解,也能生效

主要也是装配了,在最后,大概可以自己去看

	@Configuration
	@EnableKafka
	@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	protected static class EnableKafkaConfiguration {

	}

好了,就到这里,原理啥的,就不解释了,kafka是个比较复杂的东西,搞深入,还是需要时间的;

详细代码也可以参看《码云》

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐