Rockermq的部署与使用(0-1)

RocketMQ​ 是阿里巴巴开源的一款 ​分布式消息中间件,具有高吞吐、低延迟、高可用等特点,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。

一.RockerMQ的概念: 

RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer 负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 Broker。Message Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup 由多个Consumer 实例构成。Rockermq官网链接地址

1. 架构分析:

  • 生产者(Producer):负责产生消息,生产者向消息服务器发送由业务应用程序系统生成的消息。
  • 消费者(Consumer):负责消费消息,消费者从消息服务器拉取信息并将其输入用户应用程序。
  • 消息服务器(Broker):是消息存储中心,主要作用是接收来自 Producer 的消息并存储, Consumer 从这里取得消息。
  • 名称服务器(NameServer):用来保存 Broker 相关 Topic 等元信息并给 Producer ,提供 Consumer 查找 Broker 信息。

2.整体流程:

1、首先启动 Namesrv(名称服务器,Namesrv 起来后监听端口,等待 Broker、Producer、Consumer 连上来,相当于一个路由控制中心。

2、Broker 启动,跟所有的 Namesrv 保持长连接,定时发送心跳包

心跳包中,包含当前 Broker 信息(IP+端口等)以及存储所有 Topic 信息。 注册成功后,Namesrv 集群中就有 Topic 跟 Broker 的映射关系。

3、收发消息前,先创建 Topic 。创建 Topic 时,需要指定该 Topic 要存储在哪些 Broker上。也可以在发送消息时自动创建Topic。

4、Producer 发送消息。

启动时,先跟 Namesrv 集群中的其中一台建立长连接,并从Namesrv 中获取当前发送的 Topic 存在哪些 Broker 上,然后跟对应的 Broker 建立长连接,直接向 Broker 发消息。

5、Consumer 消费消息。

Consumer 跟 Producer 类似。跟其中一台 Namesrv 建立长连接,获取当前订阅 Topic 存在哪些 Broker 上,然后直接跟 Broker 建立连接通道,开始消费消息。

二.Rockermq单机部署:

1.安装解压包:

rocketmq-all-4.6.0-source-release.zip

点击横杠即可下载安装包。

2.Windows部署 / Linux部署:

(1)Windows部署: 

解压后使用 Maven 编译 RocketMQ 源码。命令行操作如下: 

# 进入 RocketMQ 源码目录
$ cd rocketmq-all-4.6.0-source-release# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U

随后进入.\bin\mqnamesrv.cmd来启动Namesrv,如果报错 Java not found,请检查 JAVA_HOME 环境变量。

.\mqnamesrv.cmd

正常会出现下图: 

随后新建一个CMD窗口(不要关闭 NameServer),进入 bin 目录,启动Broker:

mqbroker.cmd -c ..\conf\broker.conf -n localhost:9876

(2)Linux部署:

使用wget命令下载,随后使用unzip命令解压: 

# 下载
$ wget wget http://mirror.bit.edu.cn/apache/rocketmq/4.6.0/rocketmq-all-4.6.0-source-release.zip# 解压
$ unzip rocketmq-all-4.6.0-source-release.zip

随后同样使用 Maven 编译 RocketMQ 源码。命令行操作如下:

# 进入 RocketMQ 源码目录
$ cd rocketmq-all-4.6.0-source-release# Maven 编译 RocketMQ ,并跳过测试。耐心等待...
$ mvn -Prelease-all -DskipTests clean install -U

编译完成,在我们进入 distribution 目录下,就可以看到 RocketMQ 的发布包了。命令行操作如下:

# 进入 distribution 目录下
$ cd distribution/target/rocketmq-4.6.0/rocketmq-4.6.0# 打印目录
$ ls
40 -rwxr-xr-x   1 yunai  staff  17336 Nov 19 20:59 LICENSE8 -rwxr-xr-x   1 yunai  staff   1338 Nov 19 20:59 NOTICE
16 -rwxr-xr-x   1 yunai  staff   4225 Nov 19 20:59 README.md0 drwxr-xr-x   6 yunai  staff    192 Dec  3 12:48 benchmark # 性能基准测试0 drwxr-xr-x  30 yunai  staff    960 Nov 19 20:59 bin # 执行脚本0 drwxr-xr-x  12 yunai  staff    384 Nov 19 20:59 conf # 配置文件0 drwxr-xr-x  36 yunai  staff   1152 Dec  3 12:48 lib # RocketMQ jar 包

随后启动Namesrv:

nohup sh bin/mqnamesrv &

启动完成后,查看日志:

# 查看 Namesrv 日志。
$ tail -f ~/logs/rocketmqlogs/namesrv.log2019-12-03 12:58:04 INFO main - The Name Server boot success. serializeType=JSON

默认情况下,Namesrv 日志文件所在地址为 ~/logs/rocketmqlogs/namesrv.log 。如果想要自定义,可以通过 conf/logback_namesrv.xml 配置文件来进行修改。

 随后启动broker:

在 conf 目录下,RocketMQ 提供了多种 Broker 的配置文件:

  • broker.conf :单主,异步刷盘。
  • 2m/ :双主,异步刷盘。
  • 2m-2s-async/ :两主两从,异步复制,异步刷盘。
  • 2m-2s-sync/ :两主两从,同步复制,异步刷盘。
  • dledger/ :Dledger 集群,至少三节点。

这里,我们只启动一个 RocketMQ Broker 服务,所以使用 broker.conf 配置文件。命令行操作如下:

nohup sh bin/mqbroker -c conf/broker.conf  -n 127.0.0.1:9876 &
  • 通过 -c 参数,配置读取的主 Broker 配置。

  • 通过 -n 参数,设置 RocketMQ Namesrv 地址。

 3.测试消息:

(1)测试发送消息: 

通过使用 bin/tools.sh 工具类,实现测试发送消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876# 执行生产者 Producer 发送测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

如果发送成功,我们会看到大量成功的发送日志。

SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F100F4, offsetMsgId=C0A8032C00002A9F000000000000D7EE, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=61]
SendResult [sendStatus=SEND_OK, msgId=FE800000000000004F2B5386138462F500000D7163610D67E7F200F5, offsetMsgId=C0A8032C00002A9F000000000000D8D1, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=61]

通过发送结果为 sendStatus=SEND_OK 状态,说明消息都发送成功了。

(2)测试消费消息:

通过使用 bin/tools.sh 工具类,实现测试消费消息。命令行操作如下:

# 设置 Namesrv 服务器的地址
export NAMESRV_ADDR=127.0.0.1:9876# 执行消费者 Consumer 消费测试消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

如果消费成功,我们会看到大量成功的消费日志。

ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=131, sysFlag=0, bornTimestamp=1575354513732, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513733, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001D1FC, commitLogOffset=119292, bodyCRC=1549304357, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867104, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E944020E, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 54], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=227, queueOffset=130, sysFlag=0, bornTimestamp=1575354513729, bornHost=/192.168.3.44:55510, storeTimestamp=1575354513729, storeHost=/192.168.3.44:10911, msgId=C0A8032C00002A9F000000000001CE70, commitLogOffset=118384, bodyCRC=1530218044, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=145, CONSUME_START_TIME=1575354867103, UNIQ_KEY=FE800000000000004F2B5386138462F500000D7163610D67E941020A, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 53, 50, 50], transactionId='null'}]]

通过 ConsumeMessageThread_4 和 ConsumeMessageThread_3 线程名,我们可以看出,目前是进行并发消费消息。

三.Rockermq的简单使用实例:

(1)项目结构:

rocketmq-demo/
├── src/
│   ├── main/
│   │   ├── java/
│   │   │   └── com/example/
│   │   │       ├── producer/OrderProducer.java   # 生产者
│   │   │       ├── consumer/OrderConsumer.java  # 消费者
│   │   │       └── RocketmqDemoApplication.java # 启动类
│   │   └── resources/
│   │       └── application.yml                  # 配置文件
│   └── test/
└── pom.xml                                      # Maven依赖

(2)添加依赖:

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- RocketMQ Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.3</version></dependency>
</dependencies>

(3)编写配置文件:

rocketmq:name-server: localhost:9876  # NameServer地址producer:group: order-producer-group  # 生产者组consumer:group: order-consumer-group  # 消费者组topic: order-topic           # 订阅的Topic

(4)编写生产者代码:

package com.example.producer;import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class OrderProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendOrderMessage(String orderId) {// 发送消息(Topic: order-topic, Tag: pay)rocketMQTemplate.convertAndSend("order-topic:pay", "订单支付成功,订单ID: " + orderId);System.out.println("生产者发送消息: " + orderId);}
}

(5)编写消费者代码:

package com.example.consumer;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;@Component
@RocketMQMessageListener(topic = "order-topic",          // 监听的TopicselectorExpression = "pay",     // 过滤Tag=pay的消息consumerGroup = "order-consumer-group"
)
public class OrderConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("消费者收到消息: " + message);}
}

(6)启动类:

package com.example;import com.example.producer.OrderProducer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;@SpringBootApplication
public class RocketmqDemoApplication {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(RocketmqDemoApplication.class, args);// 测试发送消息OrderProducer producer = context.getBean(OrderProducer.class);producer.sendOrderMessage("1001");  // 发送订单ID=1001}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/79646.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/79646.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件测试报告机构如何保障软件质量并维护其安全性?

软件测试报告机构在软件开发流程里起着十分关键的作用&#xff0c;它可以保障软件的质量&#xff0c;它还能够维护软件的安全性。下面&#xff0c;我们就来深入了解一下这类机构。 机构作用 软件测试报告机构是软件质量的“把关者”&#xff0c;能对软件进行全面评估&#xf…

4个纯CSS自定义的简单而优雅的滚动条样式

今天发现 uni-app 项目的滚动条不显示&#xff0c;查了下原来是设置了 ::-webkit-scrollbar {display: none; } 那么怎么用 css 设置滚动条样式呢&#xff1f; 定义滚动条整体样式‌ ::-webkit-scrollbar 定义滚动条滑块样式 ::-webkit-scrollbar-thumb 定义滚动条轨道样式‌…

ES6入门---第二单元 模块五:模块化

js不支持模块化 注意&#xff1a; 需要放到服务器环境 1、如何定义模块&#xff1f; export 东西 例&#xff1a;1.js文件中 console.log(1模块加载了);//显示是否加载了 export const a 12; export const b 5; export let c 101; const a12; const b5; const c101;ex…

14.Excel:排序和筛选

一 位置 两个位置。 二 排序&#xff1a;如何使用 1.常规使用 补充&#xff1a;不弹出排序提醒排序。 选中要排序列中的任意一个单元格&#xff0c;然后排序。 2.根据要求进行排序 1.根据姓名笔画进行降序排序 要勾选上数据包含标题&#xff0c;默认是勾选了。 2.根据运营部、…

嵌入式系统基础知识

目录 一、冯诺依曼结构与哈佛结构 &#xff08;一&#xff09;冯诺依曼结构 &#xff08;二&#xff09;哈佛架构 二、ARM存储模式 &#xff08;一&#xff09;大端模式 &#xff08;二&#xff09;小端模式 &#xff08;三&#xff09;混合模式 三、CISC 与 RISC &am…

CSS 预处理器 Sass

目录 Sass 一、Sass 是什么&#xff1f; 二、核心功能详解 1. 变量&#xff08;Variables&#xff09; 2. 嵌套&#xff08;Nesting&#xff09; 3. 混合宏&#xff08;Mixins&#xff09; 4. 继承&#xff08;Inheritance&#xff09; 5. 运算&#xff08;Operations&…

信息收集新利器:SSearch Chrome 插件来了

SSearch 下载地址 SSearch &#x1f623;用途 每次谷歌语法搜索时还得自己写&#xff0c;我想省事一点&#xff0c;弄了一个插件&#xff0c;先加了几个常用的语法&#xff0c;点击后会跳转到对应搜索页面&#xff0c;也可以直接在搜索框微调 后续也会加些其他语法 &#…

Docker搭建SFTP

在这个教程中&#xff0c;我们将通过一个简单的例子来展示如何使用 Docker 和 atmoz/sftp 镜像设置一个基本的 SFTP 服务。这个服务将允许用户通过 SFTP 安全地访问和管理文件。我们将配置一个名为 ops 的用户&#xff0c;其密码为 123456&#xff0c;并限定用户只能访问特定的…

正态分布习题集 · 答案与解析篇

正态分布习题集 答案与解析篇 与题目篇编号一致,如有其他解题思路,欢迎在评论区交流。 1. 基础定义与性质 1.1 密度函数 X ∼ N ( μ , σ 2 ) X \sim N(\mu,\sigma^2) X∼N(μ,σ2) 的 PDF: [ f(x) = \frac{1}{\sigma\sqrt{2\pi}} \exp\left(-\frac{(x-\mu)2}{2\sigma2}\…

Java学习手册:SQL 优化技巧

一、SQL 查询优化 选择合适的索引列 &#xff1a;索引可以显著提高查询速度&#xff0c;但需要选择合适的列来创建索引。通常&#xff0c;对于频繁作为查询条件的列、连接操作的列以及排序或分组操作的列&#xff0c;应该考虑创建索引。例如&#xff0c;在一个订单表中&#xf…

(02)Redis 的订阅发布Pub/Sub

我们为了自己实现一个MQ功能&#xff0c;就要深入底层挖掘现有开源产品的实现过程。 Redis 发布订阅底层结构解析 Redis 不存储消息&#xff0c;仅作为“实时中转”&#xff1b;只有订阅者在线时才能收到消息&#xff1b;消息是广播给所有订阅此频道的客户端。 1. 核心数据结…

使用Docker一键安装SigLens:简单快捷的日志分析解决方案

在当今复杂的IT环境中,高效的日志管理和分析变得越来越重要。SigLens作为一款强大的开源日志分析工具,为开发者和运维人员提供了直观、高效的日志处理体验。本文将介绍如何使用Docker快速安装SigLens,让您在几分钟内就能开始进行日志分析。 为什么选择Docker安装SigLens? Do…

C#与西门子PLC通信:S7NetPlus和HslCommunication使用指南

西门子S7协议是用来和PLC进行通讯的一个协议&#xff0c;默认端口是102&#xff0c;数据会保存在一个个DB块中&#xff0c;比较经典的用法是一个DB块专门用来读取&#xff0c;一个用来写入。 DB&#xff08;数据块&#xff09; {块号}.DBX/DBD/DBW{字节地址}.{位偏移} 1、数据…

【中间件】brpc_基础_remote_task_queue

文章目录 remote task queue1 简介2 核心功能2.1 任务提交与分发2.2 无锁或低锁设计2.3 与 bthread 深度集成2.4 流量控制与背压 3 关键实现机制3.1 数据结构3.2 任务提交接口3.3 任务窃取&#xff08;Work Stealing&#xff09;3.4 同步与唤醒 4 性能优化5 典型应用场景6 代码…

C语言实现数据结构:堆排序和二叉树_链式

一.堆的应用 1.堆排序 void test01() {int arr[] { 17,20,10,13,19,15 };int n sizeof(arr) / sizeof(arr[0]);HP p;HPInit(&p);for (int i 0; i < n; i){HPPush(&p, arr[i]);}int i 0;while (!HPEmpty(&p)){arr[i] HPTop(&p);HPPop(&p);}for (i…

C和指针——预处理

预处理是编译前的过程&#xff0c;主要对define&#xff0c;include以及一些编译器定义的内容进行替换 #define的本质就是替换 1、例子 #define FOREVER for(;;) 2、例子 #define TEMPD "1231231231\ 123123123" \\如果太长了&#xff0c;可以用\换行 3、例子——可…

C++ set和map

目录 一、关联式容器 1.1 键值对 1.1.1 概念 1.1.2 pair 1.2 树形结构的关联式容器 二、set 2.1 set 的介绍 2.2 set 的使用 2.2.1 set 的构造 2.2.2 set 的迭代器 2.2.3 set 的容量操作 2.2.4 set 的修改操作 2.2.5 set 的查找操作 三、multiset 3.1 multiset …

「Mac畅玩AIGC与多模态07」开发篇03 - 开发第一个 Agent 插件调用应用

一、概述 本篇介绍如何在 macOS 环境下,基于 Dify 平台自带的网页爬虫插件工具,开发一个可以提取网页内容并作答的 Agent 应用。通过使用内置插件,无需自定义开发,即可实现基本的网页信息提取与智能体回答整合。 二、环境准备 1. 确认本地部署环境 确保以下环境已搭建并…

cline或业务系统集成n8n的工作流(MCP Server Trigger、Call n8n Workflow Tool node)

1.成果展示 1.1n8n的主工作流 1.2n8n的子工作流 1.3cline集成效果 2.实操过程 2.1Call n8n Workflow Tool node节点 Call n8n Workflow Tool节点是一个工具&#xff0c;它允许代理运行另一个n8n工作流并获取其输出数据。 在此页面上&#xff0c;您将找到“调用n8n工作流工具…

深入了解Linux系统—— 环境变量

命令行参数 我们知道&#xff0c;我们使用的指令它本质上也是一个程序&#xff0c;我们要执行这个指令&#xff0c;输入指令名然后回车即可执行&#xff1b;但是对于指令带选项&#xff0c;又是如何实现的呢&#xff1f; 问题&#xff1a;main函数有没有参数&#xff1f; 在我…