【C/C++】如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦

文章目录

  • 如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?
    • 1 假设场景设计
    • 2 Codes
    • 3 流程图
    • 4 优劣势
    • 5 风险可能

如何在一个事件驱动的生产者-消费者模型中使用观察者进行通知与解耦?


1 假设场景设计

  • Producer(生产者):生成任务并推送到队列中。
  • TaskQueue(主题/被观察者):任务队列,同时也是一个“可被观察”的对象,它在收到新任务后,会主动通知观察者(消费者)
  • Consumer(观察者):注册到队列中,当有新任务时被通知,并从队列中拉取任务。

避免了消费者主动等待(如传统条件变量 wait),改用回调通知


2 Codes

#include <iostream>
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <functional>
#include <memory>
#include <atomic>// ========== Observer 接口 ==========
class Observer {
public:virtual void onNotified() = 0;virtual ~Observer() = default;
};// ========== 主题(被观察者) ==========
class TaskQueue {
public:void addObserver(std::shared_ptr<Observer> obs) {std::lock_guard<std::mutex> lock(observerMutex_);observers_.push_back(obs);}void pushTask(int task) {{std::lock_guard<std::mutex> lock(queueMutex_);queue_.push(task);}notifyObservers();}bool popTask(int& task) {std::lock_guard<std::mutex> lock(queueMutex_);if (queue_.empty()) return false;task = queue_.front();queue_.pop();return true;}bool hasTask() {std::lock_guard<std::mutex> lock(queueMutex_);return !queue_.empty();}private:void notifyObservers() {std::lock_guard<std::mutex> lock(observerMutex_);for (auto& obs : observers_) {if (obs) obs->onNotified();  // 回调通知}}private:std::queue<int> queue_;std::mutex queueMutex_;std::vector<std::shared_ptr<Observer>> observers_;std::mutex observerMutex_;
};// ========== 消费者(观察者) ==========
class Consumer : public Observer, public std::enable_shared_from_this<Consumer> {
public:Consumer(std::shared_ptr<TaskQueue> queue, int id): queue_(queue), id_(id), stopFlag_(false) {}void start() {thread_ = std::thread([self = shared_from_this()] {self->run();});}void stop() {stopFlag_ = true;cv_.notify_all();  // 所有线程都唤醒}void onNotified() override {cv_.notify_one();  // 唤醒 run 中等待的线程}private:void run() {while (true) {std::unique_lock<std::mutex> lock(cvMutex_);cv_.wait(lock, [this]() {return stopFlag_ || queue_->hasTask(); });if (stopFlag_ && !queue_->hasTask()) break; int task;while (queue_->popTask(task)) {std::cout << "[Consumer " << id_ << "] Consumed task: " << task << std::endl;}}}private:std::shared_ptr<TaskQueue> queue_;int id_;std::thread thread_;std::atomic<bool> stopFlag_;std::condition_variable cv_;std::mutex cvMutex_;
};// ========== 生产者 ==========
void producer(std::shared_ptr<TaskQueue> queue) {for (int i = 0; i < 10; ++i) {std::this_thread::sleep_for(std::chrono::milliseconds(200));std::cout << "[Producer] Produced task: " << i << std::endl;queue->pushTask(i);}
}int main() {auto queue = std::make_shared<TaskQueue>();// 启动两个消费者auto consumer1 = std::make_shared<Consumer>(queue, 1);auto consumer2 = std::make_shared<Consumer>(queue, 2);queue->addObserver(consumer1);queue->addObserver(consumer2);consumer1->start();consumer2->start();// 启动生产者线程std::thread prodThread(producer, queue);prodThread.join();std::this_thread::sleep_for(std::chrono::seconds(1));consumer1->stop();consumer2->stop();return 0;
}

输出

[Producer] Produced task: 0
[Consumer 2] Consumed task: 0
[Producer] Produced task: 1
[Consumer 2] Consumed task: 1
[Producer] Produced task: 2
[Consumer 2] Consumed task: 2
[Producer] Produced task: 3
[Consumer 2] Consumed task: 3
[Producer] Produced task: 4
[Consumer 2] Consumed task: 4
[Producer] Produced task: 5
[Consumer 2] Consumed task: 5
[Producer] Produced task: 6
[Consumer 2] Consumed task: 6
[Producer] Produced task: 7
[Consumer 2] Consumed task: 7
[Producer] Produced task: 8
[Consumer 2] Consumed task: 8
[Producer] Produced task: 9
[Consumer 2] Consumed task: 9

关键代码解读:
Consumer 类中的 onNotified()run() 方法是如何配合实现消费者监听通知的 ==》背后即“观察者 + 条件变量”的事件驱动机制
TaskQueue::notifyObservers() 调用Consumer::onNotified,唤醒等待的 Consumer::run() 线程。

二者配合流程详解

  1. run() 是消费者线程主循环(由 start() 启动)

    • 每个 Consumer 启动后会在一个独立线程中运行 run() 方法;
    • 它使用 cv_.wait(lock) 进入 阻塞等待状态,直到被通知(由 notify_one() 唤醒);
    • 唤醒后尝试从 TaskQueuepopTask(),直到队列为空;
    • 然后再次进入等待。
  2. onNotified() 是“被观察者”的回调通知函数

    • TaskQueue::notifyObservers() 被调用(例如 pushTask() 中调用)时,会遍历注册的观察者;
    • 每个观察者(即 Consumer)都会被调用 onNotified()
    • onNotified() 会调用 cv_.notify_one(),唤醒 run() 中正在等待的线程。

3 流程图

[Producer]↓ pushTask()
[TaskQueue]↓ notifyObservers()
[Consumer]↓ onNotified()→ cv_.notify_one()↓
[run() loop]→ cv_.wait() 被唤醒↓→ popTask()↓→ 处理任务

4 优劣势

编码可能遇到的问题原因/应对
cv.wait() 可能虚假唤醒可用 cv.wait(lock, condition) 代替裸 wait(),避免无任务时误唤醒。
多个消费者抢任务多个消费者被唤醒时要竞争 queue_ 锁,可通过加任务标签或调度器来分配。
重复唤醒开销大若任务频繁到达,建议合并通知、或按“任务计数”通知。
优点描述
解耦消费者不需要主动轮询,事件驱动机制带来良好模块化。
可扩展支持多个消费者动态注册,符合微服务或事件分发模型。
降低等待利用通知机制唤醒消费者,避免空轮询带来的 CPU 消耗。
灵活性可轻松拓展为异步观察者队列、支持任务优先级、过滤等机制。

5 风险可能

  • 若消费者数量多,且频繁 wakeup,可能存在“惊群效应”。
  • 可以通过线程绑定负载均衡策略来优化通知粒度。
  • 可扩展为事件过滤、类型区分(如不同类型的消费者响应不同事件)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/82467.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

MVC和MVVM架构的区别

MVC和MVVM都是前端开发中常用的设计模式&#xff0c;都是为了解决前端开发中的复杂性而设计的&#xff0c;而MVVM模式则是一种基于MVC模式的新模式。 MVC(Model-View-Controller)的三个核心部分&#xff1a;模型、视图、控制器相较于MVVM(Model-View-ViewModel)的三个核心部分…

兰亭妙微 | 图标设计公司 | UI设计案例复盘

在「33」「312」新高考模式下&#xff0c;选科决策成为高中生和家长的「头等大事」。兰亭妙微公司受委托优化高考选科决策平台个人诊断报告界面&#xff0c;核心挑战是&#xff1a;如何将复杂的测评数据&#xff08;如学习能力倾向、学科报考机会、职业兴趣等&#xff09;转化为…

有铜半孔的设计规范与材料创新

设计关键参数 孔径与间距限制 最小孔径需≥0.6mm&#xff0c;孔边距≥0.5mm&#xff0c;避免铜层脱落&#xff1b;拼版时半孔区域需预留2mm间距防止撕裂。 阻焊桥设计 必须保留阻焊桥&#xff08;宽度≥0.1mm&#xff09;&#xff0c;防止焊锡流入孔内造成短路。 猎板的材料…

Engineering a direct k-way Hypergraph Partitioning Algorithm【2017 ALENEX】

文章目录 一、作者二、摘要三、相关工作四、算法概述五、实验结果六、主要贡献 一、作者 Yaroslav Akhremtsev, Tobias Heuer, Peter Sanders, Sebastian Schlag 二、摘要 我们开发了一种快速且高质量的多层算法&#xff0c;能够直接将超图划分为 k 个平衡的块 —— 无需借助递…

视频问答功能播放器(视频问答)视频弹题功能实例

视频问答播放器是一种互动教学工具&#xff0c;在视频播放过程中弹出题目卡&#xff0c;学员答题后才能继续观看&#xff0c;提升学习参与度。视频问答功能播放器(视频问答)视频弹题功能实例&#xff1a; 视频播放器的视频问答功能&#xff08;也叫问答播放器、视频弹题、视频问…

2025年AI代理演进全景:从技术成熟度曲线到产业重构

2025年AI代理演进全景&#xff1a;从技术成熟度曲线到产业重构 一、技术成熟度曲线定位&#xff1a;AI代理的“期望膨胀期” 根据Gartner技术成熟度曲线&#xff08;Hype Cycle™&#xff09;&#xff0c;AI代理&#xff08;Agentic AI&#xff09;当前正处于期望膨胀期向泡沫…

基于python的机器学习(八)—— 评估算法(一)

目录 一、机器学习评估的基本概念 1.1 评估的定义与目标 1.2 常见评估指标 1.3 训练集、验证集与测试集的划分 二、分离数据集 2.1 分离训练数据集和评估数据集 2.2 k折交叉验证分离 2.3 弃一交叉验证分离 2.4 重复随机评估和训练数据集分离 三、交叉验证技术 3.…

Win11 系统登入时绑定微软邮箱导致用户名欠缺

Win11 系统登入时绑定微软邮箱导致用户名欠缺 解决思路 -> 解绑当前微软邮箱和用户名 -> 断网离线建立本地账户 -> 设置本地账户为Admin权限 -> 注销当前账户&#xff0c;登入新建的用户 -> 联网绑定微软邮箱 -> 删除旧的用户命令步骤 管理员权限打开…

Mac系统-最方便的一键环境部署软件ServBay(支持php,java,python,node,go,mysql等)没有之一,已亲自使用!

自从换成Mac电脑以后&#xff0c;做开发有时候要部署各种环境&#xff0c;如php&#xff0c;mysql&#xff0c;nginx&#xff0c;pgsql&#xff0c;java&#xff0c;node&#xff0c;python&#xff0c;go时&#xff0c;尝试过原生环境部署&#xff0c;各种第三方软件部署&…

Flink中Kafka连接器的基本应用

文章目录 前言Kafka连接器基础案例演示前置说明和环境准备步骤Kafka连接器基本配置关联数据源映射转换案例效果演示基于Kafka连接器同步数据到MySQL案例说明前置准备Kafka连接器消费位点调整映射转换与数据投递MysqlSlink持久化收集器数据最终效果演示小结参考前言 本文将基于…

Leetcode 刷题记录 11 —— 二叉树第二弹

本系列为笔者的 Leetcode 刷题记录&#xff0c;顺序为 Hot 100 题官方顺序&#xff0c;根据标签命名&#xff0c;记录笔者总结的做题思路&#xff0c;附部分代码解释和疑问解答&#xff0c;01~07为C语言&#xff0c;08及以后为Java语言。 01 二叉树的层序遍历 /*** Definition…

【R语言科研绘图】

R语言在绘制SCI期刊图像时具有显著优势&#xff0c;以下从功能、灵活性和学术适配性三个方面分析其适用性&#xff1a; 数据可视化库丰富 R语言拥有ggplot2、lattice、ggpubr等专业绘图包&#xff0c;支持生成符合SCI期刊要求的高分辨率图像&#xff08;如TIFF/PDF格式&#…

【Node.js】Web开发框架

个人主页&#xff1a;Guiat 归属专栏&#xff1a;node.js 文章目录 1. Node.js Web框架概述1.1 Web框架的作用1.2 Node.js主要Web框架生态1.3 框架选择考虑因素 2. Express.js2.1 Express.js概述2.2 基本用法2.2.1 安装Express2.2.2 创建基本服务器 2.3 路由2.4 中间件2.5 请求…

PDF 转 JPG 图片小工具:CodeBuddy 助力解决转换痛点

本文所使用的 CodeBuddy 免费下载链接&#xff1a;腾讯云代码助手 CodeBuddy - AI 时代的智能编程伙伴 前言 在数字化办公与内容创作的浪潮中&#xff0c;将 PDF 文件转换为 JPG 图片格式的需求日益频繁。无论是学术文献中的图表提取&#xff0c;还是宣传资料的视觉化呈现&am…

Linux 文件系统层次结构

Linux 的文件系统遵循 Filesystem Hierarchy Standard (FHS) 标准&#xff0c;其目录结构是层次化的&#xff0c;每个目录都有明确的用途。以下是 Linux 中部分目录的作用解析&#xff1a; 1. 根目录 / 作用&#xff1a;根目录是整个文件系统的顶层目录&#xff0c;所有其他目…

密码学标准(Cryptography Standards)介绍

密码学标准(Cryptography Standards)是为确保信息安全传输、存储和处理而制定的一系列技术规范和协议,广泛应用于通信、金融、互联网等领域。以下从分类、主流标准、应用场景和发展趋势四个方面进行详细介绍: 一、密码学标准的分类 密码学标准可根据技术原理和应用场景分…

ubuntu 22.04安装和使用docker介绍

docker安装和使用 准备环境常见的docker操作linux系统常用的配置卸载docker 准备环境 本机环境&#xff1a; Linux yz-MS-7E06 6.8.0-59-generic #61~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue Apr 15 17:03:15 UTC 2 x86_64 x86_64 x86_64 GNU/Linux安装依赖软件&#xff1a;…

obsidian 中的查找和替换插件,支持正则

最近用着 obsidian 时&#xff0c;发现想要在当前文档中 查找和替换 内容时&#xff0c;没有自动查找和替换的功能&#xff0c;去插件市场查找也没有发现好用的插件&#xff0c;那就自己写一个吧。 全程用的 AI 来写的&#xff0c;当然&#xff0c;我对 JS/CSS/TypeScript 等没…

针对vue项目的webpack优化攻略

一、开发阶段优化 1. 热更新加速&#xff08;HMR&#xff09; // vue.config.js module.exports {devServer: {hot: true, // 开启热更新injectClient: true, // 自动注入HMR客户端watchOptions: {ignored: /node_modules/, // 忽略node_modules变化aggregateTimeout: 300…

BTC官网关注巨鲸12亿美元平仓,XBIT去中心化交易平台表现稳定

在全球加密货币市场波动加剧的背景下&#xff0c;2025年5月25日传出重磅消息。据今日最新国际报道&#xff0c;知名巨鲸James Wynn完全平仓价值12亿美元的BTC多头仓位&#xff0c;整体盈利约845万美元&#xff0c;此举引发市场广泛关注。与此同时&#xff0c;收益型稳定币市场迎…