一、核心特性
Springboot 集成
支持 @Async 注解,简化异步方法调用。
参数可配置化
核心线程数、最大线程数、队列容量、拒绝策略等均可通过配置调整。
生命周期管理
实现 Lifecycle 接口,支持线程池的启动和关闭(如应用关闭时优雅终止任务)。
任务装饰器
支持通过 TaskDecorator 对任务进行装饰(如传递上下文信息)
二、添加依赖
在 pom.xml
文件中添加 Spring Boot Starter AOP 依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
三、参数详解
通过 Spring 配置文件或 @Bean 定义线程池时,需设置以下关键参数:
参数名称 | 说明 | 默认值 |
---|---|---|
corePoolSize | 核心线程数,即使空闲也不会被回收 | 1 |
maxPoolSize | 最大线程数,当队列满时创建新线程直到达到此值 | Integer.MAX_VALUE |
queueCapacity | 任务队列容量(使用 LinkedBlockingQueue 或 ArrayBlockingQueue) | Integer.MAX_VALUE |
keepAliveSeconds | 非核心线程的空闲存活时间(秒) | 60 |
threadNamePrefix | 线程名前缀,便于日志追踪 | "task-executor-" |
allowCoreThreadTimeOut | 是否允许核心线程超时回收 | false |
rejectedExecutionHandler | 拒绝策略(如 AbortPolicy、CallerRunsPolicy) | AbortPolicy(直接抛出异常) |
四、配置线程池
@Configuration
@EnableAsync
public class ExecutorConfig {private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class);@Value("${async.executor.thread.core_pool_size}")private int corePoolSize;@Value("${async.executor.thread.max_pool_size}")private int maxPoolSize;@Value("${async.executor.thread.queue_capacity}")private int queueCapacity;@Value("${async.executor.thread.name.prefix}")private String namePrefix;@Bean(name = "asyncServiceExecutor")public Executor asyncServiceExecutor() {logger.info("start asyncServiceExecutor");ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();//配置核心线程数executor.setCorePoolSize(corePoolSize);//配置最大线程数executor.setMaxPoolSize(maxPoolSize);//配置队列大小executor.setQueueCapacity(queueCapacity);//配置线程池中的线程的名称前缀executor.setThreadNamePrefix(namePrefix);// rejection-policy:当pool已经达到max size的时候,如何处理新任务// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//执行初始化executor.initialize();return executor;}
}
@Value
是我配置在 application.yml
,可以参考配置,自由定义
# 异步线程配置
# 配置核心线程数
async.executor.thread.core_pool_size = 5
# 配置最大线程数
async.executor.thread.max_pool_size = 5
# 配置队列大小
async.executor.thread.queue_capacity = 99999
# 配置线程池中的线程的名称前缀
async.executor.thread.name.prefix = async-service-
五、应用实践
1、异步任务处理
创建一个服务类 AsyncService
,并在其方法上使用 @Async
注解来定义异步任务:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;@Service
public class AsyncService {private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);@Async("taskExecutor")public void asyncTask(String taskName) {logger.info(Thread.currentThread().getName() + " 开始执行任务: " + taskName);try {Thread.sleep(2000); // 模拟耗时操作} catch (InterruptedException e) {Thread.currentThread().interrupt();logger.error("任务执行被中断", e);} finally {logger.info(Thread.currentThread().getName() + " 任务执行完成: " + taskName);}}
}
创建一个控制器类 AsyncController
,用于触发异步任务(线程安全的)
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.concurrent.Future;@RestController
public class AsyncController {private static final Logger logger = LoggerFactory.getLogger(AsyncController.class);@Autowiredprivate AsyncService asyncService;@GetMapping("/trigger")public String triggerAsyncTasks() {logger.info("开始触发异步任务");for (int i = 0; i < 10; i++) {asyncService.asyncTask("任务 " + i);}return "异步任务已触发";}
}
创建一个监控组件 ThreadPoolMonitor
,用于定期监控线程池的状态
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;@Component
public class ThreadPoolMonitor {private static final Logger logger = LoggerFactory.getLogger(ThreadPoolMonitor.class);@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;@Scheduled(fixedRate = 60000) // 每分钟执行一次public void monitorThreadPool() {int activeCount = taskExecutor.getActiveCount();int poolSize = taskExecutor.getPoolSize();int corePoolSize = taskExecutor.getCorePoolSize();int maxPoolSize = taskExecutor.getMaxPoolSize();int queueSize = taskExecutor.getThreadPoolExecutor().getQueue().size();int completedTaskCount = taskExecutor.getThreadPoolExecutor().getCompletedTaskCount();logger.info("线程池状态 - 活动线程数: {}, 当前线程数: {}, 核心线程数: {}, 最大线程数: {}, 队列大小: {}, 已完成任务数: {}",activeCount, poolSize, corePoolSize, maxPoolSize, queueSize, completedTaskCount);// 检查线程池是否接近饱和if (activeCount >= maxPoolSize * 0.8 || queueSize >= taskExecutor.getQueueCapacity() * 0.8) {logger.warn("线程池负载过高!请考虑优化配置或检查任务执行情况");}}
}
确保在启动类上添加 @EnableAsync
注解,以启用异步任务支持
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;@SpringBootApplication
@EnableAsync
public class AsyncDemoApplication {public static void main(String[] args) {SpringApplication.run(AsyncDemoApplication.class, args);}
}
测试:
启动 Spring Boot 应用后,访问 http://localhost:8080/trigger
,即可看到异步任务在线程池中执行的情况,同时线程池的状态也会定期输出到日志中。
代码说明
-
@EnableAsync 注解 :用于启用 Spring 的异步方法执行支持,确保 Spring 容器能够识别和处理带有
@Async
注解的方法。 -
@Async 注解 :用于标注希望异步执行的方法,需指定所使用的线程池 Bean 的名称,在本例中为 “taskExecutor”。当该方法被调用时,Spring 会将其提交到指定的线程池中执行。
-
ThreadPoolTaskExecutor :是 Spring 提供的一个线程池任务执行器,通过设置核心线程数、最大线程数、队列容量等参数,可以根据应用的需求灵活地配置线程池。
-
异步任务失败处理 :通过自定义的拒绝策略,在线程池满时记录详细信息并抛出异常,以便及时发现任务执行失败的情况。
-
线程池监控 :使用
@Scheduled
注解定期监控线程池的状态,包括活动线程数、当前线程数、核心线程数、最大线程数、队列大小和已完成任务数等,帮助开发者了解线程池的运行情况,以便及时进行优化和调整
2、高并发请求处理
在 Web 应用中处理大量并发请求,避免阻塞主线程
@RestController
public class MyController {@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;@GetMapping("/process")public CompletableFuture<String> handleRequest() {return CompletableFuture.supplyAsync(() -> {// 耗时操作return "Result";}, taskExecutor);}}
3、定时任务调度
@EnableScheduling
@Configuration
public class SchedulerConfig {@Beanpublic ThreadPoolTaskScheduler taskScheduler() {ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();scheduler.setPoolSize(5);scheduler.setThreadNamePrefix("Scheduler-");return scheduler;}}@Service
public class ScheduledService {@Scheduled(fixedRate = 5000)public void scheduledTask() {// 定时任务逻辑}}
拒绝策略(Rejected Policies)
当线程池和队列均满时,处理新任务的策略:
策略类 | 行为描述 |
---|---|
AbortPolicy | 直接抛出 RejectedExecutionException(默认) |
CallerRunsPolicy | 由提交任务的线程直接执行任务(同步阻塞提交者) |
DiscardPolicy | 静默丢弃新任务,不抛异常 |
DiscardOldestPolicy | 丢弃队列中最旧的任务,然后重试提交新任务 |
如下给出不同拒绝策略的配置类,请结合上面的配置类整合使用
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;@Configuration
public class ThreadPoolConfig {@Bean(name = "abortPolicyExecutor")public ThreadPoolTaskExecutor abortPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.AbortPolicy());}@Bean(name = "callerRunsPolicyExecutor")public ThreadPoolTaskExecutor callerRunsPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.CallerRunsPolicy());}@Bean(name = "discardPolicyExecutor")public ThreadPoolTaskExecutor discardPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.DiscardPolicy());}@Bean(name = "discardOldestPolicyExecutor")public ThreadPoolTaskExecutor discardOldestPolicyExecutor() {return createExecutor(new ThreadPoolExecutor.DiscardOldestPolicy());}private ThreadPoolTaskExecutor createExecutor(ThreadPoolExecutor.RejectedExecutionHandler rejectedExecutionHandler) {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(5); // 核心线程数executor.setMaxPoolSize(10); // 最大线程数executor.setQueueCapacity(100); // 队列容量executor.setThreadNamePrefix("Task-Executor-"); // 线程名前缀executor.setRejectedExecutionHandler(rejectedExecutionHandler);executor.initialize();return executor;}
}
创建一个服务类 TaskService
,用于执行任务
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;@Service
public class TaskService {@Async("abortPolicyExecutor")public void executeWithAbortPolicy(String taskName) {executeTask(taskName);}@Async("callerRunsPolicyExecutor")public void executeWithCallerRunsPolicy(String taskName) {executeTask(taskName);}@Async("discardPolicyExecutor")public void executeWithDiscardPolicy(String taskName) {executeTask(taskName);}@Async("discardOldestPolicyExecutor")public void executeWithDiscardOldestPolicy(String taskName) {executeTask(taskName);}private void executeTask(String taskName) {try {System.out.println(Thread.currentThread().getName() + " 开始执行任务: " + taskName);Thread.sleep(2000); // 模拟任务执行时间System.out.println(Thread.currentThread().getName() + " 任务执行完成: " + taskName);} catch (InterruptedException e) {Thread.currentThread().interrupt();System.err.println("任务执行被中断: " + taskName);}}
}
创建一个控制器类 TaskController
,用于触发任务执行
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class TaskController {@Autowiredprivate TaskService taskService;@GetMapping("/trigger/abort")public String triggerAbortPolicy(@RequestParam String taskName) {taskService.executeWithAbortPolicy(taskName);return "任务已提交到使用 AbortPolicy 的线程池";}@GetMapping("/trigger/caller")public String triggerCallerRunsPolicy(@RequestParam String taskName) {taskService.executeWithCallerRunsPolicy(taskName);return "任务已提交到使用 CallerRunsPolicy 的线程池";}@GetMapping("/trigger/discard")public String triggerDiscardPolicy(@RequestParam String taskName) {taskService.executeWithDiscardPolicy(taskName);return "任务已提交到使用 DiscardPolicy 的线程池";}@GetMapping("/trigger/discardoldest")public String triggerDiscardOldestPolicy(@RequestParam String taskName) {taskService.executeWithDiscardOldestPolicy(taskName);return "任务已提交到使用 DiscardOldestPolicy 的线程池";}
}
启动 Spring Boot 应用后,分别访问以下 URL 来测试不同拒绝策略的行为:
-
http://localhost:8080/trigger/abort?taskName=任务1
-
http://localhost:8080/trigger/caller?taskName=任务2
-
http://localhost:8080/trigger/discard?taskName=任务3
-
http://localhost:8080/trigger/discardoldest?taskName=任务4
-
代码说明
-
线程池配置:
-
使用
ThreadPoolTaskExecutor
创建线程池。 -
配置了 4 个不同的线程池,每个线程池使用不同的拒绝策略。
-
每个线程池的核心线程数为 5,最大线程数为 10,队列容量为 100。
-
-
拒绝策略:
-
AbortPolicy:直接抛出
RejectedExecutionException
。 -
CallerRunsPolicy:由提交任务的线程直接执行任务。
-
DiscardPolicy:静默丢弃新任务,不抛异常。
-
DiscardOldestPolicy:丢弃队列中最旧的任务,然后重试提交新任务。
-
任务执行:
-
TaskService
类中的每个方法都使用@Async
注解,并指定使用的线程池。 -
executeTask
方法模拟任务执行,包含一个 2 秒的睡眠时间。 -
通过这个示例,你可以观察不同拒绝策略在任务被拒绝时的行为。例如,当线程池满时,
AbortPolicy
会抛出异常,CallerRunsPolicy
会让提交任务的线程执行任务,DiscardPolicy
会静默丢弃任务,而DiscardOldestPolicy
会丢弃最旧的任务并尝试提交新任务
-
-
6、最佳配置
-
· 合理设置线程池参数
CPU 密集型任务:核心线程数 ≈ CPU 核心数
I/O 密集型任务:核心线程数 ≈ CPU 核心数 * 2,并增大队列容量。
· 避免队列无限堆积
设置合理的 queueCapacity,防止内存溢出(OOM)。
· 统一异常处理
通过 AsyncUncaughtExceptionHandler 捕获异步任务中的异常: -
@Configurationpublic class AsyncConfig implements AsyncConfigurer {@Overridepublic Executor getAsyncExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// ... 配置参数return executor;}@Overridepublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {return (ex, method, params) -> {// 处理异常};}}
应用退出时,调用 shutdown() 并等待剩余任务执行完毕
-
executor.shutdown();try {if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {executor.shutdownNow();}} catch (InterruptedException e) {executor.shutdownNow();}
总结:
-
ThreadPoolTaskExecutor 是 Spring 生态中管理线程任务的利器,通过灵活的配置和与 Spring 的无缝集成,能够高效处理异步任务、高并发请求和定时调度。合理设置参数、选择拒绝策略,并结合监控手段,可显著提升系统性能和稳定性。
-