最近公司的kafka集群出现了节点已经失效但是节点进程和端口都还在的情况,目前我们的系统监控只是做到了进程监控,即为整个集群的每台机群建立进程快照,如果进程(如NameNodekakfa broker)丢失,则报警并立刻自动重启进程。但是这次的kafka事故进程和端口都还在,因此报警系统没有能够及时报警,因此对此次事故发生的过程和解决方式做详细的分析。 首先,我们一个同学使用kafka的过程中发现消息无法消费,因此进入进群进行如下检查: 进程和端口:我们的kafka的3个broker,进程和端口都在,正常使用kakfa-console-producer进行消息的生产,抛出异常 使用kakfa-console-consumer进行消息的消费,抛出异常 使用kafka-topics --describe进行topic的详细情况的分析,发现,partition 和 Isr(In-Sync Replication)竟然只剩下一台机器 我们知道,kafka在创建topic的时候会指定partition数量和replication数量,对于每一个partition,都会有一个broker作为leader broker,剩余的broker作为slave broker。我们猜想在我们的代码中生产的消息应该已经丢失。因此进行验证。在紧急重启了假死的两台broker以后,我们开始对消息丢失情况进行验证,令人惊讶的是,没有发生消息丢失。但是,为了以防万一,无论消息是否丢失,我们都必须找到足够的证据。

我们的topic属性是2个partition、2个replication组成,当我们发现从这个topic消费消息发生异常的时候,我们打印了这个topic的描述信息:

[appuser@hz-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1  PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: wuchang1 Partition: 0    Leader: 110     Replicas: 110,50        Isr: 110
        Topic: wuchang1 Partition: 1    Leader: -1      Replicas: 50,82 Isr: 

这个信息其实是我们在测试环境复现出来的现场。我觉得,一个资深的软件工程师,非常注重对事故现场的复现,因为只有成功地复现问题,才能根本地解决问题。在正常情况下,3个broker工作正常,它的描述信息是这样的:

[appuser@hz-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1  PartitionCount:2        ReplicationFactor:2     Configs:
        Topic: wuchang1 Partition: 0    Leader: 110     Replicas: 110,50        Isr: 110,50
        Topic: wuchang1 Partition: 1    Leader: 50      Replicas: 50,82 Isr: 50,82

为了让我们的kafka cluster能够容忍部分机器宕机,我们的生产环境和测试环境打开了leader 自动选举:auto.leader.rebalance.enable=true
这样,当任何一个TopicPartition的leader丢失,Controller会启动一个监控线程监控所有partition的Leader状态,如果发现某个Topic-Partition的leader丢失,则该线程会为该Leader启动重新选举,代码在KafkaController.scala中:

  def onControllerFailover() {
   //省略
      if (config.autoLeaderRebalanceEnable) {
        info("starting the partition rebalance scheduler")
        autoRebalanceScheduler.startup()
        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
          5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
      }
      deleteTopicManager.start()
    }
    else
      info("Controller has been shut down, aborting startup/failover")
  }

方法checkAndTriggerPartitionRebalance()就是完成对所有Topic-Partition的leader检查,这个作为参数被定时调用,注意,Controller与Leader不同,Leader是针对某个Topic-Partition而言,而Controller是整个集群的Controller
为了能够在自动leader选举打开的情况下让某个Topic-Partition失去leader,我们将这个topic的partition 1对应的两个replication 全部kill,这样,即使自动leader检查打开,由于partition-1 已经不存在任何一个活着的replication,因此无从选举出一个leader,此时,这个partition已经不再工作,partition-0也只有仅剩的一个broker来作为leader。
我们当时在测试环境复现问题的时候,在自动leader选举打开的情况下,只要某个partition的replication中有一个还活着(即ISR中还有任何一个broker),这个broker就会被自动选举为leader。这就是Kafka高可用性的一个体现。只有当一个topic的全部replication全部丢失,这个kafka的这个Topic-Partitioin才会变为不可用状态。
反过来看,如果我们的topic的replication-factor设置为2,那么,在自动leader rebalance打开的情况下,任何两台broker丢失,都不会对任何partition造成影响,除非这个Topic-Partition的三个replication全部挂掉。

现在现场已经复现,我们就来验证这种情况下消息是否丢失。

我们线上环境生产消息的代码来源于nginx lua插件,用来将nginx收的的用户访问信息发送到kafka:

    topic = args["pbtype"]
    -- topic = "LivyRoomMsg"
    if (topic == "LivyRoomMsg") then
        msgJson = Convert_GjsWebLiveRoomWechatUserLogin(msgJson,args["messageSentTime"])
    end
    local ok, err = bp:send(topic, nil, msgJson)

从代码片段中我们可以看到,发送消息的时候,key为null。我们使用java代码,同样设置key为null,发送消息到我们测试环境的现场,的确消息未丢失,发送的所有消息都被打倒到了leader存在的partition上面了。
我们知道,Kafka通过key信息决定了消息发送到哪个broker,我们使用的是默认的Partitioner, Kafka默认的Partitioner是DefaultPartitioner,核心方法是partition():

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) { //从可用的partitioner中选择一个
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {//从所有集群中选择一个
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {// 只要有key , 就按照key去确定
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

  /**
     * topicCounterMap维护了每个topic的一个计数器,这个计数器用来通过Round-Robin方式选择一个partition
     * @param topic
     * @return
     */
    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(new Random().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

默认partitioner的消息分派逻辑是:

  1. 如果存在key,则通过Round-Robin的方式,从该topic的所有partition中选择一个分区,即,无论现在分区的状态如何,一旦key确定,对应的broker就确定了,到5;
  2. 如果key为空,则到3;
  3. 如果这个topic还存在可用的Partition(还存在leader的partition),则通过Round-Robin的方式,以这个topic递增的随机数作为种子,从这些可用的partition中选择一个partition,将消息发送到这个partition,否则到4;
  4. 如果这个topic没有任何一个可用的Partition,则通过Round-Robin的方式,以这个topic递增的随机数作为种子,从所有partition中选择一个partition发送消息。很显然,如果选择的partition不可用,消息发送失败;
  5. 退出

DefaultPartition 使用 topicCounterMap来维护每个topic用来通过Round-Robin方式选择partition的序列号,key是所有topic的名字,value是一个整数计数器,每次进行一次选择则自增1,保证所有partition被依次使用到。

在我们使用bin/kafka-console-producer.sh的命令行工具生产消息的时候,其实最终也是调用了。本文中,我并不急于让大家知道问题原因,而希望逐步拨云见日,让大家从代码层面循序渐渐,逐步接近问题真想。这样做,不仅仅能够找到问题原因,更能够学到知识,而不仅仅是确认了或者解决了一个问题。

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"

运行kakfa-console-producer.sh时,默认java进程堆内存大小为512M,当然,如果我们可以自行设置和修改。实际执行的,是ConsoleProducer类,显然,ConsoleProducer负责从命令行中读取我们输入的消息,然后生产到Kafka Server。看过$KAFKA_HOME/bin目录下面的脚本代码你就会知道,kafka的代码重用做得非常好,即使是脚本,也充分重用。$KAFKA_HOME/bin/kafka-run-class.sh是一个公共启动类,无论我们调用kakfa-console-producer.shkakfa-console-consumer.shkafka-topics.sh等等脚本,都是通过kafka-run-class.sh运行起来的,只需要告诉kafka-run-class.sh需要启动的java类以及额外的启动参数,kafka-run-class.sh就会运行这个java类,添加上这些额外的启动参数,以及一些共用的、必须的classpath。 因此,我们继续来看ConsoleProducer的实现,了解这个类的实现机制,直接关系到我们最常用的kafka-console-producer的命令的行为:

  def main(args: Array[String]) {

    try {
        val config = new ProducerConfig(args)
        val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
        reader.init(System.in, getReaderProps(config))

        val producer =
          if(config.useOldProducer) {
            new OldProducer(getOldProducerProps(config))
          } else {
            //NewShinyProducer只是对org.apache.kafka.clients.producer.KafkaProducer
            //进行了简单封装,底层还是用org.apache.kafka.clients.producer.KafkaProducer发送消息
            new NewShinyProducer(getNewProducerProps(config))
          }
        //省略
        var message: ProducerRecord[Array[Byte], Array[Byte]] = null
        do {
          //reader的默认实现类是LineMessageReader,一行一行读取用户在命令行中的输入
          message = reader.readMessage()
          if (message != null)
            //在没有特殊指定消息的key的情况下,key为空
            producer.send(message.topic, message.key, message.value)
        } while (message != null)
    } catch {
      //省略
    }
    Exit.exit(0)
  }

LineMessageReader是消息读取的实现类,用来读取我们在命令行中输入的Kafka消息:

  class LineMessageReader extends MessageReader {
    var topic: String = null
    var reader: BufferedReader = null
    var parseKey = false
    var keySeparator = "\t"
    var ignoreError = false
    var lineNumber = 0

    override def init(inputStream: InputStream, props: Properties) {
      topic = props.getProperty("topic")
      //如果需要指定key,则在kafka-console-producer中增加参数--property "parse.key=true",--property "key.separator=:"
      //用来告诉kafka是否使用key以及分割消息和key的分隔符
      if (props.containsKey("parse.key"))
        parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
      if (props.containsKey("key.separator"))
        keySeparator = props.getProperty("key.separator")
      if (props.containsKey("ignore.error"))//是否忽略错误
        ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
      reader = new BufferedReader(new InputStreamReader(inputStream))
    }

    override def readMessage() = {
      lineNumber += 1
      print(">")
      (reader.readLine(), parseKey) match {
        case (null, _) => null
        case (line, true) =>
          line.indexOf(keySeparator) match {
            case -1 => //在用户输入的消息中没有找到keySeparator
              if (ignoreError) new ProducerRecord(topic, line.getBytes)
              else throw new KafkaException(s"No key found on line $lineNumber: $line")
            case n => //找到keySeparator定义的字符,则提取消息体和key,组装成为ProducerRecord对象
              val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
              new ProducerRecord(topic, line.substring(0, n).getBytes, value)
          }
        case (line, false) =>//用户没有打开parse.key功能,则设置key为null
          new ProducerRecord(topic, line.getBytes)
      }
    }
  }

LineMessageReader的主要职责是读取命令行中用户的输入,然后使用KafkaProducer把消息发送出去。我们可以通过参数parse.key=true以及key.separator=:来告诉kakfa我们会显式指定key。绝大多数情况下,除非有特殊需求,我们都不会使用如此繁复冗长的参数。因此,实际上,我使用如下命令进行消息的生产时,效果和我在代码中使用KafkaProducer进行消息的生产并将key设置为null的效果一样,Kafka都会通过使用DefaultPartitioner来进行消息的分派,由于key为null,将选择任何一个活着的broker,因此,虽然我们Kafka的某个topic的部分partition的leader丢失,消息却不会丢失。

其实,我们在使用Kafka过程中,我们会以为我们使用比如一个每次递增1的key,可以实现消息分派的负载均衡,即消息会几乎均匀地分布到所有的partition上面去。但是其实这样做可能会造成消息的丢失,更好的做法,就是直接不指定key,此时Kafka会帮助我们在所有或者的broker中选择一个进行消息分派,不会造成消息丢失,同时负载均衡Kafka也帮我们完成了。
Kafka的key的使用是用来满足定制化的分派规则而不是消息均匀分派,比如:
1. 我们希望这个topic的所有消息打到同一个partition,这时候我们可以指定一个不变的任意的key,根据DefaultPartitioner的实现,消息会固定打到某个partition;
2. 我们希望根据消息的内容完全定制化地控制这个消息对应的partition,这时候我们需要自己实现一个Partitioner。如果我们看过DefaultPartitioner的实现,那么实现自己的定制化的Partitioner就太简单了。

总体来说本文的这些代码难度不是很大,但是对于我们理解Kafka的运行机制从而正确地、毫无误解地使用Kafka非常有帮助。相比我在博客中介绍的Hadoop、Yarn的调度求、资源管理器代码,这段代码非常容易理解,但是也可以从中看到Kafka代码的优雅和规范,良好的接口定义带来良好的可扩展性。

Logo

Kafka开源项目指南提供详尽教程,助开发者掌握其架构、配置和使用,实现高效数据流管理和实时处理。它高性能、可扩展,适合日志收集和实时数据处理,通过持久化保障数据安全,是企业大数据生态系统的核心。

更多推荐