设计一个线程池,要求如下:
- 队列最大容量为10(内存队列)。
- 当队列满了之后,拒绝策略将新的任务写入数据库。
- 从队列中取任务时,若该队列为空,能够从数据库中加载之前被拒绝的任务
模拟数据库 (TaskDatabase)
- 使用
LinkedBlockingQueue
模拟数据库存储 - 线程安全操作(ReentrantLock保证)
- 实际应用需替换为JDBC/MyBatis等持久化方案
static class TaskDatabase {private final BlockingQueue<Runnable> dbQueue = new LinkedBlockingQueue<>();private final ReentrantLock dbLock = new ReentrantLock();public void saveTask(Runnable task) {dbLock.lock();try {System.out.println("[DB] 存储被拒绝任务到数据库, 当前数据库任务数: " + (dbQueue.size() + 1));dbQueue.put(task);} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {dbLock.unlock();}}public Runnable loadTask() {dbLock.lock();try {Runnable task = dbQueue.poll();if (task != null) {System.out.println("[DB] 从数据库加载任务, 剩余数据库任务: " + dbQueue.size());}return task;} finally {dbLock.unlock();}}}
Runable任务序列化:
public interface SerializableTask extends Serializable {void execute();
}class CustomTask {public static void main(String[] args) {SerializableTask task = () -> System.out.println("Hello, World!");String serializedTask = serializedTask(task);SerializableTask deserialization = deserialization(serializedTask);deserialization.execute();}static String serializedTask(SerializableTask runnable){try(ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(baos)) {// 序列化任务对象oos.writeObject(runnable);return Base64.getEncoder().encodeToString(baos.toByteArray());}catch (Exception e){throw new RuntimeException("无法序列化");}}static SerializableTask deserialization(String serializedTask){// 反序列化任务byte[] data = Base64.getDecoder().decode(serializedTask);try (ByteArrayInputStream bais = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(bais)) {SerializableTask task = (SerializableTask) ois.readObject();return task;}catch (Exception e){throw new RuntimeException("无法反序列化");}}
}
自定义阻塞队列 (DatabaseBackedBlockingQueue)
- 继承
LinkedBlockingQueue
并重写关键方法 - take()方法逻辑 :
- 优先从内存队列取任务
- 队列为空时从数据库加载
- 数据库也为空时阻塞等待新任务
- offer()方法 :队列未满时接受,满时返回false触发拒绝策略
// 自定义阻塞队列(支持从数据库加载任务)static class DatabaseBackedBlockingQueue extends LinkedBlockingQueue<Runnable> {private final TaskDatabase database;public DatabaseBackedBlockingQueue(int maxLocalCapacity, TaskDatabase database) {super(maxLocalCapacity);this.database = database;}@Overridepublic Runnable take() throws InterruptedException {// 1. 优先检查本地队列Runnable task = super.poll();if (task != null) return task;// 2. 本地队列为空时尝试从数据库加载while (true) {Runnable dbTask = database.loadTask();if (dbTask != null) return dbTask;// 3. 数据库为空则等待新任务if (isEmpty()) {task = super.take(); // 阻塞直到有新任务if (task != null) return task;}}}}
拒绝策略 (DatabaseRejectionHandler)
- 实现
RejectedExecutionHandler
接口 - 当内存队列满时将任务存入数据库
- 任务存入后会被后续的take()方法加载执行
// 自定义拒绝策略(保存到数据库)static class DatabaseRejectionHandler implements RejectedExecutionHandler {private final TaskDatabase database;public DatabaseRejectionHandler(TaskDatabase database) {this.database = database;}@Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {System.out.println("[Rejected] 线程池队列已满,任务转入数据库");database.saveTask(task);}}
资源管理 :
- 核心/最大线程数根据容器资源动态调整
- 线程工厂添加命名前缀(便于监控)
- 保活时间控制闲置线程销毁
// 5. 监控线程池状态
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {System.out.println("\n[监控] 活跃线程: " + executor.getActiveCount()+ " | 队列大小: " + executor.getQueue().size()+ " | 总完成任务: " + executor.getCompletedTaskCount());
}, 1, 2, TimeUnit.SECONDS);
自定义线程工厂
// 自定义线程工厂static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger counter = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {return new Thread(r, namePrefix + "-" + counter.getAndIncrement());}}
测试
public static void main(String[] args) throws InterruptedException {// 1. 创建任务数据库TaskDatabase taskDatabase = new TaskDatabase();// 2. 配置线程池int queueCapacity = 10; // 队列容量// 3. 创建自定义队列和拒绝策略DatabaseBackedBlockingQueue workQueue =new DatabaseBackedBlockingQueue(queueCapacity, taskDatabase);ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,0,TimeUnit.SECONDS,workQueue,new NamedThreadFactory("custom-pool"),new DatabaseRejectionHandler(taskDatabase));// 4. 模拟任务提交int totalTasks = 50;System.out.println("开始提交任务, 总数: " + totalTasks);CountDownLatch countDownLatch = new CountDownLatch(50);for (int i = 1; i <= totalTasks; i++) {final int taskId = i;executor.execute(() -> {try {System.out.println(Thread.currentThread().getName()+ " 执行任务: " + taskId);Thread.sleep(1000); // 模拟任务执行countDownLatch.countDown();} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 5. 监控线程池状态ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();monitor.scheduleAtFixedRate(() -> {System.out.println("\n[监控] 活跃线程: " + executor.getActiveCount()+ " | 队列大小: " + executor.getQueue().size()+ " | 总完成任务: " + executor.getCompletedTaskCount());}, 1, 2, TimeUnit.SECONDS);// 6. 等待任务执行完成countDownLatch.await();executor.shutdown();monitor.shutdown();System.out.println("剩余数据库任务: " + taskDatabase.dbQueue.size());}