问题产生原理:

kafka会有三种日志留存策略:

基于空间维度

基于时间维度

基于起始位移维度

 

留存机制工作原理:

每个Kafka broker启动时,都会在后台开启一个定时任务,定期去检查执行所有topic日志留存,这个定时任务触发时间周期由broker端参数 log.retention.check.interval.ms控制,默认时5分钟,即每台broker每5分钟会尝试检查下是否有可删除日志,如果要缩短间隔时间,只要调小这个参数即可。

 

生产上有何产生?

业务场景是我的实时计算要对三种不同数据结构的topic进行统计计算,在消费者消费数据缓慢情况下,kafka的日志已经会删除了,此时流处理中的offset无法找到kafka-log文件中所与之匹配的日志信息,则会报错。会有头越界和尾越界两种问题

 

解决办法:

在 kafka/config/server.properties配置文件中 log.retention.hours 默认为1,改成24

同时在代码中每个启动流计算先更新offset成earliestOffset,git中有demo:

https://github.com/xtr1993/spark_streaming_outOfRangeException.git

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐