三. CachedThreadPool 的实现
3.1 需求:
动态调整线程数量:与 FixedThreadPool 不同,CachedThreadPool 的线程数量是动态调整的。当有新任务提交时,如果线程池中有空闲的线程,则会立即使用空闲线程执行任务;如果线程池中没有空闲线程,则会创建一个新的线程来执行任务。当线程空闲一段时间后,超过一定的时间(默认为 60 秒),会被回收销毁。
3.2 SyncQueue 同步队列的设计和实现
#ifndef SYNCQUEUE3_HPP
#define SYNCQUEUE3_HPP#include<list>
#include<mutex>
#include<condition_variable>
#include<iostream>using namespace std;template<class T>
class SyncQueue
{
private:std::list<T> m_queue; // 任务缓冲区mutable std::mutex m_mutex;std::condition_variable m_notEmpty; // Cstd::condition_variable m_notFull; // Psize_t m_waitTime; //任务队列满等待时间sint m_maxSize; // bool m_needStop; // true 同步队列不在接受新的任务//bool IsFull() const{bool full = m_queue.size() >= m_maxSize;if (full){printf("m_queue 已经满了,需要等待....\n");}return full;}bool IsEmpty() const{bool empty = m_queue.empty(); //if (empty){printf("m_queue 已经空了,需要等待....\n");}return empty;}// return 0; 成功// 1 ;// full// 2 ;// stop;template<class F>int Add(F&& x){std::unique_lock<std::mutex> locker(m_mutex);if (!m_notFull.wait_for(locker, std::chrono::seconds(m_waitTime),[this] { return m_needStop || !IsFull(); })){cout << "task queue full return 1" << endl;return 1;}if (m_needStop){cout << "同步队列停止工作..." << endl;return 2;}m_queue.push_back(std::forward<F>(x));m_notEmpty.notify_one();return 0;}public:SyncQueue(int maxsize = 100, int timeout = 1):m_maxSize(maxsize),m_waitTime(timeout),m_needStop(false) // 同步队列开始工作{}int Put(const T& x){return Add(x);}int Put(T&& x){return Add(std::forward<T>(x));}int notTask(){std::unique_lock<std::mutex> locker(m_mutex);if (!m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime))== std::cv_status::timeout){return 1;}return 0;}void Take(std::list<T>& list) //{std::unique_lock<std::mutex> locker(m_mutex);while (!m_needStop && IsEmpty()){m_notEmpty.wait(locker);}if (m_needStop){cout << "同步队列停止工作..." << endl;return;}list = std::move(m_queue);m_notFull.notify_one();}//T& GetTake();// return 0; 成功// 1 ;// empty// 2 ;// stop;int Take(T& t) // 1{std::unique_lock<std::mutex> locker(m_mutex);if (!m_notEmpty.wait_for(locker, std::chrono::seconds(m_waitTime),[this] { return m_needStop || !IsEmpty(); })){return 1; // 队列空}if (m_needStop){cout << "同步队列停止工作..." << endl;return 2;}t = m_queue.front();m_queue.pop_front();m_notFull.notify_one();return 0;}void Stop(){std::unique_lock<std::mutex> locker(m_mutex);while (!IsEmpty()){m_notFull.wait(locker);}m_needStop = true;m_notFull.notify_all();m_notEmpty.notify_all();}bool Empty() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.empty();}bool Full() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.size() >= m_maxSize;}size_t size() const{std::unique_lock<std::mutex> locker(m_mutex);return m_queue.size();}
};#endif
3.3 CachedThreadPool 线程池的设计和实现
#ifndef CACHEDTHREADPOOL_HPP
#define CACHEDTHREADPOOL_HPP#include"SyncQueue3.hpp"
#include<functional>
#include<unordered_map>
#include<map>
#include<future>using namespace std;int MaxTaskCount = 2;
const int KeepAliveTime = 10; //线程最大存活时间 60 ,为测试改为10class CachedThreadPool
{
public:using Task = std::function<void(void)>;private:std::unordered_map<std::thread::id, std::shared_ptr<std::thread>> m_threadgroup;int m_coreThreadSize; // 核心的线程数量,下限阈值 2int m_maxThreadSize; // 最大的线程数量,上限阈值std::atomic_int m_idleThreadSize; // 空闲线程的数量std::atomic_int m_curThreadSize; // 当前线程池里面的线程总数量mutable std::mutex m_mutex; // SyncQueue<Task> m_queue;std::atomic_bool m_running; // true ; false stop;std::once_flag m_flag;void Start(int numthreads){m_running = true;m_curThreadSize = numthreads;for (int i = 0; i < numthreads; ++i){auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_idleThreadSize++;}}void RunInThread(){auto tid = std::this_thread::get_id();auto startTime = std::chrono::high_resolution_clock().now();while (m_running){Task task;if (m_queue.size() == 0 && m_queue.notTask()){auto now = std::chrono::high_resolution_clock().now();auto intervalTime = std::chrono::duration_cast<std::chrono::seconds>(now - startTime);std::lock_guard<std::mutex> lock(m_mutex);if (intervalTime.count() >= KeepAliveTime &&m_curThreadSize > m_coreThreadSize){m_threadgroup.find(tid)->second->detach();m_threadgroup.erase(tid);m_curThreadSize--;m_idleThreadSize--;cout << "空闲线程销毁" << m_curThreadSize << " " << m_coreThreadSize << endl;return;}}if (!m_queue.Take(task) && m_running){m_idleThreadSize--;task();m_idleThreadSize++;startTime = std::chrono::high_resolution_clock().now();}}}void StopThreadGroup(){m_queue.Stop();m_running = false;for (auto& thread : m_threadgroup){thread.second->join();}m_threadgroup.clear();}public:CachedThreadPool(int initNumThreads = 8, int taskPoolSize = MaxTaskCount):m_coreThreadSize(initNumThreads),m_maxThreadSize(2 * std::thread::hardware_concurrency() + 1),m_idleThreadSize(0),m_curThreadSize(0),m_queue(taskPoolSize),m_running(false){Start(m_coreThreadSize);}~CachedThreadPool(){StopThreadGroup();}template<class Func, class... Args>auto submit(Func&& func, Args&&... args) -> std::future<decltype(func(args...))>{auto task = std::make_shared<std::packaged_task<decltype(func(args...))()>>(std::bind(std::forward<Func>(func), std::forward<Args>(args)...));std::future<decltype(func(args...))> result = task->get_future();if (m_queue.Put([task]() { (*task)(); }) != 0){cout << "调用者运行策略" << endl;(*task)();}if (m_idleThreadSize <= 0 && m_curThreadSize < m_maxThreadSize){std::lock_guard<std::mutex> lock(m_mutex);auto tha = std::make_shared<std::thread>(std::thread(&CachedThreadPool::RunInThread, this));std::thread::id tid = tha->get_id();m_threadgroup.emplace(tid, std::move(tha));m_curThreadSize++;m_idleThreadSize++;}return result;}template<class Func, class... Args>void execute(Func&& func, Args&&... args){submit(std::forward<Func>(func), std::forward<Args>(args)...);}
};#endif
3.4 测试
///
void func(int index)
{static int num = 0;cout << "func_" << index << " num: " << ++num << endl;
}int add(int a, int b)
{return a + b;
}int main()
{CachedThreadPool mypool;for (int i = 0; i < 1000; ++i){if (i % 2 == 0){auto pa = mypool.submit(add, i, i + 1);cout << pa.get() << endl;}else{mypool.execute(func, i);}}CachedThreadPool pool(2);int add(int a, int b, int s){std::this_thread::sleep_for(std::chrono::seconds(s));int c = a + b;cout << "add begin ..." << endl;return c;}void add_a(){auto r = pool.submit(add, 10, 20, 4);cout << "add_a: " << r.get() << endl;}void add_b(){auto r = pool.submit(add, 20, 30, 6);cout << "add_b: " << r.get() << endl;}void add_c(){auto r = pool.submit(add, 30, 40, 1);cout << "add_c: " << r.get() << endl;}void add_d(){auto r = pool.submit(add, 10, 40, 9);cout << "add_d: " << r.get() << endl;}int main(){std::thread tha(add_a);std::thread thb(add_b);std::thread thc(add_c);std::thread thd(add_d);tha.join();thb.join();thc.join();thd.join();std::this_thread::sleep_for(std::chrono::seconds(20));std::thread the(add_a);std::thread thf(add_b);the.join();thf.join();return 0;}
}
3.5 FixedThreadPool 与 CachedThreadPool 特性对比
特性 | FixedThreadPool | CachedThreadPool |
---|---|---|
重用 | 能 reuse 就用,但不能随时建新的线程 | 先查看池中有无以前建立的线程,有就 reuse;没有就建新线程加入池中 |
池大小 | 可指定 nThreads,固定数量 | 可增长,最大值 Integer.MAX_VALUE |
队列大小 | 无限制 | 无限制 |
超时 | 无 IDLE | 默认 60 秒 IDLE |
使用场景 | 针对稳定固定的正规并发线程,用于服务器,执行负载重、CPU 使用率高的任务,防止线程频繁切换得不偿失 | 处理大量短生命周期异步任务,执行大量并发短期异步任务,任务负载要轻 |
结束 | 不会自动销毁 | 放入的线程超过 TIMEOUT 不活动会自动被终止 |
3.6 最佳实践
FixedThreadPool 和 CachedThreadPool 对高负载应用都不特别友好,CachedThreadPool 更危险。若应用要求高负载、低延迟,最好不选这两种,推荐使用 ThreadPoolExecutor ,可进行细粒度控制:
- 将任务队列设置成有边界的队列
- 使用合适的 RejectionHandler 拒绝处理程序
- 若任务完成前后需执行操作,可重载
beforeExecute(Thread, Runnable)
、afterExecute(Runnable, Throwable)
- 重载 ThreadFactory ,若有线程定制化需求
- 运行时动态控制线程池大小(Dynamic Thread Pool)
3.7 使用场景
适用于以下场景:
- 大量短期任务:适合处理大量短期任务,任务到来时尽可能创建新线程执行,有空闲线程则复用,避免频繁创建销毁线程的额外开销。
- 任务响应快速:可根据任务到来快速创建启动新线程执行,减少任务等待时间。
- 不需要限制线程数量:最大线程数不限,只要内存足够,可根据任务动态创建新线程。
- 短期性任务的高并发性:可动态创建线程,适合处理需高并发性的短期任务,任务处理完后保持一定空闲线程用于下一批任务。
需注意,CachedThreadPool 线程数量不受限,任务过多可能导致线程数量过多、系统资源过度消耗,使用时需灵活调整线程数量或用其他线程池控制资源。