MetadataCache Analysis
MetadataCache is the component each broker uses to cache the state of every partition in the cluster. The KafkaController keeps it up to date by sending UpdateMetadataRequest requests to the brokers in the cluster; when a broker receives such a request it updates the data cached in its MetadataCache asynchronously.
1. Core Fields
brokerId: Int — the id of the broker this cache belongs to
cache: mutable.Map[String, mutable.Map[Int, PartitionStateInfo]] — caches the state of every partition; PartitionStateInfo records the AR set, the ISR set, the leader replica id, the leaderEpoch and the controllerEpoch
controllerId: Option[Int] — the id of the current controller, None while it is unknown
aliveBrokers: mutable.Map[Int, Broker] — information about the brokers that are currently alive
aliveNodes: mutable.Map[Int, collection.Map[SecurityProtocol, Node]] — the Node objects of the currently alive brokers, keyed by the security protocol of each listener
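To make the layout concrete, here is a minimal sketch of how these fields might look as declarations. This is an assumption-laden simplification: the package names follow the 0.10-era Kafka source (kafka.api.PartitionStateInfo, kafka.cluster.Broker), and the real class also holds a read-write lock and logging members.

import scala.collection.{mutable, Map}
import org.apache.kafka.common.Node
import org.apache.kafka.common.protocol.SecurityProtocol
import kafka.api.PartitionStateInfo
import kafka.cluster.Broker

// Sketch only: core state of MetadataCache, names taken from the field list above
class MetadataCacheSketch(brokerId: Int) {
  // topic -> (partition -> AR/ISR/leader/leaderEpoch/controllerEpoch)
  private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]()
  // id of the current controller, None while unknown
  private var controllerId: Option[Int] = None
  // brokers that are currently alive in the cluster
  private val aliveBrokers = mutable.Map[Int, Broker]()
  // per-broker Node objects, keyed by listener security protocol
  private val aliveNodes = mutable.Map[Int, Map[SecurityProtocol, Node]]()
}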
2. Important Methods
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
  replicaStateChangeLock synchronized {
    // Reject requests coming from a stale controller
    if (updateMetadataRequest.controllerEpoch < controllerEpoch) {
      val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
        "old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
        correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch,
        controllerEpoch)
      stateChangeLogger.warn(stateControllerEpochErrorMessage)
      throw new ControllerMovedException(stateControllerEpochErrorMessage)
    } else {
      // Update the MetadataCache and remember the latest controller epoch
      metadataCache.updateCache(correlationId, updateMetadataRequest)
      controllerEpoch = updateMetadataRequest.controllerEpoch
    }
  }
}
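For context, a hedged sketch of where this method sits in the request path: KafkaApis dispatches the controller's UpdateMetadataRequest to ReplicaManager.maybeUpdateMetadataCache and then acknowledges the controller. The sendResponse helper below is hypothetical shorthand for the response plumbing, and the real handler also performs ClusterAction authorization.

// Simplified, assumed sketch of KafkaApis.handleUpdateMetadataRequest (details vary by version)
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
  val correlationId = request.header.correlationId
  val updateMetadataRequest = request.body.asInstanceOf[UpdateMetadataRequest]
  // Validate the controller epoch and refresh the local MetadataCache
  replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest, metadataCache)
  // Acknowledge the controller with an error-free UpdateMetadataResponse
  sendResponse(request, new UpdateMetadataResponse(Errors.NONE.code)) // sendResponse: hypothetical helper
}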
def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest) {
  inWriteLock(partitionMetadataLock) {
    controllerId = updateMetadataRequest.controllerId match {
      case id if id < 0 => None
      case id => Some(id)
    }
    // Clear all broker and node information and rebuild it from the liveBrokers carried by the request
    aliveNodes.clear()
    aliveBrokers.clear()
    updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
      val nodes = new EnumMap[SecurityProtocol, Node](classOf[SecurityProtocol])
      val endPoints = new EnumMap[SecurityProtocol, EndPoint](classOf[SecurityProtocol])
      broker.endPoints.asScala.foreach { case (protocol, ep) =>
        endPoints.put(protocol, EndPoint(ep.host, ep.port, protocol))
        nodes.put(protocol, new Node(broker.id, ep.host, ep.port))
      }
      aliveBrokers(broker.id) = Broker(broker.id, endPoints.asScala, Option(broker.rack))
      aliveNodes(broker.id) = nodes.asScala
    }
    // Update cache according to the partitionStates carried by the UpdateMetadataRequest
    updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
      val controllerId = updateMetadataRequest.controllerId
      val controllerEpoch = updateMetadataRequest.controllerEpoch
      if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
        // The partition is being deleted: drop it from the cache
        removePartitionInfo(tp.topic, tp.partition)
        stateChangeLogger.trace(s"Broker $brokerId deleted partition $tp from metadata cache in response to UpdateMetadata " +
          s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
      } else {
        val partitionInfo = partitionStateToPartitionStateInfo(info)
        // Add or update the PartitionStateInfo for this partition
        addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
        stateChangeLogger.trace(s"Broker $brokerId cached leader info $partitionInfo for partition $tp in response to " +
          s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
      }
    }
  }
}
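updateCache delegates the per-partition bookkeeping to removePartitionInfo and addOrUpdatePartitionInfo. A plausible sketch of those helpers, assuming the cache map structure from the field list above (the real implementations may differ in detail):

// Add or replace the state of one partition; creates the per-topic map on first use
private def addOrUpdatePartitionInfo(topic: String, partitionId: Int, stateInfo: PartitionStateInfo) {
  val infos = cache.getOrElseUpdate(topic, mutable.Map())
  infos(partitionId) = stateInfo
}

// Remove one partition, dropping the topic entry entirely once its last partition is gone
private def removePartitionInfo(topic: String, partitionId: Int): Boolean = {
  cache.get(topic).exists { infos =>
    infos.remove(partitionId)
    if (infos.isEmpty) cache.remove(topic)
    true
  }
}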
The Metadata object used by consumers and producers caches the cluster metadata on the client side. When that Metadata needs to be refreshed, the client sends a MetadataRequest to a broker; the broker handles it in KafkaApis.handleTopicMetadataRequest, which in turn calls getTopicMetadata to look the answer up in the MetadataCache.
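On the read side, such queries take the read lock that pairs with the write lock used in updateCache and look up the per-partition state. A minimal, hedged example of this kind of lookup:

// Sketch of a read-side lookup against the cache populated by updateCache
def getPartitionInfo(topic: String, partitionId: Int): Option[PartitionStateInfo] = {
  inReadLock(partitionMetadataLock) {
    cache.get(topic).flatMap(_.get(partitionId))
  }
}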