RocketMQ进阶10:消费进度管理 - Offset存储与重置策略

引言:什么是Offset? Offset(偏移量)记录Consumer的消费进度: Queue中的消息位置:0, 1, 2, 3, 4, 5... Consumer已消费到:3 下次从4开始消费 Offset存储方式 集群模式(Broker存储) // 自动提交(默认) consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // Offset存储在Broker端,多个Consumer共享进度 广播模式(本地存储) consumer.setMessageModel(MessageModel.BROADCASTING); // Offset存储在本地文件,每个Consumer独立进度 Offset提交机制 // 1. 自动提交(默认) // 消费成功后自动提交 // 2. 手动提交 consumer.setAutoCommit(false); // 业务处理完成后手动提交 context.commitOffset(); Offset重置 按时间重置 # 重置到1小时前 sh mqadmin resetOffsetByTime \ -g consumerGroup \ -t topic \ -s -3600000 按位置重置 # 重置到最早 sh mqadmin resetOffsetByTime \ -g consumerGroup \ -t topic \ -s -1 # 重置到最新 sh mqadmin resetOffsetByTime \ -g consumerGroup \ -t topic \ -s 0 最佳实践 1. 集群模式用于分布式消费 2. 广播模式用于全量通知 3. 生产环境定期备份Offset 4. 重置Offset前务必评估影响 本文关键词:Offset 消费进度 进度管理 重置策略 ...

2025-11-14 · maneng

RocketMQ进阶09:消息堆积处理 - 生产故障的应急方案

引言:消息堆积的危害 堆积影响: 消费延迟增加 内存占用上升 磁盘空间不足 系统性能下降 原因分析 1. Consumer消费速度慢 - 业务逻辑耗时 - 数据库慢查询 - 外部接口超时 2. Consumer数量不足 - 单个Consumer处理能力有限 3. Consumer宕机 - 无Consumer消费 解决方案 方案1:增加Consumer # 快速扩容Consumer实例 docker run -d rocketmq-consumer 方案2:提升消费性能 // 增加消费线程 consumer.setConsumeThreadMin(50); consumer.setConsumeThreadMax(100); // 批量消费 consumer.setConsumeMessageBatchMaxSize(16); // 优化业务逻辑 // - 异步处理 // - 批量操作数据库 // - 使用缓存 方案3:临时限流 // 限制Producer发送速率 Thread.sleep(10); // 每条消息延迟10ms 方案4:跳过堆积消息 // 紧急情况:重置offset跳过堆积 consumer.resetOffsetByTimestamp(topic, timestamp); 监控告警 监控指标: - 消费延迟(Consumer Lag) - 堆积消息数量 - 消费TPS 告警阈值: - 延迟 > 5分钟 - 堆积 > 10万条 本文关键词:消息堆积 故障处理 性能调优 应急方案

2025-11-14 · maneng

RocketMQ进阶08:流量控制机制 - 保护系统的最后防线

引言:为什么需要流量控制? 防止系统过载: Consumer处理速度:100条/秒 消息生产速度:1000条/秒 结果:消息堆积 → 内存溢出 → 系统崩溃 Producer端限流 // 设置发送超时 producer.setSendMsgTimeout(3000); // 异步发送队列大小限制 producer.setMaxMessageSize(4 * 1024 * 1024); // 4MB Consumer端限流 // 1. 限制拉取数量 consumer.setPullBatchSize(32); // 每次最多拉32条 // 2. 限制并发消费线程 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(20); // 3. 限制消费速率 consumer.setPullInterval(100); // 拉取间隔100ms Broker端流控 触发条件: 1. 内存使用超过85% 2. 消息堆积超过阈值 3. PageCache繁忙 流控动作: - 拒绝Producer发送 - 降低Consumer拉取频率 本文关键词:流量控制 限流 背压 系统保护

2025-11-14 · maneng

RocketMQ进阶07:消息重试与死信队列 - 失败处理的最佳实践

引言:消费失败怎么办? Consumer消费失败时,RocketMQ自动重试: 消费失败 → 延迟重试 → 多次重试 → 死信队列 重试机制 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) { try { // 处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 返回RECONSUME_LATER触发重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 重试间隔:10s, 30s, 1m, 2m, 3m...最多16次 死信队列 16次重试失败 → 进入死信队列(%DLQ%消费组名) 死信队列处理 // 监听死信队列 @RocketMQMessageListener( topic = "%DLQ%my_consumer_group", consumerGroup = "dlq_handler_group" ) public class DLQHandler implements RocketMQListener<String> { @Override public void onMessage(String msg) { // 人工处理或记录日志 log.error("死信消息:{}", msg); } } 本文关键词:消息重试 死信队列 DLQ 失败处理

2025-11-14 · maneng

RocketMQ进阶06:批量消息优化 - 提升吞吐量的利器

引言:批量的力量 单条发送 vs 批量发送: 单条:1万TPS 批量:10万TPS(提升10倍) 批量发送 List<Message> messages = new ArrayList<>(); for (int i = 0; i < 100; i++) { Message msg = new Message("topic", ("Msg" + i).getBytes()); messages.add(msg); } // 批量发送(一次网络调用) SendResult result = producer.send(messages); 批量大小限制 单批最大:4MB 建议:每批100-1000条消息 性能对比 方式 TPS 延迟 单条发送 1万 5ms 批量发送 10万 10ms 本文关键词:批量消息 性能优化 高吞吐量

2025-11-14 · maneng

RocketMQ进阶05:延迟消息机制 - 定时任务的优雅实现

引言:延迟消息的应用场景 典型场景: 订单超时自动取消(30分钟未支付) 定时推送消息(生日祝福) 延迟重试(失败后延迟重试) 延迟级别 RocketMQ支持18个延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h ##实现原理 // 发送延迟消息 Message msg = new Message("topic", "body".getBytes()); msg.setDelayTimeLevel(3); // 延迟10秒 producer.send(msg); // 原理: // 1. 消息先发送到SCHEDULE_TOPIC_XXXX // 2. 定时任务扫描到期消息 // 3. 投递到目标Topic 实战案例 // 订单超时取消 public void createOrder(Order order) { // 1. 创建订单 orderService.save(order); // 2. 发送延迟消息(30分钟) Message msg = new Message("order_timeout_topic", order.getId().toString().getBytes()); msg.setDelayTimeLevel(16); // 30分钟 producer.send(msg); } // Consumer处理超时订单 @Override public void onMessage(String orderId) { Order order = orderService.getById(orderId); if ("CREATED".equals(order.getStatus())) { // 仍未支付,取消订单 orderService.cancel(orderId); } } 本文关键词:延迟消息 定时任务 订单超时 延迟队列

2025-11-14 · maneng

RocketMQ进阶04:顺序消息实战 - 订单状态流转的有序保证

引言:订单状态机 订单状态必须按照固定流程流转: 待支付 → 已支付 → 已发货 → 已完成 使用顺序消息保证状态流转的正确性。 实现代码 // Producer:发送订单状态变更消息 public class OrderStatusProducer { public void changeStatus(Long orderId, String newStatus) { Message msg = new Message("order_status_topic", newStatus.getBytes()); // 使用orderId作为orderKey,保证同一订单消息有序 producer.send(msg, (mqs, msg1, arg) -> { int index = (int) (orderId % mqs.size()); return mqs.get(index); }, orderId); } } // Consumer:顺序处理状态变更 @RocketMQMessageListener( topic = "order_status_topic", consumerGroup = "order_status_group", consumeMode = ConsumeMode.ORDERLY // 顺序消费 ) public class OrderStatusConsumer implements RocketMQListener<String> { @Override public void onMessage(String newStatus) { // 验证状态流转合法性 if (isValidTransition(currentStatus, newStatus)) { updateOrderStatus(orderId, newStatus); } else { log.error("非法状态流转"); } } } 核心要点 orderKey选择:使用orderId保证同一订单有序 状态验证:Consumer端验证状态流转合法性 异常处理:非法状态流转记录日志并告警 本文关键词:顺序消息实战 订单状态 状态机 有序流转

2025-11-14 · maneng

RocketMQ进阶03:顺序消息原理 - 局部有序与全局有序的设计权衡

引言:为什么需要顺序消息? 场景:订单状态流转 创建订单 → 支付订单 → 发货 → 确认收货 如果消息乱序: 支付 → 创建 → 发货 ❌ 业务错误 RocketMQ 提供顺序消息保证消息按照发送顺序消费。 一、顺序消息类型 1.1 全局有序 vs 局部有序 ┌──────────────────────────────────────────────┐ │ 全局有序(Global Order) │ ├──────────────────────────────────────────────┤ │ 特点:所有消息严格按照FIFO顺序 │ │ 实现:单Queue + 单Consumer线程 │ │ 性能:低(吞吐量受限) │ │ 场景:极少使用 │ │ │ │ Queue0: Msg1 → Msg2 → Msg3 → Msg4 │ │ │ └──────────────────────────────────────────────┘ ┌──────────────────────────────────────────────┐ │ 局部有序(Partition Order) │ ├──────────────────────────────────────────────┤ │ 特点:相同Key的消息有序 │ │ 实现:相同Key路由到同一Queue │ │ 性能:高(并发消费) │ │ 场景:常用(订单、用户维度有序) │ │ │ │ Queue0: Order1-Msg1 → Order1-Msg2 │ │ Queue1: Order2-Msg1 → Order2-Msg2 │ │ Queue2: Order3-Msg1 → Order3-Msg2 │ │ │ └──────────────────────────────────────────────┘ 1.2 实现原理 Producer端保证: 1. 相同orderKey的消息发送到同一个Queue 2. 使用MessageQueueSelector选择Queue Broker端保证: 1. 单个Queue内部FIFO存储 2. CommitLog顺序写 Consumer端保证: 1. 单个Queue只被一个Consumer实例消费 2. 单个Queue内的消息单线程顺序消费 二、代码实现 2.1 顺序发送 public class OrderedProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 模拟订单消息 List<OrderStep> orderSteps = buildOrders(); for (OrderStep step : orderSteps) { Message msg = new Message( "order_topic", step.getTags(), step.toString().getBytes() ); // 顺序发送:相同orderId的消息发送到同一个Queue SendResult sendResult = producer.send( msg, new MessageQueueSelector() { @Override public MessageQueue select( List<MessageQueue> mqs, Message msg, Object arg ) { Long orderId = (Long) arg; // Hash取模,确保相同orderId路由到同一Queue int index = (int) (orderId % mqs.size()); return mqs.get(index); } }, step.getOrderId() // orderKey ); System.out.printf("发送:%s, Queue=%d%n", step, sendResult.getMessageQueue().getQueueId()); } producer.shutdown(); } // 构建订单步骤 private static List<OrderStep> buildOrders() { List<OrderStep> steps = new ArrayList<>(); // 订单1001 steps.add(new OrderStep(1001L, "创建")); steps.add(new OrderStep(1001L, "支付")); steps.add(new OrderStep(1001L, "发货")); // 订单1002 steps.add(new OrderStep(1002L, "创建")); steps.add(new OrderStep(1002L, "支付")); // 订单1003 steps.add(new OrderStep(1003L, "创建")); return steps; } } class OrderStep { private Long orderId; private String status; public OrderStep(Long orderId, String status) { this.orderId = orderId; this.status = status; } public Long getOrderId() { return orderId; } public String getTags() { return status; } @Override public String toString() { return "Order=" + orderId + ", Status=" + status; } } 2.2 顺序消费 public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("order_topic", "*"); // 注册顺序消息监听器 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage( List<MessageExt> msgs, ConsumeOrderlyContext context ) { // 单线程顺序消费 for (MessageExt msg : msgs) { System.out.printf( "消费:Thread=%s, Queue=%d, Msg=%s%n", Thread.currentThread().getName(), msg.getQueueId(), new String(msg.getBody()) ); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("顺序消费启动成功"); } } 输出示例: ...

2025-11-14 · maneng

RocketMQ进阶02:事务消息实战 - 订单支付场景完整实现

引言:真实业务场景 业务需求:用户支付成功后,需要完成以下操作: 支付服务:记录支付流水 订单服务:更新订单状态 积分服务:增加用户积分 通知服务:发送支付成功短信 要求:保证数据一致性,支付成功后其他操作必须成功。 一、系统架构设计 1.1 整体架构 ┌──────────────────────────────────────────────────────┐ │ 订单支付系统架构 │ ├──────────────────────────────────────────────────────┤ │ │ │ 用户端 │ │ │ │ │ │ 1. 发起支付请求 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 支付服务(Producer) │ │ │ │ - 调用第三方支付 │ │ │ │ - 记录支付流水(本地事务) │ │ │ │ - 发送事务消息 │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ 事务消息 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ RocketMQ Broker │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ 订单服务 │ │ 积分服务 │ │ 通知服务 │ │ │ │ Consumer │ │ Consumer │ │ Consumer │ │ │ │ │ │ │ │ │ │ │ │ 更新订单 │ │ 增加积分 │ │ 发送短信 │ │ │ │ 状态 │ │ │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────────┘ 1.2 消息流转 支付成功 → 发送Half消息 → 记录支付流水(本地事务) ↓ 事务提交(Commit) ↓ 消息变为可消费状态 ↓ ┌─────────────┼─────────────┐ ▼ ▼ ▼ 订单服务 积分服务 通知服务 更新状态 增加积分 发送短信 二、代码实现 2.1 数据库设计 -- 支付流水表 CREATE TABLE payment_record ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, pay_time DATETIME NOT NULL, status VARCHAR(20) NOT NULL, -- SUCCESS, FAILED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_order_id (order_id) ); -- 订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, status VARCHAR(20) NOT NULL, -- CREATED, PAID, CANCELLED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); -- 积分表 CREATE TABLE user_points ( user_id BIGINT PRIMARY KEY, points INT NOT NULL DEFAULT 0, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); 2.2 支付服务实现 @Service public class PaymentService { @Autowired private TransactionMQProducer transactionProducer; @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 处理支付请求 */ public PaymentResult processPay(PaymentRequest request) { String orderId = request.getOrderId(); BigDecimal amount = request.getAmount(); // 1. 调用第三方支付接口 ThirdPartyPayResult payResult = callThirdPartyPay(orderId, amount); if (!payResult.isSuccess()) { return PaymentResult.failed("支付失败:" + payResult.getMessage()); } // 2. 构建事务消息 PaymentEvent event = new PaymentEvent(); event.setOrderId(orderId); event.setUserId(request.getUserId()); event.setAmount(amount); event.setPayTime(new Date()); Message message = new Message( "payment_success_topic", "pay", orderId, // Key JSON.toJSONString(event).getBytes(StandardCharsets.UTF_8) ); // 3. 发送事务消息 try { TransactionSendResult sendResult = transactionProducer.sendMessageInTransaction( message, event // 传递给executeLocalTransaction ); if (sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) { return PaymentResult.success("支付成功"); } else { return PaymentResult.failed("支付记录失败"); } } catch (Exception e) { log.error("发送事务消息失败", e); return PaymentResult.failed("系统异常"); } } /** * 调用第三方支付(模拟) */ private ThirdPartyPayResult callThirdPartyPay(String orderId, BigDecimal amount) { // 实际调用支付宝、微信等支付接口 log.info("调用第三方支付:订单={}, 金额={}", orderId, amount); return ThirdPartyPayResult.success(); } } /** * 事务消息监听器 */ @Component public class PaymentTransactionListener implements TransactionListener { @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 执行本地事务:记录支付流水 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { PaymentEvent event = (PaymentEvent) arg; String orderId = event.getOrderId(); try { // 1. 插入支付记录 PaymentRecord record = new PaymentRecord(); record.setOrderId(orderId); record.setUserId(event.getUserId()); record.setAmount(event.getAmount()); record.setPayTime(event.getPayTime()); record.setStatus("SUCCESS"); int rows = paymentRecordMapper.insert(record); if (rows > 0) { log.info("支付记录成功:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.error("支付记录失败:{}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (DuplicateKeyException e) { // 重复记录,说明已经成功 log.warn("支付记录重复:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("支付记录异常:{}", orderId, e); // 返回UNKNOW,等待回查 return LocalTransactionState.UNKNOW; } } /** * 回查事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = msg.getKeys(); try { // 查询支付记录是否存在 PaymentRecord record = paymentRecordMapper.selectByOrderId(orderId); if (record != null && "SUCCESS".equals(record.getStatus())) { log.info("回查成功:支付记录存在,订单={}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.warn("回查失败:支付记录不存在,订单={}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { log.error("回查异常:{}", orderId, e); return LocalTransactionState.UNKNOW; } } } 2.3 订单服务实现 @Service public class OrderConsumerService { @Autowired private OrderMapper orderMapper; /** * 消费支付成功消息 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "order_consumer_group" ) public class OrderMessageListener implements RocketMQListener<String> { @Override public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); log.info("收到支付成功消息:{}", orderId); try { // 1. 幂等性检查 Order order = orderMapper.selectByOrderId(orderId); if (order == null) { log.error("订单不存在:{}", orderId); return; } if ("PAID".equals(order.getStatus())) { log.warn("订单已支付,跳过处理:{}", orderId); return; // 幂等性保证 } // 2. 更新订单状态 order.setStatus("PAID"); order.setUpdatedAt(new Date()); int rows = orderMapper.updateById(order); if (rows > 0) { log.info("订单状态更新成功:{}", orderId); } else { log.error("订单状态更新失败:{}", orderId); throw new RuntimeException("更新失败"); } } catch (Exception e) { log.error("处理支付消息异常:{}", orderId, e); throw new RuntimeException(e); // 触发重试 } } } } 2.4 积分服务实现 @Service public class PointsConsumerService { @Autowired private UserPointsMapper userPointsMapper; @Autowired private PointsRecordMapper pointsRecordMapper; /** * 消费支付成功消息,增加积分 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "points_consumer_group" ) public class PointsMessageListener implements RocketMQListener<String> { @Override @Transactional public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); Long userId = event.getUserId(); log.info("收到支付成功消息,增加积分:用户={}, 订单={}", userId, orderId); try { // 1. 幂等性检查:是否已经增加过积分 PointsRecord existing = pointsRecordMapper.selectByOrderId(orderId); if (existing != null) { log.warn("积分已增加,跳过处理:{}", orderId); return; } // 2. 计算积分(1元=1积分) int points = event.getAmount().intValue(); // 3. 增加积分 int rows = userPointsMapper.increasePoints(userId, points); if (rows == 0) { // 用户不存在,创建记录 UserPoints userPoints = new UserPoints(); userPoints.setUserId(userId); userPoints.setPoints(points); userPointsMapper.insert(userPoints); } // 4. 记录积分变更 PointsRecord record = new PointsRecord(); record.setUserId(userId); record.setOrderId(orderId); record.setPoints(points); record.setReason("支付获得"); pointsRecordMapper.insert(record); log.info("积分增加成功:用户={}, 积分={}", userId, points); } catch (DuplicateKeyException e) { // 重复处理,直接返回 log.warn("积分记录重复:{}", orderId); } catch (Exception e) { log.error("积分增加异常:{}", orderId, e); throw new RuntimeException(e); } } } } 三、测试验证 3.1 正常流程测试 @SpringBootTest public class PaymentTransactionTest { @Autowired private PaymentService paymentService; @Test public void testNormalPayment() throws InterruptedException { // 1. 发起支付 PaymentRequest request = new PaymentRequest(); request.setOrderId("ORDER_001"); request.setUserId(10001L); request.setAmount(new BigDecimal("99.00")); PaymentResult result = paymentService.processPay(request); // 2. 验证支付结果 assertTrue(result.isSuccess()); // 3. 等待消息消费(实际生产环境通过监控确认) Thread.sleep(5000); // 4. 验证订单状态 Order order = orderMapper.selectByOrderId("ORDER_001"); assertEquals("PAID", order.getStatus()); // 5. 验证积分 UserPoints points = userPointsMapper.selectByUserId(10001L); assertTrue(points.getPoints() >= 99); } } 3.2 异常场景测试 @Test public void testPaymentFailure() { // 场景1:支付失败 // 预期:不记录支付流水,不发送消息 // 场景2:记录支付流水失败 // 预期:回滚消息,不通知下游服务 // 场景3:Producer宕机 // 预期:回查机制保证消息最终发送 // 场景4:Consumer消费失败 // 预期:自动重试,直到成功 } 四、最佳实践总结 4.1 关键要点 1. 幂等性设计 - 使用唯一键约束 - 消费前检查是否已处理 - 数据库层面保证 2. 本地事务设计 - 尽量简单 - 避免远程调用 - 快速返回 3. 回查逻辑设计 - 通过数据库查询事务状态 - 不依赖缓存 - 保证幂等 4. 异常处理 - 明确的Commit/Rollback - 未知异常返回UNKNOW - 记录详细日志 4.2 注意事项 ❌ 避免: - 本地事务中调用RPC - 本地事务执行时间过长 - 回查逻辑依赖不可靠数据源 ✅ 推荐: - 本地事务只操作数据库 - 控制本地事务在1秒内 - 回查直接查询数据库 五、总结 本文通过订单支付场景,完整展示了事务消息的实战应用: ...

2025-11-14 · maneng

RocketMQ进阶01:事务消息原理与实现 - 分布式事务的终极方案

引言:分布式事务的困境 在分布式系统中,跨服务的事务一致性是一个经典难题: 典型场景:电商下单扣库存 订单服务:创建订单 ✅ 库存服务:扣减库存 ❌(网络超时) 问题:订单已创建,但库存未扣减 → 数据不一致 RocketMQ 的事务消息提供了一种优雅的解决方案。 一、什么是事务消息? 1.1 核心概念 ┌────────────────────────────────────────────┐ │ 事务消息 vs 普通消息 │ ├────────────────────────────────────────────┤ │ │ │ 普通消息: │ │ 发送 → 立即可消费 │ │ │ │ 事务消息: │ │ 发送 → 暂不可消费(Half消息) │ │ → 执行本地事务 │ │ → 提交/回滚 │ │ → 可消费/删除 │ │ │ └────────────────────────────────────────────┘ 1.2 应用场景 场景1:订单创建 + 扣库存 - 订单服务:创建订单(本地事务) - 发送事务消息 → 库存服务扣库存 场景2:支付成功 + 积分增加 - 支付服务:记录支付(本地事务) - 发送事务消息 → 积分服务增加积分 场景3:注册用户 + 发送邮件 - 用户服务:创建用户(本地事务) - 发送事务消息 → 邮件服务发送欢迎邮件 二、事务消息原理 2.1 两阶段提交流程 ┌────────────────────────────────────────────────────────┐ │ 事务消息完整流程 │ ├────────────────────────────────────────────────────────┤ │ │ │ Producer Broker Consumer │ │ │ │ │ │ │ │ 1. 发送Half消息 │ │ │ │ ├───────────────────▶│ │ │ │ │ │ 存储Half消息 │ │ │ │ │ (不可消费) │ │ │ │ 2. 返回成功 │ │ │ │ ◀────────────────────┤ │ │ │ │ │ │ │ │ │ 3. 执行本地事务 │ │ │ │ │ (创建订单) │ │ │ │ │ │ │ │ │ │ 4. Commit/Rollback │ │ │ │ ├───────────────────▶│ │ │ │ │ │ │ │ │ │ │ 如果Commit: │ │ │ │ │ - 转为正常消息 │ │ │ │ │ - 可被消费 │ │ │ │ ├──────────────────▶│ │ │ │ │ │ 5. 消费 │ │ │ │ │ (扣库存) │ │ │ │ │ │ │ │ │ 如果Rollback: │ │ │ │ │ - 删除Half消息 │ │ │ │ │ - 不可消费 │ │ │ │ │ │ │ └────────────────────────────────────────────────────────┘ 2.2 事务回查机制 问题:Producer 执行完本地事务后,来不及发送 Commit 就宕机了怎么办? 解决:Broker 定期回查事务状态 ┌────────────────────────────────────────────┐ │ 事务回查流程 │ ├────────────────────────────────────────────┤ │ │ │ Broker Producer │ │ │ │ │ │ │ 1. 检测到超时的Half消息 │ │ │ │ │ │ │ │ 2. 发起事务状态回查 │ │ │ ├────────────────────────▶│ │ │ │ │ │ │ │ │ 3. 查询本地 │ │ │ │ 事务状态 │ │ │ │ │ │ │ 4. 返回事务状态 │ │ │ │ (COMMIT/ROLLBACK/ │ │ │ │ UNKNOWN) │ │ │ ◀────────────────────────┤ │ │ │ │ │ │ │ 5. 根据状态处理Half消息 │ │ │ │ │ │ └────────────────────────────────────────────┘ 三、代码实现 3.1 Producer 端实现 public class TransactionProducerExample { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group"); producer.setNamesrvAddr("localhost:9876"); // 2. 设置事务监听器 producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * @param msg 消息 * @param arg 用户参数 * @return 本地事务执行结果 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderId = new String(msg.getBody()); System.out.println("执行本地事务,创建订单:" + orderId); try { // 模拟创建订单 boolean success = createOrder(orderId); if (success) { // 本地事务成功 → Commit System.out.println("订单创建成功,提交事务消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { // 本地事务失败 → Rollback System.out.println("订单创建失败,回滚事务消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 异常 → 未知状态,等待回查 System.out.println("订单创建异常,等待回查:" + e.getMessage()); return LocalTransactionState.UNKNOW; } } /** * 事务状态回查 * @param msg 消息 * @return 事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = new String(msg.getBody()); System.out.println("回查事务状态,订单ID:" + orderId); // 查询订单是否创建成功 boolean exists = checkOrderExists(orderId); if (exists) { // 订单存在 → Commit System.out.println("订单已存在,提交事务消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { // 订单不存在 → Rollback System.out.println("订单不存在,回滚事务消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 3. 启动生产者 producer.start(); // 4. 发送事务消息 String orderId = "ORDER_" + System.currentTimeMillis(); Message msg = new Message( "order_topic", "create", orderId.getBytes() ); TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("发送结果:" + sendResult.getSendStatus()); System.out.println("本地事务状态:" + sendResult.getLocalTransactionState()); // 等待回查 Thread.sleep(60000); producer.shutdown(); } // 模拟创建订单 private static boolean createOrder(String orderId) { // 实际业务逻辑:插入订单表 System.out.println("INSERT INTO orders VALUES ('" + orderId + "', ...)"); return true; // 假设成功 } // 检查订单是否存在 private static boolean checkOrderExists(String orderId) { // 实际业务逻辑:查询订单表 System.out.println("SELECT * FROM orders WHERE id = '" + orderId + "'"); return true; // 假设存在 } } 3.2 Consumer 端实现 public class TransactionConsumerExample { public static void main(String[] args) throws Exception { // 1. 创建消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("inventory_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); // 2. 订阅Topic consumer.subscribe("order_topic", "*"); // 3. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context ) { for (MessageExt msg : msgs) { String orderId = new String(msg.getBody()); System.out.println("收到订单创建消息:" + orderId); try { // 扣减库存 boolean success = deductInventory(orderId); if (success) { System.out.println("库存扣减成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { System.out.println("库存扣减失败,重试"); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } catch (Exception e) { System.out.println("库存扣减异常,重试:" + e.getMessage()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 4. 启动消费者 consumer.start(); System.out.println("库存服务启动成功"); } // 扣减库存 private static boolean deductInventory(String orderId) { // 实际业务逻辑:扣减库存 System.out.println("UPDATE inventory SET stock = stock - 1 WHERE order_id = '" + orderId + "'"); return true; } } 四、存储机制 4.1 Half消息存储 // Half消息存储在特殊Topic中 public static final String SYSTEM_TRANSHALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; // 消息发送到Broker时 public PutMessageResult putMessage(MessageExtBrokerInner msgInner) { // 如果是事务消息 if (MessageSysFlag.TRANSACTION_PREPARED_TYPE == tranType) { // 修改Topic为Half Topic msgInner.setTopic(SYSTEM_TRANSHALF_TOPIC); msgInner.setQueueId(0); } // 存储到CommitLog PutMessageResult result = this.commitLog.putMessage(msgInner); return result; } 4.2 Op消息存储 // Op消息用于标记Half消息的状态 public static final String SYSTEM_TRANSOP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; // Commit/Rollback时写入Op消息 public void putOpMessage(MessageExtBrokerInner msgInner, String opType) { msgInner.setTopic(SYSTEM_TRANSOP_TOPIC); msgInner.setBody(opType.getBytes()); // 存储Op消息 this.commitLog.putMessage(msgInner); } // Op消息的作用: // 1. 记录Half消息已被Commit或Rollback // 2. 避免重复回查 4.3 事务状态存储 Half消息 + Op消息 组合判断事务状态: ┌──────────────────────────────────────────┐ │ 事务状态判断逻辑 │ ├──────────────────────────────────────────┤ │ Half消息存在 + Op消息不存在 │ │ → 事务未完成,需要回查 │ │ │ │ Half消息存在 + Op消息=COMMIT │ │ → 事务已提交,消息可消费 │ │ │ │ Half消息存在 + Op消息=ROLLBACK │ │ → 事务已回滚,消息已删除 │ └──────────────────────────────────────────┘ 五、回查机制实现 5.1 回查触发条件 public class TransactionalMessageCheckService { // 回查配置 private static final long TRANSACTION_TIMEOUT = 6000; // 6秒超时 private static final int MAX_CHECK_TIMES = 15; // 最多回查15次 // 定时扫描Half消息 public void check() { try { // 1. 从Half Topic读取消息 GetResult getResult = getHalfMsg(queueId, offset, nums); for (MessageExt msgExt : getResult.getMsg()) { // 2. 检查是否有对应的Op消息 if (isOpMessageExist(msgExt)) { continue; // 已处理,跳过 } // 3. 检查超时时间 long currentTime = System.currentTimeMillis(); long tranTime = msgExt.getStoreTimestamp(); if (currentTime - tranTime < TRANSACTION_TIMEOUT) { continue; // 未超时,跳过 } // 4. 检查回查次数 int checkTimes = msgExt.getReconsumeTimes(); if (checkTimes >= MAX_CHECK_TIMES) { // 超过最大回查次数,强制回滚 this.resolveDiscardMsg(msgExt); continue; } // 5. 发起回查 this.sendCheckMessage(msgExt); } } catch (Exception e) { log.error("Check transaction error", e); } } // 发送回查请求 private void sendCheckMessage(MessageExt msgExt) { // 构建回查请求 CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader(); requestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); requestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); // 发送到Producer this.brokerController.getBroker2Client().checkProducerTransactionState( msgExt.getBornHostString(), requestHeader, msgExt ); } } 5.2 回查响应处理 // Producer端处理回查请求 public class ClientRemotingProcessor implements NettyRequestProcessor { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { switch (request.getCode()) { case RequestCode.CHECK_TRANSACTION_STATE: return this.checkTransactionState(ctx, request); default: break; } return null; } // 回查事务状态 private RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) { // 1. 解析请求 CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader( CheckTransactionStateRequestHeader.class); // 2. 获取消息 MessageExt message = MessageDecoder.decode(request.getBody()); // 3. 调用用户的TransactionListener TransactionListener transactionListener = this.getTransactionListener(); if (transactionListener != null) { LocalTransactionState localTransactionState = transactionListener.checkLocalTransaction(message); // 4. 返回事务状态 EndTransactionRequestHeader responseHeader = new EndTransactionRequestHeader(); responseHeader.setTransactionId(requestHeader.getTransactionId()); responseHeader.setCommitLogOffset(requestHeader.getCommitLogOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_NOT_TYPE); break; } // 5. 发送响应 this.brokerController.getBroker2Client().endTransactionOneway( message.getBrokerAddr(), responseHeader, "Check transaction state from broker" ); } return null; } } 六、最佳实践 6.1 幂等性保证 // 问题:消息可能被重复消费,如何保证幂等? // 方案1:唯一键约束 public boolean deductInventory(String orderId) { try { // 使用orderId作为唯一键 jdbcTemplate.update( "INSERT INTO inventory_log (order_id, status) VALUES (?, 'DEDUCTED')", orderId ); // 扣减库存 jdbcTemplate.update( "UPDATE inventory SET stock = stock - 1 WHERE product_id = ?", productId ); return true; } catch (DuplicateKeyException e) { // 重复消费,直接返回成功 return true; } } // 方案2:分布式锁 public boolean deductInventory(String orderId) { String lockKey = "inventory:lock:" + orderId; if (redisLock.tryLock(lockKey, 10, TimeUnit.SECONDS)) { try { // 检查是否已处理 if (isProcessed(orderId)) { return true; } // 扣减库存 updateInventory(orderId); // 标记已处理 markAsProcessed(orderId); return true; } finally { redisLock.unlock(lockKey); } } return false; } 6.2 异常处理 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地事务 executeBusinessLogic(msg); return LocalTransactionState.COMMIT_MESSAGE; } catch (BusinessException e) { // 业务异常 → 明确回滚 log.error("Business exception, rollback transaction", e); return LocalTransactionState.ROLLBACK_MESSAGE; } catch (Exception e) { // 未知异常 → 返回UNKNOW,等待回查 log.error("Unknown exception, wait for check", e); return LocalTransactionState.UNKNOW; } } 6.3 性能优化 # Producer配置 # 事务回查最大次数 transactionCheckMax=15 # 回查间隔时间(毫秒) transactionCheckInterval=6000 # 事务消息过期时间(小时) transactionTimeOut=6 # 异步发送(提高吞吐量) sendMsgTimeout=3000 retryTimesWhenSendFailed=2 七、总结与思考 7.1 核心要点 两阶段提交:Half消息 + 本地事务 + Commit/Rollback 回查机制:保证事务最终一致性 存储设计:Half Topic + Op Topic 组合判断状态 幂等性:Consumer端需要保证消息幂等处理 7.2 思考题 为什么需要Op消息? ...

2025-11-14 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...