文章目录
- 如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?
- 1 假设场景设计
- 2 Codes
- 3 流程图
- 4 优劣势
- 5 风险可能
如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?
1 假设场景设计
- Producer(生产者):生成任务并推送到队列中。
- TaskQueue(主题/被观察者):任务队列,同时也是一个“可被观察”的对象,它在收到新任务后,会主动通知观察者(消费者)。
- Consumer(观察者):注册到队列中,当有新任务时被通知,并从队列中拉取任务。
避免了消费者主动等待(如传统条件变量 wait),改用回调通知。
2 Codes
#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <memory>
#include <atomic>// ========== Observer 接口 ==========
class Observer {
public:virtual void onNotified() = 0;virtual ~Observer() = default;
};// ========== 主题(被观察者) ==========
class TaskQueue {
public:void addObserver(std::shared_ptr<Observer> obs) {std::lock_guard<std::mutex> lock(observerMutex_);observers_.push_back(obs);}void pushTask(int task) {{std::lock_guard<std::mutex> lock(queueMutex_);queue_.push(task);}notifyObservers();}bool popTask(int& task) {std::lock_guard<std::mutex> lock(queueMutex_);if (queue_.empty()) return false;task = queue_.front();queue_.pop();return true;}bool hasTask() {std::lock_guard<std::mutex> lock(queueMutex_);return !queue_.empty();}private:void notifyObservers() {std::lock_guard<std::mutex> lock(observerMutex_);for (auto& obs : observers_) {if (obs) obs->onNotified(); // 回调通知}}private:std::queue<int> queue_;std::mutex queueMutex_;std::vector<std::shared_ptr<Observer>> observers_;std::mutex observerMutex_;
};// ========== 消费者(观察者) ==========
class Consumer : public Observer, public std::enable_shared_from_this<Consumer> {
public:Consumer(std::shared_ptr<TaskQueue> queue, int id): queue_(queue), id_(id), stopFlag_(false) {}void start() {thread_ = std::thread([self = shared_from_this()] {self->run();});}void stop() {stopFlag_ = true;cv_.notify_all(); // 所有线程都唤醒}void onNotified() override {cv_.notify_one(); // 唤醒 run 中等待的线程}private:void run() {while (true) {std::unique_lock<std::mutex> lock(cvMutex_);cv_.wait(lock, [this]() {return stopFlag_ || queue_->hasTask(); });if (stopFlag_ && !queue_->hasTask()) break; int task;while (queue_->popTask(task)) {std::cout << "[Consumer " << id_ << "] Consumed task: " << task << std::endl;}}}private:std::shared_ptr<TaskQueue> queue_;int id_;std::thread thread_;std::atomic<bool> stopFlag_;std::condition_variable cv_;std::mutex cvMutex_;
};// ========== 生产者 ==========
void producer(std::shared_ptr<TaskQueue> queue) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "[Producer] Produced task: " << i << std::endl;queue->pushTask(i);}
}int main() {auto queue = std::make_shared<TaskQueue>();// 启动两个消费者auto consumer1 = std::make_shared<Consumer>(queue, 1);auto consumer2 = std::make_shared<Consumer>(queue, 2);queue->addObserver(consumer1);queue->addObserver(consumer2);consumer1->start();consumer2->start();// 启动生产者线程std::thread prodThread(producer, queue);prodThread.join();std::this_thread::sleep_for(std::chrono::seconds(1));consumer1->stop();consumer2->stop();return 0;
}
输出
[Producer] Produced task: 0
[Consumer 2] Consumed task: 0
[Producer] Produced task: 1
[Consumer 2] Consumed task: 1
[Producer] Produced task: 2
[Consumer 2] Consumed task: 2
[Producer] Produced task: 3
[Consumer 2] Consumed task: 3
[Producer] Produced task: 4
[Consumer 2] Consumed task: 4
[Producer] Produced task: 5
[Consumer 2] Consumed task: 5
[Producer] Produced task: 6
[Consumer 2] Consumed task: 6
[Producer] Produced task: 7
[Consumer 2] Consumed task: 7
[Producer] Produced task: 8
[Consumer 2] Consumed task: 8
[Producer] Produced task: 9
[Consumer 2] Consumed task: 9
关键代码解读:
Consumer
类中的 onNotified()
和 run()
方法是如何配合实现消费者监听通知的 ==》背后即“观察者 + 条件变量”的事件驱动机制
TaskQueue::notifyObservers() 调用Consumer::onNotified,唤醒等待的 Consumer::run() 线程。
二者配合流程详解
-
run()
是消费者线程主循环(由start()
启动)- 每个
Consumer
启动后会在一个独立线程中运行run()
方法; - 它使用
cv_.wait(lock)
进入 阻塞等待状态,直到被通知(由notify_one()
唤醒); - 唤醒后尝试从
TaskQueue
中popTask()
,直到队列为空; - 然后再次进入等待。
- 每个
-
onNotified()
是“被观察者”的回调通知函数TaskQueue::notifyObservers()
被调用(例如pushTask()
中调用)时,会遍历注册的观察者;- 每个观察者(即
Consumer
)都会被调用onNotified()
; onNotified()
会调用cv_.notify_one()
,唤醒run()
中正在等待的线程。
3 流程图
[Producer]↓ pushTask()
[TaskQueue]↓ notifyObservers()
[Consumer]↓ onNotified()→ cv_.notify_one()↓
[run() loop]→ cv_.wait() 被唤醒↓→ popTask()↓→ 处理任务
4 优劣势
编码可能遇到的问题 | 原因/应对 |
---|---|
cv.wait() 可能虚假唤醒 | 可用 cv.wait(lock, condition) 代替裸 wait() ,避免无任务时误唤醒。 |
多个消费者抢任务 | 多个消费者被唤醒时要竞争 queue_ 锁,可通过加任务标签或调度器来分配。 |
重复唤醒开销大 | 若任务频繁到达,建议合并通知、或按“任务计数”通知。 |
优点 | 描述 |
---|---|
解耦 | 消费者不需要主动轮询,事件驱动机制带来良好模块化。 |
可扩展 | 支持多个消费者动态注册,符合微服务或事件分发模型。 |
降低等待 | 利用通知机制唤醒消费者,避免空轮询带来的 CPU 消耗。 |
灵活性 | 可轻松拓展为异步观察者队列、支持任务优先级、过滤等机制。 |
5 风险可能
- 若消费者数量多,且频繁 wakeup,可能存在“惊群效应”。
- 可以通过线程绑定或负载均衡策略来优化通知粒度。
- 可扩展为事件过滤、类型区分(如不同类型的消费者响应不同事件)。