【多线程】六、基于阻塞队列的生产者消费者模型

文章目录

  • Ⅰ. 生产者消费者模型的概念
  • Ⅱ. 生产者消费者模型的优点
  • Ⅲ. 基于阻塞队列的生产者消费者模型
        • Makefile
        • Block_queue.hpp
        • task.hpp
        • test.cpp
  • Ⅳ. 如何理解提高了效率❓❓❓

在这里插入图片描述

Ⅰ. 生产者消费者模型的概念

​ 生产者消费者模型是一种常见的并发模式,用于解决生产者和消费者之间的协作问题。它是通过一个容器来 解决生产者和消费者的强耦合问题

​ 生产者和消费者彼此之间不直接通讯,而 通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,而不用去关心消费者是否要取的问题;消费者不找生产者要数据,而是直接从阻塞队列里取,而不关心生产者是否在生产的问题。

​ 所以 阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个 阻塞队列就是用来给生产者和消费者解耦的

​ 但是因为生产者和消费者是异步的,所以必须使用同步机制来保证它们之间的正确协作,避免数据竞争和死锁等问题。

​ 举个例子,生产者就是工厂,消费者就是我们日常消费的人,一般来说,我们是不会直接去工厂买东西的,而是到超市去买,超市就相对于是一个缓冲区,我们每次买东西的时候都会去超市买,超市一般都会储备比较多的货,所以消费者一般不需要去关心是否有货的问题;而工厂每次生产完货之后就像超市投放,并且可以等到超市缺货的时候再去生产,就有了一段空白时间可以干其它的事情,并且也不需要去关心是否有人去消费的问题,起到了相互解耦的作用!

Ⅱ. 生产者消费者模型的优点

  1. 解耦合:生产者和消费者之间的通信是通过缓冲区进行的,彼此之间无需直接通信,从而实现了解耦合,提高了系统的可维护性和可扩展性。
  2. 提高效率:生产者可以一次性生产多个产品,然后将它们批量存储在缓冲区中,消费者可以一次性消费多个产品,这样可以大大提高生产和消费的效率。
  3. 减少资源浪费:如果没有缓冲区,生产者必须阻塞等待消费者消费产品后才能生产下一个产品,而消费者也必须阻塞等待生产者生产产品后才能消费产品,这样会浪费大量的资源,降低了系统的效率。
  4. 支持并发:生产者消费者模型是多线程编程中的经典案例,可以帮助程序员更好地理解和掌握多线程编程的技巧和方法。
    在这里插入图片描述

Ⅲ. 基于阻塞队列的生产者消费者模型

​ 基于 阻塞队列(Blocking Queue 的生产者消费者模型是一种常见的实现方式,其与普通的队列区别在于:(以下的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)

  • 队列为空,从阻塞队列获取元素的操作将会被阻塞,直到阻塞队列中被放入了资源。
  • 队列为满,往阻塞队列里存放元素的操作也会被阻塞,直到阻塞队列中有空闲位置。

在这里插入图片描述

​ 下面我们写一个程序,实现功能:一个就是我们的生产者可以通过第一个阻塞队列向队列中放入任务(可以自定义),然后消费者向队列中拿任务并且执行。除此之外,当消费者向第一个队列中拿出任务并且得到结果的时候,同时向第二个阻塞队列中放入该执行结果,充当第二个阻塞队列中的生产者。而第二个阻塞队列中的消费者则拿到这个生产的任务,实现将其生产者的执行结果写入文本文件中,达到存储的目的!

在这里插入图片描述

​ 下面的程序中我们主要以四则运算为执行任务来测试:

Makefile
mythread : test.cppg++ -o $@ $^ -std=c++11 -lpthread.PHONY : clean
clean:rm -f mythread log.txt
Block_queue.hpp
#pragma once
#include <iostream>
#include <queue>
#include <pthread.h>static const int MAXCAP = 500;// 阻塞队列
template <class T>
class BlockQueue
{
public:BlockQueue(const int& maxcap = MAXCAP):_maxcap(maxcap){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_pcond, nullptr);pthread_cond_init(&_ccond, nullptr);}// 生产操作void put(const T& in) // 输入型参数:const&{pthread_mutex_lock(&_mutex);// 这里必须使用while,而不能是ifwhile(is_full()) // 若队列为满则不能放数据{pthread_cond_wait(&_pcond, &_mutex);}// 走到这说明队列不为满,可以放资源_bq.push(in);pthread_cond_signal(&_ccond); // 唤醒消费者线程pthread_mutex_unlock(&_mutex);}// 消费操作void take(T* out) // 输出型参数:*   // 输入输出型参数:&{pthread_mutex_lock(&_mutex);// 这里必须使用while,而不能是ifwhile(is_empty()) // 若队列为空不能取资源{pthread_cond_wait(&_ccond, &_mutex);}// 走到这说明队列不为空,可以取资源*out = _bq.front(); _bq.pop();pthread_cond_signal(&_pcond); // 唤醒生产者线程pthread_mutex_unlock(&_mutex);}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_pcond);pthread_cond_destroy(&_ccond);}
private:bool is_empty() const{return _bq.size() == 0;}bool is_full() const{return _bq.size() == _maxcap;}
private:std::queue<T> _bq;int _maxcap; // 队列中元素的上限pthread_mutex_t _mutex; // 互斥锁pthread_cond_t _pcond; // 生产者对应的条件变量pthread_cond_t _ccond; // 消费者对应的条件变量
};
task.hpp
#pragma once
#include <iostream>
#include <functional>
#include <pthread.h>
#include <unistd.h>
#include <cstdio>
#include <string>
#include <fstream>
// 计算任务
class CalTask
{using func_t = std::function<int(int, int, char)>;
public:CalTask(){}CalTask(int x, int y, char op, func_t callback):_x(x), _y(y), _op(op), _callback(callback){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}
private:int _x;int _y;char _op;func_t _callback;
};const std::string oper = "+-*/";int caltask(int x, int y, char op)
{int result = 0;switch(op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if(y == 0){std::cerr << "除零错误" << std::endl;result = -1;}else{result = x / y;}}break;default:break;}return result;
}// 保存任务
class SaveTask
{using func_t = std::function<void(const std::string&)>;
public:SaveTask(){}SaveTask(const std::string &message, func_t callback): _message(message), _callback(callback){}void operator()(){_callback(_message);}
private:std::string _message;func_t _callback;
};void savetask(const std::string& message)
{// c++的文件读写方式std::fstream fs("./log.txt", std::fstream::out | std::fstream::app);fs << message << "\n";fs.close();
}
test.cpp
#include "Block_queue.hpp"
#include "task.hpp"
#include <sys/types.h>
#include <stdlib.h>
#include <unistd.h>
#include <time.h>
#include <string>// 封装一下两个阻塞队列
template <class C, class S> // C:计算  S:存储
class BlockQueues
{
public:BlockQueues(){cal_bq = new BlockQueue<CalTask>();save_bq = new BlockQueue<SaveTask>();}~BlockQueues(){delete cal_bq;delete save_bq;}
public:BlockQueue<C> *cal_bq;BlockQueue<S> *save_bq;
};// 生产者线程
void* Productor(void* bqs)
{BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(bqs))->cal_bq;while(true){// 生产任务int x = rand() % 100 + 1;int y = rand() % 100;char op = oper[rand() % oper.size()];CalTask ct(x, y, op, caltask);bq->put(ct);std::cout << "productor thread, 生产计算任务: " << ct.toTaskString() << std::endl;sleep(1);}return nullptr;
}// 消费者线程
void* Consumer(void* bqs)
{BlockQueue<CalTask>* bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(bqs))->cal_bq;BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(bqs))->save_bq;while(true){// 消费任务CalTask t;bq->take(&t);// 执行任务std::string result = t(); // 任务非常耗时!!std::cout << "Consumer thread,完成计算任务: " << result << " ... done"<< std::endl;// 生产任务SaveTask st(result, savetask);save_bq->put(st);std::cout << "Consumer thread,推送存储任务完成..." << std::endl; sleep(1);}return nullptr;
}// 保存者/消费者线程
void* Saver(void* bqs)
{BlockQueue<SaveTask>* save_bq = (static_cast<BlockQueues<CalTask, SaveTask>*>(bqs))->save_bq;while(true){// 消费任务SaveTask st;save_bq->take(&st);// 执行任务std::cout << "save thread,保存任务完成..." << std::endl; st();}return nullptr;
}int main()
{srand((unsigned int)time(nullptr) ^ getpid()); // 生成随机种子BlockQueues<CalTask, SaveTask> bqs;pthread_t consumer, productor, saver;pthread_create(&productor, nullptr, Productor, &bqs);pthread_create(&consumer, nullptr, Consumer, &bqs);pthread_create(&saver, nullptr, Saver, &bqs);pthread_join(consumer, nullptr);pthread_join(productor, nullptr);pthread_join(saver, nullptr);return 0;
}

在这里插入图片描述

​ 上面只是三个单独的生产者消费者线程在执行,其实我们还可以产生更多的生产者或者消费者同时执行,读者自行尝试!

Ⅳ. 如何理解提高了效率❓❓❓

​ 生产者消费者模型可以提高效率的原因在于它可以 有效地利用多核处理器的并行性,从而充分利用系统资源。例如,在一个多线程的程序中,生产者可以在一个线程中运行,而消费者可以在另一个线程中运行,这样就可以同时进行数据的生产和处理,而不必等待一个操作完成后再执行另一个操作。

​ 要知道,其实这种模型是串行的方式通过阻塞队列的,所以其实相对于生产者直接向消费者投递资源、消费者直接向生产者拿资源来说其实效率不是高的很明显,而关键提高效率的要点在于生产者投放资源是需要消耗时间的,这段时间对于其它的生产者来说是不影响的可以继续投放,提高了效率;对于消费者来说,拿取资源的时候,也是需要消耗时间的,但是这段时间内其它的消费者也能拿取资源。总的来说,也就是阻塞队列是在不断工作的,效率真正高在生产者和消费者生产资源和消费资源所消耗的这段时间,并不影响阻塞队列中其它生产者和消费者的工作!

​ 此外,生产者消费者模型还可以 通过使用缓冲区来平衡生产者和消费者之间的速度差异。当生产者生产数据的速度快于消费者处理数据的速度时,数据可以暂存在缓冲区中,以免数据丢失或者降低系统的效率。反之,当消费者处理数据的速度快于生产者生产数据的速度时,缓冲区中可以存储更多的数据,以便消费者随时获取数据。

​ 总之,生产者消费者模型通过合理的分配任务、优化资源利用、平衡生产和消费速度等手段,可以提高系统的效率。

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/904565.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/904565.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Vue】全局事件总线 TodoList 事件总线

目录 一、 实现所有组件看到x事件 二、 实现$on $off 以及 $emit 总结不易~ 本章节对我有很大的收获&#xff0c; 希望对你也是&#xff01;&#xff01;&#xff01; 本节素材已上传至Gitee&#xff1a;yihaohhh/我爱Vue - Gitee.com 全局事件总线图&#xff1a; 本节素材…

Python编程virtualenv库的简介和使用方法

Python编程virtualenv库的简介和使用方法 virtualenv和conda的区别是什么

MySQL的行级锁锁的到底是什么?

大家好&#xff0c;我是锋哥。今天分享关于【MySQL的行级锁锁的到底是什么?】面试题。希望对大家有帮助&#xff1b; MySQL的行级锁锁的到底是什么? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 MySQL的行级锁是数据库管理系统&#xff08;DBMS&#xff09;的一…

【C++游戏引擎开发】第33篇:物理引擎(Bullet)—射线检测

一、射线检测核心理论体系 1.1 射线检测的数学基础 1.1.1 参数化射线方程 射线在三维空间中的数学表达采用参数方程: r ( t ) = o + t d ^ ( t ∈ [

【操作系统】线程崩溃机制详解

在分布式系统与多线程编程的世界里&#xff0c;一个看似简单的问题却暗藏玄机&#xff1a;当某条线程突然崩溃&#xff0c;其所属进程会随之消亡吗&#xff1f;这个问题背后隐藏着操作系统与编程语言的精妙设计&#xff0c;本文将从底层原理到工程实践层层剖析。 一、线程崩溃…

无人机 | 无人机设计概述

无人机设计是一个复杂的系统工程&#xff0c;涉及空气动力学、电子技术、材料科学、控制算法等多个领域的综合应用。以下是无人机设计的主要模块和关键要素概述&#xff1a; 一、总体设计目标 任务需求定义 用途&#xff1a;航拍、物流、农业、军事侦察、环境监测等性能指标&am…

强啊!Oracle Database 23aiOracle Database 23ai:使用列别名进行分组排序!

大家好&#xff0c;这里是架构资源栈&#xff01;点击上方关注&#xff0c;添加“星标”&#xff0c;一起学习大厂前沿架构&#xff01; 从 Oracle Database 23ai 开始&#xff0c;您可以在 GROUP BY 和 HAVING 子句中直接使用列别名。此功能在早期版本的 Oracle Database 中不…

Modbus 转 IEC61850 网关

第一章 产品概述 Modbus 转 IEC61850 网关型号 SG-IEC61850-Modbus &#xff0c;是三格电子推出的工业级网关&#xff08;以 下简称盒子或网关&#xff09;&#xff0c;主要用于 Modbus RTU/TCP 数据采集、 DLT645-1997/2007 数据采集&#xff0c; 可接多功能电力仪表…

MySQL 中的 MVCC 是什么?

MySQL 中的 MVCC&#xff08;Multi-Version Concurrency Control&#xff0c;多版本并发控制&#xff09; 是一种用于实现高并发读写操作的机制&#xff0c;它通过维护数据的多个版本来解决读写冲突&#xff0c;从而在保证事务隔离性的同时&#xff0c;减少锁的使用&#xff0c…

【Python】让Selenium 像Beautifulsoup一样,用解析HTML 结构的方式提取元素!

我在使用selenium的find_element的方式去获取网页元素&#xff0c;一般通过xpath、css_selector、class_name的方式去获取元素的绝对位置。 但是有时候如果网页多了一些弹窗或者啥之类的&#xff0c;绝对位置会发生变化&#xff0c;使用xpath等方法&#xff0c;需要经常变动。…

使用xlwings将excel表中将无规律的文本型数字批量转化成真正的数字

之前我写了一篇文章excel表中将无规律的文本型数字批量转化成真正的数字-CSDN博客 是使用excel自带的操作&#xff0c;相对繁琐。 今天使用xlwings操作&#xff0c;表格如下&#xff08;有真正的数字&#xff0c;也有文本型数字&#xff0c;混在在一起&#xff09;&#xff1…

ICML 2025录取率公布,spotlight posters仅占2.6%

近日&#xff0c;ICML 2025公布了论文录用结果。本次大会共收到 12,107篇有效论文投稿&#xff0c;比去年增加了28%&#xff0c;今年录取论文3,260篇&#xff0c;录取率为 26.9%。其中仅有313篇被列为“焦点海报”&#xff08;即所有投稿中排名前2.6%的论文&#xff09;&#x…

全局网络:重构数字时代的连接范式

从局部到全局 —— 网络架构的范式革命 在全球化与数字化深度融合的今天&#xff0c;传统网络架构的 “碎片化” 问题日益凸显&#xff1a;跨地域数据流通低效、设备互联孤岛化、安全策略难以统一。 全局网络作为一种突破地域与技术边界的新型网络架构&#xff0c;正成为企业…

SpringAI实现AI应用-内置顾问

SpringAI实战链接 1.SpringAl实现AI应用-快速搭建-CSDN博客 2.SpringAI实现AI应用-搭建知识库-CSDN博客 3.SpringAI实现AI应用-内置顾问-CSDN博客 4.SpringAI实现AI应用-使用redis持久化聊天记忆-CSDN博客 5.SpringAI实现AI应用-自定义顾问&#xff08;Advisor&#xff09…

Nginx核心原理以及案例分析(AI)

一、Nginx核心原理分析 1. ‌事件驱动与非阻塞模型‌ ‌Epoll异步机制‌&#xff1a;基于Linux的epoll模型实现异步非阻塞I/O处理&#xff0c;单线程可高效管理数万并发连接&#xff0c;避免传统select模型的轮询性能瓶颈。‌多进程架构‌&#xff1a;采用Master-Worker模式&…

【Bug经验分享】SourceTree用户设置必须被修复/SSH 主机密钥未缓存(踩坑)

文章目录 配置错误问题原因配置错误问题解决主机密钥缓存问题原因主机密钥缓存问题解决 更多相关内容可查看 配置错误问题原因 电脑太卡&#xff0c;曾多次强制关机&#xff0c;在关机前没有关闭SourceTree&#xff0c;导致配置错误等问题 配置错误问题解决 方式一&#xff…

阿里云服务器-centos部署定时同步数据库数据-dbswitch

前言&#xff1a; 本文章介绍通过dbswitch工具实现2个mysql数据库之间实现自动同步数据。 应用场景&#xff1a;公司要求实现正式环境数据库数据自动冷备 dbswitch依赖环境&#xff1a;git ,maven,jdk 方式一&#xff1a; 不需要在服务器中安装git和maven&#xff0c;直接用…

windows10 环境下通过huggingface_hub下载huggingface社区模型

项目场景&#xff1a; 有一些模型需要在huggingface下载&#xff0c;因为国内限制&#xff0c;一般无法访问huggingface网站进行下载。然而&#xff0c;可以通过国内的镜像下载。网上大部分都是在linux系统下&#xff0c;通过huggingface提供的指令下载。本文针对采用python脚…

C++之异常

目录 前言 一、什么是异常 二、C中的异常 2.1 C语言中的异常处理 2.2 C中的异常处理 2.3 异常的抛出与捕获 2.4 栈展开 2.5 查找匹配的处理代码 2.6 异常重新抛出 2.7 异常安全问题 2.8 异常规范 2.9 标准库的异常 前言 在之前我们已经学习了C中不少知识了&#xff0c;但是其中…

$在R语言中的作用

在 R 语言中&#xff0c;$ 是一个非常重要的操作符&#xff0c;主要用于访问对象的成员或组件。它的用途非常广泛&#xff0c;不仅限于数据框&#xff08;data frame&#xff09;&#xff0c;还可以用于列表&#xff08;list&#xff09;、环境&#xff08;environment&#xf…