从Kafka的一次broker假死介绍Kafka架构和DefaultPartitioner
最近公司的kafka集群出现了节点已经失效但是节点进程和端口都还在的情况,目前我们的系统监控只是做到了进程监控,即为整个集群的每台机群建立进程快照,如果进程(如NameNode、kakfa broker)丢失,则报警并立刻自动重启进程。但是这次的kafka事故进程和端口都还在,因此报警系统没有能够及时报警,因此对此次事故发生的过程和解决方式做详细的分析。 首先,我们一个同学使用kafka的过程中发
最近公司的kafka集群出现了节点已经失效但是节点进程和端口都还在的情况,目前我们的系统监控只是做到了进程监控,即为整个集群的每台机群建立进程快照,如果进程(如NameNode、kakfa broker)丢失,则报警并立刻自动重启进程。但是这次的kafka事故进程和端口都还在,因此报警系统没有能够及时报警,因此对此次事故发生的过程和解决方式做详细的分析。 首先,我们一个同学使用kafka的过程中发现消息无法消费,因此进入进群进行如下检查: 进程和端口:我们的kafka的3个broker,进程和端口都在,正常使用kakfa-console-producer进行消息的生产,抛出异常 使用kakfa-console-consumer进行消息的消费,抛出异常 使用kafka-topics --describe进行topic的详细情况的分析,发现,partition 和 Isr(In-Sync Replication)竟然只剩下一台机器 我们知道,kafka在创建topic的时候会指定partition数量和replication数量,对于每一个partition,都会有一个broker作为leader broker,剩余的broker作为slave broker。我们猜想在我们的代码中生产的消息应该已经丢失。因此进行验证。在紧急重启了假死的两台broker以后,我们开始对消息丢失情况进行验证,令人惊讶的是,没有发生消息丢失。但是,为了以防万一,无论消息是否丢失,我们都必须找到足够的证据。
我们的topic属性是2个partition、2个replication组成,当我们发现从这个topic消费消息发生异常的时候,我们打印了这个topic的描述信息:
[appuser@hz-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: wuchang1 Partition: 0 Leader: 110 Replicas: 110,50 Isr: 110
Topic: wuchang1 Partition: 1 Leader: -1 Replicas: 50,82 Isr:
这个信息其实是我们在测试环境复现出来的现场。我觉得,一个资深的软件工程师,非常注重对事故现场的复现,因为只有成功地复现问题,才能根本地解决问题。在正常情况下,3个broker工作正常,它的描述信息是这样的:
[appuser@hz-10-120-241-50 kafka]$ bin/kafka-topics.sh --describe --topic wuchang1 --zookeeper 10.120.241.50:2181
Topic:wuchang1 PartitionCount:2 ReplicationFactor:2 Configs:
Topic: wuchang1 Partition: 0 Leader: 110 Replicas: 110,50 Isr: 110,50
Topic: wuchang1 Partition: 1 Leader: 50 Replicas: 50,82 Isr: 50,82
为了让我们的kafka cluster能够容忍部分机器宕机,我们的生产环境和测试环境打开了leader 自动选举:auto.leader.rebalance.enable=true
这样,当任何一个TopicPartition的leader丢失,Controller会启动一个监控线程监控所有partition的Leader状态,如果发现某个Topic-Partition的leader丢失,则该线程会为该Leader启动重新选举,代码在KafkaController.scala中:
def onControllerFailover() {
//省略
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
}
else
info("Controller has been shut down, aborting startup/failover")
}
方法checkAndTriggerPartitionRebalance()就是完成对所有Topic-Partition的leader检查,这个作为参数被定时调用,注意,Controller与Leader不同,Leader是针对某个Topic-Partition而言,而Controller是整个集群的Controller。
为了能够在自动leader选举打开的情况下让某个Topic-Partition失去leader,我们将这个topic的partition 1对应的两个replication 全部kill,这样,即使自动leader检查打开,由于partition-1 已经不存在任何一个活着的replication,因此无从选举出一个leader,此时,这个partition已经不再工作,partition-0也只有仅剩的一个broker来作为leader。
我们当时在测试环境复现问题的时候,在自动leader选举打开的情况下,只要某个partition的replication中有一个还活着(即ISR中还有任何一个broker),这个broker就会被自动选举为leader。这就是Kafka高可用性的一个体现。只有当一个topic的全部replication全部丢失,这个kafka的这个Topic-Partitioin才会变为不可用状态。
反过来看,如果我们的topic的replication-factor设置为2,那么,在自动leader rebalance打开的情况下,任何两台broker丢失,都不会对任何partition造成影响,除非这个Topic-Partition的三个replication全部挂掉。
现在现场已经复现,我们就来验证这种情况下消息是否丢失。
我们线上环境生产消息的代码来源于nginx lua插件,用来将nginx收的的用户访问信息发送到kafka:
topic = args["pbtype"]
-- topic = "LivyRoomMsg"
if (topic == "LivyRoomMsg") then
msgJson = Convert_GjsWebLiveRoomWechatUserLogin(msgJson,args["messageSentTime"])
end
local ok, err = bp:send(topic, nil, msgJson)
从代码片段中我们可以看到,发送消息的时候,key为null。我们使用java代码,同样设置key为null,发送消息到我们测试环境的现场,的确消息未丢失,发送的所有消息都被打倒到了leader存在的partition上面了。
我们知道,Kafka通过key信息决定了消息发送到哪个broker,我们使用的是默认的Partitioner, Kafka默认的Partitioner是DefaultPartitioner,核心方法是partition():
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) { //从可用的partitioner中选择一个
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {//从所有集群中选择一个
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {// 只要有key , 就按照key去确定
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
/**
* topicCounterMap维护了每个topic的一个计数器,这个计数器用来通过Round-Robin方式选择一个partition
* @param topic
* @return
*/
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(new Random().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
默认partitioner的消息分派逻辑是:
- 如果存在key,则通过
Round-Robin的方式,从该topic的所有partition中选择一个分区,即,无论现在分区的状态如何,一旦key确定,对应的broker就确定了,到5; - 如果key为空,则到3;
- 如果这个topic还存在可用的Partition(还存在leader的partition),则通过
Round-Robin的方式,以这个topic递增的随机数作为种子,从这些可用的partition中选择一个partition,将消息发送到这个partition,否则到4; - 如果这个topic没有任何一个可用的Partition,则通过
Round-Robin的方式,以这个topic递增的随机数作为种子,从所有partition中选择一个partition发送消息。很显然,如果选择的partition不可用,消息发送失败; - 退出
DefaultPartition 使用 topicCounterMap来维护每个topic用来通过Round-Robin方式选择partition的序列号,key是所有topic的名字,value是一个整数计数器,每次进行一次选择则自增1,保证所有partition被依次使用到。
在我们使用bin/kafka-console-producer.sh的命令行工具生产消息的时候,其实最终也是调用了。本文中,我并不急于让大家知道问题原因,而希望逐步拨云见日,让大家从代码层面循序渐渐,逐步接近问题真想。这样做,不仅仅能够找到问题原因,更能够学到知识,而不仅仅是确认了或者解决了一个问题。
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
运行kakfa-console-producer.sh时,默认java进程堆内存大小为512M,当然,如果我们可以自行设置和修改。实际执行的,是ConsoleProducer类,显然,ConsoleProducer负责从命令行中读取我们输入的消息,然后生产到Kafka Server。看过$KAFKA_HOME/bin目录下面的脚本代码你就会知道,kafka的代码重用做得非常好,即使是脚本,也充分重用。$KAFKA_HOME/bin/kafka-run-class.sh是一个公共启动类,无论我们调用kakfa-console-producer.sh、kakfa-console-consumer.sh、kafka-topics.sh等等脚本,都是通过kafka-run-class.sh运行起来的,只需要告诉kafka-run-class.sh需要启动的java类以及额外的启动参数,kafka-run-class.sh就会运行这个java类,添加上这些额外的启动参数,以及一些共用的、必须的classpath。 因此,我们继续来看ConsoleProducer的实现,了解这个类的实现机制,直接关系到我们最常用的kafka-console-producer的命令的行为:
def main(args: Array[String]) {
try {
val config = new ProducerConfig(args)
val reader = Class.forName(config.readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, getReaderProps(config))
val producer =
if(config.useOldProducer) {
new OldProducer(getOldProducerProps(config))
} else {
//NewShinyProducer只是对org.apache.kafka.clients.producer.KafkaProducer
//进行了简单封装,底层还是用org.apache.kafka.clients.producer.KafkaProducer发送消息
new NewShinyProducer(getNewProducerProps(config))
}
//省略
var message: ProducerRecord[Array[Byte], Array[Byte]] = null
do {
//reader的默认实现类是LineMessageReader,一行一行读取用户在命令行中的输入
message = reader.readMessage()
if (message != null)
//在没有特殊指定消息的key的情况下,key为空
producer.send(message.topic, message.key, message.value)
} while (message != null)
} catch {
//省略
}
Exit.exit(0)
}
LineMessageReader是消息读取的实现类,用来读取我们在命令行中输入的Kafka消息:
class LineMessageReader extends MessageReader {
var topic: String = null
var reader: BufferedReader = null
var parseKey = false
var keySeparator = "\t"
var ignoreError = false
var lineNumber = 0
override def init(inputStream: InputStream, props: Properties) {
topic = props.getProperty("topic")
//如果需要指定key,则在kafka-console-producer中增加参数--property "parse.key=true",--property "key.separator=:"
//用来告诉kafka是否使用key以及分割消息和key的分隔符
if (props.containsKey("parse.key"))
parseKey = props.getProperty("parse.key").trim.equalsIgnoreCase("true")
if (props.containsKey("key.separator"))
keySeparator = props.getProperty("key.separator")
if (props.containsKey("ignore.error"))//是否忽略错误
ignoreError = props.getProperty("ignore.error").trim.equalsIgnoreCase("true")
reader = new BufferedReader(new InputStreamReader(inputStream))
}
override def readMessage() = {
lineNumber += 1
print(">")
(reader.readLine(), parseKey) match {
case (null, _) => null
case (line, true) =>
line.indexOf(keySeparator) match {
case -1 => //在用户输入的消息中没有找到keySeparator
if (ignoreError) new ProducerRecord(topic, line.getBytes)
else throw new KafkaException(s"No key found on line $lineNumber: $line")
case n => //找到keySeparator定义的字符,则提取消息体和key,组装成为ProducerRecord对象
val value = (if (n + keySeparator.size > line.size) "" else line.substring(n + keySeparator.size)).getBytes
new ProducerRecord(topic, line.substring(0, n).getBytes, value)
}
case (line, false) =>//用户没有打开parse.key功能,则设置key为null
new ProducerRecord(topic, line.getBytes)
}
}
}
LineMessageReader的主要职责是读取命令行中用户的输入,然后使用KafkaProducer把消息发送出去。我们可以通过参数parse.key=true以及key.separator=:来告诉kakfa我们会显式指定key。绝大多数情况下,除非有特殊需求,我们都不会使用如此繁复冗长的参数。因此,实际上,我使用如下命令进行消息的生产时,效果和我在代码中使用KafkaProducer进行消息的生产并将key设置为null的效果一样,Kafka都会通过使用DefaultPartitioner来进行消息的分派,由于key为null,将选择任何一个活着的broker,因此,虽然我们Kafka的某个topic的部分partition的leader丢失,消息却不会丢失。
其实,我们在使用Kafka过程中,我们会以为我们使用比如一个每次递增1的key,可以实现消息分派的负载均衡,即消息会几乎均匀地分布到所有的partition上面去。但是其实这样做可能会造成消息的丢失,更好的做法,就是直接不指定key,此时Kafka会帮助我们在所有或者的broker中选择一个进行消息分派,不会造成消息丢失,同时负载均衡Kafka也帮我们完成了。
Kafka的key的使用是用来满足定制化的分派规则而不是消息均匀分派,比如:
1. 我们希望这个topic的所有消息打到同一个partition,这时候我们可以指定一个不变的任意的key,根据DefaultPartitioner的实现,消息会固定打到某个partition;
2. 我们希望根据消息的内容完全定制化地控制这个消息对应的partition,这时候我们需要自己实现一个Partitioner。如果我们看过DefaultPartitioner的实现,那么实现自己的定制化的Partitioner就太简单了。
总体来说本文的这些代码难度不是很大,但是对于我们理解Kafka的运行机制从而正确地、毫无误解地使用Kafka非常有帮助。相比我在博客中介绍的Hadoop、Yarn的调度求、资源管理器代码,这段代码非常容易理解,但是也可以从中看到Kafka代码的优雅和规范,良好的接口定义带来良好的可扩展性。
更多推荐


所有评论(0)