场景题-3

如何实现一个消息队列

拆解分析主流的几种消息队列

1、基本架构

生产者Producer、消费者Consumer、Broker:生产者发送消息,消费者接受消息,Broker是服务端,处理消息的存储、备份、删除和消费关系的维护。

主题和分区:主题(topic)消息分类的标识,分区是主题的物理分割,有助于提高消息队列的吞吐量。

1.1 kafka:生产者将消息发布到kafka集群(broker)的一个或多个主题(topic),每个topic包含一个或多个分区(partition),消费者从kafka集群中的一个或多个主题消费消息,并将消费的偏移量(offset:分区中每条消息的位置信息,是一个单调递增且不变的值)提交回kafka以保证消息的顺序性和一致性。

kafka集群中,每个分区可以有多个副本,这些副本中包含一个Leader和多个Follower,只有Leader可以处理生产者和消费者的请求,Follower用于数据备份和容错,当Leader发生故障时,Follower提升为Leader。另外还有一个Zookeeper作为注册中心,协调服务,维护集群的状态和元数据信息。

1.2 RocketMQ:除了生产者Producer、消费者Consumer、Broker集群外,有NameServer(名称服务),负责维护Broker的元数据信息,Producerhr和Comsumer启动时需要连接到NameServer获取Broker的地址信息。每个Topic中可以有多个Queue(消息队列),Producer将消息发送到指定的Queue,Consumer从指定的Queue中l拉取消息。

1.3 RabbitMQ:生产者将消息发布到RabbitMQ的交换器(Exchange),交换器将消息路由到和它绑定(Binding)的队列(Queue),消费者从队列中获取消息。RabbitMQ的Broker就是一个个VHost(可以理解为操作系统的命名空间,里面对各资源进行隔离分组),每个VHost拥有自己的交换器、队列、绑定和权限设置,相互独立。

2、基本功能

2.1 消息存储:一般采用内存或者磁盘,内存读写快但可能丢数据;磁盘可以持久化消息但是读写速度相对慢一些。

2.2 消息传递协议:使用成熟的RPC框架(Dubbo或者Thrift)实现生产者和消费者与Broker之间的通信。

2.3 消息持久化和确认机制:一般做法是将消息存储在磁盘中,并且在消费者确认消费完成后再删除消息。

2。4 消息的分发方式:点对点或广播,点对对是每个消费者只会接收自己订阅的消息,广播是每个消费者都会接收到所有消息。

2.5 消息的传递方式:轮询、长连接、长轮询。一般都是支持推拉结合,或者基于拉实现推。

推消息就是消费者和中间件之间建立TCP长连接或者注册一个回调,当服务端数据发生变化,立即通过这个长连接或者回调将数据推送给消费者。这样的话好处就是能保证消息的实时性,但是一旦生产消息过快消息就会堆积在消费者端。

拉消息就是消费者轮询检查数据是否有变化,有变化的话就把数据拉过来。好处是消费者可以控制消息的数量和速度,缺陷就是消费者需要不断轮询,消息中间件也会因此有一定的压力。

另外有些生产环境下,不同环境的通信可能是单向的,此时就只能消费者采取拉的方式,因为长连接是双向通信。

实际使用时,很多中间件是结合使用长连接和轮询,又称长轮询,就是消费者向消息中间件发送一个长轮询请求,消息中间件如果有消息就直接返回,如果没有消息不会立即断开,等待一段时间,在超时时间到达之前有新小心就返回,否则就断开连接等待下一次长轮询。比如Kafka和RocketMQ。

3、消息的可靠性

其实主要就是保证消息不丢失,一般做法就是主从复制、集群模式或者分布式架构。

Kafka如何保证消息不丢失

发送端:发送消息时建议使用producer.send(msg,callback)方法。

Producer设置中acks=-1,表示Leader会等待消息被成功写进所有的ISR副本才认为producer请求成功。retries设置大于0;

Broker端:设置unclean.leader.election.enable = false,表示是否可以把非ISR集合中的副本选举为Leader副本,如果一个Broker落后原先的Leader太多,那么一旦它成为新的Leader则必然会丢失消息,所以这个参数设置为false。设置 min.insync.replicas > 1,控制的是消息至少要被写入到多少个副本才算是“已提交”。另外推荐设置成 replication.factor = min.insync.replicas + 1。

消费端:enable.auto.commit=false,采用手动提交位移的方式。

如上操作之后其实我们还是没法保证消息100%不丢失,首先生产者发送消息后如果kafka挂了,消息还没写进日志(同步到磁盘),那么消息会丢失。后续重试时如果生产者也挂了,那就没人知道这条消息失败了,也就没有重试了。其次,Kafka虽然引入了副本的机制,但是如果发生同步延迟,还没同步主副本就都挂了,那么消息也可能就丢失了。

RocketMQ如何保证消息不丢失

发送端:

同步发送消息的话将保存机制改成同步刷盘,因为Broker默认是先将消息保存在内存中,内存存储成功就返回结果给生产者,然后通过异步刷盘将消息存储到磁盘上,这时候如果机器挂了那么消息就可能丢失。

flushDiskType = SYNC_FLUSH

异步发送消息的话就需要生产者重写SendCallback的onSuccess和onException方法,用于Broker回调,方法中实现消息的确认和重发。

除此之外,RocketMQ集群部署通常采用的一主多从,并且采用主从同步方式做数据复制。Master在将数据同步到Slave节点后,再返回给生产者确认结果。

brokerRole = SYNC_MASTER

消费端:在业务逻辑的最后加上 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS。

RabbitMQ如何保证消息不丢失

首先,在消息从生产者到交换机Exchange和Exchange到Queue的过程中,为了保证消息发送成功,有两种方案:

一种是confirm机制,一种是事务机制。

confirm机制即注册回调来监听,开启Publisher Confirm(确保消息被Exchange成功接收和处理)和Publisher Return(处理消息在无法路由到队列时的异常)。

事务机制主要了解三个方法就行:

toSelect:将当前channel设置成transaction模式。

toCommit:提交事务。

toRollback:回滚事务。

其次,RabbitMQ接收到消息之后,消息也是先暂存内存,所以为了避免消息丢失,需要考虑的就是一个可靠持久化机制。

队列和交换机的持久化:设置durable参数为true来创建持久化交换机、持久化队列以及持久化绑定关系。

持久化消息:设置消息的deliveryMode为2来创建持久化消息,RabbitMQ才会将消息写入磁盘。

消费端同样的有相应的确认机制,消费者处理消息成功之后可以向MQ发送ack回执,MQ收到ack后才会删除消息。处理消息有异常则返回nack回执,MQ收到后可以重发消息,如果一直收不到返回则也会重试。

消息丢失解决方案

kafka、RocketMQ和RabbitMQ单靠自己其实都无法100%保证消息不丢失,针对消息可能的丢失我们可以引入一些其他机制,比如分布式事务、本地消息表等。

分布式事务:

就是保证数据的一致性(所有参与者在一次写操作过程中要么都成功要么都失败),可分强一致性和最终一致性。

强一致性引入一个协调者,方案包括基于XA的二阶段(2PC)及三阶段提交(3PC)。2PC可以理解为第一阶段是协调者先询问参与者是可以发起事务提交操作,若参与者可以执行事务提交,那么就是进行事务操作,只是执行完没有还没commit或者rollback,如果参与者成功执行事务操作就返回YES,没成功就返回NO。第二阶段就是协调者接收到所有参与者的YES的反馈后,就给参与者发送commit请求,如果有反馈为NO,就发送rollback请求。然后参与者将ACK结果返回给协调者。

2PC最关键的一个问题就是在第二阶段,如果参与者和协调者都挂了,那么就可能出现数据不一致的问题。因此引入3PC(CanCommit,PreCommit,doCommit),就是将2PC的第一阶段中的事务操作也分离出来。3PC的问题就是如果由于网络原因,参与者在等待超时后就会commit,这样可能就与其他接收到abort命令执行回滚的参与者不一致了。

最终一致性:方案是基于可靠消息的最终一致性(本地消息表、事务消息)、最大努力通知以及TCC。

基于本地消息表实现分布式事务,这个方案主要思路其实就是将分布式事务拆为本地事务和消息事务。参与者A发送消息前先创建一个本地消息,在参与者A的DB中写入本地业务数据和本地消息数据,两者在一个事务中,这样业务成功则本地消息也一定写入了。然后参与者A基于本地消息调用MQ发送远程消息,参与者B接收后做业务处理且成功之后再联动修改本地消息的状态。这个流程中如果参与者A消息发送MQ失败,那么就可以通过定时任务扫描本地消息数据,对未成功的消息进行重新投递。如果是MQ发送消息失败,那么MQ的重试机制也就派上用场了。如果是最终修改本地消息状态失败,那么起码现在分布式系统中的业务数据是一致了,只是本地消息的状态不对,这种情况可以借助定时任务重新投消息,下游幂等消费再重新更改消息状态,或者本系统通过定时任务主动去查询下游系统的状态,如果已经成功则直接修改消息状态。

基于事务消息实现分布式事务,参考RocketMQ的事务消息实现,参与者A先向RocketMQ Broker发送一条half消息(半消息),半消息存储在Broker的事务消息日志中,半消息发送成功后参与者A执行本地事务,如果A执行本地事务成功则通知RocketMQ Broker提交事务消息,消息状态从prepared改为committed,消费者可以接收消息。如果本地事务失败则A通知RocketMQ Broker回滚事务消息,消息从事务日志删除。这个过程中如果RocketMQ Broker没有接收到A执行本地事务的结果那么就会进行回查,A自查后返回自查结果,如果在规定时间没有结果那么消息就变味unknow状态,此时A如果有了结果还可以向MQ发送commit或者rollback,但是如果一直没有结果,过期时间一到MQ就自动回滚事务消息,将其从事务消息日志中删除。

TCC就是Try-Confirm-Cancel,将分布式事务分解为若干小事务,每个事务都有Try、Confirm和Cancel三个操作。try阶段参与者执行本地事务,并对全局事务预留资源,返回执行标识。所有参与者都返回成功则协调者通知所有参与者提交事务,即confirm阶段,参与者在本地提交事务,并释放全局事务资源。如果任一参与者try阶段返回失败则协调者通知所有参与者回滚。这里面就会有两个问题:空回滚和事务悬挂。空回滚就是try没成功也要执行回滚,注意处理逻辑。事务悬挂就是由于网络原因可能某个节点的try还没收到,而其他节点触发了cancel,然后这个节点先收到cancel进行了空回滚之后又收到了try并执行了,那么这个节点的try占用的资源就没法释放。解决方案就是引入一张分布式事务记录表,每个参与者都可以在本地事务的执行过程中同时记录一次分布式事务的操作记录。

除上述方案还有分布式事务的组件,如Seata。Seata包含三部分:Transaction Coordinator(TC),Transaction Manager™,Resource Mabager(RM),TC维护整个事务的全局状态,负责通知RM进行提交或回滚;TM可视为微服务中的聚合服务,开启一个全局事务或者提交或回滚一个全局事务;RM可对应微服务架构中的某个微服务,对应一个事务分支,负责执行事务分支的操作。TM接收到用户请求后调TC开启全局事务并从TC获得一个XID;TM通过RPC/Restful调用各RM并把XID传递过去;各RM接收到XID,在TC注册事务分支;TM根据所有调用全部完成后的状态确定是Commit还是Rollbask,将结果通知TC。TC协调各RM进行Commit或者Rollbask。

4、消息的高性能

性能这块可以参考kafka的设计,引入一些批量操作、顺序写入和零拷贝之类的技术。

消息发送

批量发送、异步发送、消息压缩、并行发送(数据分布在不同的分区,生产者并行发送消息)。

消息存储

1、零拷贝:一次IO流程可以简单概括有磁盘数据copy到内核缓冲区(页缓存),内核态中的数据copy到用户态中,用户态数据copy到内核态中(socket缓存),内核态缓冲区数据copy到网卡中。零拷贝就是通过各种技术来减少数据copy的次数或者说减少CPU参与数据拷贝的次数。

实现方式有mmap、sendfile、dma、directI/O等/

2、磁盘顺序写入

3、页缓存

4、系数索引:kafka存储消息是通过分段的日志文件,每个分段有自己的索引文件。

5、分区和副本:kafka采用分区和副本的机制,可以将数据分散到多个节点上进行处理。具体可以了解下ISR机制,即同步副本。kafka中每个主题可以有多个副本,ISR是与主副本保持同步的副本集合。当消息写入Kafka的分区时,首先会写入Leader,然后Leader将消息复制给ISR中的所有副本,只有当ISR中的所有副本都成功接收到并确认了消息之后,主副本才会认为消息已成功提交。

消息消费

消费者群组、并行消费、批量拉取

5、扩展功能

顺序消息
kafka顺序消息

kafka的一个topic下有多个partition,当生产者向某个partiton发送消息时,消息被追加到该partiton的日志文件中,并且分配一个唯一的offset,文件读写是有序的。当消费者从该分区消费消息时,会从该分区最早的offset开始读取消息。所以同一个partiton下的消息是有序的。

所以想要实现消息顺序消费,那么一个topic下只创建一个partition,或者消息被发送到同一个partiton。

要想实现消息发送到同一个partition,可以了解下DefaultPartitoner这个类,实现方式有三种:

一是在key为null的话直接指定partiton。

二是指定key,这样同样的ykey经过hash之后还是会指向同一个partiton编号。

三是自己写一个分区器类,实现Partitoner接口,重写partition方法,在生产者的配置中指定使用自己写的分区器类。

RocketMQ的顺序消息

RocketMQ是基于队列的顺序消费,同一个队列的消息可以做到有序。

生产者需要同步发送消息,并且在send方法中传入一个MessageQueueSelector,这个MessageQueueSelector中需要实现一个select方法,用来定义要把消息发送到哪个MessageQueue。

消息有序进入同一个队列之后,要保证顺序消费,需要加三把锁,先锁定Broker上的MessageQueue,确保消息只会投递给一个消费者,对本地的MessageQueue加锁,确保只有一个线程能处理这个消息队列,对存储消息的ProcessQueue加锁,确保在重平衡的过程中不会出现重复消费。值得注意的是,多次加锁虽然能做到顺序消费,但这无疑会降低系统的吞吐量,可能会导致消息阻塞。

延迟消息
RabbitMQ延迟消息

死信队列:给消息设置一个TTL,到期后消息进入死信队列,监听死信队列消费消息。存在问题是可能造成对头阻塞。因为RabbitMQ只定期扫描队头消息是否过期,如果队头消息没过期,队列中的消息即使过期了也不会进入死信队列,一直被阻塞。另外这个方案实现也比较麻烦。

RabbitMQ插件: 对版本有要求,3.6.12版本开始支持的。基于rebbitmq_delayded_message_exchange插件,消息不是在队列中,而是一个基于Erlang开发的Mnesia数据库中,通过一个定时器去查询需要被投递的消息,投递到x-delayed-message交换机中。这个插件支持的最大延迟时间有限。

RocketMQ延迟消息

基于Timer定时器:先将消息存处在内存上,叨叨指定时间后再写入磁盘。

基于时间轮(5.0版本):将消息按照过期时间放置在不同的槽位,到达过期时间就将该槽位的所有消息投递给消费者。

事务消息

参考上文解决消息丢失中的RocketMQ的事务消息。

重复消费
kafka如何防止重复消费

1、kafka中消费者必须至少加入一个消费者组,同组消费者共享消费者的负载,因此只要有一个消费者在消费某条消息,其他消费者就不会接收这个消息。

2、手动提交位移控制+处理结果去重。

3、客户端做幂等控制:一锁二判三更新之类。

4、Kafka的Exactly-Once消费语义(生产者开启幂等+事务 或者 消费者端精确控制)

RabbitMQ如何防止重复消费

根据发送消息时设置的唯一标识在消费者端做幂等控制。

消息堆积

一般是因为客户端本地消费过程中消费时间过长或者消费并发小。

如上解决方案有:

增加消费者数量,提升消费速度(引入线程池,本地存储消息即返回成功后续慢慢消费等)、清理过期消息(评估过期消息和一些一直无法成功的消息是否可清理),调整一些关于参数比如队列数、消息拉取间隔时间等(具体根据MQ类型修改调试)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/85985.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue2 项目中 npm run dev 运行98% after emitting CopyPlugin 卡死

今天在运行项目时,发现如下问题: 开始以为是node_modules依赖的问题,于是重新 npm install,重启项目后还是未解决。 在网上找了一圈发现有人说是 require引入图片地址没有写。在我的项目中排查没有这个问题,最后发现某…

73 LV的使用(XFS文件系统)

四 LV的使用 先创建一个LV01 lvcreate -L 10G lv01 datavg Logical volume "lv01" created. 将创建出来的LV01进行格式化 mkfs.xxx LV的名称(绝对路径) 绝对路径的组成:/dev/你创建VG的名字/LV的名字 mkfs.xfs /dev/datavg/lv01 挂载你的LV…

mybatis中判断等于字符串的条件怎么写

mybatis中判断等于字符串的条件怎么写_mybatis 等于字符串-CSDN博客 因为mybatis映射文件,是使用的ognl表达式,ognl是java语言(强类型语言),会把‘X’解析成字符,而不是字符串。 所以比较字符串相等使用是…

C语言实现绝对值差值函数

在C语言中&#xff0c;可以编写一个函数来计算两个数的差值的绝对值。以下是一个简单的实现&#xff1a; #include <stdio.h> #include <stdlib.h> // 用于abs函数&#xff08;如果需要&#xff09; // 方法1&#xff1a;使用标准库函数 int absoluteDifference1…

Three.js中AR实现详解并详细介绍基于图像标记模式AR生成的详细步骤

文档地址 Three.js中AR实现详解 以下是Three.js中实现AR功能的详细解析&#xff0c;涵盖技术原理、实现步骤、核心组件及优化策略&#xff1a; &#x1f9e9; 一、技术基础 AR.js框架的核心作用 AR.js是Three.js实现AR的基石&#xff0c;提供以下核心能力&#xff1a; 多模…

Vue3.5 企业级管理系统实战(二十三):权限指令

在实际应用场景中&#xff0c;常常需要依据用户角色对按钮的操作权限实施控制。实现这一控制主要有两种方式&#xff1a;一种是借助前端指令基于角色进行权限管控&#xff0c;另一种是通过后台返回对应的权限属性来实现精细化控制。本文聚焦于前端权限指令展开探讨。 1 权限指…

软考 系统架构设计师系列知识点之杂项集萃(81)

接前一篇文章&#xff1a;软考 系统架构设计师系列知识点之杂项集萃&#xff08;80&#xff09; 第145题 商业智能是企业对商业数据的搜集、管理和分析的系统过程&#xff0c;主要技术包括&#xff08;&#xff09;。 A. 数据仓库、联机分析和数据挖掘 B. 数据采集、数据清洗…

深入浅出Java ParallelStream:高效并行利器还是隐藏的陷阱?

在Java 8带来的众多革新中&#xff0c;Stream API彻底改变了我们对集合操作的方式。而其中最引人注目的特性之一便是parallelStream——它承诺只需简单调用一个方法&#xff0c;就能让数据处理任务自动并行化&#xff0c;充分利用多核CPU的优势。但在美好承诺的背后&#xff0c…

SQL Transactions(事务)、隔离机制

目录 Why Transactions? Example: Bad Interaction Transactions ACID Transactions COMMIT ROLLBACK How the Transaction Log Works How Data Is Stored Example: Interacting Processes Interleaving of Statements Example: Strange Interleaving Fixing the…

第R9周:阿尔茨海默病诊断(优化特征选择版)

文章目录 1. 导入数据2. 数据处理2.1 患病占比2.2 相关性分析2.3 年龄与患病探究 3. 特征选择4. 构建数据集4.1 数据集划分与标准化4.2 构建加载 5. 构建模型6. 模型训练6.1 构建训练函数6.2 构建测试函数6.3 设置超参数 7. 模型训练8. 模型评估8.1 结果图 8.2 混淆矩阵9. 总结…

OpenLayers 分屏对比(地图联动)

注&#xff1a;当前使用的是 ol 5.3.0 版本&#xff0c;天地图使用的key请到天地图官网申请&#xff0c;并替换为自己的key 地图分屏对比在WebGIS开发中是很常见的功能&#xff0c;和卷帘图层不一样的是&#xff0c;分屏对比是在各个地图中添加相同或者不同的图层进行对比查看。…

【大模型】大模型数据训练格式

1. SFT&#xff08;有监督微调&#xff09; 1.1 数据格式 JSONL&#xff08;每行一个 JSON 对象&#xff09;最为流行&#xff1b;也可用 CSV&#xff0f;TSV&#xff0c;但 JSONL 更灵活。字段设计 prompt&#xff1a;用户输入或任务指令&#xff08;通常以“系统指令&#…

[论文阅读] 人工智能 | 利用负信号蒸馏:用REDI框架提升LLM推理能力

【论文速读】利用负信号蒸馏&#xff1a;用REDI框架提升LLM推理能力 论文信息 arXiv:2505.24850 cs.LG cs.AI cs.CL Harnessing Negative Signals: Reinforcement Distillation from Teacher Data for LLM Reasoning Authors: Shuyao Xu, Cheng Peng, Jiangxuan Long, Weidi…

Cursor 1.0正式推出:全面解析你的AI 编程助手

目录 前言 一、BugBot&#xff1a;你的私人代码审查专家 二、Background Agent&#xff1a;7x24小时在线的云端开发伙伴 三、Jupyter Notebook 深度集成&#xff1a;数据科学家的福音 四、记忆功能 (Memories)&#xff1a;让 AI 更懂你的项目 五、MCP 与工具生态&#xf…

QILSTE 精巧电子元件H4-108FO/5M解析

型号&#xff1a;H4-108FO/5M 在电子元件的浩瀚宇宙中&#xff0c;H4-108FO/5M 仿佛一颗散发着独特光芒的恒星&#xff0c;其参数和特性交织成一张错综复杂的网络&#xff0c;既令人困惑又充满惊喜。这款型号的产品&#xff0c;以其 1.60.80.4mm 的微小尺寸&#xff0c;却蕴含…

第2章_Excel_知识点笔记

Excel 知识点总结&#xff08;第2章&#xff09; 来自&#xff1a;第2章_Excel_知识点笔记&#xff0c;原笔记 基础操作 状态栏&#xff1a;快速查看计数/求和等数据&#xff08;右键可配置&#xff09;。筛选&#xff08;CtrlShiftL&#xff09;&#xff1a;按条件显示数据…

【学习笔记】单例类模板

【学习笔记】单例类模板 一、单例类模板 以下为一个通用的单例模式框架&#xff0c;这种设计允许其他类通过继承Singleton模板类来轻松实现单例模式&#xff0c;而无需为每个类重复编写单例实现代码。 // 命名空间&#xff08;Namespace&#xff09; 和 模板&#xff08;Tem…

yolo 训练 中间可视化

yolo训练前几个batch&#xff0c;会可视化target: if plots and ni < 33:f save_dir / ftrain_batch{ni}.jpg # filenameplot_images(imgs, targets, paths, f, kpt_labelkpt_label)

【Linux】虚拟机代理,自动化脚本修改~/.bashrc

二选一执行 {echo ""echo "# Cla Verge代理设置 "echo "alias use-proxyexport http_proxy\"socks5h://192.168.88.1:7897\"; export https_proxy\"socks5h://192.168.88.1:7897\""echo "alias use-proxy-httpexport…

JavaScript 原型与原型链:深入理解 __proto__ 和 prototype 的由来与关系

引言 在 JavaScript 的世界中&#xff0c;原型和原型链是理解这门语言面向对象编程&#xff08;OOP&#xff09;机制的核心。不同于传统的基于类的语言如 Java&#xff0c;JavaScript 采用了一种独特的原型继承机制。本文将深入探讨 __proto__ 和 prototype 的由来、关系以及它…