RocketMQ架构02:NameServer深度剖析 - 轻量级注册中心的设计哲学

引言:简单而不简陋 NameServer 是 RocketMQ 中最"简单"的组件,但简单不代表简陋。它用最少的代码实现了最核心的功能: 无状态、无持久化 不依赖任何第三方组件 完全内存操作 对等部署、无主从之分 今天,我们深入理解这个"简单而强大"的组件。 一、NameServer 核心数据结构 1.1 路由信息管理 public class RouteInfoManager { // 1. Topic → QueueData 映射 // 记录每个 Topic 在哪些 Broker 上有哪些 Queue private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // QueueData 结构 class QueueData { private String brokerName; // Broker 名称 private int readQueueNums; // 读队列数量 private int writeQueueNums; // 写队列数量 private int perm; // 权限(2=写 4=读 6=读写) private int topicSynFlag; // 同步标识 } // 2. Broker 名称 → BrokerData 映射 // 记录 Broker 的集群信息和地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // BrokerData 结构 class BrokerData { private String cluster; // 所属集群 private String brokerName; // Broker 名称 private HashMap<Long/* brokerId */, String/* address */> brokerAddrs; // brokerId: 0=Master, >0=Slave } // 3. Broker 地址 → BrokerLiveInfo 映射 // 记录 Broker 的存活信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // BrokerLiveInfo 结构 class BrokerLiveInfo { private long lastUpdateTimestamp; // 最后更新时间 private DataVersion dataVersion; // 数据版本 private Channel channel; // Netty Channel private String haServerAddr; // HA 服务地址 } // 4. 集群名称 → Broker 名称集合 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // 5. Broker 地址 → 过滤服务器列表 private final HashMap<String/* brokerAddr */, List<String/* filter server */>> filterServerTable; // 读写锁保护 private final ReadWriteLock lock = new ReentrantReadWriteLock(); } 1.2 数据结构关系图 数据结构关系: topicQueueTable ┌──────────────────────────────────────┐ │ Topic1 → [QueueData1, QueueData2] │ │ QueueData1: brokerName=broker-a │ │ QueueData2: brokerName=broker-b │ └──────────────────────────────────────┘ │ ▼ brokerAddrTable ┌──────────────────────────────────────┐ │ broker-a → BrokerData │ │ cluster: DefaultCluster │ │ brokerAddrs: │ │ 0 → 192.168.1.100:10911 (Master)│ │ 1 → 192.168.1.101:10911 (Slave) │ └──────────────────────────────────────┘ │ ▼ brokerLiveTable ┌──────────────────────────────────────┐ │ 192.168.1.100:10911 → BrokerLiveInfo│ │ lastUpdateTimestamp: 1699999999000 │ │ channel: [Netty Channel] │ └──────────────────────────────────────┘ 二、Broker 注册流程 2.1 注册请求处理 public class BrokerRegistration { public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { // 获取写锁 this.lock.writeLock().lockInterruptibly(); // 1. 更新集群信息 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // 2. 更新 Broker 地址表 boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } // 更新 Broker 地址 Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 3. 更新 Topic 配置 if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { // 只有 Master 才更新 Topic 配置 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 4. 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); if (null == prevBrokerLiveInfo) { log.info("new broker registered: {}", brokerAddr); } // 5. 更新过滤服务器列表 if (filterServerList != null && !filterServerList.isEmpty()) { this.filterServerTable.put(brokerAddr, filterServerList); } // 6. 如果是 Slave,返回 Master 信息 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddr); if (masterLiveInfo != null) { result.setHaServerAddr(masterLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } catch (Exception e) { log.error("registerBroker Exception", e); } finally { this.lock.writeLock().unlock(); } return result; } // 创建或更新 Queue 数据 private void createAndUpdateQueueData(String brokerName, TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { queueDataList = new LinkedList<>(); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered: {}", topicConfig.getTopicName()); } else { // 移除旧的数据 queueDataList.removeIf(qd -> qd.getBrokerName().equals(brokerName)); } queueDataList.add(queueData); } } 2.2 Broker 心跳机制 public class BrokerHeartbeat { // Broker 端:定时发送心跳 class BrokerSide { // 每 30 秒向所有 NameServer 注册一次 private final ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { // 向所有 NameServer 注册 this.registerBrokerAll(true, false, true); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } }, 1000, 30000, TimeUnit.MILLISECONDS); } private void registerBrokerAll( final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { // 准备注册信息 TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 向每个 NameServer 注册 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { for (String namesrvAddr : nameServerAddressList) { try { RegisterBrokerResult result = this.registerBroker( namesrvAddr, oneway, timeoutMills, topicConfigWrapper ); if (result != null) { // 更新 Master 地址(Slave 使用) if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(result.getHaServerAddr()); } } } catch (Exception e) { log.warn("registerBroker to {} failed", namesrvAddr, e); } } } } } // NameServer 端:检测 Broker 超时 class NameServerSide { // 每 10 秒扫描一次 public void scanNotActiveBroker() { long timeout = 120000; // 2 分钟超时 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> entry = it.next(); long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeout) < System.currentTimeMillis()) { // Broker 超时 RemotingUtil.closeChannel(entry.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", entry.getKey(), timeout); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); } } } // 清理路由信息 public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { this.lock.readLock().lockInterruptibly(); // 查找 Broker 地址 for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } if (brokerAddrFound != null) { // 移除 Broker this.removeBroker(brokerAddrFound); } } } } 三、路由查询 3.1 获取 Topic 路由信息 public class RouteQuery { public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; try { // 获取读锁 this.lock.readLock().lockInterruptibly(); // 1. 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; // 2. 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); topicRouteData.getBrokerDatas().add(brokerDataClone); foundBrokerData = true; // 3. 查找过滤服务器 for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); if (filterServerList != null) { topicRouteData.getFilterServerTable().put(brokerAddr, filterServerList); } } } } } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } finally { this.lock.readLock().unlock(); } if (foundQueueData && foundBrokerData) { return topicRouteData; } return null; } // TopicRouteData 结构 class TopicRouteData { private String orderTopicConf; // 顺序消息配置 private List<QueueData> queueDatas; // Queue 信息列表 private List<BrokerData> brokerDatas; // Broker 信息列表 private HashMap<String, List<String>> filterServerTable; // 过滤服务器 } } 3.2 Client 端路由更新 public class ClientRouteUpdate { // Producer/Consumer 定时更新路由信息 class RouteUpdateTask { // 每 30 秒更新一次 private ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, 30000, TimeUnit.MILLISECONDS); } private void updateTopicRouteInfoFromNameServer() { // 获取所有订阅的 Topic Set<String> topicSet = new HashSet<>(); // Producer 的 Topic topicSet.addAll(this.producerTable.keySet()); // Consumer 的 Topic for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { topicSet.addAll(entry.getValue().subscriptions()); } // 更新每个 Topic 的路由信息 for (String topic : topicSet) { this.updateTopicRouteInfoFromNameServer(topic); } } public boolean updateTopicRouteInfoFromNameServer(final String topic) { try { // 从 NameServer 获取路由信息 TopicRouteData topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000); if (topicRouteData != null) { // 对比版本,判断是否需要更新 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataChanged(old, topicRouteData); if (changed) { // 更新本地路由表 this.topicRouteTable.put(topic, topicRouteData); // 更新 Producer 的路由信息 for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) { entry.getValue().updateTopicPublishInfo(topic, topicRouteData); } // 更新 Consumer 的路由信息 for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { entry.getValue().updateTopicSubscribeInfo(topic, topicRouteData); } log.info("topicRouteTable.put: Topic={}, RouteData={}", topic, topicRouteData); return true; } } } catch (Exception e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; } // 判断路由信息是否变化 private boolean topicRouteDataChanged(TopicRouteData olddata, TopicRouteData nowdata) { if (olddata == null || nowdata == null) { return true; } TopicRouteData old = olddata.cloneTopicRouteData(); TopicRouteData now = nowdata.cloneTopicRouteData(); Collections.sort(old.getQueueDatas()); Collections.sort(old.getBrokerDatas()); Collections.sort(now.getQueueDatas()); Collections.sort(now.getBrokerDatas()); return !old.equals(now); } } } 四、NameServer 高可用设计 4.1 对等部署架构 NameServer 集群架构: ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (独立运行) │ │ (独立运行) │ │ (独立运行) │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ └──────────────────┼──────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Broker 集群 │ │ (向所有NS注册) │ │ │ └──────────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Producer/Consumer │ │ (随机选择NS查询) │ │ │ └──────────────────────┘ 特点: 1. NameServer 之间无任何通信 2. 每个 NameServer 都是完整功能的独立节点 3. Broker 向所有 NameServer 注册 4. Client 随机选择 NameServer 查询 4.2 数据一致性保证 public class ConsistencyGuarantee { // 最终一致性 class EventualConsistency { /* NameServer 不保证强一致性,但保证最终一致性 场景1:Broker 注册 ┌─────────────────────────────────────────┐ │ T0: Broker 向 NS1 注册 │ │ T1: Broker 向 NS2 注册 │ │ T2: Broker 向 NS3 注册 │ │ │ │ 在 T0-T2 期间,三个 NS 数据不一致 │ │ T2 后,数据最终一致 │ └─────────────────────────────────────────┘ 场景2:Broker 宕机 ┌─────────────────────────────────────────┐ │ T0: Broker 宕机 │ │ T1: NS1 检测到超时(2分钟) │ │ T2: NS2 检测到超时 │ │ T3: NS3 检测到超时 │ │ │ │ 在检测到之前,可能有短暂不一致 │ │ 但影响有限(Client 会自动切换 Broker) │ └─────────────────────────────────────────┘ */ } // 为什么可以接受不一致? class WhyAcceptableInconsistency { /* 1. Client 有容错机制 - 如果从 NS1 获取的 Broker 不可用 - Client 会自动尝试其他 Broker 2. Broker 定时注册 - 每 30 秒注册一次 - 很快会同步到所有 NS 3. 影响范围小 - 只影响短时间的查询 - 不影响已建立的连接 4. 简化设计 - 无需复杂的一致性协议 - 性能更高 */ } } 4.3 故障处理 public class FailureHandling { // 单个 NameServer 故障 class SingleNameServerFailure { /* 场景:NS1 宕机 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (宕机) │ │ (正常) │ │ (正常) │ └──────────────┘ └──────┬───────┘ └──────┬───────┘ │ │ ┌───────────────┴──────────────────┘ │ ┌───────▼──────────┐ │ Broker 集群 │ │ - 向NS2/NS3注册 │ │ - NS1注册失败 │ └──────────────────┘ 影响: 1. Broker 向 NS1 注册失败(不影响NS2/NS3) 2. Client 如果查询 NS1 会失败,自动重试其他 NS 3. 总体可用性不受影响 */ } // 全部 NameServer 故障 class AllNameServerFailure { /* 场景:所有 NS 宕机 影响: 1. Broker 无法注册新的 Topic 2. Client 无法获取新的路由信息 3. 但已有连接仍可正常工作! 为什么还能工作? - Producer/Consumer 本地缓存了路由信息 - Broker 地址不会频繁变化 - 短时间内(几小时)仍可正常使用 恢复: - NameServer 恢复后 - Broker 重新注册 - Client 更新路由 - 自动恢复正常 */ } // Client 容错机制 class ClientFaultTolerance { public TopicRouteData getTopicRouteData(String topic) { // 尝试从不同的 NameServer 获取 List<String> nameServerList = this.remotingClient.getNameServerAddressList(); for (String nameServer : nameServerList) { try { TopicRouteData result = this.getTopicRouteDataFromNameServer(topic, nameServer, 3000); if (result != null) { return result; } } catch (Exception e) { log.warn("getTopicRouteData from {} failed", nameServer, e); // 继续尝试下一个 NameServer } } throw new MQClientException("No available NameServer"); } } } 五、性能优化 5.1 内存优化 public class MemoryOptimization { // 数据结构优化 class DataStructure { // 使用 HashMap 而非 ConcurrentHashMap // 通过读写锁控制并发,减少锁竞争 private final HashMap<String, List<QueueData>> topicQueueTable; private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 读操作 public List<QueueData> getQueueData(String topic) { try { this.lock.readLock().lockInterruptibly(); return this.topicQueueTable.get(topic); } finally { this.lock.readLock().unlock(); } } // 写操作 public void putQueueData(String topic, List<QueueData> queueDataList) { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.put(topic, queueDataList); } finally { this.lock.writeLock().unlock(); } } } // 避免不必要的对象创建 class ObjectReuse { // 复用 BrokerData 对象 public BrokerData cloneBrokerData(BrokerData brokerData) { // 只克隆必要的字段 return new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); } } } 5.2 网络优化 public class NetworkOptimization { // 批量处理 class BatchProcessing { // Broker 一次注册所有 Topic // 而不是每个 Topic 单独注册 public void registerBroker(TopicConfigSerializeWrapper topicConfigWrapper) { // 包含所有 Topic 的配置 ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigWrapper.getTopicConfigTable(); // 一次性更新所有 Topic for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } // 连接复用 class ConnectionReuse { // Broker 和 NameServer 之间保持长连接 // 避免频繁建立连接 private Channel channel; // Netty Channel public void keepAlive() { // 定时发送心跳保持连接 this.scheduledExecutorService.scheduleAtFixedRate(() -> { this.registerBrokerAll(true, false, true); }, 1000, 30000, TimeUnit.MILLISECONDS); } } } 六、与 ZooKeeper 对比 6.1 设计理念对比 对比维度: ┌──────────────┬──────────────────┬──────────────────┐ │ 维度 │ NameServer │ ZooKeeper │ ├──────────────┼──────────────────┼──────────────────┤ │ CAP 选择 │ AP(可用性优先) │ CP(一致性优先) │ │ 数据持久化 │ 无(纯内存) │ 有(磁盘) │ │ 节点通信 │ 无 │ 有(Paxos/Raft) │ │ 部署复杂度 │ 低 │ 高 │ │ 运维复杂度 │ 低 │ 高 │ │ 性能 │ 高(内存操作) │ 中(磁盘IO) │ │ 一致性保证 │ 最终一致 │ 强一致 │ │ 故障恢复 │ 快 │ 慢(需要选举) │ └──────────────┴──────────────────┴──────────────────┘ 6.2 为什么 RocketMQ 选择 NameServer? public class WhyNameServer { class Reasons { /* 1. 需求不同 - RocketMQ 不需要强一致性 - 路由信息短暂不一致可以接受 - Client 有容错机制 2. 简化部署 - 无需额外依赖 - 降低运维成本 - 减少故障点 3. 性能优势 - 纯内存操作,性能更高 - 无磁盘 IO - 无网络同步开销 4. 可用性优势 - 无需选举,故障恢复快 - 任意节点可服务 - 整体可用性更高 5. 成本考虑 - 更少的机器资源 - 更低的学习成本 - 更少的运维工作 */ } } 七、最佳实践 7.1 部署建议 public class DeploymentBestPractices { // 1. 节点数量 class NodeCount { /* 推荐配置: - 生产环境:3 个节点 - 测试环境:1 个节点即可 原因: - 3 个节点足够高可用 - 过多节点增加 Broker 负担(需向每个节点注册) - 节省资源 */ } // 2. 硬件配置 class HardwareRequirement { /* 推荐配置: - CPU: 2 核即可 - 内存: 4GB 即可 - 磁盘: 无特殊要求(不持久化) - 网络: 千兆网卡 NameServer 非常轻量,硬件需求很低 */ } // 3. 网络规划 class NetworkPlanning { /* 建议: - 与 Broker 在同一机房 - 使用内网通信 - 避免跨公网访问 - 确保网络稳定性 */ } // 4. 启动参数 class StartupParameters { String startCommand = "nohup sh mqnamesrv " + "-Xms2g -Xmx2g -Xmn1g " + "-XX:+UseG1GC " + "-XX:G1HeapRegionSize=16m " + "-XX:+PrintGCDetails " + "-XX:+PrintGCDateStamps " + "-Xloggc:/dev/shm/mq_gc_%p.log &"; } } 7.2 监控建议 public class MonitoringBestPractices { // 1. 关键指标 class KeyMetrics { long registeredBrokerCount; // 注册的 Broker 数量 long registeredTopicCount; // 注册的 Topic 数量 long activeConnectionCount; // 活跃连接数 double routeQueryQPS; // 路由查询 QPS double brokerRegisterQPS; // Broker 注册 QPS } // 2. 告警规则 class AlertRules { /* 建议告警: 1. Broker 数量突然减少(可能宕机) 2. 路由查询失败率 > 1% 3. 内存使用率 > 80% 4. JVM GC 频繁 */ } } 八、总结 NameServer 的设计充分体现了"大道至简"的哲学: ...

2025-11-13 · maneng

RocketMQ架构01:整体架构设计 - NameServer、Broker、Producer、Consumer

引言:一个精妙的分布式系统 如果把 RocketMQ 比作一座现代化城市,那么: NameServer 是城市规划局,管理所有地图信息 Broker 是物流仓库,存储和转运货物 Producer 是发货方,把货物送到仓库 Consumer 是收货方,从仓库取货 今天,我们从整体视角理解这座"消息城市"的运作机制。 一、RocketMQ 整体架构 1.1 核心组件 RocketMQ 架构全景图: ┌────────────────────────────────────────────┐ │ NameServer Cluster │ │ (路由注册中心、服务发现、轻量级协调器) │ └────────┬───────────────────┬───────────────┘ │ │ 注册/心跳 │ │ 获取路由 │ │ ┌────────────▼──────┐ ┌────▼────────────────┐ │ │ │ │ │ Broker Cluster │ │ Producer/Consumer │ │ │ │ │ │ ┌───────────────┐ │ │ 应用程序 │ │ │ Master Broker │ │◄────┤ │ │ │ - CommitLog │ │ 读写 │ │ │ │ - ConsumeQ │ │ 消息 │ │ │ │ - IndexFile │ │ │ │ │ └───────┬───────┘ │ └─────────────────────┘ │ │ 主从同步 │ │ ┌───────▼───────┐ │ │ │ Slave Broker │ │ │ │ - CommitLog │ │ │ │ - ConsumeQ │ │ │ │ - IndexFile │ │ │ └───────────────┘ │ └───────────────────┘ 1.2 组件职责矩阵 组件职责对比: ┌──────────────┬────────────────────────────────────────┐ │ 组件 │ 核心职责 │ ├──────────────┼────────────────────────────────────────┤ │ NameServer │ - 路由信息管理(Topic → Broker 映射) │ │ │ - Broker 注册与心跳检测 │ │ │ - 提供路由信息查询 │ │ │ - 无状态、对等集群 │ ├──────────────┼────────────────────────────────────────┤ │ Broker │ - 消息存储(CommitLog) │ │ │ - 消息索引(ConsumeQueue、IndexFile) │ │ │ - 消息查询与消费 │ │ │ - 高可用(主从复制、Dledger) │ ├──────────────┼────────────────────────────────────────┤ │ Producer │ - 消息生产 │ │ │ - 消息发送(同步、异步、单向) │ │ │ - 负载均衡(选择Broker和Queue) │ │ │ - 故障规避 │ ├──────────────┼────────────────────────────────────────┤ │ Consumer │ - 消息消费 │ │ │ - 消息拉取(长轮询) │ │ │ - 负载均衡(Rebalance) │ │ │ - 消费进度管理 │ └──────────────┴────────────────────────────────────────┘ 二、NameServer 详解 2.1 为什么需要 NameServer public class WhyNameServer { // 问题1:Producer/Consumer 如何知道 Broker 在哪里? class ServiceDiscovery { // 没有 NameServer: // - 硬编码 Broker 地址(不灵活) // - 无法动态感知 Broker 变化 // 有了 NameServer: // - 动态获取 Broker 列表 // - 自动感知 Broker 上下线 } // 问题2:Topic 分布在哪些 Broker 上? class TopicRoute { // NameServer 维护 Topic 路由信息 class TopicRouteData { List<QueueData> queueDatas; // Queue 分布 List<BrokerData> brokerDatas; // Broker 信息 } } // 问题3:为什么不用 ZooKeeper? class WhyNotZooKeeper { /* ZooKeeper 的问题: 1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先) 2. 重量级依赖 - NameServer 更轻量 3. 运维复杂 - NameServer 无状态更简单 4. 性能开销 - NameServer 内存操作更快 NameServer 的优势: 1. 完全无状态 2. 对等集群(任意节点提供服务) 3. 内存操作(无磁盘 IO) 4. 简单高效 */ } } 2.2 NameServer 核心功能 public class NameServerCore { // 1. 路由信息管理 class RouteInfoManager { // Topic → Broker 映射 private HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Broker 名称 → Broker 数据 private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 地址 → 集群信息 private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // 集群名称 → Broker 名称列表 private HashMap<String/* clusterName */, Set<String>> clusterAddrTable; // 过滤服务器 private HashMap<String/* brokerAddr */, List<String>> filterServerTable; } // 2. Broker 注册 public RegisterBrokerResult registerBroker( String clusterName, String brokerAddr, String brokerName, long brokerId, String haServerAddr, TopicConfigSerializeWrapper topicConfigWrapper) { // 创建或更新 Broker 数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } brokerData.getBrokerAddrs().put(brokerId, brokerAddr); // 更新 Topic 路由信息 if (null != topicConfigWrapper) { for (Map.Entry<String, TopicConfig> entry : topicConfigWrapper.getTopicConfigTable().entrySet()) { createAndUpdateQueueData(brokerName, entry.getValue()); } } // 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); return new RegisterBrokerResult(...); } // 3. 路由查询 public TopicRouteData getTopicRouteData(String topic) { TopicRouteData topicRouteData = new TopicRouteData(); // 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); // 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } // 组装路由数据 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (brokerData != null) { topicRouteData.getBrokerDatas().add(brokerData.clone()); } } topicRouteData.setQueueDatas(queueDataList); return topicRouteData; } // 4. Broker 心跳检测 public void scanNotActiveBroker() { long timeoutMillis = 120000; // 2 分钟超时 for (Map.Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeoutMillis) < System.currentTimeMillis()) { // Broker 超时,移除 RemotingUtil.closeChannel(entry.getValue().getChannel()); this.brokerLiveTable.remove(entry.getKey()); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); log.warn("Broker timeout: {}", entry.getKey()); } } } } 2.3 NameServer 集群 public class NameServerCluster { // NameServer 特点 class Characteristics { /* 1. 完全无状态 - 不持久化任何数据 - 所有数据在内存中 - 重启后数据丢失(从 Broker 重新获取) 2. 对等部署 - 所有节点功能完全一致 - 无主从之分 - 无需选举 3. 不互相通信 - NameServer 之间无任何通信 - 数据可能短暂不一致 - 最终一致即可 4. Broker 全量注册 - Broker 向所有 NameServer 注册 - 保证数据冗余 */ } // 部署架构 class DeploymentArchitecture { /* NameServer 集群(推荐3个节点): ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ └────────────────┼────────────────┘ │ ┌───────▼────────┐ │ │ │ Broker 集群 │ │ (全量注册) │ │ │ └────────────────┘ */ } } 三、Broker 详解 3.1 Broker 核心架构 Broker 内部架构: ┌─────────────────────────────────────────────────────┐ │ Broker │ │ ┌────────────────────────────────────────────────┐│ │ │ Remoting 通信层 ││ │ │ (接收 Producer/Consumer 请求) ││ │ └────────┬───────────────────────────────────────┘│ │ │ │ │ ┌────────▼────────────────────────────┐ │ │ │ BrokerController │ │ │ │ - 处理器注册 │ │ │ │ - 定时任务管理 │ │ │ │ - 消息处理 │ │ │ └────────┬────────────────────────────┘ │ │ │ │ │ ┌────────▼─────────────┬───────────────────┐ │ │ │ MessageStore │ HAService │ │ │ │ ┌────────────────┐ │ (主从复制) │ │ │ │ │ CommitLog │ │ │ │ │ │ │ (消息存储) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ ConsumeQueue │ │ │ │ │ │ │ (消费索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ IndexFile │ │ │ │ │ │ │ (消息索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ └──────────────────────┴───────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 定时任务 │ │ │ │ - 持久化 ConsumeQueue │ │ │ │ - 持久化 CommitLog │ │ │ │ - 清理过期文件 │ │ │ │ - 统计信息 │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ 3.2 Broker 核心职责 public class BrokerCore { // 1. 消息接收与存储 class MessageReceive { public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 写入 CommitLog PutMessageResult result = this.commitLog.putMessage(msg); // 触发刷盘 handleDiskFlush(result, msg); // 触发主从同步 handleHA(result, msg); return result; } } // 2. 消息查询 class MessageQuery { // 按消息 ID 查询 public MessageExt lookMessageByOffset(long commitLogOffset) { return this.commitLog.lookMessageByOffset(commitLogOffset); } // 按 Key 查询 public QueryMessageResult queryMessage(String topic, String key, int maxNum) { return this.indexService.queryMessage(topic, key, maxNum); } // 按时间查询 public QueryMessageResult queryMessageByTime(String topic, long begin, long end) { return this.messageStore.queryMessage(topic, begin, end, maxNum); } } // 3. 消息消费 class MessageConsume { public GetMessageResult getMessage( String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) { // 从 ConsumeQueue 查找消息位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 根据 offset 获取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); // 从 CommitLog 读取消息内容 List<MessageExt> messageList = new ArrayList<>(); for (int i = 0; i < maxMsgNums; i++) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); MessageExt msg = lookMessageByOffset(offsetPy, sizePy); // 消息过滤 if (match(subscriptionData, msg)) { messageList.add(msg); } } return new GetMessageResult(messageList); } } // 4. Topic 创建 class TopicManagement { public void createTopic(String topic, int queueNums) { TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); topicConfig.setPerm(6); // 读写权限 this.topicConfigTable.put(topic, topicConfig); // 创建 ConsumeQueue for (int i = 0; i < queueNums; i++) { ConsumeQueue logic = new ConsumeQueue(topic, i); this.consumeQueueTable.put(buildKey(topic, i), logic); } } } // 5. 消费进度管理 class ConsumerOffsetManagement { private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { map.put(queueId, offset); } } public long queryOffset(String group, String topic, int queueId) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long offset = map.get(queueId); return offset != null ? offset : -1; } return -1; } } } 3.3 Broker 高可用 public class BrokerHA { // 主从架构 class MasterSlave { /* 主从模式: ┌──────────────┐ 同步 ┌──────────────┐ │ Master Broker│ ──────────> │ Slave Broker │ │ - 读写 │ │ - 只读 │ └──────────────┘ └──────────────┘ 同步方式: 1. 同步复制(SYNC_MASTER): - Master 等待 Slave 同步完成才返回 - 可靠性高,性能稍差 2. 异步复制(ASYNC_MASTER): - Master 立即返回,后台异步同步 - 性能高,可能丢失少量数据 */ } // Dledger 模式 class DledgerMode { /* 基于 Raft 的自动选主: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│ └──────────┘ └──────────┘ └──────────┘ 特点: 1. 自动故障转移 2. 无需人工干预 3. 保证数据一致性 4. 至少3个节点 */ } } 四、Producer 工作原理 4.1 Producer 架构 public class ProducerArchitecture { // Producer 核心组件 class ProducerComponents { private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信API private TopicPublishInfo publishInfo; // Topic路由信息 private SendMessageHook sendHook; // 发送钩子 } // 发送流程 public SendResult send(Message msg) { // 1. 查找 Topic 路由信息 TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 2. 选择消息队列 MessageQueue mq = selectOneMessageQueue(publishInfo); // 3. 发送消息 SendResult result = sendKernelImpl(msg, mq); // 4. 更新故障规避信息 updateFaultItem(mq.getBrokerName(), result); return result; } } 4.2 Producer 负载均衡 public class ProducerLoadBalance { // 队列选择策略 public MessageQueue selectQueue(TopicPublishInfo tpInfo) { // 1. 简单轮询 int index = sendWhichQueue.incrementAndGet() % queueSize; return tpInfo.getMessageQueueList().get(index); // 2. 故障规避 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { return mq; } // 3. 选择延迟最小的 return latencyFaultTolerance.pickOneAtLeast(); } } 五、Consumer 工作原理 5.1 Consumer 架构 public class ConsumerArchitecture { // Consumer 核心组件 class ConsumerComponents { private RebalanceService rebalanceService; // 负载均衡服务 private PullMessageService pullMessageService; // 拉取服务 private ConsumeMessageService consumeService; // 消费服务 private OffsetStore offsetStore; // 进度存储 } // 消费流程 public void start() { // 1. 启动负载均衡 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 启动消费服务 consumeService.start(); // 4. 注册到 Broker registerConsumer(); } } 5.2 Consumer 负载均衡(Rebalance) public class ConsumerRebalance { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取同组的所有 Consumer List<String> cidAll = getConsumerList(group); // 3. 排序(保证一致性视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配 List<MessageQueue> allocated = allocateStrategy.allocate( group, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueue(allocated); } } 六、完整消息流转 6.1 发送消息完整流程 消息发送完整流程: ┌───────────┐ │ Producer │ └─────┬─────┘ │1. 查询路由 ▼ ┌───────────┐ │NameServer │ 返回路由信息(Broker列表、Queue信息) └─────┬─────┘ │ ▼ ┌───────────┐ │ Producer │ 2. 选择 Broker 和 Queue └─────┬─────┘ │3. 发送消息 ▼ ┌───────────┐ │ Broker │ 4. 写入 CommitLog │ ┌───────┐ │ │ │CommitLog│ │ └───┬───┘ │ │ │5. 异步构建 ConsumeQueue │ ┌───▼──────┐ │ │ConsumeQ │ │ └──────────┘ │ │6. 返回结果 └─────┼─────┘ ▼ ┌───────────┐ │ Producer │ 7. 收到发送结果 └───────────┘ 6.2 消费消息完整流程 消息消费完整流程: ┌───────────┐ │ Consumer │ └─────┬─────┘ │1. 注册到 Broker ▼ ┌───────────┐ │ Broker │ 保存 Consumer 信息 └─────┬─────┘ │2. Rebalance 分配 Queue ▼ ┌───────────┐ │ Consumer │ 3. 发起长轮询拉取 └─────┬─────┘ │ ▼ ┌───────────┐ │ Broker │ 4. 从 ConsumeQueue 查找 │ ┌───────┐ │ 5. 从 CommitLog 读取消息 │ │ConsumeQ│ │ │ └───┬───┘ │ │ │ │ │ ┌───▼────┐│ │ │CommitLog││ │ └────────┘│ │ │6. 返回消息 └─────┼─────┘ ▼ ┌───────────┐ │ Consumer │ 7. 处理消息 └─────┬─────┘ 8. 更新消费进度 │ ▼ ┌───────────┐ │ Broker │ 9. 保存消费进度 └───────────┘ 七、设计亮点与思考 7.1 设计亮点 public class DesignHighlights { // 1. NameServer 的轻量化设计 class LightweightNameServer { /* - 无状态,内存操作 - 对等部署,无需选举 - 简单高效 - AP 而非 CP */ } // 2. CommitLog 的顺序写 class SequentialWrite { /* - 所有消息顺序写入同一个文件 - 充分利用磁盘顺序写性能 - 避免随机 IO */ } // 3. ConsumeQueue 的索引设计 class IndexDesign { /* - 轻量级索引(20字节) - 快速定位消息位置 - 支持消费进度管理 */ } // 4. 长轮询的伪 Push class LongPolling { /* - Consumer 主动拉取 - Broker Hold 请求 - 兼顾实时性和可控性 */ } } 7.2 架构权衡 public class ArchitectureTradeoffs { // 权衡1:性能 vs 可靠性 class PerformanceVsReliability { // 异步刷盘 → 高性能,可能丢消息 // 同步刷盘 → 零丢失,性能下降 // 异步复制 → 高性能,可能丢消息 // 同步复制 → 高可靠,性能下降 } // 权衡2:一致性 vs 可用性 class ConsistencyVsAvailability { // NameServer 选择 AP // - 可能短暂不一致 // - 但保证高可用 // Dledger 模式选择 CP // - 保证数据一致 // - 需要多数节点存活 } // 权衡3:复杂度 vs 功能 class ComplexityVsFeature { // RocketMQ 比 Kafka 复杂 // - 但功能更丰富(事务、延迟、顺序) // RocketMQ 比 RabbitMQ 简单 // - 但性能更高 } } 八、总结 通过本篇,我们从整体视角理解了 RocketMQ 的架构: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...