无锁队列实现与内存回收机制:Hazard Pointer 深度解析
在并发系统中,为了提升性能和避免锁竞争,我们常常追求 lock-free 数据结构。但当你实现完一个无锁队列后,会发现一个严重问题:
内存什么时候释放?怎样避免访问已经被回收的节点?
Hazard Pointer(危险指针)机制,就是为了解决这一痛点。
本篇将带你深入理解:
- 什么是无锁队列(MS Queue)
- 为什么需要 Hazard Pointer
- 如何用 C++ 实现完整的无锁队列 + 安全回收机制
- 避免 ABA 问题的技巧
一、无锁队列背景:MS 队列简介
Michael-Scott 队列(MS Queue)是经典的无锁并发队列,具备以下特性:
- 使用
std::atomic
实现 - 支持多生产者、多消费者
- 基于 CAS(Compare-And-Swap)进行并发控制
队列结构:
head --> [dummy] --> [node1] --> [node2] --> nullptr^tail
- 使用 dummy 节点初始化队列,避免空指针特判。
- enqueue 修改 tail,dequeue 修改 head。
二、C++ 无锁队列实现(简化版)
template <typename T>
struct Node {std::atomic<Node*> next;T value;Node(const T& val) : next(nullptr), value(val) {}
};template <typename T>
class LockFreeQueue {
private:std::atomic<Node<T>*> head;std::atomic<Node<T>*> tail;public:LockFreeQueue() {Node<T>* dummy = new Node<T>(T{});head.store(dummy);tail.store(dummy);}void enqueue(const T& val) {Node<T>* new_node = new Node<T>(val);while (true) {Node<T>* last = tail.load();Node<T>* next = last->next.load();if (last == tail.load()) {if (!next) {if (last->next.compare_exchange_weak(next, new_node)) {tail.compare_exchange_weak(last, new_node);return;}} else {tail.compare_exchange_weak(last, next);}}}}bool dequeue(T& result) {while (true) {Node<T>* first = head.load();Node<T>* last = tail.load();Node<T>* next = first->next.load();if (first == head.load()) {if (first == last) {if (!next) return false; // emptytail.compare_exchange_weak(last, next);} else {result = next->value;if (head.compare_exchange_weak(first, next)) {// ❗问题:何时 delete first ?return true;}}}}}
};
问题:节点释放时机?
在并发环境下 dequeue
之后,其他线程可能仍在访问那个被删除的节点,不能立即 delete。
三、为什么需要 Hazard Pointer?
常见方法(失败):
- 延迟 delete? 不知道什么时候没人访问。
- GC? C++ 没有自动垃圾回收。
- 引用计数? 有性能开销,难以 lock-free。
- 线程局部链表回收? 难以同步他人状态。
Hazard Pointer 的核心思想:
每个线程维护一个“我正在访问的指针”集合,回收前先检查是否有人正在访问。
四、Hazard Pointer 机制原理
数据结构:
// 线程局部
thread_local Node* my_hazard_ptr;// 全局列表
std::vector<std::atomic<Node*>> hazard_pointers;
工作流程:
-
访问某节点前,将其地址写入 hazard pointer。
-
删除时,把要删除的节点放入“回收候选集合”。
-
定期扫描所有线程的 hazard pointer:
- 如果节点不在任何 hazard 中,则可以安全 delete。
回收伪码:
void retire_node(Node* node) {retired_list.push_back(node);if (retired_list.size() >= threshold) {scan_and_reclaim();}
}void scan_and_reclaim() {std::unordered_set<Node*> hp_set;for (auto& hp : hazard_pointers) {Node* p = hp.load();if (p) hp_set.insert(p);}auto it = retired_list.begin();while (it != retired_list.end()) {if (hp_set.count(*it) == 0) {delete *it;it = retired_list.erase(it);} else {++it;}}
}
五、完整工程结构(建议)
lockfree_queue/
├── include/
│ ├── lockfree_queue.h // MS队列
│ └── hazard_pointer.h // Hazard机制封装
├── src/
│ ├── hazard_pointer.cpp
│ └── main.cpp
├── test/
│ └── test_queue.cpp // GTest 测试用例
├── CMakeLists.txt
示例:Hazard 指针封装类接口
class HazardPointerManager {
public:static void* get_slot(); // 获取当前线程 hazard pointer 槽static void retire(void* ptr);static void reclaim(); // 回收所有未被引用的节点
};
六、ABA 问题与 Hazard Pointer 的结合防护
虽然 Hazard Pointer 可以避免野指针访问,但不能防止 ABA 问题。
因此,通常会在无锁队列中配合使用 Tagged Pointer(带版本号的指针)。
struct TaggedPtr {Node* ptr;uint64_t tag;
};
std::atomic<TaggedPtr> head;
通过不断增加 tag
可以有效防止 ABA。
七、总结与建议
问题 | 是否解决 | 说明 |
---|---|---|
并发入队 | ✅ | CAS 更新尾指针 |
并发出队 | ✅ | CAS 更新头指针 |
节点回收 | ✅ | Hazard Pointer 扫描后安全 delete |
ABA 问题 | ✅(需 TaggedPtr) | 防止假象“未变化” |
八、参考资料
- Michael & Scott: “Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms”
- Herb Sutter: “Lock-Free Programming (Hazard Pointers)”
- C++ Concurrency in Action 第二版
- Facebook Folly Hazard Pointer 实现
下期预告
《设计一个可扩展的 Lock-Free 环:支持动态扩容、线程绑定与优先级任务》