0.背景

kafka的服务端和客户端之间使用IP连接,服务端也未配置任何域名,但是却出现了一个奇怪的问题。kafka客户端达到一定数量的时候,大量的连接在超时,服务端出现了许多CLOSE_WAIT状态的连接,抓包发现服务端有时候关闭完一个连接需要花费几分钟的时间,超出了客户端设置的超时时间,客户端在不断重连,服务端的CLOSE_WAIT越来越多。表现出来的问题表象就是客户端连接超时,连kafka集群内部通信也超时,ISR集合在不断的刷新,kafka无法正常对外提供服务。最终解决办法是在服务端的/etc/hosts中增加了所有客户端的IP和hostname映射。

为什么listeners配置的明明是IP,客户端也是使用IP连接,却需要修改/etc/hosts来解决这个问题。

1.从源码角度分析到底哪里使用了hostname

首先这个问题在客户端数量较少的时候并不会出现,出现有两个条件:

  • kafka服务端将协议从SASL_PLAINTEXT修改为了SASL_SSL
  • 大量的客户端在重连

客户端与服务端建立的过程中,很多地方使用到了InetAddress,比如:
SaslChannelBuilder的buildTransportLayer(仅SASL_SSL协议用到)

    protected TransportLayer buildTransportLayer(String id, SelectionKey key, SocketChannel socketChannel,
                                                 ChannelMetadataRegistry metadataRegistry) throws IOException {
        if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
            return SslTransportLayer.create(id, key,
                sslFactory.createSslEngine(socketChannel.socket().getInetAddress().getHostName(),
                    socketChannel.socket().getPort()),
                metadataRegistry);
        } else {
            return new PlaintextTransportLayer(key);
        }
    }

SaslServerAuthenticator的createSaslServer

    private void createSaslServer(String mechanism) throws IOException {
        this.saslMechanism = mechanism;
        Subject subject = subjects.get(mechanism);
        final AuthenticateCallbackHandler callbackHandler = callbackHandlers.get(mechanism);
        if (mechanism.equals(SaslConfigs.GSSAPI_MECHANISM)) {
            saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
        } else {
            try {
                saslServer = Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
                    Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), configs, callbackHandler));
            } catch (PrivilegedActionException e) {
                throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
            }
        }
    }

SaslChannelBuilder的buildChannel

    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize,
                                     MemoryPool memoryPool, ChannelMetadataRegistry metadataRegistry) throws KafkaException {
        try {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            Socket socket = socketChannel.socket();
            TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel, metadataRegistry);
            Supplier<Authenticator> authenticatorCreator;
            if (mode == Mode.SERVER) {
                authenticatorCreator = () -> buildServerAuthenticator(configs,
                        Collections.unmodifiableMap(saslCallbackHandlers),
                        id,
                        transportLayer,
                        Collections.unmodifiableMap(subjects),
                        Collections.unmodifiableMap(connectionsMaxReauthMsByMechanism),
                        metadataRegistry);
            } else {
                LoginManager loginManager = loginManagers.get(clientSaslMechanism);
                authenticatorCreator = () -> buildClientAuthenticator(configs,
                        saslCallbackHandlers.get(clientSaslMechanism),
                        id,
                        socket.getInetAddress().getHostName(),
                        loginManager.serviceName(),
                        transportLayer,
                        subjects.get(clientSaslMechanism));
            }
            return new KafkaChannel(id, transportLayer, authenticatorCreator, maxReceiveSize,
                memoryPool != null ? memoryPool : MemoryPool.NONE, metadataRegistry);
        } catch (Exception e) {
            log.info("Failed to create channel due to ", e);
            throw new KafkaException(e);
        }
    }

这些意味着在客户端与服务端建立连接的过程中,需要多次解析hostname,即使我们是使用IP进行连接的。
为了验证这个排查思路,我们写了如下的一小段代码分别在正常环境和问题环境获取hostname

    public static void main(String[] args) throws UnknownHostException {
        String ip = args[0];
        long start = System.currentTimeMillis();
        InetAddress inetAddress = InetAddress.getByName(ip);
        System.out.println("get hostname:" + inetAddress.getHostName());
        long end = System.currentTimeMillis();
        System.out.println(end - start);
    }

发现正常环境在服务端获取客户端的hostname 1ms的时间都用不了,而问题环境却需要花费5秒钟的时间。
这个建立连接的过程中多次获取hostname,导致超时,客户端主动断开连接,而服务端响应关闭时客户端早已因超时强制关掉了连接,因此在服务端会出现CLOSE_WAIT状态的连接。CLOSE_WAIT越堆积越多,连接的处理就会越慢,超时情况就会越来越严重,直到服务端再无法对外提供服务。

2.为什么只在服务端增加IP hostname映射关系便可解决

从源码看其实kafka的服务端和客户端都在解析hostname,但是我们仅仅在服务端将客户端的IP hostname映射配置到/etc/hosts问题就解决了,而且没有重启任何服务,kafka集群就自行恢复正常了。
因为对于一个会在短时间内接受大量连接的服务端而言解析hostname会造成极大的性能问题,比如系统刚启动的时候,业务都在并发的启动,服务端会在短时间内收到所有客户端建立连接的请求,这时候对于服务端而言,若是每个连接要处理20s,那么100个连接就是2000s,况且出现问题的环境,远不止100个连接。这就是说,若是服务端解析hostname很耗时,对于服务端而言压力巨大,而对于单个客户端而言,解析一次hostname耗时久仅仅是建立连接过程慢一些,建立的是长连接并不会有很大的性能损耗。

3.这真的是合理的解决办法吗

一次严重的kafka集群不可用问题,就在配置了/etc/hosts后神奇的解决了。但是这样做真的合理吗?若是kafka客户端是动态扩容的,这服务端的/etc/hosts又该如何刷新?

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐