引言:为什么需要顺序消息? 场景:订单状态流转
创建订单 → 支付订单 → 发货 → 确认收货 如果消息乱序: 支付 → 创建 → 发货 ❌ 业务错误 RocketMQ 提供顺序消息保证消息按照发送顺序消费。
一、顺序消息类型 1.1 全局有序 vs 局部有序 ┌──────────────────────────────────────────────┐ │ 全局有序(Global Order) │ ├──────────────────────────────────────────────┤ │ 特点:所有消息严格按照FIFO顺序 │ │ 实现:单Queue + 单Consumer线程 │ │ 性能:低(吞吐量受限) │ │ 场景:极少使用 │ │ │ │ Queue0: Msg1 → Msg2 → Msg3 → Msg4 │ │ │ └──────────────────────────────────────────────┘ ┌──────────────────────────────────────────────┐ │ 局部有序(Partition Order) │ ├──────────────────────────────────────────────┤ │ 特点:相同Key的消息有序 │ │ 实现:相同Key路由到同一Queue │ │ 性能:高(并发消费) │ │ 场景:常用(订单、用户维度有序) │ │ │ │ Queue0: Order1-Msg1 → Order1-Msg2 │ │ Queue1: Order2-Msg1 → Order2-Msg2 │ │ Queue2: Order3-Msg1 → Order3-Msg2 │ │ │ └──────────────────────────────────────────────┘ 1.2 实现原理 Producer端保证: 1. 相同orderKey的消息发送到同一个Queue 2. 使用MessageQueueSelector选择Queue Broker端保证: 1. 单个Queue内部FIFO存储 2. CommitLog顺序写 Consumer端保证: 1. 单个Queue只被一个Consumer实例消费 2. 单个Queue内的消息单线程顺序消费 二、代码实现 2.1 顺序发送 public class OrderedProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 模拟订单消息 List<OrderStep> orderSteps = buildOrders(); for (OrderStep step : orderSteps) { Message msg = new Message( "order_topic", step.getTags(), step.toString().getBytes() ); // 顺序发送:相同orderId的消息发送到同一个Queue SendResult sendResult = producer.send( msg, new MessageQueueSelector() { @Override public MessageQueue select( List<MessageQueue> mqs, Message msg, Object arg ) { Long orderId = (Long) arg; // Hash取模,确保相同orderId路由到同一Queue int index = (int) (orderId % mqs.size()); return mqs.get(index); } }, step.getOrderId() // orderKey ); System.out.printf("发送:%s, Queue=%d%n", step, sendResult.getMessageQueue().getQueueId()); } producer.shutdown(); } // 构建订单步骤 private static List<OrderStep> buildOrders() { List<OrderStep> steps = new ArrayList<>(); // 订单1001 steps.add(new OrderStep(1001L, "创建")); steps.add(new OrderStep(1001L, "支付")); steps.add(new OrderStep(1001L, "发货")); // 订单1002 steps.add(new OrderStep(1002L, "创建")); steps.add(new OrderStep(1002L, "支付")); // 订单1003 steps.add(new OrderStep(1003L, "创建")); return steps; } } class OrderStep { private Long orderId; private String status; public OrderStep(Long orderId, String status) { this.orderId = orderId; this.status = status; } public Long getOrderId() { return orderId; } public String getTags() { return status; } @Override public String toString() { return "Order=" + orderId + ", Status=" + status; } } 2.2 顺序消费 public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("order_topic", "*"); // 注册顺序消息监听器 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage( List<MessageExt> msgs, ConsumeOrderlyContext context ) { // 单线程顺序消费 for (MessageExt msg : msgs) { System.out.printf( "消费:Thread=%s, Queue=%d, Msg=%s%n", Thread.currentThread().getName(), msg.getQueueId(), new String(msg.getBody()) ); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("顺序消费启动成功"); } } 输出示例:
...