007 订单支付超时自动取消订单(rabbitmq死信队列 mybatis)

文章目录

  • 死信队列
    • RabbitMQ 配置类 RabbitMQConfig.java
    • 生产者 OrderTimeoutProducer.java
    • 消费者 OrderTimeoutConsumer.java
    • 应用配置 application.yml
    • pom.xml 依赖
    • 实体类 Order.java(不变)
    • Mapper 接口 OrderMapper.java(不变)
    • 服务层 OrderService.java(不变)
    • 缓存配置 CacheConfig.java(不变)
    • 对账服务 ReconciliationTask.java(不变)
    • 控制器 OrderController.java(不变)

死信队列

在代码中,processedMsgCache的类型是Cache<String, Boolean>,键是消息ID,值是Boolean。当处理成功时,会执行processedMsgCache.put(msgId, true),将消息ID存入缓存,并设置有效期1小时,最大容量10000条。这样,在1小时内,如果同样的消息再次被处理,缓存中已经有记录,就会直接ACK并跳过处理。

不过,这里需要注意的是,消息ID是否唯一。在RabbitMQ中,消息ID默认可能不是唯一的,生产者发送消息时可以设置messageId。如果生产者没有明确设置,可能无法保证唯一性,这会导致幂等性检查失效。因此,需要确保生产者发送消息时设置了唯一的messageId,或者在消费者端使用其他唯一标识,比如消息内容中的orderId结合其他属性。

另外,缓存的过期时间设置为1小时,这可能与业务场景有关。如果消息的存活时间超过1小时,可能会有重复处理的风险。需要根据实际消息的存活时间来调整缓存的过期时间,确保覆盖消息可能被重新投递的时间窗口。

还有一个问题是,缓存是本地缓存,如果消费者有多个实例,每个实例的缓存是独立的。这可能导致不同的实例处理同一条消息,因为一个实例处理过,但另一个实例的缓存中没有记录。这种情况下,本地缓存的幂等性检查可能不够,需要考虑分布式缓存,比如Redis,来保证全局唯一性。但根据当前代码,在单实例或允许短暂重复的场景下使用本地缓存。

总结来说,幂等性检查的逻辑是通过缓存已处理消息的ID,在消息处理前检查是否已存在,存在则跳过处理,避免重复执行。这适用于消息队列保证至少一次投递,但业务需要确保幂等的场景。

                      +---------------------+|   RabbitMQ Message  ||  (携带唯一messageId)   |+----------+----------+|v
+----------------+       +-------+-------+       +-----------------+
|  消息到达消费者   | ----> | 检查缓存是否存在 | ----> | 存在:直接ACK丢弃消息 |
+----------------+       +-------+-------+       +-----------------+|| 不存在v+-------+-------+       +-----------------+| 执行业务逻辑处理  | ----> | 成功:存入缓存并ACK |+---------------+       +-----------------+

缓存过期时间(1小时)> 消息最大存活时间(30分钟+重试时间)
计算公式:缓存过期时间 = 消息TTL + 最大重试时间 * 重试次数 + 缓冲时间

缓存击穿空值缓存对不存在的key也进行缓存(需设置较短过期时间)
缓存穿透布隆过滤器在缓存前增加过滤层
消费者重启持久化存储配合数据库记录处理状态
网络分区最终一致性依赖对账服务修正状态
组件类型作用说明
processedMsgCacheCaffeine缓存存储已处理消息的唯一标识
messageId字符串消息唯一标识(需生产者保证唯一性)
deliveryTag长整型RabbitMQ消息投递标识
sequenceDiagramparticipant RabbitMQparticipant Consumerparticipant Cacheparticipant DBRabbitMQ->>Consumer: 投递消息(messageId=123)Consumer->>Cache: 查询messageId=123alt 存在缓存Cache-->>Consumer: 返回trueConsumer->>RabbitMQ: 发送ACKelse 无缓存Consumer->>DB: 执行取消操作alt 操作成功Consumer->>Cache: 写入messageId=123Consumer->>RabbitMQ: 发送ACKelse 操作失败Consumer->>RabbitMQ: 发送NACK(requeue=true)endend

RabbitMQ 配置类 RabbitMQConfig.java

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// 订单超时相关配置public static final String ORDER_DELAY_EXCHANGE = "order.delay.exchange";public static final String ORDER_DELAY_QUEUE = "order.delay.queue";public static final String ORDER_DELAY_ROUTING_KEY = "order.delay";// 死信队列配置public static final String ORDER_DEAD_LETTER_EXCHANGE = "order.dead.letter.exchange";public static final String ORDER_DEAD_LETTER_QUEUE = "order.dead.letter.queue";public static final String ORDER_DEAD_LETTER_ROUTING_KEY = "order.dead.letter";// 声明延迟队列(设置死信参数)@Beanpublic Queue orderDelayQueue() {return QueueBuilder.durable(ORDER_DELAY_QUEUE).withArgument("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE).withArgument("x-dead-letter-routing-key", ORDER_DEAD_LETTER_ROUTING_KEY).build();}// 声明延迟交换机@Beanpublic DirectExchange orderDelayExchange() {return new DirectExchange(ORDER_DELAY_EXCHANGE);}// 绑定延迟队列到交换机@Beanpublic Binding delayBinding() {return BindingBuilder.bind(orderDelayQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);}// 声明死信队列@Beanpublic Queue deadLetterQueue() {return new Queue(ORDER_DEAD_LETTER_QUEUE, true);}// 声明死信交换机@Beanpublic DirectExchange deadLetterExchange() {return new DirectExchange(ORDER_DEAD_LETTER_EXCHANGE);}// 绑定死信队列到交换机@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(ORDER_DEAD_LETTER_ROUTING_KEY);}// JSON 消息转换器@Beanpublic MessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}
}

生产者 OrderTimeoutProducer.java

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;@Component
public class OrderTimeoutProducer {private final RabbitTemplate rabbitTemplate;public OrderTimeoutProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}public void sendTimeoutMessage(String orderId) {// 设置消息过期时间为30分钟(单位:毫秒)MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setExpiration("1800000");return message;}};rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_DELAY_EXCHANGE,RabbitMQConfig.ORDER_DELAY_ROUTING_KEY,orderId,messagePostProcessor);}
}

消费者 OrderTimeoutConsumer.java

import com.github.benmanes.caffeine.cache.Cache;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;@Component
public class OrderTimeoutConsumer {private final OrderService orderService;private final Cache<String, Boolean> processedMsgCache;public OrderTimeoutConsumer(OrderService orderService, Cache<String, Boolean> processedMsgCache) {this.orderService = orderService;this.processedMsgCache = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.HOURS).maximumSize(10000).build();}@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)public void processMessage(Message message, Channel channel) throws IOException {String orderId = new String(message.getBody(), StandardCharsets.UTF_8);String messageId = message.getMessageProperties().getMessageId();long deliveryTag = message.getMessageProperties().getDeliveryTag();try {// 幂等性检查if (processedMsgCache.getIfPresent(messageId) != null) {channel.basicAck(deliveryTag, false);return;}boolean success = orderService.safeCancel(orderId);if (success) {processedMsgCache.put(messageId, true);System.out.println("订单超时取消成功: " + orderId);}channel.basicAck(deliveryTag, false);} catch (Exception e) {// 记录错误日志,重新放回队列channel.basicNack(deliveryTag, false, true);System.err.println("处理订单超时取消失败: " + orderId);e.printStackTrace();}}
}

应用配置 application.yml

spring:rabbitmq:host: ${RABBITMQ_HOST:localhost}port: 5672username: ${RABBITMQ_USER:guest}password: ${RABBITMQ_PASSWORD:guest}virtual-host: /connection-timeout: 5000template:retry:enabled: truemax-attempts: 3initial-interval: 1000mslistener:simple:acknowledge-mode: manual # 手动确认模式prefetch: 10 # 每次预取数量retry:enabled: truemax-attempts: 3initial-interval: 1000ms

pom.xml 依赖

<!-- 移除 RocketMQ 依赖 -->
<!-- 添加 RabbitMQ 依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

实体类 Order.java(不变)

public class Order {// 保持原有实现
}

Mapper 接口 OrderMapper.java(不变)

@Mapper
public interface OrderMapper {// 保持原有SQL操作
}

服务层 OrderService.java(不变)

@Service
public class OrderService {// 保持原有业务逻辑
}

缓存配置 CacheConfig.java(不变)

@Configuration
public class CacheConfig {// 保持原有缓存配置
}

对账服务 ReconciliationTask.java(不变)

@Component
public class ReconciliationTask {// 保持原有定时任务逻辑
}

控制器 OrderController.java(不变)

@RestController
@RequestMapping("/orders")
public class OrderController {// 保持原有API接口
}

关键差异对比

功能点RocketMQ 实现RabbitMQ 实现
延迟机制内置延迟级别TTL+死信队列
消息存储持久化到CommitLog内存+磁盘持久化
消费确认自动ACK手动ACK+重试机制
消息追踪原生支持消息轨迹需要额外实现
集群方案主从复制镜像队列

部署注意事项
队列初始化:确保首次启动时自动创建所需交换机和队列
消息持久化:所有队列声明时设置durable=true
监控配置:需要监控以下指标:
死信队列消息堆积量
消费者处理耗时
消息TTL过期情况
集群配置:建议至少3节点RabbitMQ集群
故障转移:配置镜像队列实现高可用

性能优化建议
批量处理:

@RabbitListener(queues = QUEUE, containerFactory = "batchFactory")
public void processMessages(List<Message> messages) {// 批量查询缓存Map<String, Boolean> existMap = cache.getAllPresent(messageIds);// 过滤已处理消息// 批量执行数据库操作
}

异步更新:

CompletableFuture.runAsync(() -> {processedMsgCache.put(messageId, true);
});

分级缓存:

Cache<String, Boolean> L1 = Caffeine.newBuilder().maximumSize(1000).build();
Cache<String, Boolean> L2 = RedisClient.getCache();

首先看CacheConfig.java,里面有一个orderStatusCache的Bean,使用Caffeine配置,最大大小10000,过期时间10分钟。然后在OrderService.java中,有一个注入的Cache<String, OrderStatus> orderCache。看起来这里应该用的是这个Bean,因为在OrderService里用了@RequiredArgsConstructor,所以orderCache这个字段会被构造器注入,而对应的Bean应该就是CacheConfig里的orderStatusCache。所以虽然用户可能没有显式地用@Autowired,但是因为用了Lombok的@RequiredArgsConstructor,如果orderCache是final的话,Spring会自动注入同类型的Bean。这时候需要检查OrderService中的orderCache的类型是否匹配。
看OrderService的定义:private final Cache<String, OrderStatus> orderCache; 而CacheConfig中的Bean也是这个类型,所以应该会被正确注入。所以这个Bean是有被使用的,可能没注意到OrderService里的注入,导致认为没有用到。
另外,在OrderTimeoutConsumer.java中,还有一个processedMsgCache,这个是用Caffeine直接new的,并没有使用Spring管理的Bean。所以这个缓存是消费者自己创建的,和CacheConfig无关。而orderStatusCache是被OrderService使用的,所以这个配置是必须的,否则OrderService中的orderCache会找不到对应的Bean,导致启动失败。
所以可能在于误解了哪里用到了这个Bean。可能在代码中没有显式地看到@Autowired或者@Resource,但实际上因为Lombok的@RequiredArgsConstructor和Spring的自动注入机制,这个Bean是被OrderService使用的。如果没有CacheConfig中的Bean,Spring启动时会报错,提示找不到符合条件的Cache类型的Bean来注入到OrderService中。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/72214.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/72214.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

计算机毕业设计SpringBoot+Vue.js智慧图书管理系统(源码+文档+PPT+讲解)

温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 温馨提示&#xff1a;文末有 CSDN 平台官方提供的学长联系方式的名片&#xff01; 作者简介&#xff1a;Java领…

《论数据分片技术及其应用》审题技巧 - 系统架构设计师

论数据分片技术及其应用写作框架 一、考点概述 本论题“论数据分片技术及其应用”主要考察的是软件工程中数据分片技术的理解、应用及其实际效果分析。考点涵盖以下几个方面&#xff1a; 首先&#xff0c;考生需对数据分片的基本概念有清晰的认识&#xff0c;理解数据分片是…

【每日学点HarmnoyOS Next知识】web加载pdf、Toggle禁用、Grid多次渲染问题、Web判断是否存在title、 List侧滑栏关闭

【每日学点HarmnoyOS Next知识】web加载pdf、Toggle禁用、Grid多次渲染问题、Web判断是否存在title、 List侧滑栏关闭 1、HarmonyOS Web组件加载本地pdf文件后&#xff0c;默认显示标题和下载按钮&#xff0c;可以隐藏或者有对应的操作这个title的API吗&#xff1f; 隐藏PDF操…

下载 MindSpore 配置 PyTorch环境

以下是下载 MindSpore 并配置 PyTorch 环境的详细步骤&#xff0c;适用于常见的 Linux/Windows 系统&#xff08;以 NVIDIA GPU 为例&#xff09;&#xff1a; 一、环境准备 1. 硬件与软件检查 GPU 支持&#xff1a;确保使用 NVIDIA 显卡&#xff0c;通过 nvidia-smi 查看驱动…

三、数据提取

利用 requests 可以获取网站页面数据&#xff0c;但是 requests 返回的数据中包含了一些冗余数据&#xff0c;我们需要在这些数据集中提取自己需要的信息。所以我们要学会在数据集中提取自己需要的数据。 需要掌握的知识点如下&#xff1a; json 数据提取 jsonpath 语法 静态…

Qt | 实战继承自QObject的IOThread子类实现TCP客户端(安全销毁)

点击上方"蓝字"关注我们 01、QThread >>> start() 启动线程,调用后会执行 run() 方法。 run() 线程的入口点,子类化 QThread 时需要重写此方法以定义线程的执行逻辑。 quit() 请求线程退出,线程会在事件循环结束后终止。 exit(int returnCode = 0) 退出…

int new_pos = (pos + delta + 9) % 9 化曲为直算法

公式 int new_pos (pos delta 9) % 9; 是一个常见的 循环数组索引计算 方法&#xff0c;用于处理圆圈排列中的位置计算。这个公式可以总结出一个普遍的规律&#xff0c;适用于任何循环数组或圆圈排列的场景。 普遍规律 假设有一个长度为 ( n ) 的循环数组&#xff08;或圆圈…

生成一个日期时间序列,从‘2024-12-03‘开始,每小时递增 oracle 转为达梦

-------------------------------生成一个日期时间序列&#xff0c;从2024-12-03开始&#xff0c;每小时递增---------------------------- ---原oracle : SELECT to_date(2024-12-03, yyyy-mm-dd) (ROWNUM - 1) / 24 data_time FROM dual CO…

前端学习——HTML

VSCode常用快捷键 代码格式化&#xff1a;ShiftAltF 向上或向下移动一行&#xff1a;AltUp或AltDown 快速复制一行代码&#xff1a;ShiftAltUp或者ShiftAltDown 快速替换&#xff1a;CtrlH HTML标签 文本标签 定义着重文字 定义粗体文字 定义斜体文字 加重语气 删除字 无特…

Hadoop之02:MR-图解

1、不是所有的MR都适合combine 1.1、map端统计出了不同班级的每个学生的年龄 如&#xff1a;(class1, 14)表示class1班的一个学生的年龄是14岁。 第一个map任务&#xff1a; class1 14 class1 15 class1 16 class2 10第二个map任务&#xff1a; class1 16 class2 10 class…

C++核心编程之STL

STL初识&#xff1a;从零开始的奇幻冒险 1 STL的诞生&#xff1a;一场代码复用的革命 很久很久以前&#xff0c;在编程的世界里&#xff0c;开发者们每天都在重复造轮子。无论是数据结构还是算法&#xff0c;每个人都得从头开始写&#xff0c;仿佛在无尽的沙漠中寻找绿洲。直到…

【Python】OpenCV算法使用案例全解

OpenCV算法使用案例全解 前言 OpenCV&#xff08;Open Source Computer Vision Library&#xff09;是一个开源的计算机视觉和机器学习软件库&#xff0c;它提供了大量的图像和视频处理功能。从简单的图像滤波到复杂的三维重建&#xff0c;OpenCV涵盖了计算机视觉领域的众多算…

Redis的持久化-RDBAOF

文章目录 一、 RDB1. 触发机制2. 流程说明3. RDB 文件的处理4. RDB 的优缺点 二、AOF1. 使用 AOF2. 命令写⼊3. 文件同步4. 重写机制5 启动时数据恢复 一、 RDB RDB 持久化是把当前进程数据生成快照保存到硬盘的过程&#xff0c;触发 RDB 持久化过程分为手动触发和自动触发。 …

Python Cookbook-2.29 带版本号的文件名

任务 如果你想在改写某文件之前对其做个备份&#xff0c;可以在老文件的名字后面根据惯例加上三个数字的版本号。 解决方案 我们需要编写一个函数来完成备份工作: def VersionFile(file_spec, vtypecopy):import os,shutilif os.path.isfile(file_spec):#检查vtype参数if v…

CCF-CSP认证 202104-1灰度直方图

题目描述 思路 首先输入矩阵长度、矩阵宽度和灰度范围&#xff0c;结果数组长度可固定&#xff0c;其中的元素要初始化为0。在输入灰度值的时候&#xff0c;结果数组中以该灰度值为索引的元素值1&#xff0c;即可统计每个灰度值的数量。 代码 C版&#xff1a; #include <…

水果识别系统 | BP神经网络水果识别系统,含GUI界面(Matlab)

使用说明 代码下载&#xff1a;BP神经网络水果识别系统&#xff0c;含GUI界面&#xff08;Matlab&#xff09; BP神经网络水果识别系统 一、引言 1.1、研究背景及意义 在当今科技迅速发展的背景下&#xff0c;人工智能技术尤其是在图像识别领域的应用日益广泛。水果识别作为…

如何在网页上显示3D CAD PMI

在现代制造业中&#xff0c;3D CAD模型已成为产品设计和制造的核心。为了更有效地传达设计意图和制造信息&#xff0c;产品和制造信息&#xff08;PMI&#xff09;被嵌入到3D模型中。然而&#xff0c;如何在网页上清晰、准确地显示这些3D CAD PMI&#xff0c;成为了一个重要的技…

Git基本命令索引

GIT基本命令索引 创建代码库修改和提交代码日志管理远程操作操作分支 创建代码库 操作指令初始化仓库git init克隆远程仓库git clone 修改和提交代码 操作指令查看文件状态git status文件暂存git add文件比较git diff文件提交git commit回滚版本git reset重命名或者移动工作…

基于Selenium的Python淘宝评论爬取教程

文章目录 前言1. 环境准备安装 Python&#xff1a;安装 Selenium&#xff1a;下载浏览器驱动&#xff1a; 2. 实现思路3. 代码实现4. 代码解释5. 注意事项 前言 以下是一个基于 Selenium 的 Python 淘宝评论爬取教程&#xff0c;需要注意的是&#xff0c;爬取网站数据应当遵守…

GenBI 可视化选谁:Python Matplotlib?HTML ?Tableau?

引言 生成式 BI(Generative BI,GenBI)通过自然语言交互和自动化内容生成,革新了数据分析和商业智能(BI)领域。用户可以通过自然语言提问,GenBI 系统自动生成相应的 SQL 查询、获取数据,并以可视化图表、表格、自然语言摘要等形式呈现分析结果。 可视化是 GenBI 的关键…