SpmcArrayQueue
是 JCTools 中为 单生产者-多消费者(Single-Producer-Multi-Consumer) 场景设计的有界队列。与 SPSC 模型相比,SPMC 的复杂性主要体现在消费者侧,因为多个消费者线程需要以线程安全的方式竞争消费同一个队列中的元素。
单生产者-单消费者数组队列 分析见:JCTools Spsc:单生产者-单消费者无锁队列
SpmcArrayQueue
的继承链同样是为了精确控制内存布局,但其侧重点与 SpscArrayQueue
有所不同,它需要处理多消费者对 consumerIndex
的争用。
-
SpmcArrayQueueL1Pad
&SpmcArrayQueueProducerIndexField
:- 与 SPSC 类似,这部分定义了生产者索引
producerIndex
,并用L1Pad
将其与上游的“冷”字段(如buffer
,mask
)隔离开。 - 由于只有一个生产者,
producerIndex
的更新逻辑相对简单,不需要 CAS 操作,使用putOrderedLong
即可。
- 与 SPSC 类似,这部分定义了生产者索引
-
SpmcArrayQueueL2Pad
&SpmcArrayQueueConsumerIndexField
:- 核心变化点:
SpmcArrayQueueConsumerIndexField
中不再有soConsumerIndex
方法,取而代之的是casConsumerIndex
。// ... existing code ... //$gen:ordered-fields abstract class SpmcArrayQueueConsumerIndexField<E> extends SpmcArrayQueueL2Pad<E> {protected final static long C_INDEX_OFFSET = fieldOffset(SpmcArrayQueueConsumerIndexField.class, "consumerIndex");private volatile long consumerIndex;// ... existing code ...@Overridepublic final long lvConsumerIndex(){return consumerIndex;}final boolean casConsumerIndex(long expect, long newValue){return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);} } // ... existing code ...
- 原因:因为有多个消费者,它们必须通过 CAS(Compare-And-Swap) 操作来原子性地更新
consumerIndex
,以确保只有一个消费者能成功获取并消费一个元素。L2Pad
在此的作用依然是隔离生产者和消费者的热点字段。
- 核心变化点:
-
SpmcArrayQueueMidPad
&SpmcArrayQueueProducerIndexCacheField
:- SPMC 特有的优化:这里引入了一个新的字段
producerIndexCache
。// ... existing code ... //$gen:ordered-fields abstract class SpmcArrayQueueProducerIndexCacheField<E> extends SpmcArrayQueueMidPad<E> {// This is separated from the consumerIndex which will be highly contended in the hope that this value spends most// of it's time in a cache line that is Shared(and rarely invalidated)private volatile long producerIndexCache;// ... existing code ... } // ... existing code ...
producerIndex
会被生产者频繁更新(每次 offer 都会更新)producerIndexCache
只在消费者发现缓存过期时才更新,因此减少了对producerIndex
的 volatile 读取,降低了缓存一致性流量的争用MidPad
的作用就是将这个消费者侧的缓存(producerIndexCache
)与消费者侧的争用点(consumerIndex
)分离开,避免它们互相干扰。
- SPMC 特有的优化:这里引入了一个新的字段
-
SpmcArrayQueueL3Pad
: 最后的填充,隔离producerIndexCache
和SpmcArrayQueue
自身的字段。
offer(E e)
:简单而直接
由于只有一个生产者,offer
的逻辑比 SPSC 还要简单,因为它不需要“前瞻优化”(producerLimit
)。生产者只需要检查目标槽位是否为空即可。
// ... existing code ...@Overridepublic boolean offer(final E e){if (null == e){throw new NullPointerException();}final E[] buffer = this.buffer;final long mask = this.mask;final long currProducerIndex = lvProducerIndex(); // 1. 获取当前生产者索引final long offset = calcCircularRefElementOffset(currProducerIndex, mask);// 2. 检查槽位是否被消费者释放if (null != lvRefElement(buffer, offset)){// 如果槽位不为空,说明队列满了long size = currProducerIndex - lvConsumerIndex();if (size > mask){return false;}else{// 等待消费者释放该槽位 (这会破坏无等待性)while (null != lvRefElement(buffer, offset)){// BURN}}}// 3. 放置元素并更新索引soRefElement(buffer, offset, e);soProducerIndex(currProducerIndex + 1); // 使用 store-ordered 更新return true;}
// ... existing code ...
SPMC 特性分析:
- 单生产者权威:生产者是唯一能推进
producerIndex
的线程,所以它只需lvProducerIndex()
读取自己的进度,然后用soProducerIndex()
更新即可,无需 CAS。 - 依赖消费者:生产者通过
lvRefElement
检查目标槽位是否为null
来判断队列是否已满。这个null
是由消费者在消费后写入的。 - 潜在的自旋等待:代码中有一段
while
循环等待。这通常发生在消费者进度稍稍落后于生产者进度,但队列并未完全满的情况下。生产者会在此“自旋”等待消费者完成对该槽位的消费和清理。这是SpmcArrayQueue
的一个关键特性,它牺牲了一定的无等待性(Wait-Free)来简化设计。
poll()
:竞争与缓存的艺术
poll
方法是 SPMC 模型的核心,完美展现了多消费者如何通过 CAS 和本地缓存来协同工作。
// ... existing code ...@Overridepublic E poll(){long currentConsumerIndex;// 1. 读取本地的生产者进度缓存long currProducerIndexCache = lvProducerIndexCache();do{// 2. 读取全局的消费者进度currentConsumerIndex = lvConsumerIndex();// 3. 快路径判断:使用本地缓存判断队列是否为空if (currentConsumerIndex >= currProducerIndexCache){// 4. 慢路径:本地缓存表明队列为空,需同步最新的生产者进度long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex){return null; // 队列确实为空}else{// 更新本地缓存currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}}// 5. CAS 竞争:尝试原子性地将 consumerIndex 加一while (!casConsumerIndex(currentConsumerIndex, currentConsumerIndex + 1));// 6. 成功获取元素return removeElement(buffer, currentConsumerIndex, mask);}
// ... existing code ...
SPMC 特性分析:
- 生产者进度缓存:每个消费者线程开始时都会读取
producerIndexCache
。这是一个本地快照,避免了每次都去访问真正的producerIndex
。 - 快慢路径分离:
- 快路径:只要
currentConsumerIndex < currProducerIndexCache
,消费者就认为队列中有元素,直接进入第5步的 CAS 竞争。这是绝大多数情况。 - 慢路径:当快路径条件不满足时,消费者必须通过
lvProducerIndex()
读取最新的生产者进度,并更新自己的本地缓存producerIndexCache
。
- 快路径:只要
- CAS 争用:
casConsumerIndex
是多消费者协调的核心。多个消费者线程可能同时读取到相同的currentConsumerIndex
,但只有一个能通过 CAS 操作成功地将其加一,从而“赢得”消费该位置元素的权利。失败的线程则会重新循环,读取新的consumerIndex
再次尝试。 - 无竞争取出:一旦一个消费者通过 CAS 成功预定了位置,它就可以安全地调用
removeElement
来取出元素。因为removeElement
操作的是一个已经被它“私有化”的索引,不会有其他消费者来干扰。
SpmcArrayQueue.peek() 方法详细分析
peek()
方法是队列中的一个重要操作,它允许查看队列头部元素但不移除该元素。
public E peek() {final E[] buffer = this.buffer;final long mask = this.mask;long currProducerIndexCache = lvProducerIndexCache();long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null;} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex);}}e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();} while (null == e || nextConsumerIndex != currentConsumerIndex);return e;
}
方法开始首先获取几个关键变量:
buffer
: 队列的底层数组,存储实际元素mask
: 用于计算循环队列位置的掩码值,通常是 capacity-1currProducerIndexCache
: 生产者索引的缓存值,这是一个重要的优化变量currentConsumerIndex
和nextConsumerIndex
: 消费者索引的当前值和下一个值
- 在
peek()
方法中,消费者首先检查producerIndexCache
- 只有当
currentConsumerIndex >= currProducerIndexCache
时才需要读取真实的producerIndex
- 大多数情况下,队列不为空时,消费者可以直接使用缓存值,避免读取主
producerIndex
-
性能影响:
- 直接读取
producerIndex
:每次都要从主内存读取,可能触发缓存失效 - 使用
producerIndexCache
:大部分时间从本地缓存读取,减少内存屏障和缓存一致性流量
- 直接读取
-
正确性保证:
- 虽然使用了缓存,但通过
lvProducerIndex()
的检查确保了最终一致性 - 当缓存可能过期时(
currentConsumerIndex >= currProducerIndexCache
),会重新读取真实值
- 虽然使用了缓存,但通过
缓存更新逻辑
if (currentConsumerIndex >= currProducerIndexCache) {long currProducerIndex = lvProducerIndex();if (currentConsumerIndex >= currProducerIndex) {return null; // 队列为空} else {currProducerIndexCache = currProducerIndex;svProducerIndexCache(currProducerIndex); // 更新缓存}
}
这段代码处理了两种情况:
- 队列为空:当消费者索引已经赶上或超过生产者索引时,返回 null
- 缓存过期:当缓存值小于实际生产者索引时,更新缓存值
元素加载
e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));
这行代码从缓冲区中加载元素:
calcCircularRefElementOffset
计算循环队列中的实际位置lvRefElement
是一个 volatile 加载操作,确保内存可见性
一致性检查
// sandwich the element load between 2 consumer index loads
nextConsumerIndex = lvConsumerIndex();
} while (null == e || nextConsumerIndex != currentConsumerIndex);
这是一个重要的并发控制机制,被称为"三明治加载":
- 在加载元素前获取消费者索引(
currentConsumerIndex
) - 加载元素本身
- 再次获取消费者索引(
nextConsumerIndex
)
通过比较两次获取的消费者索引是否相同,可以确保在加载元素过程中没有其他消费者修改了队列状态。如果不同,说明有其他消费者已经修改了队列,需要重试。
循环条件分析
while (null == e || nextConsumerIndex != currentConsumerIndex)
循环继续的条件有两个:
-
null == e
: 加载的元素为 null,可能是因为:- 队列确实为空
- 生产者正在写入元素但尚未完成
- 其他消费者已经取走了该元素
-
nextConsumerIndex != currentConsumerIndex
: 消费者索引在加载元素过程中被修改,说明有并发操作干扰
内存访问顺序
方法中的内存访问遵循特定顺序,确保正确的并发语义:
- 首先读取生产者索引缓存(
lvProducerIndexCache()
) - 读取消费者索引(
lvConsumerIndex()
) - 如果需要,读取实际生产者索引(
lvProducerIndex()
) - 读取队列元素(
lvRefElement()
) - 再次读取消费者索引(
lvConsumerIndex()
)
这种顺序确保了在多线程环境下能正确检测队列状态变化。
volatile 操作的作用
lvProducerIndexCache()
: volatile 读取,确保获取最新的缓存值lvConsumerIndex()
: volatile 读取,确保获取最新的消费者位置lvProducerIndex()
: volatile 读取,确保获取最新的生产者位置svProducerIndexCache()
: volatile 写入,确保缓存更新对所有线程可见
这些 volatile 操作确保了多线程间的内存可见性,防止出现不一致的视图。
循环优化
虽然方法中包含一个 do-while 循环,但在正常情况下(队列不为空且没有并发干扰),循环只会执行一次。只有在以下情况下才会多次循环:
- 队列为空
- 有并发消费者干扰
- 生产者正在写入元素但尚未完成
与其他方法的比较
peek() vs poll()
peek()
只查看元素但不移除poll()
查看并移除元素peek()
不需要修改消费者索引,而poll()
需要通过 CAS 操作更新消费者索引
peek() vs relaxedPeek()
@Override
public E relaxedPeek() {final E[] buffer = this.buffer;final long mask = this.mask;long currentConsumerIndex;long nextConsumerIndex = lvConsumerIndex();E e;do {currentConsumerIndex = nextConsumerIndex;e = lvRefElement(buffer, calcCircularRefElementOffset(currentConsumerIndex, mask));// sandwich the element load between 2 consumer index loadsnextConsumerIndex = lvConsumerIndex();}while (nextConsumerIndex != currentConsumerIndex);return e;
}
relaxedPeek()
是 peek()
的"宽松"版本:
- 不检查队列是否为空
- 不使用生产者索引缓存
- 只确保在加载元素过程中消费者索引没有变化
这使得 relaxedPeek()
性能更好,但可能在某些边界情况下行为不同(例如当队列为空时)。
适用场景
peek()
方法特别适用于以下场景:
- 检查队列内容:在不修改队列状态的情况下查看头部元素
- 多消费者环境:在有多个消费者线程的情况下,确保正确处理并发访问
- 性能敏感场景:通过缓存机制减少对共享变量的访问,提高性能
- 需要强一致性保证:相比
relaxedPeek()
,提供更强的一致性保证
总结
SpmcArrayQueue
相比 SpscArrayQueue
的核心特性和设计权衡在于:
- 多消费者协调:引入了 CAS 操作 (
casConsumerIndex
) 来解决多个消费者对consumerIndex
的争用问题,这是从 SPSC 到 SPMC 的根本性变化。 - 消费者侧缓存:增加了
producerIndexCache
字段,让每个消费者可以缓存生产者进度,大大减少了对producerIndex
的volatile
读,降低了缓存一致性流量,提升了消费者侧的性能。 - 内存布局的进一步细分:通过
MidPad
将consumerIndex
(极热争用点)和producerIndexCache
(消费者本地缓存)进行隔离,进一步优化了缓存性能。 - 性能权衡:在
offer
方法中,允许了短暂的自旋等待,牺牲了严格的无等待性,以换取更简单的生产者逻辑。
总而言之,SpmcArrayQueue
是一个通过精巧的内存布局、CAS竞争和消费者侧缓存机制,高效解决了单生产者、多消费者并发难题的优秀实现。