Offsets out of range with no configured reset policy for partition

Suppose a partition holds 10,000 messages.
Kafka stores them as segments covering roughly offsets 0-1000, 1000-2000, 2000-3000, and so on.
Say consumption fails at offset 4500 and the failure is never handled. Once Kafka's retention period expires, Kafka deletes those segments. When the consumer resumes at offset 4501, the data no longer exists, and the consumer throws "Offsets out of range with no configured reset policy for partition".
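
The consumer raises this exception, instead of silently resetting, when auto.offset.reset is "none" (Spark Streaming's direct API, for example, configures its executor-side consumers this way). A minimal sketch that reproduces the scenario, with hypothetical broker and topic names:

import java.time.Duration
import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConversions._

object OffsetOutOfRangeDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // hypothetical broker
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
    // "none" = never auto-reset; an out-of-range offset raises the exception instead
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    val consumer = new KafkaConsumer[String, String](props)
    val tp = new TopicPartition("demo-topic", 0) // hypothetical topic
    consumer.assign(Seq(tp)) // Seq converts to java.util.List via JavaConversions
    consumer.seek(tp, 4501L) // an offset already deleted by retention
    consumer.poll(Duration.ofMillis(1000)) // throws OffsetOutOfRangeException
    consumer.close()
  }
}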

How do we fix it?

We implement an offset-correction utility class.
The correction utility, written in Scala:
Note: the config passed to KafkaConsumer must be a Java Map, while our kafkaParams is a Scala Map, so we rely on the implicit conversions in scala.collection.JavaConversions._
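
For reference, this is the implicit conversion that note relies on, shown in isolation (a minimal sketch):

import scala.collection.mutable
import scala.collection.JavaConversions._

val scalaMap = mutable.Map[String, Object]("group.id" -> "demo-group")
// mutableMapAsJavaMap kicks in wherever a java.util.Map is expected:
val javaMap: java.util.Map[String, Object] = scalaMap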

package com.cartravel.kafka

import java.time.Duration

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.TopicPartition

import scala.collection.mutable
import scala.util.Try
import scala.collection.JavaConversions._

class KafkaOffsetCorrection(kafkaParams: Map[String, Object], topics: Seq[String], topicPartitionMap: mutable.HashMap[TopicPartition, Long]) extends Serializable {
  // Main logic: compare the current offset with the earliest and latest offsets.
  // Correction rule: if current < earliest || current > latest, the offset is no
  // longer within the valid range, so we reset it and consume from earliest.
  def correction(): Map[TopicPartition, Long] = {
    val earliestOffsets = getEarliestOffset // earliest available offset per partition
    val latestOffsets = getLatestOffset     // latest offset per partition
    for ((tp, current) <- topicPartitionMap) {
      val earliest: Long = earliestOffsets(tp)
      val latest: Long = latestOffsets(tp)
      if (current < earliest || current > latest) {
        // Out of range: rewind this partition to the earliest available offset
        topicPartitionMap.put(tp, earliest)
      }
    }
    topicPartitionMap.toMap
  }

  private def getEarliestOffset: Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Query offsets through the Kafka consumer API.
    // Note: the constructor expects a java.util.Map; the Scala map is converted
    // implicitly by scala.collection.JavaConversions._
    val consumer = new KafkaConsumer[String, Object](newKafkaParams)
    consumer.subscribe(topics)
    val pullData = Try {
      consumer.poll(Duration.ofMillis(0))
    }
    if (pullData.isFailure) {
      // send an alert email here
    }
    val topicPartitions: Set[TopicPartition] = consumer.assignment().toSet
    // Pause fetching so the seeks below do not trigger real consumption
    consumer.pause(topicPartitions)
    // Jump to the beginning of each partition
    consumer.seekToBeginning(topicPartitions)

    val earliestOffsetMap = topicPartitions.map(tp => (tp, consumer.position(tp))).toMap

    consumer.unsubscribe()
    consumer.close()

    earliestOffsetMap
  }

  private def getLatestOffset: Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    // Query offsets through the Kafka consumer API
    val consumer: KafkaConsumer[String, Object] = new KafkaConsumer[String, Object](newKafkaParams)
    consumer.subscribe(topics)
    val pullData = Try {
      consumer.poll(Duration.ofMillis(0))
    }
    if (pullData.isFailure) {
      // send an alert email here
    }
    val topicPartitions: Set[TopicPartition] = consumer.assignment().toSet
    // Pause fetching so the seeks below do not trigger real consumption
    consumer.pause(topicPartitions)
    // Jump to the end of each partition
    consumer.seekToEnd(topicPartitions)

    val latestOffsetMap = topicPartitions.map(tp => (tp, consumer.position(tp))).toMap

    consumer.unsubscribe()
    consumer.close()

    latestOffsetMap
  }
}
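
To use the class, load the last saved offsets from wherever they are checkpointed, run them through correction(), and start consuming from the corrected positions. A minimal usage sketch, assuming a hypothetical broker, topic, and saved offset:

import org.apache.kafka.common.TopicPartition

import scala.collection.mutable

object KafkaOffsetCorrectionDemo {
  def main(args: Array[String]): Unit = {
    val kafkaParams: Map[String, Object] = Map(
      "bootstrap.servers" -> "localhost:9092", // hypothetical broker
      "group.id" -> "demo-group",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )
    val topics = Seq("demo-topic") // hypothetical topic

    // Offsets restored from our own checkpoint store (hypothetical value)
    val savedOffsets = mutable.HashMap(new TopicPartition("demo-topic", 0) -> 4501L)

    // Any out-of-range offset is rewound to the earliest available one
    val corrected: Map[TopicPartition, Long] =
      new KafkaOffsetCorrection(kafkaParams, topics, savedOffsets).correction()

    corrected.foreach { case (tp, offset) => println(s"$tp -> start at offset $offset") }
    // The corrected map can now seed consumer.seek(...) or a Spark direct stream.
  }
}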
