RocketMQ架构02:NameServer深度剖析 - 轻量级注册中心的设计哲学

引言:简单而不简陋 NameServer 是 RocketMQ 中最"简单"的组件,但简单不代表简陋。它用最少的代码实现了最核心的功能: 无状态、无持久化 不依赖任何第三方组件 完全内存操作 对等部署、无主从之分 今天,我们深入理解这个"简单而强大"的组件。 一、NameServer 核心数据结构 1.1 路由信息管理 public class RouteInfoManager { // 1. Topic → QueueData 映射 // 记录每个 Topic 在哪些 Broker 上有哪些 Queue private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // QueueData 结构 class QueueData { private String brokerName; // Broker 名称 private int readQueueNums; // 读队列数量 private int writeQueueNums; // 写队列数量 private int perm; // 权限(2=写 4=读 6=读写) private int topicSynFlag; // 同步标识 } // 2. Broker 名称 → BrokerData 映射 // 记录 Broker 的集群信息和地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // BrokerData 结构 class BrokerData { private String cluster; // 所属集群 private String brokerName; // Broker 名称 private HashMap<Long/* brokerId */, String/* address */> brokerAddrs; // brokerId: 0=Master, >0=Slave } // 3. Broker 地址 → BrokerLiveInfo 映射 // 记录 Broker 的存活信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // BrokerLiveInfo 结构 class BrokerLiveInfo { private long lastUpdateTimestamp; // 最后更新时间 private DataVersion dataVersion; // 数据版本 private Channel channel; // Netty Channel private String haServerAddr; // HA 服务地址 } // 4. 集群名称 → Broker 名称集合 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // 5. Broker 地址 → 过滤服务器列表 private final HashMap<String/* brokerAddr */, List<String/* filter server */>> filterServerTable; // 读写锁保护 private final ReadWriteLock lock = new ReentrantReadWriteLock(); } 1.2 数据结构关系图 数据结构关系: topicQueueTable ┌──────────────────────────────────────┐ │ Topic1 → [QueueData1, QueueData2] │ │ QueueData1: brokerName=broker-a │ │ QueueData2: brokerName=broker-b │ └──────────────────────────────────────┘ │ ▼ brokerAddrTable ┌──────────────────────────────────────┐ │ broker-a → BrokerData │ │ cluster: DefaultCluster │ │ brokerAddrs: │ │ 0 → 192.168.1.100:10911 (Master)│ │ 1 → 192.168.1.101:10911 (Slave) │ └──────────────────────────────────────┘ │ ▼ brokerLiveTable ┌──────────────────────────────────────┐ │ 192.168.1.100:10911 → BrokerLiveInfo│ │ lastUpdateTimestamp: 1699999999000 │ │ channel: [Netty Channel] │ └──────────────────────────────────────┘ 二、Broker 注册流程 2.1 注册请求处理 public class BrokerRegistration { public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { // 获取写锁 this.lock.writeLock().lockInterruptibly(); // 1. 更新集群信息 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // 2. 更新 Broker 地址表 boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } // 更新 Broker 地址 Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 3. 更新 Topic 配置 if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { // 只有 Master 才更新 Topic 配置 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 4. 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); if (null == prevBrokerLiveInfo) { log.info("new broker registered: {}", brokerAddr); } // 5. 更新过滤服务器列表 if (filterServerList != null && !filterServerList.isEmpty()) { this.filterServerTable.put(brokerAddr, filterServerList); } // 6. 如果是 Slave,返回 Master 信息 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddr); if (masterLiveInfo != null) { result.setHaServerAddr(masterLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } catch (Exception e) { log.error("registerBroker Exception", e); } finally { this.lock.writeLock().unlock(); } return result; } // 创建或更新 Queue 数据 private void createAndUpdateQueueData(String brokerName, TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { queueDataList = new LinkedList<>(); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered: {}", topicConfig.getTopicName()); } else { // 移除旧的数据 queueDataList.removeIf(qd -> qd.getBrokerName().equals(brokerName)); } queueDataList.add(queueData); } } 2.2 Broker 心跳机制 public class BrokerHeartbeat { // Broker 端:定时发送心跳 class BrokerSide { // 每 30 秒向所有 NameServer 注册一次 private final ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { // 向所有 NameServer 注册 this.registerBrokerAll(true, false, true); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } }, 1000, 30000, TimeUnit.MILLISECONDS); } private void registerBrokerAll( final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { // 准备注册信息 TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 向每个 NameServer 注册 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { for (String namesrvAddr : nameServerAddressList) { try { RegisterBrokerResult result = this.registerBroker( namesrvAddr, oneway, timeoutMills, topicConfigWrapper ); if (result != null) { // 更新 Master 地址(Slave 使用) if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(result.getHaServerAddr()); } } } catch (Exception e) { log.warn("registerBroker to {} failed", namesrvAddr, e); } } } } } // NameServer 端:检测 Broker 超时 class NameServerSide { // 每 10 秒扫描一次 public void scanNotActiveBroker() { long timeout = 120000; // 2 分钟超时 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> entry = it.next(); long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeout) < System.currentTimeMillis()) { // Broker 超时 RemotingUtil.closeChannel(entry.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", entry.getKey(), timeout); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); } } } // 清理路由信息 public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { this.lock.readLock().lockInterruptibly(); // 查找 Broker 地址 for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } if (brokerAddrFound != null) { // 移除 Broker this.removeBroker(brokerAddrFound); } } } } 三、路由查询 3.1 获取 Topic 路由信息 public class RouteQuery { public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; try { // 获取读锁 this.lock.readLock().lockInterruptibly(); // 1. 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; // 2. 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); topicRouteData.getBrokerDatas().add(brokerDataClone); foundBrokerData = true; // 3. 查找过滤服务器 for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); if (filterServerList != null) { topicRouteData.getFilterServerTable().put(brokerAddr, filterServerList); } } } } } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } finally { this.lock.readLock().unlock(); } if (foundQueueData && foundBrokerData) { return topicRouteData; } return null; } // TopicRouteData 结构 class TopicRouteData { private String orderTopicConf; // 顺序消息配置 private List<QueueData> queueDatas; // Queue 信息列表 private List<BrokerData> brokerDatas; // Broker 信息列表 private HashMap<String, List<String>> filterServerTable; // 过滤服务器 } } 3.2 Client 端路由更新 public class ClientRouteUpdate { // Producer/Consumer 定时更新路由信息 class RouteUpdateTask { // 每 30 秒更新一次 private ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, 30000, TimeUnit.MILLISECONDS); } private void updateTopicRouteInfoFromNameServer() { // 获取所有订阅的 Topic Set<String> topicSet = new HashSet<>(); // Producer 的 Topic topicSet.addAll(this.producerTable.keySet()); // Consumer 的 Topic for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { topicSet.addAll(entry.getValue().subscriptions()); } // 更新每个 Topic 的路由信息 for (String topic : topicSet) { this.updateTopicRouteInfoFromNameServer(topic); } } public boolean updateTopicRouteInfoFromNameServer(final String topic) { try { // 从 NameServer 获取路由信息 TopicRouteData topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000); if (topicRouteData != null) { // 对比版本,判断是否需要更新 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataChanged(old, topicRouteData); if (changed) { // 更新本地路由表 this.topicRouteTable.put(topic, topicRouteData); // 更新 Producer 的路由信息 for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) { entry.getValue().updateTopicPublishInfo(topic, topicRouteData); } // 更新 Consumer 的路由信息 for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { entry.getValue().updateTopicSubscribeInfo(topic, topicRouteData); } log.info("topicRouteTable.put: Topic={}, RouteData={}", topic, topicRouteData); return true; } } } catch (Exception e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; } // 判断路由信息是否变化 private boolean topicRouteDataChanged(TopicRouteData olddata, TopicRouteData nowdata) { if (olddata == null || nowdata == null) { return true; } TopicRouteData old = olddata.cloneTopicRouteData(); TopicRouteData now = nowdata.cloneTopicRouteData(); Collections.sort(old.getQueueDatas()); Collections.sort(old.getBrokerDatas()); Collections.sort(now.getQueueDatas()); Collections.sort(now.getBrokerDatas()); return !old.equals(now); } } } 四、NameServer 高可用设计 4.1 对等部署架构 NameServer 集群架构: ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (独立运行) │ │ (独立运行) │ │ (独立运行) │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ └──────────────────┼──────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Broker 集群 │ │ (向所有NS注册) │ │ │ └──────────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Producer/Consumer │ │ (随机选择NS查询) │ │ │ └──────────────────────┘ 特点: 1. NameServer 之间无任何通信 2. 每个 NameServer 都是完整功能的独立节点 3. Broker 向所有 NameServer 注册 4. Client 随机选择 NameServer 查询 4.2 数据一致性保证 public class ConsistencyGuarantee { // 最终一致性 class EventualConsistency { /* NameServer 不保证强一致性,但保证最终一致性 场景1:Broker 注册 ┌─────────────────────────────────────────┐ │ T0: Broker 向 NS1 注册 │ │ T1: Broker 向 NS2 注册 │ │ T2: Broker 向 NS3 注册 │ │ │ │ 在 T0-T2 期间,三个 NS 数据不一致 │ │ T2 后,数据最终一致 │ └─────────────────────────────────────────┘ 场景2:Broker 宕机 ┌─────────────────────────────────────────┐ │ T0: Broker 宕机 │ │ T1: NS1 检测到超时(2分钟) │ │ T2: NS2 检测到超时 │ │ T3: NS3 检测到超时 │ │ │ │ 在检测到之前,可能有短暂不一致 │ │ 但影响有限(Client 会自动切换 Broker) │ └─────────────────────────────────────────┘ */ } // 为什么可以接受不一致? class WhyAcceptableInconsistency { /* 1. Client 有容错机制 - 如果从 NS1 获取的 Broker 不可用 - Client 会自动尝试其他 Broker 2. Broker 定时注册 - 每 30 秒注册一次 - 很快会同步到所有 NS 3. 影响范围小 - 只影响短时间的查询 - 不影响已建立的连接 4. 简化设计 - 无需复杂的一致性协议 - 性能更高 */ } } 4.3 故障处理 public class FailureHandling { // 单个 NameServer 故障 class SingleNameServerFailure { /* 场景:NS1 宕机 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (宕机) │ │ (正常) │ │ (正常) │ └──────────────┘ └──────┬───────┘ └──────┬───────┘ │ │ ┌───────────────┴──────────────────┘ │ ┌───────▼──────────┐ │ Broker 集群 │ │ - 向NS2/NS3注册 │ │ - NS1注册失败 │ └──────────────────┘ 影响: 1. Broker 向 NS1 注册失败(不影响NS2/NS3) 2. Client 如果查询 NS1 会失败,自动重试其他 NS 3. 总体可用性不受影响 */ } // 全部 NameServer 故障 class AllNameServerFailure { /* 场景:所有 NS 宕机 影响: 1. Broker 无法注册新的 Topic 2. Client 无法获取新的路由信息 3. 但已有连接仍可正常工作! 为什么还能工作? - Producer/Consumer 本地缓存了路由信息 - Broker 地址不会频繁变化 - 短时间内(几小时)仍可正常使用 恢复: - NameServer 恢复后 - Broker 重新注册 - Client 更新路由 - 自动恢复正常 */ } // Client 容错机制 class ClientFaultTolerance { public TopicRouteData getTopicRouteData(String topic) { // 尝试从不同的 NameServer 获取 List<String> nameServerList = this.remotingClient.getNameServerAddressList(); for (String nameServer : nameServerList) { try { TopicRouteData result = this.getTopicRouteDataFromNameServer(topic, nameServer, 3000); if (result != null) { return result; } } catch (Exception e) { log.warn("getTopicRouteData from {} failed", nameServer, e); // 继续尝试下一个 NameServer } } throw new MQClientException("No available NameServer"); } } } 五、性能优化 5.1 内存优化 public class MemoryOptimization { // 数据结构优化 class DataStructure { // 使用 HashMap 而非 ConcurrentHashMap // 通过读写锁控制并发,减少锁竞争 private final HashMap<String, List<QueueData>> topicQueueTable; private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 读操作 public List<QueueData> getQueueData(String topic) { try { this.lock.readLock().lockInterruptibly(); return this.topicQueueTable.get(topic); } finally { this.lock.readLock().unlock(); } } // 写操作 public void putQueueData(String topic, List<QueueData> queueDataList) { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.put(topic, queueDataList); } finally { this.lock.writeLock().unlock(); } } } // 避免不必要的对象创建 class ObjectReuse { // 复用 BrokerData 对象 public BrokerData cloneBrokerData(BrokerData brokerData) { // 只克隆必要的字段 return new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); } } } 5.2 网络优化 public class NetworkOptimization { // 批量处理 class BatchProcessing { // Broker 一次注册所有 Topic // 而不是每个 Topic 单独注册 public void registerBroker(TopicConfigSerializeWrapper topicConfigWrapper) { // 包含所有 Topic 的配置 ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigWrapper.getTopicConfigTable(); // 一次性更新所有 Topic for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } // 连接复用 class ConnectionReuse { // Broker 和 NameServer 之间保持长连接 // 避免频繁建立连接 private Channel channel; // Netty Channel public void keepAlive() { // 定时发送心跳保持连接 this.scheduledExecutorService.scheduleAtFixedRate(() -> { this.registerBrokerAll(true, false, true); }, 1000, 30000, TimeUnit.MILLISECONDS); } } } 六、与 ZooKeeper 对比 6.1 设计理念对比 对比维度: ┌──────────────┬──────────────────┬──────────────────┐ │ 维度 │ NameServer │ ZooKeeper │ ├──────────────┼──────────────────┼──────────────────┤ │ CAP 选择 │ AP(可用性优先) │ CP(一致性优先) │ │ 数据持久化 │ 无(纯内存) │ 有(磁盘) │ │ 节点通信 │ 无 │ 有(Paxos/Raft) │ │ 部署复杂度 │ 低 │ 高 │ │ 运维复杂度 │ 低 │ 高 │ │ 性能 │ 高(内存操作) │ 中(磁盘IO) │ │ 一致性保证 │ 最终一致 │ 强一致 │ │ 故障恢复 │ 快 │ 慢(需要选举) │ └──────────────┴──────────────────┴──────────────────┘ 6.2 为什么 RocketMQ 选择 NameServer? public class WhyNameServer { class Reasons { /* 1. 需求不同 - RocketMQ 不需要强一致性 - 路由信息短暂不一致可以接受 - Client 有容错机制 2. 简化部署 - 无需额外依赖 - 降低运维成本 - 减少故障点 3. 性能优势 - 纯内存操作,性能更高 - 无磁盘 IO - 无网络同步开销 4. 可用性优势 - 无需选举,故障恢复快 - 任意节点可服务 - 整体可用性更高 5. 成本考虑 - 更少的机器资源 - 更低的学习成本 - 更少的运维工作 */ } } 七、最佳实践 7.1 部署建议 public class DeploymentBestPractices { // 1. 节点数量 class NodeCount { /* 推荐配置: - 生产环境:3 个节点 - 测试环境:1 个节点即可 原因: - 3 个节点足够高可用 - 过多节点增加 Broker 负担(需向每个节点注册) - 节省资源 */ } // 2. 硬件配置 class HardwareRequirement { /* 推荐配置: - CPU: 2 核即可 - 内存: 4GB 即可 - 磁盘: 无特殊要求(不持久化) - 网络: 千兆网卡 NameServer 非常轻量,硬件需求很低 */ } // 3. 网络规划 class NetworkPlanning { /* 建议: - 与 Broker 在同一机房 - 使用内网通信 - 避免跨公网访问 - 确保网络稳定性 */ } // 4. 启动参数 class StartupParameters { String startCommand = "nohup sh mqnamesrv " + "-Xms2g -Xmx2g -Xmn1g " + "-XX:+UseG1GC " + "-XX:G1HeapRegionSize=16m " + "-XX:+PrintGCDetails " + "-XX:+PrintGCDateStamps " + "-Xloggc:/dev/shm/mq_gc_%p.log &"; } } 7.2 监控建议 public class MonitoringBestPractices { // 1. 关键指标 class KeyMetrics { long registeredBrokerCount; // 注册的 Broker 数量 long registeredTopicCount; // 注册的 Topic 数量 long activeConnectionCount; // 活跃连接数 double routeQueryQPS; // 路由查询 QPS double brokerRegisterQPS; // Broker 注册 QPS } // 2. 告警规则 class AlertRules { /* 建议告警: 1. Broker 数量突然减少(可能宕机) 2. 路由查询失败率 > 1% 3. 内存使用率 > 80% 4. JVM GC 频繁 */ } } 八、总结 NameServer 的设计充分体现了"大道至简"的哲学: ...

2025-11-13 · maneng

RocketMQ架构01:整体架构设计 - NameServer、Broker、Producer、Consumer

引言:一个精妙的分布式系统 如果把 RocketMQ 比作一座现代化城市,那么: NameServer 是城市规划局,管理所有地图信息 Broker 是物流仓库,存储和转运货物 Producer 是发货方,把货物送到仓库 Consumer 是收货方,从仓库取货 今天,我们从整体视角理解这座"消息城市"的运作机制。 一、RocketMQ 整体架构 1.1 核心组件 RocketMQ 架构全景图: ┌────────────────────────────────────────────┐ │ NameServer Cluster │ │ (路由注册中心、服务发现、轻量级协调器) │ └────────┬───────────────────┬───────────────┘ │ │ 注册/心跳 │ │ 获取路由 │ │ ┌────────────▼──────┐ ┌────▼────────────────┐ │ │ │ │ │ Broker Cluster │ │ Producer/Consumer │ │ │ │ │ │ ┌───────────────┐ │ │ 应用程序 │ │ │ Master Broker │ │◄────┤ │ │ │ - CommitLog │ │ 读写 │ │ │ │ - ConsumeQ │ │ 消息 │ │ │ │ - IndexFile │ │ │ │ │ └───────┬───────┘ │ └─────────────────────┘ │ │ 主从同步 │ │ ┌───────▼───────┐ │ │ │ Slave Broker │ │ │ │ - CommitLog │ │ │ │ - ConsumeQ │ │ │ │ - IndexFile │ │ │ └───────────────┘ │ └───────────────────┘ 1.2 组件职责矩阵 组件职责对比: ┌──────────────┬────────────────────────────────────────┐ │ 组件 │ 核心职责 │ ├──────────────┼────────────────────────────────────────┤ │ NameServer │ - 路由信息管理(Topic → Broker 映射) │ │ │ - Broker 注册与心跳检测 │ │ │ - 提供路由信息查询 │ │ │ - 无状态、对等集群 │ ├──────────────┼────────────────────────────────────────┤ │ Broker │ - 消息存储(CommitLog) │ │ │ - 消息索引(ConsumeQueue、IndexFile) │ │ │ - 消息查询与消费 │ │ │ - 高可用(主从复制、Dledger) │ ├──────────────┼────────────────────────────────────────┤ │ Producer │ - 消息生产 │ │ │ - 消息发送(同步、异步、单向) │ │ │ - 负载均衡(选择Broker和Queue) │ │ │ - 故障规避 │ ├──────────────┼────────────────────────────────────────┤ │ Consumer │ - 消息消费 │ │ │ - 消息拉取(长轮询) │ │ │ - 负载均衡(Rebalance) │ │ │ - 消费进度管理 │ └──────────────┴────────────────────────────────────────┘ 二、NameServer 详解 2.1 为什么需要 NameServer public class WhyNameServer { // 问题1:Producer/Consumer 如何知道 Broker 在哪里? class ServiceDiscovery { // 没有 NameServer: // - 硬编码 Broker 地址(不灵活) // - 无法动态感知 Broker 变化 // 有了 NameServer: // - 动态获取 Broker 列表 // - 自动感知 Broker 上下线 } // 问题2:Topic 分布在哪些 Broker 上? class TopicRoute { // NameServer 维护 Topic 路由信息 class TopicRouteData { List<QueueData> queueDatas; // Queue 分布 List<BrokerData> brokerDatas; // Broker 信息 } } // 问题3:为什么不用 ZooKeeper? class WhyNotZooKeeper { /* ZooKeeper 的问题: 1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先) 2. 重量级依赖 - NameServer 更轻量 3. 运维复杂 - NameServer 无状态更简单 4. 性能开销 - NameServer 内存操作更快 NameServer 的优势: 1. 完全无状态 2. 对等集群(任意节点提供服务) 3. 内存操作(无磁盘 IO) 4. 简单高效 */ } } 2.2 NameServer 核心功能 public class NameServerCore { // 1. 路由信息管理 class RouteInfoManager { // Topic → Broker 映射 private HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Broker 名称 → Broker 数据 private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 地址 → 集群信息 private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // 集群名称 → Broker 名称列表 private HashMap<String/* clusterName */, Set<String>> clusterAddrTable; // 过滤服务器 private HashMap<String/* brokerAddr */, List<String>> filterServerTable; } // 2. Broker 注册 public RegisterBrokerResult registerBroker( String clusterName, String brokerAddr, String brokerName, long brokerId, String haServerAddr, TopicConfigSerializeWrapper topicConfigWrapper) { // 创建或更新 Broker 数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } brokerData.getBrokerAddrs().put(brokerId, brokerAddr); // 更新 Topic 路由信息 if (null != topicConfigWrapper) { for (Map.Entry<String, TopicConfig> entry : topicConfigWrapper.getTopicConfigTable().entrySet()) { createAndUpdateQueueData(brokerName, entry.getValue()); } } // 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); return new RegisterBrokerResult(...); } // 3. 路由查询 public TopicRouteData getTopicRouteData(String topic) { TopicRouteData topicRouteData = new TopicRouteData(); // 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); // 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } // 组装路由数据 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (brokerData != null) { topicRouteData.getBrokerDatas().add(brokerData.clone()); } } topicRouteData.setQueueDatas(queueDataList); return topicRouteData; } // 4. Broker 心跳检测 public void scanNotActiveBroker() { long timeoutMillis = 120000; // 2 分钟超时 for (Map.Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeoutMillis) < System.currentTimeMillis()) { // Broker 超时,移除 RemotingUtil.closeChannel(entry.getValue().getChannel()); this.brokerLiveTable.remove(entry.getKey()); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); log.warn("Broker timeout: {}", entry.getKey()); } } } } 2.3 NameServer 集群 public class NameServerCluster { // NameServer 特点 class Characteristics { /* 1. 完全无状态 - 不持久化任何数据 - 所有数据在内存中 - 重启后数据丢失(从 Broker 重新获取) 2. 对等部署 - 所有节点功能完全一致 - 无主从之分 - 无需选举 3. 不互相通信 - NameServer 之间无任何通信 - 数据可能短暂不一致 - 最终一致即可 4. Broker 全量注册 - Broker 向所有 NameServer 注册 - 保证数据冗余 */ } // 部署架构 class DeploymentArchitecture { /* NameServer 集群(推荐3个节点): ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ └────────────────┼────────────────┘ │ ┌───────▼────────┐ │ │ │ Broker 集群 │ │ (全量注册) │ │ │ └────────────────┘ */ } } 三、Broker 详解 3.1 Broker 核心架构 Broker 内部架构: ┌─────────────────────────────────────────────────────┐ │ Broker │ │ ┌────────────────────────────────────────────────┐│ │ │ Remoting 通信层 ││ │ │ (接收 Producer/Consumer 请求) ││ │ └────────┬───────────────────────────────────────┘│ │ │ │ │ ┌────────▼────────────────────────────┐ │ │ │ BrokerController │ │ │ │ - 处理器注册 │ │ │ │ - 定时任务管理 │ │ │ │ - 消息处理 │ │ │ └────────┬────────────────────────────┘ │ │ │ │ │ ┌────────▼─────────────┬───────────────────┐ │ │ │ MessageStore │ HAService │ │ │ │ ┌────────────────┐ │ (主从复制) │ │ │ │ │ CommitLog │ │ │ │ │ │ │ (消息存储) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ ConsumeQueue │ │ │ │ │ │ │ (消费索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ IndexFile │ │ │ │ │ │ │ (消息索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ └──────────────────────┴───────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 定时任务 │ │ │ │ - 持久化 ConsumeQueue │ │ │ │ - 持久化 CommitLog │ │ │ │ - 清理过期文件 │ │ │ │ - 统计信息 │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ 3.2 Broker 核心职责 public class BrokerCore { // 1. 消息接收与存储 class MessageReceive { public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 写入 CommitLog PutMessageResult result = this.commitLog.putMessage(msg); // 触发刷盘 handleDiskFlush(result, msg); // 触发主从同步 handleHA(result, msg); return result; } } // 2. 消息查询 class MessageQuery { // 按消息 ID 查询 public MessageExt lookMessageByOffset(long commitLogOffset) { return this.commitLog.lookMessageByOffset(commitLogOffset); } // 按 Key 查询 public QueryMessageResult queryMessage(String topic, String key, int maxNum) { return this.indexService.queryMessage(topic, key, maxNum); } // 按时间查询 public QueryMessageResult queryMessageByTime(String topic, long begin, long end) { return this.messageStore.queryMessage(topic, begin, end, maxNum); } } // 3. 消息消费 class MessageConsume { public GetMessageResult getMessage( String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) { // 从 ConsumeQueue 查找消息位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 根据 offset 获取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); // 从 CommitLog 读取消息内容 List<MessageExt> messageList = new ArrayList<>(); for (int i = 0; i < maxMsgNums; i++) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); MessageExt msg = lookMessageByOffset(offsetPy, sizePy); // 消息过滤 if (match(subscriptionData, msg)) { messageList.add(msg); } } return new GetMessageResult(messageList); } } // 4. Topic 创建 class TopicManagement { public void createTopic(String topic, int queueNums) { TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); topicConfig.setPerm(6); // 读写权限 this.topicConfigTable.put(topic, topicConfig); // 创建 ConsumeQueue for (int i = 0; i < queueNums; i++) { ConsumeQueue logic = new ConsumeQueue(topic, i); this.consumeQueueTable.put(buildKey(topic, i), logic); } } } // 5. 消费进度管理 class ConsumerOffsetManagement { private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { map.put(queueId, offset); } } public long queryOffset(String group, String topic, int queueId) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long offset = map.get(queueId); return offset != null ? offset : -1; } return -1; } } } 3.3 Broker 高可用 public class BrokerHA { // 主从架构 class MasterSlave { /* 主从模式: ┌──────────────┐ 同步 ┌──────────────┐ │ Master Broker│ ──────────> │ Slave Broker │ │ - 读写 │ │ - 只读 │ └──────────────┘ └──────────────┘ 同步方式: 1. 同步复制(SYNC_MASTER): - Master 等待 Slave 同步完成才返回 - 可靠性高,性能稍差 2. 异步复制(ASYNC_MASTER): - Master 立即返回,后台异步同步 - 性能高,可能丢失少量数据 */ } // Dledger 模式 class DledgerMode { /* 基于 Raft 的自动选主: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│ └──────────┘ └──────────┘ └──────────┘ 特点: 1. 自动故障转移 2. 无需人工干预 3. 保证数据一致性 4. 至少3个节点 */ } } 四、Producer 工作原理 4.1 Producer 架构 public class ProducerArchitecture { // Producer 核心组件 class ProducerComponents { private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信API private TopicPublishInfo publishInfo; // Topic路由信息 private SendMessageHook sendHook; // 发送钩子 } // 发送流程 public SendResult send(Message msg) { // 1. 查找 Topic 路由信息 TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 2. 选择消息队列 MessageQueue mq = selectOneMessageQueue(publishInfo); // 3. 发送消息 SendResult result = sendKernelImpl(msg, mq); // 4. 更新故障规避信息 updateFaultItem(mq.getBrokerName(), result); return result; } } 4.2 Producer 负载均衡 public class ProducerLoadBalance { // 队列选择策略 public MessageQueue selectQueue(TopicPublishInfo tpInfo) { // 1. 简单轮询 int index = sendWhichQueue.incrementAndGet() % queueSize; return tpInfo.getMessageQueueList().get(index); // 2. 故障规避 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { return mq; } // 3. 选择延迟最小的 return latencyFaultTolerance.pickOneAtLeast(); } } 五、Consumer 工作原理 5.1 Consumer 架构 public class ConsumerArchitecture { // Consumer 核心组件 class ConsumerComponents { private RebalanceService rebalanceService; // 负载均衡服务 private PullMessageService pullMessageService; // 拉取服务 private ConsumeMessageService consumeService; // 消费服务 private OffsetStore offsetStore; // 进度存储 } // 消费流程 public void start() { // 1. 启动负载均衡 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 启动消费服务 consumeService.start(); // 4. 注册到 Broker registerConsumer(); } } 5.2 Consumer 负载均衡(Rebalance) public class ConsumerRebalance { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取同组的所有 Consumer List<String> cidAll = getConsumerList(group); // 3. 排序(保证一致性视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配 List<MessageQueue> allocated = allocateStrategy.allocate( group, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueue(allocated); } } 六、完整消息流转 6.1 发送消息完整流程 消息发送完整流程: ┌───────────┐ │ Producer │ └─────┬─────┘ │1. 查询路由 ▼ ┌───────────┐ │NameServer │ 返回路由信息(Broker列表、Queue信息) └─────┬─────┘ │ ▼ ┌───────────┐ │ Producer │ 2. 选择 Broker 和 Queue └─────┬─────┘ │3. 发送消息 ▼ ┌───────────┐ │ Broker │ 4. 写入 CommitLog │ ┌───────┐ │ │ │CommitLog│ │ └───┬───┘ │ │ │5. 异步构建 ConsumeQueue │ ┌───▼──────┐ │ │ConsumeQ │ │ └──────────┘ │ │6. 返回结果 └─────┼─────┘ ▼ ┌───────────┐ │ Producer │ 7. 收到发送结果 └───────────┘ 6.2 消费消息完整流程 消息消费完整流程: ┌───────────┐ │ Consumer │ └─────┬─────┘ │1. 注册到 Broker ▼ ┌───────────┐ │ Broker │ 保存 Consumer 信息 └─────┬─────┘ │2. Rebalance 分配 Queue ▼ ┌───────────┐ │ Consumer │ 3. 发起长轮询拉取 └─────┬─────┘ │ ▼ ┌───────────┐ │ Broker │ 4. 从 ConsumeQueue 查找 │ ┌───────┐ │ 5. 从 CommitLog 读取消息 │ │ConsumeQ│ │ │ └───┬───┘ │ │ │ │ │ ┌───▼────┐│ │ │CommitLog││ │ └────────┘│ │ │6. 返回消息 └─────┼─────┘ ▼ ┌───────────┐ │ Consumer │ 7. 处理消息 └─────┬─────┘ 8. 更新消费进度 │ ▼ ┌───────────┐ │ Broker │ 9. 保存消费进度 └───────────┘ 七、设计亮点与思考 7.1 设计亮点 public class DesignHighlights { // 1. NameServer 的轻量化设计 class LightweightNameServer { /* - 无状态,内存操作 - 对等部署,无需选举 - 简单高效 - AP 而非 CP */ } // 2. CommitLog 的顺序写 class SequentialWrite { /* - 所有消息顺序写入同一个文件 - 充分利用磁盘顺序写性能 - 避免随机 IO */ } // 3. ConsumeQueue 的索引设计 class IndexDesign { /* - 轻量级索引(20字节) - 快速定位消息位置 - 支持消费进度管理 */ } // 4. 长轮询的伪 Push class LongPolling { /* - Consumer 主动拉取 - Broker Hold 请求 - 兼顾实时性和可控性 */ } } 7.2 架构权衡 public class ArchitectureTradeoffs { // 权衡1:性能 vs 可靠性 class PerformanceVsReliability { // 异步刷盘 → 高性能,可能丢消息 // 同步刷盘 → 零丢失,性能下降 // 异步复制 → 高性能,可能丢消息 // 同步复制 → 高可靠,性能下降 } // 权衡2:一致性 vs 可用性 class ConsistencyVsAvailability { // NameServer 选择 AP // - 可能短暂不一致 // - 但保证高可用 // Dledger 模式选择 CP // - 保证数据一致 // - 需要多数节点存活 } // 权衡3:复杂度 vs 功能 class ComplexityVsFeature { // RocketMQ 比 Kafka 复杂 // - 但功能更丰富(事务、延迟、顺序) // RocketMQ 比 RabbitMQ 简单 // - 但性能更高 } } 八、总结 通过本篇,我们从整体视角理解了 RocketMQ 的架构: ...

2025-11-13 · maneng

RocketMQ入门02:消息队列的本质与演进

引言:从一个简单的队列开始 在计算机科学中,队列(Queue)是最基础的数据结构之一。它遵循 FIFO(First In First Out)原则,就像排队买票一样,先来的先处理。 // 最简单的队列实现 public class SimpleQueue<T> { private LinkedList<T> items = new LinkedList<>(); public void enqueue(T item) { items.addLast(item); // 入队 } public T dequeue() { return items.removeFirst(); // 出队 } } 这个简单的数据结构,竟然是现代消息队列的起源。让我们看看它是如何一步步演进的。 一、演进阶段1:进程内队列 1.1 最初的需求 在单进程程序中,当我们需要解耦生产者和消费者时,最简单的方式就是使用内存队列: // 生产者-消费者模式 public class InMemoryMessageQueue { private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); // 生产者线程 class Producer extends Thread { public void run() { while (true) { Message msg = generateMessage(); queue.put(msg); // 阻塞式入队 } } } // 消费者线程 class Consumer extends Thread { public void run() { while (true) { Message msg = queue.take(); // 阻塞式出队 processMessage(msg); } } } } 特点: ...

2025-11-13 · maneng

Redis第一性原理:为什么我们需要缓存?

一、引子:商品详情接口的性能进化之路 想象你正在开发一个电商平台的商品详情页面,每次用户访问都需要查询商品基本信息、品牌信息、类目信息、库存信息和商品图片。让我们看看三种不同的实现方式,以及它们各自的性能表现。 1.1 场景A:无缓存(直接查数据库) 这是最直接的实现方式:每次请求都查询数据库。 @Service public class ProductService { @Autowired private ProductRepository productRepository; @Autowired private BrandRepository brandRepository; @Autowired private CategoryRepository categoryRepository; @Autowired private InventoryRepository inventoryRepository; @Autowired private ProductImageRepository productImageRepository; /** * 查询商品详情(每次请求都查数据库) * 平均耗时:100ms * QPS上限:500 */ public ProductDetailVO getProductDetail(Long productId) { // 1. 查询商品基本信息(20ms) Product product = productRepository.findById(productId); if (product == null) { throw new ProductNotFoundException("商品不存在:" + productId); } // 2. 查询品牌信息(20ms) Brand brand = brandRepository.findById(product.getBrandId()); // 3. 查询类目信息(20ms) Category category = categoryRepository.findById(product.getCategoryId()); // 4. 查询库存信息(20ms) Inventory inventory = inventoryRepository.findByProductId(productId); // 5. 查询商品图片(20ms,可能有N+1查询问题) List<ProductImage> images = productImageRepository.findByProductId(productId); // 6. 组装返回对象 ProductDetailVO vo = new ProductDetailVO(); vo.setProductId(product.getId()); vo.setProductName(product.getName()); vo.setPrice(product.getPrice()); vo.setBrandName(brand.getName()); vo.setCategoryName(category.getName()); vo.setStock(inventory.getStock()); vo.setImages(images); return vo; } } 性能数据(压测工具:JMeter,1000并发): ...

2025-11-03 · maneng

为什么需要微服务?单体架构的困境与演进

引子:一个电商系统的架构抉择 2020年3月,某创业公司CTO王明面临一个艰难的技术决策: 现状: 团队规模:15个研发 业务现状:日订单量从1万增长到10万 核心痛点:每次发布需要2小时,任何一个小功能的上线都要全量发布,导致上线频率从每周3次降到每周1次 技术选型的两个方向: 继续单体架构:优化现有系统,加机器,做好模块化 切换微服务架构:拆分服务,引入Spring Cloud,重构系统 王明的困惑: “我们真的需要微服务吗?” “微服务能解决什么问题?” “微服务会带来什么代价?” 让我们通过两个完全不同的世界,来理解微服务架构的本质。 一、场景A:单体架构实现(初创期的最优解) 1.1 单体架构的完整代码实现 /** * 电商系统 - 单体架构实现 * 特点:所有功能在一个进程内,共享一个数据库 */ @SpringBootApplication public class ECommerceMonolithApplication { public static void main(String[] args) { SpringApplication.run(ECommerceMonolithApplication.class, args); } } /** * 订单服务(单体架构版本) * 依赖:用户服务、商品服务、库存服务、支付服务 * 特点:所有依赖都在同一个进程内,方法调用 */ @Service @Slf4j @Transactional(rollbackFor = Exception.class) public class OrderService { @Autowired private UserService userService; // 本地方法调用 @Autowired private ProductService productService; // 本地方法调用 @Autowired private InventoryService inventoryService; // 本地方法调用 @Autowired private PaymentService paymentService; // 本地方法调用 @Autowired private OrderRepository orderRepository; // 同一个数据库 /** * 创建订单 * 优势: * 1. 本地方法调用,性能极高(纳秒级) * 2. 本地事务,ACID保证强一致性 * 3. 调试方便,堆栈清晰 * * 问题: * 1. 单点故障:任何一个模块崩溃,整个系统不可用 * 2. 部署耦合:改一行代码,整个系统重启 * 3. 技术栈绑定:整个系统必须用同一种语言、框架 * 4. 资源竞争:所有模块共享JVM内存、CPU */ public Order createOrder(CreateOrderRequest request) { // ========== 1. 用户验证(本地方法调用,1ms) ========== User user = userService.getUserById(request.getUserId()); if (user == null) { throw new BusinessException("用户不存在"); } if (user.getStatus() != UserStatus.ACTIVE) { throw new BusinessException("用户状态异常"); } // ========== 2. 商品查询(本地方法调用,2ms) ========== Product product = productService.getProductById(request.getProductId()); if (product == null) { throw new BusinessException("商品不存在"); } if (product.getStatus() != ProductStatus.ON_SALE) { throw new BusinessException("商品已下架"); } // ========== 3. 库存扣减(本地方法调用,3ms) ========== boolean deductSuccess = inventoryService.deduct( product.getId(), request.getQuantity() ); if (!deductSuccess) { throw new BusinessException("库存不足"); } // ========== 4. 创建订单(本地数据库写入,5ms) ========== Order order = Order.builder() .orderNo(generateOrderNo()) .userId(user.getId()) .productId(product.getId()) .productName(product.getName()) .quantity(request.getQuantity()) .price(product.getPrice()) .amount(product.getPrice().multiply( new BigDecimal(request.getQuantity()) )) .status(OrderStatus.UNPAID) .createTime(LocalDateTime.now()) .build(); order = orderRepository.save(order); // ========== 5. 调用支付(本地方法调用,10ms) ========== boolean paymentSuccess = paymentService.pay( order.getOrderNo(), order.getAmount() ); if (!paymentSuccess) { throw new BusinessException("支付失败"); } // ========== 6. 更新订单状态(本地数据库更新,2ms) ========== order.setStatus(OrderStatus.PAID); order.setPayTime(LocalDateTime.now()); order = orderRepository.save(order); log.info("订单创建成功: orderNo={}, amount={}", order.getOrderNo(), order.getAmount()); // 总耗时约23ms(都是本地调用) return order; } /** * 生成订单号 */ private String generateOrderNo() { return "ORD" + System.currentTimeMillis() + ThreadLocalRandom.current().nextInt(1000); } } /** * 用户服务 */ @Service public class UserService { @Autowired private UserRepository userRepository; public User getUserById(Long userId) { return userRepository.findById(userId).orElse(null); } } /** * 商品服务 */ @Service public class ProductService { @Autowired private ProductRepository productRepository; public Product getProductById(Long productId) { return productRepository.findById(productId).orElse(null); } } /** * 库存服务 */ @Service public class InventoryService { @Autowired private InventoryRepository inventoryRepository; /** * 扣减库存 * 优势:本地事务,强一致性保证 */ public boolean deduct(Long productId, Integer quantity) { Inventory inventory = inventoryRepository.findByProductId(productId); if (inventory == null || inventory.getStock() < quantity) { return false; } inventory.setStock(inventory.getStock() - quantity); inventoryRepository.save(inventory); return true; } } /** * 支付服务 */ @Service public class PaymentService { @Autowired private PaymentRepository paymentRepository; /** * 支付 * 优势:本地事务,要么都成功,要么都失败 */ public boolean pay(String orderNo, BigDecimal amount) { // 调用第三方支付(这里模拟) boolean paySuccess = callThirdPartyPayment(orderNo, amount); if (paySuccess) { // 记录支付流水 Payment payment = Payment.builder() .orderNo(orderNo) .amount(amount) .status(PaymentStatus.SUCCESS) .payTime(LocalDateTime.now()) .build(); paymentRepository.save(payment); } return paySuccess; } private boolean callThirdPartyPayment(String orderNo, BigDecimal amount) { // 模拟第三方支付调用 return true; } } /** * 数据库实体:订单 */ @Entity @Table(name = "t_order") @Data @Builder public class Order { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String orderNo; private Long userId; private Long productId; private String productName; private Integer quantity; private BigDecimal price; private BigDecimal amount; @Enumerated(EnumType.STRING) private OrderStatus status; private LocalDateTime createTime; private LocalDateTime payTime; } /** * 数据库实体:用户、商品、库存、支付记录 * (省略具体实现) */ 1.2 单体架构的项目结构 ecommerce-monolith/ ├── src/main/java/com/example/ecommerce/ │ ├── ECommerceMonolithApplication.java # 启动类 │ ├── controller/ # 控制器层 │ │ ├── OrderController.java │ │ ├── UserController.java │ │ ├── ProductController.java │ │ └── InventoryController.java │ ├── service/ # 服务层 │ │ ├── OrderService.java # 订单服务 │ │ ├── UserService.java # 用户服务 │ │ ├── ProductService.java # 商品服务 │ │ ├── InventoryService.java # 库存服务 │ │ └── PaymentService.java # 支付服务 │ ├── repository/ # 数据访问层 │ │ ├── OrderRepository.java │ │ ├── UserRepository.java │ │ ├── ProductRepository.java │ │ ├── InventoryRepository.java │ │ └── PaymentRepository.java │ ├── entity/ # 实体层 │ │ ├── Order.java │ │ ├── User.java │ │ ├── Product.java │ │ ├── Inventory.java │ │ └── Payment.java │ └── config/ # 配置类 │ ├── DataSourceConfig.java │ └── TransactionConfig.java ├── src/main/resources/ │ ├── application.yml # 配置文件 │ └── db/migration/ # 数据库脚本 │ └── V1__init_schema.sql └── pom.xml # Maven配置 部署方式: 1. 打包:mvn clean package → ecommerce-monolith.jar(200MB) 2. 部署:java -jar ecommerce-monolith.jar 3. 运行:单个JVM进程,占用2GB内存 4. 数据库:MySQL单实例,5张表在同一个库 1.3 单体架构的核心特点 维度 特点 优势 劣势 部署 单个war/jar包 部署简单,运维成本低 改一行代码,全量重启 调用方式 本地方法调用 性能极高(纳秒级) 单点故障,一损俱损 事务 本地事务(ACID) 强一致性,简单可靠 无法独立扩展单个模块 技术栈 统一技术栈 团队技能统一,学习成本低 技术升级困难,框架绑定 开发效率 代码在同一个工程 IDE调试方便,定位问题快 代码库膨胀,启动变慢 团队协作 共享代码库 代码复用容易 代码冲突频繁,发布排队 二、场景B:微服务架构实现(规模化后的必然选择) 2.1 微服务架构的完整代码实现 /** * 订单服务(微服务架构版本) * 特点:独立进程,独立数据库,通过RPC调用其他服务 */ @SpringBootApplication @EnableDiscoveryClient // 服务注册 @EnableFeignClients // 远程调用 public class OrderServiceApplication { public static void main(String[] args) { SpringApplication.run(OrderServiceApplication.class, args); } } /** * 订单服务实现 * 依赖:远程调用用户服务、商品服务、库存服务、支付服务 */ @Service @Slf4j public class OrderService { @Autowired private UserServiceClient userServiceClient; // 远程调用(HTTP/gRPC) @Autowired private ProductServiceClient productServiceClient; // 远程调用 @Autowired private InventoryServiceClient inventoryServiceClient; // 远程调用 @Autowired private PaymentServiceClient paymentServiceClient; // 远程调用 @Autowired private OrderRepository orderRepository; // 独立数据库 @Autowired private RocketMQTemplate rocketMQTemplate; // 消息队列 /** * 创建订单(微服务版本) * * 优势: * 1. 服务独立部署:订单服务可以独立发布,不影响其他服务 * 2. 技术栈自由:订单服务用Java,用户服务可以用Go,商品服务可以用Python * 3. 独立扩展:订单服务压力大,可以只扩容订单服务 * 4. 故障隔离:支付服务挂了,不影响订单创建(降级返回"支付中") * * 挑战: * 1. 网络调用:性能下降(本地纳秒级 → 远程毫秒级) * 2. 分布式事务:无法保证强一致性(ACID → BASE) * 3. 调用链路长:5个服务,任何一个超时都影响整体 * 4. 运维复杂:5个服务 × 3个环境 = 15套部署 */ public Order createOrder(CreateOrderRequest request) { // ========== 1. 远程调用用户服务(网络调用,10ms + 超时风险) ========== try { UserDTO user = userServiceClient.getUserById(request.getUserId()); if (user == null || user.getStatus() != UserStatus.ACTIVE) { throw new BusinessException("用户不存在或状态异常"); } } catch (FeignException e) { log.error("调用用户服务失败: {}", e.getMessage()); throw new BusinessException("用户服务不可用"); } // ========== 2. 远程调用商品服务(网络调用,15ms + 超时风险) ========== ProductDTO product; try { product = productServiceClient.getProductById(request.getProductId()); if (product == null || product.getStatus() != ProductStatus.ON_SALE) { throw new BusinessException("商品不存在或已下架"); } } catch (FeignException e) { log.error("调用商品服务失败: {}", e.getMessage()); throw new BusinessException("商品服务不可用"); } // ========== 3. 远程调用库存服务(网络调用,20ms + 超时风险) ========== boolean deductSuccess; try { deductSuccess = inventoryServiceClient.deduct( product.getId(), request.getQuantity() ); if (!deductSuccess) { throw new BusinessException("库存不足"); } } catch (FeignException e) { log.error("调用库存服务失败: {}", e.getMessage()); throw new BusinessException("库存服务不可用"); } // ========== 4. 创建订单(本地数据库写入,5ms) ========== Order order = Order.builder() .orderNo(generateOrderNo()) .userId(request.getUserId()) .productId(product.getId()) .productName(product.getName()) .quantity(request.getQuantity()) .price(product.getPrice()) .amount(product.getPrice().multiply( new BigDecimal(request.getQuantity()) )) .status(OrderStatus.UNPAID) .createTime(LocalDateTime.now()) .build(); order = orderRepository.save(order); // ========== 5. 异步调用支付服务(通过消息队列,解耦) ========== // 不直接调用支付服务,而是发送消息到MQ,支付服务异步消费 PaymentMessage paymentMessage = PaymentMessage.builder() .orderNo(order.getOrderNo()) .amount(order.getAmount()) .build(); rocketMQTemplate.syncSend("PAYMENT_TOPIC", paymentMessage); log.info("订单创建成功: orderNo={}, amount={}", order.getOrderNo(), order.getAmount()); // 总耗时约50ms(网络调用 + 超时重试) // 相比单体架构的23ms,慢了2倍,但换来了独立部署、独立扩展、故障隔离 return order; } /** * 生成订单号(分布式唯一ID) * 单体架构:可以用数据库自增ID * 微服务架构:需要用雪花算法、UUID等分布式ID生成策略 */ private String generateOrderNo() { // 雪花算法生成分布式唯一ID return "ORD" + SnowflakeIdWorker.generateId(); } } /** * 用户服务客户端(Feign声明式HTTP客户端) */ @FeignClient( name = "user-service", // 服务名(从注册中心获取) fallback = UserServiceFallback.class // 降级策略 ) public interface UserServiceClient { @GetMapping("/api/users/{userId}") UserDTO getUserById(@PathVariable("userId") Long userId); } /** * 用户服务降级策略(熔断后的备选方案) */ @Component public class UserServiceFallback implements UserServiceClient { @Override public UserDTO getUserById(Long userId) { log.warn("用户服务调用失败,触发降级策略"); // 返回一个默认用户,或者抛出异常 throw new BusinessException("用户服务不可用,请稍后重试"); } } /** * 商品服务客户端 */ @FeignClient( name = "product-service", fallback = ProductServiceFallback.class ) public interface ProductServiceClient { @GetMapping("/api/products/{productId}") ProductDTO getProductById(@PathVariable("productId") Long productId); } /** * 库存服务客户端 */ @FeignClient( name = "inventory-service", fallback = InventoryServiceFallback.class ) public interface InventoryServiceClient { @PostMapping("/api/inventory/deduct") boolean deduct(@RequestParam("productId") Long productId, @RequestParam("quantity") Integer quantity); } /** * 支付服务(独立的微服务,监听MQ消息) */ @Service @Slf4j public class PaymentService { @Autowired private PaymentRepository paymentRepository; @Autowired private OrderServiceClient orderServiceClient; /** * 监听支付消息(异步处理) * 优势:解耦订单服务和支付服务,支付失败不影响订单创建 */ @RocketMQMessageListener( topic = "PAYMENT_TOPIC", consumerGroup = "payment-consumer-group" ) public class PaymentMessageListener implements RocketMQListener<PaymentMessage> { @Override public void onMessage(PaymentMessage message) { log.info("接收到支付消息: {}", message); try { // 调用第三方支付 boolean paySuccess = callThirdPartyPayment( message.getOrderNo(), message.getAmount() ); if (paySuccess) { // 记录支付流水 Payment payment = Payment.builder() .orderNo(message.getOrderNo()) .amount(message.getAmount()) .status(PaymentStatus.SUCCESS) .payTime(LocalDateTime.now()) .build(); paymentRepository.save(payment); // 回调订单服务,更新订单状态 orderServiceClient.updateOrderStatus( message.getOrderNo(), OrderStatus.PAID ); log.info("支付成功: orderNo={}", message.getOrderNo()); } else { log.error("支付失败: orderNo={}", message.getOrderNo()); // 支付失败,可以重试或者发送通知 } } catch (Exception e) { log.error("支付处理异常: orderNo={}, error={}", message.getOrderNo(), e.getMessage()); // 异常时,消息会重新入队,重试机制 throw new RuntimeException("支付处理失败", e); } } } private boolean callThirdPartyPayment(String orderNo, BigDecimal amount) { // 模拟第三方支付调用 return true; } } 2.2 微服务架构的项目结构 microservices-ecommerce/ ├── order-service/ # 订单服务 │ ├── src/main/java/com/example/order/ │ │ ├── OrderServiceApplication.java # 启动类 │ │ ├── controller/OrderController.java │ │ ├── service/OrderService.java │ │ ├── repository/OrderRepository.java │ │ ├── entity/Order.java │ │ ├── client/ # Feign客户端 │ │ │ ├── UserServiceClient.java │ │ │ ├── ProductServiceClient.java │ │ │ ├── InventoryServiceClient.java │ │ │ └── PaymentServiceClient.java │ │ └── config/ # 配置类 │ ├── src/main/resources/ │ │ └── application.yml # 订单服务配置 │ └── pom.xml # 独立的Maven配置 │ ├── user-service/ # 用户服务 │ ├── src/main/java/com/example/user/ │ │ ├── UserServiceApplication.java │ │ ├── controller/UserController.java │ │ ├── service/UserService.java │ │ ├── repository/UserRepository.java │ │ └── entity/User.java │ ├── src/main/resources/ │ │ └── application.yml │ └── pom.xml │ ├── product-service/ # 商品服务 │ ├── src/main/java/com/example/product/ │ │ ├── ProductServiceApplication.java │ │ ├── controller/ProductController.java │ │ ├── service/ProductService.java │ │ ├── repository/ProductRepository.java │ │ └── entity/Product.java │ ├── src/main/resources/ │ │ └── application.yml │ └── pom.xml │ ├── inventory-service/ # 库存服务 │ ├── src/main/java/com/example/inventory/ │ │ ├── InventoryServiceApplication.java │ │ ├── controller/InventoryController.java │ │ ├── service/InventoryService.java │ │ ├── repository/InventoryRepository.java │ │ └── entity/Inventory.java │ ├── src/main/resources/ │ │ └── application.yml │ └── pom.xml │ ├── payment-service/ # 支付服务 │ ├── src/main/java/com/example/payment/ │ │ ├── PaymentServiceApplication.java │ │ ├── service/PaymentService.java │ │ ├── repository/PaymentRepository.java │ │ ├── entity/Payment.java │ │ └── listener/PaymentMessageListener.java │ ├── src/main/resources/ │ │ └── application.yml │ └── pom.xml │ ├── common/ # 公共模块 │ ├── src/main/java/com/example/common/ │ │ ├── dto/ # 数据传输对象 │ │ ├── exception/ # 异常定义 │ │ └── util/ # 工具类 │ └── pom.xml │ ├── gateway/ # API网关(Spring Cloud Gateway) │ ├── src/main/java/com/example/gateway/ │ │ ├── GatewayApplication.java │ │ └── config/GatewayConfig.java │ └── pom.xml │ └── registry/ # 服务注册中心(Nacos) └── nacos-server/ 部署方式: 1. 打包:每个服务独立打包 → order-service.jar, user-service.jar... 2. 部署:每个服务独立部署(可以部署在不同的机器上) 3. 运行:5个JVM进程,每个占用1GB内存(总共5GB) 4. 数据库:5个独立的MySQL数据库(或5个独立的Schema) 2.3 微服务架构的核心特点 维度 特点 优势 劣势 部署 独立部署单元 改一行代码,只重启一个服务 运维复杂度高,需要容器编排 调用方式 远程调用(HTTP/gRPC) 故障隔离,服务独立演进 性能下降(纳秒→毫秒),网络不可靠 事务 分布式事务(BASE) 独立扩展,高可用 最终一致性,业务逻辑复杂 技术栈 多语言异构 技术自由,选择最合适的技术 技能要求高,学习成本大 开发效率 代码分散在多个工程 独立迭代,避免冲突 调试困难,链路追踪复杂 团队协作 按服务划分团队 团队自治,减少沟通成本 接口约定复杂,版本管理困难 三、数据对比:单体 vs 微服务 3.1 性能对比 指标 单体架构 微服务架构 差异分析 接口响应时间 23ms 50ms 微服务慢2倍(网络调用开销) QPS 5000 3000 微服务下降40%(网络+序列化) CPU使用率 60% 70% 微服务高10%(序列化+网络) 内存占用 2GB(单进程) 5GB(5进程) 微服务多2.5倍 启动时间 120s 30s/服务 微服务单个服务更快 关键发现: ...

2025-11-03 · maneng

从HashMap到Redis:分布式缓存的演进

一、引子:一个用户会话缓存的演进之路 假设你正在开发一个电商网站的用户会话管理功能。每次用户请求都需要验证身份,最初的实现是每次都查询数据库,但随着用户量增长,数据库压力越来越大。让我们看看这个功能如何一步步演进,从最简单的HashMap到最终的Redis分布式缓存。 1.1 场景0:无缓存(每次查数据库) 最直接的实现:每次请求都查询数据库验证用户身份。 @RestController public class UserController { @Autowired private UserRepository userRepository; /** * 获取用户信息(每次查数据库) * 问题:数据库压力大,响应慢 */ @GetMapping("/api/user/info") public UserVO getUserInfo(@RequestHeader("token") String token) { // 1. 根据token查询用户ID(查数据库) Long userId = tokenRepository.findUserIdByToken(token); if (userId == null) { throw new UnauthorizedException("未登录"); } // 2. 查询用户详细信息(查数据库) User user = userRepository.findById(userId); if (user == null) { throw new UserNotFoundException("用户不存在"); } return convertToVO(user); } } 性能数据: 指标 数值 说明 平均响应时间 50ms 2次SQL查询 QPS上限 1000 数据库连接池限制 数据库压力 100% 每次请求都查库 问题: ...

2025-11-03 · maneng

Spring Cloud第一性原理:从单体到微服务的架构演进

系列导航:本文是《Spring框架第一性原理》系列的第5篇 第1篇:为什么我们需要Spring框架? 第2篇:IoC容器:从手动new到自动装配的演进 第3篇:AOP:从代码重复到面向切面编程 第4篇:Spring Boot:约定优于配置的威力 第5篇:Spring Cloud:从单体到微服务的架构演进(本文) 引子:单体应用的困境 场景重现:一个电商系统的演进之路 让我们从一个真实的电商系统的成长历程说起。 第一阶段:创业初期(2015年) 团队规模:5人(2个后端、1个前端、1个产品、1个UI) 技术选型:单体架构 + Spring Boot 系统架构: ┌─────────────────────────────────────┐ │ 电商单体应用 │ │ (monolithic-ecommerce-app) │ ├─────────────────────────────────────┤ │ 用户模块 (UserModule) │ │ 商品模块 (ProductModule) │ │ 订单模块 (OrderModule) │ │ 库存模块 (InventoryModule) │ │ 支付模块 (PaymentModule) │ │ 物流模块 (LogisticsModule) │ │ 营销模块 (MarketingModule) │ ├─────────────────────────────────────┤ │ Spring Boot + MyBatis + MySQL │ └─────────────────────────────────────┘ ↓ MySQL数据库 代码结构: ecommerce-app/ ├── src/main/java/com/example/ │ ├── user/ # 用户模块 │ │ ├── UserController.java │ │ ├── UserService.java │ │ └── UserRepository.java │ ├── product/ # 商品模块 │ │ ├── ProductController.java │ │ ├── ProductService.java │ │ └── ProductRepository.java │ ├── order/ # 订单模块 │ │ ├── OrderController.java │ │ ├── OrderService.java │ │ └── OrderRepository.java │ ├── inventory/ # 库存模块 │ ├── payment/ # 支付模块 │ ├── logistics/ # 物流模块 │ └── marketing/ # 营销模块 └── pom.xml 典型业务流程(创建订单): ...

2025-11-03 · maneng

Redis第一性原理:为什么我们需要缓存?

一、引子:商品详情接口的性能进化之路 想象你正在开发一个电商平台的商品详情页面,每次用户访问都需要查询商品基本信息、品牌信息、类目信息、库存信息和商品图片。让我们看看三种不同的实现方式,以及它们各自的性能表现。 1.1 场景A:无缓存(直接查数据库) 这是最直接的实现方式:每次请求都查询数据库。 @Service public class ProductService { @Autowired private ProductRepository productRepository; @Autowired private BrandRepository brandRepository; @Autowired private CategoryRepository categoryRepository; @Autowired private InventoryRepository inventoryRepository; @Autowired private ProductImageRepository productImageRepository; /** * 查询商品详情(每次请求都查数据库) * 平均耗时:100ms * QPS上限:500 */ public ProductDetailVO getProductDetail(Long productId) { // 1. 查询商品基本信息(20ms) Product product = productRepository.findById(productId); if (product == null) { throw new ProductNotFoundException("商品不存在:" + productId); } // 2. 查询品牌信息(20ms) Brand brand = brandRepository.findById(product.getBrandId()); // 3. 查询类目信息(20ms) Category category = categoryRepository.findById(product.getCategoryId()); // 4. 查询库存信息(20ms) Inventory inventory = inventoryRepository.findByProductId(productId); // 5. 查询商品图片(20ms,可能有N+1查询问题) List<ProductImage> images = productImageRepository.findByProductId(productId); // 6. 组装返回对象 ProductDetailVO vo = new ProductDetailVO(); vo.setProductId(product.getId()); vo.setProductName(product.getName()); vo.setPrice(product.getPrice()); vo.setBrandName(brand.getName()); vo.setCategoryName(category.getName()); vo.setStock(inventory.getStock()); vo.setImages(images); return vo; } } 性能数据(压测工具:JMeter,1000并发): ...

2025-11-03 · maneng

从HashMap到Redis:分布式缓存的演进

一、引子:一个用户会话缓存的演进之路 假设你正在开发一个电商网站的用户会话管理功能。每次用户请求都需要验证身份,最初的实现是每次都查询数据库,但随着用户量增长,数据库压力越来越大。让我们看看这个功能如何一步步演进,从最简单的HashMap到最终的Redis分布式缓存。 1.1 场景0:无缓存(每次查数据库) 最直接的实现:每次请求都查询数据库验证用户身份。 @RestController public class UserController { @Autowired private UserRepository userRepository; /** * 获取用户信息(每次查数据库) * 问题:数据库压力大,响应慢 */ @GetMapping("/api/user/info") public UserVO getUserInfo(@RequestHeader("token") String token) { // 1. 根据token查询用户ID(查数据库) Long userId = tokenRepository.findUserIdByToken(token); if (userId == null) { throw new UnauthorizedException("未登录"); } // 2. 查询用户详细信息(查数据库) User user = userRepository.findById(userId); if (user == null) { throw new UserNotFoundException("用户不存在"); } return convertToVO(user); } } 性能数据: 指标 数值 说明 平均响应时间 50ms 2次SQL查询 QPS上限 1000 数据库连接池限制 数据库压力 100% 每次请求都查库 问题: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...