Springboot与Kafka的小插曲
目录1.背景2.环境3.应用1)pom.xml2)application.yml3)main方法4)Entity5)ServiceImpl6)controller4.遇到的问题5.参考资料1.背景Kafka是什么,用途是什么,这在度娘那里一问便知。正所谓“实践是检验真理的唯一标准”,于是在应用中增加了Kafka,然后遭遇到一些小怪兽,这里把打怪的过程简单记录了下来~~2.环境springboot2
目录
1.背景
Kafka是什么,用途是什么,这在度娘那里一问便知。正所谓“实践是检验真理的唯一标准”,于是在应用中增加了Kafka,然后遭遇到一些小怪兽,这里把打怪的过程简单记录了下来~~
2.环境
springboot | 2.1.2.RELEASE |
spring-kafka | 2.2.0.RELEASE |
kafka-clients | 2.1.1 |
3.应用
应用的场景大意:
根据ID查询某个对象的信息,包括name和id,发送给kafka;然后将查询次数记录在redis中
1)pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>5.1.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
</dependencies>
2)application.yml
spring:
# REDIS
redis:
host: 127.0.0.1
port: 6379
database: 0
timeouout: 1800000
lettuce:
pool:
max-active: 20
max-wait: -1
max-idle: 5
min-idle: 0
# KAFKA
kafka:
bootstrap-servers: 172.20.220.68:9092
template:
default-topic: COUNT_QUERIED_COURSE_BY_ID
producer:
retries: 0
batch-size: 4096
buffer-memory: 40960
acks: 1
consumer:
enable-auto-commit: false
heartbeat-interval: 100
auto-offset-reset: latest
group-id: course
listener:
poll-timeout: 20000
concurrency: 1
3)main方法
添加注解:
@EnableKafka
4)Entity
@Data
public class MyEntity implements Serializable {
private static final long serialVersionUID = 1L;
private String id;
private String name;
}
5)ServiceImpl
1)MyCacheServiceImpl和MyCacheService
@Service
public class MyCacheServiceImpl implements MyCacheService {
@Resource
private RedisTemplate<String, Object> redisTemplate;
@Override
public Object getCacheKey(String key) {
ValueOperations<String, Object> valueOperations = redisTemplate.opsForValue();
return valueOperations.get(key);
}
@Override
public void setCacheToRedis(String key, Object value) {
ValueOperations<String, Object> valueOperations = redisTemplate.opsForValue();
valueOperations.set(key, value);
}
@Override
public void removeKey(String key) {
redisTemplate.delete(key);
}
}
public interface MyCacheService {
Object getCacheKey(String key);
void setCacheToRedis(String key, Object value);
void removeKey(String key);
}
2)MyServiceImpl和MyService
这里省略,就是根据id查询出MyEntity整个对象即可
6)controller
@Value("${spring.kafka.template.default-topic}")
private String TOPIC;
@Resource
private MyService myService;
@Resource
private MyCacheService myCacheService;
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@GetMapping("getDetail/{id}")
public CommonResult getDetail(@PathVariable String id) {
MyEntity myEntity = myService.getDetail(id);
// 发送给KAFKA
kafkaTemplate.send(new ProducerRecord<>(TOPIC, id, myEntity.getName()));
return CommonResult.ok().data("item", myEntity);
}
@KafkaListener(topics = "${spring.kafka.template.default-topic}") // 这里只能用static的变量,否则会报错
public void myConsumer(ConsumerRecord<?, ?> consumerRecord) throws Exception {
Optional<?> kafkaMessage = Optional.ofNullable(consumerRecord.value());
if (!kafkaMessage.isPresent()) {
System.out.println("kafka数据消费失败");
}
String id = consumerRecord.key().toString();
String name = kafkaMessage.get().toString();
Integer total = (Integer) myCacheService.getCacheKey(TOPIC + ":" + id + ":" + name);
if (null != total) {
total = total + 1;
myCacheService.removeKey(TOPIC + ":" + id + ":" + name);
} else {
total = 1;
}
myCacheService.setCacheToRedis(TOPIC + ":" + id + ":" + name, total);
}
4.遇到的问题
1)依赖包版本问题
2)出现类似报错: springboot和kafka整合过程中出现的一个错误_wang_jun_jie的博客-CSDN博客
原因是:依赖包缺少了spring-messaging和spring-retry
3)出现类似报错:No entry found for connection 0
需要添加kafka所在IP地址的解析。
参考:
Kafka: No entry found for connection 0_se7en_q的博客-CSDN博客
网上另一种解决方法如下,但在当下环境无效
4)出现类似报错:not present and missingTopicsFatal is true
原因:当前springboot的版本较低,为了配合该版本,所使用的kafka版本也较低。会出现的情况是,无topic时,kafka无法启动。
解决方法:可以代码中先创建topic,或者后台手动创建,或者升级springboot和kafka版本
参考:
Spring kafka踩坑记 missingTopicsFatal_huenbin的博客-CSDN博客_missingtopicsfatal
5.参考资料
kafka template操作kafka_想要养只布偶的西瑞的博客-CSDN博客_kafkatemplate
SpringBoot集成kafka全面实战_Felix-CSDN博客_springboot集成kafka
Springboot2整合kafka的两种使用方式_冲动的仔bb博客-CSDN博客_springboot引入kafka
SpringBoot2.x 整合Kafka - shine-rainbow - 博客园
kafka学习(五)Spring Boot 整合 kafka - 有梦想的肥宅 - 博客园
SpringBoot微服务电商项目开发实战 --- Kafka集成接入 - 码农大哥 - 博客园
kafka springboot 集成配置+测试_yang_zzu的博客-CSDN博客_springboot集成kafka配置
kafka发送消息的三种方式_bin的主栏-CSDN博客_kafka发送消息
lynn_parents: SpringBoot集成Dubbo、Zookeeper实现的一个分布式电商项目,集成了Redis,Kafka - Gitee.com
springboot kafka @KafkaListener 动态指定topics_星雨心梦的博客-CSDN博客_kafka topics 动态
更多推荐
所有评论(0)