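Kafka Retry Mechanism

Retry source code

We start from the send method of KafkaProducer. The client does not send a record directly. It first appends the record to a buffer (the RecordAccumulator) and then uses the state of that buffer (result) to decide whether a send should be triggered. Even then, "sending" only means waking up the Sender object.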

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
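
To make the "append first, wake the sender only when needed" idea concrete, here is a minimal sketch. It is not the real RecordAccumulator: the class name, the fixed batch capacity, and the wakeups counter are all assumptions made for illustration. Only the wake-up condition (batch full or new batch created) mirrors the excerpt above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model; not the real RecordAccumulator.
class AppendAndWakeSketch {
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    private final int batchCapacity;                       // assumed: max records per batch
    private final Deque<List<String>> batches = new ArrayDeque<>();
    int wakeups = 0;                                        // stands in for sender.wakeup()

    AppendAndWakeSketch(int batchCapacity) {
        this.batchCapacity = batchCapacity;
    }

    // Append to the last batch, creating a new one when the last batch is full.
    AppendResult append(String record) {
        List<String> last = batches.peekLast();
        boolean created = false;
        if (last == null || last.size() >= batchCapacity) {
            last = new ArrayList<>();
            batches.addLast(last);
            created = true;
        }
        last.add(record);
        boolean full = batches.size() > 1 || last.size() >= batchCapacity;
        return new AppendResult(full, created);
    }

    // Mirrors the shape of the send path: buffer first, then maybe wake the sender.
    void send(String record) {
        AppendResult result = append(record);
        if (result.batchIsFull || result.newBatchCreated) {
            wakeups++;
        }
    }

    public static void main(String[] args) {
        AppendAndWakeSketch producer = new AppendAndWakeSketch(3);
        for (String r : new String[] {"a", "b", "c", "d"}) {
            producer.send(r);
        }
        System.out.println("wakeups = " + producer.wakeups);
    }
}
```

In this model the second record causes no wake-up, because it lands in an existing batch that is not yet full.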
Sender

Sender implements Runnable, so we go straight to its run method. The run method ends up calling a method named sendProducerData to send the data:

long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);

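From there the call chain is sendProducerData -> sendProduceRequests -> handleProduceResponse -> completeBatch -> canRetry, and this is where the retry code lives.

We look at completeBatch first. A batch is retried only for specific error causes, since not every retry is meaningful. The precondition for the whole error-handling branch is that the response actually carries an error, that is, error != Errors.NONE. Note that Errors.NONE means "no error", not "unknown error".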

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
    Errors error = response.error;

    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        // If the batch is too large, we split the batch and send the split batches again. We do not decrement
        // the retry attempts in this case.
        log.warn(
            "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
            correlationId,
            batch.topicPartition,
            this.retries - batch.attempts(),
            error);
        if (transactionManager != null)
            transactionManager.removeInFlightBatch(batch);
        this.accumulator.splitAndReenqueue(batch);
        this.accumulator.deallocate(batch);
        this.sensors.recordBatchSplit();
    } else if (error != Errors.NONE) {
        if (canRetry(batch, response, now)) {
            log.warn(
                "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                correlationId,
                batch.topicPartition,
                this.retries - batch.attempts() - 1,
                error);
            if (transactionManager == null) {
                reenqueueBatch(batch, now);
            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                // If idempotence is enabled only retry the request if the current producer id is the same as
                // the producer id of the batch.
                log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
                        batch.topicPartition, batch.producerId(), batch.baseSequence());
                // Re-add the batch to the queue via `accumulator.reenqueue(batch, currentTimeMs);` it is then resent through the normal send path
                reenqueueBatch(batch, now);
            } else {
                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                        "batch but the producer id changed from " + batch.producerId() + " to " +
                        transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
            }
        } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
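            // An interesting case: a retry delivered a batch that the broker had already accepted, which it detects
            // through the idempotent producer's sequence number. The batch is treated as sent and is not retried.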
            completeBatch(batch, response);
        } else {
            final RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
            else
                exception = error.exception();
            // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
            // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
            // thus it is not safe to reassign the sequence.
            failBatch(batch, response, exception, batch.attempts() < this.retries);
        }
        if (error.exception() instanceof InvalidMetadataException) {
            if (error.exception() instanceof UnknownTopicOrPartitionException) {
                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                        "topic-partition may not exist or the user may not have Describe access to it",
                    batch.topicPartition);
            } else {
                log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                        "to request metadata update now", batch.topicPartition, error.exception().toString());
            }
            metadata.requestUpdate();
        }
    } else {
        completeBatch(batch, response);
    }

    // Unmute the completed partition.
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
}

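Next comes the handling for each specific error cause. One special case is a batch that is too large, that is, Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 (together with the other conditions in that if statement). In this case the batch is split and the pieces are re-enqueued, and no retry attempt is consumed.

reenqueueBatch puts the batch back into the queue, which is what a retry is.

completeBatch means we treat the send of this batch as successful, with no retry needed, for example on a DUPLICATE_SEQUENCE_NUMBER error.

failBatch marks the batch as failed, with no further attempts. This happens in two situations:

1. The error is fatal (not retriable)
2. The retry limit has been reached, so canRetry returns false. In `failBatch(batch, response, exception, batch.attempts() < this.retries)` the last argument is not the retry check itself: it tells failBatch whether sequence numbers may be adjusted, which is only safe when the batch did not exhaust its retries.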
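
The branch structure of completeBatch can be summarized as a small decision function. This is a sketch under assumptions, not Kafka's code: the Error and Action enums and the canSplit and canRetry flags are hypothetical stand-ins (canSplit represents the "not done, and magic v2+ or compressed" part of the first condition), and the producer-id check for idempotent producers is left out.

```java
// Hypothetical, simplified model of the branches in Sender.completeBatch.
class CompleteBatchSketch {
    enum Error { NONE, MESSAGE_TOO_LARGE, DUPLICATE_SEQUENCE_NUMBER, RETRIABLE, FATAL }

    enum Action { SPLIT_AND_REENQUEUE, REENQUEUE, COMPLETE, FAIL }

    static Action decide(Error error, int recordCount, boolean canSplit, boolean canRetry) {
        // Oversized multi-record batch: split it and re-enqueue the pieces.
        if (error == Error.MESSAGE_TOO_LARGE && recordCount > 1 && canSplit) {
            return Action.SPLIT_AND_REENQUEUE;
        }
        // No error at all: the batch succeeded.
        if (error == Error.NONE) {
            return Action.COMPLETE;
        }
        // Retriable error with attempts left: put the batch back in the queue.
        if (canRetry) {
            return Action.REENQUEUE;
        }
        // The broker already has this batch: treat it as a success.
        if (error == Error.DUPLICATE_SEQUENCE_NUMBER) {
            return Action.COMPLETE;
        }
        // Everything else is surfaced to the caller as a failure.
        return Action.FAIL;
    }

    public static void main(String[] args) {
        System.out.println(decide(Error.MESSAGE_TOO_LARGE, 5, true, false));
        System.out.println(decide(Error.RETRIABLE, 1, false, true));
        System.out.println(decide(Error.DUPLICATE_SEQUENCE_NUMBER, 1, false, false));
        System.out.println(decide(Error.FATAL, 1, false, false));
    }
}
```

Note that a too-large batch holding a single record cannot be split, so in this model it falls through to FAIL unless it is retriable.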

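Now we look at the implementation of reenqueue. So far we have not connected "adding to the queue" with "sending a message", but notice the method getOrCreateDeque: it is the same method that is called when a record is first added to the buffer during send.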

public void reenqueue(ProducerBatch batch, long now) {
    batch.reenqueued(now);
    Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
    synchronized (deque) {
        if (transactionManager != null)
            insertInSequenceOrder(deque, batch);
        else
            deque.addFirst(batch);
    }
}

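At this point the picture is roughly clear. KafkaProducer's send path ends up calling getOrCreateDeque and adding the data to the buffer, and the retry path also calls getOrCreateDeque and adds the data to the same buffer. The only difference is the position: a retried batch goes to the head of the deque (or is inserted in sequence order when a transactionManager is present), while newly sent records are appended at the tail: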

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
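
The head-versus-tail difference is easy to see with a plain java.util.Deque. The sketch below is only a model: batches are represented by strings, and the method names appendNew, reenqueue, and drainNext are made up for illustration. It covers only the non-transactional branch (deque.addFirst), not insertInSequenceOrder.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of one partition's batch queue.
class ReenqueueOrderSketch {
    private final Deque<String> deque = new ArrayDeque<>();

    // New data is appended at the tail.
    void appendNew(String batch) {
        deque.addLast(batch);
    }

    // A retried batch is put back at the head.
    void reenqueue(String batch) {
        deque.addFirst(batch);
    }

    // The sender drains from the head.
    String drainNext() {
        return deque.pollFirst();
    }

    static List<String> scenario() {
        ReenqueueOrderSketch queue = new ReenqueueOrderSketch();
        queue.appendNew("b1");
        queue.appendNew("b2");
        String inFlight = queue.drainNext();   // b1 is sent and then fails
        queue.appendNew("b3");                 // new data arrives in the meantime
        queue.reenqueue(inFlight);             // b1 is retried

        List<String> sendOrder = new ArrayList<>();
        String next;
        while ((next = queue.drainNext()) != null) {
            sendOrder.add(next);
        }
        return sendOrder;
    }

    public static void main(String[] args) {
        System.out.println(scenario());
    }
}
```

Putting the retried batch at the head means it is drained before batches that were created after it, which keeps the retry close to its original position in the send order.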

Now we look at the implementation of canRetry. This method decides whether the failed batch can be retried:

private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
    return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
        batch.attempts() < this.retries &&
        !batch.isDone() &&
        ((response.error.exception() instanceof RetriableException) ||
            (transactionManager != null && transactionManager.canRetry(response, batch)));
}

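There are quite a few conditions here. We focus on two of them; the batch can be retried only if they hold (along with the others):

  1. The retry count has not been exhausted (batch.attempts() < this.retries)
  2. The exception is a retriable one (response.error.exception() instanceof RetriableException)

[Figure: the full list of retriable exceptions, i.e. the subclasses of RetriableException]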

There is also one more path in between: transactionManager.canRetry.
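
The conditions in canRetry can be sketched without any Kafka dependency. Everything below is an assumption made for illustration: RetriableError is a stand-in for Kafka's RetriableException, the delivery-timeout check is modeled as "elapsed time since creation has reached the timeout", and the transactionManager.canRetry path is omitted.

```java
// Hypothetical, dependency-free model of the checks in Sender.canRetry.
class CanRetrySketch {
    // Stand-in for org.apache.kafka.common.errors.RetriableException.
    static class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    static boolean canRetry(long createdMs, long nowMs, long deliveryTimeoutMs,
                            int attempts, int retries, boolean done, RuntimeException error) {
        boolean timedOut = nowMs - createdMs >= deliveryTimeoutMs; // assumed timeout rule
        return !timedOut
            && attempts < retries              // retry budget not exhausted
            && !done                           // batch not already completed
            && error instanceof RetriableError; // only retriable errors qualify
    }

    public static void main(String[] args) {
        RuntimeException retriable = new RetriableError("leader not available");
        RuntimeException fatal = new IllegalStateException("authorization failed");

        System.out.println(canRetry(0, 1_000, 120_000, 0, 3, false, retriable));
        System.out.println(canRetry(0, 1_000, 120_000, 3, 3, false, retriable));
        System.out.println(canRetry(0, 1_000, 120_000, 0, 3, false, fatal));
        System.out.println(canRetry(0, 120_000, 120_000, 0, 3, false, retriable));
    }
}
```

The design point worth noticing is that the checks are combined with a logical AND: a retriable error alone is not enough once the attempts or the delivery timeout are used up.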

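Impact on the idempotent producer

Throughout the whole retry process the partition never has to be recomputed. In other words, the partition is computed exactly once, in the client's doSend method.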


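This means a retry is guaranteed to go to the same partition. If that could not be guaranteed, idempotence would carry one more restriction: retries would not be allowed.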
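
A small sketch shows why a retry cannot change the partition: the batch carries the partition it was assigned at send time, and the retry path never calls the partitioner again. The class, the hash-based partition function, and the call counter are all hypothetical; Kafka's default partitioner uses a different hash.

```java
// Hypothetical model: the partition is chosen once at send time and reused on retry.
class StickyPartitionSketch {
    static final class Batch {
        final int partition; // fixed when the batch is created
        int attempts = 0;

        Batch(int partition) {
            this.partition = partition;
        }
    }

    int partitionerCalls = 0;

    // Illustrative partitioner; counts how often it is invoked.
    int partition(String key, int numPartitions) {
        partitionerCalls++;
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Send path: the only place where the partitioner runs.
    Batch send(String key, int numPartitions) {
        return new Batch(partition(key, numPartitions));
    }

    // Retry path: reuses the partition stored on the batch.
    int retry(Batch batch) {
        batch.attempts++;
        return batch.partition;
    }

    public static void main(String[] args) {
        StickyPartitionSketch producer = new StickyPartitionSketch();
        Batch batch = producer.send("order-42", 6);
        int first = batch.partition;
        int afterRetries = first;
        for (int i = 0; i < 3; i++) {
            afterRetries = producer.retry(batch);
        }
        System.out.println("same partition: " + (first == afterRetries));
        System.out.println("partitioner calls: " + producer.partitionerCalls);
    }
}
```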

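Global idempotence

An idempotent producer only guarantees idempotence within a single partition. A producer's messages go to multiple partitions of a topic, but since every single partition guarantees idempotence, the result is idempotence across partitions, which is what we call global idempotence here.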
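
The per-partition nature of the guarantee can be modeled with a toy "broker" that remembers the last sequence number per (producer id, partition) pair and drops anything it has already seen. This is a simplified sketch under assumptions, not the broker's actual bookkeeping: the class, its methods, and the string key are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of per-partition sequence-number deduplication.
class PartitionDedupSketch {
    private final Map<String, Integer> lastSequence = new HashMap<>();
    private final Map<Integer, List<String>> log = new HashMap<>();

    // Returns true if the value was appended, false if it was a duplicate.
    boolean append(long producerId, int partition, int sequence, String value) {
        String key = producerId + "-" + partition;   // sequences are tracked per partition
        int last = lastSequence.getOrDefault(key, -1);
        if (sequence <= last) {
            return false;                            // already written: a retried duplicate
        }
        if (sequence != last + 1) {
            throw new IllegalStateException("out of order sequence " + sequence);
        }
        lastSequence.put(key, sequence);
        log.computeIfAbsent(partition, p -> new ArrayList<>()).add(value);
        return true;
    }

    int size(int partition) {
        return log.getOrDefault(partition, Collections.<String>emptyList()).size();
    }

    public static void main(String[] args) {
        PartitionDedupSketch broker = new PartitionDedupSketch();
        System.out.println(broker.append(1L, 0, 0, "a")); // first delivery
        System.out.println(broker.append(1L, 0, 0, "a")); // retry of the same batch
        System.out.println(broker.append(1L, 1, 0, "b")); // other partition, own sequence
        System.out.println(broker.size(0) + " " + broker.size(1));
    }
}
```

Because the sequence is tracked per partition, this only works if a retry lands on the same partition as the original attempt, which is the property discussed above.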

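Summary

  1. The retry mechanism is one of the safeguards for data reliability

  2. Because the partition of a message is not recomputed during a retry, retries do not break idempotence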
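
For reference, the producer settings that drive the behavior discussed in this post can be collected in a java.util.Properties object. The configuration keys are Kafka producer settings; the values are illustrative assumptions only, not recommendations, and the class and method names are made up for this sketch.

```java
import java.util.Properties;

// Illustrative values only; tune them for your own workload.
class RetryConfigSketch {
    static Properties retryProps() {
        Properties props = new Properties();
        props.setProperty("retries", "3");                    // compared with batch.attempts() in canRetry
        props.setProperty("delivery.timeout.ms", "120000");   // upper bound on the total time to deliver a record
        props.setProperty("retry.backoff.ms", "100");         // wait before a failed batch is retried
        props.setProperty("enable.idempotence", "true");      // sequence numbers so retries do not duplicate data
        props.setProperty("acks", "all");                     // required when idempotence is enabled
        props.setProperty("max.in.flight.requests.per.connection", "5"); // must be at most 5 with idempotence
        return props;
    }

    public static void main(String[] args) {
        Properties props = retryProps();
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

These properties would be passed to the KafkaProducer constructor together with the usual bootstrap and serializer settings, which are left out here.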
