Apache RocketMQ 中Message (消息)的核心概念

好的,我们来深入理解一下 Apache RocketMQ 中 Message (消息) 这个核心概念。这份文档详细阐述了消息的定义、在模型中的位置、内部属性、约束和使用建议。

你可以将 Message 看作是 RocketMQ 系统中数据传输和处理的最小原子单位。它承载了业务数据,并附带了丰富的元信息,是生产者、Broker 和消费者之间通信的载体。


1. Message 的本质定义

  • 最小传输单元 (Smallest Unit of Data Transmission)

    • Message 是 RocketMQ 中数据传输的基本单元。生产者将业务数据(负载)和扩展属性封装成 Message,发送给 Broker;Broker 再根据订阅关系将 Message 传递给消费者。
  • 核心特性 (Characteristics)

    • 不可变性 (Immutability)
      • 消息一旦生成,其内容(特别是系统属性和负载)在传输和存储过程中不会改变。它被视为一个“已发生的事件”。
      • 消费者获取到的消息是只读的 (read-only)。在 5.x 版本中,这是强约束;在 3.x/4.x 版本中虽无强约束,但最佳实践也建议不要修改。
      • 最佳实践:如果需要基于收到的消息发送新消息,应该创建一个新消息(例如 MessageBuilder.buildFrom(m)),而不是直接修改原消息。
    • 持久性 (Persistence)
      • 默认情况下,RocketMQ 会将接收到的消息持久化存储在 Broker 的存储文件中。这是保证消息不丢失、支持消息追溯和系统故障恢复的基础。

2. Message 在模型中的位置

在这里插入图片描述

  • 生命周期流程
    1. 生产 (Produced):由生产者 (Producer) 创建并初始化。
    2. 发送 (Sent):生产者将消息发送到 Apache RocketMQ Broker
    3. 存储 (Stored):Broker 接收到消息后,将其按接收顺序存储在特定 Topic 的某个 Queue 中。
    4. 消费 (Consumed)消费者 (Consumer) 根据订阅关系,从 Broker 的相应 Queue 中拉取 (pull) 消息进行消费。

3. Message 的核心内部属性

这些属性分为系统保留属性 (System retention attributes)可选属性 (Optional attributes),以及负载 (Load)

系统保留属性 (由系统或生产者设置)
  • Topic 名称 (Topic name)

    • 作用:标识该消息属于哪个逻辑主题。在集群内必须唯一。
    • 来源:由生产者 SDK 设置。
  • 消息类型 (Message type)

    • 作用:定义消息的语义和处理方式。RocketMQ 支持多种类型:
      • Normal:普通消息,无特殊语义。
      • FIFO:顺序消息,保证同一消息组 (Message Group) 内的消息按发送顺序被消费。实现依赖于 Queue 的有序性。
      • Delay:延迟消息,可以指定延迟时间(最大40天),延迟时间到后才对消费者可见。
      • Transaction:事务消息,用于实现分布式事务,确保本地数据库操作和消息发送的最终一致性。
  • 消息队列 (Message queue)

    • 作用:指明该消息最终被存储在哪个具体的 Queue 中(属于哪个 Topic 的哪个 Queue)。
    • 来源:由 Broker 在消息到达后,根据路由策略(如轮询、哈希等)指定并填充
  • 消息 Offset (Message offset)

    • 作用:标识该消息在其所属 Queue 内部的物理存储位置(偏移量)。是 Broker 内部管理消息顺序和消费者消费进度的关键。
    • 来源:由 Broker 指定并填充。从 0 开始递增。
  • 消息 ID (Message ID)

    • 作用:消息的全局唯一标识符。在集群内绝对唯一,用于消息追踪、排查问题。
    • 来源:由生产者客户端自动生成(通常是 32 位的数字和大写字母组成的字符串)。
可选属性 (由生产者设置)
  • (可选) 消息 Keys (Message keys)

    • 作用:为消息设置一个或多个索引键。主要用于消息查询(通过 Message IDMessage Key 在控制台或通过 API 查找特定消息)和去重(结合业务逻辑)。
    • 来源:由生产者客户端定义。
  • (可选) 消息 Tag (Message tag)

    • 作用:消息的标签,用于消费者进行消息过滤。消费者可以订阅特定的 Tag,从而只接收带有该 Tag 的消息,实现简单的消息分类。
    • 来源:由生产者客户端定义。
    • 约束:每个消息只能设置一个 Tag
  • (可选) 定时时间 (Scheduled time)

    • 作用:配合 Delay 消息类型使用,指定消息延迟的具体时间戳(毫秒级),而不是延迟时长。
    • 来源:由消息生产者定义。
    • 约束:最大延迟时间 40 天
时间戳属性
  • 消息发送时间 (Message sending time)

    • 作用:记录消息在生产者客户端本地被发送出去的时间戳(毫秒级)。
    • 来源:由生产者客户端填充
    • 注意:这是客户端时间,可能与 Broker 时间有偏差。
  • 消息存储时间 (Message store timestamp)

    • 作用:记录消息被Broker 成功写入存储(落盘)的时间戳(毫秒级)。对于延迟消息和事务消息,消费者感知到的“有效时间”通常基于此时间。
    • 来源:由Broker 填充
    • 注意:这是 Broker 时间,是消息在服务端的“出生”时间。
重试与自定义
  • 重试次数 (Retry times)

    • 作用:记录该消息被 Broker 重新投递给消费者的次数。每次消费失败触发重试,次数加一。第一次消费时为 0。
    • 来源:由 Broker 标记。消费者可以获取此信息以进行幂等处理或特殊逻辑。
  • 自定义属性 (Custom attributes)

    • 作用:生产者可以添加任意的 Key-Value (字符串类型) 对作为扩展信息,供业务逻辑使用。
    • 来源:由生产者根据需要指定。
  • 消息负载 (Message load)

    • 作用:消息的实际业务数据内容,即有效载荷 (Payload)。
    • 来源:由生产者序列化成二进制字节流后设置。
    • 约束:大小不能超过系统限制。

4. Message 的行为约束

  • 大小限制 (Size Limit)
    • 核心约束:单条消息的大小不能超过上限,否则发送会失败。
    • 默认限制4 MB。这是非常重要的参数,直接影响网络传输、存储和处理性能。

5. Message 的使用建议与最佳实践

  • 避免单条消息过大 (Overloaded transmission)

    • 原因:RocketMQ 是事件驱动的中间件。过大的消息会:
      • 加重网络传输负担,增加延迟。
      • 影响错误重试:重试大消息成本高。
      • 影响流控 (Throttling):流控粒度可能不够精细。
    • 建议
      1. 严格控制单条消息的数据量,使其尽可能小。
      2. 如果业务上必须传输大量数据,强烈建议
        • 拆分消息:将大数据按固定大小拆分成多条小消息。
        • 使用外部存储:将实际数据(如文件、图片)存放到对象存储(如 OSS)、文件系统或数据库中,然后在消息的 loadcustom attributes只传递数据的访问链接 (URL) 或 ID
  • 遵守消息不可变性原则 (Immutability)

    • 正确做法:收到消息后,如果需要转发或基于它生成新消息,使用 MessageBuilder.buildFrom(m) 这样的方法创建一个新消息实例,然后修改新实例的属性(如 Topic, Tag, Load 等)再发送。
    • 错误做法:直接调用 m.update() 修改收到的消息 m 的内容,然后发送。这违反了不可变性原则,可能导致不可预知的行为或在 5.x 版本中被拒绝。

总结与核心理解

  1. Message 是原子单元:它封装了业务数据和元信息,是 RocketMQ 传输的最小单位。
  2. Message 是不可变的:内容一旦产生,在传递过程中不应被修改。最佳实践是“读取-创建-发送”新消息。
  3. Message 是持久化的:默认落盘存储,保证可靠性。
  4. Message 拥有丰富的属性
    • Topic/Queue/Offset 定义了其在系统中的位置和顺序。
    • Message ID 提供全局唯一标识。
    • Keys/Tag 支持查询和过滤。
    • Message Type 定义了语义(普通、顺序、延迟、事务)。
    • Sending Time/Store Timestamp 记录了关键时间点。
    • Retry Times 协助处理消费失败。
    • Custom Attributes 提供扩展能力。
    • Load 承载实际业务数据。
  5. Message 有严格的大小限制默认 4MB避免大消息是关键设计原则,应通过拆分外链方式处理大数据。
  6. 最佳实践
    • 合理使用 Tag 进行消息过滤。
    • 利用 Keys 进行消息追踪。
    • 遵守不可变性,通过 buildFrom 创建新消息。
    • 绝对不要发送超过 4MB 的消息,采用拆分或外链方案。

简而言之,Message 是 RocketMQ 的“信封”和“信件”本身。理解其结构、属性、约束和最佳实践,对于设计高效、可靠、可维护的消息系统至关重要。记住:小消息、不可变、善用属性、规避大负载

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/91891.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/91891.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C 语言问题

1. C语言中 union 与 struct 的区别类型structunion内存分配机制编译器为每个成员‌独立分配内存空间,总内存大小 所有成员大小之和(考虑内存对齐)所有成员‌共享同一段内存空间,总内存大小 ‌最大成员的大小‌数据存储特性1. 所…

[ LeetCode优选算法专题一双指针-----盛最多的水]

1.题目链接 LeetCode盛最多的水 2.题目描述 3.题目解析 问题本质分析 "盛最多水的容器" 问题可以抽象为:在坐标轴上有 n 条垂直线段,第 i 条线段的两个端点分别是 (i, 0) 和 (i, height [i])。找到两条线段,使得它们与 x 轴共同…

旧笔记本电脑如何安装飞牛OS

01引言随着电子产品的更新换代,我们有很多的电子产品已经满足不了现在的工作需求和日常娱乐了,比如:用了很久厚重笔记本电脑放在现在办公也是有点吃力了,我们现在换新了旧的还不想放在那里吃灰,怎么办呢?我…

某金服Java面试终极指南:25题完整解析与场景化方案

涵盖分布式锁、缓存、事务、高并发等金融系统核心考点,附解决方案与抗风险设计一、分布式锁深度解决方案 1. Redis分布式锁完整实现 // 原子加锁 防死锁 String uuid UUID.randomUUID().toString(); Boolean locked redisTemplate.opsForValue().setIfAbsent(&qu…

MATLAB 2025a的下载以及安装,安装X310的测试附加功能(附加安装包)

首先将安装包下载到本地中之后解压该文件夹,打开文件发现有两个文件,其中crach文件夹中是破解matlab所用到的文件。而另一个压缩包就是需要安装的文件,要先解压在安装。在安装之前将网络断开,不然可能破解不成功,先进入…

Scala实用编程(附电子书资料)

概述 Scala 是一种多范式编程语言,结合了面向对象编程(OOP)和函数式编程(FP)的特性电子书资料:https://pan.quark.cn/s/88737d4a680d Scala 的核心特点多范式融合 既支持面向对象编程(类、继承、…

数据结构(8)双向链表

目录 一、概念与结构 二、双向链表的实现 1、初始化 2、尾插 3、头插 4、尾删 5、头删 6、在指定位置之后插入结点 7、删除指定位置的结点 三、完整参考代码 一、概念与结构 这里的双向链表是指带头的的双向循环链表,这里的“带头”和之前所说的“头结…

【DeepSeek-R1 】分词系统架构解析

文章目录 🧩前言 🔍 1. SentencePiece Unigram 的核心原理 1.1 算法基础框架 1.2 核心数学原理 1.3 与BPE/WordPiece的对比 ⚙️ 2. DeepSeek-R1 分词器实现细节 2.1 词表结构设计 2.2 关键特性实现 📊 3. 性能优化关键技术 3.1 加速策略对比 3.2 编码过程伪代码 🔬 4.…

Linux自主实现shell

以下是在Linux操作系统 centos7版本下实现的shell &#xff0c;该shell具备bash的基础功能&#xff0c;无上下键输入历史命令功能&#xff0c;删除字符或命令时按住Ctrl Back #include<stdio.h> #include<stdlib.h> #include<string.h> #include<unistd.…

vue+elementUI上传图片至七牛云组件封装及循环使用

1.效果&#xff08;解决循环组件赋值问题&#xff09; 废话不多说直接上代码 2.下载七牛云依赖 npm install qiniu-js # 或者使用 yarn yarn add qiniu-js3.在vue组件中引入 import * as qiniu from qiniu-js4.在components文件夹下创建UploadImg1/uploadImg.vue组件 <templ…

2025年6月电子学会青少年软件编程(C语言)等级考试试卷(一级)

答案和更多内容请查看网站&#xff1a;【试卷中心 -----> 电子学会 ----> C/C ----> 一级】 网站链接 青少年软件编程历年真题模拟题实时更新 一、编程题 第 1 题 希望如光 题目描述 在充满挑战的生活中&#xff0c;希望往往是支撑人们穿越黑暗的核心力量。这…

拒绝复杂,AI图表制作简单化

在信息爆炸的时代&#xff0c;数据可视化已成为传递信息的核心手段。无论是职场汇报中的业绩分析&#xff0c;还是学术研究里的实验数据呈现&#xff0c;一张清晰直观的图表往往能胜过千言万语。而 AI 技术的介入&#xff0c;彻底改变了图表制作的传统模式 —— 它不仅让零基础…

easypoi生成多个sheet的动态表头的实现

在使用 EasyPOI 导出 Excel 时&#xff0c;生成多个 Sheet 且每个 Sheet 的表头是动态的&#xff08;即每个 Sheet 的列数和列名可能不同&#xff09;&#xff0c;可以通过如下方式实现&#xff1a;✅ 实现原理简述 使用 Workbook workbook ExcelExportUtil.exportExcel(expor…

移除链表元素+反转链表+链表的中间节点+合并两个有序链表+环形链表约瑟夫问题+分割链表

一、移除链表元素 给你一个链表的头节点 phead 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 (列表中的节点数目在范围 [0, 104] 内) 示例&#xff1a;输入&#xff1a;head [1,2,6,3,4,5,6], val 6 …

vue3+arcgisAPI4示例:轨迹点模拟移动(附源码下载)

demo源码运行环境以及配置运行环境&#xff1a;依赖Node安装环境&#xff0c;需要安装Node。 运行工具&#xff1a;vscode或者其他工具。 配置方式&#xff1a;下载demo源码&#xff0c;vscode打开&#xff0c;然后顺序执行以下命令&#xff1a; &#xff08;1&#xff09;下载…

Design Compiler:Milkyway库的创建与使用

相关阅读 Design Compilerhttps://blog.csdn.net/weixin_45791458/category_12738116.html?spm1001.2014.3001.5482 DC Ultra推出了拓扑模式&#xff0c;在综合时会对标准单元进行粗布局(Coarse Placement)并使用虚拟布线(Virtual Routing)技术计算互联延迟&#xff0c;关于拓…

嵌入式教学的云端革命:高精度仿真如何重塑倒车雷达实验与工程教育——深圳航天科技创新研究院赋能新一代虚实融合实训平台

一、嵌入式教学的困境与破局之道 在传统嵌入式系统教学中&#xff0c;硬件依赖始终是核心痛点。以“倒车雷达实验”为例&#xff0c;学生需操作STM32开发板、超声波传感器、蜂鸣器等硬件&#xff0c;面临设备损耗、接线错误、调试效率低等问题。更关键的是&#xff0c;物理硬件…

flutter-boilerplate-project 学习笔记

项目地址&#xff1a; https://github.com/zubairehman/flutter_boilerplate_project/tree/master 样板包含创建新库或项目所需的最小实现。存储库代码预加载了一些基本组件&#xff0c;例如基本应用程序架构、应用程序主题、常量和创建新项目所需的依赖项。通过使用样板代码…

集成电路学习:什么是CMSIS微控制器软件接口标准

CMSIS,即Cortex Microcontroller Software Interface Standard(Cortex微控制器软件接口标准),是由ARM公司与多家不同的芯片和软件供应商紧密合作定义的一个标准。该标准旨在为基于ARM Cortex处理器的微控制器提供一套与供应商无关的硬件抽象层,从而简化软件的开发、重用,…

由浅入深使用LangGraph创建一个Agent工作流

创建一个简单的工作流&#xff1a;Start ——> 节点1(固定输入输出) ——> Endfrom langchain_core.messages import SystemMessage, HumanMessage, AIMessage from langgraph.graph import StateGraph, START, END from typing_extensions import TypedDict from typing…