RocketMQ进阶02:事务消息实战 - 订单支付场景完整实现

引言:真实业务场景 业务需求:用户支付成功后,需要完成以下操作: 支付服务:记录支付流水 订单服务:更新订单状态 积分服务:增加用户积分 通知服务:发送支付成功短信 要求:保证数据一致性,支付成功后其他操作必须成功。 一、系统架构设计 1.1 整体架构 ┌──────────────────────────────────────────────────────┐ │ 订单支付系统架构 │ ├──────────────────────────────────────────────────────┤ │ │ │ 用户端 │ │ │ │ │ │ 1. 发起支付请求 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 支付服务(Producer) │ │ │ │ - 调用第三方支付 │ │ │ │ - 记录支付流水(本地事务) │ │ │ │ - 发送事务消息 │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ 事务消息 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ RocketMQ Broker │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ 订单服务 │ │ 积分服务 │ │ 通知服务 │ │ │ │ Consumer │ │ Consumer │ │ Consumer │ │ │ │ │ │ │ │ │ │ │ │ 更新订单 │ │ 增加积分 │ │ 发送短信 │ │ │ │ 状态 │ │ │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────────┘ 1.2 消息流转 支付成功 → 发送Half消息 → 记录支付流水(本地事务) ↓ 事务提交(Commit) ↓ 消息变为可消费状态 ↓ ┌─────────────┼─────────────┐ ▼ ▼ ▼ 订单服务 积分服务 通知服务 更新状态 增加积分 发送短信 二、代码实现 2.1 数据库设计 -- 支付流水表 CREATE TABLE payment_record ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, pay_time DATETIME NOT NULL, status VARCHAR(20) NOT NULL, -- SUCCESS, FAILED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_order_id (order_id) ); -- 订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, status VARCHAR(20) NOT NULL, -- CREATED, PAID, CANCELLED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); -- 积分表 CREATE TABLE user_points ( user_id BIGINT PRIMARY KEY, points INT NOT NULL DEFAULT 0, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); 2.2 支付服务实现 @Service public class PaymentService { @Autowired private TransactionMQProducer transactionProducer; @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 处理支付请求 */ public PaymentResult processPay(PaymentRequest request) { String orderId = request.getOrderId(); BigDecimal amount = request.getAmount(); // 1. 调用第三方支付接口 ThirdPartyPayResult payResult = callThirdPartyPay(orderId, amount); if (!payResult.isSuccess()) { return PaymentResult.failed("支付失败:" + payResult.getMessage()); } // 2. 构建事务消息 PaymentEvent event = new PaymentEvent(); event.setOrderId(orderId); event.setUserId(request.getUserId()); event.setAmount(amount); event.setPayTime(new Date()); Message message = new Message( "payment_success_topic", "pay", orderId, // Key JSON.toJSONString(event).getBytes(StandardCharsets.UTF_8) ); // 3. 发送事务消息 try { TransactionSendResult sendResult = transactionProducer.sendMessageInTransaction( message, event // 传递给executeLocalTransaction ); if (sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) { return PaymentResult.success("支付成功"); } else { return PaymentResult.failed("支付记录失败"); } } catch (Exception e) { log.error("发送事务消息失败", e); return PaymentResult.failed("系统异常"); } } /** * 调用第三方支付(模拟) */ private ThirdPartyPayResult callThirdPartyPay(String orderId, BigDecimal amount) { // 实际调用支付宝、微信等支付接口 log.info("调用第三方支付:订单={}, 金额={}", orderId, amount); return ThirdPartyPayResult.success(); } } /** * 事务消息监听器 */ @Component public class PaymentTransactionListener implements TransactionListener { @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 执行本地事务:记录支付流水 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { PaymentEvent event = (PaymentEvent) arg; String orderId = event.getOrderId(); try { // 1. 插入支付记录 PaymentRecord record = new PaymentRecord(); record.setOrderId(orderId); record.setUserId(event.getUserId()); record.setAmount(event.getAmount()); record.setPayTime(event.getPayTime()); record.setStatus("SUCCESS"); int rows = paymentRecordMapper.insert(record); if (rows > 0) { log.info("支付记录成功:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.error("支付记录失败:{}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (DuplicateKeyException e) { // 重复记录,说明已经成功 log.warn("支付记录重复:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("支付记录异常:{}", orderId, e); // 返回UNKNOW,等待回查 return LocalTransactionState.UNKNOW; } } /** * 回查事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = msg.getKeys(); try { // 查询支付记录是否存在 PaymentRecord record = paymentRecordMapper.selectByOrderId(orderId); if (record != null && "SUCCESS".equals(record.getStatus())) { log.info("回查成功:支付记录存在,订单={}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.warn("回查失败:支付记录不存在,订单={}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { log.error("回查异常:{}", orderId, e); return LocalTransactionState.UNKNOW; } } } 2.3 订单服务实现 @Service public class OrderConsumerService { @Autowired private OrderMapper orderMapper; /** * 消费支付成功消息 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "order_consumer_group" ) public class OrderMessageListener implements RocketMQListener<String> { @Override public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); log.info("收到支付成功消息:{}", orderId); try { // 1. 幂等性检查 Order order = orderMapper.selectByOrderId(orderId); if (order == null) { log.error("订单不存在:{}", orderId); return; } if ("PAID".equals(order.getStatus())) { log.warn("订单已支付,跳过处理:{}", orderId); return; // 幂等性保证 } // 2. 更新订单状态 order.setStatus("PAID"); order.setUpdatedAt(new Date()); int rows = orderMapper.updateById(order); if (rows > 0) { log.info("订单状态更新成功:{}", orderId); } else { log.error("订单状态更新失败:{}", orderId); throw new RuntimeException("更新失败"); } } catch (Exception e) { log.error("处理支付消息异常:{}", orderId, e); throw new RuntimeException(e); // 触发重试 } } } } 2.4 积分服务实现 @Service public class PointsConsumerService { @Autowired private UserPointsMapper userPointsMapper; @Autowired private PointsRecordMapper pointsRecordMapper; /** * 消费支付成功消息,增加积分 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "points_consumer_group" ) public class PointsMessageListener implements RocketMQListener<String> { @Override @Transactional public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); Long userId = event.getUserId(); log.info("收到支付成功消息,增加积分:用户={}, 订单={}", userId, orderId); try { // 1. 幂等性检查:是否已经增加过积分 PointsRecord existing = pointsRecordMapper.selectByOrderId(orderId); if (existing != null) { log.warn("积分已增加,跳过处理:{}", orderId); return; } // 2. 计算积分(1元=1积分) int points = event.getAmount().intValue(); // 3. 增加积分 int rows = userPointsMapper.increasePoints(userId, points); if (rows == 0) { // 用户不存在,创建记录 UserPoints userPoints = new UserPoints(); userPoints.setUserId(userId); userPoints.setPoints(points); userPointsMapper.insert(userPoints); } // 4. 记录积分变更 PointsRecord record = new PointsRecord(); record.setUserId(userId); record.setOrderId(orderId); record.setPoints(points); record.setReason("支付获得"); pointsRecordMapper.insert(record); log.info("积分增加成功:用户={}, 积分={}", userId, points); } catch (DuplicateKeyException e) { // 重复处理,直接返回 log.warn("积分记录重复:{}", orderId); } catch (Exception e) { log.error("积分增加异常:{}", orderId, e); throw new RuntimeException(e); } } } } 三、测试验证 3.1 正常流程测试 @SpringBootTest public class PaymentTransactionTest { @Autowired private PaymentService paymentService; @Test public void testNormalPayment() throws InterruptedException { // 1. 发起支付 PaymentRequest request = new PaymentRequest(); request.setOrderId("ORDER_001"); request.setUserId(10001L); request.setAmount(new BigDecimal("99.00")); PaymentResult result = paymentService.processPay(request); // 2. 验证支付结果 assertTrue(result.isSuccess()); // 3. 等待消息消费(实际生产环境通过监控确认) Thread.sleep(5000); // 4. 验证订单状态 Order order = orderMapper.selectByOrderId("ORDER_001"); assertEquals("PAID", order.getStatus()); // 5. 验证积分 UserPoints points = userPointsMapper.selectByUserId(10001L); assertTrue(points.getPoints() >= 99); } } 3.2 异常场景测试 @Test public void testPaymentFailure() { // 场景1:支付失败 // 预期:不记录支付流水,不发送消息 // 场景2:记录支付流水失败 // 预期:回滚消息,不通知下游服务 // 场景3:Producer宕机 // 预期:回查机制保证消息最终发送 // 场景4:Consumer消费失败 // 预期:自动重试,直到成功 } 四、最佳实践总结 4.1 关键要点 1. 幂等性设计 - 使用唯一键约束 - 消费前检查是否已处理 - 数据库层面保证 2. 本地事务设计 - 尽量简单 - 避免远程调用 - 快速返回 3. 回查逻辑设计 - 通过数据库查询事务状态 - 不依赖缓存 - 保证幂等 4. 异常处理 - 明确的Commit/Rollback - 未知异常返回UNKNOW - 记录详细日志 4.2 注意事项 ❌ 避免: - 本地事务中调用RPC - 本地事务执行时间过长 - 回查逻辑依赖不可靠数据源 ✅ 推荐: - 本地事务只操作数据库 - 控制本地事务在1秒内 - 回查直接查询数据库 五、总结 本文通过订单支付场景,完整展示了事务消息的实战应用: ...

2025-11-14 · maneng

RocketMQ进阶01:事务消息原理与实现 - 分布式事务的终极方案

引言:分布式事务的困境 在分布式系统中,跨服务的事务一致性是一个经典难题: 典型场景:电商下单扣库存 订单服务:创建订单 ✅ 库存服务:扣减库存 ❌(网络超时) 问题:订单已创建,但库存未扣减 → 数据不一致 RocketMQ 的事务消息提供了一种优雅的解决方案。 一、什么是事务消息? 1.1 核心概念 ┌────────────────────────────────────────────┐ │ 事务消息 vs 普通消息 │ ├────────────────────────────────────────────┤ │ │ │ 普通消息: │ │ 发送 → 立即可消费 │ │ │ │ 事务消息: │ │ 发送 → 暂不可消费(Half消息) │ │ → 执行本地事务 │ │ → 提交/回滚 │ │ → 可消费/删除 │ │ │ └────────────────────────────────────────────┘ 1.2 应用场景 场景1:订单创建 + 扣库存 - 订单服务:创建订单(本地事务) - 发送事务消息 → 库存服务扣库存 场景2:支付成功 + 积分增加 - 支付服务:记录支付(本地事务) - 发送事务消息 → 积分服务增加积分 场景3:注册用户 + 发送邮件 - 用户服务:创建用户(本地事务) - 发送事务消息 → 邮件服务发送欢迎邮件 二、事务消息原理 2.1 两阶段提交流程 ┌────────────────────────────────────────────────────────┐ │ 事务消息完整流程 │ ├────────────────────────────────────────────────────────┤ │ │ │ Producer Broker Consumer │ │ │ │ │ │ │ │ 1. 发送Half消息 │ │ │ │ ├───────────────────▶│ │ │ │ │ │ 存储Half消息 │ │ │ │ │ (不可消费) │ │ │ │ 2. 返回成功 │ │ │ │ ◀────────────────────┤ │ │ │ │ │ │ │ │ │ 3. 执行本地事务 │ │ │ │ │ (创建订单) │ │ │ │ │ │ │ │ │ │ 4. Commit/Rollback │ │ │ │ ├───────────────────▶│ │ │ │ │ │ │ │ │ │ │ 如果Commit: │ │ │ │ │ - 转为正常消息 │ │ │ │ │ - 可被消费 │ │ │ │ ├──────────────────▶│ │ │ │ │ │ 5. 消费 │ │ │ │ │ (扣库存) │ │ │ │ │ │ │ │ │ 如果Rollback: │ │ │ │ │ - 删除Half消息 │ │ │ │ │ - 不可消费 │ │ │ │ │ │ │ └────────────────────────────────────────────────────────┘ 2.2 事务回查机制 问题:Producer 执行完本地事务后,来不及发送 Commit 就宕机了怎么办? 解决:Broker 定期回查事务状态 ┌────────────────────────────────────────────┐ │ 事务回查流程 │ ├────────────────────────────────────────────┤ │ │ │ Broker Producer │ │ │ │ │ │ │ 1. 检测到超时的Half消息 │ │ │ │ │ │ │ │ 2. 发起事务状态回查 │ │ │ ├────────────────────────▶│ │ │ │ │ │ │ │ │ 3. 查询本地 │ │ │ │ 事务状态 │ │ │ │ │ │ │ 4. 返回事务状态 │ │ │ │ (COMMIT/ROLLBACK/ │ │ │ │ UNKNOWN) │ │ │ ◀────────────────────────┤ │ │ │ │ │ │ │ 5. 根据状态处理Half消息 │ │ │ │ │ │ └────────────────────────────────────────────┘ 三、代码实现 3.1 Producer 端实现 public class TransactionProducerExample { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group"); producer.setNamesrvAddr("localhost:9876"); // 2. 设置事务监听器 producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * @param msg 消息 * @param arg 用户参数 * @return 本地事务执行结果 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderId = new String(msg.getBody()); System.out.println("执行本地事务,创建订单:" + orderId); try { // 模拟创建订单 boolean success = createOrder(orderId); if (success) { // 本地事务成功 → Commit System.out.println("订单创建成功,提交事务消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { // 本地事务失败 → Rollback System.out.println("订单创建失败,回滚事务消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 异常 → 未知状态,等待回查 System.out.println("订单创建异常,等待回查:" + e.getMessage()); return LocalTransactionState.UNKNOW; } } /** * 事务状态回查 * @param msg 消息 * @return 事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = new String(msg.getBody()); System.out.println("回查事务状态,订单ID:" + orderId); // 查询订单是否创建成功 boolean exists = checkOrderExists(orderId); if (exists) { // 订单存在 → Commit System.out.println("订单已存在,提交事务消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { // 订单不存在 → Rollback System.out.println("订单不存在,回滚事务消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 3. 启动生产者 producer.start(); // 4. 发送事务消息 String orderId = "ORDER_" + System.currentTimeMillis(); Message msg = new Message( "order_topic", "create", orderId.getBytes() ); TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("发送结果:" + sendResult.getSendStatus()); System.out.println("本地事务状态:" + sendResult.getLocalTransactionState()); // 等待回查 Thread.sleep(60000); producer.shutdown(); } // 模拟创建订单 private static boolean createOrder(String orderId) { // 实际业务逻辑:插入订单表 System.out.println("INSERT INTO orders VALUES ('" + orderId + "', ...)"); return true; // 假设成功 } // 检查订单是否存在 private static boolean checkOrderExists(String orderId) { // 实际业务逻辑:查询订单表 System.out.println("SELECT * FROM orders WHERE id = '" + orderId + "'"); return true; // 假设存在 } } 3.2 Consumer 端实现 public class TransactionConsumerExample { public static void main(String[] args) throws Exception { // 1. 创建消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("inventory_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); // 2. 订阅Topic consumer.subscribe("order_topic", "*"); // 3. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context ) { for (MessageExt msg : msgs) { String orderId = new String(msg.getBody()); System.out.println("收到订单创建消息:" + orderId); try { // 扣减库存 boolean success = deductInventory(orderId); if (success) { System.out.println("库存扣减成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { System.out.println("库存扣减失败,重试"); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } catch (Exception e) { System.out.println("库存扣减异常,重试:" + e.getMessage()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 4. 启动消费者 consumer.start(); System.out.println("库存服务启动成功"); } // 扣减库存 private static boolean deductInventory(String orderId) { // 实际业务逻辑:扣减库存 System.out.println("UPDATE inventory SET stock = stock - 1 WHERE order_id = '" + orderId + "'"); return true; } } 四、存储机制 4.1 Half消息存储 // Half消息存储在特殊Topic中 public static final String SYSTEM_TRANSHALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; // 消息发送到Broker时 public PutMessageResult putMessage(MessageExtBrokerInner msgInner) { // 如果是事务消息 if (MessageSysFlag.TRANSACTION_PREPARED_TYPE == tranType) { // 修改Topic为Half Topic msgInner.setTopic(SYSTEM_TRANSHALF_TOPIC); msgInner.setQueueId(0); } // 存储到CommitLog PutMessageResult result = this.commitLog.putMessage(msgInner); return result; } 4.2 Op消息存储 // Op消息用于标记Half消息的状态 public static final String SYSTEM_TRANSOP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; // Commit/Rollback时写入Op消息 public void putOpMessage(MessageExtBrokerInner msgInner, String opType) { msgInner.setTopic(SYSTEM_TRANSOP_TOPIC); msgInner.setBody(opType.getBytes()); // 存储Op消息 this.commitLog.putMessage(msgInner); } // Op消息的作用: // 1. 记录Half消息已被Commit或Rollback // 2. 避免重复回查 4.3 事务状态存储 Half消息 + Op消息 组合判断事务状态: ┌──────────────────────────────────────────┐ │ 事务状态判断逻辑 │ ├──────────────────────────────────────────┤ │ Half消息存在 + Op消息不存在 │ │ → 事务未完成,需要回查 │ │ │ │ Half消息存在 + Op消息=COMMIT │ │ → 事务已提交,消息可消费 │ │ │ │ Half消息存在 + Op消息=ROLLBACK │ │ → 事务已回滚,消息已删除 │ └──────────────────────────────────────────┘ 五、回查机制实现 5.1 回查触发条件 public class TransactionalMessageCheckService { // 回查配置 private static final long TRANSACTION_TIMEOUT = 6000; // 6秒超时 private static final int MAX_CHECK_TIMES = 15; // 最多回查15次 // 定时扫描Half消息 public void check() { try { // 1. 从Half Topic读取消息 GetResult getResult = getHalfMsg(queueId, offset, nums); for (MessageExt msgExt : getResult.getMsg()) { // 2. 检查是否有对应的Op消息 if (isOpMessageExist(msgExt)) { continue; // 已处理,跳过 } // 3. 检查超时时间 long currentTime = System.currentTimeMillis(); long tranTime = msgExt.getStoreTimestamp(); if (currentTime - tranTime < TRANSACTION_TIMEOUT) { continue; // 未超时,跳过 } // 4. 检查回查次数 int checkTimes = msgExt.getReconsumeTimes(); if (checkTimes >= MAX_CHECK_TIMES) { // 超过最大回查次数,强制回滚 this.resolveDiscardMsg(msgExt); continue; } // 5. 发起回查 this.sendCheckMessage(msgExt); } } catch (Exception e) { log.error("Check transaction error", e); } } // 发送回查请求 private void sendCheckMessage(MessageExt msgExt) { // 构建回查请求 CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader(); requestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); requestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); // 发送到Producer this.brokerController.getBroker2Client().checkProducerTransactionState( msgExt.getBornHostString(), requestHeader, msgExt ); } } 5.2 回查响应处理 // Producer端处理回查请求 public class ClientRemotingProcessor implements NettyRequestProcessor { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { switch (request.getCode()) { case RequestCode.CHECK_TRANSACTION_STATE: return this.checkTransactionState(ctx, request); default: break; } return null; } // 回查事务状态 private RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) { // 1. 解析请求 CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader( CheckTransactionStateRequestHeader.class); // 2. 获取消息 MessageExt message = MessageDecoder.decode(request.getBody()); // 3. 调用用户的TransactionListener TransactionListener transactionListener = this.getTransactionListener(); if (transactionListener != null) { LocalTransactionState localTransactionState = transactionListener.checkLocalTransaction(message); // 4. 返回事务状态 EndTransactionRequestHeader responseHeader = new EndTransactionRequestHeader(); responseHeader.setTransactionId(requestHeader.getTransactionId()); responseHeader.setCommitLogOffset(requestHeader.getCommitLogOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_NOT_TYPE); break; } // 5. 发送响应 this.brokerController.getBroker2Client().endTransactionOneway( message.getBrokerAddr(), responseHeader, "Check transaction state from broker" ); } return null; } } 六、最佳实践 6.1 幂等性保证 // 问题:消息可能被重复消费,如何保证幂等? // 方案1:唯一键约束 public boolean deductInventory(String orderId) { try { // 使用orderId作为唯一键 jdbcTemplate.update( "INSERT INTO inventory_log (order_id, status) VALUES (?, 'DEDUCTED')", orderId ); // 扣减库存 jdbcTemplate.update( "UPDATE inventory SET stock = stock - 1 WHERE product_id = ?", productId ); return true; } catch (DuplicateKeyException e) { // 重复消费,直接返回成功 return true; } } // 方案2:分布式锁 public boolean deductInventory(String orderId) { String lockKey = "inventory:lock:" + orderId; if (redisLock.tryLock(lockKey, 10, TimeUnit.SECONDS)) { try { // 检查是否已处理 if (isProcessed(orderId)) { return true; } // 扣减库存 updateInventory(orderId); // 标记已处理 markAsProcessed(orderId); return true; } finally { redisLock.unlock(lockKey); } } return false; } 6.2 异常处理 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地事务 executeBusinessLogic(msg); return LocalTransactionState.COMMIT_MESSAGE; } catch (BusinessException e) { // 业务异常 → 明确回滚 log.error("Business exception, rollback transaction", e); return LocalTransactionState.ROLLBACK_MESSAGE; } catch (Exception e) { // 未知异常 → 返回UNKNOW,等待回查 log.error("Unknown exception, wait for check", e); return LocalTransactionState.UNKNOW; } } 6.3 性能优化 # Producer配置 # 事务回查最大次数 transactionCheckMax=15 # 回查间隔时间(毫秒) transactionCheckInterval=6000 # 事务消息过期时间(小时) transactionTimeOut=6 # 异步发送(提高吞吐量) sendMsgTimeout=3000 retryTimesWhenSendFailed=2 七、总结与思考 7.1 核心要点 两阶段提交:Half消息 + 本地事务 + Commit/Rollback 回查机制:保证事务最终一致性 存储设计:Half Topic + Op Topic 组合判断状态 幂等性:Consumer端需要保证消息幂等处理 7.2 思考题 为什么需要Op消息? ...

2025-11-14 · maneng

RocketMQ实战指南:阿里巴巴的分布式消息方案

为什么RocketMQ能支撑双11万亿级消息?如何使用事务消息实现分布式事务?如何保证消息的全局顺序? 本文深度剖析RocketMQ的核心特性和实战应用。 一、RocketMQ核心架构 1.1 核心组件 Producer → NameServer → Broker → Consumer ↓ ↓ 路由信息 消息存储 与Kafka的区别: 组件 RocketMQ Kafka 注册中心 NameServer(自研,轻量级) ZooKeeper / KRaft 存储结构 CommitLog + ConsumeQueue Partition日志文件 消息模型 点对点 + 发布订阅 发布订阅 特殊功能 事务消息、延迟消息、顺序消息 流式计算、消息回溯 1.2 存储架构 RocketMQ的三层存储: 1. CommitLog(所有消息) ├─ Message 1(Topic A) ├─ Message 2(Topic B) ├─ Message 3(Topic A) ... 2. ConsumeQueue(消息索引) Topic A ├─ Queue 0 │ ├─ Offset 0 → CommitLog位置100 │ ├─ Offset 1 → CommitLog位置300 │ ... └─ Queue 1 ... 3. IndexFile(消息检索) Key → CommitLog位置 优势: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...