大家好,我是G探险者!
📌 场景引入
在实际项目中,我们常常面临以下挑战:
- 监听 MQ 消息失败了,希望自动重试?
- 消费 MQ 消息后,要写数据库,但中间报错了?
- 消息处理必须要么成功要么失败,否则可能导致脏数据?
- 消息是幂等的吗?可以重复投递处理吗?
这些都需要 事务性会话 + 容器回滚机制 + 幂等控制 组合拳来解决。
✅ 一、什么是 JMS 的事务性会话?
事务性会话(transacted = true
)是一种 将消息的接收与处理放入事务中控制 的机制。
与确认模式(acknowledge)对比:
特性 | 确认模式(ACK) | 事务性会话(Transacted) |
---|---|---|
消息确认 | AUTO_ACKNOWLEDGE 、CLIENT_ACKNOWLEDGE 等 | 使用 session.commit() |
回滚方式 | 手动控制 ACK | 抛异常或手动 session.rollback() |
MQ是否重发消息 | 否(默认不重发) | ✅ 是,失败自动重新投递 |
一次事务包含消息数 | 一条(Spring容器下) | ✅ 默认一条,支持手动多条 |
🛠️ 二、Spring 如何开启事务性监听?
Spring 中的 DefaultMessageListenerContainer
支持事务模式:
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDestinationName("MY.QUEUE");
container.setMessageListener(new MyListener());container.setSessionTransacted(true); // ✅ 开启事务会话
container.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED); // 推荐container.afterPropertiesSet();
container.start();
这样配置后,每条消息的处理会包裹在如下事务中:
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)
🧪 三、事务处理机制详解
Spring 容器负责自动控制事务行为:
try {messageListener.onMessage(message);session.commit(); // ✅ 成功后提交
} catch (Throwable ex) {session.rollback(); // ❌ 抛异常后回滚,MQ 重发消息throw ex;
}
✅ 你只要记住:
- 成功就正常返回(容器帮你 commit)
- 失败就抛出异常(容器自动 rollback)
🔂 四、消息重试机制联动
Spring rollback → MQ 检测未 commit → 触发重投
🔁 每个 MQ 中间件(IBM MQ、ActiveMQ、TongLinkQ 等)都支持配置:
- 最大重投次数
- 重投间隔(redelivery delay)
- 超过重试后投递到死信队列(DLQ)
💥 五、事务作用范围:是“一条消息”吗?
这个问题很关键,我们以 Spring 默认配置为例说明:
场景 | 事务作用范围 |
---|---|
DefaultMessageListenerContainer 默认行为 | ✅ 每条消息单独包裹事务 |
自定义 Session 拉多条消息后统一 commit | ❌ 多条消息为一个事务 |
设置并发消费者(线程池) | 每条消息独立事务(线程隔离) |
实战建议:
✅ 在监听容器中消费 MQ 消息,默认一条消息就是一个事务单元,安全可靠。
🎯 六、事务 + 幂等的设计建议
事务只能解决“要么成功、要么失败”的问题,不能避免重复处理。
所以业务系统通常要配合幂等性策略:
场景 | 幂等性设计建议 |
---|---|
写数据库 | 利用主键/唯一索引避免重复插入 |
写 Redis | 使用 SETNX 保证消息只处理一次 |
写业务日志 | 使用消息 ID 做去重处理 |
第三方调用 | 如果不能重复调用,要做幂等屏障 |
☑️ 七、监听失败常见问题排查
问题 | 排查建议 |
---|---|
没开启事务? | 是否调用了 setSessionTransacted(true) |
容器未启动? | 是否漏了 afterPropertiesSet() 调用 |
消息处理失败后 MQ 不重发? | 是否吞掉异常了?应抛出异常给容器 |
重投失败消息去哪了? | 查看 MQ 的 DLQ(死信队列)配置 |
📘 小结
功能点 | 建议配置 |
---|---|
自动控制 commit/rollback | 使用 DefaultMessageListenerContainer |
每条消息开启事务 | setSessionTransacted(true) |
抛异常触发回滚 | 在 onMessage() 中不要吞异常 |
幂等设计 | 配合事务做幂等逻辑 |
消息处理失败自动重试 | 借助 MQ 的重投策略 + Spring 回滚机制 |
📘 下一篇预告:
第 4 篇:《JMS 消息重试机制与死信队列配置指南》
我们将详细讲解 MQ 的 redelivery policy、最大重试次数配置、死信队列处理策略,以及如何在 Spring 中优雅应对消费失败。