自定义线程池-实现任务0丢失的处理策略

设计一个线程池,要求如下:

  1. 队列最大容量为10(内存队列)。
  2. 当队列满了之后,拒绝策略将新的任务写入数据库。
  3. 从队列中取任务时,若该队列为空,能够从数据库中加载之前被拒绝的任务

模拟数据库 (TaskDatabase)

  • 使用LinkedBlockingQueue模拟数据库存储
  • 线程安全操作(ReentrantLock保证)
  • 实际应用需替换为JDBC/MyBatis等持久化方案
 static class TaskDatabase {private final BlockingQueue<Runnable> dbQueue = new LinkedBlockingQueue<>();private final ReentrantLock dbLock = new ReentrantLock();public void saveTask(Runnable task) {dbLock.lock();try {System.out.println("[DB] 存储被拒绝任务到数据库, 当前数据库任务数: " + (dbQueue.size() + 1));dbQueue.put(task);} catch (InterruptedException e) {Thread.currentThread().interrupt();} finally {dbLock.unlock();}}public Runnable loadTask() {dbLock.lock();try {Runnable task = dbQueue.poll();if (task != null) {System.out.println("[DB] 从数据库加载任务, 剩余数据库任务: " + dbQueue.size());}return task;} finally {dbLock.unlock();}}}

Runable任务序列化:

public interface SerializableTask extends Serializable {void execute();
}class CustomTask {public static void main(String[] args) {SerializableTask task = () -> System.out.println("Hello, World!");String serializedTask = serializedTask(task);SerializableTask deserialization = deserialization(serializedTask);deserialization.execute();}static String serializedTask(SerializableTask runnable){try(ByteArrayOutputStream baos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(baos)) {// 序列化任务对象oos.writeObject(runnable);return Base64.getEncoder().encodeToString(baos.toByteArray());}catch (Exception e){throw new RuntimeException("无法序列化");}}static SerializableTask deserialization(String serializedTask){// 反序列化任务byte[] data = Base64.getDecoder().decode(serializedTask);try (ByteArrayInputStream bais = new ByteArrayInputStream(data);ObjectInputStream ois = new ObjectInputStream(bais)) {SerializableTask task = (SerializableTask) ois.readObject();return task;}catch (Exception e){throw new RuntimeException("无法反序列化");}}
}

自定义阻塞队列 (DatabaseBackedBlockingQueue)

  • 继承LinkedBlockingQueue并重写关键方法
  • take()方法逻辑
    • 优先从内存队列取任务
    • 队列为空时从数据库加载
    • 数据库也为空时阻塞等待新任务
  • offer()方法 :队列未满时接受,满时返回false触发拒绝策略
 // 自定义阻塞队列(支持从数据库加载任务)static class DatabaseBackedBlockingQueue extends LinkedBlockingQueue<Runnable> {private final TaskDatabase database;public DatabaseBackedBlockingQueue(int maxLocalCapacity, TaskDatabase database) {super(maxLocalCapacity);this.database = database;}@Overridepublic Runnable take() throws InterruptedException {// 1. 优先检查本地队列Runnable task = super.poll();if (task != null) return task;// 2. 本地队列为空时尝试从数据库加载while (true) {Runnable dbTask = database.loadTask();if (dbTask != null) return dbTask;// 3. 数据库为空则等待新任务if (isEmpty()) {task = super.take();  // 阻塞直到有新任务if (task != null) return task;}}}}

拒绝策略 (DatabaseRejectionHandler)

  • 实现RejectedExecutionHandler接口
  • 当内存队列满时将任务存入数据库
  • 任务存入后会被后续的take()方法加载执行
 // 自定义拒绝策略(保存到数据库)static class DatabaseRejectionHandler implements RejectedExecutionHandler {private final TaskDatabase database;public DatabaseRejectionHandler(TaskDatabase database) {this.database = database;}@Overridepublic void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {System.out.println("[Rejected] 线程池队列已满,任务转入数据库");database.saveTask(task);}}

资源管理

  • 核心/最大线程数根据容器资源动态调整
  • 线程工厂添加命名前缀(便于监控)
  • 保活时间控制闲置线程销毁
// 5. 监控线程池状态
ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();
monitor.scheduleAtFixedRate(() -> {System.out.println("\n[监控] 活跃线程: " + executor.getActiveCount()+ " | 队列大小: " + executor.getQueue().size()+ " | 总完成任务: " + executor.getCompletedTaskCount());
}, 1, 2, TimeUnit.SECONDS);

自定义线程工厂

   // 自定义线程工厂static class NamedThreadFactory implements ThreadFactory {private final AtomicInteger counter = new AtomicInteger(1);private final String namePrefix;public NamedThreadFactory(String namePrefix) {this.namePrefix = namePrefix;}@Overridepublic Thread newThread(Runnable r) {return new Thread(r, namePrefix + "-" + counter.getAndIncrement());}}

测试

public static void main(String[] args) throws InterruptedException {// 1. 创建任务数据库TaskDatabase taskDatabase = new TaskDatabase();// 2. 配置线程池int queueCapacity = 10; // 队列容量// 3. 创建自定义队列和拒绝策略DatabaseBackedBlockingQueue workQueue =new DatabaseBackedBlockingQueue(queueCapacity, taskDatabase);ThreadPoolExecutor executor = new ThreadPoolExecutor(2,2,0,TimeUnit.SECONDS,workQueue,new NamedThreadFactory("custom-pool"),new DatabaseRejectionHandler(taskDatabase));// 4. 模拟任务提交int totalTasks = 50;System.out.println("开始提交任务, 总数: " + totalTasks);CountDownLatch countDownLatch = new CountDownLatch(50);for (int i = 1; i <= totalTasks; i++) {final int taskId = i;executor.execute(() -> {try {System.out.println(Thread.currentThread().getName()+ " 执行任务: " + taskId);Thread.sleep(1000); // 模拟任务执行countDownLatch.countDown();} catch (InterruptedException e) {Thread.currentThread().interrupt();}});}// 5. 监控线程池状态ScheduledExecutorService monitor = Executors.newSingleThreadScheduledExecutor();monitor.scheduleAtFixedRate(() -> {System.out.println("\n[监控] 活跃线程: " + executor.getActiveCount()+ " | 队列大小: " + executor.getQueue().size()+ " | 总完成任务: " + executor.getCompletedTaskCount());}, 1, 2, TimeUnit.SECONDS);// 6. 等待任务执行完成countDownLatch.await();executor.shutdown();monitor.shutdown();System.out.println("剩余数据库任务: " + taskDatabase.dbQueue.size());}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87529.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87529.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【NLP入门系列四】评论文本分类入门案例

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 博主简介&#xff1a;努力学习的22级本科生一枚 &#x1f31f;​&#xff1b;探索AI算法&#xff0c;C&#xff0c;go语言的世界&#xff1b;在迷茫中寻找光芒…

Ubuntu安装ClickHouse

注&#xff1a;本文章的ubuntu的版本为&#xff1a;ubuntu-20.04.6-live-server-amd64。 Ubuntu&#xff08;在线版&#xff09; 更新软件源 sudo apt-get update 安装apt-transport-https 允许apt工具通过https协议下载软件包。 sudo apt-get install apt-transport-htt…

C++26 下一代C++标准

C++26 将是继 C++23 之后的下一个 C++ 标准。这个新标准对 C++ 进行了重大改进,很可能像 C++98、C++11 或 C++20 那样具有划时代的意义。 一:C++标准回顾 C++ 已经有 40 多年的历史了。过去这些年里发生了什么?这里给出一个简化版的答案,直到即将到来的 C++26。 1. C++9…

【MySQL】十六,MySQL窗口函数

在 MySQL 8.0 及以后版本中&#xff0c;窗口函数&#xff08;Window Functions&#xff09;为数据分析和处理提供了强大的工具。窗口函数允许在查询结果集上执行计算&#xff0c;而不必使用子查询或连接&#xff0c;这使得某些类型的计算更加高效和简洁。 语法结构 function_…

微型气象仪在城市环境的应用

微型气象仪凭借其体积小、成本低、部署灵活、数据实时性强等特点&#xff0c;在城市环境中得到广泛应用&#xff0c;能够为城市规划、环境管理、公共安全、居民生活等领域提供精细化气象数据支持。一、核心应用场景1. 城市微气候监测与优化热岛效应研究场景&#xff1a;在城市不…

【仿muduo库实现并发服务器】eventloop模块

仿muduo库实现并发服务器一.eventloop模块1.成员变量std::thread::id _thread_id;//线程IDPoller _poll;int _event_fd;std::vector<Function<Function>> _task;TimerWheel _timer_wheel2.EventLoop构造3.针对eventfd的操作4.针对poller的操作5.针对threadID的操作…

Redis 加锁、解锁

Redis 加锁和解锁的应用 上代码 应用调用示例 RedisLockEntity lockEntityYlb RedisLockEntity.builder().lockKey(TradeConstants.HP_APP_AMOUNT_LOCK_PREFIX appUser.getAccount()).value(orderId).build();boolean isLockedYlb false;try {if (redisLock.tryLock(lockE…

在 Windows 上为 WSL 增加 root 账号密码并通过 Shell 工具连接

1. 为 WSL 设置 root 用户密码 在 Windows 上使用 WSL&#xff08;Windows Subsystem for Linux&#xff09;时&#xff0c;默认情况下并没有启用 root 账号的密码。为了通过 SSH 或其他工具以 root 身份连接到 WSL&#xff0c;我们需要为 root 用户设置密码。 设置 root 密码步…

2730、找到最长的半重复子字符穿

题目&#xff1a; 解答&#xff1a; 窗口为[left&#xff0c;right]&#xff0c;ans为窗口长度&#xff0c;same为子串长度&#xff0c;窗口满足题设条件&#xff0c;即只含一个连续重复字符&#xff0c;则更新ans&#xff0c;否则从左边开始一直弹出&#xff0c;直到满足条件…

MCP Java SDK源码分析

MCP Java SDK源码分析 一、引言 在当今人工智能飞速发展的时代&#xff0c;大型语言模型&#xff08;LLMs&#xff09;如GPT - 4、Claude等展现出了强大的语言理解和生成能力。然而&#xff0c;这些模型面临着一个核心限制&#xff0c;即无法直接访问外部世界的数据和工具。M…

[Linux]内核如何对信号进行捕捉

要理解Linux中内核如何对信号进行捕捉&#xff0c;我们需要很多前置知识的理解&#xff1a; 内核态和用户态的区别CPU指令集权限内核态和用户态之间的切换 由于文章的侧重点不同&#xff0c;上面这些知识我会在这篇文章尽量详细提及&#xff0c;更详细内容还得请大家查看这篇…

设计模式-观察者模式、命令模式

观察者模式Observer&#xff08;观察者&#xff09;—对象行为型模式定义&#xff1a;定义了一种一对多的依赖关系,让多个观察者对象同时监听某一主题对象,在它的状态发生变化时,会通知所有的观察者.先将 Observer A B C 注册到 Observable &#xff0c;那么当 Observable 状态…

【Unity笔记01】基于单例模式的简单UI框架

单例模式的UIManagerusing System.Collections; using System.Collections.Generic; using UnityEngine;public class UIManager {private static UIManager _instance;public Dictionary<string, string> pathDict;public Dictionary<string, GameObject> prefab…

深入解析 OPC UA:工业自动化与物联网的关键技术

在当今快速发展的工业自动化和物联网&#xff08;IoT&#xff09;领域&#xff0c;数据的无缝交换和集成变得至关重要。OPC UA&#xff08;Open Platform Communications Unified Architecture&#xff09;作为一种开放的、跨平台的工业通信协议&#xff0c;正在成为这一领域的…

MCP 协议的未来发展趋势与学习路径

MCP 协议的未来发展趋势 6.1 MCP 技术演进与更新 MCP 协议正在快速发展&#xff0c;不断引入新的功能和改进。根据 2025 年 3 月 26 日发布的协议规范&#xff0c;MCP 的最新版本已经引入了多项重要更新&#xff1a; 1.HTTP Transport 正式转正&#xff1a;引入 Streamable …

硬件嵌入式学习路线大总结(一):C语言与linux。内功心法——从入门到精通,彻底打通你的任督二脉!

嵌入式工程师学习路线大总结&#xff08;一&#xff09; 引言&#xff1a;C语言——嵌入式领域的“屠龙宝刀”&#xff01; 兄弟们&#xff0c;如果你想在嵌入式领域闯出一片天地&#xff0c;C语言就是你手里那把最锋利的“屠龙宝刀”&#xff01;它不像Python那样优雅&#xf…

MCP server资源网站去哪找?国内MCP服务合集平台有哪些?

在人工智能飞速发展的今天&#xff0c;AI模型与外部世界的交互变得愈发重要。一个好的工具不仅能提升开发效率&#xff0c;还能激发更多的创意。今天&#xff0c;我要给大家介绍一个宝藏平台——AIbase&#xff08;<https://mcp.aibase.cn/>&#xff09;&#xff0c;一个…

修改Spatial-MLLM项目,使其专注于无人机航拍视频的空间理解

修改Spatial-MLLM项目&#xff0c;使其专注于无人机航拍视频的空间理解。以下是修改方案和关键代码实现&#xff1a; 修改思路 输入处理&#xff1a;将原项目的视频文本输入改为单一无人机航拍视频/图像输入问题生成&#xff1a;自动生成空间理解相关的问题&#xff08;无需用户…

攻防世界-Reverse-insanity

知识点 1.ELF文件逆向 2.IDApro的使用 3.strings的使用 步骤 方法一&#xff1a;IDA 使用exeinfo打开&#xff0c;发现是32位ELF文件&#xff0c;然后用ida32打开。 找到main函数&#xff0c;然后F5反编译&#xff0c;得到flag。 tip&#xff1a;该程序是根据随机函数生成…

【openp2p】 学习1:P2PApp和优秀的go跨平台项目

P2PApp下面给出一个基于 RESTful 风格的 P2PApp 管理方案示例,供二次开发或 API 对接参考。核心思路就是把每个 P2PApp 当成一个可创建、查询、修改、启动/停止、删除的资源来管理。 一、P2PApp 资源模型 P2PApp:id: string # 唯一标识name: string # …