springboot2.0 整合kafka
我们使用springboot 2.1.3版本<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>...
我们使用springboot 2.1.3版本
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
查看springboot的官网,对kafka的定义
https://spring.io/projects/spring-kafka#overview
Spring for Apache Kafka(Spring Kafka)项目将核心的Spring概念应用到基于Kafka的消息传递解决方案的开发中。它提供了一个“模板”,作为发送消息的高级抽象。它还支持带有@kafkalistener注释和“监听器容器”的消息驱动POJO。这些库促进依赖注入和声明性的使用。在所有这些情况下,您将看到与Spring框架中的JMS支持和SpringAMQP中的rabbitmq支持的相似之处。
大概就是这么个意思吧;特性的话,也说了,提供模板类,消费的监听,生产的事务,测试
-
KafkaTemplate
-
KafkaMessageListenerContainer
-
@KafkaListener
-
KafkaTransactionManager
-
spring-kafka-test
构建以个kafka的项目
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
定义yml配置文件,关于kafka的参数的详细定义,可以参考
《kafka-producer参数详细》《kafka--consumer参数》
spring:
kafka:
bootstrap-servers:
- 172.16.2.54:9092
consumer:
group-id: test
enable-auto-commit: true
auto-commit-interval: 1000
auto-offset-reset: earliest
client-id: test
定义生产者代码,通过定时器,不断写入数据
@Component
@EnableScheduling
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 定时任务
*/
@Scheduled(cron = "0/1 * * * * ?")
public void send() {
String message = UUID.randomUUID().toString();
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send("test", message);
future.addCallback(o -> System.out.println("send-消息发送成功:" + message),
throwable -> System.out.println("消息发送失败:" + message));
}
}
定义消费端代码
@Component
public class KafkaConsumer {
@KafkaListener(topics = { "test" })
public void consumer(ConsumerRecord<String, String> consumerRecord) {
System.out.println("消息消费--》" + consumerRecord.value());
}
}
然后直接启动springboot的启动类就可以了,这样就对kafka整合完成了。
最后,关于springboot对kafka的自动装配参看 org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration.class
@Configuration
@ConditionalOnClass(KafkaTemplate.class)
@EnableConfigurationProperties(KafkaProperties.class)
@Import({ KafkaAnnotationDrivenConfiguration.class,
KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
大概可以自己看下,自动注入了哪些类,需要定制化,重写就可以了
特别注意下
@Import({ KafkaAnnotationDrivenConfiguration.class,
KafkaStreamsAnnotationDrivenConfiguration.class })
KafkaAnnotationDrivenConfiguration这个类
@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
有的说,需要加@EnableKafka注解,才能开启kafka的功能,这个类,就是不用开启注解,也能生效
主要也是装配了,在最后,大概可以自己去看
@Configuration
@EnableKafka
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
protected static class EnableKafkaConfiguration {
}
好了,就到这里,原理啥的,就不解释了,kafka是个比较复杂的东西,搞深入,还是需要时间的
更多推荐
所有评论(0)