【JAVA】消息队列(MQ)是个好东西

一、前言

     再JAVA系统开发中,再高并发的场景经常需要使用到消息队列,有时候是不得不使用到消息对了。特别是大数据量的并发处理。对数据实时性要求又没那么高的情况下。

用户请求 → 接入层(Nginx) → 限流 → 消息队列 → 订单服务 → 库存服务 → 支付服务
↑                     ↓
结果缓存 ←───────────────┘

在高并发场景下,消息队列(MQ)作为系统解耦、流量削峰和异步处理的核心组件,其性能优化和稳定性保障至关重要。下面我将从架构设计、性能优化、可靠性保障等方面详细解析高并发场景下的MQ使用策略。

高并发场景下的MQ选型策略

1. 主流MQ性能对比

特性RabbitMQKafkaRocketMQPulsar
吞吐量万级百万级十万级百万级
延迟微秒级毫秒级毫秒级毫秒级
持久化支持支持支持支持
事务消息支持支持(0.11+)支持支持
高可用镜像队列分区复制主从复制分层存储
适用场景业务解耦/复杂路由日志/流处理订单/交易多租户/流计算

2. 选型建议

  • 电商秒杀:RocketMQ(事务消息+高吞吐)

  • 日志收集:Kafka(超高吞吐+分区存储)

  • 金融支付:RabbitMQ(强一致性+复杂路由)

  • 物联网IoT:Pulsar(多租户+低延迟)

二、MQ主要使用,一是数据产生,第二是消费

消息队列(MQ)是分布式系统中常用的异步通信机制,Java中常用的MQ实现包括RabbitMQ、Apache Kafka、ActiveMQ、RocketMQ等。下面我将介绍这些MQ在Java中的基本使用方法。

三、直接上代码示例

生产者:

/*** 消息队列的生产者*/
package cn.xxx.module.member.mq.producer;
@Slf4j
@Component
public class MemberUserProducer {@Resourceprivate ApplicationContext applicationContext;/*** 发送 {@link MemberUserCreateMessage} 消息** @param userId 用户编号*/public void sendUserCreateMessage(Long userId) {applicationContext.publishEvent(new MemberUserCreateMessage().setUserId(userId));}}

消费者:

/*** 消息队列的消费者*/
package cn.xxx.module.member.mq.consumer;
@Slf4j
@Component
public class MemberRegisterPointIssueConsumer implements ApplicationRunner {@Resourceprivate RocketTXMqService rocketTXMqService;@Resourceprivate MemberPointIssueApi memberPointIssueApi;@Value("${rocketmq.producer2.topic}")private String memberRegisterPointIssueTopic;@Overridepublic void run(ApplicationArguments args) throws Exception {DefaultMQPushConsumer pushConsumer = rocketTXMqService.getPushConsumer2();if (null != pushConsumer) {try {pushConsumer.subscribe(memberRegisterPointIssueTopic, "*");// 注册回调实现类来处理从broker拉取回来的消息pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑IssueRegisterTaskPointReqVO issueRegisterTaskPointReqVO = JSONObject.parseObject(msgs.get(0).getBody(), IssueRegisterTaskPointReqVO.class);log.info("%s Receive Topic %s New Messages: %s issueRegisterTaskPointReqVO: %s %n", Thread.currentThread().getName(), memberRegisterPointIssueTopic, msgs, JSONObject.toJSONString(issueRegisterTaskPointReqVO));memberPointIssueApi.issueRegisterTaskPoint(issueRegisterTaskPointReqVO);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 启动消费者实例pushConsumer.start();log.info("push topic{} consumer start success", memberRegisterPointIssueTopic);} catch (MQClientException e) {log.error("push topic{} MQClientException:{}", memberRegisterPointIssueTopic, e.getMessage());}}}
}

可靠性保障机制

1. 消息不丢失设计

环节保障措施
生产者事务消息/confirm机制+本地消息表+定时任务补偿
Broker同步刷盘+多副本同步复制(ISR)+ RAID磁盘阵列
消费者手动ACK+消费幂等设计+死信队列+消息轨迹追踪

 消息积压处理方案

// 动态扩容消费者实例(Kafka示例)
// 1. 监控积压量
long lag = getConsumerLag("order-topic", "consumer-group");
// 2. 自动扩容规则
if (lag > 100000) {  // 积压超过10万scaleConsumerInstances(2); // 双倍扩容
} else if (lag < 1000) {scaleConsumerInstances(0.5); // 缩容50%
}// 3. 紧急处理方案
if (lag > 500000) {// 启动降级处理服务startDegradeService();// 消息转存至冷存储transferToColdStorage();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/87662.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/87662.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Golang面试题】Go结构体的特点,与其它语言的区别

Go 结构体深度解析&#xff1a;与 C/C、Java 的全面对比 一、核心概念对比特性Go 结构体 (struct)C/C 结构体 (struct)Java 类 (class)本质值类型复合数据类型值类型复合数据类型引用类型内存分配栈或堆 (编译器决定)栈 (显式控制)堆 (JVM管理)默认访问权限首字母大写导出publi…

CppCon 2018 学习:OOP is dead, long live Data-oriented design

探讨了面向对象编程&#xff08;OOP&#xff09;的一些根本性问题深入理解&#xff1a; 标题&#xff1a;What is so wrong with OOP? 什么是面向对象的问题&#xff1f; 这不是说 OOP “绝对错误”&#xff0c;而是指出它在实践中经常引发的问题&#xff0c;尤其是在性能敏…

科学的第五范式:人工智能如何重塑发现之疆

在人类探索未知的壮阔史诗中&#xff0c;科学方法的演进如同照亮迷雾的灯塔。从基于经验的第一范式&#xff08;描述自然现象&#xff09;&#xff0c;到以理论推演为核心的第二范式&#xff08;牛顿定律、麦克斯韦方程&#xff09;&#xff0c;再到以计算机模拟为标志的第三范…

tmux 左下角会话名显示不全的解决方法

在 tmux 中显示完整的会话名 有时候我们要在服务器上长时间跑某个任务&#xff0c;但不可能时时刻刻保持终端模拟器开启&#xff0c;这时候就需要用到 tmux &#xff0c;可以在关闭会话的同时让任务继续在后台跑&#xff0c;后续还可以连回来。但在 tmux 会话中&#xff0c;左…

【期末分布式】分布式的期末考试资料大题整理

&#x1f9f8;安清h&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;【Spring篇】【计算机网络】【Mybatis篇】 &#x1f3af;大题 ✨一.Nacos的服务注册与发现 &#x1f6a6;1.怎么来进行服务的注册与发现的这样的一个流程&#xff0c;描述一下。 &#x1f383;描述…

Android手机无网离线使用FunASR识别麦克风语音内容

手机断网离线使用FunASR识别麦克风语音内容 --本地AI电话机器人 上一篇&#xff1a;阿里FunASR本地断网离线识别模型简析 下一篇&#xff1a;手机无网离线使用FunASR识别手机历史通话录音 一、前言 继上一篇《阿里FunASR本地断网离线识别模型简析》和前面几篇ASR相关理论的…

Stable Diffusion 项目实战落地:从0到1 掌握ControlNet 第五篇 线稿到高清修复:一步步教你用AI做出完美IP形象

大家好!上一篇,我们一起玩转了字体风格变换 ,让文字根据提示词进行自如变换,个性十足又充满创意! 如果你错过了那篇文章,别担心,赶紧点这里补课:Stable Diffusion 项目实战落地:从0到1 掌握ControlNet 第四篇 风格化字体大揭秘:从线稿到涂鸦,ControlNet让文字焕发新生…

Java网络编程:TCP/UDP套接字通信详解

TCP客户端套接字创建与使用 Socket类基础概念 Socket类的对象代表TCP客户端套接字&#xff0c;用于与TCP服务器套接字进行通信。与服务器端通过accept()方法获取Socket对象不同&#xff0c;客户端需要主动执行三个关键步骤&#xff1a;创建套接字、绑定地址和建立连接。 客户端…

VMware vSphere 9与ESXi 9正式发布:云原生与AI驱动的虚拟化平台革新

2025年6月18日&#xff0c;VMware正式推出其旗舰虚拟化平台vSphere 9及配套的ESXi 9操作系统&#xff0c;标志着企业级虚拟化技术迈入以云原生、人工智能&#xff08;AI&#xff09;和硬件加速为核心的新纪元。此次更新不仅在功能层面实现突破&#xff0c;更通过授权模式革新为…

汽车功能安全概念阶段开发【相关项定义HARA】2

文章目录 1 浅谈概念阶段开发2 功能安全概念阶段开发2.1 相关项定义2.2 危害分析与风险评估&#xff08;HARA-Hazard Analysis and Risk Assessment&#xff09; 3 关键输出与对后续阶段的影响4 总结 1 浅谈概念阶段开发 概念阶段开发是整个研发流程的起点和基石。它发生在任何…

WPF中依赖属性和附加属性

依赖属性&#xff08;DependencyProperty&#xff09; 依赖属性是WPF中的一种特殊属性&#xff0c;它的实现依赖于DependencyObject类提供的基础设施。与普通的.NET属性不同&#xff0c;依赖属性的值可以通过多种方式确定&#xff0c;包括继承、样式、数据绑定和动画等。 主要特…

Docker 中如何实现镜像的推送和拉取

在 Docker 中&#xff0c;镜像的推送&#xff08;push&#xff09;和拉取&#xff08;pull&#xff09;是通过与**Docker 镜像仓库&#xff08;Registry&#xff09;**交互完成的。默认仓库是 Docker Hub&#xff0c;但你也可以使用私有仓库&#xff08;Harbor、Nexus、AWS ECR…

[C#] WPF - 自定义样式(Slider篇)

一、定义样式 在App.xaml里面定义样式&#xff1a; <Applicationx:Class"WpfApp.StudySlider.App"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml"xmlns:local&q…

eBPF 实战指南:精准定位 TCP 重传,洞察网络瓶颈真相

更多云服务器知识&#xff0c;尽在hostol.com 你有没有遇到过这种情况&#xff1f;网站访问卡顿&#xff0c;接口响应慢得像蜗牛爬。你 ping 服务器没丢包&#xff0c;CPU 内存也没打满&#xff0c;日志也没报错&#xff0c;结果就是不知道哪儿出的问题。 你用抓包分析&#x…

在 Ubuntu 系统上安装 Docker 环境

在当今的开发环境中&#xff0c;Docker 已经成为容器化技术的主流选择。它可以帮助开发者轻松地创建、部署和运行应用程序。本文将详细介绍如何在 Ubuntu 系统上安装 Docker 和 Docker Compose&#xff0c;并解决在安装过程中可能遇到的一些常见问题。 一、安装 Docker 1.卸载旧…

【Qt】QxORM无法删除和更改主键值为0的行,否则报错:invalid primary key

1、问题描述 使用 QxORM 删除或者更改数据库时,当主键值为 0 时,报错: [QxOrm] invalid primary key2、原因分析 2.1 源码分析 查找打印错误提示的代码: #define QX_DAO_ERR_INVALID_PRIMARY_KEY "[QxOrm] invalid primary key" QSqlError IxDao_Help…

数学建模_线性规划

问题背景模型介绍matlab求解 示例 问题背景 模型介绍 matlab求解 max问题转化为min问题 > > >号转化为 < < <号 示例 看到多个线性规划目标 2个目标函数变成1个目标函数 后面省略

51单片机制作万年历

硬件设计 主控芯片&#xff1a;一般选用AT89C52单片机&#xff0c;它与MCS - 51单片机产品兼容&#xff0c;有8K字节在系统可编程Flash存储器、32个可编程I/O口线、三个16位定时器 / 计数器等。时钟芯片&#xff1a;常用DS1302时钟芯片&#xff0c;能提供实时时钟 / 日历、定时…

Oracle CTE递归实现PCB行业的叠层关系

1、需求背景&#xff0c;出货报告要实现叠板假层的处理&#xff0c;需求如下 表ID,layer,MEDIUM数据如下 第一种情况&#xff0c;layer有K的 IDlayerMEDIUM1L1-L2302L2-L3403L3-K1204K1-L4105L4-L5206L5-L6307L7-K2108K2-L8119L8-L91010L9-L1030 实现layer有K1的&#xff0c…

Kubernetes 服务发布基础学习

一、Service 概述&#xff08;一&#xff09;Service 的定义Service 是 Kubernetes 中的一种抽象概念&#xff0c;用于定义一组 Pod 以及访问这组 Pod 的策略。其核心作用是将一组 Pod 封装为一个虚拟服务&#xff0c;并为客户端提供统一的入口&#xff0c;从而实现服务的负载均…