引言:一个精妙的分布式系统

如果把 RocketMQ 比作一座现代化城市,那么:

  • NameServer 是城市规划局,管理所有地图信息
  • Broker 是物流仓库,存储和转运货物
  • Producer 是发货方,把货物送到仓库
  • Consumer 是收货方,从仓库取货

今天,我们从整体视角理解这座"消息城市"的运作机制。

一、RocketMQ 整体架构

1.1 核心组件

RocketMQ 架构全景图:

        ┌────────────────────────────────────────────┐
        │           NameServer Cluster              │
        │  (路由注册中心、服务发现、轻量级协调器)     │
        └────────┬───────────────────┬───────────────┘
                 │                   │
        注册/心跳 │                   │ 获取路由
                 │                   │
    ┌────────────▼──────┐      ┌────▼────────────────┐
    │                   │      │                     │
    │  Broker Cluster   │      │  Producer/Consumer  │
    │                   │      │                     │
    │ ┌───────────────┐ │      │  应用程序            │
    │ │ Master Broker │ │◄────┤                     │
    │ │  - CommitLog  │ │ 读写 │                     │
    │ │  - ConsumeQ   │ │ 消息 │                     │
    │ │  - IndexFile  │ │      │                     │
    │ └───────┬───────┘ │      └─────────────────────┘
    │         │ 主从同步 │
    │ ┌───────▼───────┐ │
    │ │ Slave Broker  │ │
    │ │  - CommitLog  │ │
    │ │  - ConsumeQ   │ │
    │ │  - IndexFile  │ │
    │ └───────────────┘ │
    └───────────────────┘

1.2 组件职责矩阵

组件职责对比:
┌──────────────┬────────────────────────────────────────┐
│ 组件         │ 核心职责                               │
├──────────────┼────────────────────────────────────────┤
│ NameServer   │ - 路由信息管理(Topic → Broker 映射)  │
│              │ - Broker 注册与心跳检测                │
│              │ - 提供路由信息查询                      │
│              │ - 无状态、对等集群                      │
├──────────────┼────────────────────────────────────────┤
│ Broker       │ - 消息存储(CommitLog)                │
│              │ - 消息索引(ConsumeQueue、IndexFile)  │
│              │ - 消息查询与消费                        │
│              │ - 高可用(主从复制、Dledger)          │
├──────────────┼────────────────────────────────────────┤
│ Producer     │ - 消息生产                             │
│              │ - 消息发送(同步、异步、单向)          │
│              │ - 负载均衡(选择Broker和Queue)         │
│              │ - 故障规避                             │
├──────────────┼────────────────────────────────────────┤
│ Consumer     │ - 消息消费                             │
│              │ - 消息拉取(长轮询)                    │
│              │ - 负载均衡(Rebalance)                │
│              │ - 消费进度管理                          │
└──────────────┴────────────────────────────────────────┘

二、NameServer 详解

2.1 为什么需要 NameServer

public class WhyNameServer {

    // 问题1:Producer/Consumer 如何知道 Broker 在哪里?
    class ServiceDiscovery {
        // 没有 NameServer:
        // - 硬编码 Broker 地址(不灵活)
        // - 无法动态感知 Broker 变化

        // 有了 NameServer:
        // - 动态获取 Broker 列表
        // - 自动感知 Broker 上下线
    }

    // 问题2:Topic 分布在哪些 Broker 上?
    class TopicRoute {
        // NameServer 维护 Topic 路由信息
        class TopicRouteData {
            List<QueueData> queueDatas;      // Queue 分布
            List<BrokerData> brokerDatas;    // Broker 信息
        }
    }

    // 问题3:为什么不用 ZooKeeper?
    class WhyNotZooKeeper {
        /*
        ZooKeeper 的问题:
        1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先)
        2. 重量级依赖 - NameServer 更轻量
        3. 运维复杂 - NameServer 无状态更简单
        4. 性能开销 - NameServer 内存操作更快

        NameServer 的优势:
        1. 完全无状态
        2. 对等集群(任意节点提供服务)
        3. 内存操作(无磁盘 IO)
        4. 简单高效
        */
    }
}

2.2 NameServer 核心功能

public class NameServerCore {

    // 1. 路由信息管理
    class RouteInfoManager {
        // Topic → Broker 映射
        private HashMap<String/* topic */, List<QueueData>> topicQueueTable;

        // Broker 名称 → Broker 数据
        private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;

        // Broker 地址 → 集群信息
        private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

        // 集群名称 → Broker 名称列表
        private HashMap<String/* clusterName */, Set<String>> clusterAddrTable;

        // 过滤服务器
        private HashMap<String/* brokerAddr */, List<String>> filterServerTable;
    }

    // 2. Broker 注册
    public RegisterBrokerResult registerBroker(
            String clusterName,
            String brokerAddr,
            String brokerName,
            long brokerId,
            String haServerAddr,
            TopicConfigSerializeWrapper topicConfigWrapper) {

        // 创建或更新 Broker 数据
        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
        if (null == brokerData) {
            brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
            this.brokerAddrTable.put(brokerName, brokerData);
        }
        brokerData.getBrokerAddrs().put(brokerId, brokerAddr);

        // 更新 Topic 路由信息
        if (null != topicConfigWrapper) {
            for (Map.Entry<String, TopicConfig> entry :
                 topicConfigWrapper.getTopicConfigTable().entrySet()) {
                createAndUpdateQueueData(brokerName, entry.getValue());
            }
        }

        // 更新 Broker 存活信息
        BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(
            brokerAddr,
            new BrokerLiveInfo(
                System.currentTimeMillis(),
                topicConfigWrapper.getDataVersion(),
                channel,
                haServerAddr
            )
        );

        return new RegisterBrokerResult(...);
    }

    // 3. 路由查询
    public TopicRouteData getTopicRouteData(String topic) {
        TopicRouteData topicRouteData = new TopicRouteData();

        // 查找 Topic 的 Queue 信息
        List<QueueData> queueDataList = this.topicQueueTable.get(topic);

        // 查找对应的 Broker 信息
        Set<String> brokerNameSet = new HashSet<>();
        for (QueueData qd : queueDataList) {
            brokerNameSet.add(qd.getBrokerName());
        }

        // 组装路由数据
        for (String brokerName : brokerNameSet) {
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (brokerData != null) {
                topicRouteData.getBrokerDatas().add(brokerData.clone());
            }
        }

        topicRouteData.setQueueDatas(queueDataList);
        return topicRouteData;
    }

    // 4. Broker 心跳检测
    public void scanNotActiveBroker() {
        long timeoutMillis = 120000;  // 2 分钟超时

        for (Map.Entry<String, BrokerLiveInfo> entry :
             this.brokerLiveTable.entrySet()) {

            long last = entry.getValue().getLastUpdateTimestamp();
            if ((last + timeoutMillis) < System.currentTimeMillis()) {
                // Broker 超时,移除
                RemotingUtil.closeChannel(entry.getValue().getChannel());
                this.brokerLiveTable.remove(entry.getKey());

                // 清理路由信息
                this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel());

                log.warn("Broker timeout: {}", entry.getKey());
            }
        }
    }
}

2.3 NameServer 集群

public class NameServerCluster {

    // NameServer 特点
    class Characteristics {
        /*
        1. 完全无状态
           - 不持久化任何数据
           - 所有数据在内存中
           - 重启后数据丢失(从 Broker 重新获取)

        2. 对等部署
           - 所有节点功能完全一致
           - 无主从之分
           - 无需选举

        3. 不互相通信
           - NameServer 之间无任何通信
           - 数据可能短暂不一致
           - 最终一致即可

        4. Broker 全量注册
           - Broker 向所有 NameServer 注册
           - 保证数据冗余
        */
    }

    // 部署架构
    class DeploymentArchitecture {
        /*
        NameServer 集群(推荐3个节点):
        ┌─────────────┐  ┌─────────────┐  ┌─────────────┐
        │ NameServer1 │  │ NameServer2 │  │ NameServer3 │
        └──────┬──────┘  └──────┬──────┘  └──────┬──────┘
               │                │                │
               └────────────────┼────────────────┘
                        ┌───────▼────────┐
                        │                │
                        │  Broker 集群    │
                        │  (全量注册)   │
                        │                │
                        └────────────────┘
        */
    }
}

三、Broker 详解

3.1 Broker 核心架构

Broker 内部架构:
┌─────────────────────────────────────────────────────┐
│                    Broker                           │
│  ┌────────────────────────────────────────────────┐│
│  │         Remoting 通信层                        ││
│  │  (接收 Producer/Consumer 请求)                 ││
│  └────────┬───────────────────────────────────────┘│
│           │                                         │
│  ┌────────▼────────────────────────────┐           │
│  │     BrokerController               │           │
│  │  - 处理器注册                       │           │
│  │  - 定时任务管理                     │           │
│  │  - 消息处理                         │           │
│  └────────┬────────────────────────────┘           │
│           │                                         │
│  ┌────────▼─────────────┬───────────────────┐     │
│  │   MessageStore       │    HAService      │     │
│  │  ┌────────────────┐  │   (主从复制)       │     │
│  │  │  CommitLog     │  │                   │     │
│  │  │  (消息存储)     │  │                   │     │
│  │  └────────────────┘  │                   │     │
│  │  ┌────────────────┐  │                   │     │
│  │  │ ConsumeQueue   │  │                   │     │
│  │  │  (消费索引)     │  │                   │     │
│  │  └────────────────┘  │                   │     │
│  │  ┌────────────────┐  │                   │     │
│  │  │  IndexFile     │  │                   │     │
│  │  │  (消息索引)     │  │                   │     │
│  │  └────────────────┘  │                   │     │
│  └──────────────────────┴───────────────────┘     │
│                                                     │
│  ┌─────────────────────────────────────────────┐  │
│  │          定时任务                            │  │
│  │  - 持久化 ConsumeQueue                       │  │
│  │  - 持久化 CommitLog                          │  │
│  │  - 清理过期文件                              │  │
│  │  - 统计信息                                  │  │
│  └─────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘

3.2 Broker 核心职责

public class BrokerCore {

    // 1. 消息接收与存储
    class MessageReceive {
        public PutMessageResult putMessage(MessageExtBrokerInner msg) {
            // 写入 CommitLog
            PutMessageResult result = this.commitLog.putMessage(msg);

            // 触发刷盘
            handleDiskFlush(result, msg);

            // 触发主从同步
            handleHA(result, msg);

            return result;
        }
    }

    // 2. 消息查询
    class MessageQuery {
        // 按消息 ID 查询
        public MessageExt lookMessageByOffset(long commitLogOffset) {
            return this.commitLog.lookMessageByOffset(commitLogOffset);
        }

        // 按 Key 查询
        public QueryMessageResult queryMessage(String topic, String key, int maxNum) {
            return this.indexService.queryMessage(topic, key, maxNum);
        }

        // 按时间查询
        public QueryMessageResult queryMessageByTime(String topic, long begin, long end) {
            return this.messageStore.queryMessage(topic, begin, end, maxNum);
        }
    }

    // 3. 消息消费
    class MessageConsume {
        public GetMessageResult getMessage(
                String group,
                String topic,
                int queueId,
                long offset,
                int maxMsgNums,
                SubscriptionData subscriptionData) {

            // 从 ConsumeQueue 查找消息位置
            ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);

            // 根据 offset 获取消息
            SelectMappedBufferResult bufferConsumeQueue =
                consumeQueue.getIndexBuffer(offset);

            // 从 CommitLog 读取消息内容
            List<MessageExt> messageList = new ArrayList<>();
            for (int i = 0; i < maxMsgNums; i++) {
                long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                int sizePy = bufferConsumeQueue.getByteBuffer().getInt();

                MessageExt msg = lookMessageByOffset(offsetPy, sizePy);

                // 消息过滤
                if (match(subscriptionData, msg)) {
                    messageList.add(msg);
                }
            }

            return new GetMessageResult(messageList);
        }
    }

    // 4. Topic 创建
    class TopicManagement {
        public void createTopic(String topic, int queueNums) {
            TopicConfig topicConfig = new TopicConfig(topic);
            topicConfig.setReadQueueNums(queueNums);
            topicConfig.setWriteQueueNums(queueNums);
            topicConfig.setPerm(6);  // 读写权限

            this.topicConfigTable.put(topic, topicConfig);

            // 创建 ConsumeQueue
            for (int i = 0; i < queueNums; i++) {
                ConsumeQueue logic = new ConsumeQueue(topic, i);
                this.consumeQueueTable.put(buildKey(topic, i), logic);
            }
        }
    }

    // 5. 消费进度管理
    class ConsumerOffsetManagement {
        private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable;

        public void commitOffset(String group, String topic, int queueId, long offset) {
            String key = topic + "@" + group;
            ConcurrentHashMap<Integer, Long> map = offsetTable.get(key);
            if (map != null) {
                map.put(queueId, offset);
            }
        }

        public long queryOffset(String group, String topic, int queueId) {
            String key = topic + "@" + group;
            ConcurrentHashMap<Integer, Long> map = offsetTable.get(key);
            if (map != null) {
                Long offset = map.get(queueId);
                return offset != null ? offset : -1;
            }
            return -1;
        }
    }
}

3.3 Broker 高可用

public class BrokerHA {

    // 主从架构
    class MasterSlave {
        /*
        主从模式:
        ┌──────────────┐     同步     ┌──────────────┐
        │ Master Broker│ ──────────> │ Slave Broker │
        │  - 读写       │              │  - 只读       │
        └──────────────┘              └──────────────┘

        同步方式:
        1. 同步复制(SYNC_MASTER):
           - Master 等待 Slave 同步完成才返回
           - 可靠性高,性能稍差

        2. 异步复制(ASYNC_MASTER):
           - Master 立即返回,后台异步同步
           - 性能高,可能丢失少量数据
        */
    }

    // Dledger 模式
    class DledgerMode {
        /*
        基于 Raft 的自动选主:
        ┌──────────┐   ┌──────────┐   ┌──────────┐
        │ Broker 1 │   │ Broker 2 │   │ Broker 3 │
        │ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│
        └──────────┘   └──────────┘   └──────────┘

        特点:
        1. 自动故障转移
        2. 无需人工干预
        3. 保证数据一致性
        4. 至少3个节点
        */
    }
}

四、Producer 工作原理

4.1 Producer 架构

public class ProducerArchitecture {

    // Producer 核心组件
    class ProducerComponents {
        private MQClientInstance clientInstance;  // 客户端实例
        private MQClientAPIImpl mqClientAPI;      // 通信API
        private TopicPublishInfo publishInfo;     // Topic路由信息
        private SendMessageHook sendHook;         // 发送钩子
    }

    // 发送流程
    public SendResult send(Message msg) {
        // 1. 查找 Topic 路由信息
        TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic());

        // 2. 选择消息队列
        MessageQueue mq = selectOneMessageQueue(publishInfo);

        // 3. 发送消息
        SendResult result = sendKernelImpl(msg, mq);

        // 4. 更新故障规避信息
        updateFaultItem(mq.getBrokerName(), result);

        return result;
    }
}

4.2 Producer 负载均衡

public class ProducerLoadBalance {

    // 队列选择策略
    public MessageQueue selectQueue(TopicPublishInfo tpInfo) {
        // 1. 简单轮询
        int index = sendWhichQueue.incrementAndGet() % queueSize;
        return tpInfo.getMessageQueueList().get(index);

        // 2. 故障规避
        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
            return mq;
        }

        // 3. 选择延迟最小的
        return latencyFaultTolerance.pickOneAtLeast();
    }
}

五、Consumer 工作原理

5.1 Consumer 架构

public class ConsumerArchitecture {

    // Consumer 核心组件
    class ConsumerComponents {
        private RebalanceService rebalanceService;     // 负载均衡服务
        private PullMessageService pullMessageService; // 拉取服务
        private ConsumeMessageService consumeService;  // 消费服务
        private OffsetStore offsetStore;               // 进度存储
    }

    // 消费流程
    public void start() {
        // 1. 启动负载均衡
        rebalanceService.start();

        // 2. 启动拉取服务
        pullMessageService.start();

        // 3. 启动消费服务
        consumeService.start();

        // 4. 注册到 Broker
        registerConsumer();
    }
}

5.2 Consumer 负载均衡(Rebalance)

public class ConsumerRebalance {

    public void doRebalance() {
        // 1. 获取 Topic 的所有 Queue
        List<MessageQueue> mqAll = getTopicQueues(topic);

        // 2. 获取同组的所有 Consumer
        List<String> cidAll = getConsumerList(group);

        // 3. 排序(保证一致性视图)
        Collections.sort(mqAll);
        Collections.sort(cidAll);

        // 4. 执行分配
        List<MessageQueue> allocated = allocateStrategy.allocate(
            group, currentCID, mqAll, cidAll
        );

        // 5. 更新本地队列
        updateProcessQueue(allocated);
    }
}

六、完整消息流转

6.1 发送消息完整流程

消息发送完整流程:
┌───────────┐
│ Producer  │
└─────┬─────┘
      │1. 查询路由
      ▼
┌───────────┐
│NameServer │  返回路由信息(Broker列表、Queue信息)
└─────┬─────┘
      │
      ▼
┌───────────┐
│ Producer  │  2. 选择 Broker 和 Queue
└─────┬─────┘
      │3. 发送消息
      ▼
┌───────────┐
│  Broker   │  4. 写入 CommitLog
│ ┌───────┐ │
│ │CommitLog│
│ └───┬───┘ │
│     │5. 异步构建 ConsumeQueue
│ ┌───▼──────┐
│ │ConsumeQ  │
│ └──────────┘
│     │6. 返回结果
└─────┼─────┘
      ▼
┌───────────┐
│ Producer  │  7. 收到发送结果
└───────────┘

6.2 消费消息完整流程

消息消费完整流程:
┌───────────┐
│ Consumer  │
└─────┬─────┘
      │1. 注册到 Broker
      ▼
┌───────────┐
│  Broker   │  保存 Consumer 信息
└─────┬─────┘
      │2. Rebalance 分配 Queue
      ▼
┌───────────┐
│ Consumer  │  3. 发起长轮询拉取
└─────┬─────┘
      │
      ▼
┌───────────┐
│  Broker   │  4. 从 ConsumeQueue 查找
│ ┌───────┐ │     5. 从 CommitLog 读取消息
│ │ConsumeQ│ │
│ └───┬───┘ │
│     │     │
│ ┌───▼────┐│
│ │CommitLog││
│ └────────┘│
│     │6. 返回消息
└─────┼─────┘
      ▼
┌───────────┐
│ Consumer  │  7. 处理消息
└─────┬─────┘  8. 更新消费进度
      │
      ▼
┌───────────┐
│  Broker   │  9. 保存消费进度
└───────────┘

七、设计亮点与思考

7.1 设计亮点

public class DesignHighlights {

    // 1. NameServer 的轻量化设计
    class LightweightNameServer {
        /*
        - 无状态,内存操作
        - 对等部署,无需选举
        - 简单高效
        - AP 而非 CP
        */
    }

    // 2. CommitLog 的顺序写
    class SequentialWrite {
        /*
        - 所有消息顺序写入同一个文件
        - 充分利用磁盘顺序写性能
        - 避免随机 IO
        */
    }

    // 3. ConsumeQueue 的索引设计
    class IndexDesign {
        /*
        - 轻量级索引(20字节)
        - 快速定位消息位置
        - 支持消费进度管理
        */
    }

    // 4. 长轮询的伪 Push
    class LongPolling {
        /*
        - Consumer 主动拉取
        - Broker Hold 请求
        - 兼顾实时性和可控性
        */
    }
}

7.2 架构权衡

public class ArchitectureTradeoffs {

    // 权衡1:性能 vs 可靠性
    class PerformanceVsReliability {
        // 异步刷盘 → 高性能,可能丢消息
        // 同步刷盘 → 零丢失,性能下降

        // 异步复制 → 高性能,可能丢消息
        // 同步复制 → 高可靠,性能下降
    }

    // 权衡2:一致性 vs 可用性
    class ConsistencyVsAvailability {
        // NameServer 选择 AP
        // - 可能短暂不一致
        // - 但保证高可用

        // Dledger 模式选择 CP
        // - 保证数据一致
        // - 需要多数节点存活
    }

    // 权衡3:复杂度 vs 功能
    class ComplexityVsFeature {
        // RocketMQ 比 Kafka 复杂
        // - 但功能更丰富(事务、延迟、顺序)

        // RocketMQ 比 RabbitMQ 简单
        // - 但性能更高
    }
}

八、总结

通过本篇,我们从整体视角理解了 RocketMQ 的架构:

  1. NameServer:轻量级路由中心,无状态高可用
  2. Broker:消息存储核心,顺序写高性能
  3. Producer:消息生产者,负载均衡与故障规避
  4. Consumer:消息消费者,长轮询与 Rebalance

RocketMQ 的架构设计充分体现了分布式系统的智慧,每个设计决策都有其深刻的考量。

下一篇预告

理解了整体架构,下一篇我们将深入 NameServer,探索它为什么能如此轻量高效,以及它的实现细节。


深入思考

  1. 为什么 RocketMQ 不像 Kafka 那样让 Broker 依赖 ZooKeeper?
  2. CommitLog 顺序写为什么比随机写快这么多?
  3. 如果 NameServer 全部宕机,系统还能工作吗?

思考这些问题,深化理解!