高可用消息队列线程池设计与实现:从源码解析到最佳实践

前言

在现代分布式系统中,消息队列处理是核心组件之一。今天我们将深入解析一个高性能、高可用的消息队列线程池实现——FindMessageQueue,并探讨如何将其优化应用于实际项目中。

一、核心架构设计

1.1 整体架构图

┌─────────────────────────────────────────────────┐
│                FindMessageQueue                 │
│                                                 │
│  ┌─────────────────────────────────────────┐    │
│  │           ThreadPoolExecutor            │    │
│  │                                         │    │
│  │  ┌─────────────┐  ┌─────────────────┐   │    │
│  │  │ 核心线程池   │  │  有界任务队列    │   │    │
│  │  │ (Core Pool) │  │ (Bounded Queue) │   │    │
│  │  └─────────────┘  └─────────────────┘   │    │
│  └─────────────────────────────────────────┘    │
│                                                 │
│  ┌─────────────────────────────────────────┐    │
│  │             熔断器机制                   │    │
│  │         (Circuit Breaker)               │    │
│  └─────────────────────────────────────────┘    │
│                                                 │
│  ┌─────────────────────────────────────────┐    │
│  │             监控系统                     │    │
│  │           (Monitoring)                  │    │
│  └─────────────────────────────────────────┘    │
│                                                 │
│  ┌─────────────────────────────────────────┐    │
│  │           指标统计系统                   │    │
│  │          (Metrics System)               │    │
│  └─────────────────────────────────────────┘    │
└─────────────────────────────────────────────────┘

1.2 核心组件介绍

// 核心线程池配置
private final ThreadPoolExecutor executorService;
private final int queueCapacity;// 熔断器机制
private final AtomicBoolean circuitBreakerOpen = new AtomicBoolean(false);
private final AtomicLong circuitBreakerOpenedTime = new AtomicLong(0);// 监控指标
private final AtomicLong totalTasksSubmitted = new AtomicLong(0);
private final AtomicLong totalTasksRejected = new AtomicLong(0);
private final AtomicLong totalTasksCompleted = new AtomicLong(0);
private final AtomicLong totalTasksFailed = new AtomicLong(0);

二、详细源码解析

2.1 线程池初始化

public FindMessageQueue(int threadPoolSize) {this.queueCapacity = 1000;this.executorService = new ThreadPoolExecutor(threadPoolSize,                    // 核心线程数threadPoolSize,                    // 最大线程数60L, TimeUnit.SECONDS,            // 线程空闲存活时间new LinkedBlockingQueue<>(queueCapacity), // 有界队列new ThreadPoolExecutor.DiscardPolicy() // 拒绝策略);startMonitorThread(); // 启动监控线程
}

关键参数说明:

  • corePoolSize = maximumPoolSize:创建固定大小线程池

  • keepAliveTime = 60秒:空闲线程回收时间

  • LinkedBlockingQueue:有界队列防止内存溢出

  • DiscardPolicy:队列满时由调用线程执行任务

2.2 熔断器机制实现

// 熔断检查逻辑
if (rejectionRate > rejectionRateThreshold && !circuitBreakerOpen.get()) {logger.warn("拒绝率过高({}%),触发熔断机制", rejectionRate * 100);circuitBreakerOpen.set(true);circuitBreakerOpenedTime.set(System.currentTimeMillis());
}// 熔断恢复逻辑  
if (circuitBreakerOpen.get() && System.currentTimeMillis() - circuitBreakerOpenedTime.get() > circuitBreakerResetTimeout) {if (rejectionRate < rejectionRateThreshold / 2) {circuitBreakerOpen.set(false); // 恢复服务}
}

2.3 任务提交机制

public boolean addTask(Runnable task, long timeout, TimeUnit unit) {totalTasksSubmitted.incrementAndGet();// 熔断器检查if (circuitBreakerOpen.get()) {totalTasksRejected.incrementAndGet();return false;}try {if (timeout <= 0) {executorService.execute(wrapTask(task)); // 异步执行return true;} else {Future<?> future = executorService.submit(wrapTask(task));future.get(timeout, unit); // 同步等待结果return true;}} catch (RejectedExecutionException e) {totalTasksRejected.incrementAndGet();return false;}
}

2.4 监控系统实现

private void monitorQueueHealth() {int queueSize = executorService.getQueue().size();int activeCount = executorService.getActiveCount();double queueUsage = (double) queueSize / queueCapacity;double rejectionRate = (double) totalTasksRejected.get() / totalTasksSubmitted.get();logger.info("线程池监控 - 活跃线程: {}, 队列大小: {}/{}, 使用率: {}%, 拒绝率: {}%",activeCount, queueSize, queueCapacity, queueUsage * 100, rejectionRate * 100);
}

三、优化改进方案

3.1 使用Spring Boot集成

@Configuration
public class ThreadPoolConfig {@Beanpublic FindMessageQueue findMessageQueue(@Value("${thread.pool.size:10}") int poolSize,@Value("${thread.queue.capacity:1000}") int queueCapacity) {return new FindMessageQueue(poolSize) {@Overrideprotected void init(int threadPoolSize) {// 可重写初始化逻辑super.queueCapacity = queueCapacity;}};}
}

3.2 添加Prometheus监控指标

@Component
public class ThreadPoolMetrics {private final FindMessageQueue messageQueue;// 注册监控指标public void registerMetrics() {Gauge.builder("thread_pool_queue_size", messageQueue, FindMessageQueue::getQueueSize).description("当前任务队列大小").register(MeterRegistry);Gauge.builder("thread_pool_rejection_rate", messageQueue, q -> (double) q.getRejectedCount() / q.getSubmittedCount()).description("任务拒绝率").register(MeterRegistry);}
}

3.3 增强的熔断策略

// 多维度熔断条件
private boolean shouldTriggerCircuitBreaker() {double rejectionRate = getRejectionRate();double queueUsage = getQueueUsage();long avgTaskTime = getAverageTaskTime();return rejectionRate > rejectionRateThreshold || queueUsage > 0.9 || avgTaskTime > maxAllowedTaskTime;
}

3.4 动态配置调整

@RefreshScope
@Component
public class DynamicThreadPoolConfig {@Autowiredprivate FindMessageQueue messageQueue;@EventListenerpublic void onConfigUpdate(EnvironmentChangeEvent event) {// 动态调整线程池参数if (event.getKeys().contains("thread.pool.size")) {adjustThreadPoolSize();}}
}

总结

核心优势:

  1. 高可用性:熔断器机制防止系统雪崩

  2. 可观测性:完善的监控和指标统计

  3. 弹性伸缩:动态调整线程池参数

  4. 错误隔离:任务失败不影响主线程

适用场景:

  • 消息队列处理

  • 批量数据处理

  • 异步任务执行

  • 高并发请求处理

注意事项:

  • 合理设置线程池大小和队列容量

  • 监控关键指标并及时调整参数

  • 实现恰当的错误处理和重试机制

  • 定期进行压力测试和性能调优

这个FindMessageQueue实现提供了一个生产级别的线程池解决方案,通过熔断器、监控系统和弹性设计,确保了系统的高可用性和稳定性。

附赠:完整代码:

package com.baotademo.controller;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;public class FindMessageQueue {private static final Logger logger = LoggerFactory.getLogger(FindMessageQueue.class);private final ThreadPoolExecutor executorService;private final int queueCapacity;// 熔断器状态private final AtomicBoolean circuitBreakerOpen = new AtomicBoolean(false);private final AtomicLong circuitBreakerOpenedTime = new AtomicLong(0);private final long circuitBreakerResetTimeout = 30000; // 30秒后尝试恢复// 监控指标private final AtomicLong totalTasksSubmitted = new AtomicLong(0);private final AtomicLong totalTasksRejected = new AtomicLong(0);private final AtomicLong totalTasksCompleted = new AtomicLong(0);private final AtomicLong totalTasksFailed = new AtomicLong(0);// 监控阈值private final double queueUsageThreshold = 0.8; // 队列使用率超过80%警告private final double rejectionRateThreshold = 0.1; // 拒绝率超过10%触发熔断public FindMessageQueue(int threadPoolSize) {this.queueCapacity = 1000;// 使用有界队列+合适的拒绝策略this.executorService = new ThreadPoolExecutor(threadPoolSize, // 核心线程数threadPoolSize, // 最大线程数60L, TimeUnit.SECONDS, // 空闲线程存活时间new LinkedBlockingQueue<>(queueCapacity), // 有界任务队列new ThreadPoolExecutor.DiscardPolicy() // 拒绝策略:由调用线程执行);// 启动监控线程startMonitorThread();}// 启动监控线程private void startMonitorThread() {ScheduledExecutorService monitorExecutor = Executors.newSingleThreadScheduledExecutor();monitorExecutor.scheduleAtFixedRate(() -> {try {monitorQueueHealth();} catch (Exception e) {logger.error("监控线程执行异常", e);}}, 1, 5, TimeUnit.SECONDS); // 5秒监控一次}// 监控队列健康状态private void monitorQueueHealth() {int queueSize = executorService.getQueue().size();int activeCount = executorService.getActiveCount();long completedTaskCount = executorService.getCompletedTaskCount();long submittedTasks = totalTasksSubmitted.get();long rejectedTasks = totalTasksRejected.get();// 计算队列使用率double queueUsage = (double) queueSize / queueCapacity;// 计算拒绝率double rejectionRate = submittedTasks > 0 ? (double) rejectedTasks / submittedTasks : 0;// 记录监控指标logger.info("线程池监控 - 活跃线程: {}, 队列大小: {}/{}, 队列使用率: {}%, 拒绝率: {}%, 已完成任务: {}",activeCount, queueSize, queueCapacity,String.format("%.2f", queueUsage * 100),String.format("%.2f", rejectionRate * 100),completedTaskCount);// 检查是否需要触发熔断if (rejectionRate > rejectionRateThreshold && !circuitBreakerOpen.get()) {logger.warn("拒绝率过高({}%),触发熔断机制", String.format("%.2f", rejectionRate * 100));circuitBreakerOpen.set(true);circuitBreakerOpenedTime.set(System.currentTimeMillis());}// 检查是否可以恢复熔断if (circuitBreakerOpen.get() &&System.currentTimeMillis() - circuitBreakerOpenedTime.get() > circuitBreakerResetTimeout) {logger.info("尝试恢复熔断器,当前拒绝率: {}%", String.format("%.2f", rejectionRate * 100));// 如果拒绝率下降到阈值以下,恢复服务if (rejectionRate < rejectionRateThreshold / 2) {logger.info("拒绝率已恢复正常({}%),关闭熔断器", String.format("%.2f", rejectionRate * 100));circuitBreakerOpen.set(false);} else {// 否则重置熔断时间,继续熔断circuitBreakerOpenedTime.set(System.currentTimeMillis());}}// 队列使用率过高警告if (queueUsage > queueUsageThreshold) {logger.warn("任务队列使用率过高: {}%", String.format("%.2f", queueUsage * 100));}}// 向队列添加任务public boolean addTask(Runnable task) {return addTask(task, 0, TimeUnit.MILLISECONDS);}// 带超时的任务添加public boolean addTask(Runnable task, long timeout, TimeUnit unit) {totalTasksSubmitted.incrementAndGet();// 检查熔断器状态if (circuitBreakerOpen.get()) {logger.warn("熔断器已打开,拒绝新任务");totalTasksRejected.incrementAndGet();return false;}try {// 尝试提交任务if (timeout <= 0) {executorService.execute(task);return true;} else {// 带超时的提交Future<?> future = executorService.submit(task);try {future.get(timeout, unit);return true;} catch (TimeoutException e) {logger.warn("任务执行超时,已取消");future.cancel(true);totalTasksFailed.incrementAndGet();return false;}}} catch (RejectedExecutionException e) {logger.warn("任务被线程池拒绝,当前队列大小: {}", executorService.getQueue().size());totalTasksRejected.incrementAndGet();return false;} catch (Exception e) {logger.error("添加任务时发生异常", e);totalTasksFailed.incrementAndGet();return false;}}// 获取当前队列大小public int getQueueSize() {return executorService.getQueue().size();}// 获取活跃线程数public int getActiveCount() {return executorService.getActiveCount();}// 获取熔断器状态public boolean isCircuitBreakerOpen() {return circuitBreakerOpen.get();}// 手动重置熔断器public void resetCircuitBreaker() {circuitBreakerOpen.set(false);circuitBreakerOpenedTime.set(0);logger.info("熔断器已手动重置");}// 获取监控指标public String getMetrics() {return String.format("任务统计 - 已提交: %d, 已拒绝: %d, 已完成: %d, 失败: %d, 拒绝率: %.2f%%",totalTasksSubmitted.get(),totalTasksRejected.get(),totalTasksCompleted.get(),totalTasksFailed.get(),totalTasksSubmitted.get() > 0 ?(double) totalTasksRejected.get() / totalTasksSubmitted.get() * 100 : 0);}// 优雅关闭public void shutdown() {logger.info("开始关闭线程池...");executorService.shutdown();try {if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {logger.warn("线程池未正常关闭,尝试强制关闭");executorService.shutdownNow();}} catch (InterruptedException e) {executorService.shutdownNow();Thread.currentThread().interrupt();}logger.info("线程池已关闭");}// 立即关闭public void shutdownNow() {logger.info("立即关闭线程池");executorService.shutdownNow();}// 包装任务以跟踪完成情况private Runnable wrapTask(Runnable task) {return () -> {try {task.run();totalTasksCompleted.incrementAndGet();} catch (Exception e) {totalTasksFailed.incrementAndGet();logger.error("任务执行失败", e);throw e;}};}
}

使用方法:

1.实例化:

private static final FindMessageQueue findMessageQueue = new FindMessageQueue(50);

2.调用:

    public CompletableFuture<R> sendQueneSms(@RequestBody Map<String, Object> request,HttpServletRequest requesthead) {CompletableFuture<R> future = new CompletableFuture<>();// 设置超时ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);ScheduledFuture<?> timeoutFuture = scheduler.schedule(() -> {if (!future.isDone()) {logger.warn("请求处理超时");future.complete(R.error("处理超时,请稍后重试"));}}, 10, TimeUnit.SECONDS); // 10秒超时// 创建任务Runnable task = () -> {try {R result = loadHistoryMessage(request, requesthead);future.complete(result);} catch (Exception e) {logger.error("处理历史消息失败", e);future.complete(R.error("处理失败: " + e.getMessage()));} finally {// 取消超时检查timeoutFuture.cancel(true);scheduler.shutdown();}};// 添加任务到队列boolean success = findMessageQueue.addTask(task, 5, TimeUnit.SECONDS); // 5秒提交超时if (!success) {// 任务提交失败,直接返回降级响应timeoutFuture.cancel(true);scheduler.shutdown();if (findMessageQueue.isCircuitBreakerOpen()) {future.complete(R.error("系统繁忙,熔断器已打开,请稍后重试"));} else {future.complete(R.error("系统繁忙,请稍后重试"));}}return future;}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/96494.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/96494.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Android App瘦身方法介绍

第一章 安装包构成深度剖析1.1 APK文件结构解剖APK文件本质是一个ZIP压缩包&#xff0c;通过unzip -l app.apk命令可查看其内部结构&#xff1a;Archive: app.apkLength Method Size Cmpr Date Time CRC-32 Name -------- ------ ------- ---- ---------- -…

深入浅出迁移学习:从理论到实践

1. 引言&#xff1a;为什么需要迁移学习&#xff1f;在深度学习爆发的这十年里&#xff0c;我们见证了模型性能的飞速提升 ——ResNet 在图像分类上突破人类视觉极限&#xff0c;BERT 在 NLP 任务上刷新基准&#xff0c;GPT 系列更是开启了大语言模型时代。但这些亮眼成果的背后…

嵌入式人别再瞎折腾了!这8个开源项目,解决按键/队列/物联网所有痛点,小白也能抄作业

嵌入式人别再瞎折腾了&#xff01;这8个开源项目&#xff0c;解决按键/队列/物联网所有痛点&#xff0c;小白也能抄作业 你是不是也有过这样的崩溃时刻&#xff1a;想做个按键控制&#xff0c;结果长按、连击、组合键的逻辑写了200行if-else&#xff0c;最后还时不时串键&#…

C++篇(7)string类的模拟实现

一、string的成员变量string和数据结构中的顺序表类似&#xff0c;本质上可以理解成字符顺序表&#xff0c;其成员变量仍然是_str&#xff0c;_size和_capacity。但是&#xff0c;C标准库里面也有一个string&#xff0c;和我们要自己实现的string类冲突了&#xff0c;该如何解决…

【直接套模板】如何用 Web of Science 精准检索文献?

在文献检索的时候遇到一些问题&#xff0c;单独使用关键词检索出来的文章数量太多&#xff0c;如果是多加一些限定词&#xff0c;又什么都检索不到&#xff1a;比如我明明知道某篇论文已经发表&#xff0c;但在 Web of Science (WoS) 里却检索不到。这其实和检索式的写法密切相…

HTTP 协议:从原理到应用的深度剖析

一、什么是HTTP协议&#xff1f;HTTP协议&#xff0c;全称 Hyper Text Transfer Protocol&#xff08;超⽂本传输协议&#xff09;的缩写&#xff0c;是⽤于服务器与客户端浏览器之间传输超⽂本数据&#xff08;⽂字、图⽚、视频、⾳频&#xff09;的应⽤层协议。它规定了客户端…

【算法--链表】138.随机链表的复制--通俗讲解

算法通俗讲解推荐阅读 【算法–链表】83.删除排序链表中的重复元素–通俗讲解 【算法–链表】删除排序链表中的重复元素 II–通俗讲解 【算法–链表】86.分割链表–通俗讲解 【算法】92.翻转链表Ⅱ–通俗讲解 【算法–链表】109.有序链表转换二叉搜索树–通俗讲解 【算法–链表…

为什么现在企业注重数据可视化?一文讲清可视化数据图表怎么做

目录 一、企业注重数据可视化的原因 1.提升数据理解效率 2.发现数据中的规律和趋势 3.促进企业内部沟通与协作 4.增强决策的科学性 5.提升企业竞争力 二、可视化数据图表的基本概念 1.常见的可视化图表类型 2.可视化图表的构成要素 3.可视化图表的设计原则 三、制作…

Cursor 辅助开发:快速搭建 Flask + Vue 全栈 Demo 的实战记录

Cursor 辅助开发&#xff1a;快速搭建 Flask Vue 全栈 Demo 的实战记录 &#x1f31f; Hello&#xff0c;我是摘星&#xff01; &#x1f308; 在彩虹般绚烂的技术栈中&#xff0c;我是那个永不停歇的色彩收集者。 &#x1f98b; 每一个优化都是我培育的花朵&#xff0c;每一个…

实战:用 Python 搭建 MCP 服务 —— 模型上下文协议(Model Context Protocol)应用指南

&#x1f4cc; 实战&#xff1a;用 Python 搭建 MCP 服务 —— 模型上下文协议&#xff08;Model Context Protocol&#xff09;应用指南 标签&#xff1a;#MCP #AI工程化 #Python #LLM上下文管理 #Agent架构&#x1f3af; 引言&#xff1a;为什么需要 MCP&#xff1f; 在构建大…

宋红康 JVM 笔记 Day16|垃圾回收相关概念

一、今日视频区间 P154-P168 二、一句话总结 System.gc()的理解&#xff1b;内存溢出与内存泄漏&#xff1b;Stop The World;垃圾回收的并行与并发&#xff1b;安全点与安全区域&#xff1b;再谈引用&#xff1a;强引用&#xff1b;再谈引用&#xff1a;软引用&#xff1b;再谈…

OpenCV 高阶 图像金字塔 用法解析及案例实现

目录 一、什么是图像金字塔&#xff1f; 二、图像金字塔的核心作用 三、图像金字塔的核心操作&#xff1a;上下采样 3.1 向下采样&#xff08; pyrDown &#xff09;&#xff1a;从高分辨率到低分辨率 1&#xff09;原理与步骤 2&#xff09;关键注意事项 3&#xff09;…

【ARMv7】系统复位上电后的程序执行过程

引子&#xff1a;对于ARMv7-M系列SOC来说&#xff0c;上电后程序复位执行的过程相对来说比较简单&#xff0c;因为绝大部分芯片&#xff0c;都是XIP&#xff08;eXecute In Place&#xff0c;就地执行&#xff09;模式执行程序&#xff0c;不需要通过BooROM->PL(preloader)-…

神经网络的初始化:权重与偏置的数学策略

在深度学习中&#xff0c;神经网络的初始化是一个看似不起眼&#xff0c;却极其重要的环节。它就像是一场漫长旅程的起点&#xff0c;起点的选择是否恰当&#xff0c;往往决定了整个旅程的顺利程度。今天&#xff0c;就让我们一起深入探讨神经网络初始化的数学策略&#xff0c;…

第 16 篇:服务网格的未来 - Ambient Mesh, eBPF 与 Gateway API

系列文章:《Istio 服务网格详解》 第 16 篇:服务网格的未来 - Ambient Mesh, eBPF 与 Gateway API 本篇焦点: 反思当前主流 Sidecar 模式的挑战与权衡。 深入了解 Istio 官方的未来演进方向:Ambient Mesh (无边车模式)。 探讨革命性技术 eBPF 将如何从根本上重塑服务网格的…

摆动序列:如何让数组“上下起伏”地最长?

文章目录摘要描述题解答案题解代码分析代码解析示例测试及结果时间复杂度空间复杂度总结摘要 今天我们要聊的是 LeetCode 第 376 题 —— 摆动序列。 题目的意思其实很有意思&#xff1a;如果一个序列里的相邻差值能保持正负交替&#xff0c;就叫做“摆动”。比如 [1, 7, 4, 9…

玩转Docker | 使用Docker部署KissLists任务管理工具

玩转Docker | 使用Docker部署KissLists任务管理工具 前言 一、KissLists介绍 KissLists简介 KissLists核心特点 KissLists注意事项 二、系统要求 环境要求 环境检查 Docker版本检查 检查操作系统版本 三、部署KissLists服务 下载KissLists镜像 编辑部署文件 创建容器 检查容器状…

【滑动窗口】C++高效解决子数组问题

个人主页 &#xff1a; zxctscl 专栏 【C】、 【C语言】、 【Linux】、 【数据结构】、 【算法】 如有转载请先通知 文章目录前言1 209. 长度最小的子数组1.1 分析1.2 代码2 3. 无重复字符的最长子串2.1 分析2.2 代码3 1004. 最大连续1的个数 III3.1 分析3.2 代码4 1658. 将 x …

[rStar] 搜索代理(MCTS/束搜索)

第2章&#xff1a;搜索代理(MCTS/束搜索) 欢迎回到rStar 在前一章中&#xff0c;我们学习了求解协调器&#xff0c;它就像是解决数学问题的项目经理。 它组织整个过程&#xff0c;但本身并不进行"思考"&#xff0c;而是将这项工作委托给其专家团队。 今天&#x…

Electron 核心模块速查表

为了更全面地覆盖常用 API&#xff0c;以下表格补充了更多实用方法和场景化示例&#xff0c;同时保持格式清晰易读。 一、主进程模块 模块名核心用途关键用法 示例注意事项app应用生命周期管理• 退出应用&#xff1a;app.quit()• 重启应用&#xff1a;app.relaunch() 后需…