RocketMQ架构02:NameServer深度剖析 - 轻量级注册中心的设计哲学

引言:简单而不简陋 NameServer 是 RocketMQ 中最"简单"的组件,但简单不代表简陋。它用最少的代码实现了最核心的功能: 无状态、无持久化 不依赖任何第三方组件 完全内存操作 对等部署、无主从之分 今天,我们深入理解这个"简单而强大"的组件。 一、NameServer 核心数据结构 1.1 路由信息管理 public class RouteInfoManager { // 1. Topic → QueueData 映射 // 记录每个 Topic 在哪些 Broker 上有哪些 Queue private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // QueueData 结构 class QueueData { private String brokerName; // Broker 名称 private int readQueueNums; // 读队列数量 private int writeQueueNums; // 写队列数量 private int perm; // 权限(2=写 4=读 6=读写) private int topicSynFlag; // 同步标识 } // 2. Broker 名称 → BrokerData 映射 // 记录 Broker 的集群信息和地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // BrokerData 结构 class BrokerData { private String cluster; // 所属集群 private String brokerName; // Broker 名称 private HashMap<Long/* brokerId */, String/* address */> brokerAddrs; // brokerId: 0=Master, >0=Slave } // 3. Broker 地址 → BrokerLiveInfo 映射 // 记录 Broker 的存活信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // BrokerLiveInfo 结构 class BrokerLiveInfo { private long lastUpdateTimestamp; // 最后更新时间 private DataVersion dataVersion; // 数据版本 private Channel channel; // Netty Channel private String haServerAddr; // HA 服务地址 } // 4. 集群名称 → Broker 名称集合 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // 5. Broker 地址 → 过滤服务器列表 private final HashMap<String/* brokerAddr */, List<String/* filter server */>> filterServerTable; // 读写锁保护 private final ReadWriteLock lock = new ReentrantReadWriteLock(); } 1.2 数据结构关系图 数据结构关系: topicQueueTable ┌──────────────────────────────────────┐ │ Topic1 → [QueueData1, QueueData2] │ │ QueueData1: brokerName=broker-a │ │ QueueData2: brokerName=broker-b │ └──────────────────────────────────────┘ │ ▼ brokerAddrTable ┌──────────────────────────────────────┐ │ broker-a → BrokerData │ │ cluster: DefaultCluster │ │ brokerAddrs: │ │ 0 → 192.168.1.100:10911 (Master)│ │ 1 → 192.168.1.101:10911 (Slave) │ └──────────────────────────────────────┘ │ ▼ brokerLiveTable ┌──────────────────────────────────────┐ │ 192.168.1.100:10911 → BrokerLiveInfo│ │ lastUpdateTimestamp: 1699999999000 │ │ channel: [Netty Channel] │ └──────────────────────────────────────┘ 二、Broker 注册流程 2.1 注册请求处理 public class BrokerRegistration { public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { // 获取写锁 this.lock.writeLock().lockInterruptibly(); // 1. 更新集群信息 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // 2. 更新 Broker 地址表 boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } // 更新 Broker 地址 Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 3. 更新 Topic 配置 if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { // 只有 Master 才更新 Topic 配置 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 4. 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); if (null == prevBrokerLiveInfo) { log.info("new broker registered: {}", brokerAddr); } // 5. 更新过滤服务器列表 if (filterServerList != null && !filterServerList.isEmpty()) { this.filterServerTable.put(brokerAddr, filterServerList); } // 6. 如果是 Slave,返回 Master 信息 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddr); if (masterLiveInfo != null) { result.setHaServerAddr(masterLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } catch (Exception e) { log.error("registerBroker Exception", e); } finally { this.lock.writeLock().unlock(); } return result; } // 创建或更新 Queue 数据 private void createAndUpdateQueueData(String brokerName, TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { queueDataList = new LinkedList<>(); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered: {}", topicConfig.getTopicName()); } else { // 移除旧的数据 queueDataList.removeIf(qd -> qd.getBrokerName().equals(brokerName)); } queueDataList.add(queueData); } } 2.2 Broker 心跳机制 public class BrokerHeartbeat { // Broker 端:定时发送心跳 class BrokerSide { // 每 30 秒向所有 NameServer 注册一次 private final ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { // 向所有 NameServer 注册 this.registerBrokerAll(true, false, true); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } }, 1000, 30000, TimeUnit.MILLISECONDS); } private void registerBrokerAll( final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { // 准备注册信息 TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 向每个 NameServer 注册 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { for (String namesrvAddr : nameServerAddressList) { try { RegisterBrokerResult result = this.registerBroker( namesrvAddr, oneway, timeoutMills, topicConfigWrapper ); if (result != null) { // 更新 Master 地址(Slave 使用) if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(result.getHaServerAddr()); } } } catch (Exception e) { log.warn("registerBroker to {} failed", namesrvAddr, e); } } } } } // NameServer 端:检测 Broker 超时 class NameServerSide { // 每 10 秒扫描一次 public void scanNotActiveBroker() { long timeout = 120000; // 2 分钟超时 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> entry = it.next(); long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeout) < System.currentTimeMillis()) { // Broker 超时 RemotingUtil.closeChannel(entry.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", entry.getKey(), timeout); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); } } } // 清理路由信息 public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { this.lock.readLock().lockInterruptibly(); // 查找 Broker 地址 for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } if (brokerAddrFound != null) { // 移除 Broker this.removeBroker(brokerAddrFound); } } } } 三、路由查询 3.1 获取 Topic 路由信息 public class RouteQuery { public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; try { // 获取读锁 this.lock.readLock().lockInterruptibly(); // 1. 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; // 2. 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); topicRouteData.getBrokerDatas().add(brokerDataClone); foundBrokerData = true; // 3. 查找过滤服务器 for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); if (filterServerList != null) { topicRouteData.getFilterServerTable().put(brokerAddr, filterServerList); } } } } } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } finally { this.lock.readLock().unlock(); } if (foundQueueData && foundBrokerData) { return topicRouteData; } return null; } // TopicRouteData 结构 class TopicRouteData { private String orderTopicConf; // 顺序消息配置 private List<QueueData> queueDatas; // Queue 信息列表 private List<BrokerData> brokerDatas; // Broker 信息列表 private HashMap<String, List<String>> filterServerTable; // 过滤服务器 } } 3.2 Client 端路由更新 public class ClientRouteUpdate { // Producer/Consumer 定时更新路由信息 class RouteUpdateTask { // 每 30 秒更新一次 private ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, 30000, TimeUnit.MILLISECONDS); } private void updateTopicRouteInfoFromNameServer() { // 获取所有订阅的 Topic Set<String> topicSet = new HashSet<>(); // Producer 的 Topic topicSet.addAll(this.producerTable.keySet()); // Consumer 的 Topic for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { topicSet.addAll(entry.getValue().subscriptions()); } // 更新每个 Topic 的路由信息 for (String topic : topicSet) { this.updateTopicRouteInfoFromNameServer(topic); } } public boolean updateTopicRouteInfoFromNameServer(final String topic) { try { // 从 NameServer 获取路由信息 TopicRouteData topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000); if (topicRouteData != null) { // 对比版本,判断是否需要更新 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataChanged(old, topicRouteData); if (changed) { // 更新本地路由表 this.topicRouteTable.put(topic, topicRouteData); // 更新 Producer 的路由信息 for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) { entry.getValue().updateTopicPublishInfo(topic, topicRouteData); } // 更新 Consumer 的路由信息 for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { entry.getValue().updateTopicSubscribeInfo(topic, topicRouteData); } log.info("topicRouteTable.put: Topic={}, RouteData={}", topic, topicRouteData); return true; } } } catch (Exception e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; } // 判断路由信息是否变化 private boolean topicRouteDataChanged(TopicRouteData olddata, TopicRouteData nowdata) { if (olddata == null || nowdata == null) { return true; } TopicRouteData old = olddata.cloneTopicRouteData(); TopicRouteData now = nowdata.cloneTopicRouteData(); Collections.sort(old.getQueueDatas()); Collections.sort(old.getBrokerDatas()); Collections.sort(now.getQueueDatas()); Collections.sort(now.getBrokerDatas()); return !old.equals(now); } } } 四、NameServer 高可用设计 4.1 对等部署架构 NameServer 集群架构: ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (独立运行) │ │ (独立运行) │ │ (独立运行) │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ └──────────────────┼──────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Broker 集群 │ │ (向所有NS注册) │ │ │ └──────────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Producer/Consumer │ │ (随机选择NS查询) │ │ │ └──────────────────────┘ 特点: 1. NameServer 之间无任何通信 2. 每个 NameServer 都是完整功能的独立节点 3. Broker 向所有 NameServer 注册 4. Client 随机选择 NameServer 查询 4.2 数据一致性保证 public class ConsistencyGuarantee { // 最终一致性 class EventualConsistency { /* NameServer 不保证强一致性,但保证最终一致性 场景1:Broker 注册 ┌─────────────────────────────────────────┐ │ T0: Broker 向 NS1 注册 │ │ T1: Broker 向 NS2 注册 │ │ T2: Broker 向 NS3 注册 │ │ │ │ 在 T0-T2 期间,三个 NS 数据不一致 │ │ T2 后,数据最终一致 │ └─────────────────────────────────────────┘ 场景2:Broker 宕机 ┌─────────────────────────────────────────┐ │ T0: Broker 宕机 │ │ T1: NS1 检测到超时(2分钟) │ │ T2: NS2 检测到超时 │ │ T3: NS3 检测到超时 │ │ │ │ 在检测到之前,可能有短暂不一致 │ │ 但影响有限(Client 会自动切换 Broker) │ └─────────────────────────────────────────┘ */ } // 为什么可以接受不一致? class WhyAcceptableInconsistency { /* 1. Client 有容错机制 - 如果从 NS1 获取的 Broker 不可用 - Client 会自动尝试其他 Broker 2. Broker 定时注册 - 每 30 秒注册一次 - 很快会同步到所有 NS 3. 影响范围小 - 只影响短时间的查询 - 不影响已建立的连接 4. 简化设计 - 无需复杂的一致性协议 - 性能更高 */ } } 4.3 故障处理 public class FailureHandling { // 单个 NameServer 故障 class SingleNameServerFailure { /* 场景:NS1 宕机 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (宕机) │ │ (正常) │ │ (正常) │ └──────────────┘ └──────┬───────┘ └──────┬───────┘ │ │ ┌───────────────┴──────────────────┘ │ ┌───────▼──────────┐ │ Broker 集群 │ │ - 向NS2/NS3注册 │ │ - NS1注册失败 │ └──────────────────┘ 影响: 1. Broker 向 NS1 注册失败(不影响NS2/NS3) 2. Client 如果查询 NS1 会失败,自动重试其他 NS 3. 总体可用性不受影响 */ } // 全部 NameServer 故障 class AllNameServerFailure { /* 场景:所有 NS 宕机 影响: 1. Broker 无法注册新的 Topic 2. Client 无法获取新的路由信息 3. 但已有连接仍可正常工作! 为什么还能工作? - Producer/Consumer 本地缓存了路由信息 - Broker 地址不会频繁变化 - 短时间内(几小时)仍可正常使用 恢复: - NameServer 恢复后 - Broker 重新注册 - Client 更新路由 - 自动恢复正常 */ } // Client 容错机制 class ClientFaultTolerance { public TopicRouteData getTopicRouteData(String topic) { // 尝试从不同的 NameServer 获取 List<String> nameServerList = this.remotingClient.getNameServerAddressList(); for (String nameServer : nameServerList) { try { TopicRouteData result = this.getTopicRouteDataFromNameServer(topic, nameServer, 3000); if (result != null) { return result; } } catch (Exception e) { log.warn("getTopicRouteData from {} failed", nameServer, e); // 继续尝试下一个 NameServer } } throw new MQClientException("No available NameServer"); } } } 五、性能优化 5.1 内存优化 public class MemoryOptimization { // 数据结构优化 class DataStructure { // 使用 HashMap 而非 ConcurrentHashMap // 通过读写锁控制并发,减少锁竞争 private final HashMap<String, List<QueueData>> topicQueueTable; private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 读操作 public List<QueueData> getQueueData(String topic) { try { this.lock.readLock().lockInterruptibly(); return this.topicQueueTable.get(topic); } finally { this.lock.readLock().unlock(); } } // 写操作 public void putQueueData(String topic, List<QueueData> queueDataList) { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.put(topic, queueDataList); } finally { this.lock.writeLock().unlock(); } } } // 避免不必要的对象创建 class ObjectReuse { // 复用 BrokerData 对象 public BrokerData cloneBrokerData(BrokerData brokerData) { // 只克隆必要的字段 return new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); } } } 5.2 网络优化 public class NetworkOptimization { // 批量处理 class BatchProcessing { // Broker 一次注册所有 Topic // 而不是每个 Topic 单独注册 public void registerBroker(TopicConfigSerializeWrapper topicConfigWrapper) { // 包含所有 Topic 的配置 ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigWrapper.getTopicConfigTable(); // 一次性更新所有 Topic for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } // 连接复用 class ConnectionReuse { // Broker 和 NameServer 之间保持长连接 // 避免频繁建立连接 private Channel channel; // Netty Channel public void keepAlive() { // 定时发送心跳保持连接 this.scheduledExecutorService.scheduleAtFixedRate(() -> { this.registerBrokerAll(true, false, true); }, 1000, 30000, TimeUnit.MILLISECONDS); } } } 六、与 ZooKeeper 对比 6.1 设计理念对比 对比维度: ┌──────────────┬──────────────────┬──────────────────┐ │ 维度 │ NameServer │ ZooKeeper │ ├──────────────┼──────────────────┼──────────────────┤ │ CAP 选择 │ AP(可用性优先) │ CP(一致性优先) │ │ 数据持久化 │ 无(纯内存) │ 有(磁盘) │ │ 节点通信 │ 无 │ 有(Paxos/Raft) │ │ 部署复杂度 │ 低 │ 高 │ │ 运维复杂度 │ 低 │ 高 │ │ 性能 │ 高(内存操作) │ 中(磁盘IO) │ │ 一致性保证 │ 最终一致 │ 强一致 │ │ 故障恢复 │ 快 │ 慢(需要选举) │ └──────────────┴──────────────────┴──────────────────┘ 6.2 为什么 RocketMQ 选择 NameServer? public class WhyNameServer { class Reasons { /* 1. 需求不同 - RocketMQ 不需要强一致性 - 路由信息短暂不一致可以接受 - Client 有容错机制 2. 简化部署 - 无需额外依赖 - 降低运维成本 - 减少故障点 3. 性能优势 - 纯内存操作,性能更高 - 无磁盘 IO - 无网络同步开销 4. 可用性优势 - 无需选举,故障恢复快 - 任意节点可服务 - 整体可用性更高 5. 成本考虑 - 更少的机器资源 - 更低的学习成本 - 更少的运维工作 */ } } 七、最佳实践 7.1 部署建议 public class DeploymentBestPractices { // 1. 节点数量 class NodeCount { /* 推荐配置: - 生产环境:3 个节点 - 测试环境:1 个节点即可 原因: - 3 个节点足够高可用 - 过多节点增加 Broker 负担(需向每个节点注册) - 节省资源 */ } // 2. 硬件配置 class HardwareRequirement { /* 推荐配置: - CPU: 2 核即可 - 内存: 4GB 即可 - 磁盘: 无特殊要求(不持久化) - 网络: 千兆网卡 NameServer 非常轻量,硬件需求很低 */ } // 3. 网络规划 class NetworkPlanning { /* 建议: - 与 Broker 在同一机房 - 使用内网通信 - 避免跨公网访问 - 确保网络稳定性 */ } // 4. 启动参数 class StartupParameters { String startCommand = "nohup sh mqnamesrv " + "-Xms2g -Xmx2g -Xmn1g " + "-XX:+UseG1GC " + "-XX:G1HeapRegionSize=16m " + "-XX:+PrintGCDetails " + "-XX:+PrintGCDateStamps " + "-Xloggc:/dev/shm/mq_gc_%p.log &"; } } 7.2 监控建议 public class MonitoringBestPractices { // 1. 关键指标 class KeyMetrics { long registeredBrokerCount; // 注册的 Broker 数量 long registeredTopicCount; // 注册的 Topic 数量 long activeConnectionCount; // 活跃连接数 double routeQueryQPS; // 路由查询 QPS double brokerRegisterQPS; // Broker 注册 QPS } // 2. 告警规则 class AlertRules { /* 建议告警: 1. Broker 数量突然减少(可能宕机) 2. 路由查询失败率 > 1% 3. 内存使用率 > 80% 4. JVM GC 频繁 */ } } 八、总结 NameServer 的设计充分体现了"大道至简"的哲学: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...