为什么RocketMQ能支撑双11万亿级消息?如何使用事务消息实现分布式事务?如何保证消息的全局顺序?

本文深度剖析RocketMQ的核心特性和实战应用。


一、RocketMQ核心架构

1.1 核心组件

Producer → NameServer → Broker → Consumer
               ↓           ↓
            路由信息    消息存储

与Kafka的区别

组件RocketMQKafka
注册中心NameServer(自研,轻量级)ZooKeeper / KRaft
存储结构CommitLog + ConsumeQueuePartition日志文件
消息模型点对点 + 发布订阅发布订阅
特殊功能事务消息、延迟消息、顺序消息流式计算、消息回溯

1.2 存储架构

RocketMQ的三层存储

1. CommitLog(所有消息)
   ├─ Message 1(Topic A)
   ├─ Message 2(Topic B)
   ├─ Message 3(Topic A)
   ...

2. ConsumeQueue(消息索引)
   Topic A
       ├─ Queue 0
       │   ├─ Offset 0 → CommitLog位置100
       │   ├─ Offset 1 → CommitLog位置300
       │   ...
       └─ Queue 1
           ...

3. IndexFile(消息检索)
   Key → CommitLog位置

优势

  • ✅ 顺序写CommitLog(高性能)
  • ✅ 多个Topic共享CommitLog(不浪费磁盘)
  • ✅ ConsumeQueue轻量级(只存索引,不存消息)

二、事务消息:分布式事务的终极解决方案

2.1 问题场景:订单与支付的一致性

/**
 * 传统方案的问题
 */
public class TraditionalTransaction {

    @Transactional
    public void createOrder(OrderRequest request) {
        // 1. 创建订单(本地事务)
        Order order = orderService.create(request);

        // 2. 发送消息到支付服务(远程调用)
        // 问题:如果发送消息失败怎么办?
        // - 订单已创建
        // - 支付消息未发送
        // - 数据不一致!
        rabbitTemplate.send("payment.queue", new PaymentEvent(order));
    }
}

两个难题

  1. 先提交事务,后发送消息:消息发送失败,订单已创建(数据不一致)
  2. 先发送消息,后提交事务:消息发送成功,事务回滚(数据不一致)

2.2 RocketMQ事务消息

设计思想:二阶段提交 + 事务回查

阶段1:发送半消息(Half Message)
    ↓ 消息不可见
执行本地事务
    ↓ 成功 or 失败
阶段2:提交/回滚消息
    ↓ 提交:消息可见
    ↓ 回滚:删除消息

实现示例

/**
 * RocketMQ事务消息示例
 */
public class TransactionMessageExample {

    public static void main(String[] args) throws Exception {
        // 1. 创建事务监听器
        TransactionListener transactionListener = new TransactionListenerImpl();

        // 2. 创建事务生产者
        TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
        producer.setNamesrvAddr("localhost:9876");

        // 设置事务监听器
        producer.setTransactionListener(transactionListener);

        producer.start();

        // 3. 发送事务消息
        String orderId = "ORD" + System.currentTimeMillis();
        Message message = new Message(
                "payment_topic",
                "payment_tag",
                orderId,
                ("支付订单:" + orderId).getBytes()
        );

        // 发送半消息
        TransactionSendResult result = producer.sendMessageInTransaction(
                message,
                orderId  // 本地事务参数
        );

        System.out.println("发送结果: " + result.getSendStatus());

        Thread.sleep(60000);
        producer.shutdown();
    }

    /**
     * 事务监听器实现
     */
    static class TransactionListenerImpl implements TransactionListener {

        private final Map<String, LocalTransactionState> localTransactionMap = new ConcurrentHashMap<>();

        /**
         * 执行本地事务
         */
        @Override
        public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
            String orderId = (String) arg;

            try {
                // 执行本地事务(创建订单)
                System.out.println("执行本地事务,订单ID: " + orderId);

                // 模拟创建订单
                boolean success = createOrder(orderId);

                if (success) {
                    // 本地事务成功,提交消息
                    localTransactionMap.put(orderId, LocalTransactionState.COMMIT_MESSAGE);
                    return LocalTransactionState.COMMIT_MESSAGE;
                } else {
                    // 本地事务失败,回滚消息
                    localTransactionMap.put(orderId, LocalTransactionState.ROLLBACK_MESSAGE);
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }

            } catch (Exception e) {
                // 本地事务状态未知,等待回查
                System.err.println("本地事务异常: " + e.getMessage());
                localTransactionMap.put(orderId, LocalTransactionState.UNKNOW);
                return LocalTransactionState.UNKNOW;
            }
        }

        /**
         * 事务回查(当本地事务状态未知时,Broker会定期回查)
         */
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt msg) {
            String orderId = msg.getKeys();

            System.out.println("事务回查,订单ID: " + orderId);

            // 查询本地事务状态
            LocalTransactionState state = localTransactionMap.get(orderId);

            if (state != null) {
                return state;
            }

            // 查询数据库,判断订单是否创建成功
            boolean orderExists = checkOrderExists(orderId);

            if (orderExists) {
                return LocalTransactionState.COMMIT_MESSAGE;
            } else {
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }

        private boolean createOrder(String orderId) {
            // 模拟创建订单
            System.out.println("创建订单: " + orderId);

            // 模拟随机成功/失败
            return Math.random() > 0.2;
        }

        private boolean checkOrderExists(String orderId) {
            // 查询数据库,判断订单是否存在
            return localTransactionMap.containsKey(orderId);
        }
    }
}

执行流程

1. Producer发送半消息到Broker
   └─ 半消息不可见,Consumer无法消费

2. Broker返回发送成功
   └─ Producer执行本地事务(创建订单)

3. 根据本地事务结果,提交或回滚消息
   ├─ 成功:Commit → 消息可见,Consumer可以消费
   ├─ 失败:Rollback → 删除半消息
   └─ 未知:UNKNOW → Broker定期回查(最多15次)

4. 如果15次回查都失败,消息进入死信队列

核心优势

RocketMQ事务消息保证了本地事务和消息发送的最终一致性


三、顺序消息:保证消息的严格顺序

3.1 问题场景:订单状态机

订单状态流转:
创建订单 → 支付订单 → 发货 → 完成

问题:如果消息乱序
支付订单(后到达)
创建订单(先到达)
↓
状态机错误!

3.2 顺序消息实现

全局顺序:所有消息严格顺序(性能低,很少使用) 分区顺序:同一分区内的消息严格顺序(推荐)

/**
 * RocketMQ顺序消息示例
 */
public class OrderlyMessageExample {

    /**
     * 发送顺序消息
     */
    public static void sendOrderlyMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("orderly_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String orderId = "ORD123";

        // 发送3条顺序消息
        String[] events = {"创建订单", "支付订单", "发货"};

        for (String event : events) {
            Message message = new Message(
                    "order_topic",
                    "order_tag",
                    (orderId + ":" + event).getBytes()
            );

            // 发送顺序消息(指定MessageQueueSelector)
            SendResult result = producer.send(
                    message,
                    new MessageQueueSelector() {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            // 根据订单ID选择队列(同一订单ID进入同一队列)
                            String orderId = (String) arg;
                            int index = orderId.hashCode() % mqs.size();
                            return mqs.get(Math.abs(index));
                        }
                    },
                    orderId  // 订单ID(决定队列)
            );

            System.out.println("发送顺序消息: " + event + " → Queue: " + result.getMessageQueue().getQueueId());
        }

        producer.shutdown();
    }

    /**
     * 消费顺序消息
     */
    public static void consumeOrderlyMessage() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("order_topic", "*");

        // 注册顺序消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String content = new String(msg.getBody());
                    System.out.println("消费顺序消息: " + content
                            + ", QueueId: " + msg.getQueueId()
                            + ", Thread: " + Thread.currentThread().getName());

                    // 模拟处理
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }

                // 返回成功
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("顺序消费者已启动");

        Thread.sleep(60000);
    }

    public static void main(String[] args) throws Exception {
        // 启动消费者
        new Thread(() -> {
            try {
                consumeOrderlyMessage();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        Thread.sleep(1000);

        // 发送消息
        sendOrderlyMessage();
    }
}

输出示例

发送顺序消息: 创建订单 → Queue: 2
发送顺序消息: 支付订单 → Queue: 2
发送顺序消息: 发货 → Queue: 2

消费顺序消息: ORD123:创建订单, QueueId: 2, Thread: ConsumeMessageThread_1
消费顺序消息: ORD123:支付订单, QueueId: 2, Thread: ConsumeMessageThread_1
消费顺序消息: ORD123:发货, QueueId: 2, Thread: ConsumeMessageThread_1

核心原理

同一订单ID → Hash → 同一Queue → 单线程消费 → 保证顺序

四、延迟消息:定时任务的最佳实践

4.1 问题场景:订单超时自动取消

用户下单 → 30分钟内未支付 → 自动取消订单

传统方案的问题

// 方案1:定时任务扫描数据库(性能差)
@Scheduled(fixedDelay = 60000)
public void cancelTimeoutOrders() {
    // 每分钟扫描一次数据库
    List<Order> orders = orderRepository.findTimeoutOrders();
    for (Order order : orders) {
        orderService.cancel(order);
    }
}

问题
- 数据库压力大每分钟扫描
- 实时性差最多延迟1分钟
- 扩展性差数据库成为瓶颈

4.2 RocketMQ延迟消息

延迟级别(18个级别):

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

实现示例

/**
 * RocketMQ延迟消息示例:订单超时取消
 */
public class DelayMessageExample {

    /**
     * 发送延迟消息
     */
    public static void sendDelayMessage() throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();

        String orderId = "ORD" + System.currentTimeMillis();

        Message message = new Message(
                "order_timeout_topic",
                "timeout_tag",
                orderId.getBytes()
        );

        // 设置延迟级别:14 = 10分钟
        message.setDelayTimeLevel(14);

        SendResult result = producer.send(message);

        System.out.println("发送延迟消息: 订单ID=" + orderId + ", 10分钟后执行");
        System.out.println("发送结果: " + result.getSendStatus());

        producer.shutdown();
    }

    /**
     * 消费延迟消息
     */
    public static void consumeDelayMessage() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group");
        consumer.setNamesrvAddr("localhost:9876");
        consumer.subscribe("order_timeout_topic", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {

                for (MessageExt msg : msgs) {
                    String orderId = new String(msg.getBody());

                    System.out.println("收到延迟消息: 订单ID=" + orderId);

                    // 检查订单状态
                    Order order = orderService.getOrder(orderId);

                    if (order.getStatus() == OrderStatus.UNPAID) {
                        // 订单未支付,自动取消
                        orderService.cancel(orderId);
                        System.out.println("订单已自动取消: " + orderId);
                    } else {
                        System.out.println("订单已支付,无需取消: " + orderId);
                    }
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        System.out.println("延迟消息消费者已启动");
    }

    public static void main(String[] args) throws Exception {
        // 启动消费者
        consumeDelayMessage();

        Thread.sleep(1000);

        // 发送延迟消息
        sendDelayMessage();

        // 等待10分钟
        Thread.sleep(600000);
    }
}

延迟级别映射

// RocketMQ预定义的18个延迟级别
private String messageDelayLevel =
    "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

// 使用方式
message.setDelayTimeLevel(1);   // 延迟1秒
message.setDelayTimeLevel(5);   // 延迟1分钟
message.setDelayTimeLevel(14);  // 延迟10分钟
message.setDelayTimeLevel(16);  // 延迟30分钟
message.setDelayTimeLevel(17);  // 延迟1小时
message.setDelayTimeLevel(18);  // 延迟2小时

五、消息过滤:Tag与SQL92

5.1 Tag过滤

// 生产者:发送带Tag的消息
Message message = new Message(
    "order_topic",
    "VIP_USER",  // Tag:VIP用户
    "订单内容".getBytes()
);
producer.send(message);

// 消费者:只消费VIP_USER的消息
consumer.subscribe("order_topic", "VIP_USER");

5.2 SQL92过滤(更灵活)

// 生产者:发送带属性的消息
Message message = new Message("order_topic", "订单内容".getBytes());
message.putUserProperty("amount", "100");
message.putUserProperty("city", "Beijing");
producer.send(message);

// 消费者:使用SQL92过滤
consumer.subscribe(
    "order_topic",
    MessageSelector.bySql("amount > 50 AND city = 'Beijing'")
);

六、总结

RocketMQ的核心特性:

  1. 事务消息:分布式事务的最佳实践
  2. 顺序消息:保证消息的严格顺序
  3. 延迟消息:定时任务的优雅实现
  4. 消息过滤:Tag和SQL92灵活过滤

下一篇将深入探讨消息可靠性与分布式事务的完整解决方案。


作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(5/6)