RocketMQ进阶04:顺序消息实战 - 订单状态流转的有序保证

引言:订单状态机 订单状态必须按照固定流程流转: 待支付 → 已支付 → 已发货 → 已完成 使用顺序消息保证状态流转的正确性。 实现代码 // Producer:发送订单状态变更消息 public class OrderStatusProducer { public void changeStatus(Long orderId, String newStatus) { Message msg = new Message("order_status_topic", newStatus.getBytes()); // 使用orderId作为orderKey,保证同一订单消息有序 producer.send(msg, (mqs, msg1, arg) -> { int index = (int) (orderId % mqs.size()); return mqs.get(index); }, orderId); } } // Consumer:顺序处理状态变更 @RocketMQMessageListener( topic = "order_status_topic", consumerGroup = "order_status_group", consumeMode = ConsumeMode.ORDERLY // 顺序消费 ) public class OrderStatusConsumer implements RocketMQListener<String> { @Override public void onMessage(String newStatus) { // 验证状态流转合法性 if (isValidTransition(currentStatus, newStatus)) { updateOrderStatus(orderId, newStatus); } else { log.error("非法状态流转"); } } } 核心要点 orderKey选择:使用orderId保证同一订单有序 状态验证:Consumer端验证状态流转合法性 异常处理:非法状态流转记录日志并告警 本文关键词:顺序消息实战 订单状态 状态机 有序流转

2025-11-14 · maneng

RocketMQ进阶03:顺序消息原理 - 局部有序与全局有序的设计权衡

引言:为什么需要顺序消息? 场景:订单状态流转 创建订单 → 支付订单 → 发货 → 确认收货 如果消息乱序: 支付 → 创建 → 发货 ❌ 业务错误 RocketMQ 提供顺序消息保证消息按照发送顺序消费。 一、顺序消息类型 1.1 全局有序 vs 局部有序 ┌──────────────────────────────────────────────┐ │ 全局有序(Global Order) │ ├──────────────────────────────────────────────┤ │ 特点:所有消息严格按照FIFO顺序 │ │ 实现:单Queue + 单Consumer线程 │ │ 性能:低(吞吐量受限) │ │ 场景:极少使用 │ │ │ │ Queue0: Msg1 → Msg2 → Msg3 → Msg4 │ │ │ └──────────────────────────────────────────────┘ ┌──────────────────────────────────────────────┐ │ 局部有序(Partition Order) │ ├──────────────────────────────────────────────┤ │ 特点:相同Key的消息有序 │ │ 实现:相同Key路由到同一Queue │ │ 性能:高(并发消费) │ │ 场景:常用(订单、用户维度有序) │ │ │ │ Queue0: Order1-Msg1 → Order1-Msg2 │ │ Queue1: Order2-Msg1 → Order2-Msg2 │ │ Queue2: Order3-Msg1 → Order3-Msg2 │ │ │ └──────────────────────────────────────────────┘ 1.2 实现原理 Producer端保证: 1. 相同orderKey的消息发送到同一个Queue 2. 使用MessageQueueSelector选择Queue Broker端保证: 1. 单个Queue内部FIFO存储 2. CommitLog顺序写 Consumer端保证: 1. 单个Queue只被一个Consumer实例消费 2. 单个Queue内的消息单线程顺序消费 二、代码实现 2.1 顺序发送 public class OrderedProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 模拟订单消息 List<OrderStep> orderSteps = buildOrders(); for (OrderStep step : orderSteps) { Message msg = new Message( "order_topic", step.getTags(), step.toString().getBytes() ); // 顺序发送:相同orderId的消息发送到同一个Queue SendResult sendResult = producer.send( msg, new MessageQueueSelector() { @Override public MessageQueue select( List<MessageQueue> mqs, Message msg, Object arg ) { Long orderId = (Long) arg; // Hash取模,确保相同orderId路由到同一Queue int index = (int) (orderId % mqs.size()); return mqs.get(index); } }, step.getOrderId() // orderKey ); System.out.printf("发送:%s, Queue=%d%n", step, sendResult.getMessageQueue().getQueueId()); } producer.shutdown(); } // 构建订单步骤 private static List<OrderStep> buildOrders() { List<OrderStep> steps = new ArrayList<>(); // 订单1001 steps.add(new OrderStep(1001L, "创建")); steps.add(new OrderStep(1001L, "支付")); steps.add(new OrderStep(1001L, "发货")); // 订单1002 steps.add(new OrderStep(1002L, "创建")); steps.add(new OrderStep(1002L, "支付")); // 订单1003 steps.add(new OrderStep(1003L, "创建")); return steps; } } class OrderStep { private Long orderId; private String status; public OrderStep(Long orderId, String status) { this.orderId = orderId; this.status = status; } public Long getOrderId() { return orderId; } public String getTags() { return status; } @Override public String toString() { return "Order=" + orderId + ", Status=" + status; } } 2.2 顺序消费 public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("order_topic", "*"); // 注册顺序消息监听器 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage( List<MessageExt> msgs, ConsumeOrderlyContext context ) { // 单线程顺序消费 for (MessageExt msg : msgs) { System.out.printf( "消费:Thread=%s, Queue=%d, Msg=%s%n", Thread.currentThread().getName(), msg.getQueueId(), new String(msg.getBody()) ); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("顺序消费启动成功"); } } 输出示例: ...

2025-11-14 · maneng

RocketMQ实战指南:阿里巴巴的分布式消息方案

为什么RocketMQ能支撑双11万亿级消息?如何使用事务消息实现分布式事务?如何保证消息的全局顺序? 本文深度剖析RocketMQ的核心特性和实战应用。 一、RocketMQ核心架构 1.1 核心组件 Producer → NameServer → Broker → Consumer ↓ ↓ 路由信息 消息存储 与Kafka的区别: 组件 RocketMQ Kafka 注册中心 NameServer(自研,轻量级) ZooKeeper / KRaft 存储结构 CommitLog + ConsumeQueue Partition日志文件 消息模型 点对点 + 发布订阅 发布订阅 特殊功能 事务消息、延迟消息、顺序消息 流式计算、消息回溯 1.2 存储架构 RocketMQ的三层存储: 1. CommitLog(所有消息) ├─ Message 1(Topic A) ├─ Message 2(Topic B) ├─ Message 3(Topic A) ... 2. ConsumeQueue(消息索引) Topic A ├─ Queue 0 │ ├─ Offset 0 → CommitLog位置100 │ ├─ Offset 1 → CommitLog位置300 │ ... └─ Queue 1 ... 3. IndexFile(消息检索) Key → CommitLog位置 优势: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...