当我们使用producer client发送消息之前,需要创建该对象:

Producer<String, String> producer = new KafkaProducer<>(props);

所以从这里开始,首先,该类的注释一定要看,里面包含大量的最佳实践,大概的内容有如下几点:

(1)kafka producer是线程安全的。

(2)不要使用多个实例,多个线程共享同一个producer的实例就可以了。

(3)producer包含了一个内存缓冲池,这个缓冲中包含了一些要发送的数据,producer负责将这些数据转成request发送出去,关闭producer失败会导致数据丢失。

(4)send方法是个异步操作,执行会将数据写入本地缓存,会立即返回,但是这不意味着数据已经发送到broker。

(5)buffer.memory用来控制使用的最大缓存,如果生成的速度比较快导致缓存满了,那么会阻塞max.block.ms这么长时间,如果这么长时间过去之后还是block,则抛出timeout异常。

(6)还有acks,retries等参数的配置。

接着就是构造函数的执行:

private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    try {
        log.trace("Starting the Kafka producer");
        //用户设置的参数
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = new SystemTime();
        //1.生成client.id配置,如果没有配置则使用"producer-{id}"这样的自动生成的id,是个自增长的数量
        clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
        if (clientId.length() <= 0)
            clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
        Map<String, String> metricTags = new LinkedHashMap<String, String>();
        metricTags.put("client-id", clientId);
        MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
                .tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                MetricsReporter.class);
        reporters.add(new JmxReporter(JMX_PREFIX));
        this.metrics = new Metrics(metricConfig, reporters, time);
        //2.创建分区器,用来决定发送的消息路由到topic的哪个分区。PARTITIONER_CLASS_CONFIG:partitioner.class,设置分区器,如果没有指定则使用默认的
        this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
        //retry.backoff.ms,生产端发送失败会重试,失败后重新发送请求的间隔时间,防止短时间内重复多次发送请求
        long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
        //使用用户设置的key.serializer
        if (keySerializer == null) {
            this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
            this.keySerializer = keySerializer;
        }
        //使用用户设置的value.serializer
        if (valueSerializer == null) {
            this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
            this.valueSerializer = valueSerializer;
        }

        // load interceptors and make sure they get clientId
        userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
        //3.自定义拦截器,很少使用
        List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                ProducerInterceptor.class);
        this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);

        ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        //4.元数据,代表了集群目前的状态,以及每个partition及leader情况,如果发现写入的topic对应的元数据不再本地,就会去broker上拉取元数据信息
        //metadata.fetch.timeout.ms:超时时间,默认1分钟,默认每隔五分钟强制刷新
        this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
        //一次request请求最大的大小,一个请求中会包含
        this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
        //producer端总的内存大小,这个内存不能设置的跟jvm内存一样大,因为jvm的内存还要做数据压缩,inflight数据等使用,默认32M大小
        this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
        //压缩类型,默认不压缩,可以使用gzip,snappy,lz4 这几种压缩方式
        this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
        /* check for user defined settings.
         * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
         * This should be removed with release 0.9 when the deprecated configs are removed.
         */
        //在缓存区满了之后是否block,默认是false,超过一段时间后就会抛出异常,默认block一分钟
        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
            if (blockOnBufferFull) {
                this.maxBlockTimeMs = Long.MAX_VALUE;
            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
            } else {
                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
            }
        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
            this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
        } else {
            this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
        }

        /* check for user defined settings.
         * If the TIME_OUT config is set use that for request timeout.
         * This should be removed with release 0.9
         */
        //REQUEST_TIMEOUT_MS_CONFIG:请求的超时时间,默认30s,超过时间则重试,重试次数耗尽则报错
        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
            this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
        } else {
            this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
        }
        //5.数据缓存核心组件,
        // BATCH_SIZE_CONFIG:batch.size,每批数据量的大小,一个batch包含多个record。默认16384 bytes = 16KB
        // 一次请求Request对应一个broker,request会包含多个batch,每个batch对应了这个broker上的一个partition,
        // batch太小会导致频繁发送request,导致网络通信次数变多,导致吞吐量变小。如果设置成0.那就不打包,来一条发一条。
        // 如果batch过大则内存里会缓存大量的batch,浪费内存。
        // LINGER_MS_CONFIG:linger.ms,默认是0。如果到了这个时间还没有到达batch大小,到了linger时间也会发送出去。
        this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                this.totalMemorySize,
                this.compressionType,
                config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                retryBackoffMs,
                metrics,
                time);
        //BOOTSTRAP_SERVERS_CONFIG:bootstrap.servers,broker的地址列表,由用户编写的程序提供
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
        //6.这里并没有更新元数据
        this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
        //创建KafkaChannel
        ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
        //初始化NetworkClient。后面的通信底层都是基于这个类
        //CONNECTIONS_MAX_IDLE_MS_CONFIG:最大网络连接空闲时间就要被回收掉。默认9分钟。
        //MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:max.in.flight.requests.per.connection,每个连接最多有多少个没收到响应就停止发送数据
        //RECONNECT_BACKOFF_MS_CONFIG:reconnect.backoff.ms,重试间隔时间
        //SEND_BUFFER_CONFIG:send.buffer.bytes,tcp网络连接发送缓冲区大小。默认128K
        //RECEIVE_BUFFER_CONFIG:receive.buffer.bytes,tcp网络连接接收缓冲区大小。默认32K
        NetworkClient client = new NetworkClient(
                new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
                this.metadata,
                clientId,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
                config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                this.requestTimeoutMs, time);
        //sender是个线程,发送数据的线程
        //ACKS_CONFIG:acks.0表示写入本地缓存就算成功,1表示写入了leader。all表示isr列表里全部写成功。
        this.sender = new Sender(client,
                this.metadata,
                this.accumulator,
                config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
                config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
                config.getInt(ProducerConfig.RETRIES_CONFIG),
                this.metrics,
                new SystemTime(),
                clientId,
                this.requestTimeoutMs);
        //启动包装了sender的KafkaThread线程
        String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        this.ioThread.start();

        this.errors = this.metrics.sensor("errors");


        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed
        // this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}

producer主要构造了如下重要的组件:

1.clientId的构造,这里如果使用kafka的限流功能,需要手动指定这个id

2.指定分区器,来指定数据发送到哪个分区。

3.指定kv序列化器。

4.设置retry,acks,超时时间,缓冲区大小,批大小等参数。

5.初始化元数据。

6.设置压缩。

7.构造NetworkClient 。

8.构造KafkaThread,该thread包装了send线程。

这里重点分析如下三块:

(1)初始化元数据及update方法。

(2)构造NetworkClient 。

(3)开启KafkaThread线程

(1)初始化元数据及update方法。

metadata在new出来后,this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());实际上并没有更新元数据信息。metadata类中最重要的就是Cluaster属性,cluster中主要变量:

private final boolean isBootstrapConfigured;
//node是对kafka一台机器的封装,包含host,port,id等信息
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> internalTopics;
//集群所有的partition
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
//一个topic中所有的partition信息
private final Map<String, List<PartitionInfo>> partitionsByTopic;
//可用的topic集合
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
//每个node上有哪些partition
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
//每个nodeId与node的对应关系
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;

这里面还有个重要的类:PartitionInfo,主要包含如下变量,比较重要的是ISR列表:inSyncReplicas的概念,里面包含了leader本身已经与leader保持同步的follower的信息。

public class PartitionInfo {
    private final String topic;
    private final int partition;
    //leader
    private final Node leader;
    //副本
    private final Node[] replicas;
    //ISR列表
    private final Node[] inSyncReplicas;

 

Cluster.bootstrap方法:通过bootstrap方法创建一个Cluster,创建一个cluster后返回:

public static Cluster bootstrap(List<InetSocketAddress> addresses) {
    List<Node> nodes = new ArrayList<>();
    int nodeId = -1;
    for (InetSocketAddress address : addresses)
        nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
    return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
}

 

这里根据用户代码的配置生成了Node,每个node代表了一台kafka服务端的机器,多个node对象及其他一些topic信息组成Cluster对象,但是此时一些topic信息是空的,只是做了初始化。

再来分析update方法:

public synchronized void update(Cluster cluster, long now) {
    Objects.requireNonNull(cluster, "cluster should not be null");

    this.needUpdate = false;
    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.version += 1;

    if (topicExpiryEnabled) {
        // Handle expiry of topics from the metadata refresh set.
        for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
            Map.Entry<String, Long> entry = it.next();
            long expireMs = entry.getValue();
            if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
                entry.setValue(now + TOPIC_EXPIRY_MS);
            else if (expireMs <= now) {
                it.remove();
                log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
            }
        }
    }
    //监听器,metadata发生变动的时候通知监听者
    for (Listener listener: listeners)
        listener.onMetadataUpdate(cluster);

    String previousClusterId = cluster.clusterResource().clusterId();
    //第一次进入,这里needMetadataForAllTopics在构造函数中赋值为false,进入else语句,
    //直接将前面初始化的cluster对象赋值给metadata的成员变量cluster
    if (this.needMetadataForAllTopics) {
        // the listener may change the interested topics, which could cause another metadata refresh.
        // If we have already fetched all topics, however, another fetch should be unnecessary.
        this.needUpdate = false;
        this.cluster = getClusterForCurrentTopics(cluster);
    } else {
        //赋值metadata的成员变量cluster
        this.cluster = cluster;
    }

    // The bootstrap cluster is guaranteed not to have any useful information
    if (!cluster.isBootstrapConfigured()) {
        String clusterId = cluster.clusterResource().clusterId();
        if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
            log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
        clusterResourceListeners.onUpdate(cluster.clusterResource());
    }
    //这里,如果有别的线程对当前类操作,并且处于等待状态,那么会被通知
    notifyAll();
    log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}   

 

这里最重要的就是将new出来的cluster对象直接赋值给metadata的cluster成员变量,所以此处的update方法并没有去broker集群通过网络拉取元数据,此时cluaster对象里的很多与topic相关的内容是空的。

(2)构造NetworkClient 。

NetworkClient的初始化主要是一些赋值及生成一些对象,NetworkClient主要负责异步的io请求和响应,并且不是线程安全的。这里会初始化一个DefaultMetadataUpdater组件,用于更新元数据

private NetworkClient(MetadataUpdater metadataUpdater,
                      Metadata metadata,
                      Selectable selector,
                      String clientId,
                      int maxInFlightRequestsPerConnection,
                      long reconnectBackoffMs,
                      int socketSendBuffer,
                      int socketReceiveBuffer,
                      int requestTimeoutMs,
                      Time time) {

    /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
     * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
     * super constructor is invoked.
     */
    //第一次,走这里,metadataUpdater 传过来的就是 null
    if (metadataUpdater == null) {
        if (metadata == null)
            throw new IllegalArgumentException("`metadata` must not be null");
        //1.初始化DefaultMetadataUpdater
        this.metadataUpdater = new DefaultMetadataUpdater(metadata);
    } else {
        this.metadataUpdater = metadataUpdater;
    }
    this.selector = selector;
    this.clientId = clientId;
    //这个类里面的集合记录了已经发送出去但是未收到服务端响应的request
    this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
    //记录当前producer与各个broker的连接状态
    this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
    this.socketSendBuffer = socketSendBuffer;
    this.socketReceiveBuffer = socketReceiveBuffer;
    this.correlation = 0;
    this.randOffset = new Random();
    this.requestTimeoutMs = requestTimeoutMs;
    this.time = time;
}

(3)开启KafkaThread线程

这里KafkaThread只是Sender线程的包装,并且上面的NetworkClient作为Sender的成员变量:

所以这里的关系是:KafkaThread包含了Sender,Sender包含了NetworkClient,NetworkClient是跟broker做实际的网络通信组件。

public KafkaThread(final String name, Runnable runnable, boolean daemon) {
    super(runnable, name);
    setDaemon(daemon);
    setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
        public void uncaughtException(Thread t, Throwable e) {
            log.error("Uncaught exception in " + name + ": ", e);
        }
    });
}

从sender线程的初始化可以看出send类的重要组件:网络通信组件,内存缓存,元数据

public Sender(KafkaClient client,
              Metadata metadata,
              RecordAccumulator accumulator,
              boolean guaranteeMessageOrder,
              int maxRequestSize,
              short acks,
              int retries,
              Metrics metrics,
              Time time,
              String clientId,
              int requestTimeout) {
    //网络通信组件
    this.client = client;
    //内存缓存
    this.accumulator = accumulator;
    //元数据
    this.metadata = metadata;
    this.guaranteeMessageOrder = guaranteeMessageOrder;
    this.maxRequestSize = maxRequestSize;
    this.running = true;
    this.acks = acks;
    this.retries = retries;
    this.time = time;
    this.clientId = clientId;
    //统计组件
    this.sensors = new SenderMetrics(metrics);
    this.requestTimeout = requestTimeout;
}

当真正执行线程的时候执行的还是Sender类的run方法:

public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    //running,volatile修饰,作为标志位的经典用法
    //这里死循环,不停的执行run方法
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    //后面不重要
    }

执行run(time.milliseconds());这里执行逻辑比较多,但是第一次执行这个方法的时候,多数的数据结构都是空的,主要是执行最后一行代码:

void run(long now) {
    //第一次到这里,直接从metadata里获取cluster对象,此时的cluster对象是个空的 ,并没有去broker上拉取元数据信息
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    //第一次来到这里,因为cluster的属性值多数是空的,所以并没有执行什么,以至于result里的数据结构都是空的
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

    // if there are any partitions whose leaders are not known yet, force metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {//第一次会跳过这里
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // remove any nodes we aren't ready to send to
    //第一次:result是空的,这里都会跳过
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // create produce requests
    //第一次:batches是空的
    Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
                                                                     result.readyNodes,
                                                                     this.maxRequestSize,
                                                                     now);
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<RecordBatch> batchList : batches.values()) {
            for (RecordBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
    // update sensors
    for (RecordBatch expiredBatch : expiredBatches)
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

    sensors.updateProduceRequestMetrics(batches);
    List<ClientRequest> requests = createProduceRequests(batches, now);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (result.readyNodes.size() > 0) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        log.trace("Created {} produce requests: {}", requests.size(), requests);
        pollTimeout = 0;
    }
    for (ClientRequest request : requests)
        client.send(request, now);

    // if some partitions are already ready to be sent, the select time would be 0;
    // otherwise if some partition already has some data accumulated but not ready yet,
    // the select time will be the time difference between now and its linger expiry time;
    // otherwise the select time will be the time difference between now and the metadata expiry time;
    this.client.poll(pollTimeout, now);
}

后面的 流程比较复杂,总体的调用流程是:

run(long now)
	->this.client.poll(pollTimeout, now);
		->metadataUpdater.maybeUpdate(now);
			->maybeUpdate(now, node);
		->handleCompletedReceives(responses, updatedNow);
			->metadataUpdater.maybeHandleCompletedReceive(req, now, body)
				->handleResponse(req.request().header(), body, now);
					->this.metadata.update(cluster, now)

再次来到update方法,与上一次不同的是,这次已经真正从broker端获取到了cluster信息,会将这些信息更新给元数据。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐