1:生产者全流程介绍

废话少说,直接上总结。

1.1:使用类介绍

主要类间的使用关系

1:KafkaProducer类,详细源码解析见:(4.1)kafka生产者源码——KafkaProducer类
作用:用于发送数据而提供的kafka 客户端,进行发送数据前的,集群连接配置,网络连接配置,用于向RecordAccumulator写数据。
2:RecordAccumulator消息累加器,用于数据缓存,内存管理,源码详情 (4.2)kafka生产者源码——RecordAccumulator类

  • 2.1:BufferPool 内存管理器,缓冲池,使用分配和是否空间
  • 2.2:ProducerBatch 队列中一批次消息,可以包含多条消息

3:Sender独立于KafkaProducer的线程,负责发送RecordAccumulator数据前的准备工作,创建网络io请求,操作网络io层NetworkClient。源码详见 (4.4)kafka生产者源码——Sender线程类
4:NetworkClient是封装的一层网络客户端,用于生产者和消费者发送请求和获取响应,调用网络实现层Selector。源码详见 kafka网络通信客户端-NetworkClient类
5:Selector,该类是 Kafka 网络层最重要最核心的实现,也是非常经典的工业级通信框架实现,用于连接服务端,处理请求。源码详见 (3.4)broker源码——Kafka 网络层实现机制之 Selector 多路复用器

1.2:生产者源码运行流程图

在这里插入图片描述
kafka生产者流程图

下面进行详细分析。

1.3:生产者源码解析

1:开始生产数据demo详见生产者demo

 KafkaProducer.send(new ProducerRecord<>(topic, messageNo,messageStr))

初始化KafkaProducer,调用其send方法

   private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;/*记录topic-分区数*/
        try {
            // first make sure the metadata for the topic is available
            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;
            byte[] serializedKey;/*数据序列化,转为 byte[] */
            try {
                serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in key.serializer", cce);
            }
            //value的序列化
            byte[] serializedValue;
            try {
                serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException cce) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                        " specified in value.serializer", cce);
            }
//1.1 对要发送的数据计算其要存储的topic-partition
            int partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);

            setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            /*序列化后的消息大小 */
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                    compressionType, serializedKey, serializedValue, headers);
            /*消息大小合格校验,单条默认最大不能超过1m,也不能超过缓冲区大小32M*/
            ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
            // producer callback will make sure to call both 'callback' and interceptor callback生产者回调将确保同时调用“回调”和拦截器回调
            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            if (transactionManager != null && transactionManager.isTransactional())
                transactionManager.maybeAddPartitionToTransaction(tp);
//1.2:向RecordAccumulator累加器内存缓冲区中的batch添加数据
            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                    serializedValue, headers, interceptCallback, remainingWaitMs);

//1.3:对累加器中的数据进行判断,是否达到batch发送的条件,符合发送要求,唤醒sender线程发送
            /*也就是说处理消息和发送是两个分开的线程,主线程构造数据和更新回调函数FutureRecordMetadata。
            *sender采用异步发送+ 获取FutureRecordMetadata回调函数确保数据发送的状态*/
            if (result.batchIsFull || result.newBatchCreated) {
                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }
            return result.future;
		}
    }

1.1:调用分区器 分区器分析文档,计算分区,没有指定key时默认采用轮询方式进行发送数据,所以指定key时可能发生数据倾斜现象。
1.2:进入RecordAccumulator累加器,对缓存区大小,batch批数据进行管理。
详见文档 (4.2)kafka生产者源码——RecordAccumulator类

append向 private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches; 队列写入数据

   public RecordAppendResult append(TopicPartition tp,
                                     long timestamp,
                                     byte[] key,
                                     byte[] value,
                                     Header[] headers,
                                     Callback callback,
                                     long maxTimeToBlock) throws InterruptedException {
        // We keep track of the number of appending thread to make sure we do not miss batches in
        // abortIncompleteBatches().
        appendsInProgress.incrementAndGet();
        ByteBuffer buffer = null;/*生产者是多线程的,所以下面采用加锁synchronized和双重追加数据检查机制tryAppend处理dq*/
        if (headers == null) headers = Record.EMPTY_HEADERS;
        try {
            // check if we have an in-progress batch检查该topic分区是否有正进行的批次,获取该分区对应的Deque队列,每个队列中可能存在着多批次未发送的数据
            Deque<ProducerBatch> dq = getOrCreateDeque(tp);
            /*只有一个线程进入*/
            synchronized (dq)
            {/*通过加锁给dq批次追加数据,防止多线程不安全*/
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                /*情况1:如果写入分区对应的没有任何一个批次存在,说明需要先创建批次,添加失败appendResult=null*/
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null)
                    return appendResult;
            }

            //情况2:写入的分区没有正在进行的记录批处理,尝试创建的批处理追加数据

            byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
            /*取批次(默认16k)设置的值和消息大小(默认1M)取最大值,所以两者一般需要调优*/
            int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
            log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
            /*分配内存:取16k和消息大小的最大值*/
            buffer = free.allocate(size, maxTimeToBlock);
            synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
                    throw new IllegalStateException("Cannot send after the producer is closed.");

                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
                if (appendResult != null) {
                    // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                    return appendResult;
                }

                /*下面三层封装进行了一些属性的分层解析,没有一次定义多次属性,解耦了*/

                /*将分配的内存空间buffer封装为MemoryRecordsBuilder,用于写入record记录*/
                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                /*初始化ProducerBatch,初始化内存空间大小=。时对MemoryRecordsBuilder封装*/
                ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                /*batch.tryAppend:消息如何按照二进制的规范写入MemoryRecordsBuilder,再写到ProducerBatch*/
                FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

                /*封装后的batch,写入队列*/
                dq.addLast(batch);
                incomplete.add(batch);

                //使用完毕释放掉buffer,进入bufferpool循环使用,不要在final块中取消分配此缓冲区,因为它正在记录批处理中使用
                buffer = null;
                    //更新是否批的状态,检查是否需要发送
                return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
            }
        } finally {
         //使用完毕释放掉buffer,进入bufferpool循环使用
            if (buffer != null)
                free.deallocate(buffer);
            appendsInProgress.decrementAndGet();
        }
    }
    
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                         Callback callback, Deque<ProducerBatch> deque) {
        /*取出未发送批次中的最新批次的数据*/
        ProducerBatch last = deque.peekLast();
        if (last != null) {
            FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
            if (future == null)
                last.closeForRecordAppends();
            else
                return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
        }
        return null;
    }

1.3:队列中接入数据后,就可以进行判断是否可以发送数据了。进入sender线程,先汇总数据以broker-tp汇总batch;创建clientrequest

   void run(long now) {
        if (transactionManager != null) {/*对事务管理器的一些判断,默认不开启,为null*/
            try {
                if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                    // Check if the previous run expired batches which requires a reset of the producer state.
                    transactionManager.resetProducerId();

                if (!transactionManager.isTransactional()) {
                    // this is an idempotent producer, so make sure we have a producer id
                    maybeWaitForProducerId();
                } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                    transactionManager.transitionToFatalError(new KafkaException("The client hasn't received acknowledgment for " +
                            "some previously sent messages and can no longer retry them. It isn't safe to continue."));
                } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                    // as long as there are outstanding transactional requests, we simply wait for them to return
                    client.poll(retryBackoffMs, now);
                    return;
                }

                // do not continue sending if the transaction manager is in a failed state or if there
                // is no producer id (for the idempotent case).
                if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                    RuntimeException lastError = transactionManager.lastError();
                    if (lastError != null)
                        maybeAbortBatches(lastError);
                    client.poll(retryBackoffMs, now);
                    return;
                } else if (transactionManager.hasAbortableError()) {
                    accumulator.abortUndrainedBatches(transactionManager.lastError());
                }
            } catch (AuthenticationException e) {
                // This is already logged as error, but propagated here to perform any clean ups.
                log.trace("Authentication exception while processing transactional request: {}", e);
                transactionManager.authenticationFailed(e);
            }
        }
        /*重点在sendProducerData中*/
        long pollTimeout = sendProducerData(now);
        client.poll(pollTimeout, now);
    }

    /*发送请求和构建请求的实现*/
    private long sendProducerData(long now) {
        /*更新元数据,topic->partitions->partition leader->isr*/
        Cluster cluster = metadata.fetch();

        // get the list of partitions with data ready to send
        /*遍历所有的batch,判断可以发送的batch和获取可以发送batch对应的partition leader*/
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // remove any nodes we  are not ready to send to移除没有准备好发送数据的节点
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();

            /*检查客户端broker是否准备好,能否连接到node节点*/
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // batch是否需要被发送算法实现
        // create produce requests创建发送请求,把发往同一个broker的所有batch都放在一起,得到batches,减小网络io和请求次数
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        /*消息有序性,默认true,也就是所有分区都会保证*/
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained将所有的分区加到标识中,标识该分区有正在处理的批次。
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }
        /*过期的批次*/
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
        // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
        // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
        // we need to reset the producer id here.
        if (!expiredBatches.isEmpty())
            log.trace("Expired {} batches in accumulator", expiredBatches.size());
        for (ProducerBatch expiredBatch : expiredBatches) {
            failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException(), false);
            if (transactionManager != null && expiredBatch.inRetry()) {
                // This ensures that no new batches are drained until the current in flight batches are fully resolved.
                transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
            }
        }

        sensors.updateProduceRequestMetrics(batches);

        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (!result.readyNodes.isEmpty()) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            // if some partitions are already ready to be sent, the select time would be 0;
            // otherwise if some partition already has some data accumulated but not ready yet,
            // the select time will be the time difference between now and its linger expiry time;
            // otherwise the select time will be the time difference between now and the metadata expiry time;
            pollTimeout = 0;
        }
        /*开始发送请求,会在里面构建clientRequest请求*/
        sendProduceRequests(batches, now);

        return pollTimeout;
    }

1.6:sender线程构建clientRequest完成,调用networkClient进行send发送到selector复用器等待处理请求。此处详见 (3.4)broker源码——Kafka 网络层实现机制之 Selector 多路复用器

 public void send(Send send) {
        // 1. 从请求中获取 connectionId
        String connectionId = send.destination();
        // 2. 从数据包中获取对应连接
        KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
        // 3. 如果关闭连接集合中存在该连接
        if (closingChannels.containsKey(connectionId)) {
            // ensure notification via `disconnected`, leave channel in the state in which closing was triggered
            // 把 connectionId 放入 failedSends 集合里
            this.failedSends.add(connectionId);
        } else {
            try {
                // 4. 将send封装到KafkaChannel中。暂存数据预发送,并没有真正的发送,一次只能发送一个
                channel.setSend(send);
            } catch (Exception e) {
                // update the state for consistency, the channel will be discarded after `close`
                // 5. 更新 KafkaChannel 的状态为发送失败
                channel.state(ChannelState.FAILED_SEND);
                // ensure notification via `disconnected` when `failedSends` are processed in the next poll
                // 6. 把 connectionId 放入 failedSends 集合里
                this.failedSends.add(connectionId);
                // 7. 关闭连接
                close(channel, CloseMode.DISCARD_NO_NOTIFY);
                if (!(e instanceof CancelledKeyException)) {
                    log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
                            connectionId, e);
                    throw e;
                }
            }
        }
    }

将send请求封装到KafkaChannel.send属性上,并新增监听OP_WRITE事件,此时算请求发送完成,重新回到send.run()中开始处理请求进入到 client.poll

    /* 1:重点在sendProducerData中,此时会汇总batch,寻找需要发送的partition leader,构建clientRequest到Selector中*/
        long pollTimeout = sendProducerData(now);
        /* 2:请求构建完成,这里会调用java.nio.Selector.select()方法取处理我们构建的clientRequest生产请求
        client.poll(pollTimeout, now);

2:此时就简单了触发写事件,写完移除请求和写事件即可

1.4:从源码理解生产者配置参数

  • max.request.size:定义了sender线程向服务器端发送数据请求时一个请求的最大字节数,一个请求中可以包含多个batch;同时还定义了生产者端可发送一条消息的最大大小。
  • buffer.memory:生产客户端默认分配用于缓存batch的最大内存大小,会创建一个bufferpool缓冲池默认16M,里面包含了多个batch.size大小的byteBuffer
  • batch.size:创建一个缓冲区的大小,可以包含多条消息,提升性能。
  • acks:保证生产的可靠性,决定等待partition的多少副本写入成功
  • linger.ms:batch被创建多久必须被发送出去,默认是0,一般对于生产端数据都是比较小的消息,设置可提升集群吞吐量,减少集群请求压力
  • max.block.ms:send发送数据是缓冲区buffer.memory满的情况下等待处理发送数据的时长,默认阻塞60s
  • key.serializer:消息中key的序列化方式
  • partitioner.class:消息的分区器,默认无key是轮询发送的,也可以自定义
  • compression.type:消息的压缩方式

1.5:从源码分析生产时常见的异常

1:Attempt to allocate bytes, but there is a hard limit of

从源码可知道,是申请的内存大小超过了总缓存空间大小。
涉及参数batch.size(批大小)、buffer.memory(总缓存空间大小),消息实际大小。传入的size取batch.size和消息实际大小的最大值。
解决:batch.size设置不能超过buffer.memory;消息中有超过buffer.memory大小的数据。

    public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
        //要申请的内存大小大于总缓存空间抛出异常
        if (size > this.totalMemory)
            throw new IllegalArgumentException("Attempt to allocate " + size
                                               + " bytes, but there is a hard limit of "
                                               + this.totalMemory
                                               + " on memory allocations.");
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐