你可以先把事务消息理解成一句话事务消息是用来保证“本地数据库操作”和“发送 MQ 消息”尽量保持一致的。它解决的不是“消费者一定成功消费”的问题而是解决我数据库里的业务操作成功了MQ 消息也应该成功发出去。 我数据库里的业务操作失败了MQ 消息就不应该被消费者看到。1. 先看普通消息有什么问题假设你有一个下单接口1. 创建订单写入数据库 2. 发送 MQ 消息通知库存系统扣库存代码可能像这样publicvoidcreateOrder(){// 1. 创建订单写数据库orderService.saveOrder();// 2. 发送 MQ 消息rocketMQTemplate.convertAndSend(order_topic:create,订单创建成功);}看起来没问题但它有两个风险。情况一订单创建成功了但消息发送失败了订单写入数据库成功 ✅ 发送 MQ 消息失败 ❌结果就是数据库里有订单 但是库存系统不知道 所以库存没有扣这就出现了数据不一致。情况二消息发送成功了但订单创建失败了如果你先发消息再写数据库publicvoidcreateOrder(){// 1. 先发消息rocketMQTemplate.convertAndSend(order_topic:create,订单创建成功);// 2. 再创建订单orderService.saveOrder();}可能出现MQ 消息发送成功 ✅ 订单写入数据库失败 ❌结果就是库存系统收到消息开始扣库存 但是数据库里根本没有这个订单这也不对。2. 所以事务消息要解决什么事务消息就是要解决这个问题本地事务比如创建订单 MQ 消息发送这两个动作要配合起来。它想达到的效果是订单创建成功 - 消息才能被消费者消费 订单创建失败 - 消息不能被消费者消费3. RocketMQ 事务消息的核心思想RocketMQ 不是一上来就把消息给消费者。它会先发一条特殊消息叫半消息 Half Message你可以理解为消息先放到 RocketMQ 里但是暂时不让消费者看到。就像你寄快递时先把快递放到快递站但告诉快递站先别派送等我确认。4. 事务消息完整流程假设订单系统创建订单然后通知库存系统扣库存。流程是这样1. 订单系统先发送半消息到 RocketMQ 2. RocketMQ 保存半消息但消费者暂时看不到 3. 订单系统执行本地事务也就是创建订单 4. 如果订单创建成功告诉 RocketMQ提交消息 5. RocketMQ 把消息变成可消费状态 6. 库存系统才能消费消息开始扣库存如果订单创建失败1. 订单系统先发送半消息到 RocketMQ 2. RocketMQ 保存半消息但消费者暂时看不到 3. 订单系统执行本地事务创建订单失败 4. 告诉 RocketMQ回滚消息 5. RocketMQ 删除/丢弃这条消息 6. 库存系统永远看不到这条消息5. 用生活例子理解你把 RocketMQ 想成快递站。普通消息是你把快递交给快递站 快递站马上派送事务消息是你把快递交给快递站 但是贴了一个“暂不派送”的标签然后你去确认一件事钱有没有付成功 订单有没有创建成功 数据库有没有写成功如果确认成功你告诉快递站可以派送了如果确认失败你告诉快递站这个快递取消不要派送所以事务消息的关键不是“消费者消费成功”而是消费者看到消息之前先确认生产者自己的本地事务成功了。6. 为什么叫“事务消息”因为它和数据库事务有关。数据库事务是要么都成功 要么都失败比如转账A 扣 100 B 加 100不能只扣 A不加 B。而 RocketMQ 事务消息想解决的是数据库操作 MQ 消息投递这两个动作的一致性问题。比如订单入库成功 订单消息也应该发出去不能出现订单入库成功但是消息没发出去也不能出现订单没入库但是消息发出去了7. 事务消息和普通消息的区别普通消息消息一发出去消费者就可能收到。事务消息消息先进入 RocketMQ但消费者暂时看不到。 等本地事务成功后消息才会被消费者看到。对比一下普通消息 生产者 - Broker - 消费者 事务消息 生产者 - 半消息 - 执行本地事务 - 提交消息 - 消费者8. 最核心的三个状态RocketMQ 事务消息有三个结果COMMIT提交消息消费者可以消费 ROLLBACK回滚消息消费者看不到 UNKNOWN暂时不知道结果RocketMQ 后面会回查这三个很重要。1COMMIT本地事务成功。比如订单创建成功于是告诉 RocketMQ这条消息可以投递给消费者了。2ROLLBACK本地事务失败。比如订单创建失败于是告诉 RocketMQ这条消息不要了消费者不应该看到。3UNKNOWN生产者执行本地事务后可能因为网络问题、服务宕机等原因没有告诉 RocketMQ 到底成功还是失败。这时 RocketMQ 不知道该提交还是回滚。于是它会问生产者你刚才那条消息对应的本地事务到底成功了吗这个过程叫事务回查9. 什么是事务回查这是事务消息里最容易懵的地方。假设流程走到这里1. 半消息发送成功 2. 订单系统创建订单成功 3. 订单系统准备告诉 RocketMQ提交消息 4. 结果订单系统突然宕机了RocketMQ 此时很尴尬我这里有一条半消息 但是我不知道订单到底创建成功没有怎么办RocketMQ 会过一段时间主动问订单系统这条消息对应的订单到底有没有创建成功订单系统就去查数据库查一下 orderId 10001 的订单是否存在如果存在返回 COMMIT如果不存在返回 ROLLBACK这就是事务回查。10. 用订单例子完整串起来你发起下单用户点击下单RocketMQ 事务消息流程第一步订单系统发送半消息 消息内容订单 10001 创建成功 但是消费者暂时看不到 第二步订单系统执行本地事务 往订单表插入订单 10001 第三步判断本地事务结果 如果订单插入成功 提交消息 COMMIT 库存系统可以消费消息 如果订单插入失败 回滚消息 ROLLBACK 库存系统看不到消息 如果订单系统宕机不知道结果 RocketMQ 后面回查订单系统 订单系统查数据库 有订单 - COMMIT 没订单 - ROLLBACK11. 小白版流程图订单系统 | | 1. 发送半消息 v RocketMQ | | 2. 先保存但不投递 | 订单系统 | | 3. 创建订单到数据库 v MySQL | | 4. 判断订单是否创建成功 | |--- 成功 --- 告诉 RocketMQ 提交消息 --- 消费者可以消费 | |--- 失败 --- 告诉 RocketMQ 回滚消息 --- 消费者看不到 | |--- 未知 --- RocketMQ 后面回查订单系统12. 事务消息解决了什么没解决什么它解决的是生产者本地事务 和 MQ 消息发送 的一致性比如订单创建成功消息一定尽量发出去 订单创建失败消息不要被消费但它不解决消费者一定消费成功消费者消费失败怎么办还是靠消费失败重试 幂等 死信队列 人工补偿你之前问的“生产者消息发送出去了消费者已经完全消费消息了”这件事不是事务消息直接保证的。RocketMQ 里 Producer、Consumer、Topic、Group、Broker 的关系是Producer 把消息发到 BrokerConsumer 再根据 Topic 和 Group 去消费消息。13. 为什么不是直接用数据库事务包住 MQ 发送你可能会想TransactionalpublicvoidcreateOrder(){orderService.saveOrder();rocketMQTemplate.convertAndSend(order_topic:create,订单创建成功);}这样不就行了吗不完全行。因为数据库事务只能管数据库管不了 RocketMQ。比如数据库事务提交成功了 但是 MQ 发送失败了数据库没法自动回滚因为事务已经提交了。或者MQ 发送成功了 但是数据库事务回滚了消息已经发出去了消费者可能已经开始处理了。所以数据库事务不能天然保证 MQ 消息一致性RocketMQ 才引入事务消息。14. 代码大概长什么样你不用现在完全看懂代码先看注释。发送事务消息rocketMQTemplate.sendMessageInTransaction(order_topic:create,MessageBuilder.withPayload(orderMessage).build(),orderMessage);然后写一个事务监听器ComponentRocketMQTransactionListenerpublicclassOrderTransactionListenerimplementsRocketMQLocalTransactionListener{/** * 执行本地事务 * 这里通常写创建订单、写数据库 */OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemessage,Objectarg){try{// 1. 执行本地事务创建订单orderService.createOrder(arg);// 2. 本地事务成功提交消息returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){// 3. 本地事务失败回滚消息returnRocketMQLocalTransactionState.ROLLBACK;}}/** * 事务回查 * 当 RocketMQ 不知道本地事务结果时会调用这里 */OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemessage){// 1. 根据订单ID查询数据库booleanorderExistsorderService.exists(message);// 2. 如果订单存在说明本地事务成功if(orderExists){returnRocketMQLocalTransactionState.COMMIT;}// 3. 如果订单不存在说明本地事务失败returnRocketMQLocalTransactionState.ROLLBACK;}}你目前只要看懂这两个方法executeLocalTransaction执行本地事务 checkLocalTransaction事务回查15. 再用一句话总结事务消息就是先把消息发到 RocketMQ但先不让消费者看到 然后执行本地数据库事务 如果数据库事务成功就提交消息让消费者消费 如果数据库事务失败就回滚消息不让消费者消费 如果中间状态不确定RocketMQ 后面会回查生产者。最关键的是这句话事务消息保证的是“生产者本地事务成功后消息才对消费者可见”。不是保证消费者一定消费成功。