1. Kafka Source Code Analysis
Before we can send messages with the producer client, we need to create the producer object:
Producer<String, String> producer = new KafkaProducer<>(props);
So that is where we start. First of all, the Javadoc of this class is a must-read; it contains a lot of best practices, roughly the following (a minimal usage sketch follows the list):
(1) KafkaProducer is thread safe.
(2) Do not create multiple instances; sharing a single producer instance across threads is enough, and generally faster.
(3) The producer holds a pool of buffer memory containing records waiting to be sent; the producer turns them into requests and ships them to the brokers. Failing to close the producer leaks these resources and can lose buffered data.
(4) send() is asynchronous: it appends the record to the local buffer and returns immediately, which does not mean the record has reached the broker.
(5) buffer.memory caps the total buffer size. If records are produced faster than they can be delivered and the buffer fills up, send() blocks for up to max.block.ms; if it is still blocked after that, a timeout exception is thrown.
(6) Plus configuration such as acks, retries, and so on.
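A minimal sketch of what those points look like in code. The broker address, topic name, and the concrete config values here are assumptions for illustration, not recommendations:
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092"); // assumed broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");               // point 6: all ISR replicas must ack
props.put("retries", "3");              // point 6: retry transient failures
props.put("buffer.memory", "33554432"); // points 3 and 5: 32 MB buffer pool
props.put("max.block.ms", "60000");     // point 5: how long send() may block when full

// points 1 and 2: one instance, shared by every sending thread
Producer<String, String> producer = new KafkaProducer<String, String>(props);

// point 4: send() is asynchronous; the callback fires when the broker responds
producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"), new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null)
            exception.printStackTrace();
    }
});

// point 3: close() flushes the buffer; skipping it can lose data
producer.close();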
Next comes the constructor:
private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
try {
log.trace("Starting the Kafka producer");
//parameters supplied by the user
Map<String, Object> userProvidedConfigs = config.originals();
this.producerConfig = config;
this.time = new SystemTime();
//1. resolve the client.id config; if not set, an auto-generated id of the form "producer-{n}" is used, where n comes from an incrementing counter
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
Map<String, String> metricTags = new LinkedHashMap<String, String>();
metricTags.put("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
reporters.add(new JmxReporter(JMX_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time);
//2. create the partitioner, which routes each record to a partition of the topic. PARTITIONER_CLASS_CONFIG = partitioner.class; the default partitioner is used when none is configured
this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
//retry.backoff.ms: after a send fails, wait this long before retrying, to avoid hammering the broker with rapid retries
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
//use the user-supplied key.serializer
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
//use the user-supplied value.serializer
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
// load interceptors and make sure they get clientId
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
//3. custom interceptors; rarely used
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
//4. the metadata: the current view of the cluster, including each partition and its leader. If the metadata for a topic being written is not available locally, it is fetched from a broker
//METADATA_MAX_AGE_CONFIG = metadata.max.age.ms: force a metadata refresh every 5 minutes by default; the (deprecated) metadata.fetch.timeout.ms bounds a blocking fetch, 1 minute by default
this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
//the maximum size of a single request; one request can carry multiple batches
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
//total memory available to the producer; do not size this as large as the JVM heap, since the heap is also needed for compression, in-flight data and so on. Default 32 MB
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
//compression type; no compression by default, gzip, snappy and lz4 are available
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
/* check for user defined settings.
* If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
* This should be removed with release 0.9 when the deprecated configs are removed.
*/
//whether to block when the buffer is full; false by default, in which case an exception is thrown after blocking for up to 1 minute by default
if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
if (blockOnBufferFull) {
this.maxBlockTimeMs = Long.MAX_VALUE;
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
} else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
"Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
} else {
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
}
/* check for user defined settings.
* If the TIME_OUT config is set use that for request timeout.
* This should be removed with release 0.9
*/
//REQUEST_TIMEOUT_MS_CONFIG: request timeout, default 30s; a timed-out request is retried, and once retries are exhausted the send fails
if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
} else {
this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
}
//5. the core buffering component.
// BATCH_SIZE_CONFIG = batch.size: the size of each batch; one batch holds multiple records. Default 16384 bytes = 16 KB.
// One request is addressed to one broker and can contain multiple batches, each batch corresponding to one partition on that broker.
// Too small a batch means frequent requests, more network round-trips and lower throughput; setting it to 0 disables batching entirely, one record per send.
// Too large a batch means lots of batches sitting in memory, wasting it.
// LINGER_MS_CONFIG = linger.ms, default 0: if the batch has not filled up by this time, it is sent anyway.
this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.totalMemorySize,
this.compressionType,
config.getLong(ProducerConfig.LINGER_MS_CONFIG),
retryBackoffMs,
metrics,
time);
//BOOTSTRAP_SERVERS_CONFIG = bootstrap.servers: the broker address list supplied by the user's program
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
//6. note: this does NOT actually fetch any metadata
this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
//create the ChannelBuilder (which will later build KafkaChannels)
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values());
//initialize the NetworkClient; all later communication sits on top of this class
//CONNECTIONS_MAX_IDLE_MS_CONFIG: connections idle longer than this are closed. Default 9 minutes.
//MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = max.in.flight.requests.per.connection: maximum number of unacknowledged requests per connection before sending stops
//RECONNECT_BACKOFF_MS_CONFIG = reconnect.backoff.ms: wait between reconnect attempts
//SEND_BUFFER_CONFIG = send.buffer.bytes: TCP send buffer size. Default 128 KB
//RECEIVE_BUFFER_CONFIG = receive.buffer.bytes: TCP receive buffer size. Default 32 KB
NetworkClient client = new NetworkClient(
new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder),
this.metadata,
clientId,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
this.requestTimeoutMs, time);
//the Sender is the runnable that actually sends the data
//ACKS_CONFIG = acks: 0 = do not wait for any broker acknowledgment; 1 = the leader has written the record; all = every replica in the ISR has written it
this.sender = new Sender(client,
this.metadata,
this.accumulator,
config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
(short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
config.getInt(ProducerConfig.RETRIES_CONFIG),
this.metrics,
new SystemTime(),
clientId,
this.requestTimeoutMs);
//start the KafkaThread that wraps the sender
String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
this.errors = this.metrics.sensor("errors");
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed
// this is to prevent resource leak. see KAFKA-2121
close(0, TimeUnit.MILLISECONDS, true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
The producer constructor, then, wires up the following key components:
1. The clientId; note that if you use Kafka's quota (throttling) feature, you need to set this id explicitly.
2. The partitioner, which decides which partition each record is sent to.
3. The key/value serializers.
4. Parameters such as retries, acks, timeouts, buffer size and batch size.
5. The metadata (initialized here, not yet fetched).
6. The compression type.
7. The NetworkClient.
8. The KafkaThread, which wraps the Sender thread.
Three pieces deserve a closer look:
(1) Metadata initialization and the update method.
(2) Constructing the NetworkClient.
(3) Starting the KafkaThread.
(1) Metadata initialization and the update method.
Once the metadata is constructed, this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()) does not actually refresh any metadata. The most important field of the Metadata class is the Cluster; its main fields are:
private final boolean isBootstrapConfigured;
//a Node encapsulates one Kafka broker machine: host, port, id, etc.
private final List<Node> nodes;
private final Set<String> unauthorizedTopics;
private final Set<String> internalTopics;
//every partition in the cluster
private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
//all partitions of a given topic
private final Map<String, List<PartitionInfo>> partitionsByTopic;
//per topic, the partitions that currently have a leader and are therefore available
private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
//which partitions live on each node
private final Map<Integer, List<PartitionInfo>> partitionsByNode;
//mapping from node id to node
private final Map<Integer, Node> nodesById;
private final ClusterResource clusterResource;
Another important class here is PartitionInfo, whose fields are listed below. The key one is the ISR list, inSyncReplicas, which contains the leader itself plus the followers currently in sync with the leader.
public class PartitionInfo {
private final String topic;
private final int partition;
//the leader
private final Node leader;
//all replicas
private final Node[] replicas;
//the ISR list
private final Node[] inSyncReplicas;
//... constructors and accessors omitted
}
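As an aside, the same PartitionInfo objects are reachable through the public API, so leader and ISR can be inspected from a client. A hedged sketch, assuming a producer instance created as above and a topic named "my-topic":
import java.util.Arrays;
import org.apache.kafka.common.PartitionInfo;

for (PartitionInfo info : producer.partitionsFor("my-topic")) {
    System.out.printf("partition=%d leader=%s isr=%s%n",
            info.partition(), info.leader(), Arrays.toString(info.inSyncReplicas()));
}
Keep in mind that partitionsFor() itself blocks until metadata for the topic is available, bounded by max.block.ms.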
The Cluster.bootstrap method builds a Cluster from the bootstrap addresses and returns it:
public static Cluster bootstrap(List<InetSocketAddress> addresses) {
List<Node> nodes = new ArrayList<>();
int nodeId = -1;
for (InetSocketAddress address : addresses)
nodes.add(new Node(nodeId--, address.getHostString(), address.getPort()));
return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), Collections.<String>emptySet(), Collections.<String>emptySet());
}
Each address from the user's configuration becomes a Node, representing one Kafka broker machine; these Node objects plus some topic information make up the Cluster. At this point, though, the topic information is empty; it has only been initialized.
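A quick sketch of what bootstrap() yields (assuming kafka-clients on the classpath; the host names are made up). Note the bootstrap nodes get negative, decreasing ids from the nodeId-- counter, placeholders until real node ids arrive with the first metadata response:
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;

List<InetSocketAddress> addresses = Arrays.asList(
        new InetSocketAddress("broker1", 9092),
        new InetSocketAddress("broker2", 9092));
Cluster bootstrap = Cluster.bootstrap(addresses);
for (Node node : bootstrap.nodes())
    System.out.println(node.id() + " -> " + node.host() + ":" + node.port());
// prints -1 -> broker1:9092 and -2 -> broker2:9092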
Now the update method:
public synchronized void update(Cluster cluster, long now) {
Objects.requireNonNull(cluster, "cluster should not be null");
this.needUpdate = false;
this.lastRefreshMs = now;
this.lastSuccessfulRefreshMs = now;
this.version += 1;
if (topicExpiryEnabled) {
// Handle expiry of topics from the metadata refresh set.
for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) {
Map.Entry<String, Long> entry = it.next();
long expireMs = entry.getValue();
if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE)
entry.setValue(now + TOPIC_EXPIRY_MS);
else if (expireMs <= now) {
it.remove();
log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now);
}
}
}
//listeners: notify them whenever the metadata changes
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);
String previousClusterId = cluster.clusterResource().clusterId();
//on the first call, needMetadataForAllTopics was set to false in the constructor, so we take the else branch
//and simply assign the bootstrap cluster built earlier to the metadata's cluster field
if (this.needMetadataForAllTopics) {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
this.cluster = getClusterForCurrentTopics(cluster);
} else {
//assign the metadata's cluster field
this.cluster = cluster;
}
// The bootstrap cluster is guaranteed not to have any useful information
if (!cluster.isBootstrapConfigured()) {
String clusterId = cluster.clusterResource().clusterId();
if (clusterId == null ? previousClusterId != null : !clusterId.equals(previousClusterId))
log.info("Cluster ID: {}", cluster.clusterResource().clusterId());
clusterResourceListeners.onUpdate(cluster.clusterResource());
}
//wake up any other thread that is wait()ing on this object
notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
}
The key point is that the freshly built cluster object is assigned directly to the metadata's cluster member, so this update() call performs no network round-trip to the broker cluster; most topic-related fields of the cluster object are still empty.
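The counterpart of that notifyAll() is Metadata.awaitUpdate(), where a sender thread blocks until the version counter advances. For reference, this is the 0.10.x implementation in slightly condensed form (comments added; check your exact version's source, details may differ):
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
    if (maxWaitMs < 0)
        throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milli seconds");
    long begin = System.currentTimeMillis();
    long remainingWaitMs = maxWaitMs;
    while (this.version <= lastVersion) {   // update() bumps version and calls notifyAll()
        if (remainingWaitMs != 0)
            wait(remainingWaitMs);          // released by notifyAll() in update()
        long elapsed = System.currentTimeMillis() - begin;
        if (elapsed >= maxWaitMs)
            throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
        remainingWaitMs = maxWaitMs - elapsed;
    }
}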
(2) Constructing the NetworkClient.
NetworkClient initialization is mostly field assignment. The class is responsible for asynchronous request/response network I/O and is not thread safe. Note that a DefaultMetadataUpdater is created here; it is the component that will later refresh the metadata.
private NetworkClient(MetadataUpdater metadataUpdater,
Metadata metadata,
Selectable selector,
String clientId,
int maxInFlightRequestsPerConnection,
long reconnectBackoffMs,
int socketSendBuffer,
int socketReceiveBuffer,
int requestTimeoutMs,
Time time) {
/* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
* super constructor is invoked.
*/
//on the first pass we come through here: the metadataUpdater passed in is null
if (metadataUpdater == null) {
if (metadata == null)
throw new IllegalArgumentException("`metadata` must not be null");
//1. create the DefaultMetadataUpdater
this.metadataUpdater = new DefaultMetadataUpdater(metadata);
} else {
this.metadataUpdater = metadataUpdater;
}
this.selector = selector;
this.clientId = clientId;
//the collection inside this class tracks requests that have been sent but not yet acknowledged by the server
this.inFlightRequests = new InFlightRequests(maxInFlightRequestsPerConnection);
//tracks the connection state between this producer and each broker
this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs);
this.socketSendBuffer = socketSendBuffer;
this.socketReceiveBuffer = socketReceiveBuffer;
this.correlation = 0;
this.randOffset = new Random();
this.requestTimeoutMs = requestTimeoutMs;
this.time = time;
}
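One relationship hidden in these parameters is worth spelling out: maxInFlightRequestsPerConnection caps InFlightRequests, and back in the KafkaProducer constructor the Sender is built with config.getInt(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1 as its guaranteeMessageOrder flag. With at most one unacknowledged request per connection, a retried batch can never overtake a later one, which is how strict ordering is obtained.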
(3) Starting the KafkaThread.
KafkaThread is just a wrapper around the Sender runnable, and the NetworkClient built above becomes a member of the Sender.
So the relationship is: KafkaThread wraps Sender, Sender holds NetworkClient, and NetworkClient is the component that actually talks to the brokers over the network.
public KafkaThread(final String name, Runnable runnable, boolean daemon) {
super(runnable, name);
setDaemon(daemon);
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
log.error("Uncaught exception in " + name + ": ", e);
}
});
}
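A design note: the constructor passes daemon = true here (new KafkaThread(ioThreadName, this.sender, true) above), so the I/O thread never keeps the JVM alive on its own. That is one more reason producer.close() must be called explicitly, so the accumulator is flushed before the process exits.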
The Sender constructor shows the Sender's key collaborators: the network client, the in-memory record accumulator, and the metadata:
public Sender(KafkaClient client,
Metadata metadata,
RecordAccumulator accumulator,
boolean guaranteeMessageOrder,
int maxRequestSize,
short acks,
int retries,
Metrics metrics,
Time time,
String clientId,
int requestTimeout) {
//network I/O component
this.client = client;
//in-memory record buffer
this.accumulator = accumulator;
//metadata
this.metadata = metadata;
this.guaranteeMessageOrder = guaranteeMessageOrder;
this.maxRequestSize = maxRequestSize;
this.running = true;
this.acks = acks;
this.retries = retries;
this.time = time;
this.clientId = clientId;
//metrics component
this.sensors = new SenderMetrics(metrics);
this.requestTimeout = requestTimeout;
}
When the thread runs, what actually executes is Sender's run() method:
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
//running is volatile: the classic stop-flag idiom
//loop forever, repeatedly calling run(now)
while (running) {
try {
run(time.milliseconds());
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
//what follows (the drain-and-close logic on shutdown) is skipped here
}
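The running flag idiom deserves a standalone look. A minimal, self-contained sketch (class and method names are illustrative, not Kafka's):
public class LoopWorker implements Runnable {
    // volatile guarantees the shutdown() write is visible to the loop thread
    private volatile boolean running = true;

    @Override
    public void run() {
        while (running) {
            try {
                doWork();   // one unit of work per iteration
            } catch (Exception e) {
                // log and continue, so one failed iteration does not kill the loop
                e.printStackTrace();
            }
        }
        // drain / cleanup would happen here, as Sender does after its loop
    }

    public void shutdown() {
        running = false;
    }

    private void doWork() { /* ... */ }
}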
run(time.milliseconds()) does quite a lot, but on the very first invocation most of the data structures involved are empty, so essentially only the last line has any effect:
void run(long now) {
//first pass: just read the cluster object out of metadata; it is still the empty bootstrap cluster, nothing has been fetched from any broker
Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
//first pass: since most cluster fields are empty, nothing is selected here and the data structures inside result stay empty
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if there are any partitions whose leaders are not known yet, force metadata update
if (!result.unknownLeaderTopics.isEmpty()) { // skipped on the first pass
// The set of topics with unknown leader contains topics with leader election pending as well as
// topics which may have expired. Add the topic again to metadata to ensure it is included
// and request metadata update, since there are messages to send to the topic.
for (String topic : result.unknownLeaderTopics)
this.metadata.add(topic);
this.metadata.requestUpdate();
}
// remove any nodes we aren't ready to send to
//first pass: result is empty, so all of this is skipped
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
}
}
// create produce requests
//first pass: batches is empty
Map<Integer, List<RecordBatch>> batches = this.accumulator.drain(cluster,
result.readyNodes,
this.maxRequestSize,
now);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<RecordBatch> batchList : batches.values()) {
for (RecordBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
List<RecordBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
// update sensors
for (RecordBatch expiredBatch : expiredBatches)
this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
sensors.updateProduceRequestMetrics(batches);
List<ClientRequest> requests = createProduceRequests(batches, now);
// If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
// loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
// that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
// with sendable data that aren't ready to send since they would cause busy looping.
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
if (result.readyNodes.size() > 0) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
log.trace("Created {} produce requests: {}", requests.size(), requests);
pollTimeout = 0;
}
for (ClientRequest request : requests)
client.send(request, now);
// if some partitions are already ready to be sent, the select time would be 0;
// otherwise if some partition already has some data accumulated but not ready yet,
// the select time will be the time difference between now and its linger expiry time;
// otherwise the select time will be the time difference between now and the metadata expiry time;
this.client.poll(pollTimeout, now);
}
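One thing worth stressing before tracing further: this.client.poll() is where the real network I/O happens. On early iterations it is inside poll() that the metadata request gets sent and its response handled, as the call chain below shows.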
The flow from here on is fairly involved; the overall call chain is:
run(long now)
->this.client.poll(pollTimeout, now);
->metadataUpdater.maybeUpdate(now);
->maybeUpdate(now, node);
->handleCompletedReceives(responses, updatedNow);
->metadataUpdater.maybeHandleCompletedReceive(req, now, body)
->handleResponse(req.request().header(), body, now);
->this.metadata.update(cluster, now)
This brings us back to update(), but unlike the first time, the cluster passed in now really has been fetched from a broker, and update() installs that information into the metadata.