PoolArena
是 Netty 内存池化机制的核心组件之一,它负责管理一整块或多块内存(PoolChunk
),并将这些内存分配给应用程序。每个 PoolArena
实例都与一个特定的线程相关联(通过 PoolThreadCache
),或者在禁用线程缓存时被多个线程共享。Netty 会创建多个 PoolArena
来减少多线程环境下的锁竞争。
PoolArena
是一个抽象类,它有两个具体的子类:
HeapArena
: 用于分配堆内存 (byte[]
)。DirectArena
: 用于分配直接内存 (ByteBuffer
)。
主要成员变量和职责
让我们看一下 PoolArena
类中的一些关键字段:
// ...
abstract class PoolArena<T> implements PoolArenaMetric {// ...enum SizeClass {Small,Normal}final PooledByteBufAllocator parent; // 指向创建此 Arena 的 PooledByteBufAllocatorfinal PoolSubpage<T>[] smallSubpagePools; // 用于管理 Small 类型内存分配的 PoolSubpage 池数组// PoolChunkList 用于根据 PoolChunk 的使用率将其组织起来// qInit: 0-25% 使用率 (最初创建的 Chunk)// q000: < 50% 使用率// q025: 25-75% 使用率// q050: 50-100% 使用率// q075: 75-100% 使用率// q100: 100% 使用率 (已满,但仍可分配 Subpage)private final PoolChunkList<T> q050;private final PoolChunkList<T> q025;private final PoolChunkList<T> q000;private final PoolChunkList<T> qInit;private final PoolChunkList<T> q075;private final PoolChunkList<T> q100;private final List<PoolChunkListMetric> chunkListMetrics; // 用于收集 PoolChunkList 的度量信息// 各种分配和释放的计数器private long allocationsNormal; // Normal 类型分配次数private final LongAdder allocationsSmall = new LongAdder(); // Small 类型分配次数 (线程安全)private final LongAdder allocationsHuge = new LongAdder(); // Huge 类型分配次数 (线程安全)private final LongAdder activeBytesHuge = new LongAdder(); // Huge 类型活跃字节数 (线程安全)private long deallocationsSmall; // Small 类型释放次数private long deallocationsNormal; // Normal 类型释放次数private long pooledChunkAllocations; // 池化 Chunk 的分配次数private long pooledChunkDeallocations; // 池化 Chunk 的释放次数private final LongAdder deallocationsHuge = new LongAdder(); // Huge 类型释放次数 (线程安全)// 使用此 Arena 的线程缓存数量final AtomicInteger numThreadCaches = new AtomicInteger();private final ReentrantLock lock = new ReentrantLock(); // 用于保护 Arena 内部状态的锁final SizeClasses sizeClass; // 描述了 Arena 的大小规格配置 (pageSize, chunkSize 等)// ...
}
SizeClass
: 枚举类型,表示内存分配的类型,分为Small
(小于等于pageSize / 2
,通常从PoolSubpage
分配) 和Normal
(大于pageSize / 2
但小于chunkSize
,直接从PoolChunk
分配)。parent
: 指向PooledByteBufAllocator
,这是内存分配器的顶层入口。smallSubpagePools
: 这是一个PoolSubpage
数组,数组的每个元素是一个双向链表的头节点。相同大小的Small
类型的PoolSubpage
会被链接到同一个链表上,便于快速查找和分配。qInit
,q000
,q025
,q050
,q075
,q100
: 这些是PoolChunkList
对象,它们形成了一个双向链表结构。PoolArena
根据PoolChunk
的内存使用率(usage()
)将其组织在不同的PoolChunkList
中。例如,q050
存储使用率在 50% 到 100% 之间的PoolChunk
。这种组织方式有助于在分配内存时,优先从使用率较高的PoolChunk
中分配,以期尽快填满并释放空闲的PoolChunk
,从而减少内存碎片。- Metrics Counters: 大量的计数器用于追踪不同类型(Small, Normal, Huge)的分配和释放次数,以及活跃的字节数和 Chunk 数量。这些信息对于监控内存池的性能和状态非常有用。
LongAdder
用于在高并发场景下提供比AtomicLong
更好的性能。 numThreadCaches
: 记录了当前有多少个PoolThreadCache
正在使用这个PoolArena
。lock
: 一个可重入锁,用于在修改PoolArena
的共享数据结构(如PoolChunkList
)时进行同步,防止并发冲突。sizeClass
: (实际上是this.sizeClass
,来自构造函数参数SizeClasses sizeClass
) 这是一个SizeClasses
对象,它封装了关于内存规格的配置信息,如pageSize
(页大小)、pageShifts
、chunkSize
(块大小)等,并提供了一些计算方法,如根据请求大小计算规格索引 (size2SizeIdx
)。
构造函数
PoolArena.java
// ...protected PoolArena(PooledByteBufAllocator parent, SizeClasses sizeClass) {assert null != sizeClass;this.parent = parent;this.sizeClass = sizeClass;smallSubpagePools = newSubpagePoolArray(sizeClass.nSubpages);for (int i = 0; i < smallSubpagePools.length; i ++) {smallSubpagePools[i] = newSubpagePoolHead(i);}q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, sizeClass.chunkSize);q075 = new PoolChunkList<T>(this, q100, 75, 100, sizeClass.chunkSize);q050 = new PoolChunkList<T>(this, q100, 50, 100, sizeClass.chunkSize); // 注意这里 nextList 是 q100q025 = new PoolChunkList<T>(this, q050, 25, 75, sizeClass.chunkSize);q000 = new PoolChunkList<T>(this, q025, 1, 50, sizeClass.chunkSize);qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, sizeClass.chunkSize);q100.prevList(q075);q075.prevList(q050);q050.prevList(q025);q025.prevList(q000);q000.prevList(null); // q000 的前一个 List 是 null,表示它是链表的头部(在查找时)qInit.prevList(qInit); // qInit 的 prevList 指向自身,它是一个特殊的 ListList<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);metrics.add(qInit);metrics.add(q000);metrics.add(q025);metrics.add(q050);metrics.add(q075);metrics.add(q100);chunkListMetrics = Collections.unmodifiableList(metrics);}private PoolSubpage<T> newSubpagePoolHead(int index) {PoolSubpage<T> head = new PoolSubpage<T>(index);head.prev = head;head.next = head;return head;}@SuppressWarnings("unchecked")private PoolSubpage<T>[] newSubpagePoolArray(int size) {return new PoolSubpage[size];}
// ...
构造函数主要做了以下几件事:
- 初始化
parent
和sizeClass
。 - 初始化
smallSubpagePools
数组,其中每个元素都是一个PoolSubpage
链表的头节点。newSubpagePoolHead
创建一个空的双向循环链表。 - 初始化
qInit
到q100
这些PoolChunkList
。注意它们的minUsage
和maxUsage
参数,以及它们之间的nextList
和prevList
关系,形成了一个查找链。qInit
: 用于存放新创建的PoolChunk
,使用率范围是Integer.MIN_VALUE
到25%
。q000
: 使用率1%
到50%
。q025
: 使用率25%
到75%
。q050
: 使用率50%
到100%
。q075
: 使用率75%
到100%
。q100
: 使用率100%
到Integer.MAX_VALUE
(实际上是100%)。 这些PoolChunkList
通过prevList
和nextList
链接起来,方便在分配和释放时根据PoolChunk
的使用率变化将其移动到合适的PoolChunkList
中。
内存分配 (allocate
)
内存分配是 PoolArena
的核心功能。
PoolArena.java
// ...PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {PooledByteBuf<T> buf = newByteBuf(maxCapacity); // 创建一个 PooledByteBuf 对象 (具体类型由子类决定)allocate(cache, buf, reqCapacity); // 调用内部的分配方法return buf;}private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {final int sizeIdx = sizeClass.size2SizeIdx(reqCapacity); // 根据请求容量计算规格索引if (sizeIdx <= sizeClass.smallMaxSizeIdx) { // Small 类型分配tcacheAllocateSmall(cache, buf, reqCapacity, sizeIdx);} else if (sizeIdx < sizeClass.nSizes) { // Normal 类型分配tcacheAllocateNormal(cache, buf, reqCapacity, sizeIdx);} else { // Huge 类型分配 (大于 chunkSize)int normCapacity = sizeClass.directMemoryCacheAlignment > 0? sizeClass.normalizeSize(reqCapacity) : reqCapacity;// Huge allocations are never served via the cache so just call allocateHugeallocateHuge(buf, normCapacity);}}
// ...
allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity)
: 这是外部调用的入口。它首先通过newByteBuf(maxCapacity)
创建一个PooledByteBuf
实例(具体是PooledHeapByteBuf
还是PooledDirectByteBuf
等由子类实现),然后调用内部的allocate
方法来实际分配内存。allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)
:- 首先,根据请求容量
reqCapacity
计算出对应的sizeIdx
(size index)。 - Small Allocation: 如果
sizeIdx
小于等于smallMaxSizeIdx
(通常是pageSize / 2
对应的索引),则认为是小内存分配,调用tcacheAllocateSmall
。 - Normal Allocation: 如果
sizeIdx
大于smallMaxSizeIdx
但小于nSizes
(总规格数,对应chunkSize
的索引),则认为是普通内存分配,调用tcacheAllocateNormal
。 - Huge Allocation: 如果
sizeIdx
超出了nSizes
,表示请求的内存大于chunkSize
,则认为是大内存分配,调用allocateHuge
。大内存分配不会使用线程缓存,并且会创建一个独立的、非池化的PoolChunk
。
- 首先,根据请求容量
tcacheAllocateSmall
(Small 类型分配)
PoolArena.java
// ...private void tcacheAllocateSmall(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,final int sizeIdx) {if (cache.allocateSmall(this, buf, reqCapacity, sizeIdx)) { // 尝试从线程缓存分配// was able to allocate out of the cache so move onreturn;}/** Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and* {@link PoolChunk#free(long)} may modify the doubly linked list as well.*/final PoolSubpage<T> head = smallSubpagePools[sizeIdx]; // 获取对应 sizeIdx 的 Subpage 链表头final boolean needsNormalAllocation;head.lock(); // 对 Subpage 链表头加锁try {final PoolSubpage<T> s = head.next;needsNormalAllocation = s == head; // 如果链表为空,则需要进行 Normal Allocation 来创建新的 Subpageif (!needsNormalAllocation) {assert s.doNotDestroy && s.elemSize == sizeClass.sizeIdx2size(sizeIdx) : "doNotDestroy=" +s.doNotDestroy + ", elemSize=" + s.elemSize + ", sizeIdx=" + sizeIdx;long handle = s.allocate(); // 从 Subpage 中分配一个元素assert handle >= 0;s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity, cache); // 初始化 ByteBuf}} finally {head.unlock();}if (needsNormalAllocation) { // 如果没有可用的 Subpagelock(); // 获取 Arena 的全局锁try {allocateNormal(buf, reqCapacity, sizeIdx, cache); // 进行 Normal Allocation (可能会创建新的 Chunk 和 Subpage)} finally {unlock();}}incSmallAllocation(); // 增加 Small 分配计数}
// ...
- 首先尝试从
PoolThreadCache
中分配。如果成功,则直接返回。 - 如果线程缓存分配失败,则从
smallSubpagePools
中查找对应sizeIdx
的PoolSubpage
链表。 - 对该链表的头节点
head
加锁(head.lock()
),这是为了保护PoolSubpage
链表的并发修改。 - 如果链表中有可用的
PoolSubpage
(s != head
),则从该PoolSubpage
(s
) 中调用s.allocate()
分配一个元素(得到一个handle
),然后用这个handle
初始化PooledByteBuf
。 - 如果链表为空 (
s == head
),说明当前没有合适的PoolSubpage
可供分配。此时,needsNormalAllocation
为true
。 - 释放
head
的锁。 - 如果
needsNormalAllocation
为true
,则需要进行一次“普通分配”(allocateNormal
)。这通常意味着需要从某个PoolChunk
中分配一个新的PoolSubpage
。这个过程需要获取PoolArena
的全局锁 (lock()
)。 - 最后,增加
allocationsSmall
计数。
tcacheAllocateNormal
(Normal 类型分配)
PoolArena.java
// ...private void tcacheAllocateNormal(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity,final int sizeIdx) {if (cache.allocateNormal(this, buf, reqCapacity, sizeIdx)) { // 尝试从线程缓存分配// was able to allocate out of the cache so move onreturn;}lock(); // 获取 Arena 的全局锁try {allocateNormal(buf, reqCapacity, sizeIdx, cache); // 进行 Normal Allocation++allocationsNormal; // 增加 Normal 分配计数} finally {unlock();}}
// ...
- 首先尝试从
PoolThreadCache
中分配。如果成功,则直接返回。 - 如果线程缓存分配失败,则获取
PoolArena
的全局锁 (lock()
)。 - 调用
allocateNormal
方法进行实际的分配。 - 增加
allocationsNormal
计数。 - 释放锁。
allocateNormal
(核心普通分配逻辑)
// ...private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache threadCache) {assert lock.isHeldByCurrentThread(); // 确认当前线程已持有 Arena 锁if (q050.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 q050 分配q025.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 q025 分配q000.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 q000 分配qInit.allocate(buf, reqCapacity, sizeIdx, threadCache) || // 尝试从 qInit 分配q075.allocate(buf, reqCapacity, sizeIdx, threadCache)) { // 最后尝试从 q075 分配 (q100 通常是满的)return;}// Add a new chunk.// 如果所有 PoolChunkList 都分配失败,则创建一个新的 PoolChunkPoolChunk<T> c = newChunk(sizeClass.pageSize, sizeClass.nPSizes, sizeClass.pageShifts, sizeClass.chunkSize);boolean success = c.allocate(buf, reqCapacity, sizeIdx, threadCache); // 从新 Chunk 中分配assert success; // 新创建的 Chunk 必然能分配成功qInit.add(c); // 将新 Chunk 加入到 qInit 列表++pooledChunkAllocations; // 增加 Chunk 分配计数}
// ...
此方法在持有 PoolArena
全局锁的情况下执行:
- 按顺序尝试从
PoolChunkList
分配:q050
(50-100% usage)q025
(25-75% usage)q000
(1-50% usage)qInit
(newly created chunks, <25% usage)q075
(75-100% usage)- (
q100
列表中的PoolChunk
通常是满的,但仍可能用于分配PoolSubpage
,这个逻辑在PoolChunkList.allocate
->PoolChunk.allocate
中处理)。 这个顺序的目的是优先使用那些已经分配了一部分内存的PoolChunk
,以期更快地填满它们,从而减少内存碎片。
- 如果上述所有
PoolChunkList
都无法成功分配(即它们内部的PoolChunk
都没有足够的空间或者无法分配出请求大小的内存块/子页),则需要创建一个新的PoolChunk
。 - 创建新
PoolChunk
: 调用newChunk(...)
方法(由子类HeapArena
或DirectArena
实现)创建一个新的PoolChunk
。 - 从这个新创建的
PoolChunk
中调用c.allocate(...)
来分配内存给buf
。新创建的PoolChunk
肯定是空的,所以这次分配一定会成功。 - 将新创建的
PoolChunk
添加到qInit
列表中。 - 增加
pooledChunkAllocations
计数。
为什么对于chunkList 是这样的分配顺序
观察到的分配顺序是 q050 , q025 , q000 , qInit , 最后是 q075 。这个顺序并非简单地按利用率从低到高或从高到低,而是基于一种旨在减少内存碎片的“最佳适应”(Best-Fit)策略的变体,这种策略深受 jemalloc 内存分配器的影响。
首先,我们先明确一下这些 PoolChunkList (也就是 q 系列的链表)各自管理的 PoolChunk 的内存使用率范围。根据 PoolArena.java
中的定义:
-
qInit : 使用率低于 25% 的 Chunk (通常是新创建的)。
-
q000 : 使用率在 1% 到 50% 之间的 Chunk 。
-
q025 : 使用率在 25% 到 75% 之间的 Chunk 。
-
q050 : 使用率在 50% 到 100% 之间的 Chunk 。
-
q075 : 使用率在 75% 到 100% 之间的 Chunk 。
-
q100 : 使用率达到 100% 的 Chunk (已满)。
现在我们来分析这个分配顺序:
-
主要分配策略 ( q050 -> q025 -> q000 -> qInit ) : 这个顺序是从使用率较高的 Chunk 列表开始,逐步到使用率较低的列表。这体现了“最佳适应”的思想。 目标是优先填满那些已经分配了较多内存的 Chunk 。这样做的好处是:
-
减少内存碎片 :通过集中在少数 Chunk 中进行分配,可以更快地将它们填满(达到100%使用率),然后将它们移到 q100 链表中。一个全满的 Chunk 不再参与后续的分配,当它内部的所有 ByteBuf 都被释放后,这个 Chunk 就可以被完全回收,将内存归还给操作系统。
-
提高效率 :如果总是从利用率最低的 Chunk (如 qInit )开始分配,会导致大量 Chunk 都处于“部分使用”的状态,使得内存碎片化严重,难以分配较大的连续内存块,并且降低了内存归还给系统的可能性。
-
-
q075 为什么排在最后 : q075 列表中的 Chunk 使用率已经非常高(75%-100%),意味着它们的剩余空间很小。 allocateNormal 方法用于分配“正常大小”的内存块(大于 small ,小于 huge ),这种大小的内存在一个几乎已满的 Chunk 中找到合适空间的概率较低。
-
性能优化 :将 q075 放在最后检查是一种性能优化。与其一开始就徒劳地在这些几乎已满的 Chunk 中搜索,不如先尝试其他更有可能成功的 Chunk 列表。只有当其他所有列表都无法满足分配请求时,才最后尝试在这些“残羹剩饭”中寻找机会。这减少了不必要的搜索开销。
-
总结来说,Netty 的这个分配顺序是一个精心设计的权衡:
-
主体顺序 ( q050 -> qInit ) 是为了 对抗内存碎片 ,倾向于“物尽其用”,尽快填满并回收 Chunk 。
-
q075 的特殊位置 是为了 提升分配性能 ,避免在成功率低的 Chunk 上浪费时间。
这种设计使得 PooledByteBufAllocator 在高并发和长时间运行的场景下依然能保持高效和稳定的内存管理。
allocateHuge
(Huge 类型分配)
内存池的主要优势在于复用频繁申请和释放的小到中等大小的内存块,以避免频繁向操作系统申请内存和垃圾回收带来的开销。
对于“巨大”(Huge)的内存分配(通常大于 chunkSize ,默认16MB),这种分配本身就不频繁。如果将这些巨大的内存块池化,意味着Netty需要长期持有一个或多个非常大的内存块,即使它们在大部分时间里是空闲的。这会导致严重的内存资源浪费,尤其是在高并发环境下,可能会长时间占用大量内存,降低了整体的内存使用效率。
// ...private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {PoolChunk<T> chunk = newUnpooledChunk(reqCapacity); // 创建一个非池化的、大小刚好满足需求的 ChunkactiveBytesHuge.add(chunk.chunkSize()); // 增加 Huge 类型活跃字节数buf.initUnpooled(chunk, reqCapacity); // 用这个非池化 Chunk 初始化 ByteBufallocationsHuge.increment(); // 增加 Huge 分配计数}
// ...
- 调用
newUnpooledChunk(reqCapacity)
创建一个非池化的PoolChunk
。这个PoolChunk
的大小就是reqCapacity
,它不会被放入任何PoolChunkList
中,也不会被其他分配请求共享。 - 更新
activeBytesHuge
和allocationsHuge
计数。 - 调用
buf.initUnpooled(...)
来初始化PooledByteBuf
。
内存释放 (free
)
// ...void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {chunk.decrementPinnedMemory(normCapacity); // 减少 Chunk 的 pinned 内存计数if (chunk.unpooled) { // 如果是 Huge Allocation (非池化 Chunk)int size = chunk.chunkSize();destroyChunk(chunk); // 直接销毁 ChunkactiveBytesHuge.add(-size); // 更新 Huge 类型活跃字节数deallocationsHuge.increment(); // 更新 Huge 类型释放计数} else { // 池化 ChunkSizeClass sizeClass = sizeClass(handle); // 判断是 Small 还是 Normal 释放if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {// 尝试将内存在线程缓存中缓存起来// cached so not free it.return;}// 如果线程缓存失败或没有缓存,则真正释放回 ArenafreeChunk(chunk, handle, normCapacity, sizeClass, nioBuffer, false);}}private static SizeClass sizeClass(long handle) {return isSubpage(handle) ? SizeClass.Small : SizeClass.Normal;}void freeChunk(PoolChunk<T> chunk, long handle, int normCapacity, SizeClass sizeClass, ByteBuffer nioBuffer,boolean finalizer) {final boolean destroyChunk;lock(); // 获取 Arena 全局锁try {// We only call this if freeChunk is not called because of the PoolThreadCache finalizer as otherwise this// may fail due lazy class-loading in for example tomcat.if (!finalizer) { // 如果不是由 finalizer 触发的释放switch (sizeClass) {case Normal:++deallocationsNormal;break;case Small:++deallocationsSmall;break;default:throw new Error();}}// 调用 PoolChunkList 的 free 方法,该方法内部会调用 PoolChunk 的 free// 如果 PoolChunk 完全空闲,则 !chunk.parent.free(...) 返回 true,表示需要销毁 ChunkdestroyChunk = !chunk.parent.free(chunk, handle, normCapacity, nioBuffer);if (destroyChunk) {// all other destroyChunk calls come from the arena itself being finalized, so don't need to be counted++pooledChunkDeallocations; // 更新 Chunk 释放计数}} finally {unlock();}if (destroyChunk) {// destroyChunk not need to be called while holding the synchronized lock.destroyChunk(chunk); // 在锁外销毁 Chunk}}
// ...
free(...)
:- 首先减少
chunk
的pinnedMemory
计数。 - 非池化 Chunk (Huge): 如果
chunk.unpooled
为true
,说明是为大内存分配创建的独立PoolChunk
。直接调用destroyChunk(chunk)
销毁它,并更新相应的 Huge 类型计数器。 - 池化 Chunk:
- 通过
isSubpage(handle)
判断是Small
还是Normal
类型的释放。 - 尝试将这块内存添加到
PoolThreadCache
中。如果添加成功,则直接返回,内存被缓存了。 - 如果缓存失败或没有线程缓存,则调用
freeChunk(...)
将内存真正释放回PoolArena
。
- 通过
- 首先减少
freeChunk(...)
:- 获取
PoolArena
的全局锁。 - 更新
deallocationsNormal
或deallocationsSmall
计数(如果不是由finalizer
触发)。 - 调用
chunk.parent.free(...)
,这里的chunk.parent
是指该PoolChunk
所在的PoolChunkList
。PoolChunkList.free(...)
方法会进一步调用PoolChunk.free(handle)
来释放PoolChunk
内部的内存。如果PoolChunk
在释放这块内存后变为空闲(freeBytes == chunkSize
),PoolChunkList.free(...)
会将该PoolChunk
从链表中移除,并返回false
。因此,destroyChunk
变量会是true
。 - 如果
destroyChunk
为true
,增加pooledChunkDeallocations
计数。 - 释放锁。
- 如果
destroyChunk
为true
,则在锁外部调用destroyChunk(chunk)
来实际销毁PoolChunk
(例如,释放底层的ByteBuffer
或byte[]
)。
- 获取
内存重分配 (reallocate
)
reallocate 函数主要在 PooledByteBuf 的容量需要改变,并且无法在当前已分配的内存块( PoolChunk )内完成调整时被调用。具体来说,调用链是这样的:
-
当调用 ByteBuf.capacity(int newCapacity) 方法来调整一个池化的 ByteBuf (即 PooledByteBuf )的大小时。
-
在
capacity
方法内部,会检查新的容量 newCapacity 是否可以直接在当前内存区域( maxLength )内容纳。 -
如果 newCapacity 超出了当前内存块能支持的范围(例如,比当前 length 大,且大于 maxLength ),或者不满足一些特定的收缩条件,就需要进行真正的内存重分配。此时,它会调用 chunk.arena.reallocate(this, newCapacity) ,也就是我们正在讨论的
reallocate
方法。
// ...void reallocate(PooledByteBuf<T> buf, int newCapacity) {assert newCapacity >= 0 && newCapacity <= buf.maxCapacity();final int oldCapacity;final PoolChunk<T> oldChunk;final ByteBuffer oldNioBuffer;final long oldHandle;final T oldMemory;final int oldOffset;final int oldMaxLength;final PoolThreadCache oldCache;// We synchronize on the ByteBuf itself to ensure there is no "concurrent" reallocations for the same buffer.// ...synchronized (buf) { // 对 ByteBuf 对象本身加锁,防止对同一个 buf 的并发重分配oldCapacity = buf.length;if (oldCapacity == newCapacity) {return;}// 保存旧 buf 的信息oldChunk = buf.chunk;oldNioBuffer = buf.tmpNioBuf;oldHandle = buf.handle;oldMemory = buf.memory;oldOffset = buf.offset;oldMaxLength = buf.maxLength;oldCache = buf.cache;// This does not touch buf's reader/writer indices// 为 buf 分配新的内存空间allocate(parent.threadCache(), buf, newCapacity);}int bytesToCopy;if (newCapacity > oldCapacity) {bytesToCopy = oldCapacity;} else {buf.trimIndicesToCapacity(newCapacity); // 如果新容量更小,调整读写指针bytesToCopy = newCapacity;}memoryCopy(oldMemory, oldOffset, buf, bytesToCopy); // 将旧内存数据拷贝到新内存free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, oldCache); // 释放旧的内存块}
// ...
reallocate 函数的核心作用是“搬家”:为 ByteBuf 申请一块新的、符合 newCapacity 大小的内存,将旧内存中的有效数据复制过去,然后释放旧的内存。这个过程确保了 ByteBuf 扩容或缩容时数据的完整性。
其主要步骤如下:
-
线程安全保障 : synchronized (buf) 会锁定当前的 ByteBuf 实例,防止多个线程同时对同一个 ByteBuf 进行重分配,避免状态错乱。
-
保存旧内存信息 :记录下当前 ByteBuf 的旧容量、所属的 PoolChunk 、句柄 handle 、内存对象 memory 等所有与旧内存位置相关的信息。
-
分配新内存 :调用 allocate(parent.threadCache(), buf, newCapacity) 方法,从内存池中申请一块新的内存。这个调用会更新 buf 对象内部的字段(如 chunk , handle , memory 等),使其指向新的内存位置。
-
数据拷贝 :调用 memoryCopy(...) 方法,将旧内存中的数据拷贝到新分配的内存中。拷贝的字节数是新旧容量中较小的那一个,以防止越界。
-
释放旧内存 :调用 free(...) 方法,将之前保存的旧内存块信息传入,把旧内存归还给内存池,以便后续复用。
HeapArena
(堆内存)
lastDestroyedChunk
:HeapArena
会尝试缓存最后一个被销毁的PoolChunk<byte[]>
。当需要创建新的PoolChunk
时,如果参数匹配,会复用这个缓存的PoolChunk
,避免重新分配byte[]
数组的开销。newByteArray(int size)
: 使用PlatformDependent.allocateUninitializedArray(size)
来分配字节数组,这可能比new byte[size]
更高效,因为它可能不会对数组内容进行零初始化(取决于 JVM 实现和-XX:+AlwaysZeroTLAB
等标志)。newChunk(...)
: 尝试从lastDestroyedChunk
复用,否则创建新的PoolChunk
,其内存是新分配的byte[]
。newUnpooledChunk(...)
: 创建新的PoolChunk
,其内存是新分配的byte[]
。destroyChunk(...)
: 如果是池化的PoolChunk
并且lastDestroyedChunk
为空,则将其缓存起来。否则依赖 GC 回收byte[]
。newByteBuf(...)
: 根据PlatformDependent.hasUnsafe()
的结果,创建PooledUnsafeHeapByteBuf
或PooledHeapByteBuf
。memoryCopy(...)
: 使用System.arraycopy
进行内存拷贝。
HeapArena
是 Netty 高性能内存池在堆内存管理上的具体体现。它通过继承 PoolArena
的通用分级管理框架(PoolChunkList
和 PoolSubpage
),并结合线程缓存(PoolThreadCache
)机制,实现了高效的内存分配与回收。
其针对堆内存的特有实现,如复用 PoolChunk
对象和使用 System.arraycopy
,进一步优化了性能,有效降低了 GC 压力和内存碎片,是 Netty 实现高性能网络通信的重要基石之一。
HeapArena
继承自 PoolArena
,因此其核心结构与 PoolArena
保持一致,主要包括:
-
smallSubpagePools
:PoolSubpage<byte[]>[]
数组,用于管理小规格内存(小于pageSize
)的分配。每个PoolSubpage
内部通过位图(bitmap)来跟踪更小内存块(element)的分配状态,实现了对小内存的高效利用。 -
PoolChunkList
链表(qInit
、q000
、q025
、q050
、q075
、q100
):这些链表用于管理不同内存使用率区间的PoolChunk
。PoolChunk
是内存池的基本分配单元(默认为 16MB)。通过将PoolChunk
按使用率分组,可以快速找到合适的Chunk
进行内存分配,实现了分级管理。 -
SizeClasses
:一个用于规格化请求容量的工具类,它将任意大小的内存请求映射到预定义的规格(size index),并提供相关的计算方法。 - 锁机制:使用
ReentrantLock
来保证在多线程环境下对Arena
内部数据结构访问的线程安全。
内存分配 (allocate)
HeapArena
作为 PoolArena
的子类,重写了几个关键的抽象方法,以适配堆内存的特性:
-
isDirect()
:返回false
,表明它管理的是非直接内存(堆内存)。 -
newChunk(...)
:创建一个新的PoolChunk<byte[]>
。值得注意的是,这里有一个优化:它会尝试复用一个最近被销毁的PoolChunk
(通过lastDestroyedChunk
字段)。如果复用失败,则通过newByteArray(chunkSize)
创建一个新的byte[]
数组作为PoolChunk
的底层内存。
private final AtomicReference<PoolChunk<byte[]>> lastDestroyedChunk;protected PoolChunk<byte[]> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {PoolChunk<byte[]> chunk = lastDestroyedChunk.getAndSet(null);if (chunk != null) {assert chunk.chunkSize == chunkSize &&chunk.pageSize == pageSize &&chunk.maxPageIdx == maxPageIdx &&chunk.pageShifts == pageShifts;return chunk; // The parameters are always the same, so it's fine to reuse a previously allocated chunk.}return new PoolChunk<byte[]>(this, null, null, newByteArray(chunkSize), pageSize, pageShifts, chunkSize, maxPageIdx);}
-
newUnpooledChunk(...)
:创建一个用于大内存分配的非池化PoolChunk
,底层同样是byte[]
。 -
newByteArray(...)
:内部调用PlatformDependent.allocateUninitializedArray(size)
来分配byte[]
数组。使用allocateUninitializedArray
可以避免数组初始化时的额外开销。 -
destroyChunk(...)
:销毁一个PoolChunk
。对于堆内存,销毁操作实际上依赖于 GC。但为了性能,HeapArena
会尝试缓存最后一个被销毁的Chunk
(lastDestroyedChunk.set(chunk)
),以便在下次创建新Chunk
时可以复用。 -
newByteBuf(...)
:根据PlatformDependent.hasUnsafe()
的结果,创建PooledUnsafeHeapByteBuf
或PooledHeapByteBuf
实例。 -
memoryCopy(...)
:使用System.arraycopy
来实现内存复制,这是针对byte[]
数组最高效的方式。
DirectArena
DirectArena 是 PoolArena
的一个静态内部类,专门用于管理堆外内存(Direct Memory),其管理的内存类型是 java.nio.ByteBuffer
。它继承了 PoolArena<ByteBuffer>
,复用了 PoolArena
中通用的内存池管理算法(如伙伴算法的变体、多层级的 PoolChunkList
管理等),但对直接内存的分配、释放和操作等关键部分提供了专门的实现。
类的定义和构造函数
static final class DirectArena extends PoolArena<ByteBuffer> {DirectArena(PooledByteBufAllocator parent, SizeClasses sizeClass) {super(parent, sizeClass);}
}
-
extends PoolArena<ByteBuffer>
泛型参数<ByteBuffer>
表明这个 Arena 管理的底层内存是ByteBuffer
对象,与管理byte[]
的HeapArena
形成对比。 -
static final
static
意味着DirectArena
的实例不依赖于外部PoolArena
的实例;final
表示它不能被继承。
内存块的创建(newChunk
和 newUnpooledChunk
)
@Override
protected PoolChunk<ByteBuffer> newChunk(int pageSize, int maxPageIdx, int pageShifts, int chunkSize) {if (sizeClass.directMemoryCacheAlignment == 0) {CleanableDirectBuffer cleanableDirectBuffer = allocateDirect(chunkSize);return new PoolChunk<ByteBuffer>(this, cleanableDirectBuffer, memory, memory, pageSize, pageShifts, chunkSize, maxPageIdx);}CleanableDirectBuffer cleanableDirectBuffer = allocateDirect(chunkSize + sizeClass.directMemoryCacheAlignment);final ByteBuffer memory = PlatformDependent.alignDirectBuffer(base, sizeClass.directMemoryCacheAlignment);return new PoolChunk<ByteBuffer>(this, cleanableDirectBuffer, base, memory, pageSize, pageShifts, chunkSize, maxPageIdx);
}
- 核心功能
与HeapArena
使用new byte[chunkSize]
不同,DirectArena
通过PlatformDependent.allocateDirect(capacity)
分配堆外内存(通常调用ByteBuffer.allocateDirect()
)。 - 内存对齐(
directMemoryCacheAlignment
)
若directMemoryCacheAlignment > 0
,会分配稍大的内存并通过PlatformDependent.alignDirectBuffer()
对齐地址,优化 CPU 缓存行利用,防止伪共享(False Sharing)。 -
CleanableDirectBuffer
包装分配的内存,利用 Java 9+ 的Cleaner
(或旧版类似机制)确保PoolChunk
和ByteBuffer
被垃圾回收时,底层堆外内存可靠释放,防止泄漏。
内存块的销毁和创建
@Override
protected void destroyChunk(PoolChunk<ByteBuffer> chunk) {chunk.cleanable.clean();
}
- 与
HeapArena
的区别
HeapArena
会缓存最后销毁的PoolChunk
以供复用(lastDestroyedChunk
),而DirectArena
直接调用chunk.cleanable.clean()
立即释放内存。
理解为什么可以“销毁”,我们需要知道 destroyChunk 是在什么条件下被触发的。
- 当一个 PoolChunk 内的所有内存都被释放后,它的使用率会降为 0% ( usage() == 0 )。此时, PoolChunkList 的 free 方法会尝试将这个完全空闲的 chunk 移动到前一个 PoolChunkList (即使用率更低的 List )。
- 然而,对于管理着 0%-25% 使用率的 qInit 这个 PoolChunkList 来说,它的 prevList 是 null 。当它试图移动一个使用率为 0 的 chunk 时, move0 方法会因为 prevList == null 而返回 false 。这个 false 返回值会一路传递,最终导致 PoolArena 调用 destroyChunk 。
因此 当destroyChunk 被调用,意味着这个 PoolChunk 已经完全变空了 。它不再服务于任何内存分配。
ByteBuf
的创建(newByteBuf
)
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {if (HAS_UNSAFE) {return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);} else {return PooledDirectByteBuf.newInstance(maxCapacity);}
}
- 创建策略
根据PlatformDependent.hasUnsafe()
结果选择实现:-
PooledUnsafeDirectByteBuf
(支持sun.misc.Unsafe
):使用Unsafe
API 直接操作内存地址,性能最高。 -
PooledDirectByteBuf
(不支持Unsafe
):回退到标准ByteBuffer
API(get
/put
),兼容性更好。
-
内存复制(memoryCopy
)
@Override
protected void memoryCopy(ByteBuffer src, int srcOffset, PooledByteBuf<ByteBuffer> dstBuf, int length) {if (HAS_UNSAFE) {PlatformDependent.copyMemory(PlatformDependent.directBufferAddress(src) + srcOffset,PlatformDependent.directBufferAddress(dstBuf.memory) + dstBuf.offset, length);} else {src.position(srcOffset).limit(srcOffset + length);dst.position(dstBuf.offset);dst.put(src);}
}
- 高性能复制
- 支持
Unsafe
时调用PlatformDependent.copyMemory
(底层为Unsafe.copyMemory
),实现高效内存块复制。 - 否则回退到
ByteBuffer.put(ByteBuffer)
的标准方法。
- 支持
度量信息 (Metrics)
PoolArena
实现了 PoolArenaMetric
接口,提供了大量关于内存池状态的度量信息,例如:
numThreadCaches()
: 使用此 Arena 的线程缓存数量。numSmallSubpages()
,numChunkLists()
: Subpage 和 ChunkList 的数量。smallSubpages()
,chunkLists()
: 返回 Subpage 和 ChunkList 的度量信息列表。numAllocations()
,numDeallocations()
: 总的分配和释放次数。numSmallAllocations()
,numNormalAllocations()
,numHugeAllocations()
: 不同类型的分配次数。numChunkAllocations()
,numChunkDeallocations()
: Chunk 的分配和释放次数。numActiveAllocations()
,numActiveSmallAllocations()
, etc.: 当前活跃的分配数量。numActiveBytes()
: 当前活跃的总字节数。numPinnedBytes()
: 当前被固定的字节数(用于直接 I/O 等)。
这些方法大多通过读取内部的计数器或遍历 chunkListMetrics
来获取数据。访问某些计数器(如 allocationsNormal
)时会加锁。
总结
PoolArena
是 Netty jemalloc 风格内存池的核心,它通过管理 PoolChunk
和 PoolSubpage
来高效地分配和释放内存。
- 分级管理: 将内存请求分为 Small, Normal, Huge 三种类型,采用不同的分配策略。
- Chunk 列表: 使用多个
PoolChunkList
(qInit, q000, q025, q050, q075, q100) 根据PoolChunk
的使用率对其进行组织,优化分配查找。 - Subpage 池: 对 Small 类型的分配,使用
PoolSubpage
进一步细化内存管理,减少碎片。 - 线程缓存: 与
PoolThreadCache
配合,为每个线程提供本地缓存,极大减少了对PoolArena
全局锁的竞争。 - 同步: 使用
ReentrantLock
保护 Arena 级别的共享数据,使用PoolSubpage
自身的锁保护其内部链表,使用synchronized(buf)
保护reallocate
操作。 - 堆外/堆内支持: 通过
HeapArena
和DirectArena
子类分别支持堆内存和直接内存的池化。 - 度量: 提供丰富的度量信息,方便监控和调优。
PoolArena
的设计体现了 Netty 在高性能网络编程中对内存管理的极致追求,通过精细化的管理和多层次的缓存来提高内存分配效率并减少GC压力。