深度剖析 TDMQ RabbitMQ 版经典队列底层存储机制

导语

RabbitMQ 作为开源消息队列的标杆产品,凭借灵活的路由机制与高可用设计,支撑着海量业务场景的消息流转。而经典队列(Classic Queue) 作为 RabbitMQ 最基础、应用最广泛的队列类型,其底层存储机制直接决定了消息处理的性能边界与可用性上限。

理解经典队列的存储架构,不仅是掌握 RabbitMQ 核心原理的关键,更为生产环境的运维优化提供了理论支撑。本文将从文件目录结构、存储格式定义、读写流程到运维实践策略,全面解析经典队列的底层存储实现逻辑,帮助读者深入理解其在消息生命周期管理中的核心作用。

经典队列介绍

RabbitMQ 作为一款历史悠久的开源消息队列,被广泛应用于各个领域。在 RabbitMQ 中,用户使用虚拟主机(Vhost)隔离资源,交换机负责路由消息,队列则是消息存储的最小单元。

用户通过客户端与 RabbitMQ 的服务端建立连接后,基于通道(Channel)实现消息的高效交互:生产者经过通道将消息发送至交换机,由交换机按绑定规则路由至目标队列;消费者则通过通道从队列中拉取消息,完成业务逻辑处理。

在这一过程中,队列作为消息生命周期的核心载体,衍生出三种差异化实现:

  • 经典队列(Classic Queue):采用轻量级索引与共享存储架构,在单机性能与存储效率间取得平衡,适用于高吞吐非强一致性场景;

  • 仲裁队列(Quorum Queue):基于 Raft 协议实现多副本强一致性,保障关键业务数据不丢失,适用于金融交易、订单管理等关键业务;

  • 流队列(Stream Queue):以日志结构存储消息流,支持回溯消费与持久化流处理,适用于实时数据分析场景。

经典队列作为使用频率最高的队列,了解它的存储机制对于理解其可用性和性能至关重要,接下来将从存储架构、文件格式、读写流程等维度,深入解析经典队列的底层实现逻辑。

存储架构解析

目录结构

RabbitMQ 通过虚拟主机(Vhost)实现资源隔离,每个 Vhost 有独立的物理存储目录,其典型结构如下:

vhost_name/
├── msg_store_persistent/      # 共享存储目录,存储大消息
│   ├── 0.rdq                  # 共享存储文件
│   └── 1.rdq                  # 支持文件滚动
└── queues/                    # 队列专属存储目录└── queue_name/            # 单个队列目录├── queue_name.qi      # 队列索引文件└── queue_name.qs      # 队列存储文件

msg_store_* 是共享存储目录,顾名思义是这个 Vhost 下所有队列共享的存储。由于 Exchange 可能会将同一条消息路由到不同的队列,而将同一条消息存储多次会增加磁盘空间占用,因此经典队列会将大小超过某个阈值的消息存储在共享存储下,通过引用计数来管理这部分消息。

每个队列在 queues 目录下都有属于自己的目录,队列目录下主要有两类文件:

  • 队列存储:名称为 *.qs 的文件,负责存储这个队列中消息大小小于这个阈值的消息。

  • 队列索引:名称为 *.qi 的文件,负责存储消息元数据和消息所在位置。队列索引存储了消息的偏移或唯一标识,通过它们可以定位到消息在队列存储或共享存储中的位置,索引文件中的 Entry 和存储文件中的 Entry 因此在逻辑上构成了一对一的映射关系。

队列索引

队列索引文件由一个 Header 和若干 Entry 组成,Entry 的数量由 classic_queue_index_v2_segment_entry_count 这一参数控制,默认为4096。Entry 有两种类型:Publish Entry 和 Ack Entry。

生产者将消息成功发送到队列后会产生一个 Publish Entry,队列将这条消息投递给消费者并且得到消费者确认后会使用 Ack Entry 覆盖原来的 Publish Entry,代表这条消息可以被删除。

Publish Entry 存储了这条消息的元数据,包括 MsgId、SeqId、存储位置、消息属性和是否持久化的标识。

MsgId 是 RabbitMQ 为每条消息随机生成的 GUID,用来确定消息在共享存储的位置。

SeqId 是这条消息在队列中的序号,用来决定消息在队列索引和队列存储中的位置。

队列存储

队列存储文件和索引文件是一对一的关系,当队列删除它的索引文件时,也会删除对应的存储文件。队列存储文件的结构与索引文件类似,也是由 Header 和 Entry 构成。Header 和 Entry 的具体组成如下所示。

共享存储

ETS 是 Erlang 内置的单机 KV 存储,共享存储使用 ETS 维护了两个组件:

  • Index:是 MsgId 到消息位置的映射。

  • FileSummary:文件到文件统计信息的映射。

经典队列在读取消息时通过索引文件中的 Publish Entry 获取到 MsgId 后还需要从 Index 中获取消息的具体位置,包括这条消息所在的文件、偏移以及它的引用计数。相同 MsgId 的多条消息只会被写入一次,删除消息时,它的引用计数会被减一。文件统计信息中记录了文件中有效数据的数量,这在整理文件时会被用到。

共享存储文件的大小由参数 msg_store_file_size_limit 控制,默认为16MB。每个文件由若干个 Entry 组成,每个 Entry 的具体组成如下所示。

核心工作流程

消息写入

RabbitMQ 根据消息大小决定将消息写入到哪个存储。如果消息大小大于或等于某个值(由参数 queue_index_embed_msgs_below 控制,默认为4KB),RabbitMQ 会将其存于共享存储中,否则会存于队列存储中。

将消息写入存储时会直接写到内部缓冲区:

  • 队列存储内部的缓冲区大小由参数 classic_queue_store_v2_max_cache_size 控制,默认为512KB。

  • 共享存储内部的缓冲区大小则固定为1MB。将消息写入到共享存储时除了需要写入到缓冲区外,还需要更新它内部的 Index 和 FileSummary 组件。

缓冲区大小超过限制后会 Flush 其中的数据,值得注意的是,Flush 时不会调用 Fsync,而是调用 Write 将数据写入到操作系统的 Page Cache 上。这种方式通过牺牲数据安全性以获得更低的延迟,如果需要更强的数据安全性应使用仲裁队列。

存储写入完成后需要在队列索引文件中写入 Publish Entry,此时消息被认为成功写入了。之后还要更新内存中的消息缓存,以加速消息读取。

消息读取

经典队列在内存中维护了专门的缓存来提升读取性能,底层存储会根据队列的消费速率批量读取不超过2048条消息到缓存中。读取消息时会先检查缓存中是否有这条消息,如果有则直接返回,否则会先将消息批量读取到缓存。

将消息从磁盘批量读取到内存中需要先到队列索引中读取元数据,然后分别到队列存储和共享存储中读取消息体,并将它们组装到一起。即便缓存中有消息,但是实际的消息体仍然可能不在缓存中,因为过大(>12KB)过少(<10条)的消息的消息体并不会被读到缓存里,需要在投递消息时逐条去磁盘中读取消息体。

文件整理

共享存储会定时整理有效数据占比低于一半的文件以回收空间。整个过程分为三步:

  1. 将文件末尾的有效数据拷贝到文件前面的无效数据处。
  2. 更新 Index 组件。
  3. 在没有进程读取文件后截断文件。

RabbitMQ 会将文件中的无效数据置0,称为空洞(blank holes)。在文件整理时,RabbitMQ 从最后一条有效消息开始查看其是否能填补前面的空洞,如果可以就将其拷贝到前面,如果它比前面的任何一个空洞都大,那么这一次的文件整理将无法释放任何空间,这是为了防止意外覆盖被移动过的消息。Index 组件中存储了消息的位置,拷贝完成后需要更新对应消息的位置。在没有进程读取文件后就可以截断这个文件以节省磁盘空间。

运维实践

发送确认

为了提高消息发送的可靠性,我们推荐用户打开发送确认(Confirm)。RabbitMQ 会在将消息从缓冲区 Flush 到磁盘后向客户端发送 Confirm,此时生产者可以认为这条消息已经被成功发送到队列。

消费确认

为了提高消息消费的可靠性,我们推荐用户打开手动确认(Manual Ack)。RabbitMQ 在收到 Ack后会写入 Entry 到队列索引中,只有在索引文件中的所有 Publish Entry 全部被 Ack 后,才会删除该文件。如果消费者在发送 Ack 前宕机了,RabbitMQ 会重复投递这条消息,确保消息能真正被消费掉。未被客户端 Ack 的消息会堆积在内存中,如果数量过多则可能触发内存水位限制,甚至导致服务端 OOM。因此在用户打开手动确认后,我们建议用户设置一次最多能预取(prefetch count)的消息数量,避免大量消息堆积在客户端和服务端内存中。

保证队列尽可能短

保持生产和消费速率一致可以减少消息堆积。RabbitMQ 会在发现索引文件中的消息全部被消费后删除索引文件和对应的存储文件,这样可以减少磁盘空间占用。队列的堆积数量少意味着多数读取都可以从缓存中直接读取到消息体,从而提升读取性能。

总结

本文全面探讨了 RabbitMQ 经典队列的底层存储机制,包括其整体架构、实现原理及运维实践。经典队列的底层存储由队列索引和消息存储两大模块构成,其中消息存储又细分为共享存储和队列存储,通过精心设计的文件结构和内存管理策略,实现了高效的消息读写与存储管理。文章详细解析了队列索引、消息存储(包括共享存储和队列存储)的文件结构,介绍了消息读取与写入的流程,以及文件整理的逻辑。在运维实践方面,强调了发送确认、消费确认与保持队列尽可能短的重要性,并给出了相应的配置建议。希望通过本文的介绍,可以帮助大家深入理解 RabbitMQ 经典队列的底层存储机制,为实际应用中的性能优化与故障排查提供有力支持。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/91650.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/91650.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Spring AI开发智能客服(Tool calling)

文章目录前言1 思路分析2 工程结构搭建1_数据库表2_引入依赖3_基础代码3 定义 Tool1_分析查询条件2_定义Function4 系统提示词5 配置ChatClient6 编写Controller7 测试8 Tool calling 底层组件1_ToolCallback2_ToolDefinition3_ToolCallingManager4_ResultConverter5_ToolConte…

设计模式笔记_结构型_适配器模式

1.适配器模式介绍适配器模式是一种结构型设计模式&#xff0c;它允许不兼容的接口协同工作。适配器模式的核心思想是将一个类的接口转换成客户期望的另一个接口&#xff0c;使得原本由于接口不兼容而不能一起工作的类可以一起工作。你可以将其想象成一个“转换插头”——假设你…

事务隔离:从锁实现到MVCC实现

文章目录事务隔离&#xff1a;从锁实现到MVCC实现事务四大特性事务隔离级别锁实现概念实现事务隔离MVCC实现当前读与快照读实现事务隔离Read View总结事务隔离&#xff1a;从锁实现到MVCC实现 面试的时候被面试官问到&#xff1a;你这个项目为什么使用了可重复读而不选择读已提…

小架构step系列18:工具

1 概述 在写代码的时候&#xff0c;有很多通用的、与业务无关逻辑&#xff0c;这些一般写成工具类方法。这些工具类方法慢慢地被积累起来&#xff0c;变成了开源包&#xff0c;可以直接使用开源包&#xff0c;而不是自己再花时间来重复造这些轮子。 这些工具类的开源包比较多…

网络、CentOS 系统、数据库面试知识点总结

文章目录Linux CentOS 面试知识点整理速查复习✅ 一、Linux 高频面试题✅ 二、MySQL 高频面试题✅ 三、计算机网络&#xff08;OSI四层模型&#xff09;高频面试题&#x1f517; 链路层&#xff08;Link Layer&#xff09;&#x1f310; 网络层&#xff08;Internet Layer&…

Vue (Official) v3.0.2 新特性 为非类npm环境引入 globalTypesPath 选项

目录 前言 报错信息 原因 解决方案 总结 前言 在早上更新了vscode后&#xff0c;发现自己 uni-app 项目的 .vue文件 的 template 标签都出现了报错。定位到了问题是因为 Vue (Official) 插件更新导致的&#xff0c;重装了插件的上一个小版本&#xff0c;报错消失&#xff…

程序可能的输出

#include "csapp.h"int main() {int x 3;if (Fork() ! 0)printf("x%d\n", x);printf("x%d\n", --x);exit(0); }分析&#xff1a;父进程先执行printf("x%d\n", x); 输出x4。后执行 printf("x%d\n", --x);输出x3。子进程只执…

2025年UDP应用抗洪指南:从T级清洗到AI免疫,实战防御UDP洪水攻击

一次未防护的UDP暴露&#xff0c;可能让日活百万的应用瞬间瘫痪&#xff0c;损失超千万2025年&#xff0c;随着物联网僵尸网络规模指数级增长及AI驱动的自适应攻击工具泛滥&#xff0c;UDP洪水攻击峰值已突破8Tbps&#xff0c;单次攻击成本却降至50元以下。更致命的是&#xff…

centos7安装MySQL8.4手册

目录前言一、首先更新插件&#xff0c;并查看当前系统版本二、安装步骤1、创建mysql目录2、安装rpm包3、安装 mysql-community-server4、启动MySQL服务5、查看MySQL状态6、设置开机自启动三、查看默认密码四、登录mysql五、修改密码六、开启远程访问1. 修改 MySQL 配置文件2. 重…

人脸检测算法——SCRFD

SCRFD算法核心解析 1. 算法定义与背景 SCRFD&#xff08;Sample and Computation Redistribution for Efficient Face Detection&#xff09;由Jia Guo等人于2021年在arXiv提出&#xff0c;是一种高效、高精度的人脸检测算法&#xff0c;其核心创新在于&#xff1a; 双重重分…

vue3+ts+elementui-表格根据相同值合并

代码<div style"height: auto; overflow: auto"><el-table ref"dataTableRef" v-loading"loading" :data"pageData" highlight-current-row borderselection-change"handleSelectionChange" :span-method"obj…

UI前端与数字孪生融合案例:智慧城市的智慧停车引导系统

hello宝子们...我们是艾斯视觉擅长ui设计、前端开发、数字孪生、大数据、三维建模、三维动画10年经验!希望我的分享能帮助到您!如需帮助可以评论关注私信我们一起探讨!致敬感谢感恩!一、引言&#xff1a;停车难的 “城市痛点” 与数字孪生的破局之道当司机在商圈绕圈 30 分钟仍…

java+vue+SpringBoot集团门户网站(程序+数据库+报告+部署教程+答辩指导)

源代码数据库LW文档&#xff08;1万字以上&#xff09;开题报告答辩稿ppt部署教程代码讲解代码时间修改工具 技术实现 开发语言&#xff1a;后端&#xff1a;Java 前端&#xff1a;vue框架&#xff1a;springboot数据库&#xff1a;mysql 开发工具 JDK版本&#xff1a;JDK1.8 数…

【Docker基础】Docker-compose从入门到精通:安装指南与核心命令全解析

目录 前言 1 Docker-compose核心概念解析 1.1 什么是Docker-compose&#xff1f; 1.2 典型应用场景 2 Docker-compose离线安装详解 2.1 离线安装背景与优势 2.2 详细安装步骤 步骤1&#xff1a;获取离线安装包 步骤2&#xff1a;文件部署与权限设置 步骤3&#xff1a…

面试150 被围绕的区域

思路 使用DFS&#xff0c;将所有与边界相连的’O’都修改为‘#’,然后遍历数组&#xff0c;如果是遇到’#‘修改为’O’,如果是’O’修改为’X’。 class Solution:def solve(self, board: List[List[str]]) -> None:"""Do not return anything, modify boar…

(数据结构)线性表(上):SeqList 顺序表

线性表&#xff08;上&#xff09;&#xff1a;Seqlist 顺序表基本了解线性表顺序表静态顺序表动态顺序表编写动态顺序表项目结构基础结构初始化尾插头插尾删头删查找指定位置pos之前插入数据删除指定位置pos的数据销毁完整代码SeqLIst.hSeqLIst.ctest.c算法题移除元素删除有序…

WebStorm vs VSCode:前端圈的「豆腐脑甜咸之争」

目录 一、初识两位主角&#xff1a;老司机与新势力 二、开箱体验&#xff1a;是「拎包入住」还是「毛坯房改造」 三、智能提示&#xff1a;是「知心秘书」还是「百度搜索」 四、调试功能&#xff1a;是「CT 扫描仪」还是「听诊器」 五、性能表现&#xff1a;是「重型坦克」…

C#将类属性保存到Ini文件方法(利用拓展方法,反射方式获取到分组名和属性名称属性值)

前言&#xff1a;最近学习C#高级课程&#xff0c;里面学到了利用反射和可以得到属性的特性、属性名、属性值&#xff0c;还有拓展方法&#xff0c;一直想将学到的东西利用起来&#xff0c;刚好今天在研究PropertyGrid控件时&#xff0c;想方便一点保存属性值到配置文件&#xf…

kafka 单机部署指南(KRaft 版本)

目录环境准备JDK安装下载jdkjdk安装kafka 部署kafka 下载kafka 版本号结构解析kafka 安装下载和解压安装包配置 KRaft 模式格式化存储目录启动kafkaKafka 配置为 systemd 服务注意事项调整 JVM 内存参数Kafka KRaft 版本&#xff08;即 Kafka 3.0 及更高版本&#xff09;使用 K…

websocket案例 599足球比分

目标地址:aHR0cHM6Ly93d3cuNTk5LmNvbS9saXZlLw接口:打开控制台 点websocket 刷新页面 显示分析:不写理论了关于websocket 几乎发包位置都是下方图片 不管抖音还是快手 等平台这里在进行 new WebSocket 后 是要必须走一步的 也就是 new WebSocket().onopen() 也就是onopen 进行向…