线程池流程总结:
1、构造函数中创建线程,并添加到线程池(构造函数返回时,线程自动启动,并停在等待wait:从线程池取出一个任务处);
2、主线程中添加任务,到任务队列。并用“条件变量”通知一个线程,从线程池取出一个任务;
3、取出任务后,执行线程的任务函数 =》回调添加的“实际的线程函数”;
4、主线程执行完,return返回 =》调用线程池析构函数;
5、“条件变量”通知所有线程停止,使得线程循环退出,并等待所有线程完成任务;
6、主线程main结束。
一、C++线程池1
1、用c++封装线程池
#include <iostream>
#include <vector>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>using namespace std;class ThreadPool {
public:ThreadPool(size_t threads) : stop(false) {for (size_t i = 0; i < threads; ++i) {//一、lambda表达式返回值是一个线程对象,为什么没有看见创建线程的语句?// thread是什么时候创建的呢?// //二、lambda表达式什么时候执行?// 1.当线程池的构造函数返回时,线程池中的线程才开始运行// 2.当你创建一个std::thread对象并传入一个函数时(对象的实例化),// 这个线程会自动开始执行该函数。因此,通常你不需要显式调用start()方法!//lambda表达式创建线程,并将线程加入线程池workers.emplace_back([this] {//线程循环,不断从任务队列中取出任务并执行while (true) {//取出任务std::function<void()> task;{//互斥锁保护:任务队列和线程池停止状态std::unique_lock<std::mutex> lock(this->queueMutex);bool empty = this->tasks.empty();bool stopped = this->stop;//等待条件变量通知或线程池停止this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });//线程池停止且任务队列为空,退出线程循环if (this->stop && this->tasks.empty()) return;//取出任务task = std::move(this->tasks.front());//从任务队列中删除任务this->tasks.pop();}//执行任务task();}});}}//定义任务入队函数模板template<class F, class... Args>void enqueue(F&& fun, Args&&... args) // 添加任务到任务队列(传递:线程函数、参数){//将任务封装成std::functionauto task = std::bind(std::forward<F>(fun), std::forward<Args>(args)...);{//互斥锁保护:任务队列和线程池停止状态std::unique_lock<std::mutex> lock(queueMutex);//线程池停止if (stop) throw std::runtime_error("Enqueue on stopped ThreadPool");//将任务加入任务队列tasks.emplace(task);}//通知一个线程condition.notify_one();}// 析构函数:等待所有线程完成任务,并停止线程池~ThreadPool() {{//互斥锁保护:线程池停止状态std::unique_lock<std::mutex> lock(queueMutex);stop = true;}//通知所有线程condition.notify_all();//等待所有线程完成任务for (std::thread& worker : workers) {worker.join();}}
private:std::vector<std::thread> workers;//线程池std::queue<std::function<void()>> tasks;//任务队列std::mutex queueMutex;//互斥锁(保护任务队列tasks 和线程池停止状态stop)std::condition_variable condition;//条件变量(加入任务到任务队列时通知一个线程)bool stop;//线程池是否停止
};
2、main测试
int main() {ThreadPool pool(2); // 创建一个包含4个线程的线程池for (int i = 0; i < 2; ++i) { // 添加2个任务到线程池中执行//任务入队函数模板:输出任务编号和线程IDpool.enqueue([i] { std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl; });}system("pause");return 0; // 主线程等待所有工作线程完成(在析构函数中处理)
}
3、通过测试,可以看出“线程池执行步骤”:
1、主线程 ThreadPool pool(2); // 创建一个包含4个线程的线程池
ThreadPool构造函数中:向“线程池workers”中添加lambda表达式形式的线程
//1.1线程循环,不断从任务队列中取出任务并执行
while(true)
{//1.2等待:条件变量通知或线程池停止this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
}2、主线程中添加任务,到任务队列 函数模板:输出任务编号和线程ID
pool.enqueue([i] { std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl; });//2.1将任务加入任务队列(先用包装器function包装任务)
tasks.emplace(task);//2.2条件变量通知一个线程(每加入一个任务,就通知一次线程执行任务!!)
condition.notify_one();//2.3线程循环中的等待(条件变量通知),满足条件,开始向下执行!
this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });//2.4从任务队列取出任务
task = std::move(this->tasks.front());//2.5 执行任务(回调)task();=》回调“实际的线程函数”=》就是添加进来的lambda表达式:std::cout << "Task " << i << " is executed by thread " << std::this_thread::get_id() << std::endl; 3、主线程执行完。return 0; // 主线程返回//3.1 调用线程池析构函数
~ThreadPool()
{//1.设置线程停止标识为truestop = true;//2.通知所有线程condition.notify_all();=》//线程循环while(true){//while循环一直在这等待!(lock满足条件,向下执行!)this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });//当线程任务执行完成:stop = true;//线程池停止且任务队列为空,退出线程循环if (this->stop && this->tasks.empty()) return;}3.//等待所有线程完成任务worker.join();
}4、主线程结束
二、C++线程池2
1、线程池.h头文件
#ifndef _thread_pool_HPP
#define _thread_pool_HPP#include <vector>
#include <deque>
#include <thread>
#include <functional>
#include <mutex>
#include <condition_variable>//!
//! convenience macro to log with file and line information
//!
#ifdef __SOLA_LOGGING_ENABLED
#define __SOLA_LOG(level, msg) sola::level(msg, __FILE__, __LINE__);
#else
#define __SOLA_LOG(level, msg)
#endif /* __SOLA_LOGGING_ENABLED */namespace sola {class logger_iface {
public://! ctorlogger_iface(void) = default;//! dtorvirtual ~logger_iface(void) = default;//! copy ctorlogger_iface(const logger_iface&) = default;//! assignment operatorlogger_iface& operator=(const logger_iface&) = default;public://!//! debug logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void debug(const std::string& msg, const std::string& file, std::size_t line) = 0;//!//! info logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void info(const std::string& msg, const std::string& file, std::size_t line) = 0;//!//! warn logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void warn(const std::string& msg, const std::string& file, std::size_t line) = 0;//!//! error logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!virtual void error(const std::string& msg, const std::string& file, std::size_t line) = 0;
};//!
//! default logger class provided by the library
//!
class logger : public logger_iface {
public://!//! log level//!enum class log_level {error = 0,warn = 1,info = 2,debug = 3};public://! ctorlogger(log_level level = log_level::info);//! dtor~logger(void) = default;//! copy ctorlogger(const logger&) = default;//! assignment operatorlogger& operator=(const logger&) = default;public://!//! debug logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void debug(const std::string& msg, const std::string& file, std::size_t line);//!//! info logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void info(const std::string& msg, const std::string& file, std::size_t line);//!//! warn logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void warn(const std::string& msg, const std::string& file, std::size_t line);//!//! error logging//!//! \param msg message to be logged//! \param file file from which the message is coming//! \param line line in the file of the message//!void error(const std::string& msg, const std::string& file, std::size_t line);private://!//! current log level in use//!log_level m_level;//!//! mutex used to serialize logs in multithreaded environment//!std::mutex m_mutex;
};//!
//! variable containing the current logger
//! by default, not set (no logs)
//!
extern std::unique_ptr<logger_iface> active_logger;//!
//! debug logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void debug(const std::string& msg, const std::string& file, std::size_t line);//!
//! info logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void info(const std::string& msg, const std::string& file, std::size_t line);//!
//! warn logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void warn(const std::string& msg, const std::string& file, std::size_t line);//!
//! error logging
//! convenience function used internally to call the logger
//!
//! \param msg message to be logged
//! \param file file from which the message is coming
//! \param line line in the file of the message
//!
void error(const std::string& msg, const std::string& file, std::size_t line);class thread_pool{public://任务包装器typedef std::function<void()> task_t;thread_pool(int init_size = 3);~thread_pool();void stop();void add_task(const task_t&); //thread safe; 添加任务private:thread_pool(const thread_pool&);//禁止复制拷贝.const thread_pool& operator=(const thread_pool&);bool is_started() { return m_is_started; }void start();//启动线程池void thread_loop();//线程循环函数task_t take();//取任务函数private:typedef std::vector<std::thread*> threads_t;//线程容器typedef std::deque<task_t> tasks_t;//任务队列int m_init_threads_size;//初始线程数量threads_t m_threads;//线程容器tasks_t m_tasks;//任务队列std::mutex m_mutex;//互斥锁std::condition_variable m_cond;//条件变量bool m_is_started;//线程池是否启动};}
#endif
2、线程池cpp文件
#include <assert.h>
#include <iostream>
#include <sstream>
#include "thread_pool.hpp"namespace sola {std::unique_ptr<logger_iface> active_logger = nullptr;static const char black[] = {0x1b, '[', '1', ';', '3', '0', 'm', 0};static const char red[] = {0x1b, '[', '1', ';', '3', '1', 'm', 0};static const char yellow[] = {0x1b, '[', '1', ';', '3', '3', 'm', 0};static const char blue[] = {0x1b, '[', '1', ';', '3', '4', 'm', 0};static const char normal[] = {0x1b, '[', '0', ';', '3', '9', 'm', 0};logger::logger(log_level level): m_level(level) {}voidlogger::debug(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::debug) {std::lock_guard<std::mutex> lock(m_mutex);std::cout << "[" << black << "DEBUG" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voidlogger::info(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::info) {std::lock_guard<std::mutex> lock(m_mutex);std::cout << "[" << blue << "INFO " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voidlogger::warn(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::warn) {std::lock_guard<std::mutex> lock(m_mutex);std::cout << "[" << yellow << "WARN " << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voidlogger::error(const std::string& msg, const std::string& file, std::size_t line) {if (m_level >= log_level::error) {std::lock_guard<std::mutex> lock(m_mutex);std::cerr << "[" << red << "ERROR" << normal << "][sola::logger][" << file << ":" << line << "] " << msg << std::endl;}}voiddebug(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->debug(msg, file, line);}voidinfo(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->info(msg, file, line);}voidwarn(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->warn(msg, file, line);}voiderror(const std::string& msg, const std::string& file, std::size_t line) {if (active_logger)active_logger->error(msg, file, line);}static std::stringget_tid(){std::stringstream tmp;tmp << std::this_thread::get_id();return tmp.str();}//线程池构造函数thread_pool::thread_pool(int init_size):m_init_threads_size(init_size),m_mutex(),m_cond(),m_is_started(false){//构造函数中自动启动线程池start();}thread_pool::~thread_pool(){//如果线程池已经启动,则先停止if(m_is_started){stop();}}//启动线程池void thread_pool::start(){assert(m_threads.empty());m_is_started = true;m_threads.reserve(m_init_threads_size);//预先给线程容器分配空间for (int i = 0; i < m_init_threads_size; ++i){//创建线程并加入线程容器(线程函数为thread_loop)m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));}}//停止线程池void thread_pool::stop(){__SOLA_LOG(debug, "thread_pool::stop() stop.");{std::unique_lock<std::mutex> lock(m_mutex);m_is_started = false;//通知所有线程停止m_cond.notify_all();__SOLA_LOG(debug, "thread_pool::stop() notifyAll().");}for (threads_t::iterator it = m_threads.begin(); it != m_threads.end() ; ++it){(*it)->join(); //等待线程退出delete *it; //释放线程资源}m_threads.clear();//清空线程容器}//线程池线程函数void thread_pool::thread_loop(){__SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " start.");//线程池线程循环while(m_is_started){//从任务队列中取出一个任务task_t task = take();//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)if(task){//执行任务:回调实际的任务函数task();}}__SOLA_LOG(debug, "thread_pool::threadLoop() tid : " + get_tid() + " exit.");}//向线程池添加任务void thread_pool::add_task(const task_t& task){std::unique_lock<std::mutex> lock(m_mutex);/*while(m_tasks.isFull()){//when m_tasks have maxsizecond2.notify_one();}*///向任务队列中添加任务m_tasks.push_back(task);//通知一个线程m_cond.notify_one();}//从线程池取出一个任务thread_pool::task_t thread_pool::take(){std::unique_lock<std::mutex> lock(m_mutex);//always use a while-loop, due to spurious wakeup//如果任务队列为空 + 线程池没有停止,则等待while(m_tasks.empty() && m_is_started){__SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wait.");m_cond.wait(lock);}__SOLA_LOG(debug, "thread_pool::take() tid : " + get_tid() + " wakeup.");task_t task;tasks_t::size_type size = m_tasks.size();//如果任务队列不为空 + 线程池没有停止,则取出一个任务if(!m_tasks.empty() && m_is_started){task = m_tasks.front();m_tasks.pop_front();assert(size - 1 == m_tasks.size());/*if (TaskQueueSize_ > 0){cond2.notify_one();}*/}return task;}
}
3、测试main函数
#include <iostream>
#include <chrono>
#include <condition_variable>
#include "thread_pool.hpp"std::mutex g_mutex;void priorityFunc()
{for (int i = 1; i < 4; ++i){std::this_thread::sleep_for(std::chrono::seconds(1));std::lock_guard<std::mutex> lock(g_mutex);std::cout << "priorityFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;}}void testFunc()
{// loop to print character after a random period of timefor (int i = 1; i < 4; ++i){std::this_thread::sleep_for(std::chrono::seconds(1));std::lock_guard<std::mutex> lock(g_mutex);std::cout << "testFunc() [" << i << "] at thread [ " << std::this_thread::get_id() << "] output" << std::endl;}}int main()
{sola::active_logger = std::unique_ptr<sola::logger>(new sola::logger(sola::logger::log_level::debug));sola::thread_pool thread_pool;// add tasks to the thread poolfor(int i = 0; i < 2 ; i++)thread_pool.add_task(testFunc);system("pause");return 0;
}
4、执行流程
1、主线程构造函数
thread_pool thread_pool;
{//构造函数中自动启动线程池start();
}start()
{//创建线程并加入线程容器(线程函数为thread_loop)m_threads.push_back(new std::thread(std::bind(&thread_pool::thread_loop, this)));
}//线程池线程函数
void thread_pool::thread_loop()
{//线程池线程循环while(m_is_started){//从任务队列中取出一个任务task_t task = take();//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)if(task){//执行任务:回调实际的任务函数task();}}
}//等待:从线程池取出一个任务
thread_pool::task_t thread_pool::take()
{//如果任务队列为空 + 线程池没有停止,则等待while(m_tasks.empty() && m_is_started){m_cond.wait(lock);}
}2、主线程中添加任务,到任务队列
thread_pool.add_task(testFunc);thread_pool::add_task(testFunc)
{//向任务队列中添加任务m_tasks.push_back(task);//通知一个线程m_cond.notify_one();
}//wait条件满足:从线程池取出一个任务
thread_pool::task_t thread_pool::take()
{//如果任务队列为空 + 线程池没有停止,则等待while(m_tasks.empty() && m_is_started){m_cond.wait(lock);}//如果任务队列不为空 + 线程池没有停止,则取出一个任务if(!m_tasks.empty() && m_is_started){task = m_tasks.front();m_tasks.pop_front();}return task;
}//线程池线程函数:执行线程的任务函数
void thread_pool::thread_loop()
{//线程池线程循环while(m_is_started){//从任务队列中取出一个任务task_t task = take();//如果取出的任务不为空,则执行任务(std::function类型可以直接判断是否为空!)if(task){//执行任务:回调实际的任务函数task();}}
}//task()回调任务函数
void testFunc()
{
}3、主线程执行完。return 0; // 主线程等待所有工作线程完成(在析构函数中处理)//3.1 调用线程池析构函数
~ThreadPool()
{
//1.调用stop
stop()
{//1.1通知所有线程停止m_cond.notify_all();=》//1.2使得线程循环退出!while(true){//while循环一直在这等待!(lock满足条件,向下执行!)//如果任务队列为空 + 线程池没有停止,则等待while(m_tasks.empty() && m_is_started){m_cond.wait(lock);}//当线程任务执行完成:stop = true;//线程池停止且任务队列为空,退出线程循环//如果任务队列不为空 + 线程池没有停止,则取出一个任务if(!m_tasks.empty() && m_is_started){task = m_tasks.front();m_tasks.pop_front();}return task;}1.3//等待所有线程完成任务(*it)->join(); //等待线程退出
}4、主线程结束