【RocketMQ】一分钟了解RocketMQ

MQ是什么

MQ全称为Message Queue,即消息队列 ,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生 产、存储、消费全过程的软件系统,遵循FIFO原则。

MQ的好处有哪些

  • 异步解耦
    最常见的一个场景是用户注册后,需要发送注册邮件和短信通知,以告知用户注册成功。传统的做法有以下两种:
    在这里插入图片描述
    数据流动如下所述:
    1.注册页面填写账号和密码并提交注册信息,这些注册信息首先会被写入注册系统。
    2.注册信息写入注册系统成功后,再发送请求至邮件通知系统。邮件通知系统收到请求后向用户发送邮件通知。
    3.邮件通知系统接收注册系统请求后再向下游的短信通知系统发送请求。短信通知系统收到请求后向用户发送短信通知。

以上三个任务全部完成后,才返回注册结果到客户端,用户才能使用账号登录。假设每个任务耗时分别为 50ms,则用户需要在注册页面等待总共 150ms 才能登录。

并行形式:

对于用户来说,注册功能实际只需要注册系统存储用户的账户信息后,该用户便可以登录,后续的注册短信和邮件不是即时需要关注的步骤。

对于注册系统而言,发送注册成功的短信和邮件通知并不一定要绑定在一起同步完成,所以实际当数据写入注册系统后,注册系统就可以把其他的操作放入对应的 RocketMQ 中然后马上返回用户结果,由 RocketMQ 异步地进行这些操作。

在这里插入图片描述

  • 削峰填谷
    简单来说就是当遇到秒杀等业务时,用户访问量大增,这时候可以使用MQ,将消息存入MQ当中这样就可以减少秒杀等高访问量场景下的造成的影响了
  • 分布式定时/延时调度
    RocketMQ 提供精确度到秒级的分布式定时消息能力(5.0架构后),可广泛应用于订单超时中心处理、分布式延时调度系统等场景。

使用 RocketMQ 定时消息有如下优势:

  • 定时精度高、开发门槛低:消息定时时间不存在阶梯间隔,可以轻松实现任意精度事件触发,无需业务去重。

  • 高性能、可扩展:传统的定时实现方案较为复杂,需要进行数据库扫描,容易遇到性能瓶颈的问题,RocketMQ 可以基于定时消息特性完成事件驱动,实现百万级消息 TPS 能力。

什么是RocketMQ

RocketMQ 是一个开源的分布式消息中间件,由阿里巴巴开发并贡献给 Apache 软件基金会。它主要用于高吞吐量、低延迟的消息传递需求。

RocketMQ 的优点和功能是比较多的,以下是 一些主要特点和功能:

  • 高吞吐量和低延迟:RocketMQ 设计用于处理大量的消息,并提供低延迟的消息传递服务,适合需要高性能的场景。

  • 分布式架构:RocketMQ 使用分布式架构来支持大规模的消息传递。它可以水平扩展,以处理更大的数据量和更高的并发需求。

  • 消息可靠性:RocketMQ 支持消息持久化和多副本机制,确保在系统故障时不会丢失消息。这使得消息的可靠性和一致性得到了保障。

  • 高可用性和容错:RocketMQ 提供了高可用性的解决方案,包括多主多从等架构方案,确保系统的稳定性和连续性。

官网写的很详细,架构、基本概念(主题、队列、生产者、消费者、NameServer、Beroker 等)、工作原理等。推荐大家学习一波:https://rocketmq.apache.org/zh/docs

RocketMQ架构

RocketMQ架构上主要分为四部分

1.1.Producer

消息发布的角色,支持分布式集群方式部署。Producer通过nameserver的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

1.2.Consumer

消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时 也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

1.3.Broker

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

1.4.NameServer

NameServer是一个Broker与Topic路由的注册中心支持Broker的动态注册与发现主要包括两个功能

  • Broker管理
    NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活。

  • 路由信息管理
    每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费

Springboot整合

生产者

@Service
public class RocketMQProducer{@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;/*** 发送普通消息* @return*/public SendResult sendMsg(String msgBody){SendResult result = rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());return result;}/*** 发送异步消息 在SendCallback中可处理相关成功失败时的逻辑*/public void sendAsyncMsg(String msgBody){rocketMQTemplate.asyncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理消息发送成功逻辑}@Overridepublic void onException(Throwable e) {// 处理消息发送异常逻辑}});}/*** 发送延时消息<br/>* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h<br/>*/public void sendDelayMsg(String msgBody, Integer delayLevel){rocketMQTemplate.syncSend("queue_test_topic",MessageBuilder.withPayload(msgBody).build(),messageTimeOut,delayLevel);}/*** 发送带tag的消息,直接在topic后面加上":tag"*/public void sendTagMsg(String msgBody){rocketMQTemplate.syncSend("queue_test_topic:tag1",MessageBuilder.withPayload(msgBody).build());}}

消费者

/*** rocketmq 消息监听,@RocketMQMessageListener中的selectorExpression为tag,默认为**/
@Slf4j
@Component
@RocketMQMessageListener(topic = "queue_test_topic",selectorExpression="*",consumerGroup = "queue_group_test")
public class RocketMQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {byte[] body = message.getBody();String msg = new String(body, CharsetUtil.UTF_8);log.info("接收到消息:{}", msg);}}

测试

@Controller
public class ProducerController {@Autowiredprivate RocketMQProducer rocketMQProducer;@RequestMapping("/send")@ResponseBodypublic SendResult send(String msg)  {//formats: `topicName:tags`return rocketMQProducer.sendMsg(msg);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/90681.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/90681.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

01 01 01 第一部分 C++编程知识 C++入门 第一个C++程序

第一部分 C编程知识第一章 C入门 —— 第一个C程序一、第一个C程序代码展示//写一个C程序&#xff0c;实现在屏幕上打印 “hello world” #include <iostream> using namespace std; int main() {cout << "hello world" << endl;return 0; }二、…

进制定义与转换详解

文章目录&#x1f4d8; 进制定义与转换详解一、进制的含义二、常见进制介绍1. 十进制&#xff08;Decimal&#xff0c;Base-10&#xff09;2. 二进制&#xff08;Binary&#xff0c;Base-2&#xff09;3. 八进制&#xff08;Octal&#xff0c;Base-8&#xff09;4. 十六进制&am…

【安卓笔记】用MVC、MVP、MVVM来实现井字棋案例

0. 环境&#xff1a;电脑&#xff1a;Windows10Android Studio: 2024.3.2编程语言: JavaGradle version&#xff1a;8.11.1Compile Sdk Version&#xff1a;35Java 版本&#xff1a;Java111. 首先、简单实现井字棋的功能。功能拆解&#xff1a;1. 棋盘为3x32. 点击棋盘button&a…

【洛谷】单向链表、队列安排、约瑟夫问题(list相关算法题)

文章目录单向链表题目描述题目解析代码队列安排题目描述题目解析代码约瑟夫问题题目描述题目解析代码单向链表 题目描述 题目解析 这道题因为有大量的任意位置插入删除&#xff0c;所以肯定不能用数组&#xff0c;用链表是最合适的&#xff0c;而在算法竞赛通常都用静态链表&a…

当人机交互迈向新纪元:脑机接口与AR/VR/MR的狂飙之路

从手机到 “头盔”&#xff1a;交互终端的变革猜想​​在当今数字化时代&#xff0c;智能手机无疑是我们生活中不可或缺的一部分。它集通讯、娱乐、办公等多种功能于一身&#xff0c;成为了人们与外界交互的主要窗口。然而&#xff0c;随着科技的飞速发展&#xff0c;智能手机作…

InfluxDB HTTP API 接口调用详解(二)

实际应用案例演示 1. 数据写入案例 假设在一个物联网设备数据采集场景中&#xff0c;有多个传感器设备持续采集环境的温度和湿度数据。我们以 Python 语言为例&#xff0c;使用requests库来调用 InfluxDB 的 Write 接口将数据写入 InfluxDB。 首先&#xff0c;确保已经安装了…

世运会线上知识竞赛答题pk小程序怎么做

随着2025年成都世界运动会的来临&#xff0c;越来越多的企事业单位组织员工进行线上知识竞赛&#xff0c;那么答题PK小程序该怎么做&#xff0c;接下来我们来一一分析&#xff1a; 世运会线上知识竞赛答题pk小程序怎么做一、答题功能&#xff1a;支持多种题型&#xff0c;如选择…

Java毕业设计 | 基于微信小程序的家校互动作业管理系统(Spring Boot+Vue.js+uni-app+AI,附源码+文档)

Java毕业设计 | 基于微信小程序的家校互动作业管理系统&#xff08;Spring BootVue.jsuni-app&#xff0c;附源码文档&#xff09;&#x1f3af; 毕业设计私人教练 专注计算机毕设辅导第 6 年&#xff0c;累计 1v1 带飞 800 同学顺利通关。从选题、开题、代码、论文到答辩&…

CentOS8 使用 Docker 搭建 Jellyfin 家庭影音服务器

CentOS8 使用 Docker 搭建 Jellyfin 家庭影音服务器 一、前言 由于 Jellyfin 的 GPL 协议和 Intel 的 media-driver (iHD) Linux 驱动&#xff08;部分开源&#xff09;在协议上不兼容的缘故&#xff0c;Jellyfin 官方的 Docker 镜像&#xff1a;jellyfin/jellyfin 并不包含 …

PyTorch武侠演义 第一卷:初入江湖 第4章:损失玉佩的评分风波

第一卷&#xff1a;初入江湖 第4章&#xff1a;损失玉佩的评分风波比武开幕 晨钟响彻山谷&#xff0c;PyTorch派三年一度的"模型比武大会"正式开始。各分舵弟子列队入场&#xff0c;林小码跟在Tensor大师身后&#xff0c;眼睛瞪得溜圆——只见&#xff1a; "卷积…

HttpServletRequestWrapper存储Request

HTTP请求的输入流只能被读取一次&#xff0c;再想获取就获取不到了&#xff0c;那有什么方法可以缓存呢&#xff0c;我们可以自定义一个HttpServletRequest&#xff0c;或者是想在请求参数中统一添加或删除参数也可以使用此类进行改造&#xff0c;然后通过过滤器继续向下流转。…

算法:数组part02: 209. 长度最小的子数组 + 59.螺旋矩阵II + 代码随想录补充58.区间和 + 44. 开发商购买土地

算法&#xff1a;数组part02: 209. 长度最小的子数组 59.螺旋矩阵II 代码随想录补充58.区间和 44. 开发商购买土地 209. 长度最小的子数组题目&#xff1a;https://leetcode.cn/problems/minimum-size-subarray-sum/description/ 文章讲解&#xff1a;https://programmercarl…

Spring 核心知识点梳理 1

目录 Spring Spring是什么&#xff1f; Spring中重要的模块 Spring中最重要的就是IOC(控制反转)和AOP(面向切面编程) 什么是IOC DI和IOC之间的区别 为什么要使用IOC呢&#xff1f; IOC的实现机制 什么是AOP Aop的核心概念 AOP的环绕方式 AOP发生的时期 AOP和OOP的…

Kafka运维实战 07 - kafka 三节点集群部署(混合模式)(KRaft 版本3.7.0)

目录环境准备主机准备补充说明JDK安装 (三台主机分别执行)下载jdkjdk安装kafka 部署(三台主机分别执行)kafka 下载kafka 版本号结构解析kafka 安装下载和解压安装包(3台主机都执行)配置 server.properties &#xff08;KRaft 模式&#xff09;192.168.37.10192.168.37.11192.16…

linux内核与GNU之间的联系和区别

要理解操作系统&#xff08;如 GNU/Linux&#xff09;的组成&#xff0c;需要明确 内核&#xff08;Kernel&#xff09; 和 GNU 工具链 各自的功能&#xff0c;以及它们如何协作构成完整的操作系统。以下是详细分析&#xff1a;1. 内核&#xff08;Kernel&#xff09;的功能 内…

文件包含学习总结

目录 漏洞简介 漏洞原理 漏洞分类 漏洞防御 漏洞简介 程序开发人员一般会把重复使用的函数写到单个文件中&#xff0c;需要使用某个函数时直接调用此文件&#xff0c;而无需再次编写&#xff0c;这种文件调用的过程一般被称为文件包含。程序开发人员一般希望代码更灵活&…

TQZC706开发板教程:创建PCIE项目

本例程基于zc706开发板&#xff0c;使用xdma核创建PCIE项目&#xff0c;最终实现插入主机可识别出Xilinx设备。在vivado中创建一个空的706项目。创建完成后添加IP核-->搜索xdma-->双击打开配置。添加XDMA核如下所示basic配置peic id中设置设备号等信息&#xff0c;这里保…

科技赋能景区生.态,负氧离子气象监测站筑牢清新防线

负氧离子气象监测站&#xff0c;如同景区空气质量的坚固防线&#xff0c;默默守护着每一寸土地的清新。​它以精准的监测能力为防线基石。借助 “吸入式电容收集法”&#xff0c;能敏锐捕捉空气中负氧离子的踪迹&#xff0c;精准测量其浓度&#xff0c;同时将温度、湿度、PM2.5…

AMD官网下载失败,不让账户登录下载

别使用163邮箱 使用QQ邮箱&#xff0c;然后用GPT生成一个外国&#xff0c;比如日本的地区信息填上去就可以下载了

Elasticsearch-8.17.0 centos7安装

下载链接 https://www.elastic.co/downloads/past-releases/elasticsearch-8-17-0 https://www.elastic.co/downloads/past-releases/logstash-8-17-0 https://www.elastic.co/cn/downloads/past-releases/kibana-8-17-0https://artifacts.elastic.co/downloads/elasticsearch/…