引言:为什么需要顺序消息?

场景:订单状态流转

创建订单 → 支付订单 → 发货 → 确认收货

如果消息乱序:
支付 → 创建 → 发货 ❌  业务错误

RocketMQ 提供顺序消息保证消息按照发送顺序消费。


一、顺序消息类型

1.1 全局有序 vs 局部有序

┌──────────────────────────────────────────────┐
│           全局有序(Global Order)            │
├──────────────────────────────────────────────┤
│ 特点:所有消息严格按照FIFO顺序                 │
│ 实现:单Queue + 单Consumer线程                │
│ 性能:低(吞吐量受限)                        │
│ 场景:极少使用                                │
│                                               │
│  Queue0: Msg1 → Msg2 → Msg3 → Msg4           │
│                                               │
└──────────────────────────────────────────────┘

┌──────────────────────────────────────────────┐
│          局部有序(Partition Order)          │
├──────────────────────────────────────────────┤
│ 特点:相同Key的消息有序                       │
│ 实现:相同Key路由到同一Queue                  │
│ 性能:高(并发消费)                          │
│ 场景:常用(订单、用户维度有序)              │
│                                               │
│  Queue0: Order1-Msg1 → Order1-Msg2           │
│  Queue1: Order2-Msg1 → Order2-Msg2           │
│  Queue2: Order3-Msg1 → Order3-Msg2           │
│                                               │
└──────────────────────────────────────────────┘

1.2 实现原理

Producer端保证:
1. 相同orderKey的消息发送到同一个Queue
2. 使用MessageQueueSelector选择Queue

Broker端保证:
1. 单个Queue内部FIFO存储
2. CommitLog顺序写

Consumer端保证:
1. 单个Queue只被一个Consumer实例消费
2. 单个Queue内的消息单线程顺序消费

二、代码实现

2.1 顺序发送

public class OrderedProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        // 模拟订单消息
        List<OrderStep> orderSteps = buildOrders();

        for (OrderStep step : orderSteps) {
            Message msg = new Message(
                "order_topic",
                step.getTags(),
                step.toString().getBytes()
            );

            // 顺序发送:相同orderId的消息发送到同一个Queue
            SendResult sendResult = producer.send(
                msg,
                new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(
                        List<MessageQueue> mqs,
                        Message msg,
                        Object arg
                    ) {
                        Long orderId = (Long) arg;
                        // Hash取模,确保相同orderId路由到同一Queue
                        int index = (int) (orderId % mqs.size());
                        return mqs.get(index);
                    }
                },
                step.getOrderId()  // orderKey
            );

            System.out.printf("发送:%s, Queue=%d%n",
                step, sendResult.getMessageQueue().getQueueId());
        }

        producer.shutdown();
    }

    // 构建订单步骤
    private static List<OrderStep> buildOrders() {
        List<OrderStep> steps = new ArrayList<>();

        // 订单1001
        steps.add(new OrderStep(1001L, "创建"));
        steps.add(new OrderStep(1001L, "支付"));
        steps.add(new OrderStep(1001L, "发货"));

        // 订单1002
        steps.add(new OrderStep(1002L, "创建"));
        steps.add(new OrderStep(1002L, "支付"));

        // 订单1003
        steps.add(new OrderStep(1003L, "创建"));

        return steps;
    }
}

class OrderStep {
    private Long orderId;
    private String status;

    public OrderStep(Long orderId, String status) {
        this.orderId = orderId;
        this.status = status;
    }

    public Long getOrderId() {
        return orderId;
    }

    public String getTags() {
        return status;
    }

    @Override
    public String toString() {
        return "Order=" + orderId + ", Status=" + status;
    }
}

2.2 顺序消费

public class OrderedConsumer {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("order_topic", "*");

        // 注册顺序消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {

            @Override
            public ConsumeOrderlyStatus consumeMessage(
                List<MessageExt> msgs,
                ConsumeOrderlyContext context
            ) {
                // 单线程顺序消费
                for (MessageExt msg : msgs) {
                    System.out.printf(
                        "消费:Thread=%s, Queue=%d, Msg=%s%n",
                        Thread.currentThread().getName(),
                        msg.getQueueId(),
                        new String(msg.getBody())
                    );
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("顺序消费启动成功");
    }
}

输出示例

发送:Order=1001, Status=创建, Queue=1
发送:Order=1001, Status=支付, Queue=1
发送:Order=1001, Status=发货, Queue=1
发送:Order=1002, Status=创建, Queue=2
发送:Order=1002, Status=支付, Queue=2

消费:Thread=ConsumeMessageThread_1, Queue=1, Msg=Order=1001, Status=创建
消费:Thread=ConsumeMessageThread_1, Queue=1, Msg=Order=1001, Status=支付
消费:Thread=ConsumeMessageThread_1, Queue=1, Msg=Order=1001, Status=发货
消费:Thread=ConsumeMessageThread_2, Queue=2, Msg=Order=1002, Status=创建
消费:Thread=ConsumeMessageThread_2, Queue=2, Msg=Order=1002, Status=支付

三、实现原理深度剖析

3.1 Queue级别锁机制

public class MessageListenerOrderly {

    // 消费时对Queue加锁
    public ConsumeOrderlyStatus consumeMessage(
        List<MessageExt> msgs,
        ConsumeOrderlyContext context
    ) {
        // 1. 获取Queue锁(本地锁 + Broker锁)
        MessageQueue mq = context.getMessageQueue();
        Object lock = this.processQueueTable.get(mq);

        synchronized (lock) {  // Queue级别锁
            // 2. 顺序消费消息
            for (MessageExt msg : msgs) {
                // 处理消息
            }

            // 3. 提交消费进度
            this.updateOffset(mq, msgs);
        }

        return ConsumeOrderlyStatus.SUCCESS;
    }
}

3.2 Broker端分布式锁

// Consumer启动时向Broker申请Queue锁
public class RebalanceImpl {

    public void lock(MessageQueue mq) {
        // 向Broker发送锁定请求
        Set<MessageQueue> lockedMqs =
            this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(
                brokerAddr,
                requestBody,
                1000
            );

        // 锁定成功,开始消费
        if (lockedMqs.contains(mq)) {
            processQueue.setLocked(true);
        }
    }
}

四、性能对比

对比项顺序消息普通消息
吞吐量低(单线程)高(多线程并发)
延迟略高
消费模式串行并行
适用场景强顺序要求高吞吐要求

五、最佳实践

5.1 选择合适的顺序级别

场景1:交易流水
- 需求:同一用户的交易有序
- 方案:局部有序(userId作为orderKey)

场景2:配置下发
- 需求:所有配置按时间顺序下发
- 方案:全局有序(单Queue)

场景3:日志采集
- 需求:无顺序要求
- 方案:普通消息(性能最高)

5.2 注意事项

❌ 避免:
- orderKey选择过于分散(导致Queue分布不均)
- orderKey选择过于集中(导致单Queue压力大)
- 消费逻辑耗时过长(阻塞后续消息)

✅ 推荐:
- orderKey选择均衡(如userId, orderId)
- 消费逻辑简单快速
- 异常处理完善(避免死循环)

六、总结

  1. 局部有序:相同Key消息有序,性能高,推荐使用
  2. 全局有序:所有消息有序,性能低,极少使用
  3. 实现原理:Queue级别锁 + 单线程消费
  4. 性能权衡:顺序性 vs 吞吐量

下一篇:通过订单状态流转案例,实战顺序消息!


本文关键词顺序消息 局部有序 全局有序 FIFO Queue锁

专题导航上一篇:事务消息实战 | 下一篇:顺序消息实战