生产环境kafka消费者线程停止消费原因排查
收到生产环境告警,一直好好消费的客户端突然有天消费不到消息,事后找出原因:kafka生产者部分节点升级kafka-client的版本,导致发送到kafka的消息版本变高,由于服务端版本没有变,导致低版本的消费端向服务器获取消息时,服务端报异常[2021-01-26 17:50:59,952] ERROR [KafkaApi-1] Error when handling request {repli
收到生产环境告警,一直好好消费的客户端突然有天消费不到消息,事后找出原因:kafka生产者部分节点升级kafka-client的版本,导致发送到kafka的消息版本变高,由于服务端版本没有变,导致低版本的消费端向服务器获取消息时,服务端报异常
[2021-01-26 17:50:59,952] ERROR [KafkaApi-1] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,topics=[{topic=ucLoginLog1,partitions=[{partition=2,fetch_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
at scala.Option.map(Option.scala:146)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
at scala.Option.flatMap(Option.scala:171)
at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(I
排查过程试过调整偏移量:发现消费到特定位置就不消费,事后来看之前能消费的消息是低版本推送过来的消息,后面的消息为高版本发送的消息,服务端响应消费者的时候抛出版本不一致的消息异常,消费者没有获取到偏移量,导致消费者无法加入当前组,消费者线程挂起
通过源码分析:
结合异常堆栈消息查看kafka服务端源码具体分析如下:
在Broker拉取到Kakfa消息后,调用fetchResponseCallback回调方法,创建返回信息时,会校验消费者Api版本,如果低于当前Broker版本与向下转换消息,最终调用的是MemoryRecordsBuilder类的方法appendWithOffset
抛出异常:java.lang.IllegalArgumentException: Magic v1 does not support record headers
得出结论,kafka虽然解耦了架构,但还是涉及到版本问题的兼容性,对生产应急影响很大。
更多推荐
所有评论(0)