LinkedBlockingQueue
基本的入队出队
初始化
public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {// 静态内部类 Node,用于存储队列元素及维护节点间关系static class Node<E> {// 存储的实际元素,泛型 E 类型E item;/*** next 字段的三种可能情况说明:* - 真正的后继节点:正常队列链接场景,指向下一个节点* - 自己(this):出队操作时的特殊标记,用于辅助处理并发下的节点状态* - null:表示当前节点是队列的最后一个节点,没有后继*/Node<E> next;// 构造方法,初始化节点时设置存储的元素Node(E x) {item = x;}}}
初始化链表时,last
和 head
都指向新建的 Node<E>(null)
。该节点是 Dummy 节点(哑节点、哨兵节点) ,作用是占位,其 item
字段值为 null
,用于辅助 LinkedBlockingQueue
链表结构的初始化与操作(比如方便统一处理队列空、头尾指针等情况 )。
入队
当一个节点入队last = last.next = node;
再来一个节点入队last = last.next = node;
出队
// 出队核心逻辑(基于 LinkedBlockingQueue 链表结构)
public E dequeue() {// 以下是关键出队步骤,实际源码会结合锁、条件变量等保证线程安全,这里聚焦节点操作逻辑Node<E> h = head; // 1. 获取真正存储元素的首节点(head 是 dummy 节点,其 next 指向第一个有效节点)Node<E> first = h.next; // 2. 断开原 head 节点的链接,并让其 next 指向自身,辅助 GC 回收(避免内存泄漏)h.next = h; // 3. 更新 head 为真正的首节点(first),完成出队后队列头指针迁移head = first; // 4. 取出首节点存储的元素(队列要弹出的元素)E x = first.item; // 5. 清空首节点的元素引用(帮助 GC,断开元素引用链)first.item = null; // 6. 返回弹出的元素return x;
}
保存头节点:(h = head)
用变量h
暂存队列当前的head
(dummy 节点 )。获取有效首节点:(first = h.next)
通过h.next
拿到真正存储元素的第一个有效节点first
。断开旧头节点链接:(
h.next = h
)
让h.next = h
(自己指向自己 ),切断旧头节点与队列的关联,辅助垃圾回收。更新头节点:(head = first)
把head
指向first
,完成队列头指针迁移,保证后续出队逻辑正确。取出元素:
从first
节点中取出要弹出的元素x
。清空元素引用:
将first.item
置为null
,帮助 GC 回收元素对象,避免内存泄漏。返回结果:
返回弹出的元素x
,完成出队操作。
加锁分析
加锁设计(高明之处)
- 单锁 vs 双锁:
- 单锁:同一时刻最多允许一个线程(生产者 / 消费者二选一 )执行操作。
- 双锁(
putLock
、takeLock
):同一时刻允许一个生产者 + 一个消费者同时执行,消费者线程间、生产者线程间仍串行。
线程安全分析(按节点总数分类)
节点总数 > 2(含 dummy 节点):
putLock
保证last
节点线程安全(入队操作 )。takeLock
保证head
节点线程安全(出队操作 )。- 两把锁隔离入队、出队操作,无竞争。
节点总数 = 2(1 个 dummy 节点 + 1 个正常节点):
仍用两把锁锁定不同对象(last
、head
相关 ),无竞争。节点总数 = 1(仅 dummy 节点):
take
线程因队列为空,被notEmpty
条件阻塞,有竞争时会阻塞等待。
// 用于 put(阻塞)、offer(非阻塞) 的锁,控制入队操作
private final ReentrantLock putLock = new ReentrantLock();
// 用于 take(阻塞)、poll(非阻塞) 的锁,控制出队操作
private final ReentrantLock takeLock = new ReentrantLock();
put操作
public void put(E e) throws InterruptedException {// 1. 校验元素非空if (e == null) throw new NullPointerException();int c = -1; // 记录入队前的元素数量Node<E> node = new Node<E>(e); // 创建新节点final ReentrantLock putLock = this.putLock; // 生产者锁final AtomicInteger count = this.count; // 队列元素数量(原子类,保证线程安全)// 2. 加锁(可中断锁)putLock.lockInterruptibly();try {// 3. 队列满时等待(循环检测,防止虚假唤醒)while (count.get() == capacity) {// 等待 notFull 条件(队列非满时被唤醒)notFull.await(); }// 4. 入队操作enqueue(node); // 将新节点加入队列尾部c = count.getAndIncrement(); // 元素数量 +1(先获取旧值,再自增)// 5. 唤醒其他生产者(如果队列仍有空位)if (c + 1 < capacity) {// 唤醒等待 notFull 的生产者线程notFull.signal(); }} finally {// 6. 释放锁putLock.unlock();}// 7. 唤醒消费者(如果队列从空变为非空)if (c == 0) {// 唤醒等待 notEmpty 的消费者线程notEmpty.signal(); }
}// 辅助方法:入队操作(私有方法,保证线程安全)
private void enqueue(Node<E> node) {// 队列的 last 节点指向新节点,完成入队last = last.next = node;
}
校验元素:
元素为null
时抛出异常。初始化与加锁:
创建节点,获取生产者锁,可中断加锁。队列满时等待:
队列满则循环等待notFull
条件,释放锁并阻塞。入队与计数:
将节点加入队列尾部,原子递增元素数量。唤醒生产者:
队列仍有空位时,唤醒其他等待的生产者。(自己唤醒自己)释放锁:
保证锁释放,避免死锁。唤醒消费者:
队列入队前为空时,唤醒等待的消费者。
take操作
public E take() throws InterruptedException {E x; // 存储出队的元素int c = -1; // 记录出队前的元素数量final AtomicInteger count = this.count; // 队列元素数量(原子类,保证线程安全)final ReentrantLock takeLock = this.takeLock; // 消费者锁// 1. 加锁(可中断锁)takeLock.lockInterruptibly();try {// 2. 队列空时等待(循环检测,防止虚假唤醒)while (count.get() == 0) {// 等待 notEmpty 条件(队列非空时被唤醒)notEmpty.await(); }// 3. 出队操作x = dequeue(); // 从队列头部取出元素c = count.getAndDecrement(); // 元素数量 -1(先获取旧值,再自减)// 4. 唤醒其他消费者(如果队列仍有元素)if (c > 1) {// 唤醒等待 notEmpty 的消费者线程notEmpty.signal(); }} finally {// 5. 释放锁takeLock.unlock();}// 6. 唤醒生产者(如果队列从满变为非满)if (c == capacity) {// 唤醒等待 notFull 的生产者线程signalNotFull(); }return x;
}
加锁:
消费者线程加锁,支持中断。队列空时等待:
队列空则循环等待notEmpty
条件,释放锁并阻塞。出队与计数:
从队列头部取出元素,原子递减元素数量。唤醒消费者:
队列仍有元素时,唤醒其他等待的消费者。释放锁:
保证锁释放,避免死锁。唤醒生产者:
队列出队前满时,唤醒等待的生产者。返回结果:
返回出队的元素。
LinkedBlockingQueue VS ArrayBlockingQueue
LinkedBlockingQueue | ArrayBlockingQueue | |
---|---|---|
有界性支持 | 支持有界(默认容量为 Integer.MAX_VALUE ,可手动指定有界) | 强制有界(初始化必须指定容量,且不可动态扩容) |
底层数据结构 | 链表(Node 节点构成链表) | 数组(提前初始化固定大小的 Object 数组) |
初始化特性 | 懒惰初始化(首次入队时创建节点,无需提前分配内存) | 提前初始化(创建时需指定容量,数组直接分配内存) |
节点创建时机 | 每次入队(put /offer )时动态创建 Node | 初始化时提前创建数组元素(逻辑上的 “节点” 是数组槽位) |
锁机制 | 双锁分离(putLock 控制入队,takeLock 控制出队) | 单锁(ReentrantLock 同时控制入队和出队) |
并发性能(典型场景) | 生产 / 消费可并行(双锁减少竞争),高并发下优势明显 | 生产 / 消费互斥(单锁竞争激烈),高并发下性能受限 |
内存开销 | 动态节点导致额外内存(Node 对象头、指针等) | 数组连续存储,内存更紧凑(无额外指针开销) |
扩容能力 | 理论上可无限扩容(受限于 Integer.MAX_VALUE ) | 不可扩容(容量固定,满了只能等待出队) |
适用场景 | 高并发生产消费、需动态扩容、内存不敏感场景 | 低并发、容量固定、内存敏感场景 |
ConcurrentLinkedQueue
CopyOnWriteArrayList
CopyOnWriteArraySet 是它的马甲
底层实现采用了写入时拷贝的思想,增删改操作会将底层数组拷贝一份,更改操作在新数组上执行,这时不影响其它线程的并发读,读写分离。
以新增为例:
public boolean add(E e) {synchronized (lock) {// 获取旧的数组Object[] es = getArray();int len = es.length;// 拷贝新的数组(这里是比较耗时的操作,但不影响其它读线程)es = Arrays.copyOf(es, len + 1);// 添加新元素es[len] = e;// 替换旧的数组setArray(es);return true;}
}
这里的源码版本是 Java 11,在 Java 1.8 中使用的是可重入锁而不是 synchronized
- CopyOnWriteArraySet 等基于写时拷贝(Copy - On - Write)思想的类,其他读操作未加锁,适合 “读多写少” 的应用场景,以
forEach
方法为例进行展示。 - 适用场景:读多写少场景,读操作无需加锁,能高效并发读取;写操作因拷贝数组相对耗时,适合低频写操作的场景。
public void forEach(Consumer<? super E> action) {Objects.requireNonNull(action);for (Object x : getArray()) {@SuppressWarnings("unchecked") E e = (E) x;action.accept(e);}
}
get弱一致性
时间点 | 操作 | 结果 |
---|---|---|
1 | Thread-0 执行 getArray() | 拿到数组快照 array = [1, 2, 3] |
2 | Thread-1 执行 getArray() | 拿到同一数组快照 array = [1, 2, 3] |
3 | Thread-1 执行 remove(0) → 触发写时拷贝 | 新建数组 arrayCopy = [2, 3] ,替换 array 为 arrayCopy |
4 | Thread-0 执行 array[index] | 访问的是旧数组 [1, 2, 3] 的元素 |
写时拷贝的 “快照” 特性:
读线程(Thread-0
)在时间点 1
拿到的是数组的快照(旧数组 [1, 2, 3]
)。即使后续写线程(Thread-1
)修改了数组(替换为 [2, 3]
),读线程仍会访问自己持有的旧快照。
→ 读操作可能访问到 “过期数据”,这就是弱一致性的体现。
迭代器弱一致性
CopyOnWriteArrayList<Integer> list = new CopyOnWriteArrayList<>();
list.add(1);
list.add(2);
list.add(3);// 1. 获取迭代器(此时迭代器持有数组快照:[1,2,3])
Iterator<Integer> iter = list.iterator(); // 2. 新线程修改集合(触发写时拷贝,数组变为 [2,3])
new Thread(() -> {list.remove(0); System.out.println(list); // 输出: [2, 3]
}).start();// 3. 主线程继续用迭代器遍历
sleep1s(); // 等待子线程执行完毕
while (iter.hasNext()) {System.out.println(iter.next()); // 输出: 1, 2, 3
}
快照机制:
迭代器创建时,会拷贝当前数组的快照(如[1,2,3]
),后续集合的修改(如remove
)会生成新数组([2,3]
),但迭代器仍持有旧快照。无感知遍历:
迭代器遍历的是创建时的数组快照,不感知后续集合的修改 → 遍历结果与 “当前集合实际数据” 不一致。