可靠消息最终一致性分布式事务解决方案

之前文章写过主流的一些 分布式事务的解决方案,但其实工作中很少有一些高并发的业务中去使用这些方案,因为对于高并发的场景来说,引入这些方案的性能损耗太大,且对系统事务侵入性太强影响系统稳定性。

所以在高并发的业务中,如果对实时性可以容忍秒级的延迟,那么使用最终一致性事务方案是最合适的选择。

可靠消息最终一致性事务

可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。

可靠消息最终一致性方案要解决以下几个问题:

  1. 本地事务与消息发送的原子性问题:即实现本地事务和消息发送的原子性,要么都成功,要么都失败。这是实现可靠消息最终一致性方案的关键问题。
  2. 事务参与方接收消息的可靠性:事务参与方必须能够从消息队列接收到消息,如果接收消费消息失败需要重复尝试消费,即实现最终消费成功。
  3. 消息重复消费的问题:由于步骤2的存在,若某一个消费节点出现消费超时但是处理逻辑执行成功了,此时由于消息中间件会重复投递就导致了消息的重复消费。要解决消息重复消费的问题就要实现事务参与方的方法幂等性。
     

1.RocketMQ事务消息实现

RocketMQ独有的事务回调扩展可以比较轻松的实现最终一致性事务。

假设有两个本地事务组成当前的全局事务,实现流程如下:

  1. 先发送half消息到MQ,MQ服务端收到后保存消息,但是half是对消费端不可见状态。
  2. MQ回调发送者的事务事件回调接口,这时候在这个接口中我们执行本地事务。
  3. 如果本地事务执行成功,就提交MQ的事务,此时MQ会把消息设置为可消费状态,否则执行事务回滚,本地事务失败且消息也会被删除。
  4. 如果长时间未响应事务提交,MQ服务端会回查发送者的事务状态,可以做补偿提交。

上述流程保证了第一个本地事务与消息发送的一致性,即本地事务发送成功后消息才可消费。基于MQ的分布式事务实现的是最终一致性并不保证实时性,所以对于消费者而言只要确保收到消息完成第二个本地事务的提交就可以了。 

在这里插入图片描述 

 1.发送事务消息

@RestController
@Slf4j
public class AccountInfoController {@Autowiredprivate AccountInfoService accountInfoService;@GetMapping(value = "/transfer")public String transfer(@RequestParam("accountNo")String accountNo, @RequestParam("amount") Double amount){//创建一个事务id,作为消息内容发到mqString tx_no = UUID.randomUUID().toString();AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);//发送消息accountInfoService.sendUpdateAccountBalance(accountChangeEvent);return "转账成功";}
}

2.RocketMQLocalTransactionListener 接口(本地事务执行和消息事务提交,消息事务回查补偿提交)


@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1")
public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {@AutowiredAccountInfoService accountInfoService;@AutowiredAccountInfoDao accountInfoDao;//事务消息发送后的回调方法,当消息发送给mq成功,此方法被回调@Override@Transactionalpublic RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {try {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("accountChange");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//执行本地事务,扣减金额accountInfoService.doUpdateAccountBalance(accountChangeEvent);//当返回RocketMQLocalTransactionState.COMMIT,自动向mq发送commit消息,mq将消息的状态改为可消费return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}//MQ回调事务状态回查接口,查询是否扣减金额@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message message) {//解析message,转成AccountChangeEventString messageString = new String((byte[]) message.getPayload());JSONObject jsonObject = JSONObject.parseObject(messageString);String accountChangeString = jsonObject.getString("accountChange");//将accountChange(json)转成AccountChangeEventAccountChangeEvent accountChangeEvent = JSONObject.parseObject(accountChangeString, AccountChangeEvent.class);//事务idString txNo = accountChangeEvent.getTxNo();log.info("事务状态回查");int existTx = accountInfoDao.isExistTx(txNo);if(existTx>0){return RocketMQLocalTransactionState.COMMIT;}else{return RocketMQLocalTransactionState.UNKNOWN;}}
}

3.本地事务类

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {@AutowiredAccountInfoDao accountInfoDao;@Autowired(required = false)RocketMQTemplate rocketMQTemplate;//向mq发送转账消息@Overridepublic void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//将accountChangeEvent转成jsonJSONObject jsonObject =new JSONObject();jsonObject.put("accountChange",accountChangeEvent);String jsonString = jsonObject.toJSONString();log.info(jsonString);//生成message类型Message<String> message = MessageBuilder.withPayload(jsonString).build();//发送一条事务消息/*** String txProducerGroup 生产组* String destination topic,* Message<?> message, 消息内容* Object arg 参数*/rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1","topic_txmsg",message,null);}//更新账户,扣减金额@Override@Transactionalpublic void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {//幂等判断if(accountInfoDao.isExistTx(accountChangeEvent.getTxNo())>0){return ;}//扣减金额accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * -1);//添加事务日志accountInfoDao.addTx(accountChangeEvent.getTxNo());if(accountChangeEvent.getAmount() == 3){throw new RuntimeException("人为制造异常");}}
}

2.本地消息表实现

由于有些项目或者公司架构中不使用RocketMQ,无法通过事务消息机制来实现。那么使用 本地消息表+普通MQ中间件 也可以实现可靠消息最终一致性事务。

本地消息表实现方案的核心是新建一个本地消息数据库表,通过本地数据库事务把业务操作和消息数据绑定在一起,然后通过异步(定时任务重试)将消息发送至消息中间件,最终待确认消息发送给消费方被成功消费。

CREATE TABLE transaction_messages (id BIGINT PRIMARY KEY AUTO_INCREMENT,message_id VARCHAR(64) NOT NULL UNIQUE,topic VARCHAR(128) NOT NULL,body TEXT NOT NULL,status TINYINT NOT NULL COMMENT '0-待发送 1-已发送 2-发送失败',retry_count INT DEFAULT 0,created_at DATETIME NOT NULL,updated_at DATETIME NOT NULL,INDEX idx_status_retry (status, retry_count)
);

该方案的流程其实就是模拟RocketMQ事务消息的流程,通过本地消息数据的多个状态来实现本地事务与消息发送的原子性。具体如下:

  1. 在本地事务中完成业务操作后,插入一条状态为 待发送的 的消息记录
  2. 异步或定时任务拿到 待发送的消息(能拿到说明本地事务提交成功),处理消息发送,发送完成后更新状态为已发送(这里无论是发送失败还是更新失败都会重试,最终保证成功)
  3. 事务参与方接收到消息并完成消费,保证幂等和最终消费成功

方案瑕疵

我觉得这个方案是跟RocketMQ方案相比,无法保证发送成功后的消息一定能投放给消费者。

因为高并发系统建设中,出于性能考虑大部分场景在使用消息中间件时都是设置异步复制和刷盘,这就意味着如果出现MQ服务宕机的情况,就可能会出现未复制或未落盘的数据丢失的情况。


如果要解决这个问题则需要基于上面的流程增加核对流程,事务参与方消费完成后记录消费记录,定期核对发送和消费记录,对发送未消费的消息进行补偿发送处理。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/915128.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/915128.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ISIS基础

拓扑计算方式 模型 支持的网络 支持的地址OSPF SPF TCP/IP IP网络 IPv4地址ISIS SPF OSI CLNP网络 NSAP地址集成ISIS SPF TCP/IP IP网络 NSAP地址&#xff0c;但可以支持IPv4地址12. …

基于ASP.NET+SQL Server实现(Web)排球赛事网站

排球赛事网的设计与实现摘要随着近几年来计算机技术、网络技术及相应软件技术的迅猛发展&#xff0c;人们的生活已越来越离不开计算机了&#xff0c;而且总是要花费很多时间在它上面。一直以来&#xff0c;排球作为一项大众喜爱的运动&#xff0c;得到广泛传播。随着各项排球赛…

【PTA数据结构 | C语言版】根据后序和中序遍历输出前序遍历

本专栏持续输出数据结构题目集&#xff0c;欢迎订阅。 文章目录题目代码题目 本题要求根据给定的一棵二叉树的后序遍历和中序遍历结果&#xff0c;输出该树的前序遍历结果。 输入格式: 第一行给出正整数 n (≤30)&#xff0c;是树中结点的个数。随后两行&#xff0c;每行给出…

Java HashMap高频面试题深度解析

在 Java 面试中&#xff0c;HashMap 是必问的核心知识点&#xff0c;以下是高频问题和深度解析框架&#xff0c;助你系统性掌握&#xff1a;一、基础概念HashMap 的本质是什么&#xff1f; 基于哈希表的 Map 接口实现&#xff0c;存储键值对&#xff08;Key-Value&#xff09;非…

GitHub Pages无法访问以点号.开头的目录

目录 前言 Jekyll 是什么 启用访问 总结 前言 一些前端项目经常会使用GitHub Pages进行部署展示&#xff0c;但是GitHub Pages 使用的是 Jekyll 引擎&#xff0c;对 Jekyll 引擎不熟悉的小伙伴就会出现如文章标题所言的情况。 Jekyll 是什么 Jekyll 是 GitHub Pages 默认…

JS JSON.stringify介绍(JS序列化、JSON字符串 )(遍历输入值的所有可枚举属性,将其转换为文本表示)缓存序列化、状态管理与时间旅行、replacer

文章目录JSON.stringify 全解析1. 基本概念2. 序列化原理1. 对于原始类型&#xff0c;直接转换为对应的字符串表示2. 对于对象和数组&#xff0c;递归处理其每个属性或元素3. 应用特殊规则处理日期、函数、Symbol 等特殊类型4. 检测并防止循环引用5. 应用 replacer 函数或数组进…

SQLite / LiteDB 单文件数据库为何“清空表后仍占几 GB”?——原理解析与空间回收实战

关键词&#xff1a; SQLite、LiteDB、VACUUM、WAL、auto_vacuum、文件瘦身、数据库维护在嵌入式或桌面、IoT 网关等场景&#xff0c;很多同学都会选择单文件数据库&#xff08;SQLite、LiteDB、SQL CE…&#xff09;。 最近群里一位朋友反馈&#xff1a;“我的 test.db 已经把业…

如何加固Web服务器的安全?

Web服务器是用户和公司联系的桥梁&#xff0c;Web服务器为用户交付网页内容和提供Web应用。正因为Web服务器是面向互联网的&#xff0c;所以成为了网络的攻击经常利用的一个入口。Web 服务器是企业数字化转型的 “前沿阵地”&#xff0c;其安全性不仅关乎技术层面的稳定运行&am…

MyBatis:配置文件完成增删改查_添加

1 实现添加操作 编写接口方法:Mapper接口编写sql语句&#xff1a;sql映射文件<insert id"add">insert into tb_brand(brand_name,company_name,ordered,description,status)values(#{brandName},#{companyName},#{ordered},#{description},#{status});</ins…

SGLang 推理框架核心组件解析:请求、内存与缓存的协同工作

SGLang 推理框架核心组件解析&#xff1a;请求、内存与缓存的协同工作 在当今大语言模型&#xff08;LLM&#xff09;服务的浪潮中&#xff0c;高效的推理框架是决定服务质量与成本的关键。SGLang 作为一个高性能的 LLM 推理和部署库&#xff0c;其内部精巧的设计确保了高吞吐量…

React学习笔记——Day2打卡

1、React表单控制 1.1 受控绑定 概念&#xff1a;使用React组件的状态&#xff08;useState&#xff09;控制表单的状态 完整示例&#xff1a; function App(){/* 1. 准备一个React状态值 */ const [value, setValue] useState()return (/* 2. 通过value属性绑定状态&#x…

用例测试方法5,6:状态迁移图和因果图

状态迁移图通过描绘系统的状态及引起状态转换的事件&#xff0c;来表示系统的行为例如&#xff1a;订机票l向航空公司打电话预定机票—>此时机票信息处于“完成”状态顾客支付了机票费用后—>机票信息就变为“已支付”状态旅行当天到达机场后&#xff0c;拿到机票后—>…

linux 脚本解释

if [ $? -ne 0 ]; thenecho "错误: 无法关闭现有 Tomcat 实例&#xff0c;终止启动流程!" >&2exit 1fi$? 是shell中的特殊变量&#xff0c;表示上一个命令的退出状态码-ne 0 表示"不等于0"(在Unix/Linux中&#xff0c;0通常表示成功&#xff0c;非…

Glary Utilities(系统优化工具) v6.20.0.24 专业便携版

GlaryUtilities 允许你清理系统垃圾文件&#xff0c;无效的注册表&#xff0c;上网记录&#xff0c;删除插件&#xff0c;查找重复文件&#xff0c;优化内存&#xff0c;修理或删除快捷方式&#xff0c;管理windows启动程序&#xff0c;卸载软件&#xff0c;安全删除文件&#…

VScode链接服务器一直卡在下载vscode服务器/scp上传服务器,无法连接成功

终极方案&#xff08;强力推荐&#xff0c;亲测有效&#xff0c;链接只需5秒钟&#xff09;&#xff1a;本地下载复制到mkdir -p ~/.vscode-server/bin/<commit_hash>里面 <commit_hash>可以从帮助->关于里面找到&#xff0c;如下所示 版本: 1.96.2 提交: fa…

基于Spring Boot的农村农产品销售系统设计与实现

随着现代农业的快速发展,传统农产品的销售模式逐渐暴露出信息闭塞、流通效率低和中间环节多等问题。为了打破这些瓶颈,我基于Spring Boot框架开发了一套农产品销售系统,旨在构建一座连接农民与消费者之间的数字桥梁,让优质农产品更高效地直达用户餐桌。 一、项目背景与目标…

Mysql默认存储引擎InnoDB和底层数据结构

在黑马点评项目实战中&#xff1a;谈到了为什么不推荐使用mysql的字段自增作为订单id传递给客户端&#xff0c;让我想到了Mysql的​​存储引擎​​和​​底层数据结构​​究竟是什么&#xff1f;它是如何实现自增的&#xff1f;本文主要是深度解析 MySQL 默认存储引擎 InnoDB 与…

原点安全签约金网络数科,共建一体化数据安全防护体系

金网络正式携手原点安全&#xff0c;基于原点安全一体化数据安全平台&#xff08;uDSP&#xff09;&#xff0c;启动企业数据安全平台建设项目&#xff0c;围绕数据资产盘点、敏感数据识别与分类分级、数据访问权限管控、数据动态脱敏、数据安全审计与风险监测等关键能力建设&a…

mix-blend-mode的了解使用

mix-blend-mode 是 CSS 的一个属性&#xff0c;用于控制元素的内容&#xff08;如文本、图像、背景等&#xff09;如何与其 父元素 或 背景 进行混合。它类似于图形设计软件&#xff08;如 Photoshop&#xff09;中的图层混合模式&#xff0c;可以实现各种视觉效果&#xff1b;…

vue自定义指令bug

问题描述&#xff1a;页面加载时&#xff0c;报已下错误。同时&#xff0c;页面数据不显示环境介绍&#xff1a;已经添加了vue自定义指令permission&#xff0c;实现如下&#xff0c;用以控制元素显示权限app.directive(permission, (el, binding) > {if (!store.hasPermiss…