[2018-11-23 15:35:14,958] ERROR Processor got uncaught exception. (kafka.network.Processor)
java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.kafka.common.protocol.ApiKeys.forId(ApiKeys.java:68)
at org.apache.kafka.common.requests.AbstractRequest.getRequest(AbstractRequest.java:39)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:79)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:426)
at kafka.network.Processor$$anonfun$run$11.apply(SocketServer.scala:421)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at kafka.network.Processor.run(SocketServer.scala:421)
at java.lang.Thread.run(Thread.java:748)

Cause: a client built against a newer SDK is talking to an older Kafka broker and sends a request type that this broker version does not support.
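
A minimal sketch of why this surfaces as an array-index error, assuming a simplified lookup table (the ApiKeysSketch object and its name list below are illustrative, not the real ApiKeys source; point 1 below gives the actual version details):

    // Illustrative only: a 0.9.0.1-era broker knows request ids 0..16 and resolves
    // the id by plain array index, so an unknown id becomes ArrayIndexOutOfBoundsException.
    object ApiKeysSketch {
      private val knownRequests: Array[String] = Array(
        "Produce", "Fetch", "ListOffsets", "Metadata", "LeaderAndIsr", "StopReplica",
        "UpdateMetadata", "ControlledShutdown", "OffsetCommit", "OffsetFetch",
        "GroupCoordinator", "JoinGroup", "Heartbeat", "LeaveGroup", "SyncGroup",
        "DescribeGroups", "ListGroups")

      def forId(id: Int): String = knownRequests(id)

      def main(args: Array[String]): Unit = {
        println(forId(3))  // Metadata
        println(forId(18)) // throws java.lang.ArrayIndexOutOfBoundsException: 18
      }
    }

Newer clients typically send this ApiVersions probe right after connecting, which is why the error appears as soon as such a client talks to the old broker.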

  1. The Kafka version in use is 0.9.0.1, whose highest supported request API key id is 16; id 18 is the ApiVersions request introduced in newer Kafka versions, so the broker throws this exception;
    1. Tracing the code, the relevant block in SocketServer (Processor.run) looks like this:

       try {
         val channel = selector.channel(receive.source)
         val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
           channel.socketAddress)
         val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
         requestChannel.sendRequest(req)
       } catch {
         case e @ (_: InvalidRequestException | _: SchemaException) =>
           // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
           error("Closing socket for " + receive.source + " because of error", e)
           close(selector, receive.source)
       }
    2. The request-handling code does not catch this exception, so it is handled by the outer try...catch and execution goes straight into the next round of selector.poll(300). That poll clears all previously received requests, so in this situation some requests that have already been received may never be processed, which makes this a fairly serious problem (see the simulation after the fix below);
    3. Fix: catch ArrayIndexOutOfBoundsException explicitly as well, close the offending connection, and only mute the source if it has not been closed:
       selector.completedReceives.asScala.foreach { receive =>
         var isClose = false

         try {
           val channel = selector.channel(receive.source)
           val session = RequestChannel.Session(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, channel.principal.getName),
             channel.socketAddress)
           val req = RequestChannel.Request(processor = id, connectionId = receive.source, session = session, buffer = receive.payload, startTimeMs = time.milliseconds, securityProtocol = protocol)
           requestChannel.sendRequest(req)
         } catch {
           case e @ (_: InvalidRequestException | _: SchemaException) =>
             // note that even though we got an exception, we can assume that receive.source is valid. Issues with constructing a valid receive object were handled earlier
             error("Closing socket for " + receive.source + " because of error", e)
             isClose = true
             close(selector, receive.source)
           case e: ArrayIndexOutOfBoundsException =>
             error("NotSupport Request | Closing socket for " + receive.source + " because of error", e)
             isClose = true
             close(selector, receive.source)
         }
         if (!isClose) {
           selector.mute(receive.source)
         }
       }
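
To make point 2 concrete, here is a small, self-contained simulation of the failure mode. This is not Kafka code: DroppedReceivesSimulation, dispatch and the api-key values are invented for illustration; it only shows how one unhandled exception loses the rest of a poll batch.

    object DroppedReceivesSimulation {
      // Each Int stands in for the api key id of one received request; 18 is unsupported.
      private val maxSupportedApiKey = 16

      private def dispatch(apiKey: Int): Unit = {
        if (apiKey > maxSupportedApiKey)
          throw new ArrayIndexOutOfBoundsException(apiKey) // mimics ApiKeys.forId failing
        println(s"dispatched request with api key $apiKey")
      }

      def main(args: Array[String]): Unit = {
        // one poll() batch: a valid request, the unsupported ApiVersions (18), then two more valid ones
        val completedReceives = Seq(3, 18, 0, 1)

        try {
          completedReceives.foreach { apiKey =>
            // the per-receive handler only catches "expected" protocol errors, like the
            // original catch of InvalidRequestException / SchemaException (none are thrown here)
            try dispatch(apiKey)
            catch { case e: IllegalArgumentException => println(s"closing connection: $e") }
          }
        } catch {
          // loop-level safety net, analogous to "Processor got uncaught exception.":
          // control jumps here, requests 0 and 1 of this batch are never dispatched,
          // and the next poll(300) starts over with a fresh batch
          case e: Throwable => println(s"Processor got uncaught exception: $e")
        }
      }
    }

Running it dispatches key 3, then key 18 escapes the inner catch and trips the loop-level net, so keys 0 and 1 are silently dropped; that is exactly why the patch above catches ArrayIndexOutOfBoundsException per receive instead of letting it escape.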
      
Reposted from: https://www.cnblogs.com/huiandong/p/10008032.html
