一、RocketMQ概述
RocketMQ是一款由阿里巴巴于2012年开源的分布式消息中间件,旨在提供高吞吐量、高可靠性的消息传递服务。主要特点有:
-
灵活的可扩展性
-
海量消息堆积能力
-
支持顺序消息
-
支持多种消息过滤方式
-
支持事务消息
-
支持回溯消费
-
支持延时消息
RocketMQ的基本概念
Producer(生产者):
负责生产消息,一般由业务系统负责。一个消息生产者会把业务应用系统里产生的消息发送到Broker服务器RocketMQ提供了3种方式发送消息:同步、异步和单向
同步发送(Sync Send):
同步发送是指消息发送方在发送一条消息后,在收到服务器返回的响应结果后才发送下一条消息。如果消息成功发送到Broker,则返回发送成功的结果;如果发送失败(如Broker宕机),则抛出异常。
适用于重要通知消息场景:重要通知邮件、金融交易和订单处理等
异步发送(Async Send):
异步发送是指消息发送方在发送一条消息后,不需要等待服务器响应即可继续发送消息。但需要Broker返回确认信息。
单向发送(One-way Send):
单向发送是指消息发送方发送一条消息后,不会等待服务器响应,也不需要Broker返回确认信息。这种方式发送消息的过程耗时非常短,常用于对可靠性要求不是很高,但对实时性要求较高的场景。
生产者组(Producer Group)
RocketMQ中一组生产者实例的集合,这些生产者实例可以共同工作以发送消息到同一个或多个主题(Topic)。
Consumer(消费者):
消息消费的角色,支持分布式集群方式部署。消费者有两种类型:拉取型和推送型支持集群方式和广播方式的消费。
拉取型消费(Pull Consumer):
在Pull模式下,消费者主动从消息中间件拉取消息。这种模式给了消费者更多的控制权,它们可以根据自己的处理能力和当前状态来决定何时拉取消息、拉取多少消息。
推动型消费(Push Consumer):
在Push模式下,消息中间件主动将消息推送到消费者(Consumer)端。一旦有消息到达,中间件会立即尝试将消息传递给消费者,而不需要消费者主动请求。
Push消费模式下,消费者首先需要注册一个消费监听器(MessageListener)。当Broker(消息服务器)有消息到达并准备好推送给消费者时,会触发这个监听器,进而开始消费消息。这种机制使得消费者无需主动向Broker请求消息,而是等待Broker将消息推送过来,从而提高了消息消费的实时性和便捷性
消费者组(Consumer Group)
是一个承载多个消费行为一致的消费者的逻辑资源分组,它并不是运行实体,而是一个用于管理和组织消费者的逻辑概念。
消息服务器(Broker):
Broker是RocketMQ中负责存储消息的核心组件。它接收生产者(Producer)发送的消息,并将其持久化到本地磁盘或分布式存储系统中,确保消息的安全性和可靠性。除了存储消息外,Broker还负责将消息转发给消费者(Consumer)。当消费者发起消息拉取请求时,Broker会根据请求中的信息(如Topic、Queue等)查找并返回相应的消息给消费者。
Master:
Master节点是RocketMQ中的主节点,负责接收客户端(生产者)的写入请求,并将消息持久化到磁盘上。它是消息处理的核心,承担着主要的消息存储和转发任务。
Slave:
Slave节点是RocketMQ中的从节点,它负责从Master节点复制消息数据,并保持与Master节点的同步。Slave节点不直接处理客户端的写入请求,但可以在Master节点故障时接替其工作,提供消息的读取服务。
Broker的集群部署有单Master、多Master、多Master多Slave(同步双写)、多Master多Slave(异步复制)
NameServer(名称服务器):
充当路由消息的提供者,生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。
NameServer是一个几乎无状态的节点,可集群部署,节点之间无任何信息同步。
无状态
NameServer 几乎不存储任何持久化数据。它主要维护的是Broker的元数据信息,如Broker的地址、主题信息等,但这些信息是可以从Broker动态获取的,因此NameServer本身不需要进行复杂的数据同步或持久化操作。
集群部署:
NameServer支持集群部署,即可以部署多个NameServer实例来提供更高的可用性和容错性。
在集群模式下,生产者和消费者可以随机选择一个或多个NameServer进行查询,这样可以分散查询负载,提高系统整体的性能和可靠性
节点间无信息同步:
NameServer集群中的各个节点之间是相互独立的,它们之间不会进行任何信息的同步或通信。
每个NameServer都独立地从Broker获取最新的元数据信息,并维护自己的数据副本。这种设计简化了NameServer集群的管理和部署,同时也避免了因节点间同步导致的延迟和复杂性。
路由信息提供:
生产者(Producer)和消费者(Consumer)在发送或接收消息前,会先向NameServer查询目标主题(Topic)对应的Broker列表。NameServer会返回包含Broker地址的列表,生产者和消费者随后可以根据这个列表选择具体的Broker进行消息的发送或接收。
Broker注册与发现:
当Broker启动时,它会向NameServer注册自己的信息,包括地址、端口、主题列表等。NameServer会接收这些信息并更新自己的元数据,使得生产者和消费者能够发现新的Broker或Broker的变更
Topic
Topic是生产者在发送消息和消费者在拉取消息时的基本类别。它表示一类消息的集合,每个Topic都包含若干条消息,每条消息只能属于一个Topic。Topic与生产者和消费者之间的关系非常松散,一个Topic可能有0个、一个或多个生产者向其发送消息,同样,一个消费者组可以订阅一个或多个Topic,只要该组的实例保持其订阅一致即可。
Topic的主要作用包括:
- 消息分类:通过Topic将不同类型的消息进行区分,便于管理和消费。
- 消息路由:生产者发送消息时,会根据Topic将消息路由到相应的Broker上。
- 负载均衡:在消费者组中,通过Topic实现消息的负载均衡,使得消息能够均匀地被多个消费者处理。
Tag
Tag是消息的二级分类,可以看作是Topic下的子主题。它为用户提供了额外的灵活性,使得来自同一业务模块的具有不同目的的消息可以具有相同的Topic和不同的Tag。Tag有助于保持代码的清晰和连贯,同时方便RocketMQ提供的查询功能。
Tag的主要作用包括:
- 消息细分:在同一Topic下,通过Tag对消息进行进一步细分,使得消费者可以根据需要接收特定类型的消息。
- 消费过滤:消费者可以指定Tag来接收特定类型的消息,或者使用通配符(如*)来接收该Topic下的所有消息。提高消息处理效率:通过合理使用Tag,可以减少消费者处理不必要消息的开销,提高消息处理的效率。
Topic与Tag的关系
层级关系:Topic是消息的一级分类,而Tag是消息的二级分类。它们共同构成了消息系统的分类体系。
互补关系:Topic和Tag在消息分类上起到互补作用。Topic用于区分不同类型的消息,而Tag则用于在同一Topic下进一步区分不同目的的消息。
队列:
RocketMQ的队列是消息存储和传输的实际容器,也是RocketMQ消息的最小存储单元。
集群消费模式
同一个消费者组(Consumer Group)中的多个消费者实例会共同分担同一个主题(Topic)下的消息。RocketMQ会将消息分发给不同的消费者实例,但保证同一条消息只会被该消费者组中的一个消费者消费。
特点:
- 负载均衡:自动实现消息的负载均衡,提高消息处理效率和吞吐量。
- 容错性:如果一个消费者实例出现故障,其他实例可以继续处理消息,保证消息不会丢失。
- 消费进度管理:消费进度(即已消费消息的偏移量)会被存储在Broker端,以便在消费者重启后能够从上次消费的位置继续处理消息。
广播消费
同一个消费者组中的每个消费者实例都会收到订阅主题下的每一条消息的一个副本。即每条消息都会被发送到该消费者组中的所有消费者实例上,且每个消费者实例都会独立地消费这条消息。
特点
- 消息广播:每条消息都会被发送到所有消费者实例,确保消息被每个消费者都接收到。
- 无消费进度管理:在广播模式下,消费进度不需要被存储和共享,因为每个消费者实例都会消费到完整的消息集合。
- 无重试机制:如果消费者消费消息失败,RocketMQ在广播模式下不会自动重试,需要业务方自行处理。
并发消费
特点:
并发消费是指多个消费者实例同时处理消息,每个实例独立地处理一部分消息。
多个消费者实例可以并发地处理来自同一个主题和标签的消息,提高了消息处理的并发度。
顺序消费
RocketMQ 保证在同一个消息队列(Message Queue)中的消息按照发送的顺序被消费。
特点:
顺序消费是指消息按照一定的顺序进行处理,同一个消息队列上的消息按照发送顺序和消费顺序进行处理。顺序消费通常需要保证同一消息队列上只有一个消费者实例处理消息,以确保消息的顺序性。
二、RocketMQ应用
2.1 RocketMQ安装和配置
下载地址https://rocketmq.apache.org/download
解压
配置环境变量
启动nameservver服务
进入安装目录下的bin目录下,执行如下命令
start mqnamesrv.cmd
启动成功效果
启动broker服务
在bin目录下,执行如下命令
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
2.2 java访问Rocketmq工程
配置依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>5.3.2</version>
</dependency>
创建消息生产者
public class Producer {public static void main(String[] args) throws Exception {// 创建一个消息生产者,并设置一个消息生产者组DefaultMQProducer producer = new DefaultMQProducer("producer_group");// 消息发送失败重试次数producer.setRetryTimesWhenSendFailed(3);// 消息没有存储成功是否发送到另外一个brokerproducer.setRetryAnotherBrokerWhenNotStoreOK(true);// 指定 NameServer 地址producer.setNamesrvAddr("localhost:9876");// 初始化 Producer,整个应用生命周期内只需要初始化一次producer.start();// 以下代码是示例,用于发送一条消息try {for (int i = 0; i < 100; i++) {// 创建消息实例,指定主题、标签和消息体Message msg = new Message("TopicTest",// topic"TagA",// tag("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 设置消息持久化msg.setWaitStoreMsgOK(true);// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}} catch (Exception e) {e.printStackTrace();} finally {// 关闭生产者producer.shutdown();}}
}
创建消息消费者
public class Consumer {public static void main(String[] args) throws Exception {// 创建一个消息消费者,并设置一个消息消费者组DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");// 指定 NameServer 地址consumer.setNamesrvAddr("localhost:9876");// 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);// 订阅指定 Topic 下的所有消息consumer.subscribe("TopicTest", "*");consumer.setMessageModel(MessageModel.BROADCASTING); // 广播消费// 注册消息监听器consumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {// 默认 list 里只有一条消息,可以通过设置参数来批量接收消息System.out.println("Receive New Messages: " + list.size());if (list != null) {for (MessageExt ext : list) {try {System.out.println(new Date() + " " + new String(ext.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// 消费者对象在使用之前必须要调用 start 初始化consumer.start();System.out.println("消息消费者已启动");}
}
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,即历史消息(还存在broker的),全部消费一遍,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_LAST_OFFSET:默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_TIMESTAMP:从某个时间点开始消费,默认是半个小时以前,后续再启动着上次消费的进度开始消费
2.3 Spring Boot集成Rocketmq
第一步,引入依赖,完整pom如下
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.4.1</version><relativePath/> <!-- lookup parent in repository --></parent><artifactId>spring_boot_rocketmq_demo</artifactId><packaging>jar</packaging><name>spring_boot_rocketmq_demo Maven Webapp</name><dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- RocketMQ Spring Boot Starter --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.2</version></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>
第二步,在resources下编写application.yml
rocketmq:producer:group: my-groupname-server: localhost:9876
第三步,编写生产者
@Service
public class MessageProducer {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.convertAndSend(topic, message);}@PostConstructpublic void init() {sendMessage("my-topic", "Hello, RocketMQ!");}
}
第四步,编写消费者
@Service
@RocketMQMessageListener(topic = "my-topic", consumerGroup = "my-group" )
public class MessageConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("接收到消息: " + message);}
}
第五步,编写启动类
@SpringBootApplication
public class RocketMQApplication {public static void main(String[] args) {SpringApplication.run(RocketMQApplication.class, args);}
}
2.4 Rocketmq的顺序消息消费
顺序消费者配置
顺序消息消费者:RocketMQ提供了顺序消息消费者(Orderly Message Consumer),通过指定消费模式为Orderly,可以确保消息按照发送的顺序进行消费。
消费者配置:在消费者端,需要配置为顺序消费者,并确保每个队列只由一个消费者线程消费。这通常是通过设置消费者组的实例数量与Topic的消息队列数量相等来实现的,即每个消费者实例对应一个消息队列。
消息消费控制
单线程消费:在顺序消费者模式下,RocketMQ会保证同一个队列中的消息按照发送的顺序被同一个消费者线程顺序消费。这意味着在消费过程中,前一个消息被完全处理之前,后一个消息不会被消费。
消费状态管理:RocketMQ通过维护消费进度(Consume Offset)来跟踪每个队列的消费情况,确保消息按顺序被消费且不会丢失。
使用spring boot开发顺序消息
第一步,建立生产者,使用syncSendOrderly发送顺序消息
@Service
public class MessageProducer_order {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessage(String topic, String message) {rocketMQTemplate.syncSendOrderly(topic, message, "order-key");}@PostConstructpublic void init() {for (int i = 0; i < 100; i++) {sendMessage("my-topic-order", "Hello, RocketMQ-order!" + i);}}
}
第二步,建立消费者,设置consumeMode = ConsumeMode.ORDERLY顺序消费
@Service
@RocketMQMessageListener(topic = "my-topic-order",consumerGroup = "my-group-order",consumeMode = ConsumeMode.ORDERLY
)
public class MessageConsumer_order implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("接收到消息: " + message);}
}