幂等生产者

  • 幂等”这个词原是数学领域中的概念,指的是某些操作或函数能够被执行多次,但每次得到的结果都是不变的
  • 在命令式编程语言(比如 C)中,若一个子程序是幂等的,那它必然不能修改系统状态。这样不管运行这个子程序多少次,与该子程序关联的那部分系统状态保持不变。
  • 在函数式编程语言(比如 Scala 或 Haskell)中,很多纯函数(pure function)天然就是幂等的,它们不执行任何的 side effect。
  • 幂等性有很多好处,其最大的优势在于我们可以安全地重试任何幂等性操作,反正它们也不会破坏我们的系统状态

背景

  • 在很多系统中消息重复是不被允许的,例如一些业务结算平台(如物流平台、银行结算平台等)
  • 为了解决重试导致的消息重复、乱序问题,kafka引入了幂等消息。幂等消息保证producer在一次会话内写入同一个partition内的消息具有幂等性,也就是说消息不会重复。
  • Kafka的幂等性其实就是将原来需要在下游系统中进行的去重操作放在了数据上游kafka 中。
至少一次语义

所谓的消息交付可靠性保障,是指 Kafka 对 Producer 和 Consumer 要处理的消息提供什么样的承诺。常见的承诺有以下三种:

  1. 最多一次(at most once):消息可能会丢失,但绝不会被重复发送。
  2. 至少一次(at least once):消息不会丢失,但有可能被重复发送。
  3. 精确一次(exactly once):消息不会丢失,也不会被重复发送。

Kafka 默认提供的交付可靠性保障是即至少一次,因为kafka 的producer 在消息发送失败(没有接收到kafka broker 的ACK信息)的时候则会进行重试,这就是kafka 为什么默认提供的是至少一次的交付语义,但是这样可能导致消息重复

Kafka 也可以提供最多一次交付保障,只需要让 Producer 禁止重试即可。这样一来,消息要么写入成功,要么写入失败,但绝不会重复发送。我们通常不会希望出现消息丢失的情况,但一些场景里偶发的消息丢失其实是被允许的,相反,消息重复是绝对要避免的。此时,使用最多一次交付保障就是最恰当的。

无论是至少一次还是最多一次,都不如精确一次来得有吸引力。大部分用户还是希望消息只会被交付一次,这样的话,消息既不会丢失,也不会被重复处理。或者说,即使 Producer 端重复发送了相同的消息,Broker 端也能做到自动去重。在下游 Consumer 看来,消息依然只有一条,而这就是我们今天要介绍的幂等性。

使用方法

  • producer 默认不是幂等性的,但我们可以创建幂等性 Producer。它其实是 0.11.0.0 版本引入的新功能。在此之前,Kafka 向分区发送数据时,可能会出现同一条消息被发送了多次,导致消息重复的情况。
  • 在 0.11 之后,指定 Producer 幂等性的方法很简单,仅需要设置一个参数即可,即 props.put(“enable.idempotence”, ture),或 props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true)。
  • enable.idempotence 被设置成 true 后,Producer 自动升级成幂等性 Producer,其他所有的代码逻辑都不需要改变
  • Prodcuer 幂等性对外保留的接口非常简单,其底层的实现对上层应用做了很好的封装,应用层并不需要去关心具体的实现细节,对用户非常友好。

原理

  • Kafka 自动帮你做消息的重复去重,底层具体的原理很简单,就是经典的用空间去换时间的优化思路,即在 Broker 端多保存一些字段。
  • 当 Producer 发送了具有相同字段值的消息后,Broker 能够自动知晓这些消息已经重复了,于是可以在后台默默地把它们“丢弃”掉。
具体实现
区分producer会话
  • producer每次启动后,首先向broker申请一个全局唯一的pid,用来标识本次会话,这个ProducerID对客户端使用者是不可见的
  • 重启之后标识producer的PID就变化了,broker就不认识,所以幂等性是只能在单次会话内的
Sequence Numbe

对于每个PID,该Producer发送数据的每个<Topic,Partition>都对应一个从0开始单调递增的Sequence Number,Broker端在缓存中保存了这seq number

对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃,这样就可以实现了消息重复提交了

但是只能保证单个Producer对于同一个<Topic,Partition>的Exactly Once语义

producer重试与消息检测
  • producer在收到明确的的消息丢失ack,或者超时后未收到ack,要进行重试。
  • 重试的时候sequence number不变,因为sequence number在第一次发送的时候已经确定了,重试只是重新发送
new_seq=old_seq+1: 正常消息;
new_seq<=old_seq : 重复消息;
new_seq>old_seq+1: 消息丢失;

使用范围

  • 生产者重启时PID 就会发生变化,同时不同的 分区(Partition)也具有不同的编号,所以生产者幂等性无法保证跨分区和跨会话的 Exactly Once。
单分区的特性
  • 首先,它只能保证单分区上的幂等性,即一个幂等性 Producer 能够保证某个主题的一个分区上不出现重复消息,它无法实现多个分区的幂等性。

  • 一个幂等性的producer,只保证单分区的幂等性,而producer的消息会发给一个主题的多个分区,这就是为什么不能保证整个Topic的幂等了。

  • 其实这里有一个问题值得思考,那就是为什么幂等性是针对单分区的

单会话的特性
  • 它只能实现单会话上的幂等性,不能实现跨会话的幂等性。这里的会话,你可以理解为 Producer 进程的一次运行。当你重启了 Producer 进程之后,这种幂等性保证就丧失了
原因
  • 重启之后标识producer的PID就变化了,broker就不认识了——这个是幂等性的另一个限制条件,无法实现夸会话的幂等性。
解决方案
  • 如果想实现多分区以及多会话上的消息无重复,就是事务(transaction)或者依赖事务型 Producer。
  • 这也是幂等性 Producer 和事务型 Producer 的最大区别!

源码

其实前面我们也说过一次,可以看看kafka的client 的源码,看client 的源码其实可以让你快速了解kafka 都有哪些东西可以用,server 端的源码可以让你了解原理,其实很多时候知道有什么样的工具,就能帮你解决很多问题了。

image-20210313082621298

其实prodcuer 端的很多东西我们都介绍过了,这里面的几个类我们都学过了

名称作用
Producer这是一个接口,我们使用的KafkaProducer就是继承自这个接口
KafkaProducer我们发送数据的时候使用的对象
Partitioner分区器的接口,我们也看过它的几个默认实现(DefaultPartitioner,RoundRobinPartitioner,UniformStickyPartitioner),你也可以自己继承这个接口写一个
ProducerConfig客户端配置类,我们在创建Properties的时候都是直接写的字符串,也可以使用这个配置类提供的常量,ProducerConfig.BOOTSTRAP_SERVERS_CONFIG和“bootstrap.servers”等价的
ProducerRecord我们发送的消息对象
RecordMetadata发送消息之后返回的对象,记录了消息发送的元信息
Callback回调接口,主要用在异步发送,这个接口kafka 也提供了一个默认实现ErrorLoggingCallback
ProducerInterceptor生产者拦截器

接下来我们看一下今天的几个主角,也就是internals 包下的几个类

image-20210313083837974

Sender

首先说一下为什么要说这个类呢,大家看一下下面的截图就知道了

image-20210313104057771

这是KafkaProducer里面doSend方法的段,也就是我们客户端调用的send方法,这个说明了一个问题,什么问题呢,那就是KafkaProducer的send 方法并不是直接将消息发送出去的,而是将消息追加到缓存区。

如果当前缓存区已写满或创建了一个新的缓存区,则唤醒 Sender(消息发送线程),将缓存区中的消息发送到 broker 服务器,最终返回 future。从这里也能得知,doSend 方法执行完成后,此时消息还不一定成功发送到 broker,因为还没有发送呢。

这里当缓存区满了之后则将sender 唤醒,进行消息发送,其实到这里我们应该能猜到sender 是个什么了,是个线程public class Sender implements Runnable

Producer Id

前面我们说PID是实现幂等的关键元素,我们下面看一下PID是怎么获得的,就是在Sender 的run 方法里面

void run(long now) {
    if (transactionManager != null) {
        try {
            if (transactionManager.shouldResetProducerStateAfterResolvingSequences())
                // Check if the previous run expired batches which requires a reset of the producer state.
                transactionManager.resetProducerId();
            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id 获取ProducerId
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(
                    new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (transactionManager.hasInFlightTransactionalRequest() || maybeSendTransactionalRequest(now)) {
                // as long as there are outstanding transactional requests, we simply wait for them to return
                client.poll(retryBackoffMs, now);
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, now);
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }

    long pollTimeout = sendProducerData(now);
    client.poll(pollTimeout, now);
}

run 方法里面有一个maybeWaitForProducerId方法就是用来获取ProducerId的,这个名称真的是见名知意,我们也简单看一下这个方法

private void maybeWaitForProducerId() {
    while (!forceClose && !transactionManager.hasProducerId() && !transactionManager.hasError()) {
        Node node = null;
        try {
            node = awaitLeastLoadedNodeReady(requestTimeoutMs);
            if (node != null) {
                ClientResponse response = sendAndAwaitInitProducerIdRequest(node);
                InitProducerIdResponse initProducerIdResponse = (InitProducerIdResponse) response.responseBody();
                Errors error = initProducerIdResponse.error();
                if (error == Errors.NONE) {
                  	// 这个 ProducerId 是通过网络请求之后获得的
                    ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(
                            initProducerIdResponse.producerId(), initProducerIdResponse.epoch());
                    transactionManager.setProducerIdAndEpoch(producerIdAndEpoch);
                    return;
                } else if (error.exception() instanceof RetriableException) {
                    log.debug("Retriable error from InitProducerId response", error.message());
                } else {
                    transactionManager.transitionToFatalError(error.exception());
                    break;
                }
            } else {
                log.debug("Could not find an available broker to send InitProducerIdRequest to. Will back off and retry.");
            }
        } catch (UnsupportedVersionException e) {
            transactionManager.transitionToFatalError(e);
            break;
        } catch (IOException e) {
            log.debug("Broker {} disconnected while awaiting InitProducerId response", node, e);
        }
        log.trace("Retry InitProducerIdRequest in {}ms.", retryBackoffMs);
        time.sleep(retryBackoffMs);
        metadata.requestUpdate();
    }
}

当transactionManager 没有ProducerId()的时候才执行的

而且我们看到这InitProducerIdResponse使用response上获得的,所以我们认为这个ProducerId其实不是客户端生成的,而是服务端生成的。接下来我们看一下这个请求的方法sendAndAwaitInitProducerIdRequest

private ClientResponse sendAndAwaitInitProducerIdRequest(Node node) throws IOException {
    String nodeId = node.idString();
    InitProducerIdRequest.Builder builder = new InitProducerIdRequest.Builder(null);
    ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, requestTimeoutMs, null);
    return NetworkClientUtils.sendAndReceive(client, request, time);
}

可以看出它是通过node的信息,创建了一个ClientRequest 发送出去,更准确的是InitProducerIdRequest 然后获取返回后的response 中的ProducerId,如果你感兴趣的话,也可以看一下服务端是怎么处理这个请求的,下面是服务端的代码,scala 写的

def handleInitProducerIdRequest(request: RequestChannel.Request): Unit = {
  val initProducerIdRequest = request.body[InitProducerIdRequest]
  val transactionalId = initProducerIdRequest.transactionalId

  if (transactionalId != null) {
    // 权限校验
    if (!authorize(request.session, Write, new Resource(TransactionalId, transactionalId))) {
      sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)
      return
    }
  } else if (!authorize(request.session, IdempotentWrite, Resource.ClusterResource)) {
    sendErrorResponseMaybeThrottle(request, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)
    return
  }

  def sendResponseCallback(result: InitProducerIdResult): Unit = {
    def createResponse(requestThrottleMs: Int): AbstractResponse = {
      val responseBody = new InitProducerIdResponse(requestThrottleMs, result.error, result.producerId, result.producerEpoch)
      trace(s"Completed $transactionalId's InitProducerIdRequest with result $result from client ${request.header.clientId}.")
      responseBody
    }
    sendResponseMaybeThrottle(request, createResponse)
  }
  // 权限校验通过 成相应的了 pid,返回给 producer
  txnCoordinator.handleInitProducerId(transactionalId, initProducerIdRequest.transactionTimeoutMs, sendResponseCallback)
}

这里我们看到最终是通过Coordinator 生成的

image-20210313115815699

可以看到Server 在给一个 client 初始化 PID 时,实际上是通过 ProducerIdManager 的 generateProducerId() 方法产生一个 PID。

接下来其实就到了ZK 了,你要是感兴趣可以接着往下走走

def generateProducerId(): Long = {
  this synchronized {
    // grab a new block of producerIds if this block has been exhausted
    if (nextProducerId > currentProducerIdBlock.blockEndId) {
      getNewProducerIdBlock()
      nextProducerId = currentProducerIdBlock.blockStartId + 1
    } else {
      nextProducerId += 1
    }

    nextProducerId - 1
  }
}
ProducerBatch

为什么突然又冒出来这么一个类呢,前面我们提到KafkaProducer的send 方法其实只是将消息添加到了缓存之中,并没有真正的发送,我们知道发送是在Sender 的run 方法里面完成的,我们的Producer Id也是在run 方法的里面获取的

image-20210313121309352

我们这里将Sender 的run 方法的内容分为了两块,第一块执行一些初始化的操作,第二部分发送数据,我们接下来看一下这个方法的实现,做了一定的删减

    private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();

        // 获取准备好可以接受数据的partition 集合
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);
            this.metadata.requestUpdate();
        }

        // 从集合中去掉不能接收消息的节点
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // 创建发送数据的批量请求ProducerBatch
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        sendProduceRequests(batches, now);

        return pollTimeout;
    }

accumulator 就是我们缓存消息的地方,这里我们看到,数据最终是封装到ProducerBatch里面进去发送的,接下来到了我们的主角了sequence numbe,我们知道sequence numbe应该是在ProducerBatch里面的,那应该就是在accumulator 的drain 方法里面实现的了,那我们就看一下这个代码的实现,代码有点长,我们只选取部分

image-20210313123102441

这个代码首先获取了ProducerIdAndEpoch 这个类的对象,这个对象就封装了ProducerId,然后我们就关注一下sequence number,我们看到这里有一个判断那就是!batch.hasSequence(),下面也写了一段注释,就是Sequence一旦生成不可改变。

接下来到看了关键之处了batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);我们知道batch对象应该有sequence number这样的一个属性,所以我们猜测这个属性就是在batch.setProducerState 方法中完成赋值的。

注意一下第二个参数transactionManager.sequenceNumber(batch.topicPartition),根据Partition 获取到了sequenceNumber,我们也可以看一下这个代码

/**
 * Returns the next sequence number to be written to the given TopicPartition.
 */
synchronized Integer sequenceNumber(TopicPartition topicPartition) {
    Integer currentSequenceNumber = nextSequence.get(topicPartition);
    if (currentSequenceNumber == null) {
        currentSequenceNumber = 0;
        nextSequence.put(topicPartition, currentSequenceNumber);
    }
    return currentSequenceNumber;
}

这里我们就完成了我们的获取,你看一下下面截图的地方,获取完成之后会调用自增的操作,来维持SequenceNumber的自增特性

image-20210313124043092

最后我们再看一下setProducerState的内容,我们看到setProducerState 方法之后,我们的批量数据就有了一个属性baseSequence,可以用于服务器端进行判断。

public void setProducerState(long producerId, short producerEpoch, int baseSequence, boolean isTransactional) {
    if (isClosed()) {
        // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
        // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
        // be re queued. In this case, we should not attempt to set the state again, since changing the producerId and sequence
        // once a batch has been sent to the broker risks introducing duplicates.
        throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
    }
    this.producerId = producerId;
    this.producerEpoch = producerEpoch;
    this.baseSequence = baseSequence;
    this.isTransactional = isTransactional;
}
sequence number

有了 PID 之后,在 PID + Topic-Partition 级别上添加一个 sequence numbers 信息,就可以实现 Producer 的幂等性了。

ProducerBatch 也提供了一个 setProducerState() 方法,它可以给一个 batch 添加一些 meta 信息(pid、baseSequence、isTransactional),这些信息是会伴随着 ProduceRequest 发到 Server 端,Server 端也正是通过这些 meta 来做相应的判断,接下来我们看一下服务器端的处理,代码入口在KafkaApis 在这个类中(scala),我们找到对应处理请求的方法handleProduceRequest,然后我们看一下这个代码的实现。我们这里只截取了部分

image-20210313125710439

可以看到后面前面主要还是权限校验,后面则开始遍历处理数据

幂等为什么不支持跨会话和多分区

跨会话

不支持跨会话的原因是重启之后标识producer的PID就变化了,这就导致broker无法根据这个<PID,TP,SEQNUM>条件去去判断是否重复。

跨分区

我们知道在某一个partition 上判断是否重复是通过一个递增的sequence number,也就是说这个递增是针对当前特定分区的,如果你要是发送到其他分区上去了,那么递增关系就不存在了。

思考

  1. retry会保证发送到同一个分区吗?
  2. 什么情况下单分区的幂等能保证全局的幂等

总结

  1. 主要介绍了什么是幂等以及它的实现原理
  2. 从源码层面上分析了Producer Id 和 Sequence Number 的获取和它们的工作原理
  3. Sequence Number它不是和某一条消息进行绑定的,而是和一批消息进行绑定的。
  4. 幂等的不足之处不支持跨回话和跨分区,优点的话也很明显,使用简单
Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐