(Kafka源码三)Kafka的缓存机制
本文主要讲解了线程添加消息(append()方法源码)到RecordAccumulator的源码流程,以及在添加的过程中所涉及到内存的申请与释放的源码分析,最后介绍了RecordAccumulator通过CopyOnWriteMap实现的batches,从而实现了读写分离与高并发读的能力。下一章将从源码角度详细介绍Broker是如何处理生产者发送过来的消息。
在Kafka的架构中,若没有缓存机制,在RecordAccumulator中对于不再使用的批次对象,需要进行回收,释放这些对象所占用的内存,为了降低GC的压力,Kafka作者设计了一个缓存池的机制,从而实现了实现消息批次的内存复用。本文主要讲解缓冲池的设计。
首先来看下kafka生产者到RecordAccumulator这边的架构流程
append()
生产者发送消息的send方法会依次调用以下方法,首先通过场景驱动的方式来看看生产者的源码是如何通过append()方法将消息添加到batches的双端队列中,首先模拟三个线程(线程一,线程二,线程三)往同一个分区写入数据,执行append操作,假设每次都是线程一先获取到锁
,通过synchronized
分段加锁,三个线程都能成功的将消息插入batches中,并且成功的将所申请的内存空间释放掉。可以看出append方法为了保证线程的安全性以及并发能力,append方法不是在方法上直接加锁,而是在内部采用分段加锁的方式,进一步提高了并发能力
,执行流程图如下
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
/*
把消息放入accumulator(默认大小32M),然后accumulator把消息封装
成为一个一个的消息批次的发送。
*/
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
}
//append()方法不是直接锁住整个方法,而是在内部采用分段加锁的方式,进一步提高了并发能力
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Callback callback,
long maxTimeToBlock) throws InterruptedException {
try {
/*
根据分区获取对应的队列(每个分区对应一个双端队列),若队列已经存在,则使用已存在的队列,
若不存在,则创建新的对垒
*/
Deque<RecordBatch> dq = getOrCreateDeque(tp);
//先到达的线程锁住队列
synchronized (dq) {
//若连接已经关闭,则抛出异常
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
// 尝试往队列里面的批次里添加数据,第一次尝试添加失败,因为还没有给消息批次对象分配内存
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
//若添加结果不为null,说明添加成功,返回添加结果
if (appendResult != null)
return appendResult;
}//释放锁
//在消息批次的大小和消息大小之间取一个较大值作为要申请内存的大小
int size = Math.max(this.batchSize, Records.LOG_OVERHEAD + Record.recordSize(key, value));
log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
//根据所需内存的大小分配内存块
ByteBuffer buffer = free.allocate(size, maxTimeToBlock);
synchronized (dq) {
//若连接已经关闭,则抛出异常
if (closed)
throw new IllegalStateException("Cannot send after the producer is closed.");
//再一次尝试将消息写入消息批次中(还是失败,虽然分配了内存,但还没有创建消息批次对象)
RecordAppendResult appendResult = tryAppend(timestamp, key, value, callback, dq);
//不为null,说明添加成功
if (appendResult != null) {
//释放内存,将消息批次所占用的内存空间返回内存池
free.deallocate(buffer);
//返回添加结果
return appendResult;
}
//根据内存块的大小创建消息批次对象
MemoryRecords records = MemoryRecords.emptyRecords(buffer, compression, this.batchSize);
RecordBatch batch = new RecordBatch(tp, records, time.milliseconds());
//尝试往消息批次对象写数据,这时就可以写入成功了
FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
//将消息批次对象添加到双端队列的尾部
dq.addLast(batch);
incomplete.add(batch);
//返回添加结果
return new RecordAppendResult(future, dq.size() > 1 || batch.records.isFull(), true);
}//释放锁
} finally {
appendsInProgress.decrementAndGet();
}
}
BufferPool
首先来看看缓冲池的数据结构
public final class BufferPool {
//缓冲池内存的总大小(默认大小为32M)
private final long totalMemory;
//单个消息批次(ByteBuffer)的大小,默认16K
private final int poolableSize;
// 由于在多线环境下,多个线程可能并发分配和回收消息批次,为了保证线程安全,使用控制并发。
private final ReentrantLock lock;
//双端队列实现的free缓冲池
private final Deque<ByteBuffer> free;
//该队列用于存放那些申请不到足够内存而被阻塞的线程所对应的条件变量
private final Deque<Condition> waiters;
//可用内存(可用内存大小 = totalMemory-free缓冲池的大小),初始值等于总内存大小
private long availableMemory;
...
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
...
// 总内存
this.totalMemory = memory;
// 可用内存的大小默认等于总内存大小
this.availableMemory = memory;
}
}
BufferPool一开始的内存结构图如下
allocate()
allocate()申请内存的流程图
再来看看内存分配allocate()的源码
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
//若申请的内存的大小超过了内存池的总大小(默认32M),则抛出异常
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
//加锁
this.lock.lock();
try {
//若申请的内存大小等于干好等于一个消息批次(默认大小16k)并且free缓冲池不
//为空,则直接返回free队列头部的ByteBuffer
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
//free缓冲池的总大小
int freeListSize = this.free.size() * this.poolableSize;
//若总内存(free缓冲池大小+可用内存大小)大小 >= 要申请的内存大小
if (this.availableMemory + freeListSize >= size) {
//若可用内存不够但是free缓冲池不为空,则依次将free首部的ByteBuffer加入到可用内存中
freeUp(size);
//对可用内存做内存扣减
this.availableMemory -= size;
//释放锁
lock.unlock();
//根据申请的内存大小分配内存空间
return ByteBuffer.allocate(size);
} else {
//总内存不够 < 申请的内存大小
//accumulated用于记录已经分配了多少内存空间
int accumulated = 0;
ByteBuffer buffer = null;
//创建lock的条件变量
Condition moreMemory = this.lock.newCondition();
//当前线程最多可以等待多少时间
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
//将当前线程的条件变量加入waitter等待队列中,当前线程阻塞,等待其他线程释放内存
this.waiters.addLast(moreMemory);
//根据申请内存的大小,内存池一点一点去分配,直到满足所申请的内存大小
while (accumulated < size) {
//记录当前系统时间
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
//当等待的时间到了或者被其他线程唤醒了,当前线程就恢复继续运行
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} catch (InterruptedException e) {
this.waiters.remove(moreMemory);
throw e;
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
this.waitTime.record(timeNs, time.milliseconds());
}
if (waitingTimeElapsed) {
//从等待队列中移除当前线程的条件变量
this.waiters.remove(moreMemory);
throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
//计算还剩余多少可以等待的时间
remainingTimeToBlockNs -= timeNs;
//若申请大小刚好等于一个消息批次的大小并且free缓冲池不为空
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
//从free队列首部中获取ByteBuffer
buffer = this.free.pollFirst();
//更新内存累加值
accumulated = size;
} else {
//size-accumlated表示还需要多少内存空间,从free缓冲池中划分还需要的内存空间
freeUp(size - accumulated);
// 两者取较小值,表示可以分配的内存
int got = (int) Math.min(size - accumulated, this.availableMemory);
//从可用内存中扣减所需内存
this.availableMemory -= got;
//计算已经分配了多少内存
accumulated += got;
}
}
//从等待队列中的首部移除阻塞线程的condition
Condition removed = this.waiters.removeFirst();
if (removed != moreMemory)
throw new IllegalStateException("Wrong condition: this shouldn't happen.");
//如果可用内存大于0或者free缓冲池不为空,唤醒等待队列中的线程
if (this.availableMemory > 0 || !this.free.isEmpty()) {
//等待队列不为空
if (!this.waiters.isEmpty())
//唤醒首部等待的线程
this.waiters.peekFirst().signal();
}
//释放锁
lock.unlock();
//若buffer为空,就分配内存
if (buffer == null)
return ByteBuffer.allocate(size);
else
//返回已经分配内存的buffer
return buffer;
}
} finally {
if (lock.isHeldByCurrentThread())
//释放锁
lock.unlock();
}
}
deallocate()
释放内存的方法分为两种情况,若释放内存大小等于消息批次的大小,则直接将内存添加到free缓冲池队列的尾部,以达到内存复用的效果,若不等于一个消息批次的大小,则将内存的数值添加到可用内存的数值大小中,然后这块内存就等待JVM的GC回收,不论哪种情况,若等待队列不为空,都会唤醒等待队列waiters首部中被阻塞的线程,让该线程继续从内存池中分配内存。
释放内存的流程相比申请内存要简洁些,deallocate()释放内存的源码如下
public void deallocate(ByteBuffer buffer, int size) {
//加锁
lock.lock();
try {
//若释放的内存大小等于一个消息批次的大小
if (size == this.poolableSize && size == buffer.capacity()) {
//首先将内存的内容清空
buffer.clear();
//把内存放入free队列缓冲池中
this.free.add(buffer);
} else {
//若释放的内存大小不等于消息批次的大小,就直接将该内存块的空间大小添加到可用内存
//的数值大小,等待JVM的GC回收
this.availableMemory += size;
}
//从等待队列的首部移除阻塞的线程
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
//唤醒正在等待分配内存的线程
moreMem.signal();
} finally {
//释放锁
lock.unlock();
}
}
对于内存的分配以及释放可以归纳为如下四种情景:
(1)申请16K的内存且free缓冲池不为空
这种情况直接从free缓冲池队列首部弹出大小为16K的ByteBuffer,用完后首先使用clear()方法清空内存里面的内存,再添加到free缓冲池队列的尾部,以达到内存复用的效果,接着唤醒等待队列waiters首部等待分配内存的线程。
(2)申请16K的内存且free()缓冲池为空
这种情况直接从可用内存中划分大小为16K的内存空间,用完后添加到free缓冲池队列的尾部,以便下次使用,
接着唤醒等待队列waiters首部中被阻塞的线程,让该线程继续从内存池中分配内存。
(3)申请非16K的内存且free缓冲池为空
这种情况直接从可用内存划分非16k的内存,用完后将其归还可用内存中,后续被JVM的GC回收。
(4)申请非16K的内存且可用内存不够但free缓冲池不为空的情况
首先从free缓冲池中将ByteBuffer释放回可用内存中,直到满足申请的内存大小,再从可用内存获取申请的内存,用完后再释放回可用内存,后续被JVM的GC回收。
总结:通过以上源码分析可以知道,如果申请的内存大小刚好等于batchSize,那么会直接从free缓冲池中申请内存,当用完后直接将内存归还到free队列,达到内存复用的效果,同时也减轻了GC的压力;若申请的内存大小比batchSize还要大,此时就不会从free缓冲池申请ByteBuffer了,而是直接到可用内存中申请一块新的内存空间,当用完后需要将这块内存归还到可用内存中,等待JVM的GC回收,此时更糟糕的一种情况是可用内存比申请内存小,需要不断的将free队列首部的内存释放到可用内存中,free缓冲池中的内存越来越少,意味着可用内存需要更多JVM的GC回收。因此,需要根据业务消息的大小,适当调整batchSize的大小,避免频繁的GC回收。
CopyOnWriteMap
Kafka可以处理大量的数据并且具备高并发能力,其中一个原因就在于借助了并发容器,在RecordAccumulator的成员变量中有一个用CopyOnWriteMap
实现的batches,从而实现了读写分离以及高并发读的能力。
public final class RecordAccumulator {
//TopicPartition(分区):Deque<RecordBatch> (队列)
private final ConcurrentMap<TopicPartition, Deque<RecordBatch>> batches;
}
//构造方法
public RecordAccumulator(int batchSize,
long totalSize,
CompressionType compression,
long lingerMs,
long retryBackoffMs,
Metrics metrics,
Time time) {
...
//CopyOnWriteMap实现的batches
this.batches = new CopyOnWriteMap<>();
}
batches的结构图
CopyOnWriteMap
的工作原理:当一个线程尝试修改 Map(比如添加、删除或更新键值对)时,它会先创建一个 Map 的副本,然后在副本上进行修改。修改完成后,会用新的副本替换原始 Map。这样,读取 Map 的线程可以继续访问原始 Map,而不会受到修改操作的影响。
CopyOnWriteMap的优缺点:
-
优点:它可以在读多写少的场景下提供较好的性能。因为读操作不需要加锁,而写操作只需要替换引用,所以具有很高的并发读性能。
-
缺点:每次修改操作都需要创建一个新的副本,这会增加内存消耗和垃圾回收的压力。
总结
本文主要讲解了线程添加消息(append()方法源码)到RecordAccumulator的源码流程,以及在添加的过程中所涉及到内存的申请与释放的源码分析,最后介绍了RecordAccumulator通过CopyOnWriteMap实现的batches,从而实现了读写分离与高并发读的能力。下一章将从源码角度详细介绍Broker是如何处理生产者发送过来的消息。
更多推荐
所有评论(0)