spring-boot 2.3.x 整合kafka
spring-boot 2.3.x整合kafka文章目录spring-boot 2.3.x整合kafka1、查看springboot的官网,对kafka的定义2、kafka的安装(docker形式)2.1、下载zookeeper的镜像2.2、启动运行zookeeper镜像的实例2.3、下载kafka的镜像2.4、启动运行kafka的镜像实例2.5、查看启动的容器3、构建一个kafka的项目3.1、
spring-boot 2.3.x 整合kafka
文章目录
本地项目的基础环境
环境 | 版本 |
---|---|
jdk | 1.8.0_201 |
maven | 3.6.0 |
Spring-boot | 2.3.3.RELEASE |
1、查看springboot的官网,对kafka的定义
Spring for Apache Kafka(Spring Kafka)项目将核心的Spring概念应用到基于Kafka的消息传递解决方案的开发中。它提供了一个“模板”,作为发送消息的高级抽象。它还支持带有@kafkalistener注释和“监听器容器”的消息驱动POJO。这些库促进依赖注入和声明性的使用。在所有这些情况下,您将看到与Spring框架中的JMS支持和SpringAMQP中的rabbitmq支持的相似之处。
大概就是这么个意思吧;特性的话,也说了,提供模板类,消费的监听,生产的事务,测试
KafkaTemplate
KafkaMessageListenerContainer
@KafkaListener
KafkaTransactionManager
spring-kafka-test
2、kafka的安装(docker形式)
这里使用docker,做一个快速的单机版本安装,需要更详细的其他形式的安装,可以查看其他的相关资料;
2.1、下载zookeeper的镜像
docker pull wurstmeister/zookeeper
2.2、启动运行zookeeper镜像的实例
docker run -d --name zookeeper_01 -p 2181:2181 -t wurstmeister/zookeeper
这里只是暴露了2181
的端口,如果是集群也可以继续开放2888
和3888
端口
2888
和3888
,分别为数据同步通讯端口以及选举通讯端口;
2.3、下载kafka的镜像
docker pull wurstmeister/kafka
2.4、启动运行kafka的镜像实例
docker run -d --name kafka_01 -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.16.2.41:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.16.2.41:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -t wurstmeister/kafka
KAFKA_BROKER_ID
:集群的时候,broker的编号,唯一
KAFKA_ZOOKEEPER_CONNECT
:zookeeper的ip:端口,最好不要写loclhost什么的,物理机跟镜像还是有区别的,其他机器,也会访问不到
KAFKA_ADVERTISED_LISTENERS
:kafka的监听的地址,注册入zookeeper的地址,也要写物理机映射的ip和端口
KAFKA_LISTENERS
:0.0.0.0表示对所有的网络接口有效,如果hostname为空表示只对默认的网络接口有效,也就是说如果你没有配置advertised.listeners
2.5、查看启动的容器
docker ps
3、构建一个kafka的项目
<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.badger</groupId>
<artifactId>badger-spring-boot-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<maven-jar-plugin.version>3.1.1</maven-jar-plugin.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
<name>badger-spring-boot-kafka</name>
</project>
.1、定义yml配置文件,关于kafka的参数的详细定义,可以参考
《kafka-producer参数详细》《kafka–consumer参数》
spring:
kafka:
bootstrap-servers:
- localhost:9092
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
auto-offset-reset: earliest
client-id: test
org.springframework.boot.autoconfigure.kafka.KafkaProperties.class
具体更可以参看配置类
3.2、定义生产者代码,通过定时器,不断写入数据
@Component
@EnableScheduling
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 定时任务
*/
@Scheduled(cron = "0/1 * * * * ?")
public void send() {
ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("badger_test",
UUID.randomUUID().toString());
future.addCallback(o -> System.out.println("send-消息发送成功"), throwable -> System.out.println("消息发送失败:"));
}
}
3.3、定义消费端代码
@Component
public class KafkaConsumer {
@KafkaListener(topics = { "badger_test" })
public void consumer(ConsumerRecord<String, Object> consumerRecord) {
System.out.println("消息消费--》" + consumerRecord.value().getClass().getName() + "数据 -->" + consumerRecord.value());
}
}
3.4、主启动类
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(KafkaApplication.class, args);
}
}
4、测试演示
直接启动springboot的启动类就可以了
5、自动装配
最后,关于springboot对kafka的自动装配参看org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class,
KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
大概可以自己看下,自动注入了哪些类,需要定制化,重写就可以了
特别注意下
@Import({ KafkaAnnotationDrivenConfiguration.class,KafkaStreamsAnnotationDrivenConfiguration.class })
KafkaAnnotationDrivenConfiguration
这个类
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
有的说,需要加@EnableKafka注解,才能开启kafka的功能,这个类,就是不用开启注解,也能生效
主要也是装配了,在最后,大概可以自己去看
@Configuration
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableKafkaConfiguration {
}
好了,就到这里,原理啥的,就不解释了,kafka是个比较复杂的东西,搞深入,还是需要时间的;
详细代码也可以参看《码云》
更多推荐
所有评论(0)