收到生产环境告警,一直好好消费的客户端突然有天消费不到消息,事后找出原因:kafka生产者部分节点升级kafka-client的版本,导致发送到kafka的消息版本变高,由于服务端版本没有变,导致低版本的消费端向服务器获取消息时,服务端报异常

[2021-01-26 17:50:59,952] ERROR [KafkaApi-1] Error when handling request {replica_id=-1,max_wait_time=500,min_bytes=1,max_bytes=52428800,topics=[{topic=ucLoginLog1,partitions=[{partition=2,fetch_offset=0,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:385)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:568)
        at org.apache.kafka.common.record.AbstractRecords.convertRecordBatch(AbstractRecords.java:117)
        at org.apache.kafka.common.record.AbstractRecords.downConvert(AbstractRecords.java:98)
        at org.apache.kafka.common.record.FileRecords.downConvert(FileRecords.java:245)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:523)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1$$anonfun$apply$5.apply(KafkaApis.scala:521)
        at scala.Option.map(Option.scala:146)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:521)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$convertedPartitionData$1$1.apply(KafkaApis.scala:511)
        at scala.Option.flatMap(Option.scala:171)
        at kafka.server.KafkaApis.kafka$server$KafkaApis$$convertedPartitionData$1(KafkaApis.scala:511)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:559)
        at kafka.server.KafkaApis$$anonfun$kafka$server$KafkaApis$$createResponse$2$1.apply(KafkaApis.scala:558)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(I

排查过程试过调整偏移量:发现消费到特定位置就不消费,事后来看之前能消费的消息是低版本推送过来的消息,后面的消息为高版本发送的消息,服务端响应消费者的时候抛出版本不一致的消息异常,消费者没有获取到偏移量,导致消费者无法加入当前组,消费者线程挂起

通过源码分析:

结合异常堆栈消息查看kafka服务端源码具体分析如下:

 

在Broker拉取到Kakfa消息后,调用fetchResponseCallback回调方法,创建返回信息时,会校验消费者Api版本,如果低于当前Broker版本与向下转换消息,最终调用的是MemoryRecordsBuilder类的方法appendWithOffset

 

抛出异常:java.lang.IllegalArgumentException: Magic v1 does not support record headers

 

得出结论,kafka虽然解耦了架构,但还是涉及到版本问题的兼容性,对生产应急影响很大。

 

 

 

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐