目录
线程池
介绍
分类
实现
工作原理
核心组成
拒绝策略
固态线程池
功能
std::future
实现
拒绝策略支持
提交任务
超时取消
用户检测取消
安全销毁
代码
测试
线程池
介绍
线程池(图解,本质,模拟实现代码),添加单例模式(懒汉思路+代码)_线程池单例-CSDN博客
- 包括线程池介绍+使用pthread库接口
这里我们就使用c++11中的thread库来实现一下,并且引入更多特性
分类
固定大小线程池(Fixed-size Thread Pool):
线程池中的线程数在初始化时被设定并保持固定,不会根据负载自动扩展或收缩。
适用于负载较为平稳的场景,例如Web服务器、数据库连接池等。
动态线程池(DynamicThread Pool):
线程池的线程数是动态变化的,根据任务的数量来增加或减少线程数。
当线程池中没有任务时,线程会被回收(通常有一个最大线程数限制)。
适用于任务量不稳定、并发变化较大的场景,如文件处理、短时间的批量任务等。
单线程化线程池(Single-threaded Thread Pool):
线程池中只有一个线程,所有任务都会按顺序提交给这个线程执行。
适用于串行化任务,如日志记录、事件驱动任务等。
调度线程池(Scheduled Thread Pool):
该线程池支持任务的延迟执行和周期性执行。
通常用于定时任务或周期性任务,例如定时清理缓存等。
实现
工作原理
- 线程池初始化:线程池初始化时创建一定数量的工作线程,并使其处于“等待”状态,准备执行任务
- 任务提交:当有新任务提交时,线程池将任务放入任务队列中
- 任务执行:线程池中的空闲线程从任务队列中取出任务并执行
- 任务完成:任务执行完后,线程回到等待状态,准备接收新的任务
- 线程销毁:当线程池中的线程闲置超过一定时间,线程池可以销毁一些线程以释放系统资源(适用于动态线程池)
核心组成
一个线程池一般由以下几个核心组件组成:
线程工作线程集合
- 一组预先创建的固定数量或动态伸缩(包括核心线程和临时创建)的线程
- 每个线程循环从任务队列中获取任务执行
任务队列
- 用于存放等待执行的任务,一般为线程安全的队列(如
std::queue + mutex
),支持任务入队/出队任务提交接口
- 对外暴露的函数接口,用于将任务提交到线程池,如
submit()
同步机制
- 用于保护共享资源和协调线程间关系(每个线程都要访问任务队列)
- 常用
mutex
,condition_variable
,atomic
等任务拒绝策略(Rejection Policy)
- 当队列已满时,决定如何处理新任务(下面介绍)
生命周期管理(Shutdown & Destruction)
- 控制线程池的启动、停止、销毁,确保线程安全退出并释放资源
可选的任务取消机制
- 支持任务在执行中被取消(比如因当前任务执行超时等原因)
可选的任务超时机制
- 为每个任务设置执行超时,到期未完成的任务自动取消或通知中断
拒绝策略
抛出异常,拒绝执行(默认策略)
- 抛出
RejectedExecutionException
异常,告知任务无法执行- 适合你希望及时发现问题并中止提交任务
由提交任务的线程执行
- 将任务回退给调用者,即由提交任务的线程执行任务,而不是交给线程池中的线程处理
可以起到“削峰”作用,但影响主线程性能(会阻塞提交线程)
静默丢弃
- 丢弃无法执行的任务,不抛出异常
- 适合对任务可丢弃、无严重后果的场景(如日志)
丢弃队列中最旧的任务,然后尝试重新提交当前任务
- 适合任务有时效性(如更新 UI、股票报价)
固态线程池
功能
支持:
固定线程数: 构造时指定线程数,固定数量线程常驻执行任务
有界队列: 支持设置任务队列最大容量,避免过载
拒绝策略支持: 支持 BLOCK(等待)、DISCARD(丢弃)、THROW(抛异常)三种策略
提交任务: 提交
void(exec_context)
类型任务,通过适配器转换为 void() 并存入任务队列,最终返回std::future<void>
超时取消: 自动按任务类型 (因为我这里是把自己写的搜索引擎项目中增设的线程池作为例子,所以这里的类型就分为建立索引,搜索,搜索联想) , 设置超时时间,到时通知任务取消
用户检测取消: 任务内部可用
canceled()
检测取消请求并安全退出安全销毁: 析构时安全停止所有线程并等待退出
线程安全: 所有关键资源由
mutex
和condition_variable
保护,支持多线程并发提交任务
std::future
std::future
<T>
是 C++11 引入的标准库类型
- 可以异步获取一个任务的执行结果
- 作用 -- 当你把一个任务提交给线程池时,这个任务可能要等一会儿才能执行(毕竟线程有限,排队中),那么你就需要一个东西来 “将来拿到结果”
实现
拒绝策略支持
定义了一个枚举类
enum class RejectionPolicy{BLOCK,DISCARD,THROW};
当任务提交后,根据当前的任务队列使用情况和拒绝策略的设置,决定对任务的处理方式
{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){//因为这里是带谓词的wait,所以即使虚假唤醒也会在条件为真后返回cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}
这里的静默丢弃
- 虽然定义上是不抛出异常,但为了调用方知道任务没被接收,还是加上了
这里的while循环是为了防止虚假唤醒:
提交任务
// 传入void(exec_context) ,因为内部也需要配合template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);// 管理任务执行结果auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });// 从packaged_task中获取一个关联的future对象std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}
这里因为任务队列中使用的void()类型,而外部传入的是void(exec_context) (因为需要内部配合停止)
- 所以需要对类型进行一个转换
- 也就是通过cancellableTask
这个
可取消的任务封装器,实际是一个无参可调用对象,内部实例化一个exec_context ,然后调用带参数版本的函数- 最后将 (*taskWrapper)() 放入队列,实际就是调用了cancellableTask的operator()()
- 于是实现了将void(exec_context) -> void()
超时取消
首先,是定义了两个模块,分别的作用是 -- 取消状态的设置 和 作为对外接口
// 控制标识符 struct exec_controller {bool notify_cancel() { return _should_cancel.exchange(true); }bool should_cancel() const { return _should_cancel; }private:std::atomic<bool> _should_cancel{false}; }; // 判断是否被取消 struct exec_context {exec_context(std::shared_ptr<exec_controller> impl): _impl(std::move(impl)) {}bool canceled() const { return _impl->should_cancel(); }private:std::shared_ptr<exec_controller> _impl; };
- 使用原子变量避免竞态条件
- 读取控制器可以被多个任务共享 (通过智能指针控制生命周期)
其次,定义了一个可取消的任务封装器
struct CancellableTask {// 封装一个取消控制器std::shared_ptr<exec_controller> controller =std::make_shared<exec_controller>();// 将取消上下文封装进普通任务中std::function<void(exec_context)> func;void operator()(){exec_context ctx{controller};func(ctx);} };
然后,还定义了根据不同任务类型,返回对应超时时间的函数
static std::chrono::millisecondsgetTimeoutForTask(const std::string &taskType){if (taskType == ns_helper::TASK_TYPE_BUILD_INDEX){return std::chrono::seconds(120);}else if (taskType == ns_helper::TASK_TYPE_PERSIST_INDEX){return std::chrono::seconds(4 * 3600);}else if (taskType == ns_helper::TASK_TYPE_SEARCH ||taskType == ns_helper::TASK_TYPE_AUTOCOMPLETE){return std::chrono::seconds(1);}else{return std::chrono::seconds(1);}}
- 我这里因为是搜索引擎,并且没有做性能优化,所以设置的时间比较长(跪)
并且,启动了一个后台线程去监控任务状态
// 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();
- 这里就只讲一下lambda内部逻辑,传参会在下面的核心逻辑介绍
- 总之,这个线程会等待任务完成timeout秒,如果未完成,则任务超时 -> 设置取消控制器的标识符
- 这里还设置了调用了datach,让它和主线程分离(也就是后台运行)
最后,就是核心逻辑了
// 传入void(exec_context) ,因为内部也需要配合template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);// 管理任务执行结果auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });// 从packaged_task中获取一个关联的future对象std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (tasks_.size() >= max_queue_size_ && !stop_){if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock, [this]{ return tasks_.size() < max_queue_size_ || stop_; });}else if (reject_policy_ == RejectionPolicy::DISCARD){throw std::runtime_error("Task queue is full. Task was discarded.");}else if (reject_policy_ == RejectionPolicy::THROW){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}
- 其实大头设计就是那些组件,把它们组合起来使用+任务代码中显式检测取消标识,就能实现超时取消机制 -- 在后台单独启动一个线程,监控提交的任务是否在规定的超时时间内完成,如果超时则通知取消
用户检测取消
传入的任务也需要配合(检测取消标识符,为真时结束执行)
void test_cancel_task(exec_context ctx) {std::cout << "Task started.\n";for (int i = 0; i < 50; ++i){if (ctx.canceled()){std::cout << "Task detected cancellation, exiting early.\n";return;}std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Task working... step " << i << "\n";}std::cout << "Task completed normally.\n"; }
安全销毁
explicit FixedThreadPool(size_t thread_count, size_t max_queue_size = 1000,RejectionPolicy policy = RejectionPolicy::BLOCK): max_queue_size_(max_queue_size), reject_policy_(policy), stop_(false){for (size_t i = 0; i < thread_count; ++i){workers_.emplace_back([this]{while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx_);cond_.wait(lock, [this] { return stop_ || !tasks_.empty(); });if (stop_ && tasks_.empty())return;task = std::move(tasks_.front());tasks_.pop();}task();} });}}
~FixedThreadPool(){{std::unique_lock<std::mutex> lock(mtx_);stop_ = true;}cond_.notify_all();for (std::thread &worker : workers_){if (worker.joinable()){worker.join();}}}
- 释放资源前,先设置停止标志,并唤醒所有线程,让线程检测到停止标识后,安全地将队列中的遗留任务处理完毕,然后释放线程资源
代码
#include <atomic> #include <chrono> #include <condition_variable> #include <exception> #include <functional> #include <future> #include <iostream> #include <mutex> #include <optional> #include <queue> #include <stdexcept> #include <string> #include <thread> #include <vector>#include "../code/assistance.hpp"// --- exec control logic ---struct exec_controller {bool notify_cancel() { return _should_cancel.exchange(true); }bool should_cancel() const { return _should_cancel; }private:std::atomic<bool> _should_cancel{false}; };struct exec_context {exec_context(std::shared_ptr<exec_controller> impl): _impl(std::move(impl)) {}bool canceled() const { return _impl->should_cancel(); }private:std::shared_ptr<exec_controller> _impl; };// --- CancellableTask ---struct CancellableTask {std::shared_ptr<exec_controller> controller =std::make_shared<exec_controller>();std::function<void(exec_context)> func;void operator()(){exec_context ctx{controller};func(ctx);} };// --- FixedThreadPool ---class FixedThreadPool { public:enum class RejectionPolicy{BLOCK,DISCARD,THROW};explicit FixedThreadPool(size_t thread_count, size_t max_queue_size = 1000,RejectionPolicy policy = RejectionPolicy::BLOCK): max_queue_size_(max_queue_size), reject_policy_(policy), stop_(false){for (size_t i = 0; i < thread_count; ++i){workers_.emplace_back([this]{while (true) {std::function<void()> task;{std::unique_lock<std::mutex> lock(mtx_);cond_.wait(lock, [this] { return stop_ || !tasks_.empty(); });if (stop_ && tasks_.empty())return;task = std::move(tasks_.front());tasks_.pop();}task();} });}}~FixedThreadPool(){{std::unique_lock<std::mutex> lock(mtx_);stop_ = true;}cond_.notify_all();for (std::thread &worker : workers_){if (worker.joinable()){worker.join();}}}template <class F>std::future<void> submit(const std::string &taskType, F &&f){auto timeout = getTimeoutForTask(taskType);auto cancellableTask = std::make_shared<CancellableTask>();cancellableTask->func = std::forward<F>(f);auto taskWrapper = std::make_shared<std::packaged_task<void()>>([cancellableTask](){ (*cancellableTask)(); });std::future<void> taskFuture = taskWrapper->get_future();{std::unique_lock<std::mutex> lock(mtx_);// 阻塞策略:等待有空间if (reject_policy_ == RejectionPolicy::BLOCK){cond_.wait(lock,[this]{ return tasks_.size() < max_queue_size_ || stop_; });}// 丢弃策略:抛出异常(不再返回默认构造的 future)else if (reject_policy_ == RejectionPolicy::DISCARD){if (tasks_.size() >= max_queue_size_){throw std::runtime_error("Task queue is full. Task was discarded.");}}// 异常策略:同样抛出异常else if (reject_policy_ == RejectionPolicy::THROW){if (tasks_.size() >= max_queue_size_){throw std::runtime_error("Task queue is full.");}}if (stop_){throw std::runtime_error("ThreadPool is stopping. Cannot accept new tasks.");}tasks_.emplace([taskWrapper](){ (*taskWrapper)(); });}cond_.notify_one();// 启动一个后台线程监控超时取消std::thread([taskFuture = std::move(taskFuture),controller = cancellableTask->controller, timeout]() mutable{if (taskFuture.wait_for(timeout) == std::future_status::timeout) {controller->notify_cancel();std::cerr << "[Timeout] Task exceeded time limit and was cancelled.\n";} }).detach();return taskFuture;}private:static std::chrono::millisecondsgetTimeoutForTask(const std::string &taskType){if (taskType == ns_helper::TASK_TYPE_BUILD_INDEX){return std::chrono::seconds(120);}else if (taskType == ns_helper::TASK_TYPE_PERSIST_INDEX) // ✅ 修复这里{return std::chrono::seconds(4 * 3600);}else if (taskType == ns_helper::TASK_TYPE_SEARCH ||taskType == ns_helper::TASK_TYPE_AUTOCOMPLETE){return std::chrono::seconds(1);}else{return std::chrono::seconds(1);}}std::vector<std::thread> workers_;std::queue<std::function<void()>> tasks_;std::mutex mtx_;std::condition_variable cond_;bool stop_;size_t max_queue_size_;RejectionPolicy reject_policy_; };
测试
void test_cancel_task(exec_context ctx) {std::cout << "Task started.\n";for (int i = 0; i < 50; ++i){if (ctx.canceled()){std::cout << "Task detected cancellation, exiting early.\n";return;}std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Task working... step " << i << "\n";}std::cout << "Task completed normally.\n"; }int main() {FixedThreadPool fp(2, 10, FixedThreadPool::RejectionPolicy::BLOCK);auto future = fp.submit("search", test_cancel_task);try{if (future.valid()){future.get();}}catch (const std::exception &e){std::cout << "Task threw exception: " << e.what() << "\n";}return 0; }
动态线程池会在另一篇讲(我还没写 1 - 1) ,大家也可以帮我纠一纠错(跪倒)