传送门
分布式定时任务系列1:XXL-job安装
分布式定时任务系列2:XXL-job使用
分布式定时任务系列3:任务执行引擎设计
分布式定时任务系列4:任务执行引擎设计续
分布式定时任务系列5:XXL-job中blockingQueue的应用
分布式定时任务系列6:XXL-job触发日志过大引发的CPU告警
分布式定时任务系列7:XXL-job源码分析之任务触发
分布式定时任务系列8:XXL-job源码分析之远程调用
分布式定时任务系列9:XXL-job路由策略
分布式定时任务系列10:XXL-job源码分析之路由策略
番外篇:从XXL-job路由策略的“服务容错“说起
分布式定时任务系列12:XXL-job的任务触发为什么是死循环?
Java并发编程实战1:java中的阻塞队列
定时任务都是通过"死循环"触发?
在前面一共分析过XXL-job的定时任务触发原理、JDK-Timer执行定时任务的原理:
- 底层源码都是通过while(true)这种死循环写法来遍历任务的
- 一个是分布式调度框架,更为重量级、一个是JDK提供的偏轻量级的调度工具
尽管2者定位不同,但是都选择了死循环这种方式来实现任务调度,说明while(true)一定是可行的、通用的设计方式。但是要问任务调度都是采用这种方式,那答案肯定是否定的。一来只分析了XXL-job与Timer的源码导致样本太少,二来接下来要分析的另一个任务调度工具,也的确不是通过死循环这种方式:它就是JDK提供的ScheduledExecutorService!
ScheduledExecutorService源码解析
ScheduledExecutorService是什么
ScheduledExecutorService 是Java并发框架中用于定时任务调度的核心接口,基于线程池实现,支持延迟任务执行和周期性任务调度,相比传统 Timer 类更高效、更灵活。
核心功能
提供
schedule
、scheduleAtFixedRate
、scheduleWithFixedDelay
等方法,可安排任务在指定延迟后执行一次,或按固定频率周期执行。支持任务并发执行,适用于需要定时轮询、消息推送等场景。 关键特性
- 线程池管理:默认使用 ScheduledThreadPoolExecutor ,可设置核心线程数(如
Executors.newScheduledThreadPool(1)
),支持多任务并发执行 - 任务调度策略:
schedule
:延迟后执行一次任务scheduleAtFixedRate
:初始延迟后按固定频率重复执行(如每5秒执行一次)scheduleWithFixedDelay
:首次执行后等待固定延迟再执行(如首次执行后每10秒执行一次) - 取消任务:可通过返回的
Future
对象取消未执行的任务 适用场景
- 定时轮询数据库或第三方接口(如每5分钟检查数据更新)
- 定时发送消息或推送通知(如每日定时邮件发送)
- 周期性任务调度(如每2小时重启服务)
ScheduledExecutorService的使用
ScheduledExecutorService类自jdk1.5才引入,作者是大名鼎鼎的Doug Lea(Java并发包juc的作者)。既然它开始工作的晚,自然离"退休"还早,现在一般推荐使用!
说到Doug Lea自然要推荐他与Joshua Bloch等合著的《Java并发编程实战》,见推荐书单。
看一下源码中给出的一个例子:
import static java.util.concurrent.TimeUnit .*;class BeeperControl {// 创建执行器实例,初始化1个执行线程private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public void beepForAnHour() {final Runnable beeper = new Runnable() {public void run() {System.out.println("beep");}};// 固定频率周期性执行任务:每10s执行一次任务,发出"哗"声!final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, SECONDS);// 延迟 delay 后执行一次任务:一次性任务,3600s(1小时)后"取消"任务
scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, SECONDS);}
}
可以写一个测试跑一下上面的例子,然后再用Timer改写上面的例子,分别执行会发现效果是一致的:
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;/*** @author * @date 2025/7/16*/
public class ScheduleTest {private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);public static void main(String[] args) {ScheduledExecutorServiceBeepForAnHour();timerBeepForAnHour();}/*** 使用ScheduledExecutorService实现定时任务*/public static void ScheduledExecutorServiceBeepForAnHour() {final Runnable beeper = new Runnable() {public void run() {System.out.println(new Date() + "ScheduledExecutorService for beep");}};final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, TimeUnit.SECONDS);}/*** 使用Timer实现实时任务*/public static void timerBeepForAnHour() {// 创建Timer执行器实例Timer timer = new Timer();// 执行任务:,发出"哗"声!TimerTask beeper = new TimerTask() {@Overridepublic void run() {System.out.println(new Date() + "Timer for beep");}};// 固定频率周期性执行任务:每10s执行一次任务,发出"哗"声!timer.scheduleAtFixedRate(beeper, 10, 10 * 1000);// 延迟 delay 后执行一次任务:一次性任务,3600s(1小时)后"取消"任务timer.schedule(new TimerTask() {@Overridepublic void run() {beeper.cancel();}}, 60 * 60 * 1000);}
}
上面例子要注意的是,timer的API里面的时间是毫秒,而ScheduledExecutorService是可以通过TimeUnit指定时间单位的。
仅通过这个例子就简单的认为功能两者一样,那肯定缺乏说服力的。并且如果两个调度器一样,也有重复造轮子的嫌疑,所以再接着改写一下上面的例子:
- 增加一个执行任务,执行器里面维持2个任务
- 在任务里面将执行线程信息打印出来
- 但同时将ScheduledExecutorService的执行线程数从1调整为2
public class ScheduleTest {// 创建ScheduledExecutorService 执行器private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);public static void main(String[] args) {scheduledExecutorServiceBeepForAnHour();timerBeepForAnHour();}/*** 使用ScheduledExecutorService实现定时任务*/public static void scheduledExecutorServiceBeepForAnHour() {// 任务1final Runnable beeper = new Runnable() {public void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep1");}};// 任务2Runnable beeper2 = () -> {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep2");};// 添加第1个任务final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);// 添加第2个任务scheduler.scheduleAtFixedRate(beeper2, 10, 10, TimeUnit.SECONDS);scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, TimeUnit.SECONDS);}/*** 使用Timer实现实时任务*/public static void timerBeepForAnHour() {// 创建Timer执行器实例Timer timer = new Timer();// 执行任务1:发出"哗"声!TimerTask beeper = new TimerTask() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep1");}};执行任务2:TimerTask beeper2 = new TimerTask() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep2");}};// 固定频率周期性执行任务:每10s执行一次任务,发出"哗"声!timer.scheduleAtFixedRate(beeper, 10, 10 * 1000);timer.scheduleAtFixedRate(beeper2, 10, 10 * 1000);// 延迟 delay 后执行一次任务:一次性任务,3600s(1小时)后"取消"任务timer.schedule(new TimerTask() {@Overridepublic void run() {beeper.cancel();}}, 60 * 60 * 1000);}
}
运行程序,打印结果:
Timer-0,Mon Jul 21 11:43:46 CST 2025, Timer for beep1
Timer-0,Mon Jul 21 11:43:46 CST 2025, Timer for beep2
pool-1-thread-2,Mon Jul 21 11:43:56 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-1,Mon Jul 21 11:43:56 CST 2025, ScheduledExecutorService for beep1
Timer-0,Mon Jul 21 11:43:56 CST 2025, Timer for beep2
Timer-0,Mon Jul 21 11:43:56 CST 2025, Timer for beep1
pool-1-thread-1,Mon Jul 21 11:44:06 CST 2025, ScheduledExecutorService for beep1
Timer-0,Mon Jul 21 11:44:06 CST 2025, Timer for beep1
pool-1-thread-2,Mon Jul 21 11:44:06 CST 2025, ScheduledExecutorService for beep2
Timer-0,Mon Jul 21 11:44:06 CST 2025, Timer for beep2
pool-1-thread-2,Mon Jul 21 11:44:16 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-1,Mon Jul 21 11:44:16 CST 2025, ScheduledExecutorService for beep1
上面截取了一部分打印结果,会发现对于ScheduledExecutorService来说,随着执行线程的增加,任务是可能会由不同的线程执行。但是Timer来说,它始终只会有一个线程来执行任务。由此我们推断两个类在执行执行上是不同的,再改写一下上面的例子:
- 将任务阻塞50s,大于执行的同期10s
public class ScheduleTest {// 创建执行private final static ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);public static void main(String[] args) {scheduledExecutorServiceBeepForAnHour();timerBeepForAnHour();}/*** 使用ScheduledExecutorService实现定时任务*/public static void scheduledExecutorServiceBeepForAnHour() {final Runnable beeper = new Runnable() {@SneakyThrowspublic void run() {// 阻塞任务1,50sTimeUnit.SECONDS.sleep(50);System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep1");}};Runnable beeper2 = () -> {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", ScheduledExecutorService for beep2");};// 添加第1个任务final ScheduledFuture<?> beeperHandle = scheduler.scheduleAtFixedRate(beeper, 10, 10, TimeUnit.SECONDS);// 添加第2个任务scheduler.scheduleAtFixedRate(beeper2, 10, 10, TimeUnit.SECONDS);scheduler.schedule(new Runnable() {public void run() {beeperHandle.cancel(true);}}, 60 * 60, TimeUnit.SECONDS);}/*** 使用Timer实现实时任务*/public static void timerBeepForAnHour() {// 创建Timer执行器实例Timer timer = new Timer();// 执行任务:,发出"哗"声!TimerTask beeper = new TimerTask() {@SneakyThrows@Overridepublic void run() {// 阻塞任务1,50sTimeUnit.SECONDS.sleep(50);System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep1");}};TimerTask beeper2 = new TimerTask() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + "," + new Date() + ", Timer for beep2");}};// 固定频率周期性执行任务:每10s执行一次任务,发出"哗"声!timer.scheduleAtFixedRate(beeper, 10, 10 * 1000);timer.scheduleAtFixedRate(beeper2, 10, 10 * 1000);// 延迟 delay 后执行一次任务:一次性任务,3600s(1小时)后"取消"任务timer.schedule(new TimerTask() {@Overridepublic void run() {beeper.cancel();}}, 60 * 60 * 1000);}
}
运行程序,打印结果:
pool-1-thread-2,Mon Jul 21 12:20:02 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:12 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:22 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:32 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-2,Mon Jul 21 12:20:42 CST 2025, ScheduledExecutorService for beep2
Timer-0,Mon Jul 21 12:20:42 CST 2025, Timer for beep1
Timer-0,Mon Jul 21 12:20:42 CST 2025, Timer for beep2
Timer-0,Mon Jul 21 12:20:42 CST 2025, Timer for beep2
pool-1-thread-2,Mon Jul 21 12:20:52 CST 2025, ScheduledExecutorService for beep2
pool-1-thread-1,Mon Jul 21 12:20:52 CST 2025, ScheduledExecutorService for beep1
从上面的输出观察到的现象:
- 对于Timer来说,是单线程执行的,一旦执行线程被阻塞,所有任务都会被阻塞至于阻塞解除才会被重新执行(是否补偿取决于不同的API)
- 对于ScheduledExecutorService来说,是多线程执行的,单个线程的阻塞不会造成其它任务的执行,理论上执行效率更高
ScheduledExecutorService与Timer对比
特性 | Timer | ScheduledExecutorService |
---|---|---|
引入版本 | Java 1.3(早期) | Java 5(java.util.concurrent 包) |
所属包 | java.util.Timer | java.util.concurrent.ScheduledExecutorService |
核心功能 | 单线程定时任务调度 | 多线程任务调度(支持线程池) |
从构造方法看类结构
为了体现ScheduledExecutorService执行器底层用的线程池,我们大费周章的写了好几个例子来验证推断,其实通过它的构造方法也看出来:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);// 创建调度线程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}// 创建调度线程池,这里会super通过调用线程池创建
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}// 标准线程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);}/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
由于源码里面涉及到线程池代码比较,这里不就完整的贴出来了,只展示相关重要的几个类:
首先创建线程池:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
Executors介绍
Executors
的核心作用是封装线程池的创建逻辑,通过不同的静态方法提供以下几种线程池类型:
- 固定大小的线程池(Fixed Thread Pool)
- 可缓存的线程池(Cached Thread Pool)
- 单线程的线程池(Single Thread Pool)
- 周期性任务调度线程池(Scheduled Thread Pool)
这些线程池分别适用于不同的场景,比如任务数量固定、任务数量不确定、需要单线程执行、或需要定时执行任务等。
Executors 提供的常用静态方法
方法 | 说明 | 适用场景 |
---|---|---|
newFixedThreadPool(int nThreads) | 创建一个固定大小的线程池,线程数始终为 nThreads | 任务数量固定、资源有限的场景 |
newCachedThreadPool() | 创建一个可缓存的线程池,线程数可根据任务动态调整 | 任务数量不确定、需要快速响应的场景 |
newSingleThreadExecutor() | 创建一个单线程的线程池,保证任务按顺序执行 | 需要串行执行任务的场景 |
newScheduledThreadPool(int corePoolSize) | 创建一个支持定时任务调度的线程池 | 需要周期性或延迟执行任务的场景 |
newWorkStealingPool(int parallelism) | 创建一个基于工作窃取算法的线程池(Java 8+) | 多核 CPU 下并行任务处理 |
而ScheduledThreadPoolExecutor与ScheduledExecutorService是接口->实现关系,而能创建线程池的原因在于ScheduledThreadPoolExecutor与ThreadPoolExecutor的继承关系:
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService {// 具体方法略,重点明确类关系...
}
类图
直接上类图看看:
至此得到了ScheduledExecutorService类的完整类图,其中:
- 通过
Executors获取线程池
ScheduledThreadPoolExecutor - ScheduledThreadPoolExecutor有两个内部类:DelayedWorkQueue、ScheduledFutureTask
- ScheduledFutureTask封装了FutureTask,可以获取任务执行结果、任务取消、设置任务周期
- DelayedWorkQueue封装了延迟队列,提供了任务管理的方法
定时器-任务线程启动
通过线程池之后,任务的启动就不用定时器自动管理了,而是委托给线程池来管理了,这里就不展示开,有兴趣可以看看之前关于线程池源码的分析《深入解析Java线程池-CSDN博客》,同理定时任务执行也不用定时器来触发了
定时任务执行
谈到定时任务的触发,就又回到我们开篇的疑问了,是不是所有的调度器都是通过类似while(true)这种"死循环"策略?答案是"否定"的,至少从ScheduledExecutorService代码来看不是这样的,它的线程、任务的管理都是通过各种池来实现的,这样非常方便。但是线程执行任务完成之后,对于周期性的任务,还是需要扩展任务、队列来完成,比如下面添加任务的代码:
// 添加周期性任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));// 封装任务,里面存储了任务周期period字段RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;}private class ScheduledFutureTask<V>extends FutureTask<V> implements RunnableScheduledFuture<V> {/** Sequence number to break ties FIFO */private final long sequenceNumber;/** The time the task is enabled to execute in nanoTime units */private long time;// 任务周期private final long period;
}
再继续看延迟执行方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {if (isShutdown())reject(task);else {// 重点是这个,添加任务到队列里面super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 添加任务成功,调用此方法:用于确保线程池中至少有一个工作线程在运行ensurePrestart();}}
而任务执行完成后,如果是同期性任务则会重新设置执行时间来达到周期性执行效果:
public void run() {// 获取是否周期性任务标志boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);// 如果不是周期性任务,则直接执行else if (!periodic)ScheduledFutureTask.super.run();// 如果是周期性任务,则先重置执行状态&设置下次执行时间再执行任务else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}}
加引号的"列循环"
谈到定时任务的触发,就又回到我们开篇的疑问了,是不是所有的调度器都是通过类似while(true)这种"死循环"策略?答案是"否定"的
在前面回答这个问题时,"否定"加了引号的。严格意义来说,线程池底层的代码其实也是一种死循环,它通过自旋的方式来获取任务:
private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?// 没有条件的for循环,会一直持有CPU,这种方式称为自旋for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}
由于里面涉及大量的JAVA并发编程,就不展开了,可以自行看看源码及相关书籍。
推荐书单
《Java并发编程实战》:力荐!五星!
个人肤浅的认为是看过写的最好的一本Java并发编码方面的书,初看惊为天人,值的读好几遍!不过缺点就是刚看不容易懂也容易忘,隔段时间又要重头看起。而且理论偏多代码例子较少,有一定的阅读门槛