PoolThreadCache
在 Netty 的内存池中扮演着线程本地缓存的角色。它的主要目的是减少线程在分配内存时对全局 PoolArena
的竞争,通过缓存一部分最近释放的内存块,使得同一线程后续申请相同规格的内存时能够快速获取,从而提高分配效率。
下面我们详细分析其源码:
主要成员变量
// ... existing code ...
final class PoolThreadCache {private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);private static final int INTEGER_SIZE_MINUS_ONE = Integer.SIZE - 1;final PoolArena<byte[]> heapArena; // 关联的堆内存Arenafinal PoolArena<ByteBuffer> directArena; // 关联的直接内存Arena// 针对不同大小规格 (Small/Normal) 和类型 (Heap/Direct) 的内存区域缓存// Small类型的内存通常来自PoolSubpageprivate final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;// Normal类型的内存通常直接来自PoolChunk的Pageprivate final MemoryRegionCache<byte[]>[] normalHeapCaches;private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;// 分配次数阈值,达到此阈值时触发trim操作,清理缓存private final int freeSweepAllocationThreshold;// 标记缓存是否已被释放,防止重复释放private final AtomicBoolean freed = new AtomicBoolean();@SuppressWarnings("unused") // Field is only here for the finalizer.// 用于在对象被GC回收前,通过finalizer机制尝试释放缓存中的资源private final FreeOnFinalize freeOnFinalize;// 当前线程缓存的分配次数,用于配合freeSweepAllocationThresholdprivate int allocations;// TODO: Test if adding padding helps under contention//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;// ... existing code ...
}
heapArena
和directArena
: 分别指向该线程缓存关联的堆内存池和直接内存池。线程在缓存未命中时,会向这两个 Arena 申请内存。smallSubPageHeapCaches
,smallSubPageDirectCaches
,normalHeapCaches
,normalDirectCaches
: 这四个数组是核心的缓存存储结构。它们是MemoryRegionCache
类型的数组,MemoryRegionCache
内部维护了一个队列,用于存储缓存的内存块信息。smallSubPage...Caches
: 用于缓存 "Small" 类型的内存块。这类内存块通常小于一个 Page,由PoolSubpage
管理。数组的索引对应不同的elemSize
。normal...Caches
: 用于缓存 "Normal" 类型的内存块。这类内存块通常大于等于一个 Page,直接从PoolChunk
中分配。数组的索引对应不同的规格大小。
freeSweepAllocationThreshold
: 这是一个重要的参数。当PoolThreadCache
的allocations
计数达到这个阈值时,会触发trim()
方法,尝试回收一部分缓存的内存,以避免缓存过多导致内存浪费。freed
: 一个原子布尔值,确保free()
方法只被执行一次,防止资源被多次释放。freeOnFinalize
: 一个内部类实例,如果启用了useFinalizer
,当PoolThreadCache
对象被垃圾回收时,其finalize
方法会被调用,进而调用PoolThreadCache.free(true)
来释放缓存的资源。这是一种兜底机制。allocations
: 记录从该线程缓存成功分配出去的次数。
构造函数 PoolThreadCache(...)
PoolThreadCache.java
// ... existing code ...PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,int freeSweepAllocationThreshold, boolean useFinalizer) {checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;this.heapArena = heapArena;this.directArena = directArena;if (directArena != null) {// 创建直接内存的Small和Normal类型的缓存数组smallSubPageDirectCaches = createSubPageCaches(smallCacheSize, directArena.sizeClass.nSubpages);normalDirectCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, directArena);// 增加Arena中线程缓存的计数directArena.numThreadCaches.getAndIncrement();} else {// No directArea is configured so just null out all cachessmallSubPageDirectCaches = null;normalDirectCaches = null;}if (heapArena != null) {// 创建堆内存的Small和Normal类型的缓存数组smallSubPageHeapCaches = createSubPageCaches(smallCacheSize, heapArena.sizeClass.nSubpages);normalHeapCaches = createNormalCaches(normalCacheSize, maxCachedBufferCapacity, heapArena);// 增加Arena中线程缓存的计数heapArena.numThreadCaches.getAndIncrement();} else {// No heapArea is configured so just null out all cachessmallSubPageHeapCaches = null;normalHeapCaches = null;}// Only check if there are caches in use.// 如果配置了任何缓存,则freeSweepAllocationThreshold必须大于0if ((smallSubPageDirectCaches != null || normalDirectCaches != null|| smallSubPageHeapCaches != null || normalHeapCaches != null)&& freeSweepAllocationThreshold < 1) {throw new IllegalArgumentException("freeSweepAllocationThreshold: "+ freeSweepAllocationThreshold + " (expected: > 0)");}// 根据useFinalizer参数决定是否创建FreeOnFinalize实例freeOnFinalize = useFinalizer ? new FreeOnFinalize(this) : null;}// ... existing code ...
构造函数的主要工作是初始化各个成员变量,特别是根据传入的参数创建不同类型的 MemoryRegionCache
数组。
smallCacheSize
,normalCacheSize
: 分别定义了 Small 类型和 Normal 类型缓存区域中MemoryRegionCache
队列的大小。maxCachedBufferCapacity
: 定义了可以被缓存的 Buffer 的最大容量。超过这个容量的 Buffer 不会被缓存。directArena.sizeClass.nSubpages
: 这个值决定了smallSubPageDirectCaches
数组的大小,即支持多少种不同规格的 Small 类型直接内存缓存。directArena.numThreadCaches.getAndIncrement()
: 每当一个PoolThreadCache
关联到一个PoolArena
时,会增加PoolArena
内部的线程缓存计数器。
createSubPageCaches
PoolThreadCache.java
// ... existing code ...
private static <T> MemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {if (cacheSize > 0 && numCaches > 0) {@SuppressWarnings("unchecked")MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];for (int i = 0; i < cache.length; i++) {// TODO: maybe use cacheSize / cache.length// 显式类型实参 T 可被替换为 <>cache[i] = new SubPageMemoryRegionCache<>(cacheSize);}return cache;} else {return null;}
}
这个方法用于创建 SubPageMemoryRegionCache
数组。
numCaches
通常是 arena.sizeClass.nSubpages
,表示支持的 Small 类型规格数量。每个 SubPageMemoryRegionCache
实例的内部队列大小由 cacheSize
决定。
createNormalCaches
// ... existing code ...
@SuppressWarnings("unchecked")
private static <T> MemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {if (cacheSize > 0 && maxCachedBufferCapacity > 0) {int max = Math.min(area.sizeClass.chunkSize, maxCachedBufferCapacity);// Create as many normal caches as we support based on how many sizeIdx we have and what the upper// bound is that we want to cache in general.List<MemoryRegionCache<T>> cache = new ArrayList<MemoryRegionCache<T>>() ;// 从nSubpages开始,因为之前的sizeIdx是为Small类型保留的// area.sizeClass.sizeIdx2size(idx) <= max 确保只为不超过maxCachedBufferCapacity的规格创建缓存for (int idx = area.sizeClass.nSubpages; idx < area.sizeClass.nSizes &&area.sizeClass.sizeIdx2size(idx) <= max; idx++) {// 显式类型实参 T 可被替换为 <>cache.add(new NormalMemoryRegionCache<>(cacheSize));}return cache.toArray(new MemoryRegionCache[0]);} else {return null;}
}
这个方法用于创建 NormalMemoryRegionCache
数组。
它会遍历 PoolArena
的 SizeClasses
中定义的 Normal 类型的规格,但只为那些大小不超过 maxCachedBufferCapacity
(且不超过 chunkSize
) 的规格创建缓存。
内存分配方法
-
allocateSmall(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx)
-
allocateNormal(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int sizeIdx)
这两个方法分别用于分配 Small 和 Normal 类型的内存。它们首先通过cacheForSmall
或cacheForNormal
找到对应的MemoryRegionCache
,然后调用通用的allocate
方法。 -
allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity)
PoolThreadCache.java
// ... existing code ... @SuppressWarnings({ "unchecked", "rawtypes" }) private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {if (cache == null) {// no cache found so just return false herereturn false;}// 尝试从指定的MemoryRegionCache分配boolean allocated = cache.allocate(buf, reqCapacity, this);// 如果分配成功,并且总分配次数达到阈值if (++ allocations >= freeSweepAllocationThreshold) {allocations = 0; // 重置计数器trim(); // 执行trim操作,清理缓存}return allocated; }
这是实际执行从缓存分配的逻辑:
1. 如果找不到对应的 MemoryRegionCache
(例如,该规格的缓存未启用或请求的 sizeIdx
超出范围),则返回 false
。
2. 调用 cache.allocate(buf, reqCapacity, this)
尝试从该 MemoryRegionCache
的队列中取出一个缓存的 Entry
并用它初始化 buf
。
3. 如果分配成功 (allocated
为 true
),则 allocations
计数器加1。
4. 检查 allocations
是否达到 freeSweepAllocationThreshold
。如果是,则将 allocations
重置为0,并调用 trim()
方法来清理所有缓存区域中不活跃的条目。
添加到缓存方法 add(...)
PoolThreadCache.java
// ... existing code ...@SuppressWarnings({ "unchecked", "rawtypes" })boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,long handle, int normCapacity, SizeClass sizeClass) {// 根据normCapacity计算sizeIdxint sizeIdx = area.sizeClass.size2SizeIdx(normCapacity);// 根据area类型(Heap/Direct)、sizeIdx和sizeClass(Small/Normal)获取对应的MemoryRegionCacheMemoryRegionCache<?> cache = cache(area, sizeIdx, sizeClass);if (cache == null) {return false; // 找不到合适的缓存区域}if (freed.get()) { // 如果缓存已被标记为释放,则不再添加return false;}// 调用MemoryRegionCache的add方法return cache.add(chunk, nioBuffer, handle, normCapacity);}// ... existing code ...
当一个 PooledByteBuf
被释放时,如果满足一定条件(例如,它的大小适合缓存,且其来源的 PoolArena
允许缓存),PoolArena
会尝试调用此方法将其对应的内存块信息(chunk
, handle
, normCapacity
等)添加到当前线程的 PoolThreadCache
中。
- 计算
sizeIdx
。 - 通过
cache(area, sizeIdx, sizeClass)
方法定位到具体的MemoryRegionCache
。 - 如果缓存已被释放 (
freed.get()
为true
),则不添加。 - 调用
MemoryRegionCache.add(...)
将内存块信息封装成Entry
对象并尝试放入其内部队列。
缓存检索方法
cache(PoolArena<?> area, int sizeIdx, SizeClass sizeClass)
: 根据sizeClass
(Normal 或 Small) 调用cacheForNormal
或cacheForSmall
。cacheForSmall(PoolArena<?> area, int sizeIdx)
: 判断area
是堆内存还是直接内存,然后从smallSubPageHeapCaches
或smallSubPageDirectCaches
中获取缓存。cacheForNormal(PoolArena<?> area, int sizeIdx)
: 类似cacheForSmall
,但操作的是normalHeapCaches
和normalDirectCaches
。注意这里idx = sizeIdx - area.sizeClass.nSubpages
,因为sizeIdx
是全局的,而 Normal 类型的缓存在数组中的索引需要减去 Small 类型的规格数量。cache(MemoryRegionCache<T>[] cache, int sizeIdx)
: 简单的数组访问,并进行边界检查。
这些方法共同构成了从缓存数组中定位特定 MemoryRegionCache
的逻辑。
释放资源方法 free(boolean finalizer)
PoolThreadCache.java
// ... existing code ...void free(boolean finalizer) {// As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure// we only call this one time.// 使用AtomicBoolean确保free操作只执行一次if (freed.compareAndSet(false, true)) {if (freeOnFinalize != null) {// Help GC: this can race with a finalizer thread, but will be null out regardlessfreeOnFinalize.cache = null; // 解除FreeOnFinalize对PoolThreadCache的引用}// 依次释放所有类型的缓存区域int numFreed = free(smallSubPageDirectCaches, finalizer) +free(normalDirectCaches, finalizer) +free(smallSubPageHeapCaches, finalizer) +free(normalHeapCaches, finalizer);if (numFreed > 0 && logger.isDebugEnabled()) {logger.debug("Freed {} thread-local buffer(s) from thread: {}", numFreed,Thread.currentThread().getName());}// 递减Arena中的线程缓存计数if (directArena != null) {directArena.numThreadCaches.getAndDecrement();}if (heapArena != null) {heapArena.numThreadCaches.getAndDecrement();}}}private static int free(MemoryRegionCache<?>[] caches, boolean finalizer) {if (caches == null) {return 0;}int numFreed = 0;for (MemoryRegionCache<?> c: caches) {numFreed += free(c, finalizer); // 遍历释放数组中的每个MemoryRegionCache}return numFreed;}private static int free(MemoryRegionCache<?> cache, boolean finalizer) {if (cache == null) {return 0;}return cache.free(finalizer); // 调用MemoryRegionCache的free方法}// ... existing code ...
当线程结束或者 PooledByteBufAllocator
关闭时,会调用此方法来释放 PoolThreadCache
中缓存的所有内存块。
freed.compareAndSet(false, true)
保证了此方法体内的逻辑只执行一次。finalizer
参数指示这次释放是否由 finalizer 机制触发。如果是,MemoryRegionCache.freeEntry
的行为会有所不同(主要是为了避免在 finalizer 线程中执行可能导致死锁或复杂状态的操作)。- 它会遍历所有四种缓存数组,并调用每个
MemoryRegionCache
实例的free(finalizer)
方法,该方法会清空其内部队列并将所有缓存的Entry
代表的内存块归还给PoolArena
。 - 最后,递减关联
PoolArena
中的numThreadCaches
计数。
整理缓存方法 trim()
PoolThreadCache.java
// ... existing code ...void trim() {trim(smallSubPageDirectCaches);trim(normalDirectCaches);trim(smallSubPageHeapCaches);trim(normalHeapCaches);}private static void trim(MemoryRegionCache<?>[] caches) {if (caches == null) {return;}for (MemoryRegionCache<?> c: caches) {trim(c);}}private static void trim(MemoryRegionCache<?> cache) {if (cache == null) {return;}cache.trim(); // 调用MemoryRegionCache的trim方法}// ... existing code ...
当 PoolThreadCache
的 allocations
达到 freeSweepAllocationThreshold
时被调用。它会遍历所有缓存数组,并调用每个 MemoryRegionCache
实例的 trim()
方法。MemoryRegionCache.trim()
会根据其自身的分配情况和队列大小,决定是否释放一部分缓存的 Entry
。
SubPageMemoryRegionCache<T>
和 NormalMemoryRegionCache<T>
这两个类都继承自抽象的 MemoryRegionCache<T>
。它们的主要区别在于构造时传入的 SizeClass
(Small 或 Normal) 以及它们如何实现 initBuf
方法:
SubPageMemoryRegionCache.initBuf(...)
调用chunk.initBufWithSubpage(...)
,用于从PoolSubpage
初始化PooledByteBuf
。NormalMemoryRegionCache.initBuf(...)
调用chunk.initBuf(...)
,用于从PoolChunk
的 Page 初始化PooledByteBuf
。
MemoryRegionCache<T>
abstract static class
这是线程缓存的核心数据结构之一,代表特定规格内存的缓存区域。
size
: 缓存队列的容量,是2的幂次方。queue
:PlatformDependent.newFixedMpscUnpaddedQueue(this.size)
创建的一个多生产者单消费者队列 (MPSC),用于存储缓存的Entry<T>
对象。由于PoolThreadCache
是线程本地的,这里的“多生产者”实际上是指其他线程释放内存并尝试将内存块添加到这个线程的缓存中(虽然 Netty 的设计主要是当前线程释放的内存回到当前线程的缓存),而“单消费者”就是当前线程自己从缓存中分配内存。sizeClass
: 标记这个缓存区域是用于Small
还是Normal
类型的内存。allocations
: 记录从这个特定MemoryRegionCache
分配出去的次数,用于其自身的trim()
逻辑。add(PoolChunk<T> chunk, ...)
: 创建一个新的Entry
对象(通过RECYCLER
获取),设置好chunk
、handle
等信息,然后尝试将其加入queue
。如果队列已满 (offer
返回false
),则立即回收这个Entry
对象。allocate(PooledByteBuf<T> buf, ...)
: 从queue
中取出一个Entry
(poll
)。如果队列为空,返回false
。否则,调用抽象方法initBuf
用取出的Entry
中的信息来初始化buf
,然后回收Entry
对象,并增加allocations
计数。free(int max, boolean finalizer)
: 从队列中移除最多max
个Entry
,并对每个Entry
调用freeEntry
。trim()
: 计算当前队列中可以释放的Entry
数量(基于size - allocations
),然后调用free(free, false)
来释放它们。allocations
在这里代表了近期从该缓存区域成功分配的次数,如果这个数字远小于队列的容量size
,说明缓存利用率不高,可以进行清理。freeEntry(Entry entry, boolean finalizer)
: 这是将缓存的内存块真正归还给PoolArena
的地方。它获取Entry
中的chunk
,handle
,nioBuffer
,normCapacity
。如果不是由 finalizer 触发,它会先回收Entry
对象本身,然后调用chunk.arena.free(chunk, entry.nioBuffer, handle, normCapacity, this)
将内存块归还给 Arena。如果是 finalizer 触发,则只归还内存块,不立即回收Entry
(避免在 finalizer 中操作 Recycler 可能引发的问题)。
Entry<T>
一个简单的 POJO,用于封装缓存的内存块信息 (PoolChunk
, ByteBuffer nioBuffer
, long handle
, int normCapacity
)。它通过 Netty 的 Recycler
进行对象池管理,以减少 Entry
对象自身的创建和销毁开销。
// ... existing code ...private static final ObjectPool<Entry<?>> RECYCLER = ObjectPool.newPool(new ObjectCreator<Entry<?>>() {@SuppressWarnings("unchecked")@Overridepublic Entry<?> newObject(Handle<Entry<?>> handle) {return new Entry(handle);}});static final class Entry<T> {final EnhancedHandle<Entry<?>> recyclerHandle; // Recycler的句柄PoolChunk<T> chunk;ByteBuffer nioBuffer; // 缓存的NIO ByteBuffer,主要用于Direct Bufferlong handle = -1; // 内存块在PoolChunk中的句柄int normCapacity; // 规格化容量Entry(Handle<Entry<?>> recyclerHandle) {this.recyclerHandle = (EnhancedHandle<Entry<?>>) recyclerHandle;}void recycle() {chunk = null;nioBuffer = null;handle = -1;recyclerHandle.recycle(this);}// "Unguarded" version of recycle() that must only be used when we are sure that the Entry is not double-recycled.// This is the case when we obtained the Entry from the queue and add it to the cache again.void unguardedRecycle() {chunk = null;nioBuffer = null;handle = -1;recyclerHandle.unguardedRecycle(this);}}@SuppressWarnings("rawtypes")private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {Entry entry = RECYCLER.get(); // 从Recycler获取Entry对象entry.chunk = chunk;entry.nioBuffer = nioBuffer;entry.handle = handle;entry.normCapacity = normCapacity;return entry;}
// ... existing code ...
FreeOnFinalize
一个简单的包装类,其 finalize()
方法会调用 PoolThreadCache.free(true)
。这是为了在 PoolThreadCache
对象本身被 GC 回收时,能够尝试释放其占用的缓存资源,作为一种安全保障。
完全移动到 Java9+ 后, 会使用 java.lang.ref.Cleaner
// ... existing code ...
// Used to free the cache via a finalizer. This is just a best effort and should only be used if the
// ThreadLocal is not removed via FastThreadLocal.onRemoval(...) as this is the preferred way to free the cache.
private static final class FreeOnFinalize {private PoolThreadCache cache;FreeOnFinalize(PoolThreadCache cache) {this.cache = cache;}@Overrideprotected void finalize() throws Throwable {try {super.finalize();} finally {PoolThreadCache cache = this.cache;// this can race with a non-finalizer thread calling free: regardless who wins, the cache will be// null outthis.cache = null;if (cache != null) {// We must only call free if the cache was not null before, which means it was not freed before// by an explicit call to PoolThreadCache.free().//// We must use true as parameter which indicates that we were called from a finalizer.cache.free(true);}}}
}
}
总结
PoolThreadCache
通过精心设计的缓存结构和回收策略,有效地提升了 Netty 内存分配的性能。它利用线程本地性避免了锁竞争,并通过 MemoryRegionCache
对不同规格的内存进行细粒度管理。
freeSweepAllocationThreshold
和 trim
机制确保了缓存在提供性能优势的同时,不会无限制地消耗内存。内部类如 MemoryRegionCache
和 Entry
的设计,以及 Recycler
的使用,都体现了 Netty 对性能和资源管理的极致追求。