第四章 Spring Boot 整合 Kafka消息队列 实战
Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的Kafka-client,用于在Spring Boot 项目里快速集成kafka。例如:以上就是今天要讲的内容,本文仅仅简单介绍了 Spring Boot 集成消息消费者的封装。维基框架framewiki-kafka: kafka 服务
系列文章目录
第一章 Kafka 配置部署及SASL_PLAINTEXT安全认证
第二章 Spring Boot 整合 Kafka消息队列 生产者
第三章 Spring Boot 整合 Kafka消息队列 消息者
第四章 Spring Boot 整合 Kafka消息队列 实战
前言
Kafka 是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。本文主是基于Spirng Boot封装了Apache 的Kafka-client,用于在Spring Boot 项目里快速集成kafka。
一、Kafka 是什么?
Apache Kafka是分布式发布-订阅消息系统。
它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
二、项目结构
三、服务端
1.引入库
引入需要依赖的jar包,引入POM文件
<dependencies>
<dependency>
<groupId>com.cdkjframework</groupId>
<artifactId>cdkj-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.mybatis.dynamic-sql</groupId>
<artifactId>mybatis-dynamic-sql</artifactId>
</exclusion>
<exclusion>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
</exclusion>
<exclusion>
<groupId>com.cdkjframework</groupId>
<artifactId>cdkj-redis</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.cdkjframework</groupId>
<artifactId>cdkj-kafka</artifactId>
<version>1.0.5</version>
</dependency>
</dependencies>
2.配置文件
主配置文件 application.yml
server:
port: 8080
spring:
application:
name: framewiki-kafka-server
profiles:
active: dev
开发环境配置文件 application-dev.yml
spring:
custom:
kafka:
username: admin
password: admin-secret
partitions: 1
max-block: 60000
enable-auto-commit: false
batch-listener: false
bootstrap-servers:
- 192.168.0.95:9092
3.端启动类
启动类 KafkaApplication
引入kafka客户端自动配置注解 @EnableAutoKafka
在项目启动后发送消息测试是否功能
package com.framewiki.kafka.web;
import com.cdkjframework.core.spring.CdkjApplication;
import com.cdkjframework.kafka.producer.annotation.EnableAutoKafka;
import com.cdkjframework.kafka.producer.util.ProducerUtils;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
/**
* @ProjectName: framewiki-kafka
* @Package: com.framewiki.kafka.web
* @ClassName: KafkaApplication
* @Description: kafka 启动类
* @Author: xiaLin
* @Date: 2024/3/17 18:04
* @Version: 1.0
*/
@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
@EnableAutoKafka
public class KafkaApplication implements ApplicationRunner {
/**
* 启动类
*
* @param args 参数
*/
public static void main(String[] args) {
CdkjApplication.run(KafkaApplication.class, args);
}
/**
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
ProducerUtils.sendMessageAsync("test","2132123");
}
}
四、客户端
1.引入库
引入需要依赖的jar包,引入POM文件
<dependencies>
<dependency>
<groupId>com.cdkjframework</groupId>
<artifactId>cdkj-core</artifactId>
<exclusions>
<exclusion>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
</exclusion>
<exclusion>
<groupId>org.mybatis.dynamic-sql</groupId>
<artifactId>mybatis-dynamic-sql</artifactId>
</exclusion>
<exclusion>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
</exclusion>
<exclusion>
<groupId>com.cdkjframework</groupId>
<artifactId>cdkj-redis</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.cdkjframework</groupId>
<artifactId>cdkj-kafka-client</artifactId>
<version>1.0.5</version>
</dependency>
</dependencies>
2.配置文件
主配置文件 application.yml
server:
port: 8081
spring:
application:
name: framewiki-kafka-client
profiles:
active: dev
开发环境配置文件 application-dev.yml
spring:
custom:
kafka:
username: admin
password: admin-secret
topics: test,test2
groupId: framewiki-kafka-client
partitions: 1
enable-auto-commit: false
batch-listener: false
bootstrap-servers:
- 192.168.0.95:9092
3.端启动类
启动类 KafkaClientApplication
引入kafka客户端自动配置注解 @EnableAutoKafkaClient
package com.framewiki.kafka.client.web;
import com.cdkjframework.core.spring.CdkjApplication;
import com.cdkjframework.kafka.consumer.annotation.EnableAutoKafkaClient;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* @ProjectName: framewiki-kafka
* @Package: com.framewiki.kafka.client.web
* @ClassName: KafkaClientApplication
* @Author: xiaLin
* @Description: Java 类说明
* @Date: 2024/5/4 23:15
* @Version: 1.0
*/
@SpringBootApplication
@EnableAutoKafkaClient
public class KafkaClientApplication {
/**
* 启动主函数
*
* @param args 启动参数
*/
public static void main(String[] args) {
CdkjApplication.run(KafkaClientApplication.class, args);
}
}
4.主题监听服务
实现服务类 ConsumerServiceImpl,该服务继承封装包内的 ConsumerService 接口并实现方法
onMessage
package com.framewiki.kafka.client.web.service;
import com.cdkjframework.exceptions.GlobalException;
import com.cdkjframework.kafka.consumer.service.ConsumerService;
import com.cdkjframework.util.log.LogUtils;
import org.springframework.stereotype.Service;
/**
* @ProjectName: framewiki-kafka
* @Package: com.framewiki.kafka.client.web.service
* @ClassName: consumerServiceImpl
* @Author: xiaLin
* @Description: Java 类说明
* @Date: 2024/5/4 23:22
* @Version: 1.0
*/
@Service
public class ConsumerServiceImpl implements ConsumerService {
/**
* 日志
*/
private final LogUtils logUtils = LogUtils.getLogger(ConsumerServiceImpl.class);
/**
* 消息内容
*
* @param topics 主题
* @param message 内容
* @throws GlobalException 异常信息
*/
@Override
public void onMessage(String topics, String message) throws GlobalException {
logUtils.error("topics:{},message:{}", topics, message);
}
}
总结
例如:以上就是今天要讲的内容,本文仅仅简单介绍了 Spring Boot 集成消息消费者、生产者实践。
相对应的开源项目欢迎访问:维基框架
更多推荐
所有评论(0)