自定义线程池 4.0

自定义线程池 4.0

1. 简介

上次我们实现了自定义线程池的 3.1 版本,提供了线程工厂创建线程和工具类创建简单线程池的功能,增强了线程池的灵活性,并且用起来更加方便了,本文我们将做如下的优化:

  • 给线程池添加关闭的方法。

这个功能看起来容易,实际做起来却很难

2. 总体设计

首先,我们需要思考什么叫关闭线程池,是执行完堆积的任务再让线程停止?还是让线程立即停止?实际上,这两种关闭线程池都可能用的到。先给这两个关闭的方法起名字吧,然后再说明使用场景:

  • shutdownWhenNoTask():执行完堆积的任务再让线程停止。使用的场景在于任务比较重要,必须得执行,但自己不想执行,想委托线程池执行。
  • shutdownNow():让线程立即停止,返回任务队列中的所有任务。使用的场景在于任务不重要,想要立即停止线程池,或者自己来执行没有执行的任务,不依赖线程池执行。

其次,当线程池关闭后,是无法再提交任务的,所以在线程池关闭后,再调用 submit() 方法直接拒绝。

接着,如何知道线程池处于什么状态呢?这有些困难,我先讲讲我的设计:给线程池添加一个状态变量 state,然后定义 2 个常量——RUNNING, SHUTDOWN,分别表示 运行状态关闭状态state 初始化为运行状态,在调用关闭方法时切换成关闭状态。但是,关闭方法可能会被多个线程调用,所以需要 state 的类型设计成原子类,这样这个状态变量就是线程安全的。

最后,如何让线程停止呢?这个问题思考起来就很复杂了,让我们进入标题 3 吧!

3. “让线程停止”的设计方案

3.1 Thread.interrupt()

Thread 有一个成员方法 interrupt(),这个方法并不是让线程直接停止,而是给线程打上停止的 标记,之后可以通过调用另一个成员方法 isInterrupted() 查看线程是否被打上停止标记。此外,调用这个方法会唤醒阻塞的线程,然后抛出 InterruptedException,这就是很多阻塞方法都需要处理 InterruptedException 异常的原因。

如果使用这个方法来向所有线程传递中断信号,那么可能会出现如下的情况:

线程 A 在执行任务时阻塞,线程 B 调用方法停止线程池,从而调用线程 A 的 interrupt() 方法给线程 A 打上停止标记,并唤醒线程 A。目前看来还正常,但如果线程 A 做了如下的异常处理,通过 Thread.interrupted() 清理了中断标记,那这个中断标记就无法被我们识别,从而无法使用中断标记使线程退出

try {...
} catch (InterruptedException e) {Thread.interrupted();...
}

3.2 POISON_PILL

POISON_PILL(毒丸) 是一种常见的设计模式,用于 安全地终止并发系统或进程间通信,通常通过 通过发送一个特殊的“毒丸”消息来通知接收者停止处理,从而优雅地关闭系统 的方式实现,它的核心思想如下:

  • 信号机制:发送一个特殊的消息(毒丸)作为终止信号。
  • 安全终止:接收者收到毒丸后,完成当前任务后停止处理,避免数据丢失或资源泄漏。
  • 解耦控制:发送者和接收者无需直接通信,通过消息传递实现异步终止。

我们完全可以使用毒丸来终止线程池中的所有线程,当调用停止方法时,向任务队列中投放一个毒丸,只要线程拿到毒丸,就直接退出吗?并不行,如果这时一个线程直接退出了,那其他线程怎么办?难道要在停止方法中投放与当前线程数量相同数量的毒丸吗?这样也不太好,我们可以把线程的退出想象成一个“流”,在这个“流”上,前面的事情处理完之后,需要通知后面的事情开始处理,同时也可以在没有后续事情时选择不通知,也就是说,我们可以在线程退出前向任务队列中投放一个毒丸,这样线程就前赴后继地拿到毒丸、退出了。

于是我们可以写出一个投放毒丸的方法(这个方法下面需要优化,目前只是一个简单的实现):

public void offerPoisonPill() {synchronized (threadPoolMonitor) {taskQueue.offer(POISON_PILL); // POISON_PILL 是一个 Runnable 类型的常量}
}

4. 两个关闭方法的实现

4.1 shutdownWhenNoTask()

这个方法要做的事情很简单,只需要将线程池的状态切换成关闭状态,然后再向队列中投放一个毒丸即可。

4.2 shutdownNow()

这个方法要做的事情稍微有点复杂,除了切换线程池状态和投放毒丸之外,还需要做两件事:

  • 给所有线程发送中断信号,尝试让它们中断。
  • 将队列中的所有任务放到一个集合中并返回。

5. 投放毒丸方法的优化

标题 3.2 中实现的投放毒丸是有问题的,假设任务队列已满,则会投放失败,所以我们需要得到 offer() 方法返回 true 的结果,从而保证毒丸真正放到队列中。

5.1 v1 多次投放

有一种十分容易想到的做法:

private void doOfferPoisonPill() {// 不断投放毒丸,直到成功为止while (!taskQueue.offer(POISON_PILL)) {}
}

但是,这种做法很耗性能,只要投放不成功,就一直重试,连休息的时间都没有(如果让线程休眠一段时间,会稍微好一点,但也属于线程在 忙等待)。

5.2 v2 阻塞投放

于是,想到了更高级的做法,只要投放不成功,就等待队列有空余位置,于是诞生了如下的代码:

/*** 任务队列的锁,用于生成一个 {@link Condition} 对象*/
private final Lock taskQueueLock = new ReentrantLock();/*** 任务队列已满的条件对象*/
private final Condition taskQueueNotFull = taskQueueLock.newCondition();private void doOfferPoisonPill() {// 不断投放毒丸,直到成功为止while (!taskQueue.offer(POISON_PILL)) {taskQueueLock.lock();try {taskQueueNotFull.await();} catch (InterruptedException ignore) {} finally {taskQueueLock.unlock();}}
}

更重要的是,我们不能再让其他类随便调用任务队列取出元素的方法了,因为我们需要 在取出元素时唤醒在这里阻塞等待队列空余位置的线程,所以我们需要将其取出元素的方法包装起来,如下所示:

/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @return 任务队列中的任务*/
private Runnable takeTaskFromQueue() throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.take();taskQueueNotFull.signalAll();return task;} finally {taskQueueLock.unlock();}
}/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @param nanos 等待的时间,单位:ns* @return 任务队列中的任务,如果等待超时,则返回 {@code null}*/
private Runnable pollTaskFromQueue(long nanos) throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.poll(nanos, TimeUnit.NANOSECONDS);if (task != null) {taskQueueNotFull.signalAll();}return task;} finally {taskQueueLock.unlock();}
}

6. 实现 4.0 版本

听明白上面这些设计后,我们终于能实现 4.0 版本了:

6.1 Worker

public abstract class Worker implements Runnable {/*** 线程执行的初始任务*/private Runnable initialTask;/*** 对 <strong>真正运行的线程</strong> 的引用,用于调用其 {@link Thread#start()} 方法启动线程*/private final Thread actuallyRunningThread;/*** {@link Worker} 存在的线程池*/protected final ThreadPool4_0 threadPool;public Worker(Runnable initialTask, Set<Worker> workerPool, ThreadFactory threadFactory, ThreadPool4_0 threadPool) {this.initialTask = initialTask;this.actuallyRunningThread = threadFactory.newThread(this);workerPool.add(this);this.threadPool = threadPool;}@Overridepublic final void run() {initialTask.run();initialTask = null; // help GCtry {while (true) {Runnable t = getTask();if (t == null) {// 检查是否获取到任务了,如果没有则退出循环,停止运行break;} else if (t == ThreadPool4_0.POISON_PILL) {// 如果任务是毒丸,则先往队列中再放一个毒丸,然后退出循环threadPool.offerPoisonPillIfThreadRest();// 以下这句话只是为了测试,在正式环境中最好注释掉LogUtil.infoWithTimeAndThreadName("发现毒丸,退出循环");break;}t.run();}} finally {onWorkerExit();}}/*** 启动内部保存的线程*/public final void start() {actuallyRunningThread.start();}/*** 给正在运行中的线程打上中断标记*/public final void interrupt() {actuallyRunningThread.interrupt();}/*** 获取任务,当返回 @{code null} 时,这个 {@link Worker} 对象就退出循环* <p>* 使用模板方法模式,交给子类实现** @return 获取到的任务*/protected abstract Runnable getTask();/*** 当 {@link Worker} 准备退出时执行的回调函数* <p>* 用于将 {@link Worker} 对象从线程池中移除*/private void onWorkerExit() {threadPool.removeWorkerFromThreadPool(this);}
}

6.2 ThreadPool4_0

public class ThreadPool4_0 {/*** 线程池中核心线程的最大数量*/private final int corePoolSize;/*** 线程池中线程的最大数量*/private final int maxPoolSize;/*** 临时线程阻塞的最长时间(单位:ns),超过这个时间还没有领取到任务就直接退出*/private final long keepAliveTime;/*** 任务队列*/private final BlockingQueue<Runnable> taskQueue;/*** 拒绝策略,用于在无法执行任务的时候拒绝任务*/private final RejectPolicy rejectPolicy;/*** 线程工厂*/private final ThreadFactory threadFactory;/*** 默认的线程工厂*/private static final ThreadFactory DEFAULT_THREAD_FACTORY = new ThreadFactory() {/*** 计数器,用来记录当前创建的是第几个线程,从 0 开始*/private int counter = 0;@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "thread-pool-" + counter++);}};/*** 构造一个线程池,默认参数如下:* <ul>*     <li>拒绝策略默认为抛出异常的拒绝策略</li>*     <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION);}/*** 构造一个线程池,默认参数如下:* <ul>*     <li>拒绝策略默认为抛出异常的拒绝策略</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param threadFactory 线程工厂*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, ThreadFactory threadFactory) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, RejectPolicy.THROW_EXCEPTION, threadFactory);}/*** 构造一个线程池,默认参数如下:* <ul>*     <li>线程工厂默认为线程添加了简单的名字 thread-pool-?</li>* </ul>** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) {this(corePoolSize, maxPoolSize, keepAliveTime, unit, taskQueue, rejectPolicy, DEFAULT_THREAD_FACTORY);}/*** 构造一个线程池** @param corePoolSize 线程池中核心线程的最大数量* @param maxPoolSize 线程池中线程的最大数量* @param keepAliveTime 临时线程阻塞的最长时间* @param unit 时间的单位* @param taskQueue 任务队列* @param rejectPolicy 拒绝策略* @param threadFactory 线程工厂*/public ThreadPool4_0(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy, ThreadFactory threadFactory) {this.corePoolSize = corePoolSize;this.maxPoolSize = maxPoolSize;this.keepAliveTime = unit.toNanos(keepAliveTime);this.taskQueue = taskQueue;this.rejectPolicy = rejectPolicy;this.threadFactory = threadFactory;}/*** 存放线程的集合,使用 {@link Set} 是因为 {@link Set#remove(Object)} 性能更高*/private final Set<Worker> threadPool = new HashSet<>();/*** 线程池的管程* <p>* 用于保证 <strong>将线程放入线程池</strong>、<strong>从线程池中移除线程</strong> 的互斥性* 同时也在保证 {@link #currPoolSize} 相关操作的互斥性*/private final Object threadPoolMonitor = new Object();/*** 线程池中当前线程数量,这个值 <= threadPool.size()* 在创建新线程时增加,在放毒丸时减少,threadPool.size() 减少的时机晚于 currPoolSize*/private int currPoolSize = 0;/*** <h3>核心线程执行的任务</h3>* {@link #getTask()} 方法会一直阻塞,直到有新任务*/public final class CoreWorker extends Worker {public CoreWorker(Runnable initialTask, Set<Worker> workerPool, ThreadFactory threadFactory,ThreadPool4_0 threadPool) {super(initialTask, workerPool, threadFactory, threadPool);}@Overrideprotected Runnable getTask() {try {return takeTaskFromQueue();} catch (InterruptedException e) {throw new RuntimeException(e);}}}/*** <h3>临时线程执行的任务</h3>* {@link #getTask()} 方法会在阻塞一定时间后如果还没有任务,则会返回 {@code null}*/public final class TempWorker extends Worker {public TempWorker(Runnable initialTask, Set<Worker> workerPool, ThreadFactory threadFactory,ThreadPool4_0 threadPool) {super(initialTask, workerPool, threadFactory, threadPool);}@Overrideprotected Runnable getTask() {try {return pollTaskFromQueue(keepAliveTime);} catch (InterruptedException e) {throw new RuntimeException(e);}}}/*** 线程池的状态,状态共有 2 种:* <ul>*     <li>{@link #RUNNING} 运行状态</li>*     <li>{@link #SHUTDOWN} 关闭状态,调用了线程池的关闭方法</li>* </ul>*/private final AtomicInteger state = new AtomicInteger(RUNNING);private static final int RUNNING = 1;private static final int SHUTDOWN = 2;/*** 提交任务** @param task 待执行的任务*/public void submit(Runnable task) {// 如果线程池的状态不是 RUNNING 状态,则直接拒绝任务if (state.get() != RUNNING) {rejectPolicy.reject(this, task);return;}// 如果 线程数量 小于 最大核心线程数量,则新建一个 核心线程 执行任务,然后直接返回synchronized (threadPoolMonitor) {if (currPoolSize < corePoolSize) {CoreWorker coreWorker = new CoreWorker(task, threadPool, threadFactory, this);coreWorker.start();currPoolSize++;return;}}// 如果能够放到任务队列中,则直接返回if (taskQueue.offer(task)) {return;}// 如果 线程数量 小于 最大线程数量,则新建一个 临时线程 执行任务synchronized (threadPoolMonitor) {if (currPoolSize < maxPoolSize) {TempWorker tempWorker = new TempWorker(task, threadPool, threadFactory, this);tempWorker.start();currPoolSize++;return;}}// 线程数量到达最大线程数量,任务队列已满,执行拒绝策略rejectPolicy.reject(this, task);}/*** 在没有任务执行时停止所有线程* 当此方法被调用,线程池会停止接受提交任务,然后等待线程将 它们正在执行的任务 和 任务队列中的任务 都执行完毕,之后让所有线程退出*/public void shutdownWhenNoTask() {// 将状态从 RUNNING 切换到 SHUTDOWNif (state.compareAndSet(RUNNING, SHUTDOWN)) {// 如果切换成功,则向任务队列中投放一个毒丸offerPoisonPill();}}/*** 立刻停止所有线程* 当此方法被调用,线程池会停止接受提交任务,给线程发送中断信号* <p>* 注意:<strong>如果在调用此方法之前调用了 {@link #shutdownWhenNoTask()} 方法,不会立刻停止所有线程</strong>** @return 任务队列中的任务*/public List<Runnable> shutdownNow() {// 将状态从 RUNNING 切换到 SHUTDOWN,如果修改失败,则表示线程池已经调用过关闭相关的方法了,直接返回一个空集合即可if (!state.compareAndSet(RUNNING, SHUTDOWN)) {return new ArrayList<>();}// 给所有线程发送中断信号synchronized (threadPoolMonitor) {threadPool.forEach(Worker::interrupt);}// 将任务队列中的任务放到一个集合中List<Runnable> taskList = new ArrayList<>(taskQueue.size());taskQueue.drainTo(taskList);// 向任务队列中投放一个毒丸offerPoisonPill();// 返回任务集合return taskList;}/*** 获取当前线程池中的线程数量** @return 当前线程池中的线程数量*/public int getCurrPoolSize() {synchronized (threadPoolMonitor) {return currPoolSize;}}/*** 获取当前任务队列中的任务数** @return 当前任务队列中的任务数*/public int getCurrTaskNum() {return taskQueue.size();}/*** 丢弃任务队列 {@link #taskQueue} 中的最旧的任务(队头任务)** @return 任务队列中的最旧的任务(队头任务)*/public Runnable discardOldestTask() {return taskQueue.poll();}/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @return 任务队列中的任务*/private Runnable takeTaskFromQueue() throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.take();taskQueueNotFull.signalAll();return task;} finally {taskQueueLock.unlock();}}/*** 使用 {@link BlockingQueue#take()} 方法从队列中取出任务** @param nanos 等待的时间,单位:ns* @return 任务队列中的任务,如果等待超时,则返回 {@code null}*/private Runnable pollTaskFromQueue(long nanos) throws InterruptedException {taskQueueLock.lock();try {Runnable task = taskQueue.poll(nanos, TimeUnit.NANOSECONDS);if (task != null) {taskQueueNotFull.signalAll();}return task;} finally {taskQueueLock.unlock();}}/*** 毒丸,用于让线程池中的所有线程退出* 投放到任务队列中,只要线程获取到这个任务,就退出,并且在 线程池中还有线程 的情况下将毒丸重新放回任务队列*/public static final Runnable POISON_PILL = () -> {};/*** 任务队列的锁,用于生成一个 {@link Condition} 对象*/private final Lock taskQueueLock = new ReentrantLock();/*** 任务队列已满的条件对象*/private final Condition taskQueueNotFull = taskQueueLock.newCondition();// 实际上投放毒丸的操作private void doOfferPoisonPill() {// 不断投放毒丸,直到成功为止while (!taskQueue.offer(POISON_PILL)) {taskQueueLock.lock();try {taskQueueNotFull.await();} catch (InterruptedException ignore) {} finally {taskQueueLock.unlock();}}currPoolSize--;}/*** 向任务队列中投放一个毒丸,等待线程领取后退出*/public void offerPoisonPill() {synchronized (threadPoolMonitor) {doOfferPoisonPill();}}/*** 在线程池中还有其他线程的情况下,向任务队列中投放一个毒丸,等待线程领取后退出,用于*/public void offerPoisonPillIfThreadRest() {synchronized (threadPoolMonitor) {if (currPoolSize > 0) {doOfferPoisonPill();}}}/*** 从 {@link #threadPool} 中移除指定的 {@link Worker} 对象** @param worker 待移除的 {@link Worker} 对象*/public void removeWorkerFromThreadPool(Worker worker) {synchronized (threadPoolMonitor) {threadPool.remove(worker);}}
}

7. 测试程序

public class ThreadPool4_0Test {/*** 测试线程池 4.0 版本的基本功能*/@Testpublic void test() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(3));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");// 等待测试结束latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");}/*** 测试线程池 4.0 版本 {@link ThreadPool4_0#shutdownWhenNoTask()} 的功能*/@Testpublic void testShutdownWhenNoTask() throws InterruptedException {final int taskSize = 3;CountDownLatch latch = new CountDownLatch(taskSize);ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);latch.countDown();});}LogUtil.infoWithTimeAndThreadName("提交任务后");LogUtil.infoWithTimeAndThreadName("调用停止方法前,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法前,任务数量是" + threadPool.getCurrTaskNum());threadPool.shutdownWhenNoTask();LogUtil.infoWithTimeAndThreadName("调用停止方法后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法后,任务数量是" + threadPool.getCurrTaskNum());// 等待任务执行完毕latch.await();LogUtil.infoWithTimeAndThreadName("任务执行完毕");LogUtil.infoWithTimeAndThreadName("执行完任务后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("执行完任务后,任务数量是" + threadPool.getCurrTaskNum());}/*** 测试线程池 4.0 版本 {@link ThreadPool4_0#shutdownNow()} 的功能*/@Testpublic void testShutdownNow() throws InterruptedException {final int taskSize = 3;ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {LogUtil.infoWithTimeAndThreadName("收到中断信号,停止任务执行");}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);});}LogUtil.infoWithTimeAndThreadName("提交任务后");LogUtil.infoWithTimeAndThreadName("调用停止方法前,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法前,任务数量是" + threadPool.getCurrTaskNum());List<Runnable> taskList = threadPool.shutdownNow();LogUtil.infoWithTimeAndThreadName("有" + taskList.size() + "个任务没有执行");LogUtil.infoWithTimeAndThreadName("调用停止方法后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("调用停止方法后,任务数量是" + threadPool.getCurrTaskNum());// 等待任务执行完毕Thread.sleep(taskSize * 1000 + 100);LogUtil.infoWithTimeAndThreadName("任务执行完毕");LogUtil.infoWithTimeAndThreadName("执行完任务后,线程数量是" + threadPool.getCurrPoolSize());LogUtil.infoWithTimeAndThreadName("执行完任务后,任务数量是" + threadPool.getCurrTaskNum());}/*** 测试线程池 4.0 版本 调用停止方法后直接拒绝任务 的功能*/@Testpublic void testShutdownRejectTask() {Assertions.assertThrows(RuntimeException.class, () -> {final int taskSize = 3;ThreadPool4_0 threadPool = new ThreadPool4_0(1, 2, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));LogUtil.infoWithTimeAndThreadName("提交任务前");for (int i = 0; i < taskSize; i++) {int finalI = i;threadPool.submit(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {LogUtil.infoWithTimeAndThreadName("收到中断信号,停止任务执行");}LogUtil.infoWithTimeAndThreadName("正在执行任务" + finalI);});}LogUtil.infoWithTimeAndThreadName("提交任务后");threadPool.shutdownWhenNoTask();threadPool.submit(() -> {});});}
}

8. 测试结果

8.1 test

21:21:24 [    main] 提交任务前
21:21:24 [    main] 提交任务后
21:21:25 [thread-pool-0] 正在执行任务0
21:21:26 [thread-pool-0] 正在执行任务1
21:21:27 [thread-pool-0] 正在执行任务2
21:21:27 [    main] 任务执行完毕

8.2 testShutdownWhenNoTask

在调用关闭方法后,队列中还剩一个任务,等待其执行完毕后,所有线程退出。

21:22:04 [    main] 提交任务前
21:22:04 [    main] 提交任务后
21:22:04 [    main] 调用停止方法前,线程数量是2
21:22:04 [    main] 调用停止方法前,任务数量是1
21:22:05 [thread-pool-0] 正在执行任务0
21:22:05 [thread-pool-1] 正在执行任务2
21:22:05 [thread-pool-1] 发现毒丸,退出循环
21:22:05 [    main] 调用停止方法后,线程数量是1
21:22:05 [    main] 调用停止方法后,任务数量是1
21:22:06 [thread-pool-0] 正在执行任务1
21:22:06 [thread-pool-0] 发现毒丸,退出循环
21:22:06 [    main] 任务执行完毕
21:22:06 [    main] 执行完任务后,线程数量是0
21:22:06 [    main] 执行完任务后,任务数量是0

8.3 testShutdownNow

在调用关闭方法后,队列中的一个任务被取出来,随后两个线程相继退出。

21:22:22 [    main] 提交任务前
21:22:22 [    main] 提交任务后
21:22:22 [    main] 调用停止方法前,线程数量是2
21:22:22 [    main] 调用停止方法前,任务数量是1
21:22:22 [thread-pool-1] 收到中断信号,停止任务执行
21:22:22 [thread-pool-0] 收到中断信号,停止任务执行
21:22:22 [thread-pool-1] 正在执行任务2
21:22:22 [thread-pool-0] 正在执行任务0
21:22:22 [thread-pool-1] 发现毒丸,退出循环
21:22:22 [    main] 有1个任务没有执行
21:22:22 [thread-pool-0] 发现毒丸,退出循环
21:22:22 [    main] 调用停止方法后,线程数量是0
21:22:22 [    main] 调用停止方法后,任务数量是0
21:22:25 [    main] 任务执行完毕
21:22:25 [    main] 执行完任务后,线程数量是0
21:22:25 [    main] 执行完任务后,任务数量是0

8.4 testShutdownRejectTask

这个测试只要通过就说明抛出异常了,即拒绝了提交的任务。

21:22:53 [    main] 提交任务前
21:22:53 [    main] 提交任务后
21:22:54 [thread-pool-0] 正在执行任务0
21:22:54 [thread-pool-1] 正在执行任务2
21:22:54 [thread-pool-1] 发现毒丸,退出循环

9. 思考

  • 我们在实现让线程池停止的方法时使用了毒丸的设计,你了解 ThreadPoolExecutor 使用的设计是什么吗?
  • 在测试时,我发现 testShutdownWhenNoTask 会有死锁的情况出现,你知道这是为什么吗?

10. 总结

这次我们实现了自定义线程池的 4.0 版本,了解了毒丸的思想,还在一定程度上顾及了多线程环境下的线程安全问题。以上,算是将线程池基本功能都实现了一遍,之后会专门写一篇文章用来回答之前没有讲的思考题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/84736.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

list is not in GROUPBY clause and contains nonaggregated column ‘*.*‘

SELECT list is not in GROUP BY clause and contains nonaggregated column mydb.t.address which is not functionally dependent on columns in GROUP BY clause; this is incompatible with sql_modeonly_full_group_by 关于查询列不在分组字段内触发错误 之前我一直使用其…

Linux vmware image iso qcow2镜像大全

Download Linux VMware Images | Linux VMware Images

城市排水管网液位流量监测系统解决方案

一、方案背景 城市排水管网作为城市的“生命线”&#xff0c;其运行状况直接关系到城市的防洪排涝、水环境质量以及居民的生活质量。随着城市化进程的加速&#xff0c;城市排水管网规模不断扩大&#xff0c;结构日益复杂&#xff0c;传统的人工巡检和简单监测手段已难以满足对排…

算法学习笔记:3.广度优先搜索 (BFS)——二叉树的层序遍历

什么是广度优先搜索 (BFS)? 想象一下你在玩一个迷宫游戏&#xff0c;你需要找到从起点到终点的最短路径。广度优先搜索 (BFS) 就像是你在迷宫中逐层探索的过程&#xff1a; 先探索距离起点最近的所有位置然后探索距离起点第二近的所有位置以此类推&#xff0c;直到找到终点 …

并发编程-Synchronized

Mark Word 什么是Mark Word&#xff1f; Mark Word是Java对象头中的一个字段&#xff0c;它是一个32位或64位的字段&#xff08;取决于系统架构&#xff09;&#xff0c;用于存储对象的元数据信息。这些信息包括对象的哈希码、锁状态、年龄等。 Mark Word有什么用&#xff1f…

【51单片机】5. 矩阵键盘与矩阵键盘密码锁Demo

1. 矩阵键盘原理 通过矩阵连接的模式&#xff0c;原本需要16个引脚连接的按钮只需要8个引脚就能连接好&#xff0c;减少了I/O口的占用。 矩阵按钮是通过扫描来读取状态的。 2. 扫描的概念 输出扫描示例&#xff1a;数码管扫描 原理&#xff1a;显示第1位→显示第2位→显示第…

Android Studio jetpack compose折叠日历日期选择器【折叠日历】

今天写一个日期选择器&#xff0c;大家根据自己需求改代码&#xff0c;记得点赞支持&#xff0c;谢谢&#xff5e; 这是进入的默认状态 折叠状态选中本周其他日期状态 切换上下周状态 展开日历状态 切换上下月状态 选中状态 代码如下&#xff1a; import android.content.C…

驭码CodeRider 2.0全栈开发实战指南:从零构建现代化电商平台

驭码CodeRider 2.0全栈开发实战指南:从零构建现代化电商平台 一、CodeRider 2.0:重新定义全栈智能开发 1.1 革命性升级亮点 #mermaid-svg-AKjytNB4hD95UZtF {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-AKjyt…

大模型智能体AutoGen面试题及参考答案

目录 AutoGen 的核心是什么? Agent 在 AutoGen 中承担什么角色? AutoGen 是如何定义 AssistantAgent、UserProxyAgent 等代理类型的? 什么是 GroupChat(组对话)模式? AutoGen 的 system message 在框架中扮演什么作用? 如何通过 Agent 实现自然语言处理? AutoGen…

深度学习笔记26-天气预测(Tensorflow)

&#x1f368; 本文为&#x1f517;365天深度学习训练营中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 一、前期准备 1.数据导入 import numpy as np import pandas as pd import warnings import seaborn as sns import matplotlib.pyplot as plt warnings.filt…

day54 python对抗生成网络

目录 一、GAN对抗生成网络思想 二、实践过程 1. 数据准备 2. 构建生成器和判别器 3. 训练过程 4. 生成结果与可视化 三、学习总结 一、GAN对抗生成网络思想 GAN的核心思想非常有趣且富有对抗性。它由两部分组成&#xff1a;生成器&#xff08;Generator&#xff09;和判…

龙虎榜——20250613

上证指数放量下跌收阴线&#xff0c;个股下跌超4000只&#xff0c;受外围消息影响情绪总体较差。 深证指数放量下跌&#xff0c;收阴线&#xff0c;6月总体外围风险较高&#xff0c;转下跌走势的概率较大&#xff0c;注意风险。 2025年6月13日龙虎榜行业方向分析 1. 石油石化&…

Linux常用命令加强版替代品

Linux常用命令加强版替代品 还在日复一日地使用 ls、grep、cd 这些“上古”命令吗&#xff1f;是时候给你的终端来一次大升级了&#xff01;本文将为你介绍一系列强大、高效且设计现代的Linux命令行工具&#xff0c;它们将彻底改变你的工作流&#xff0c;让你爱上在终端里操作…

Hadoop 003 — JAVA操作MapReduce入门案例

MapReduce入门案例-分词统计 文章目录 MapReduce入门案例-分词统计1.xml依赖2.编写MapReduce处理逻辑3.上传统计文件到HDFS3.配置MapReduce作业并测试4.执行结果 1.xml依赖 <dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-commo…

Python打卡第53天

浙大疏锦行 作业&#xff1a; 对于心脏病数据集&#xff0c;对于病人这个不平衡的样本用GAN来学习并生成病人样本&#xff0c;观察不用GAN和用GAN的F1分数差异。 import pandas as pd import numpy as np import torch import torch.nn as nn import torch.optim as optim from…

力扣-279.完全平方数

题目描述 给你一个整数 n &#xff0c;返回 和为 n 的完全平方数的最少数量 。 完全平方数 是一个整数&#xff0c;其值等于另一个整数的平方&#xff1b;换句话说&#xff0c;其值等于一个整数自乘的积。例如&#xff0c;1、4、9 和 16 都是完全平方数&#xff0c;而 3 和 1…

前端构建工具Webapck、Vite——>前沿字节开源Rspack详解——2023D2大会

Rspack 以下是针对主流构建工具&#xff08;Webpack、Vite、Rollup、esbuild&#xff09;的核心不足分析&#xff0c;以及 Rspack 如何基于这些痛点进行针对性改进 的深度解析&#xff1a; 一、主流构建工具的不足 1. Webpack&#xff1a;性能与生态的失衡 核心问题 冷启动慢…

输入法,开头输入这U I V 三个字母会不显示 任何中文

1. 汉语拼音规则的限制 汉语拼音中不存在以“V”“U”“I”为声母的情况&#xff1a; 汉语拼音的声母是辅音&#xff0c;而“V”“U”“I”在汉语拼音中都是元音&#xff08;或韵母的一部分&#xff09;。汉语拼音的声母系统中没有“V”“U”“I”作为声母的音节。例如&#xf…

Linux文件权限详解:从入门到精通

前言 权限是什么&#xff1f; 本质&#xff1a;无非就是能做和不能做什么。 为什么要有权限呢&#xff1f; 目的&#xff1a;为了控制用户行为&#xff0c;防止发生错误。 1.权限的理解 在学习下面知识之前要先知道的一点是&#xff1a;linux下一切皆文件&#xff0c;对li…

在多云环境透析连接ngx_stream_proxy_protocol_vendor_module

1、模块定位与价值 多云接入&#xff1a;在同一 Nginx 实例前端接入来自多云平台的私有链路时&#xff0c;能区分 AWS、GCP、Azure 特有的连接 ID。安全审计&#xff1a;自动记录云平台侧的 Endpoint/VPC ID&#xff0c;有助于联调和安全事件追踪。路由分流&#xff1a;基于不…