Kafka生产者源码解析(二)——RecordAccumulator
谈到面试,其实说白了就是刷题刷题刷题,天天作死的刷。。。。。为了准备这个“金三银四”的春招,狂刷一个月的题,狂补超多的漏洞知识,像这次美团面试问的算法、数据库、Redis、设计模式等这些题目都是我刷到过的并且我也将自己刷的题全部整理成了PDF或者Word文档(含详细答案解析)66个Java面试知识点架构专题(MySQL,Java,Redis,线程,并发,设计模式,Nginx,Linux,框架,微服
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》,点击传送门,即可获取!
进入RecordAccumulator类中,可以看到它有很多的属性字段,其中batches这个字段需要引起我们的注意,它是一个以TopicPartition作为key,Deque作为value的ConcurrentMap,TopicPartition存储了topic及partition信息,能够标记消息属于哪个主题和应该发往哪个分区;Deque是一个双端队列,里面存放的是ProducerBatch对象,ProducerBatch用于存储一批将要被发送的消息记录;ProducerBatch通过MemoryRecordsBuilder对象拥有一个DataOutputStream对象的引用,这里就是我们消息存放的最终归宿,根据MemoryRecordsBuilder构造方法的源码可知DataOutputStream里面持有ByteBufferOutputStream,这是一个缓存buffer,所以往DataOutputStream里面写消息数据,就是往缓存里面写消息数据。
|
最后存入RecordAccumulator中的消息将会是这样。
|
二、append方法解析
RecordAccumulator的构造方法中通过CopyOnWriteMap初始化了上述谈到的batches对象,同时还初始化了其他的属性内容,这里不再赘述其构造的过程,而是着重分析上一篇中遗留的内容:KafkaProducer是如何通过accumulator.append方法将消息追加到RecordAccumulator消息累加器中的。
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
//并发数加1,统计正在向RecordAccumulator中追加消息的线程数
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
//查找TopicPartition对应的Deque,如果没有则创建
Deque dq = getOrCreateDeque(tp);
//追加消息时需要加锁
synchronized (dq) {
if (closed)
throw new KafkaException(“Producer closed while send in progress”);
//尝试往Deque中最后一个ProducerBatch中追加消息记录
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
//消息追加成功返回结果
return appendResult;
}
//来到这一步说明上面消息追加失败
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
//获取要创建的ProducerBatch的内存大小
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace(“Allocating a new {} byte message buffer for topic {} partition {}”, size, tp.topic(), tp.partition());
//从BufferPool中申请空间用于后面创建新的ProducerBatch
buffer = free.allocate(size, maxTimeToBlock);
//和上面一样,追加消息时需要加锁
synchronized (dq) {
// Need to check if producer is closed again after grabbing the dequeue lock.
if (closed)
throw new KafkaException(“Producer closed while send in progress”);
//在创建新的ProducerBatch之前再次尝试往Deque中最后一个ProducerBatch中追加消息记录,说不定现在成功了呢
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
//消息追加成功返回结果
return appendResult;
}
//如果消息还是追加失败了。。。
//构造MemoryRecordsBuilder,消息将会存入它拥有的MemoryRecords对象
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
//创建ProducerBatch
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
//使用batch.tryAppend追加消息
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));
//将刚创建的ProducerBatch放入Deque双端队列尾部
dq.addLast(batch);
incomplete.add(batch);
//到这里消息已经追加成功,将buffer置空
buffer = null;
//返回结果
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
}
} finally {
if (buffer != null)
//释放之前申请的新空间
free.deallocate(buffer);
//结束,并发数减1
appendsInProgress.decrementAndGet();
}
}
上面的代码已经给出了注释,现将这段代码的流程总结如下:
|
这段代码的核心部分便是batch.tryAppend方法,下面是该方法的部分源码,首先是检查了一下消息存储器的剩余空间是否充足,若不足则直接返回null,后面走申请空间新建ProducerBatch的流程。如果空间剩余充足则MemoryRecordsBuilder会调用append方法进行消息追加。
public FutureRecordMetadata tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, long now) {
//检查消息存储器中剩余空间是否充足,若空间不足则直接返回null
if (!recordsBuilder.hasRoomFor(timestamp, key, value, headers)) {
return null;
} else {
//消息写入
Long checksum = this.recordsBuilder.append(timestamp, key, value, headers);
……………………
return future;
}
}
然后像洋葱一样不断剥开append方法的皮,,,,,发现MemoryRecordsBuilder最终会根据KafkaProducer客户端版本的不同去调用下面两个方法之一:appendDefaultRecord和appendLegacyRecord。
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
Header[] headers) throws IOException {
………………
int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
recordWritten(offset, timestamp, sizeInBytes);
}
private long appendLegacyRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value) throws IOException {
………………
long crc = LegacyRecord.write(appendStream, magic, timestamp, key, value, CompressionType.NONE, timestampType);
recordWritten(offset, timestamp, size + Records.LOG_OVERHEAD);
return crc;
}
它们分别通过DefaultRecord.writeTo和LegacyRecord.write去实现最终的消息追加,它们的第一个参数就是一开始所谈到的DataOutputStream对象,DataOutputStream里面持有ByteBufferOutputStream,这是一个缓存buffer,所以往DataOutputStream里面写消息数据,就是往缓存里面写消息数据,后面的recordWritten方法主要是处理位移问题。
下面主要以writeTo方法源码为例来看下其最终处理逻辑:
public static int writeTo(DataOutputStream out,
int offsetDelta,
long timestampDelta,
ByteBuffer key,
ByteBuffer value,
Header[] headers) throws IOException {
//计算消息数据大小
int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers);
总结
谈到面试,其实说白了就是刷题刷题刷题,天天作死的刷。。。。。
为了准备这个“金三银四”的春招,狂刷一个月的题,狂补超多的漏洞知识,像这次美团面试问的算法、数据库、Redis、设计模式等这些题目都是我刷到过的
并且我也将自己刷的题全部整理成了PDF或者Word文档(含详细答案解析)
66个Java面试知识点
架构专题(MySQL,Java,Redis,线程,并发,设计模式,Nginx,Linux,框架,微服务等)+大厂面试题详解(百度,阿里,腾讯,华为,迅雷,网易,中兴,北京中软等)
算法刷题(PDF)
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》,点击传送门,即可获取!
题(MySQL,Java,Redis,线程,并发,设计模式,Nginx,Linux,框架,微服务等)+大厂面试题详解(百度,阿里,腾讯,华为,迅雷,网易,中兴,北京中软等)**
[外链图片转存中…(img-eeWgjBj8-1714700995599)]
算法刷题(PDF)
[外链图片转存中…(img-nCjJEHaY-1714700995600)]
《一线大厂Java面试题解析+核心总结学习笔记+最新讲解视频+实战项目源码》,点击传送门,即可获取!
更多推荐
所有评论(0)