RocketMQ架构02:NameServer深度剖析 - 轻量级注册中心的设计哲学

引言:简单而不简陋 NameServer 是 RocketMQ 中最"简单"的组件,但简单不代表简陋。它用最少的代码实现了最核心的功能: 无状态、无持久化 不依赖任何第三方组件 完全内存操作 对等部署、无主从之分 今天,我们深入理解这个"简单而强大"的组件。 一、NameServer 核心数据结构 1.1 路由信息管理 public class RouteInfoManager { // 1. Topic → QueueData 映射 // 记录每个 Topic 在哪些 Broker 上有哪些 Queue private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // QueueData 结构 class QueueData { private String brokerName; // Broker 名称 private int readQueueNums; // 读队列数量 private int writeQueueNums; // 写队列数量 private int perm; // 权限(2=写 4=读 6=读写) private int topicSynFlag; // 同步标识 } // 2. Broker 名称 → BrokerData 映射 // 记录 Broker 的集群信息和地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // BrokerData 结构 class BrokerData { private String cluster; // 所属集群 private String brokerName; // Broker 名称 private HashMap<Long/* brokerId */, String/* address */> brokerAddrs; // brokerId: 0=Master, >0=Slave } // 3. Broker 地址 → BrokerLiveInfo 映射 // 记录 Broker 的存活信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // BrokerLiveInfo 结构 class BrokerLiveInfo { private long lastUpdateTimestamp; // 最后更新时间 private DataVersion dataVersion; // 数据版本 private Channel channel; // Netty Channel private String haServerAddr; // HA 服务地址 } // 4. 集群名称 → Broker 名称集合 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // 5. Broker 地址 → 过滤服务器列表 private final HashMap<String/* brokerAddr */, List<String/* filter server */>> filterServerTable; // 读写锁保护 private final ReadWriteLock lock = new ReentrantReadWriteLock(); } 1.2 数据结构关系图 数据结构关系: topicQueueTable ┌──────────────────────────────────────┐ │ Topic1 → [QueueData1, QueueData2] │ │ QueueData1: brokerName=broker-a │ │ QueueData2: brokerName=broker-b │ └──────────────────────────────────────┘ │ ▼ brokerAddrTable ┌──────────────────────────────────────┐ │ broker-a → BrokerData │ │ cluster: DefaultCluster │ │ brokerAddrs: │ │ 0 → 192.168.1.100:10911 (Master)│ │ 1 → 192.168.1.101:10911 (Slave) │ └──────────────────────────────────────┘ │ ▼ brokerLiveTable ┌──────────────────────────────────────┐ │ 192.168.1.100:10911 → BrokerLiveInfo│ │ lastUpdateTimestamp: 1699999999000 │ │ channel: [Netty Channel] │ └──────────────────────────────────────┘ 二、Broker 注册流程 2.1 注册请求处理 public class BrokerRegistration { public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { // 获取写锁 this.lock.writeLock().lockInterruptibly(); // 1. 更新集群信息 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // 2. 更新 Broker 地址表 boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } // 更新 Broker 地址 Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 3. 更新 Topic 配置 if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { // 只有 Master 才更新 Topic 配置 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 4. 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); if (null == prevBrokerLiveInfo) { log.info("new broker registered: {}", brokerAddr); } // 5. 更新过滤服务器列表 if (filterServerList != null && !filterServerList.isEmpty()) { this.filterServerTable.put(brokerAddr, filterServerList); } // 6. 如果是 Slave,返回 Master 信息 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddr); if (masterLiveInfo != null) { result.setHaServerAddr(masterLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } catch (Exception e) { log.error("registerBroker Exception", e); } finally { this.lock.writeLock().unlock(); } return result; } // 创建或更新 Queue 数据 private void createAndUpdateQueueData(String brokerName, TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { queueDataList = new LinkedList<>(); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered: {}", topicConfig.getTopicName()); } else { // 移除旧的数据 queueDataList.removeIf(qd -> qd.getBrokerName().equals(brokerName)); } queueDataList.add(queueData); } } 2.2 Broker 心跳机制 public class BrokerHeartbeat { // Broker 端:定时发送心跳 class BrokerSide { // 每 30 秒向所有 NameServer 注册一次 private final ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { // 向所有 NameServer 注册 this.registerBrokerAll(true, false, true); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } }, 1000, 30000, TimeUnit.MILLISECONDS); } private void registerBrokerAll( final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { // 准备注册信息 TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 向每个 NameServer 注册 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { for (String namesrvAddr : nameServerAddressList) { try { RegisterBrokerResult result = this.registerBroker( namesrvAddr, oneway, timeoutMills, topicConfigWrapper ); if (result != null) { // 更新 Master 地址(Slave 使用) if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(result.getHaServerAddr()); } } } catch (Exception e) { log.warn("registerBroker to {} failed", namesrvAddr, e); } } } } } // NameServer 端:检测 Broker 超时 class NameServerSide { // 每 10 秒扫描一次 public void scanNotActiveBroker() { long timeout = 120000; // 2 分钟超时 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> entry = it.next(); long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeout) < System.currentTimeMillis()) { // Broker 超时 RemotingUtil.closeChannel(entry.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", entry.getKey(), timeout); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); } } } // 清理路由信息 public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { this.lock.readLock().lockInterruptibly(); // 查找 Broker 地址 for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } if (brokerAddrFound != null) { // 移除 Broker this.removeBroker(brokerAddrFound); } } } } 三、路由查询 3.1 获取 Topic 路由信息 public class RouteQuery { public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; try { // 获取读锁 this.lock.readLock().lockInterruptibly(); // 1. 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; // 2. 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); topicRouteData.getBrokerDatas().add(brokerDataClone); foundBrokerData = true; // 3. 查找过滤服务器 for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); if (filterServerList != null) { topicRouteData.getFilterServerTable().put(brokerAddr, filterServerList); } } } } } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } finally { this.lock.readLock().unlock(); } if (foundQueueData && foundBrokerData) { return topicRouteData; } return null; } // TopicRouteData 结构 class TopicRouteData { private String orderTopicConf; // 顺序消息配置 private List<QueueData> queueDatas; // Queue 信息列表 private List<BrokerData> brokerDatas; // Broker 信息列表 private HashMap<String, List<String>> filterServerTable; // 过滤服务器 } } 3.2 Client 端路由更新 public class ClientRouteUpdate { // Producer/Consumer 定时更新路由信息 class RouteUpdateTask { // 每 30 秒更新一次 private ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, 30000, TimeUnit.MILLISECONDS); } private void updateTopicRouteInfoFromNameServer() { // 获取所有订阅的 Topic Set<String> topicSet = new HashSet<>(); // Producer 的 Topic topicSet.addAll(this.producerTable.keySet()); // Consumer 的 Topic for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { topicSet.addAll(entry.getValue().subscriptions()); } // 更新每个 Topic 的路由信息 for (String topic : topicSet) { this.updateTopicRouteInfoFromNameServer(topic); } } public boolean updateTopicRouteInfoFromNameServer(final String topic) { try { // 从 NameServer 获取路由信息 TopicRouteData topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000); if (topicRouteData != null) { // 对比版本,判断是否需要更新 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataChanged(old, topicRouteData); if (changed) { // 更新本地路由表 this.topicRouteTable.put(topic, topicRouteData); // 更新 Producer 的路由信息 for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) { entry.getValue().updateTopicPublishInfo(topic, topicRouteData); } // 更新 Consumer 的路由信息 for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { entry.getValue().updateTopicSubscribeInfo(topic, topicRouteData); } log.info("topicRouteTable.put: Topic={}, RouteData={}", topic, topicRouteData); return true; } } } catch (Exception e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; } // 判断路由信息是否变化 private boolean topicRouteDataChanged(TopicRouteData olddata, TopicRouteData nowdata) { if (olddata == null || nowdata == null) { return true; } TopicRouteData old = olddata.cloneTopicRouteData(); TopicRouteData now = nowdata.cloneTopicRouteData(); Collections.sort(old.getQueueDatas()); Collections.sort(old.getBrokerDatas()); Collections.sort(now.getQueueDatas()); Collections.sort(now.getBrokerDatas()); return !old.equals(now); } } } 四、NameServer 高可用设计 4.1 对等部署架构 NameServer 集群架构: ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (独立运行) │ │ (独立运行) │ │ (独立运行) │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ └──────────────────┼──────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Broker 集群 │ │ (向所有NS注册) │ │ │ └──────────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Producer/Consumer │ │ (随机选择NS查询) │ │ │ └──────────────────────┘ 特点: 1. NameServer 之间无任何通信 2. 每个 NameServer 都是完整功能的独立节点 3. Broker 向所有 NameServer 注册 4. Client 随机选择 NameServer 查询 4.2 数据一致性保证 public class ConsistencyGuarantee { // 最终一致性 class EventualConsistency { /* NameServer 不保证强一致性,但保证最终一致性 场景1:Broker 注册 ┌─────────────────────────────────────────┐ │ T0: Broker 向 NS1 注册 │ │ T1: Broker 向 NS2 注册 │ │ T2: Broker 向 NS3 注册 │ │ │ │ 在 T0-T2 期间,三个 NS 数据不一致 │ │ T2 后,数据最终一致 │ └─────────────────────────────────────────┘ 场景2:Broker 宕机 ┌─────────────────────────────────────────┐ │ T0: Broker 宕机 │ │ T1: NS1 检测到超时(2分钟) │ │ T2: NS2 检测到超时 │ │ T3: NS3 检测到超时 │ │ │ │ 在检测到之前,可能有短暂不一致 │ │ 但影响有限(Client 会自动切换 Broker) │ └─────────────────────────────────────────┘ */ } // 为什么可以接受不一致? class WhyAcceptableInconsistency { /* 1. Client 有容错机制 - 如果从 NS1 获取的 Broker 不可用 - Client 会自动尝试其他 Broker 2. Broker 定时注册 - 每 30 秒注册一次 - 很快会同步到所有 NS 3. 影响范围小 - 只影响短时间的查询 - 不影响已建立的连接 4. 简化设计 - 无需复杂的一致性协议 - 性能更高 */ } } 4.3 故障处理 public class FailureHandling { // 单个 NameServer 故障 class SingleNameServerFailure { /* 场景:NS1 宕机 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (宕机) │ │ (正常) │ │ (正常) │ └──────────────┘ └──────┬───────┘ └──────┬───────┘ │ │ ┌───────────────┴──────────────────┘ │ ┌───────▼──────────┐ │ Broker 集群 │ │ - 向NS2/NS3注册 │ │ - NS1注册失败 │ └──────────────────┘ 影响: 1. Broker 向 NS1 注册失败(不影响NS2/NS3) 2. Client 如果查询 NS1 会失败,自动重试其他 NS 3. 总体可用性不受影响 */ } // 全部 NameServer 故障 class AllNameServerFailure { /* 场景:所有 NS 宕机 影响: 1. Broker 无法注册新的 Topic 2. Client 无法获取新的路由信息 3. 但已有连接仍可正常工作! 为什么还能工作? - Producer/Consumer 本地缓存了路由信息 - Broker 地址不会频繁变化 - 短时间内(几小时)仍可正常使用 恢复: - NameServer 恢复后 - Broker 重新注册 - Client 更新路由 - 自动恢复正常 */ } // Client 容错机制 class ClientFaultTolerance { public TopicRouteData getTopicRouteData(String topic) { // 尝试从不同的 NameServer 获取 List<String> nameServerList = this.remotingClient.getNameServerAddressList(); for (String nameServer : nameServerList) { try { TopicRouteData result = this.getTopicRouteDataFromNameServer(topic, nameServer, 3000); if (result != null) { return result; } } catch (Exception e) { log.warn("getTopicRouteData from {} failed", nameServer, e); // 继续尝试下一个 NameServer } } throw new MQClientException("No available NameServer"); } } } 五、性能优化 5.1 内存优化 public class MemoryOptimization { // 数据结构优化 class DataStructure { // 使用 HashMap 而非 ConcurrentHashMap // 通过读写锁控制并发,减少锁竞争 private final HashMap<String, List<QueueData>> topicQueueTable; private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 读操作 public List<QueueData> getQueueData(String topic) { try { this.lock.readLock().lockInterruptibly(); return this.topicQueueTable.get(topic); } finally { this.lock.readLock().unlock(); } } // 写操作 public void putQueueData(String topic, List<QueueData> queueDataList) { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.put(topic, queueDataList); } finally { this.lock.writeLock().unlock(); } } } // 避免不必要的对象创建 class ObjectReuse { // 复用 BrokerData 对象 public BrokerData cloneBrokerData(BrokerData brokerData) { // 只克隆必要的字段 return new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); } } } 5.2 网络优化 public class NetworkOptimization { // 批量处理 class BatchProcessing { // Broker 一次注册所有 Topic // 而不是每个 Topic 单独注册 public void registerBroker(TopicConfigSerializeWrapper topicConfigWrapper) { // 包含所有 Topic 的配置 ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigWrapper.getTopicConfigTable(); // 一次性更新所有 Topic for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } // 连接复用 class ConnectionReuse { // Broker 和 NameServer 之间保持长连接 // 避免频繁建立连接 private Channel channel; // Netty Channel public void keepAlive() { // 定时发送心跳保持连接 this.scheduledExecutorService.scheduleAtFixedRate(() -> { this.registerBrokerAll(true, false, true); }, 1000, 30000, TimeUnit.MILLISECONDS); } } } 六、与 ZooKeeper 对比 6.1 设计理念对比 对比维度: ┌──────────────┬──────────────────┬──────────────────┐ │ 维度 │ NameServer │ ZooKeeper │ ├──────────────┼──────────────────┼──────────────────┤ │ CAP 选择 │ AP(可用性优先) │ CP(一致性优先) │ │ 数据持久化 │ 无(纯内存) │ 有(磁盘) │ │ 节点通信 │ 无 │ 有(Paxos/Raft) │ │ 部署复杂度 │ 低 │ 高 │ │ 运维复杂度 │ 低 │ 高 │ │ 性能 │ 高(内存操作) │ 中(磁盘IO) │ │ 一致性保证 │ 最终一致 │ 强一致 │ │ 故障恢复 │ 快 │ 慢(需要选举) │ └──────────────┴──────────────────┴──────────────────┘ 6.2 为什么 RocketMQ 选择 NameServer? public class WhyNameServer { class Reasons { /* 1. 需求不同 - RocketMQ 不需要强一致性 - 路由信息短暂不一致可以接受 - Client 有容错机制 2. 简化部署 - 无需额外依赖 - 降低运维成本 - 减少故障点 3. 性能优势 - 纯内存操作,性能更高 - 无磁盘 IO - 无网络同步开销 4. 可用性优势 - 无需选举,故障恢复快 - 任意节点可服务 - 整体可用性更高 5. 成本考虑 - 更少的机器资源 - 更低的学习成本 - 更少的运维工作 */ } } 七、最佳实践 7.1 部署建议 public class DeploymentBestPractices { // 1. 节点数量 class NodeCount { /* 推荐配置: - 生产环境:3 个节点 - 测试环境:1 个节点即可 原因: - 3 个节点足够高可用 - 过多节点增加 Broker 负担(需向每个节点注册) - 节省资源 */ } // 2. 硬件配置 class HardwareRequirement { /* 推荐配置: - CPU: 2 核即可 - 内存: 4GB 即可 - 磁盘: 无特殊要求(不持久化) - 网络: 千兆网卡 NameServer 非常轻量,硬件需求很低 */ } // 3. 网络规划 class NetworkPlanning { /* 建议: - 与 Broker 在同一机房 - 使用内网通信 - 避免跨公网访问 - 确保网络稳定性 */ } // 4. 启动参数 class StartupParameters { String startCommand = "nohup sh mqnamesrv " + "-Xms2g -Xmx2g -Xmn1g " + "-XX:+UseG1GC " + "-XX:G1HeapRegionSize=16m " + "-XX:+PrintGCDetails " + "-XX:+PrintGCDateStamps " + "-Xloggc:/dev/shm/mq_gc_%p.log &"; } } 7.2 监控建议 public class MonitoringBestPractices { // 1. 关键指标 class KeyMetrics { long registeredBrokerCount; // 注册的 Broker 数量 long registeredTopicCount; // 注册的 Topic 数量 long activeConnectionCount; // 活跃连接数 double routeQueryQPS; // 路由查询 QPS double brokerRegisterQPS; // Broker 注册 QPS } // 2. 告警规则 class AlertRules { /* 建议告警: 1. Broker 数量突然减少(可能宕机) 2. 路由查询失败率 > 1% 3. 内存使用率 > 80% 4. JVM GC 频繁 */ } } 八、总结 NameServer 的设计充分体现了"大道至简"的哲学: ...

2025-11-13 · maneng

RocketMQ架构01:整体架构设计 - NameServer、Broker、Producer、Consumer

引言:一个精妙的分布式系统 如果把 RocketMQ 比作一座现代化城市,那么: NameServer 是城市规划局,管理所有地图信息 Broker 是物流仓库,存储和转运货物 Producer 是发货方,把货物送到仓库 Consumer 是收货方,从仓库取货 今天,我们从整体视角理解这座"消息城市"的运作机制。 一、RocketMQ 整体架构 1.1 核心组件 RocketMQ 架构全景图: ┌────────────────────────────────────────────┐ │ NameServer Cluster │ │ (路由注册中心、服务发现、轻量级协调器) │ └────────┬───────────────────┬───────────────┘ │ │ 注册/心跳 │ │ 获取路由 │ │ ┌────────────▼──────┐ ┌────▼────────────────┐ │ │ │ │ │ Broker Cluster │ │ Producer/Consumer │ │ │ │ │ │ ┌───────────────┐ │ │ 应用程序 │ │ │ Master Broker │ │◄────┤ │ │ │ - CommitLog │ │ 读写 │ │ │ │ - ConsumeQ │ │ 消息 │ │ │ │ - IndexFile │ │ │ │ │ └───────┬───────┘ │ └─────────────────────┘ │ │ 主从同步 │ │ ┌───────▼───────┐ │ │ │ Slave Broker │ │ │ │ - CommitLog │ │ │ │ - ConsumeQ │ │ │ │ - IndexFile │ │ │ └───────────────┘ │ └───────────────────┘ 1.2 组件职责矩阵 组件职责对比: ┌──────────────┬────────────────────────────────────────┐ │ 组件 │ 核心职责 │ ├──────────────┼────────────────────────────────────────┤ │ NameServer │ - 路由信息管理(Topic → Broker 映射) │ │ │ - Broker 注册与心跳检测 │ │ │ - 提供路由信息查询 │ │ │ - 无状态、对等集群 │ ├──────────────┼────────────────────────────────────────┤ │ Broker │ - 消息存储(CommitLog) │ │ │ - 消息索引(ConsumeQueue、IndexFile) │ │ │ - 消息查询与消费 │ │ │ - 高可用(主从复制、Dledger) │ ├──────────────┼────────────────────────────────────────┤ │ Producer │ - 消息生产 │ │ │ - 消息发送(同步、异步、单向) │ │ │ - 负载均衡(选择Broker和Queue) │ │ │ - 故障规避 │ ├──────────────┼────────────────────────────────────────┤ │ Consumer │ - 消息消费 │ │ │ - 消息拉取(长轮询) │ │ │ - 负载均衡(Rebalance) │ │ │ - 消费进度管理 │ └──────────────┴────────────────────────────────────────┘ 二、NameServer 详解 2.1 为什么需要 NameServer public class WhyNameServer { // 问题1:Producer/Consumer 如何知道 Broker 在哪里? class ServiceDiscovery { // 没有 NameServer: // - 硬编码 Broker 地址(不灵活) // - 无法动态感知 Broker 变化 // 有了 NameServer: // - 动态获取 Broker 列表 // - 自动感知 Broker 上下线 } // 问题2:Topic 分布在哪些 Broker 上? class TopicRoute { // NameServer 维护 Topic 路由信息 class TopicRouteData { List<QueueData> queueDatas; // Queue 分布 List<BrokerData> brokerDatas; // Broker 信息 } } // 问题3:为什么不用 ZooKeeper? class WhyNotZooKeeper { /* ZooKeeper 的问题: 1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先) 2. 重量级依赖 - NameServer 更轻量 3. 运维复杂 - NameServer 无状态更简单 4. 性能开销 - NameServer 内存操作更快 NameServer 的优势: 1. 完全无状态 2. 对等集群(任意节点提供服务) 3. 内存操作(无磁盘 IO) 4. 简单高效 */ } } 2.2 NameServer 核心功能 public class NameServerCore { // 1. 路由信息管理 class RouteInfoManager { // Topic → Broker 映射 private HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Broker 名称 → Broker 数据 private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 地址 → 集群信息 private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // 集群名称 → Broker 名称列表 private HashMap<String/* clusterName */, Set<String>> clusterAddrTable; // 过滤服务器 private HashMap<String/* brokerAddr */, List<String>> filterServerTable; } // 2. Broker 注册 public RegisterBrokerResult registerBroker( String clusterName, String brokerAddr, String brokerName, long brokerId, String haServerAddr, TopicConfigSerializeWrapper topicConfigWrapper) { // 创建或更新 Broker 数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } brokerData.getBrokerAddrs().put(brokerId, brokerAddr); // 更新 Topic 路由信息 if (null != topicConfigWrapper) { for (Map.Entry<String, TopicConfig> entry : topicConfigWrapper.getTopicConfigTable().entrySet()) { createAndUpdateQueueData(brokerName, entry.getValue()); } } // 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); return new RegisterBrokerResult(...); } // 3. 路由查询 public TopicRouteData getTopicRouteData(String topic) { TopicRouteData topicRouteData = new TopicRouteData(); // 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); // 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } // 组装路由数据 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (brokerData != null) { topicRouteData.getBrokerDatas().add(brokerData.clone()); } } topicRouteData.setQueueDatas(queueDataList); return topicRouteData; } // 4. Broker 心跳检测 public void scanNotActiveBroker() { long timeoutMillis = 120000; // 2 分钟超时 for (Map.Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeoutMillis) < System.currentTimeMillis()) { // Broker 超时,移除 RemotingUtil.closeChannel(entry.getValue().getChannel()); this.brokerLiveTable.remove(entry.getKey()); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); log.warn("Broker timeout: {}", entry.getKey()); } } } } 2.3 NameServer 集群 public class NameServerCluster { // NameServer 特点 class Characteristics { /* 1. 完全无状态 - 不持久化任何数据 - 所有数据在内存中 - 重启后数据丢失(从 Broker 重新获取) 2. 对等部署 - 所有节点功能完全一致 - 无主从之分 - 无需选举 3. 不互相通信 - NameServer 之间无任何通信 - 数据可能短暂不一致 - 最终一致即可 4. Broker 全量注册 - Broker 向所有 NameServer 注册 - 保证数据冗余 */ } // 部署架构 class DeploymentArchitecture { /* NameServer 集群(推荐3个节点): ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ └────────────────┼────────────────┘ │ ┌───────▼────────┐ │ │ │ Broker 集群 │ │ (全量注册) │ │ │ └────────────────┘ */ } } 三、Broker 详解 3.1 Broker 核心架构 Broker 内部架构: ┌─────────────────────────────────────────────────────┐ │ Broker │ │ ┌────────────────────────────────────────────────┐│ │ │ Remoting 通信层 ││ │ │ (接收 Producer/Consumer 请求) ││ │ └────────┬───────────────────────────────────────┘│ │ │ │ │ ┌────────▼────────────────────────────┐ │ │ │ BrokerController │ │ │ │ - 处理器注册 │ │ │ │ - 定时任务管理 │ │ │ │ - 消息处理 │ │ │ └────────┬────────────────────────────┘ │ │ │ │ │ ┌────────▼─────────────┬───────────────────┐ │ │ │ MessageStore │ HAService │ │ │ │ ┌────────────────┐ │ (主从复制) │ │ │ │ │ CommitLog │ │ │ │ │ │ │ (消息存储) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ ConsumeQueue │ │ │ │ │ │ │ (消费索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ IndexFile │ │ │ │ │ │ │ (消息索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ └──────────────────────┴───────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 定时任务 │ │ │ │ - 持久化 ConsumeQueue │ │ │ │ - 持久化 CommitLog │ │ │ │ - 清理过期文件 │ │ │ │ - 统计信息 │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ 3.2 Broker 核心职责 public class BrokerCore { // 1. 消息接收与存储 class MessageReceive { public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 写入 CommitLog PutMessageResult result = this.commitLog.putMessage(msg); // 触发刷盘 handleDiskFlush(result, msg); // 触发主从同步 handleHA(result, msg); return result; } } // 2. 消息查询 class MessageQuery { // 按消息 ID 查询 public MessageExt lookMessageByOffset(long commitLogOffset) { return this.commitLog.lookMessageByOffset(commitLogOffset); } // 按 Key 查询 public QueryMessageResult queryMessage(String topic, String key, int maxNum) { return this.indexService.queryMessage(topic, key, maxNum); } // 按时间查询 public QueryMessageResult queryMessageByTime(String topic, long begin, long end) { return this.messageStore.queryMessage(topic, begin, end, maxNum); } } // 3. 消息消费 class MessageConsume { public GetMessageResult getMessage( String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) { // 从 ConsumeQueue 查找消息位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 根据 offset 获取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); // 从 CommitLog 读取消息内容 List<MessageExt> messageList = new ArrayList<>(); for (int i = 0; i < maxMsgNums; i++) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); MessageExt msg = lookMessageByOffset(offsetPy, sizePy); // 消息过滤 if (match(subscriptionData, msg)) { messageList.add(msg); } } return new GetMessageResult(messageList); } } // 4. Topic 创建 class TopicManagement { public void createTopic(String topic, int queueNums) { TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); topicConfig.setPerm(6); // 读写权限 this.topicConfigTable.put(topic, topicConfig); // 创建 ConsumeQueue for (int i = 0; i < queueNums; i++) { ConsumeQueue logic = new ConsumeQueue(topic, i); this.consumeQueueTable.put(buildKey(topic, i), logic); } } } // 5. 消费进度管理 class ConsumerOffsetManagement { private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { map.put(queueId, offset); } } public long queryOffset(String group, String topic, int queueId) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long offset = map.get(queueId); return offset != null ? offset : -1; } return -1; } } } 3.3 Broker 高可用 public class BrokerHA { // 主从架构 class MasterSlave { /* 主从模式: ┌──────────────┐ 同步 ┌──────────────┐ │ Master Broker│ ──────────> │ Slave Broker │ │ - 读写 │ │ - 只读 │ └──────────────┘ └──────────────┘ 同步方式: 1. 同步复制(SYNC_MASTER): - Master 等待 Slave 同步完成才返回 - 可靠性高,性能稍差 2. 异步复制(ASYNC_MASTER): - Master 立即返回,后台异步同步 - 性能高,可能丢失少量数据 */ } // Dledger 模式 class DledgerMode { /* 基于 Raft 的自动选主: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│ └──────────┘ └──────────┘ └──────────┘ 特点: 1. 自动故障转移 2. 无需人工干预 3. 保证数据一致性 4. 至少3个节点 */ } } 四、Producer 工作原理 4.1 Producer 架构 public class ProducerArchitecture { // Producer 核心组件 class ProducerComponents { private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信API private TopicPublishInfo publishInfo; // Topic路由信息 private SendMessageHook sendHook; // 发送钩子 } // 发送流程 public SendResult send(Message msg) { // 1. 查找 Topic 路由信息 TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 2. 选择消息队列 MessageQueue mq = selectOneMessageQueue(publishInfo); // 3. 发送消息 SendResult result = sendKernelImpl(msg, mq); // 4. 更新故障规避信息 updateFaultItem(mq.getBrokerName(), result); return result; } } 4.2 Producer 负载均衡 public class ProducerLoadBalance { // 队列选择策略 public MessageQueue selectQueue(TopicPublishInfo tpInfo) { // 1. 简单轮询 int index = sendWhichQueue.incrementAndGet() % queueSize; return tpInfo.getMessageQueueList().get(index); // 2. 故障规避 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { return mq; } // 3. 选择延迟最小的 return latencyFaultTolerance.pickOneAtLeast(); } } 五、Consumer 工作原理 5.1 Consumer 架构 public class ConsumerArchitecture { // Consumer 核心组件 class ConsumerComponents { private RebalanceService rebalanceService; // 负载均衡服务 private PullMessageService pullMessageService; // 拉取服务 private ConsumeMessageService consumeService; // 消费服务 private OffsetStore offsetStore; // 进度存储 } // 消费流程 public void start() { // 1. 启动负载均衡 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 启动消费服务 consumeService.start(); // 4. 注册到 Broker registerConsumer(); } } 5.2 Consumer 负载均衡(Rebalance) public class ConsumerRebalance { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取同组的所有 Consumer List<String> cidAll = getConsumerList(group); // 3. 排序(保证一致性视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配 List<MessageQueue> allocated = allocateStrategy.allocate( group, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueue(allocated); } } 六、完整消息流转 6.1 发送消息完整流程 消息发送完整流程: ┌───────────┐ │ Producer │ └─────┬─────┘ │1. 查询路由 ▼ ┌───────────┐ │NameServer │ 返回路由信息(Broker列表、Queue信息) └─────┬─────┘ │ ▼ ┌───────────┐ │ Producer │ 2. 选择 Broker 和 Queue └─────┬─────┘ │3. 发送消息 ▼ ┌───────────┐ │ Broker │ 4. 写入 CommitLog │ ┌───────┐ │ │ │CommitLog│ │ └───┬───┘ │ │ │5. 异步构建 ConsumeQueue │ ┌───▼──────┐ │ │ConsumeQ │ │ └──────────┘ │ │6. 返回结果 └─────┼─────┘ ▼ ┌───────────┐ │ Producer │ 7. 收到发送结果 └───────────┘ 6.2 消费消息完整流程 消息消费完整流程: ┌───────────┐ │ Consumer │ └─────┬─────┘ │1. 注册到 Broker ▼ ┌───────────┐ │ Broker │ 保存 Consumer 信息 └─────┬─────┘ │2. Rebalance 分配 Queue ▼ ┌───────────┐ │ Consumer │ 3. 发起长轮询拉取 └─────┬─────┘ │ ▼ ┌───────────┐ │ Broker │ 4. 从 ConsumeQueue 查找 │ ┌───────┐ │ 5. 从 CommitLog 读取消息 │ │ConsumeQ│ │ │ └───┬───┘ │ │ │ │ │ ┌───▼────┐│ │ │CommitLog││ │ └────────┘│ │ │6. 返回消息 └─────┼─────┘ ▼ ┌───────────┐ │ Consumer │ 7. 处理消息 └─────┬─────┘ 8. 更新消费进度 │ ▼ ┌───────────┐ │ Broker │ 9. 保存消费进度 └───────────┘ 七、设计亮点与思考 7.1 设计亮点 public class DesignHighlights { // 1. NameServer 的轻量化设计 class LightweightNameServer { /* - 无状态,内存操作 - 对等部署,无需选举 - 简单高效 - AP 而非 CP */ } // 2. CommitLog 的顺序写 class SequentialWrite { /* - 所有消息顺序写入同一个文件 - 充分利用磁盘顺序写性能 - 避免随机 IO */ } // 3. ConsumeQueue 的索引设计 class IndexDesign { /* - 轻量级索引(20字节) - 快速定位消息位置 - 支持消费进度管理 */ } // 4. 长轮询的伪 Push class LongPolling { /* - Consumer 主动拉取 - Broker Hold 请求 - 兼顾实时性和可控性 */ } } 7.2 架构权衡 public class ArchitectureTradeoffs { // 权衡1:性能 vs 可靠性 class PerformanceVsReliability { // 异步刷盘 → 高性能,可能丢消息 // 同步刷盘 → 零丢失,性能下降 // 异步复制 → 高性能,可能丢消息 // 同步复制 → 高可靠,性能下降 } // 权衡2:一致性 vs 可用性 class ConsistencyVsAvailability { // NameServer 选择 AP // - 可能短暂不一致 // - 但保证高可用 // Dledger 模式选择 CP // - 保证数据一致 // - 需要多数节点存活 } // 权衡3:复杂度 vs 功能 class ComplexityVsFeature { // RocketMQ 比 Kafka 复杂 // - 但功能更丰富(事务、延迟、顺序) // RocketMQ 比 RabbitMQ 简单 // - 但性能更高 } } 八、总结 通过本篇,我们从整体视角理解了 RocketMQ 的架构: ...

2025-11-13 · maneng

RocketMQ入门10:SpringBoot集成实战 - 从配置到生产级应用

引言:将知识转化为实践 前面9篇,我们深入学习了 RocketMQ 的各个方面。现在,让我们通过一个完整的 SpringBoot 项目,把这些知识串联起来,构建一个真正的生产级应用。 这个项目将包含: 完整的项目结构 各种消息类型的实现 错误处理和重试机制 监控和告警 性能优化 部署方案 一、项目搭建 1.1 项目结构 rocketmq-springboot-demo/ ├── pom.xml ├── src/main/java/com/example/rocketmq/ │ ├── RocketMQApplication.java # 启动类 │ ├── config/ │ │ ├── RocketMQConfig.java # RocketMQ配置 │ │ ├── ProducerConfig.java # 生产者配置 │ │ └── ConsumerConfig.java # 消费者配置 │ ├── producer/ │ │ ├── OrderProducer.java # 订单消息生产者 │ │ ├── TransactionProducer.java # 事务消息生产者 │ │ └── DelayProducer.java # 延迟消息生产者 │ ├── consumer/ │ │ ├── OrderConsumer.java # 订单消息消费者 │ │ ├── TransactionConsumer.java # 事务消息消费者 │ │ └── DelayConsumer.java # 延迟消息消费者 │ ├── message/ │ │ ├── OrderMessage.java # 订单消息实体 │ │ └── MessageWrapper.java # 消息包装器 │ ├── service/ │ │ ├── OrderService.java # 订单服务 │ │ └── MessageService.java # 消息服务 │ ├── utils/ │ │ ├── MessageBuilder.java # 消息构建工具 │ │ └── TraceUtils.java # 链路追踪工具 │ └── listener/ │ ├── TransactionListenerImpl.java # 事务监听器 │ └── MessageListenerImpl.java # 消息监听器 ├── src/main/resources/ │ ├── application.yml # 配置文件 │ ├── application-dev.yml # 开发环境配置 │ ├── application-prod.yml # 生产环境配置 │ └── logback-spring.xml # 日志配置 └── src/test/java/ # 测试类 1.2 Maven 依赖 <!-- pom.xml --> <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.14</version> </parent> <groupId>com.example</groupId> <artifactId>rocketmq-springboot-demo</artifactId> <version>1.0.0</version> <properties> <java.version>8</java.version> <rocketmq-spring.version>2.2.3</rocketmq-spring.version> </properties> <dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RocketMQ Spring Boot Starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-spring.version}</version> </dependency> <!-- 数据库相关 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- Redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 监控 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> <!-- 工具类 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.34</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.20</version> </dependency> <!-- 测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project> 1.3 配置文件 # application.yml spring: application: name: rocketmq-springboot-demo # RocketMQ 配置 rocketmq: name-server: ${ROCKETMQ_NAMESRV:localhost:9876} producer: group: ${spring.application.name}_producer_group send-message-timeout: 3000 compress-message-body-threshold: 4096 max-message-size: 4194304 retry-times-when-send-failed: 2 retry-times-when-send-async-failed: 2 retry-next-server: true # 自定义配置 consumer: groups: order-consumer-group: topics: ORDER_TOPIC consumer-group: ${spring.application.name}_order_consumer consume-thread-max: 64 consume-thread-min: 20 transaction-consumer-group: topics: TRANSACTION_TOPIC consumer-group: ${spring.application.name}_transaction_consumer # 应用配置 app: rocketmq: # 是否启用事务消息 transaction-enabled: true # 是否启用消息轨迹 trace-enabled: true # 消息轨迹Topic trace-topic: RMQ_SYS_TRACE_TOPIC # ACL配置 acl-enabled: false access-key: ${ROCKETMQ_ACCESS_KEY:} secret-key: ${ROCKETMQ_SECRET_KEY:} # 监控配置 management: endpoints: web: exposure: include: health,info,metrics,prometheus metrics: export: prometheus: enabled: true 二、核心配置类 2.1 RocketMQ 配置 // RocketMQConfig.java @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @Slf4j public class RocketMQConfig { @Autowired private RocketMQProperties rocketMQProperties; /** * 配置消息轨迹 */ @Bean @ConditionalOnProperty(prefix = "app.rocketmq", name = "trace-enabled", havingValue = "true") public DefaultMQProducer tracedProducer() { DefaultMQProducer producer = new DefaultMQProducer( rocketMQProperties.getProducer().getGroup(), true, // 启用消息轨迹 rocketMQProperties.getTraceTopic() ); configureProducer(producer); return producer; } /** * 配置事务消息生产者 */ @Bean @ConditionalOnProperty(prefix = "app.rocketmq", name = "transaction-enabled", havingValue = "true") public TransactionMQProducer transactionProducer( @Autowired TransactionListener transactionListener) { TransactionMQProducer producer = new TransactionMQProducer( rocketMQProperties.getProducer().getGroup() + "_TX" ); configureProducer(producer); // 设置事务监听器 producer.setTransactionListener(transactionListener); // 设置事务回查线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("transaction-check-thread"); return thread; } ); producer.setExecutorService(executor); return producer; } /** * 通用生产者配置 */ private void configureProducer(DefaultMQProducer producer) { producer.setNamesrvAddr(rocketMQProperties.getNameServer()); producer.setSendMsgTimeout(rocketMQProperties.getProducer().getSendMessageTimeout()); producer.setCompressMsgBodyOverHowmuch( rocketMQProperties.getProducer().getCompressMessageBodyThreshold() ); producer.setMaxMessageSize(rocketMQProperties.getProducer().getMaxMessageSize()); producer.setRetryTimesWhenSendFailed( rocketMQProperties.getProducer().getRetryTimesWhenSendFailed() ); // 配置ACL if (rocketMQProperties.isAclEnabled()) { producer.setAccessKey(rocketMQProperties.getAccessKey()); producer.setSecretKey(rocketMQProperties.getSecretKey()); } } /** * 消息发送模板 */ @Bean public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter converter) { RocketMQTemplate template = new RocketMQTemplate(); template.setMessageConverter(converter); return template; } /** * 自定义消息转换器 */ @Bean public RocketMQMessageConverter rocketMQMessageConverter() { return new CustomMessageConverter(); } } 2.2 自定义消息转换器 // CustomMessageConverter.java @Component public class CustomMessageConverter extends CompositeMessageConverter { private static final String ENCODING = "UTF-8"; @Override public org.springframework.messaging.Message<?> toMessage( Object payload, MessageHeaders headers) { // 添加追踪信息 Map<String, Object> enrichedHeaders = new HashMap<>(headers); enrichedHeaders.put("traceId", TraceUtils.getTraceId()); enrichedHeaders.put("timestamp", System.currentTimeMillis()); return super.toMessage(payload, new MessageHeaders(enrichedHeaders)); } @Override public Object fromMessage(org.springframework.messaging.Message<?> message, Class<?> targetClass) { // 自定义反序列化逻辑 if (targetClass == OrderMessage.class) { return JSON.parseObject( new String((byte[]) message.getPayload(), StandardCharsets.UTF_8), OrderMessage.class ); } return super.fromMessage(message, targetClass); } } 三、消息生产者实现 3.1 通用消息服务 // MessageService.java @Service @Slf4j public class MessageService { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送普通消息 */ public SendResult sendMessage(String topic, String tag, Object message) { String destination = buildDestination(topic, tag); try { Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .setHeader("businessType", message.getClass().getSimpleName()) .build(); SendResult result = rocketMQTemplate.syncSend(destination, springMessage); log.info("消息发送成功: destination={}, msgId={}, sendStatus={}", destination, result.getMsgId(), result.getSendStatus()); // 记录监控指标 recordMetrics(topic, tag, true, 0); return result; } catch (Exception e) { log.error("消息发送失败: destination={}, error={}", destination, e.getMessage()); recordMetrics(topic, tag, false, 1); throw new RuntimeException("消息发送失败", e); } } /** * 发送延迟消息 */ public SendResult sendDelayMessage(String topic, String tag, Object message, int delayLevel) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); // delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h SendResult result = rocketMQTemplate.syncSend(destination, springMessage, rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel); log.info("延迟消息发送成功: destination={}, delayLevel={}, msgId={}", destination, delayLevel, result.getMsgId()); return result; } /** * 发送顺序消息 */ public SendResult sendOrderlyMessage(String topic, String tag, Object message, String hashKey) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); SendResult result = rocketMQTemplate.syncSendOrderly( destination, springMessage, hashKey); log.info("顺序消息发送成功: destination={}, hashKey={}, msgId={}", destination, hashKey, result.getMsgId()); return result; } /** * 发送事务消息 */ public TransactionSendResult sendTransactionMessage( String topic, String tag, Object message, Object arg) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .setHeader("transactionId", UUID.randomUUID().toString()) .build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( destination, springMessage, arg); log.info("事务消息发送成功: destination={}, transactionId={}, sendStatus={}", destination, result.getTransactionId(), result.getSendStatus()); return result; } /** * 批量发送消息 */ public SendResult sendBatchMessage(String topic, String tag, List<?> messages) { String destination = buildDestination(topic, tag); List<Message> rocketMQMessages = messages.stream() .map(msg -> { Message message = new Message(); message.setTopic(topic); message.setTags(tag); message.setKeys(generateKey(msg)); message.setBody(JSON.toJSONBytes(msg)); return message; }) .collect(Collectors.toList()); SendResult result = rocketMQTemplate.syncSend(destination, rocketMQMessages); log.info("批量消息发送成功: destination={}, count={}, msgId={}", destination, messages.size(), result.getMsgId()); return result; } /** * 异步发送消息 */ public void sendAsyncMessage(String topic, String tag, Object message, SendCallback sendCallback) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); rocketMQTemplate.asyncSend(destination, springMessage, sendCallback); log.info("异步消息已提交: destination={}", destination); } private String buildDestination(String topic, String tag) { return StringUtils.isBlank(tag) ? topic : topic + ":" + tag; } private String generateKey(Object message) { if (message instanceof BaseMessage) { return ((BaseMessage) message).getBusinessId(); } return UUID.randomUUID().toString(); } private void recordMetrics(String topic, String tag, boolean success, int failCount) { // 记录Prometheus指标 if (success) { Metrics.counter("rocketmq.message.sent", "topic", topic, "tag", tag, "status", "success" ).increment(); } else { Metrics.counter("rocketmq.message.sent", "topic", topic, "tag", tag, "status", "fail" ).increment(failCount); } } } 3.2 订单消息生产者 // OrderProducer.java @Component @Slf4j public class OrderProducer { @Autowired private MessageService messageService; /** * 发送订单创建消息 */ public void sendOrderCreateMessage(Order order) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .amount(order.getAmount()) .status(OrderStatus.CREATED) .createTime(new Date()) .build(); messageService.sendMessage("ORDER_TOPIC", "ORDER_CREATE", message); } /** * 发送订单支付消息(事务消息) */ public void sendOrderPaymentMessage(Order order, PaymentInfo paymentInfo) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .amount(order.getAmount()) .status(OrderStatus.PAID) .paymentInfo(paymentInfo) .build(); // 发送事务消息,确保订单更新和消息发送的原子性 messageService.sendTransactionMessage( "ORDER_TOPIC", "ORDER_PAYMENT", message, order // 传递订单对象作为事务回查参数 ); } /** * 发送订单状态变更消息(顺序消息) */ public void sendOrderStatusMessage(Order order, OrderStatus newStatus) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .status(newStatus) .updateTime(new Date()) .build(); // 使用订单ID作为顺序key,确保同一订单的消息顺序 messageService.sendOrderlyMessage( "ORDER_TOPIC", "ORDER_STATUS_" + newStatus.name(), message, order.getOrderId() ); } /** * 发送订单超时取消消息(延迟消息) */ public void sendOrderTimeoutMessage(Order order) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .status(OrderStatus.TIMEOUT) .build(); // 30分钟后发送超时消息(延迟级别16 = 30m) messageService.sendDelayMessage( "ORDER_TOPIC", "ORDER_TIMEOUT", message, 16 ); } } 四、消息消费者实现 4.1 订单消息消费者 // OrderConsumer.java @Component @Slf4j @RocketMQMessageListener( topic = "ORDER_TOPIC", consumerGroup = "order_consumer_group", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 64, consumeThreadMin = 20 ) public class OrderConsumer implements RocketMQListener<OrderMessage>, RocketMQPushConsumerLifecycleListener { @Autowired private OrderService orderService; @Autowired private RedisTemplate<String, Object> redisTemplate; private final AtomicLong consumeCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); @Override public void onMessage(OrderMessage message) { long count = consumeCount.incrementAndGet(); log.info("消费订单消息: orderId={}, status={}, count={}", message.getOrderId(), message.getStatus(), count); try { // 幂等性检查 if (isDuplicate(message)) { log.warn("重复消息,跳过处理: orderId={}", message.getOrderId()); return; } // 处理消息 processMessage(message); // 记录已处理 markAsProcessed(message); // 记录监控指标 recordSuccessMetrics(message); } catch (Exception e) { errorCount.incrementAndGet(); log.error("消息处理失败: orderId={}, error={}", message.getOrderId(), e.getMessage(), e); // 记录错误指标 recordErrorMetrics(message); // 重新抛出异常,触发重试 throw new RuntimeException(e); } } /** * 幂等性检查 */ private boolean isDuplicate(OrderMessage message) { String key = "msg:processed:" + message.getOrderId(); Boolean result = redisTemplate.opsForValue().setIfAbsent( key, "1", 24, TimeUnit.HOURS); return !Boolean.TRUE.equals(result); } /** * 处理消息 */ private void processMessage(OrderMessage message) { switch (message.getStatus()) { case CREATED: orderService.handleOrderCreated(message); break; case PAID: orderService.handleOrderPaid(message); break; case SHIPPED: orderService.handleOrderShipped(message); break; case COMPLETED: orderService.handleOrderCompleted(message); break; case CANCELLED: orderService.handleOrderCancelled(message); break; case TIMEOUT: orderService.handleOrderTimeout(message); break; default: log.warn("未知订单状态: {}", message.getStatus()); } } /** * 标记消息已处理 */ private void markAsProcessed(OrderMessage message) { String key = "msg:processed:" + message.getOrderId(); redisTemplate.opsForValue().set(key, JSON.toJSONString(message), 7, TimeUnit.DAYS); } /** * 记录成功指标 */ private void recordSuccessMetrics(OrderMessage message) { Metrics.counter("order.message.consumed", "status", message.getStatus().name(), "result", "success" ).increment(); } /** * 记录错误指标 */ private void recordErrorMetrics(OrderMessage message) { Metrics.counter("order.message.consumed", "status", message.getStatus().name(), "result", "error" ).increment(); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 消费者启动前的准备工作 log.info("订单消费者准备启动: group={}", consumer.getConsumerGroup()); // 设置消费位点 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置消费超时(默认15分钟) consumer.setConsumeTimeout(15); // 设置最大重试次数 consumer.setMaxReconsumeTimes(3); // 设置消息过滤 try { consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("status in ('PAID', 'SHIPPED')")); } catch (Exception e) { log.error("设置消息过滤失败", e); } } } 4.2 事务消息监听器 // TransactionListenerImpl.java @Component @Slf4j public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Autowired private TransactionLogService transactionLogService; /** * 执行本地事务 */ @Override public RocketMQLocalTransactionState executeLocalTransaction( Message msg, Object arg) { String transactionId = msg.getHeaders().get("transactionId", String.class); log.info("执行本地事务: transactionId={}", transactionId); try { // 解析消息 OrderMessage orderMessage = JSON.parseObject( new String((byte[]) msg.getPayload()), OrderMessage.class ); // 执行本地事务 Order order = (Order) arg; orderService.updateOrderStatus(order, OrderStatus.PAID); // 记录事务日志 transactionLogService.save( transactionId, orderMessage.getOrderId(), TransactionStatus.COMMIT ); log.info("本地事务执行成功: orderId={}", orderMessage.getOrderId()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务执行失败: transactionId={}", transactionId, e); // 记录失败 transactionLogService.save( transactionId, null, TransactionStatus.ROLLBACK ); return RocketMQLocalTransactionState.ROLLBACK; } } /** * 事务回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transactionId = msg.getHeaders().get("transactionId", String.class); log.info("事务回查: transactionId={}", transactionId); // 查询事务日志 TransactionLog log = transactionLogService.findByTransactionId(transactionId); if (log == null) { log.warn("事务日志不存在: transactionId={}", transactionId); return RocketMQLocalTransactionState.UNKNOWN; } // 根据事务状态返回 switch (log.getStatus()) { case COMMIT: return RocketMQLocalTransactionState.COMMIT; case ROLLBACK: return RocketMQLocalTransactionState.ROLLBACK; default: return RocketMQLocalTransactionState.UNKNOWN; } } } 五、错误处理和重试 5.1 消费失败处理 // RetryMessageHandler.java @Component @Slf4j public class RetryMessageHandler { @Autowired private MessageService messageService; @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 处理消费失败的消息 */ public void handleConsumeError(Message message, Exception error) { String messageId = message.getHeaders().get(RocketMQHeaders.MESSAGE_ID, String.class); String topic = message.getHeaders().get(RocketMQHeaders.TOPIC, String.class); // 记录重试次数 String retryKey = "retry:count:" + messageId; Long retryCount = redisTemplate.opsForValue().increment(retryKey); redisTemplate.expire(retryKey, 1, TimeUnit.DAYS); log.error("消息消费失败: messageId={}, topic={}, retryCount={}, error={}", messageId, topic, retryCount, error.getMessage()); // 根据重试次数决定处理策略 if (retryCount <= 3) { // 延迟重试 retryWithDelay(message, retryCount); } else if (retryCount <= 5) { // 发送到重试队列 sendToRetryQueue(message); } else { // 发送到死信队列 sendToDeadLetterQueue(message, error); } } /** * 延迟重试 */ private void retryWithDelay(Message message, Long retryCount) { // 计算延迟级别:第1次10s,第2次30s,第3次1m int delayLevel = retryCount.intValue() * 2 + 1; messageService.sendDelayMessage( message.getHeaders().get(RocketMQHeaders.TOPIC, String.class), "RETRY", message.getPayload(), delayLevel ); log.info("消息已发送到延迟重试队列: messageId={}, delayLevel={}", message.getHeaders().get(RocketMQHeaders.MESSAGE_ID), delayLevel); } /** * 发送到重试队列 */ private void sendToRetryQueue(Message message) { String retryTopic = "%RETRY%" + message.getHeaders().get(RocketMQHeaders.CONSUMER_GROUP); messageService.sendMessage(retryTopic, "RETRY", message.getPayload()); log.info("消息已发送到重试队列: topic={}", retryTopic); } /** * 发送到死信队列 */ private void sendToDeadLetterQueue(Message message, Exception error) { String dlqTopic = "%DLQ%" + message.getHeaders().get(RocketMQHeaders.CONSUMER_GROUP); // 添加错误信息 Map<String, Object> headers = new HashMap<>(message.getHeaders()); headers.put("errorMessage", error.getMessage()); headers.put("errorTime", new Date()); Message<Object> dlqMessage = MessageBuilder .withPayload(message.getPayload()) .copyHeaders(headers) .build(); messageService.sendMessage(dlqTopic, "DLQ", dlqMessage); log.error("消息已发送到死信队列: topic={}", dlqTopic); // 发送告警 sendAlert(message, error); } /** * 发送告警 */ private void sendAlert(Message message, Exception error) { // 发送告警通知(邮件、短信、钉钉等) String alertMessage = String.format( "消息处理失败告警\n" + "Topic: %s\n" + "MessageId: %s\n" + "Error: %s\n" + "Time: %s", message.getHeaders().get(RocketMQHeaders.TOPIC), message.getHeaders().get(RocketMQHeaders.MESSAGE_ID), error.getMessage(), new Date() ); // 这里可以集成告警系统 log.error("告警: {}", alertMessage); } } 六、监控和运维 6.1 健康检查 // RocketMQHealthIndicator.java @Component public class RocketMQHealthIndicator implements HealthIndicator { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public Health health() { try { // 检查 NameServer 连接 DefaultMQProducer producer = rocketMQTemplate.getProducer(); producer.getDefaultMQProducerImpl().getmQClientFactory() .getMQClientAPIImpl().getNameServerAddressList(); // 检查 Broker 连接 Collection<String> topics = producer.getDefaultMQProducerImpl() .getmQClientFactory().getTopicRouteTable().keySet(); return Health.up() .withDetail("nameServer", producer.getNamesrvAddr()) .withDetail("producerGroup", producer.getProducerGroup()) .withDetail("topics", topics) .build(); } catch (Exception e) { return Health.down() .withDetail("error", e.getMessage()) .build(); } } } 6.2 监控指标收集 // RocketMQMetrics.java @Component @Slf4j public class RocketMQMetrics { private final MeterRegistry meterRegistry; private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>(); public RocketMQMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; initMetrics(); } private void initMetrics() { // 发送成功计数 Gauge.builder("rocketmq.producer.send.success", counters, map -> map.getOrDefault("send.success", new AtomicLong()).get()) .register(meterRegistry); // 发送失败计数 Gauge.builder("rocketmq.producer.send.failure", counters, map -> map.getOrDefault("send.failure", new AtomicLong()).get()) .register(meterRegistry); // 消费成功计数 Gauge.builder("rocketmq.consumer.consume.success", counters, map -> map.getOrDefault("consume.success", new AtomicLong()).get()) .register(meterRegistry); // 消费失败计数 Gauge.builder("rocketmq.consumer.consume.failure", counters, map -> map.getOrDefault("consume.failure", new AtomicLong()).get()) .register(meterRegistry); // 消息堆积数 Gauge.builder("rocketmq.consumer.lag", this, RocketMQMetrics::getConsumerLag) .register(meterRegistry); } public void recordSendSuccess() { counters.computeIfAbsent("send.success", k -> new AtomicLong()).incrementAndGet(); } public void recordSendFailure() { counters.computeIfAbsent("send.failure", k -> new AtomicLong()).incrementAndGet(); } public void recordConsumeSuccess() { counters.computeIfAbsent("consume.success", k -> new AtomicLong()).incrementAndGet(); } public void recordConsumeFailure() { counters.computeIfAbsent("consume.failure", k -> new AtomicLong()).incrementAndGet(); } private double getConsumerLag() { // 实现获取消费延迟的逻辑 return 0.0; } } 七、部署和运维 7.1 Docker 部署 # docker-compose.yml version: '3.8' services: app: build: . container_name: rocketmq-springboot-demo environment: - SPRING_PROFILES_ACTIVE=prod - ROCKETMQ_NAMESRV=rocketmq-namesrv:9876 - JAVA_OPTS=-Xms1G -Xmx1G -XX:+UseG1GC ports: - "8080:8080" - "9090:9090" # Prometheus metrics depends_on: - rocketmq-namesrv - rocketmq-broker - redis - mysql networks: - rocketmq-network rocketmq-namesrv: image: apache/rocketmq:4.9.4 container_name: rocketmq-namesrv command: sh mqnamesrv ports: - "9876:9876" networks: - rocketmq-network rocketmq-broker: image: apache/rocketmq:4.9.4 container_name: rocketmq-broker command: sh mqbroker -n rocketmq-namesrv:9876 ports: - "10909:10909" - "10911:10911" networks: - rocketmq-network redis: image: redis:6.2 container_name: redis ports: - "6379:6379" networks: - rocketmq-network mysql: image: mysql:8.0 container_name: mysql environment: - MYSQL_ROOT_PASSWORD=root123 - MYSQL_DATABASE=rocketmq_demo ports: - "3306:3306" networks: - rocketmq-network networks: rocketmq-network: driver: bridge 7.2 Kubernetes 部署 # deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: rocketmq-springboot-demo namespace: default spec: replicas: 3 selector: matchLabels: app: rocketmq-springboot-demo template: metadata: labels: app: rocketmq-springboot-demo spec: containers: - name: app image: rocketmq-springboot-demo:latest ports: - containerPort: 8080 - containerPort: 9090 env: - name: SPRING_PROFILES_ACTIVE value: "prod" - name: ROCKETMQ_NAMESRV value: "rocketmq-namesrv-service:9876" - name: JAVA_OPTS value: "-Xms1G -Xmx1G -XX:+UseG1GC" resources: requests: memory: "1Gi" cpu: "500m" limits: memory: "2Gi" cpu: "1000m" livenessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 60 periodSeconds: 10 readinessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 30 periodSeconds: 5 --- apiVersion: v1 kind: Service metadata: name: rocketmq-springboot-demo-service spec: selector: app: rocketmq-springboot-demo ports: - name: http port: 8080 targetPort: 8080 - name: metrics port: 9090 targetPort: 9090 type: LoadBalancer 八、总结 通过这个完整的 SpringBoot 集成示例,我们实现了: ...

2025-11-13 · maneng

RocketMQ入门09:消费组机制 - 负载均衡与消息分发

引言:一个精妙的协调机制 假设你经营一家餐厅,有10个服务员(Consumer),需要服务100桌客人(Message Queue): 如何公平分配?每个服务员负责10桌? 如果有服务员请假,谁来接管他的桌子? 如果来了新服务员,如何重新分配? 如何记住每桌的服务进度? 这就是 RocketMQ Consumer Group 要解决的问题。 一、Consumer Group 核心概念 1.1 什么是 Consumer Group public class ConsumerGroupConcept { // Consumer Group 定义 class ConsumerGroup { String groupName; // 组名(全局唯一) List<String> consumerIdList; // 组内消费者列表 SubscriptionData subscriptionData; // 订阅信息 ConsumeType consumeType; // 消费模式 MessageModel messageModel; // 消息模式(集群/广播) ConsumeFromWhere consumeFromWhere; // 从何处开始消费 } // 核心规则 class CoreRules { // 1. 同一 Group 内的 Consumer 订阅关系必须一致 // 2. 同一 Group 内的 Consumer 均分消息 // 3. 不同 Group 相互独立,都能收到全量消息 // 4. Group 是消费进度管理的单位 } } 1.2 Consumer Group 的设计目标 public class DesignGoals { // 目标1:负载均衡 void loadBalancing() { // 10个 Queue,5个 Consumer // 每个 Consumer 负责 2个 Queue // 自动、公平、动态 } // 目标2:高可用 void highAvailability() { // Consumer 宕机,其他 Consumer 接管 // 无消息丢失 // 自动故障转移 } // 目标3:水平扩展 void horizontalScaling() { // 动态增加 Consumer // 自动重新分配 // 无需停机 } // 目标4:消费进度管理 void progressManagement() { // 统一管理消费位点 // 支持重置进度 // 持久化存储 } } 二、Rebalance 机制详解 2.1 什么是 Rebalance public class RebalanceMechanism { // Rebalance:重新分配 Queue 给 Consumer class RebalanceDefinition { // 触发时机: // 1. Consumer 上线/下线 // 2. Queue 数量变化 // 3. 订阅关系变化 // 目标: // 保证每个 Queue 只被一个 Consumer 消费 // 尽可能均匀分配 } // Rebalance 流程 class RebalanceFlow { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取 Consumer Group 的所有 Consumer List<String> cidAll = getConsumerIdList(topic, group); // 3. 排序保证一致性 Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配算法 List<MessageQueue> allocateResult = strategy.allocate( consumerGroup, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueueTable(allocateResult); } } } 2.2 Rebalance 触发流程 Rebalance 触发流程: Consumer 加入: ┌──────────┐ 注册 ┌──────────┐ │Consumer1 │ ───────────────> │ Broker │ └──────────┘ └──────────┘ │ 通知其他Consumer ↓ ┌──────────┐ ┌──────────┐ │Consumer2 │ <──── Rebalance ─│Consumer3 │ └──────────┘ └──────────┘ 时间线: T0: Consumer1 加入 Group T1: Broker 通知所有 Consumer T2: 所有 Consumer 开始 Rebalance T3: 完成 Queue 重新分配 2.3 Rebalance 实现细节 public class RebalanceImplementation { // Consumer 端 Rebalance 服务 class RebalanceService extends ServiceThread { @Override public void run() { while (!this.isStopped()) { // 默认每 20 秒执行一次 this.waitForRunning(20000); this.doRebalance(); } } private void doRebalance() { // 对所有订阅的 Topic 执行 Rebalance for (Map.Entry<String, SubscriptionData> entry : subscriptionInner.entrySet()) { String topic = entry.getKey(); rebalanceByTopic(topic); } } private void rebalanceByTopic(String topic) { // 1. 获取 Topic 的队列信息 Set<MessageQueue> mqSet = topicSubscribeInfoTable.get(topic); // 2. 获取同组的所有 Consumer ID List<String> cidAll = mQClientFactory.findConsumerIdList( topic, consumerGroup ); // 3. 执行分配 if (mqSet != null && cidAll != null) { AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), new ArrayList<>(mqSet), cidAll ); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception", e); } // 4. 更新分配结果 updateProcessQueueTableInRebalance(topic, allocateResult); } } } // 更新本地队列表 private void updateProcessQueueTableInRebalance(String topic, List<MessageQueue> mqSet) { // 移除不再分配给自己的队列 for (MessageQueue mq : processQueueTable.keySet()) { if (!mqSet.contains(mq)) { ProcessQueue pq = processQueueTable.remove(mq); if (pq != null) { pq.setDropped(true); // 标记为丢弃 log.info("Drop queue: {}", mq); } } } // 添加新分配的队列 for (MessageQueue mq : mqSet) { if (!processQueueTable.containsKey(mq)) { ProcessQueue pq = new ProcessQueue(); processQueueTable.put(mq, pq); // 创建拉取请求 PullRequest pullRequest = new PullRequest(); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequest.setNextOffset(computePullFromWhere(mq)); // 立即执行拉取 pullMessageService.executePullRequestImmediately(pullRequest); log.info("Add new queue: {}", mq); } } } } 三、负载均衡策略 3.1 内置分配策略 public class AllocationStrategies { // 1. 平均分配策略(默认) class AllocateMessageQueueAveragely { /* 示例:8个Queue,3个Consumer Consumer0: Q0, Q1, Q2 Consumer1: Q3, Q4, Q5 Consumer2: Q6, Q7 */ public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = (mqAll.size() <= cidAll.size()) ? 1 : (mod > 0 && index < mod) ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size(); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); return mqAll.subList(startIndex, startIndex + range); } } // 2. 环形平均分配 class AllocateMessageQueueAveragelyByCircle { /* 示例:8个Queue,3个Consumer Consumer0: Q0, Q3, Q6 Consumer1: Q1, Q4, Q7 Consumer2: Q2, Q5 */ public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); List<MessageQueue> result = new ArrayList<>(); for (int i = index; i < mqAll.size(); i += cidAll.size()) { result.add(mqAll.get(i)); } return result; } } // 3. 一致性Hash分配 class AllocateMessageQueueConsistentHash { /* 使用一致性Hash算法 优点:Consumer变化时,影响的Queue最少 */ private final int virtualNodeCnt; private final HashFunction hashFunc; public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 构建Hash环 TreeMap<Long, String> hashRing = new TreeMap<>(); for (String cid : cidAll) { for (int i = 0; i < virtualNodeCnt; i++) { long hash = hashFunc.hash(cid + "#" + i); hashRing.put(hash, cid); } } // 分配Queue List<MessageQueue> result = new ArrayList<>(); for (MessageQueue mq : mqAll) { long hash = hashFunc.hash(mq.toString()); Map.Entry<Long, String> entry = hashRing.ceilingEntry(hash); if (entry == null) { entry = hashRing.firstEntry(); } if (currentCID.equals(entry.getValue())) { result.add(mq); } } return result; } } // 4. 机房就近分配 class AllocateMessageQueueByMachineRoom { /* 根据机房分配,优先同机房 减少跨机房流量 */ public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 获取当前Consumer的机房 String currentRoom = getMachineRoom(currentCID); // 同机房的Queue List<MessageQueue> sameRoomQueues = new ArrayList<>(); // 其他机房的Queue List<MessageQueue> diffRoomQueues = new ArrayList<>(); for (MessageQueue mq : mqAll) { if (currentRoom.equals(getMachineRoom(mq.getBrokerName()))) { sameRoomQueues.add(mq); } else { diffRoomQueues.add(mq); } } // 优先分配同机房Queue List<MessageQueue> result = new ArrayList<>(); result.addAll(allocateQueues(sameRoomQueues, currentCID, cidAll)); // 如果还需要更多Queue,分配其他机房的 if (result.size() < getTargetQueueNum(mqAll.size(), cidAll.size())) { result.addAll(allocateQueues(diffRoomQueues, currentCID, cidAll)); } return result; } } // 5. 自定义分配策略 class CustomAllocationStrategy implements AllocateMessageQueueStrategy { @Override public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 根据业务需求自定义 // 例如:VIP Consumer 分配更多Queue if (isVipConsumer(currentCID)) { // VIP分配60%的Queue return allocateForVip(mqAll, currentCID, cidAll); } else { // 普通Consumer平分剩余40% return allocateForNormal(mqAll, currentCID, cidAll); } } } } 3.2 策略选择指南 public class StrategySelectionGuide { public void selectStrategy() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 场景1:默认场景 // 使用平均分配,简单公平 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueAveragely() ); // 场景2:Consumer频繁变动 // 使用一致性Hash,减少重分配 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 场景3:多机房部署 // 使用机房就近,减少跨机房流量 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueByMachineRoom() ); // 场景4:特殊业务需求 // 自定义策略 consumer.setAllocateMessageQueueStrategy( new CustomAllocationStrategy() ); } } 四、消费进度管理 4.1 消费进度存储 public class ConsumeProgressStorage { // 集群模式:进度存储在 Broker class ClusterModeProgress { // 存储位置:{ROCKETMQ_HOME}/store/config/consumerOffset.json /* { "offsetTable": { "ORDER_TOPIC@ORDER_GROUP": { "0": 123456, // Queue0 的消费进度 "1": 234567, // Queue1 的消费进度 "2": 345678, // Queue2 的消费进度 "3": 456789 // Queue3 的消费进度 } } } */ // Broker 端定期持久化(默认5秒) class ConsumerOffsetManager { private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long oldOffset = map.put(queueId, offset); if (oldOffset == null || offset > oldOffset) { // 更新成功 } } } } } // 广播模式:进度存储在本地 class BroadcastModeProgress { // 存储位置:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json class LocalFileOffsetStore { private String storePath; private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable; public void persistAll() { // 持久化到本地文件 String jsonString = JSON.toJSONString(offsetTable); MixAll.string2File(jsonString, storePath); } public void load() { // 从本地文件加载 String content = MixAll.file2String(storePath); if (content != null) { offsetTable = JSON.parseObject(content, new TypeReference<ConcurrentHashMap<MessageQueue, AtomicLong>>(){}); } } } } } 4.2 消费进度提交 public class ProgressCommit { // Consumer 端进度提交 class ConsumerProgressCommit { // 定时提交(默认5秒) class ScheduledCommit { private ScheduledExecutorService scheduledExecutorService; public void start() { scheduledExecutorService.scheduleAtFixedRate(() -> { try { persistConsumerOffset(); } catch (Exception e) { log.error("persistConsumerOffset error", e); } }, 1000, 5000, TimeUnit.MILLISECONDS); } private void persistConsumerOffset() { for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) { MessageQueue mq = entry.getKey(); ProcessQueue pq = entry.getValue(); if (pq.isDropped()) { continue; } // 获取消费进度 long offset = pq.getConsumeOffset(); // 提交到 Broker offsetStore.updateOffset(mq, offset, false); } // 批量提交 offsetStore.persistAll(); } } // 关闭时提交 class ShutdownCommit { public void shutdown() { // 停止定时任务 scheduledExecutorService.shutdown(); // 最后一次提交 persistConsumerOffset(); // 等待提交完成 offsetStore.persistAll(); } } } } 4.3 进度重置 public class ProgressReset { // 重置消费进度的方式 class ResetMethods { // 1. 通过管理工具重置 void resetByAdmin() { // 重置到最早 // sh mqadmin resetOffset -g GROUP -t TOPIC -o 0 // 重置到最新 // sh mqadmin resetOffset -g GROUP -t TOPIC -o -1 // 重置到指定时间 // sh mqadmin resetOffset -g GROUP -t TOPIC -o 2023-11-13#10:00:00 } // 2. 通过代码重置 void resetByCode() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 设置从哪里开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // CONSUME_FROM_FIRST_OFFSET: 从最早消息 // CONSUME_FROM_LAST_OFFSET: 从最新消息 // CONSUME_FROM_TIMESTAMP: 从指定时间 // 设置消费时间点 consumer.setConsumeTimestamp("2023-11-13 10:00:00"); } // 3. 跳过堆积消息 void skipBacklog() { // 获取最大偏移量 long maxOffset = consumer.maxOffset(mq); // 更新到最大偏移量 consumer.updateConsumeOffset(mq, maxOffset); } } } 五、Rebalance 过程中的问题 5.1 消息重复消费 public class DuplicateConsumption { // 问题:Rebalance 导致消息重复 class Problem { /* 场景: 1. Consumer1 正在处理 Queue1 的消息 2. Rebalance 发生,Queue1 分配给 Consumer2 3. Consumer1 还没提交进度就失去了 Queue1 4. Consumer2 从旧进度开始消费,导致重复 */ } // 解决方案 class Solution { // 1. 幂等消费 class IdempotentConsumer { private Set<String> processedIds = new ConcurrentHashSet<>(); public ConsumeStatus consumeMessage(List<MessageExt> msgs) { for (MessageExt msg : msgs) { String bizId = msg.getKeys(); // 使用业务ID if (!processedIds.add(bizId)) { // 已处理,跳过 log.info("Message already processed: {}", bizId); continue; } // 处理消息 processMessage(msg); } return ConsumeStatus.CONSUME_SUCCESS; } } // 2. 减少 Rebalance 频率 class ReduceRebalance { void configure() { // 增加心跳间隔 consumer.setHeartbeatBrokerInterval(30000); // 30秒 // 使用一致性Hash consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); } } } } 5.2 消费停顿 public class ConsumePause { // 问题:Rebalance 期间消费暂停 class Problem { /* Rebalance 过程: 1. 停止当前消费 2. 等待其他 Consumer Rebalance 3. 重新分配Queue 4. 恢复消费 期间有短暂停顿 */ } // 优化方案 class Optimization { // 1. 减少 Rebalance 时间 void reduceRebalanceTime() { // 减少 Consumer 数量变化 // 使用容器编排,批量上下线 } // 2. 平滑迁移 class SmoothMigration { void migrate() { // 先启动新 Consumer startNewConsumers(); // 等待 Rebalance 完成 Thread.sleep(30000); // 再停止旧 Consumer stopOldConsumers(); } } // 3. 预分配策略 void preAllocate() { // 预先规划好 Queue 分配 // 使用自定义分配策略 consumer.setAllocateMessageQueueStrategy( new StaticAllocationStrategy() ); } } } 六、监控和运维 6.1 消费组监控 public class ConsumerGroupMonitoring { // 监控指标 class Metrics { // 1. 消费进度监控 void monitorProgress() { // 消费延迟 long lag = maxOffset - consumerOffset; // 消费速度 double tps = (currentOffset - lastOffset) / interval; // 预计消费完成时间 double eta = lag / tps; } // 2. Rebalance 监控 void monitorRebalance() { // Rebalance 次数 int rebalanceCount; // Rebalance 耗时 long rebalanceDuration; // Queue 分配变化 Map<String, List<MessageQueue>> allocationHistory; } // 3. Consumer 健康度 void monitorHealth() { // 在线 Consumer 数量 int onlineConsumerCount; // 消费线程池状态 ThreadPoolExecutor executor = getConsumeExecutor(); int activeCount = executor.getActiveCount(); int queueSize = executor.getQueue().size(); } } // 运维命令 class AdminCommands { // 查看消费进度 void checkProgress() { // sh mqadmin consumerProgress -n localhost:9876 -g ORDER_GROUP } // 查看消费组列表 void listConsumerGroups() { // sh mqadmin consumerGroupList -n localhost:9876 } // 查看消费组详情 void consumerGroupDetail() { // sh mqadmin consumerConnection -n localhost:9876 -g ORDER_GROUP } } } 6.2 问题诊断 public class ProblemDiagnosis { // 常见问题诊断 class CommonProblems { // 1. 消费速度慢 void slowConsumption() { // 检查点: // - Consumer 数量是否足够 // - 消费线程池是否饱和 // - 单条消息处理时间 // - 网络延迟 // 解决: // - 增加 Consumer 实例 // - 增加消费线程数 // - 优化消费逻辑 // - 批量消费 } // 2. 消费不均衡 void imbalancedConsumption() { // 检查点: // - Queue 分配是否均匀 // - Consumer 处理能力是否一致 // - 是否有热点 Queue // 解决: // - 调整分配策略 // - 增加 Queue 数量 // - 使用一致性 Hash } // 3. Rebalance 频繁 void frequentRebalance() { // 检查点: // - Consumer 是否频繁上下线 // - 网络是否稳定 // - 心跳是否超时 // 解决: // - 增加心跳间隔 // - 优化部署策略 // - 使用容器编排 } } } 七、最佳实践 7.1 Consumer Group 设计原则 public class DesignPrinciples { // 1. 合理设置 Consumer 数量 class ConsumerCount { void calculate() { // Consumer 数量 <= Queue 数量 // 否则有 Consumer 空闲 // 建议: int queueCount = 16; int consumerCount = Math.min(queueCount, 8); // 不超过 Queue 数量 } } // 2. 订阅关系一致性 class SubscriptionConsistency { // 同一 Group 内所有 Consumer 必须: // - 订阅相同的 Topic // - 使用相同的 Tag 过滤 // - 使用相同的消费模式 // ❌ 错误示例 void wrongExample() { // Consumer1 consumer1.subscribe("TOPIC", "TagA"); // Consumer2(同组但不同Tag) consumer2.subscribe("TOPIC", "TagB"); // 错误! } // ✅ 正确示例 void correctExample() { // 所有 Consumer 订阅一致 consumer.subscribe("TOPIC", "TagA || TagB"); } } // 3. 消费进度管理 class ProgressManagement { void bestPractice() { // 定期监控消费延迟 monitorConsumerLag(); // 设置合理的提交间隔 consumer.setAutoCommitIntervalMillis(5000); // 优雅关闭 Runtime.getRuntime().addShutdownHook(new Thread(() -> { consumer.shutdown(); })); } } } 7.2 性能优化 public class PerformanceOptimization { // 1. 消费并发度调优 void optimizeConcurrency() { // 根据消息处理时间调整 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); } // 2. 拉取优化 void optimizePull() { // 拉取批次大小 consumer.setPullBatchSize(32); // 拉取间隔(0表示不间断) consumer.setPullInterval(0); // 流控阈值 consumer.setPullThresholdForQueue(1000); } // 3. Rebalance 优化 void optimizeRebalance() { // 使用合适的分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 减少 Rebalance 触发 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET ); } } 八、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门08:消费模式剖析 - Push vs Pull的权衡

引言:一个有趣的悖论 当你使用 RocketMQ 的 Push 模式时,你可能会惊讶地发现: API 名称是 DefaultMQPushConsumer 但实际上是 Consumer 在主动拉取消息 官方文档称之为"Push 模式" 但本质上是"长轮询 Pull" 这不是设计缺陷,而是精心权衡的结果。让我们揭开这个"伪 Push"的神秘面纱。 一、Push vs Pull 的本质区别 1.1 理论上的 Push 和 Pull public class PushVsPullTheory { // 真正的 Push 模式 class TruePushMode { // Broker 主动推送消息给 Consumer // 特点: // 1. Broker 维护 Consumer 列表 // 2. 有新消息立即推送 // 3. Broker 控制推送速率 // 4. Consumer 被动接收 void brokerPush() { // Broker 端代码 for (Consumer consumer : consumers) { consumer.receive(message); // 主动调用 } } } // 真正的 Pull 模式 class TruePullMode { // Consumer 主动从 Broker 拉取消息 // 特点: // 1. Consumer 控制拉取时机 // 2. Consumer 控制拉取速率 // 3. Broker 被动响应 // 4. 可能存在消息延迟 void consumerPull() { // Consumer 端代码 while (running) { Message msg = broker.pull(); // 主动拉取 process(msg); } } } } 1.2 两种模式的优劣对比 Push 模式的优劣: ┌─────────────────┬──────────────────────────────┐ │ 优点 │ 缺点 │ ├─────────────────┼──────────────────────────────┤ │ 实时性高 │ Broker 需要维护推送状态 │ │ 消息延迟小 │ 难以处理消费速率差异 │ │ 使用简单 │ 可能造成 Consumer 过载 │ │ │ Broker 成为性能瓶颈 │ └─────────────────┴──────────────────────────────┘ Pull 模式的优劣: ┌─────────────────┬──────────────────────────────┐ │ 优点 │ 缺点 │ ├─────────────────┼──────────────────────────────┤ │ Consumer 自主控制│ 可能产生消息延迟 │ │ 流量可控 │ 频繁轮询浪费资源 │ │ Broker 无状态 │ 需要管理消费进度 │ │ 适合批量处理 │ 实现相对复杂 │ └─────────────────┴──────────────────────────────┘ 二、RocketMQ 的 Push 模式真相 2.1 Push 模式的内部实现 public class RocketMQPushMode { // DefaultMQPushConsumer 的秘密:底层是 Pull class PushConsumerImplementation { private PullMessageService pullMessageService; // 启动后自动开始拉取 public void start() { // 1. 启动 Rebalance 服务 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 立即触发拉取 pullMessageService.executePullRequestImmediately(pullRequest); } // 核心:PullMessageService class PullMessageService extends ServiceThread { @Override public void run() { while (!this.isStopped()) { PullRequest pullRequest = pullRequestQueue.take(); pullMessage(pullRequest); } } private void pullMessage(PullRequest pullRequest) { // 实际进行拉取操作 PullResult pullResult = pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, pullRequest.getNextOffset(), maxNums, sysFlag, 0, // 没有消息时挂起时间 brokerSuspendMaxTimeMillis, // 30秒 timeoutMillis, CommunicationMode.ASYNC, pullCallback ); } } } } 2.2 Push 模式的工作流程 Push 模式完整流程: ┌──────────────┐ │ Consumer │ └──────┬───────┘ │ 1. 发送拉取请求 ↓ ┌──────────────┐ │ Broker │ ├──────────────┤ │ 有消息? │ │ ├─是→ 立即返回 │ └─否→ Hold住请求 └──────┬───────┘ │ 2. 返回消息或超时 ↓ ┌──────────────┐ │ Consumer │ │ 处理消息 │ │ 继续拉取 │ └──────────────┘ 2.3 为什么要"伪装"成 Push? public class WhyFakePush { // 原因1:使用体验 class UserExperience { // Push API 更简单 void pushAPI() { consumer.registerMessageListener(listener); consumer.start(); // 就这么简单 } // Pull API 更复杂 void pullAPI() { while (running) { PullResult result = consumer.pull(mq, offset); process(result); updateOffset(offset); } } } // 原因2:兼顾优点 class CombineAdvantages { // 获得 Push 的优点: // - 接近实时(长轮询) // - 使用简单 // 获得 Pull 的优点: // - 流量可控 // - Broker 无状态 // - Consumer 自主控制 } // 原因3:避免缺点 class AvoidDisadvantages { // 避免 Push 的缺点: // - Broker 不需要维护推送状态 // - 不会造成 Consumer 过载 // 避免 Pull 的缺点: // - 无消息时不会空轮询 // - 有消息时立即返回 } } 三、长轮询机制详解 3.1 长轮询的核心原理 public class LongPollingMechanism { // Broker 端长轮询实现 class BrokerLongPolling { // 处理拉取请求 public RemotingCommand processRequest(RemotingCommand request) { // 1. 查询消息 GetMessageResult getMessageResult = messageStore.getMessage( group, topic, queueId, offset, maxMsgNums, filters ); // 2. 判断是否有消息 if (getMessageResult.getMessageCount() > 0) { // 有消息,立即返回 return buildResponse(getMessageResult); } else { // 没有消息,进入长轮询 return longPolling(request); } } // 长轮询处理 private RemotingCommand longPolling(RemotingCommand request) { // 1. 创建 Hold 请求 PullRequest pullRequest = new PullRequest( request, channel, pollingTimeMills, // 挂起时间 System.currentTimeMillis() ); // 2. 放入 Hold 队列 pullRequestHoldService.suspendPullRequest(topic, queueId, pullRequest); // 3. 不立即返回,等待唤醒或超时 return null; // 暂不返回 } } // Hold 服务 class PullRequestHoldService extends ServiceThread { // Hold 请求的容器 private ConcurrentHashMap<String, ManyPullRequest> pullRequestTable; @Override public void run() { while (!stopped) { // 每 5 秒检查一次 waitForRunning(5000); // 检查所有 Hold 的请求 for (ManyPullRequest mpr : pullRequestTable.values()) { List<PullRequest> requestList = mpr.cloneListAndClear(); for (PullRequest request : requestList) { // 检查是否有新消息 long offset = messageStore.getMaxOffset(topic, queueId); if (offset > request.getPullFromThisOffset()) { // 有新消息,唤醒请求 wakeupRequest(request); } else if (System.currentTimeMillis() - request.getSuspendTimestamp() > maxWaitTimeMillis) { // 超时,返回空结果 timeoutRequest(request); } else { // 继续 Hold mpr.addPullRequest(request); } } } } } // 新消息到达时的通知 public void notifyMessageArriving(String topic, int queueId) { ManyPullRequest mpr = pullRequestTable.get(buildKey(topic, queueId)); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); for (PullRequest request : requestList) { // 立即唤醒等待的请求 wakeupRequest(request); } } } } } 3.2 长轮询的时序图 长轮询时序图: Consumer Broker MessageStore │ │ │ │──────Pull Request───────>│ │ │ │ │ │ │────Check Message────────>│ │ │ │ │ │<───No Message────────────│ │ │ │ │ (Hold Request) │ │ │ │ │ │<────New Message Arrive────│ │ │ │ │ (Wake up Request) │ │ │ │ │<─────Return Messages─────│ │ │ │ │ 时间轴: 0s :Consumer 发送拉取请求 0s :Broker 检查没有消息,Hold 住请求 15s :新消息到达,Broker 唤醒请求 15s :Broker 返回消息给 Consumer 3.3 长轮询的参数配置 public class LongPollingConfig { // Consumer 端配置 class ConsumerConfig { // 长轮询模式下的挂起时间(默认 30 秒) consumer.setBrokerSuspendMaxTimeMillis(30000); // 拉取间隔(毫秒) consumer.setPullInterval(0); // 0 表示连续拉取 // 拉取批量大小 consumer.setPullBatchSize(32); // 单次拉取的最大消息数 consumer.setConsumeMessageBatchMaxSize(1); } // Broker 端配置 class BrokerConfig { // 长轮询等待时间(默认 30 秒) longPollingEnable = true; // Hold 请求的检查间隔(默认 5 秒) waitTimeMillsInPullHoldService = 5000; // 短轮询等待时间(默认 1 秒) shortPollingTimeMills = 1000; } } 四、Pull 模式的使用场景 4.1 Pull 模式的实现 public class PullModeImplementation { public void pullConsumer() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumerGroup("PULL_CONSUMER_GROUP"); consumer.start(); // 获取消息队列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC"); for (MessageQueue mq : mqs) { // 从存储获取消费进度 long offset = loadOffset(mq); while (true) { try { // 拉取消息 PullResult pullResult = consumer.pullBlockIfNotFound( mq, // 消息队列 null, // 过滤表达式 offset, // 拉取偏移量 32 // 最大消息数 ); // 处理拉取结果 switch (pullResult.getMsgFoundList()) { case FOUND: // 处理消息 for (MessageExt msg : pullResult.getMsgFoundList()) { processMessage(msg); } // 更新偏移量 offset = pullResult.getNextBeginOffset(); // 持久化偏移量 saveOffset(mq, offset); break; case NO_NEW_MSG: // 没有新消息 Thread.sleep(1000); break; case OFFSET_ILLEGAL: // 偏移量非法 offset = consumer.minOffset(mq); break; } } catch (Exception e) { e.printStackTrace(); } } } } } 4.2 Pull 模式的适用场景 public class PullModeScenarios { // 场景1:批量处理 class BatchProcessing { void batchConsume() { // 积累一定数量后批量处理 List<MessageExt> batch = new ArrayList<>(); while (batch.size() < 1000) { PullResult result = consumer.pull(mq, offset, 100); batch.addAll(result.getMsgFoundList()); } // 批量处理 batchProcess(batch); } } // 场景2:定时处理 class ScheduledProcessing { void scheduledConsume() { // 每小时处理一次 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { PullResult result = consumer.pull(mq, offset, Integer.MAX_VALUE); process(result.getMsgFoundList()); }, 0, 1, TimeUnit.HOURS); } } // 场景3:流量控制 class FlowControl { void controlledConsume() { RateLimiter rateLimiter = RateLimiter.create(100); // 100 TPS while (running) { rateLimiter.acquire(); // 限流 PullResult result = consumer.pull(mq, offset, 1); process(result.getMsgFoundList()); } } } // 场景4:自定义消费逻辑 class CustomLogic { void customConsume() { // 根据业务状态决定是否消费 if (isBusinessReady()) { PullResult result = consumer.pull(mq, offset, 32); process(result.getMsgFoundList()); } else { // 暂停消费 Thread.sleep(5000); } } } } 五、Push 和 Pull 的性能对比 5.1 性能测试 public class PerformanceComparison { // Push 模式性能特点 class PushPerformance { // 优点: // - 低延迟(长轮询,有消息立即返回) // - 高吞吐(连续拉取,无空闲) // - 资源利用率高(无空轮询) // 缺点: // - Consumer 压力大(连续处理) // - 流量不可控(Broker 推送速率) } // Pull 模式性能特点 class PullPerformance { // 优点: // - 流量可控(Consumer 控制) // - 压力可调(可以暂停) // - 批量优化(一次拉取多条) // 缺点: // - 可能有延迟(拉取间隔) // - 可能空轮询(浪费资源) } // 性能数据对比 class PerformanceData { /* 测试环境:3 Broker,10 Producer,10 Consumer 消息大小:1KB 指标 Push模式 Pull模式(1s间隔) Pull模式(连续) ---------------------------------------------------------------- 延迟(ms) 10-50 1000-2000 50-100 TPS 100,000 50,000 80,000 CPU使用率 85% 60% 75% 网络开销 低 高(空轮询) 中 */ } } 5.2 选择建议 public class SelectionGuide { // 选择 Push 模式的场景 class WhenToUsePush { // ✅ 实时性要求高 // ✅ 消息量稳定 // ✅ 希望简单使用 // ✅ Consumer 处理能力强 void example() { // 订单处理、支付通知、实时推送 } } // 选择 Pull 模式的场景 class WhenToUsePull { // ✅ 需要批量处理 // ✅ 定时处理 // ✅ 流量控制 // ✅ 自定义消费策略 void example() { // 数据同步、报表生成、批量导入 } } // 混合使用 class HybridUsage { // 核心业务使用 Push(实时性) // 非核心业务使用 Pull(可控性) void example() { // Push: 订单、支付 // Pull: 日志、统计 } } } 六、高级特性 6.1 Push 模式的流控 public class PushFlowControl { public void flowControl() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 流控参数 consumer.setPullThresholdForQueue(1000); // 单队列最大消息数 consumer.setPullThresholdSizeForQueue(100); // 单队列最大消息大小(MB) consumer.setPullThresholdForTopic(10000); // Topic 最大消息数 consumer.setPullThresholdSizeForTopic(1000); // Topic 最大消息大小(MB) // 消费并发控制 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); } } 6.2 Pull 模式的优化 public class PullOptimization { // 优化1:使用 PullConsumer 代替 DefaultMQPullConsumer public void optimizedPull() { // RocketMQ 5.0 新 API PullConsumer pullConsumer = PullConsumer.newBuilder() .setConsumerGroup("GROUP") .setNamesrvAddr("localhost:9876") .build(); // 异步拉取 CompletableFuture<PullResult> future = pullConsumer.pullAsync( mq, offset, maxNums ); future.thenAccept(result -> { process(result.getMsgFoundList()); }); } // 优化2:批量拉取 public void batchPull() { // 一次拉取更多消息,减少网络开销 PullResult result = consumer.pull(mq, offset, 1000); // 拉取 1000 条 } // 优化3:并行处理 public void parallelPull() { ExecutorService executor = Executors.newFixedThreadPool(10); for (MessageQueue mq : queues) { executor.submit(() -> { while (running) { PullResult result = consumer.pull(mq, offset, 32); process(result); } }); } } } 七、常见问题 7.1 Push 模式消费堆积 // 问题:Push 模式下消息堆积 // 原因:消费速度 < 生产速度 // 解决方案: class PushBacklogSolution { void solution() { // 1. 增加消费者实例 // 水平扩展 // 2. 增加消费线程 consumer.setConsumeThreadMin(40); consumer.setConsumeThreadMax(80); // 3. 批量消费 consumer.setConsumeMessageBatchMaxSize(20); // 4. 优化消费逻辑 // 异步处理、批量入库等 } } 7.2 Pull 模式偏移量管理 // 问题:Pull 模式偏移量丢失 // 原因:没有正确持久化偏移量 // 解决方案: class PullOffsetManagement { private OffsetStore offsetStore; void manageOffset() { // 1. 定期持久化 scheduledExecutor.scheduleAtFixedRate(() -> { offsetStore.persistAll(offsetTable); }, 5, 5, TimeUnit.SECONDS); // 2. 优雅关闭时持久化 Runtime.getRuntime().addShutdownHook(new Thread(() -> { offsetStore.persistAll(offsetTable); })); // 3. 使用 RocketMQ 提供的 OffsetStore consumer.setOffsetStore(new RemoteBrokerOffsetStore()); } } 八、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门07:消息模型深入 - 点对点vs发布订阅

引言:一个模型,两种模式 传统消息系统通常要在两种模型中二选一: JMS:明确区分 Queue(点对点)和 Topic(发布订阅) AMQP:通过 Exchange 和 Binding 实现不同模式 Kafka:只有 Topic,通过 Consumer Group 实现不同语义 而 RocketMQ 的独特之处在于:用一套模型,同时支持两种模式。 让我们深入理解这是如何实现的。 一、两种基本消息模型 1.1 点对点模型(Point-to-Point) public class P2PModel { // 点对点模型特征 class Characteristics { // 1. 一条消息只能被一个消费者消费 // 2. 消息被消费后从队列中删除 // 3. 多个消费者竞争消息 // 4. 适合任务分发场景 } // 传统 JMS Queue 实现 class TraditionalQueue { Queue<Message> queue = new LinkedBlockingQueue<>(); // 生产者发送 public void send(Message msg) { queue.offer(msg); } // 消费者接收(竞争) public Message receive() { return queue.poll(); // 取出即删除 } } } 点对点模型示意图: ┌──────────┐ Producer ──> Queue ──> Consumer1 (获得消息) └──────────┘ ──> Consumer2 (没有获得) ──> Consumer3 (没有获得) 特点:消息被其中一个消费者消费后,其他消费者无法再消费 1.2 发布订阅模型(Publish-Subscribe) public class PubSubModel { // 发布订阅模型特征 class Characteristics { // 1. 一条消息可以被多个订阅者消费 // 2. 每个订阅者都能收到完整消息副本 // 3. 消息广播给所有订阅者 // 4. 适合事件通知场景 } // 传统 JMS Topic 实现 class TraditionalTopic { List<Subscriber> subscribers = new ArrayList<>(); // 发布消息 public void publish(Message msg) { // 广播给所有订阅者 for (Subscriber sub : subscribers) { sub.onMessage(msg.copy()); } } // 订阅主题 public void subscribe(Subscriber sub) { subscribers.add(sub); } } } 发布订阅模型示意图: ┌──> Consumer1 (收到消息) ┌────────┐ │ Producer ──> Topic ──┼──> Consumer2 (收到消息) └────────┘ │ └──> Consumer3 (收到消息) 特点:每个订阅者都能收到消息的完整副本 二、RocketMQ 的统一模型 2.1 核心设计:Consumer Group public class RocketMQModel { // RocketMQ 的秘密:通过 Consumer Group 实现两种模式 class ConsumerGroup { String groupName; List<Consumer> consumers; ConsumeMode mode; // 集群消费 or 广播消费 } // 关键规则: // 1. 同一 Consumer Group 内的消费者【分摊】消息(点对点) // 2. 不同 Consumer Group 都能收到【全量】消息(发布订阅) } 2.2 模式切换的魔法 public class ModelSwitching { // 场景1:实现点对点模式 public void p2pMode() { // 所有消费者使用【相同】的 Consumer Group String GROUP = "SAME_GROUP"; Consumer consumer1 = new DefaultMQPushConsumer(GROUP); Consumer consumer2 = new DefaultMQPushConsumer(GROUP); Consumer consumer3 = new DefaultMQPushConsumer(GROUP); // 结果:消息被分摊消费,每条消息只被其中一个消费 } // 场景2:实现发布订阅模式 public void pubSubMode() { // 每个消费者使用【不同】的 Consumer Group Consumer consumer1 = new DefaultMQPushConsumer("GROUP_A"); Consumer consumer2 = new DefaultMQPushConsumer("GROUP_B"); Consumer consumer3 = new DefaultMQPushConsumer("GROUP_C"); // 结果:每个 Group 都收到全量消息 } // 场景3:混合模式 public void mixedMode() { // 订单服务组(2个实例) Consumer order1 = new DefaultMQPushConsumer("ORDER_GROUP"); Consumer order2 = new DefaultMQPushConsumer("ORDER_GROUP"); // 库存服务组(3个实例) Consumer stock1 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock2 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock3 = new DefaultMQPushConsumer("STOCK_GROUP"); // 结果: // - ORDER_GROUP 内部:order1 和 order2 分摊消息 // - STOCK_GROUP 内部:stock1/2/3 分摊消息 // - 两个 GROUP 之间:都能收到全量消息 } } 三、集群消费 vs 广播消费 3.1 集群消费(默认) public class ClusterConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置集群消费模式(默认) consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TOPIC", "*"); } // 集群消费特点 class ClusteringCharacteristics { // 1. 同组内消费者分摊消息 // 2. 消费进度存储在 Broker // 3. 支持消费失败重试 // 4. 适合负载均衡场景 } // 消费进度管理 class ProgressManagement { // Broker 端统一管理消费进度 // 路径:{ROCKETMQ_HOME}/store/config/consumerOffset.json { "offsetTable": { "TOPIC@GROUP": { "0": 12345, // Queue0 消费到 12345 "1": 23456, // Queue1 消费到 23456 "2": 34567, // Queue2 消费到 34567 "3": 45678 // Queue3 消费到 45678 } } } } } 集群消费示意图: Queue0 ──> Consumer1 ─┐ Queue1 ──> Consumer1 ─┤ Topic ──> ├─> GROUP_A (共享进度) Queue2 ──> Consumer2 ─┤ Queue3 ──> Consumer2 ─┘ GROUP_A 的消费进度存储在 Broker,所有成员共享 3.2 广播消费 public class BroadcastConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置广播消费模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TOPIC", "*"); } // 广播消费特点 class BroadcastingCharacteristics { // 1. 每个消费者都消费全量消息 // 2. 消费进度存储在本地 // 3. 不支持消费失败重试 // 4. 适合本地缓存更新场景 } // 消费进度管理 class LocalProgress { // 本地文件存储消费进度 // 路径:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json { "offsetTable": { "TOPIC": { "0": 99999, // 本实例 Queue0 进度 "1": 88888, // 本实例 Queue1 进度 "2": 77777, // 本实例 Queue2 进度 "3": 66666 // 本实例 Queue3 进度 } } } } } 广播消费示意图: ┌─> Consumer1 (消费全部消息,独立进度) │ Topic ──> ────┼─> Consumer2 (消费全部消息,独立进度) │ └─> Consumer3 (消费全部消息,独立进度) 每个 Consumer 独立维护自己的消费进度 四、消息分配策略 4.1 集群模式下的分配策略 public class AllocationStrategy { // 1. 平均分配(默认) class AllocateMessageQueueAveragely { // 假设:4个Queue,2个Consumer // Consumer1: Queue0, Queue1 // Consumer2: Queue2, Queue3 // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue1, Queue2 // Consumer2: Queue3, Queue4 } // 2. 环形分配 class AllocateMessageQueueAveragelyByCircle { // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue2, Queue4 // Consumer2: Queue1, Queue3 } // 3. 机房就近分配 class AllocateMessageQueueByMachineRoom { // 根据 Consumer 和 Broker 的机房位置分配 // 优先分配同机房的 Queue } // 4. 一致性Hash分配 class AllocateMessageQueueConsistentHash { // 使用一致性Hash算法 // Consumer 上下线对其他 Consumer 影响最小 } // 5. 自定义分配 class CustomAllocateStrategy implements AllocateMessageQueueStrategy { @Override public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 自定义分配逻辑 // 比如:VIP Consumer 分配更多 Queue if (isVipConsumer(currentCID)) { return allocateMoreQueues(mqAll, currentCID); } return allocateNormalQueues(mqAll, currentCID); } } } 4.2 分配策略选择 public class StrategySelection { public void selectStrategy() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 设置分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueAveragely() ); // 根据场景选择 class ScenarioBasedSelection { // 场景1:消费者能力相同 → 平均分配 // 场景2:跨机房部署 → 机房就近 // 场景3:Consumer频繁上下线 → 一致性Hash // 场景4:Consumer能力不同 → 自定义策略 } } } 五、实战场景分析 5.1 订单处理系统 public class OrderProcessingSystem { // 需求:订单需要被多个系统处理,但每个系统只处理一次 // 订单服务(2个实例,负载均衡) class OrderService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("ORDER_SERVICE_GROUP"); Consumer instance2 = createConsumer("ORDER_SERVICE_GROUP"); // 结果:订单在两个实例间负载均衡 } } // 库存服务(3个实例,负载均衡) class StockService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance2 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance3 = createConsumer("STOCK_SERVICE_GROUP"); // 结果:订单在三个实例间负载均衡 } } // 积分服务(1个实例) class PointService { void startConsumer() { Consumer instance = createConsumer("POINT_SERVICE_GROUP"); // 结果:所有订单都由这个实例处理 } } // 效果: // 1. 每个订单被 ORDER_SERVICE_GROUP 处理一次 // 2. 每个订单被 STOCK_SERVICE_GROUP 处理一次 // 3. 每个订单被 POINT_SERVICE_GROUP 处理一次 } 5.2 配置更新系统 public class ConfigUpdateSystem { // 需求:配置更新需要通知所有服务实例 class ConfigUpdateConsumer { void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONFIG_GROUP"); // 使用广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("CONFIG_UPDATE_TOPIC", "*"); consumer.registerMessageListener((List<MessageExt> msgs, context) -> { for (MessageExt msg : msgs) { // 更新本地配置 updateLocalConfig(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); } } // 效果: // 1. 所有服务实例都收到配置更新 // 2. 每个实例独立更新自己的配置 // 3. 某个实例重启不影响其他实例 } 5.3 日志收集系统 public class LogCollectionSystem { // 需求:大量日志需要收集,要求高吞吐 class LogCollector { void setup() { // 创建多个消费者,使用同一个 Group String GROUP = "LOG_COLLECTOR_GROUP"; // 部署10个消费者实例 for (int i = 0; i < 10; i++) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP); consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(30); consumer.setPullBatchSize(64); // 批量拉取 consumer.subscribe("LOG_TOPIC", "*"); consumer.start(); } // 效果: // 1. 10个实例并行消费,提高吞吐量 // 2. 自动负载均衡 // 3. 某个实例故障,其他实例自动接管 } } } 六、模型对比与选择 6.1 不同MQ产品的模型对比 消息模型对比: ┌─────────────┬──────────────┬──────────────┬──────────────┐ │ 产品 │ 模型 │ 实现方式 │ 特点 │ ├─────────────┼──────────────┼──────────────┼──────────────┤ │ RabbitMQ │ AMQP │ Exchange │ 灵活但复杂 │ │ ActiveMQ │ JMS │ Queue/Topic │ 标准但死板 │ │ Kafka │ 发布订阅 │ Partition │ 简单但单一 │ │ RocketMQ │ 统一模型 │ ConsumerGroup│ 灵活且简单 │ └─────────────┴──────────────┴──────────────┴──────────────┘ 6.2 选择建议 public class ModelSelectionGuide { // 使用集群消费(点对点)的场景 class UseClusterMode { // ✅ 任务分发 // ✅ 负载均衡 // ✅ 高可用(故障转移) // ✅ 需要消费确认和重试 void example() { // 订单处理、支付处理、数据同步 } } // 使用广播消费的场景 class UseBroadcastMode { // ✅ 本地缓存更新 // ✅ 配置刷新 // ✅ 全量数据同步 // ✅ 实时通知 void example() { // 配置更新、缓存刷新、实时推送 } } // 混合使用的场景 class UseMixedMode { // 不同业务使用不同 Group // 同一业务的多实例使用相同 Group void example() { // 电商系统:订单服务组、库存服务组、积分服务组 // 每个组内负载均衡,组间相互独立 } } } 七、高级特性 7.1 消费进度重置 public class ConsumeProgressReset { // 重置消费进度的场景 // 1. 需要重新消费历史消息 // 2. 跳过大量积压消息 // 3. 修复消费进度异常 public void resetProgress() { // 方式1:通过管理工具 // sh mqadmin resetOffset -n localhost:9876 -g GROUP -t TOPIC -o 0 // 方式2:通过代码 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // CONSUME_FROM_FIRST_OFFSET: 从最早消息开始 // CONSUME_FROM_LAST_OFFSET: 从最新消息开始 // CONSUME_FROM_TIMESTAMP: 从指定时间开始 } } 7.2 消费并行度调优 public class ConsumerConcurrency { public void tuneConcurrency() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 并发消费 consumer.setConsumeThreadMin(20); // 最小线程数 consumer.setConsumeThreadMax(64); // 最大线程数 // 顺序消费(单线程) consumer.setConsumeThreadMax(1); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); // 批量大小 consumer.setPullBatchSize(32); // 拉取批量 // 限流 consumer.setPullThresholdForQueue(1000); // 队列级别流控 consumer.setPullThresholdForTopic(5000); // Topic级别流控 } } 八、常见问题 8.1 消息重复消费 // 问题:同一条消息被消费多次 // 原因: // 1. 消费超时导致重试 // 2. Rebalance 导致重新分配 // 3. 消费者异常退出 // 解决方案:幂等处理 class IdempotentConsumer { private Set<String> processedMsgIds = new ConcurrentHashSet<>(); public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) { for (MessageExt msg : msgs) { String msgId = msg.getMsgId(); // 幂等检查 if (!processedMsgIds.add(msgId)) { // 已处理过,跳过 continue; } // 处理消息 processMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } 8.2 消费不均衡 // 问题:某些 Consumer 消费很多,某些很少 // 原因: // 1. Queue 数量 < Consumer 数量 // 2. 消息分布不均 // 3. Consumer 处理能力不同 // 解决方案: class BalanceSolution { void solution() { // 1. 调整 Queue 数量 // Queue数量 >= Consumer数量 // 2. 使用合适的分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 3. 监控并动态调整 // 监控每个 Consumer 的消费TPS // 动态调整线程池大小 } } 九、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门06:核心概念详解(下)- Topic、Queue、Tag、Key

引言:消息的"地址系统" 如果把 RocketMQ 比作快递系统,那么: Topic 是省份(大的分类) Queue 是城市(负载均衡单元) Tag 是街道(消息过滤) Key 是门牌号(精确查找) 理解这套"地址系统",才能高效地组织和管理消息。 一、Topic(主题)- 消息的逻辑分类 1.1 Topic 是什么? // Topic 是消息的一级分类,是逻辑概念 public class TopicConcept { // Topic 的本质:消息类型的逻辑分组 String ORDER_TOPIC = "ORDER_TOPIC"; // 订单消息 String PAYMENT_TOPIC = "PAYMENT_TOPIC"; // 支付消息 String LOGISTICS_TOPIC = "LOGISTICS_TOPIC"; // 物流消息 // 一个 Topic 包含多个 Queue class TopicStructure { String topicName; int readQueueNums = 4; // 读队列数量 int writeQueueNums = 4; // 写队列数量 int perm = 6; // 权限:2-写 4-读 6-读写 // Topic 在多个 Broker 上的分布 Map<String/* brokerName */, List<MessageQueue>> brokerQueues; } } 1.2 Topic 的设计原则 public class TopicDesignPrinciples { // 原则1:按业务领域划分 class BusinessDomain { // ✅ 推荐:清晰的业务边界 String TRADE_TOPIC = "TRADE_TOPIC"; String USER_TOPIC = "USER_TOPIC"; String INVENTORY_TOPIC = "INVENTORY_TOPIC"; // ❌ 不推荐:过于宽泛 String ALL_MESSAGE_TOPIC = "ALL_MESSAGE_TOPIC"; } // 原则2:考虑消息特性 class MessageCharacteristics { // 按消息量级区分 String HIGH_TPS_TOPIC = "LOG_TOPIC"; // 高吞吐 String LOW_TPS_TOPIC = "ALERT_TOPIC"; // 低吞吐 // 按消息大小区分 String BIG_MSG_TOPIC = "FILE_TOPIC"; // 大消息 String SMALL_MSG_TOPIC = "EVENT_TOPIC"; // 小消息 // 按重要程度区分 String CRITICAL_TOPIC = "PAYMENT_TOPIC"; // 核心业务 String NORMAL_TOPIC = "STAT_TOPIC"; // 一般业务 } // 原则3:隔离不同优先级 class PriorityIsolation { // 不同优先级使用不同 Topic String ORDER_HIGH_PRIORITY = "ORDER_HIGH_TOPIC"; String ORDER_NORMAL_PRIORITY = "ORDER_NORMAL_TOPIC"; String ORDER_LOW_PRIORITY = "ORDER_LOW_TOPIC"; } } 1.3 Topic 创建和管理 # 自动创建(不推荐生产使用) autoCreateTopicEnable=true defaultTopicQueueNums=4 # 手动创建(推荐) sh mqadmin updateTopic \ -n localhost:9876 \ -t ORDER_TOPIC \ -c DefaultCluster \ -r 8 \ # 读队列数量 -w 8 \ # 写队列数量 -p 6 # 权限 # 查看 Topic 列表 sh mqadmin topicList -n localhost:9876 # 查看 Topic 详情 sh mqadmin topicStatus -n localhost:9876 -t ORDER_TOPIC 二、Queue(队列)- 负载均衡的基本单元 2.1 Queue 的本质 public class QueueConcept { // Queue 是 Topic 的物理分片 class MessageQueue { private String topic; // 所属 Topic private String brokerName; // 所在 Broker private int queueId; // 队列 ID @Override public String toString() { // ORDER_TOPIC@broker-a@0 return topic + "@" + brokerName + "@" + queueId; } } // Topic 与 Queue 的关系 class TopicQueueRelation { // 1个 Topic = N 个 Queue // 例如:ORDER_TOPIC 有 8 个 Queue // broker-a: queue0, queue1, queue2, queue3 // broker-b: queue0, queue1, queue2, queue3 Map<String, Integer> distribution = new HashMap<>(); { distribution.put("broker-a", 4); distribution.put("broker-b", 4); // Total: 8 Queues } } } 2.2 Queue 的负载均衡 public class QueueLoadBalance { // 生产者端:选择 Queue 发送 public MessageQueue selectQueue() { List<MessageQueue> queues = getQueues("ORDER_TOPIC"); // 策略1:轮询(默认) int index = sendWhichQueue.incrementAndGet() % queues.size(); return queues.get(index); // 策略2:根据业务 Key 路由 String orderId = "12345"; int index = orderId.hashCode() % queues.size(); return queues.get(index); // 策略3:最小延迟 return selectQueueByLatency(queues); } // 消费者端:分配 Queue 消费 public List<MessageQueue> allocateQueues() { // 假设:8个 Queue,3个 Consumer // Consumer0: queue0, queue1, queue2 // Consumer1: queue3, queue4, queue5 // Consumer2: queue6, queue7 return AllocateStrategy.allocate( allQueues, currentConsumerId, allConsumerIds ); } } 2.3 Queue 数量设计 public class QueueNumberDesign { // Queue 数量考虑因素 class Factors { // 1. 并发度 // Queue 数量 = 预期 TPS / 单 Queue TPS // 例如:10万 TPS / 1万 TPS = 10个 Queue // 2. Consumer 数量 // Queue 数量 >= Consumer 数量 // 否则有 Consumer 闲置 // 3. Broker 数量 // Queue 数量 = Broker 数量 × 单 Broker Queue 数 // 例如:2个 Broker × 4 = 8个 Queue } // 动态调整 Queue 数量 public void adjustQueueNumber() { // 增加 Queue(在线扩容) updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=16); // 减少 Queue(需谨慎) // 1. 先减少写队列 updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=8); // 2. 等待消费完成 waitForConsume(); // 3. 再减少读队列 updateTopic("ORDER_TOPIC", readQueues=8, writeQueues=8); } } 三、Tag(标签)- 消息的二级分类 3.1 Tag 的作用 public class TagUsage { // Tag 用于同一 Topic 下的消息细分 public void sendWithTag() { // 订单 Topic 下的不同消息类型 Message createOrder = new Message("ORDER_TOPIC", "CREATE", body); Message payOrder = new Message("ORDER_TOPIC", "PAY", body); Message cancelOrder = new Message("ORDER_TOPIC", "CANCEL", body); Message refundOrder = new Message("ORDER_TOPIC", "REFUND", body); } // Tag vs Topic 的选择 class TagVsTopic { // 使用 Tag 的场景: // 1. 消息类型相关性强 // 2. 需要统一管理 // 3. 消费者可能订阅多种类型 // ✅ 推荐:相关业务用 Tag 区分 void goodPractice() { // 订单的不同状态用 Tag new Message("ORDER_TOPIC", "CREATED", body); new Message("ORDER_TOPIC", "PAID", body); new Message("ORDER_TOPIC", "SHIPPED", body); } // ❌ 不推荐:无关业务用同一 Topic void badPractice() { // 订单和用户混在一起 new Message("BUSINESS_TOPIC", "ORDER_CREATE", body); new Message("BUSINESS_TOPIC", "USER_REGISTER", body); } } } 3.2 Tag 过滤机制 public class TagFiltering { // 消费者订阅 public void subscribeWithTag() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 订阅所有 Tag consumer.subscribe("ORDER_TOPIC", "*"); // 订阅单个 Tag consumer.subscribe("ORDER_TOPIC", "CREATE"); // 订阅多个 Tag(|| 分隔) consumer.subscribe("ORDER_TOPIC", "CREATE || PAY || CANCEL"); // SQL92 表达式(需要 Broker 开启) consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("amount > 1000 AND type = 'GOLD'")); } // Tag 过滤原理 class FilterPrinciple { // 1. Broker 端过滤(粗过滤) // 根据 Tag HashCode 快速过滤 // 在 ConsumeQueue 中存储了 Tag HashCode // 2. Consumer 端过滤(精过滤) // 再次验证 Tag 字符串 // 防止 Hash 冲突导致的误判 public boolean filterMessage(MessageExt msg, String subscribeTags) { // Broker 端:HashCode 匹配 long tagCode = msg.getTagsCode(); if (!isMatched(tagCode, subscribeTagCodes)) { return false; } // Consumer 端:字符串匹配 String msgTag = msg.getTags(); return isTagMatched(msgTag, subscribeTags); } } } 3.3 Tag 最佳实践 public class TagBestPractices { // 1. Tag 命名规范 class TagNaming { // 使用大写字母和下划线 String GOOD_TAG = "ORDER_CREATED"; // 避免特殊字符 String BAD_TAG = "order-created@2023"; // 避免 // 保持简短有意义 String CONCISE = "PAY_SUCCESS"; String VERBOSE = "ORDER_PAYMENT_SUCCESS_WITH_DISCOUNT"; // 太长 } // 2. Tag 使用策略 class TagStrategy { // 按业务流程设计 String[] ORDER_FLOW = { "CREATED", // 订单创建 "PAID", // 支付完成 "SHIPPED", // 已发货 "RECEIVED", // 已收货 "COMPLETED" // 已完成 }; // 按事件类型设计 String[] EVENT_TYPE = { "INSERT", // 新增 "UPDATE", // 更新 "DELETE" // 删除 }; } } 四、Key - 消息的业务标识 4.1 Key 的用途 public class KeyUsage { // Key 用于消息的业务标识和查询 public void sendWithKey() { Message msg = new Message("ORDER_TOPIC", "CREATE", body); // 设置业务唯一标识 msg.setKeys("ORDER_12345"); // 设置多个 Key(空格分隔) msg.setKeys("ORDER_12345 USER_67890"); producer.send(msg); } // Key 的主要用途 class KeyPurpose { // 1. 消息查询 void queryByKey() { // 通过 Key 查询消息 QueryResult result = mqadmin.queryMsgByKey("ORDER_TOPIC", "ORDER_12345"); } // 2. 去重判断 void deduplication(String key) { if (processedKeys.contains(key)) { // 已处理,跳过 return; } processMessage(); processedKeys.add(key); } // 3. 业务追踪 void traceOrder(String orderId) { // 追踪订单的所有消息 List<Message> orderMessages = queryAllMessagesByKey(orderId); } } } 4.2 Key 的索引机制 public class KeyIndexing { // IndexFile 结构 class IndexFile { // 通过 Key 建立索引 // 存储格式:Key Hash -> Physical Offset private IndexHeader header; // 文件头 private SlotTable slotTable; // Hash 槽 private IndexLinkedList indexes; // 索引链表 public void putKey(String key, long offset) { int keyHash = hash(key); int slotPos = keyHash % 500_0000; // 500万个槽 // 存储索引 IndexItem item = new IndexItem(); item.setKeyHash(keyHash); item.setPhyOffset(offset); item.setTimeDiff(storeTimestamp - beginTimestamp); // 处理 Hash 冲突(链表) item.setSlotValue(slotTable.get(slotPos)); slotTable.put(slotPos, indexes.add(item)); } } } 4.3 Key 设计建议 public class KeyDesignSuggestions { // 1. Key 的选择 class KeySelection { // ✅ 好的 Key:业务唯一标识 String orderId = "ORDER_2023110901234"; String userId = "USER_123456"; String transactionId = "TXN_9876543210"; // ❌ 不好的 Key:无业务含义 String uuid = UUID.randomUUID().toString(); String timestamp = String.valueOf(System.currentTimeMillis()); } // 2. 复合 Key class CompositeKey { public String generateKey(Order order) { // 组合多个维度 return String.format("%s_%s_%s", order.getOrderId(), order.getUserId(), order.getCreateTime() ); } } // 3. Key 规范 class KeySpecification { // 长度控制 static final int MAX_KEY_LENGTH = 128; // 字符限制 static final String KEY_PATTERN = "^[a-zA-Z0-9_-]+$"; // 避免特殊字符 public String sanitizeKey(String key) { return key.replaceAll("[^a-zA-Z0-9_-]", "_"); } } } 五、消息属性(Properties) 5.1 系统属性 vs 用户属性 public class MessageProperties { // 系统属性(RocketMQ 内部使用) class SystemProperties { String UNIQ_KEY = "__UNIQ_KEY__"; // 消息唯一标识 String MAX_OFFSET = "__MAX_OFFSET__"; // 最大偏移量 String MIN_OFFSET = "__MIN_OFFSET__"; // 最小偏移量 String TRANSACTION_ID = "__TRANSACTION_ID__"; // 事务ID String DELAY_LEVEL = "__DELAY_LEVEL__"; // 延迟级别 } // 用户自定义属性 public void setUserProperties() { Message msg = new Message("TOPIC", "TAG", body); // 添加业务属性 msg.putUserProperty("orderType", "ONLINE"); msg.putUserProperty("paymentMethod", "CREDIT_CARD"); msg.putUserProperty("region", "EAST"); msg.putUserProperty("vipLevel", "GOLD"); // SQL92 过滤时使用 consumer.subscribe("TOPIC", MessageSelector.bySql("vipLevel = 'GOLD' AND region = 'EAST'")); } } 5.2 属性的应用场景 public class PropertiesUseCases { // 1. 消息路由 public void routeByProperty(Message msg) { String region = msg.getUserProperty("region"); // 根据地区路由到不同集群 if ("NORTH".equals(region)) { sendToNorthCluster(msg); } else { sendToSouthCluster(msg); } } // 2. 监控统计 public void monitoring(Message msg) { // 统计不同类型订单 String orderType = msg.getUserProperty("orderType"); metrics.increment("order.count", "type", orderType); } // 3. 灰度发布 public void grayRelease(Message msg) { msg.putUserProperty("version", "2.0"); msg.putUserProperty("gray", "true"); msg.putUserProperty("percentage", "10"); // 10% 流量 } } 六、组合使用示例 6.1 电商订单场景 public class ECommerceExample { public void processOrder(Order order) { // Topic:按业务领域 String topic = "ORDER_TOPIC"; // Tag:按订单状态 String tag = getOrderTag(order.getStatus()); // Key:订单号(用于查询) String key = order.getOrderId(); // 构造消息 Message msg = new Message(topic, tag, key, JSON.toJSONBytes(order)); // 添加属性(用于过滤和统计) msg.putUserProperty("userId", order.getUserId()); msg.putUserProperty("amount", String.valueOf(order.getAmount())); msg.putUserProperty("payMethod", order.getPayMethod()); msg.putUserProperty("orderType", order.getType()); // 发送消息 SendResult result = producer.send(msg); } private String getOrderTag(OrderStatus status) { switch (status) { case CREATED: return "ORDER_CREATED"; case PAID: return "ORDER_PAID"; case SHIPPED: return "ORDER_SHIPPED"; case COMPLETED: return "ORDER_COMPLETED"; case CANCELLED: return "ORDER_CANCELLED"; default: return "ORDER_UNKNOWN"; } } } 6.2 消费者订阅策略 public class ConsumerStrategy { // 场景1:订阅所有订单消息 class AllOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "*"); } } // 场景2:只处理支付相关 class PaymentConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "ORDER_PAID || ORDER_REFUND"); } } // 场景3:VIP 订单处理 class VipOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("orderType = 'VIP' AND amount > 1000")); } } // 场景4:特定用户订单 class UserOrderConsumer { public void consumeMessage(MessageExt msg) { String userId = msg.getUserProperty("userId"); if ("VIP_USER_123".equals(userId)) { // 特殊处理 VIP 用户订单 handleVipOrder(msg); } else { // 普通处理 handleNormalOrder(msg); } } } } 七、设计原则总结 7.1 层级设计原则 消息层级设计: ┌─────────────────────────────────┐ │ Topic │ <- 业务大类 ├─────────────────────────────────┤ │ Queue1 Queue2 Queue3 │ <- 负载均衡 ├─────────────────────────────────┤ │ Tag1 Tag2 Tag3 │ <- 消息子类 ├─────────────────────────────────┤ │ Key (业务ID) │ <- 消息标识 ├─────────────────────────────────┤ │ Properties (属性) │ <- 扩展信息 └─────────────────────────────────┘ 7.2 最佳实践建议 public class BestPractices { // 1. Topic 设计:粗粒度 // 一个业务领域一个 Topic // 避免创建过多 Topic // 2. Queue 设计:适度 // Queue 数量 = max(预期并发数, Consumer数量) // 一般 8-16 个即可 // 3. Tag 设计:细粒度 // 同一 Topic 下的不同消息类型 // 便于消费者灵活订阅 // 4. Key 设计:唯一性 // 使用业务唯一标识 // 便于消息追踪和查询 // 5. Properties:扩展性 // 存储额外的业务信息 // 支持 SQL92 复杂过滤 } 八、常见问题 8.1 Topic 过多的问题 // 问题:Topic 过多导致的影响 // 1. 路由信息膨胀 // 2. 内存占用增加 // 3. 管理复杂度上升 // 解决方案:合并相关 Topic,用 Tag 区分 // Before: USER_REGISTER_TOPIC, USER_UPDATE_TOPIC, USER_DELETE_TOPIC // After: USER_TOPIC + Tags(REGISTER, UPDATE, DELETE) 8.2 Queue 数量不当 // 问题:Queue 太少 // - Consumer 空闲 // - 并发度不够 // 问题:Queue 太多 // - 管理开销大 // - Rebalance 频繁 // 解决:动态调整 // 监控 Queue 积压和 Consumer 负载 // 根据实际情况调整 Queue 数量 九、总结 理解了 Topic、Queue、Tag、Key 这些核心概念,我们就掌握了 RocketMQ 的消息组织方式: ...

2025-11-13 · maneng

RocketMQ入门05:核心概念详解(上)- Producer、Consumer、Broker

引言:理解核心,掌握全局 如果把 RocketMQ 比作一个物流系统,那么: Producer 是发货方,负责打包发送包裹 Broker 是物流中心,负责存储转运包裹 Consumer 是收货方,负责签收处理包裹 理解这三个核心组件,就掌握了 RocketMQ 的精髓。 一、Producer(生产者)详解 1.1 Producer 架构 // Producer 内部结构 public class ProducerArchitecture { // 核心组件 private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信层 private TopicPublishInfo topicPublishInfo; // 路由信息 private SendMessageHook sendHook; // 发送钩子 private DefaultMQProducerImpl impl; // 核心实现 } Producer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQProducer │ <- API 层 ├──────────────────────────────────┤ │ DefaultMQProducerImpl │ <- 核心实现 ├──────────────────────────────────┤ │ ┌──────────┬─────────────────┐ │ │ │路由管理 │ 消息发送器 │ │ │ ├──────────┼─────────────────┤ │ │ │故障规避 │ 异步发送处理 │ │ │ ├──────────┼─────────────────┤ │ │ │负载均衡 │ 事务消息处理 │ │ │ └──────────┴─────────────────┘ │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 1.2 Producer 生命周期 public class ProducerLifecycle { // 1. 创建阶段 public void create() { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试次数 producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数 producer.setSendMsgTimeout(3000); // 发送超时时间 } // 2. 启动阶段 public void start() throws MQClientException { producer.start(); // 内部流程: // a. 检查配置 // b. 获取 NameServer 地址 // c. 启动定时任务(路由更新、心跳) // d. 启动消息发送线程池 // e. 注册 Producer 到所有 Broker } // 3. 运行阶段 public void running() throws Exception { // 获取路由信息 TopicRouteData routeData = producer.getDefaultMQProducerImpl() .getmQClientFactory() .getMQClientAPIImpl() .getTopicRouteInfoFromNameServer("TopicTest", 3000); // 选择消息队列 MessageQueue mq = selectMessageQueue(routeData); // 发送消息 SendResult result = producer.send(message, mq); } // 4. 关闭阶段 public void shutdown() { producer.shutdown(); // 内部流程: // a. 注销 Producer // b. 关闭网络连接 // c. 清理资源 // d. 停止定时任务 } } 1.3 消息发送流程 // 消息发送核心流程 public class SendMessageFlow { public SendResult sendMessage(Message msg) { // 1. 验证消息 Validators.checkMessage(msg, producer); // 2. 获取路由信息 TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 3. 选择消息队列(负载均衡) MessageQueue mq = selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 4. 消息发送前处理 msg.setBody(compress(msg.getBody())); // 压缩 msg.setProperty("UNIQ_KEY", MessageClientIDSetter.createUniqID()); // 唯一ID // 5. 发送消息 SendResult sendResult = sendKernelImpl( msg, mq, communicationMode, // SYNC, ASYNC, ONEWAY sendCallback, timeout ); // 6. 发送后处理(钩子) if (hasSendMessageHook()) { sendMessageHook.sendMessageAfter(context); } return sendResult; } } 1.4 负载均衡策略 public class ProducerLoadBalance { // 默认策略:轮询 + 故障规避 public MessageQueue selectOneMessageQueue(TopicPublishInfo tpInfo, String lastBrokerName) { // 启用故障规避 if (this.sendLatencyFaultEnable) { // 1. 优先选择延迟小的 Broker for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int index = Math.abs(sendWhichQueue.incrementAndGet()) % queueSize; MessageQueue mq = tpInfo.getMessageQueueList().get(index); // 检查 Broker 是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 避免连续发送到同一个 Broker if (null == lastBrokerName || !mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 2. 如果都不可用,选择延迟最小的 MessageQueue mq = latencyFaultTolerance.pickOneAtLeast(); return mq; } // 不启用故障规避:简单轮询 return selectOneMessageQueue(); } // 故障延迟统计 class LatencyStats { // 根据发送耗时计算 Broker 不可用时长 // 耗时:50ms, 100ms, 550ms, 1000ms, 2000ms, 3000ms, 15000ms // 规避:0ms, 0ms, 30s, 60s, 120s, 180s, 600s private final long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private final long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; } } 二、Consumer(消费者)详解 2.1 Consumer 架构 Consumer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQPushConsumer │ <- Push 模式 API │ DefaultMQPullConsumer │ <- Pull 模式 API ├──────────────────────────────────┤ │ ┌──────────────────────────┐ │ │ │ 消费者核心实现 │ │ │ ├──────────────────────────┤ │ │ │ Rebalance 负载均衡 │ │ │ │ ProcessQueue 处理队列 │ │ │ │ ConsumeMessageService │ │ │ │ 消费进度管理 │ │ │ └──────────────────────────┘ │ ├──────────────────────────────────┤ │ PullMessage 长轮询服务 │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 2.2 Push vs Pull 模式 // Push 模式(推荐) public class PushConsumerDemo { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup"); // Push 模式本质:长轮询拉取 // 1. Consumer 主动拉取消息 // 2. 如果没有消息,Broker hold 住请求 // 3. 有新消息时,立即返回 // 4. 看起来像是 Broker 推送 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> messages, ConsumeConcurrentlyContext context) { // 处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } // Pull 模式(特殊场景) public class PullConsumerDemo { public void consume() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroup"); consumer.start(); // 手动拉取消息 Set<MessageQueue> mqs = consumer.fetchMessageQueuesInBalance("TopicTest"); for (MessageQueue mq : mqs) { PullResult pullResult = consumer.pullBlockIfNotFound( mq, // 消息队列 null, // 过滤表达式 getOffset(mq), // 消费位点 32 // 最大拉取数量 ); // 处理消息 for (MessageExt msg : pullResult.getMsgFoundList()) { processMessage(msg); } // 更新消费位点 updateOffset(mq, pullResult.getNextBeginOffset()); } } } 2.3 消费者 Rebalance 机制 public class ConsumerRebalance { // Rebalance 触发条件: // 1. Consumer 数量变化(上线/下线) // 2. Topic 的 Queue 数量变化 // 3. 消费者订阅关系变化 public void rebalanceProcess() { // 1. 获取 Topic 的所有 Queue Set<MessageQueue> mqSet = topicSubscribeInfoTable.get(topic); // 2. 获取消费组的所有 Consumer List<String> cidAll = findConsumerIdList(topic, consumerGroup); // 3. 排序(保证所有 Consumer 看到一致的视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 分配策略 AllocateMessageQueueStrategy strategy = getAllocateMessageQueueStrategy(); List<MessageQueue> allocateResult = strategy.allocate( consumerGroup, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueueTableInRebalance(topic, allocateResult); } // 分配策略示例:平均分配 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); // 计算分配数量 int averageSize = (mod > 0 && index < mod) ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size(); // 计算起始位置 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 分配队列 return mqAll.subList(startIndex, Math.min(startIndex + averageSize, mqAll.size())); } } } 三、Broker 详解 3.1 Broker 架构 Broker 完整架构: ┌────────────────────────────────────────┐ │ 客户端请求 │ ├────────────────────────────────────────┤ │ Remoting 通信层 │ ├────────────────────────────────────────┤ │ ┌──────────────┬─────────────────┐ │ │ │ Processor │ 业务处理器 │ │ │ ├──────────────┼─────────────────┤ │ │ │SendMessage │ 接收消息 │ │ │ │PullMessage │ 拉取消息 │ │ │ │QueryMessage │ 查询消息 │ │ │ │AdminRequest │ 管理请求 │ │ │ └──────────────┴─────────────────┘ │ ├────────────────────────────────────────┤ │ Store 存储层 │ │ ┌──────────────────────────────┐ │ │ │ CommitLog(消息存储) │ │ │ ├──────────────────────────────┤ │ │ │ ConsumeQueue(消费索引) │ │ │ ├──────────────────────────────┤ │ │ │ IndexFile(消息索引) │ │ │ └──────────────────────────────┘ │ ├────────────────────────────────────────┤ │ HA 高可用模块 │ ├────────────────────────────────────────┤ │ Schedule 定时任务 │ └────────────────────────────────────────┘ 3.2 Broker 消息存储 public class BrokerStorage { // 1. CommitLog:消息主体存储 class CommitLog { // 所有消息顺序写入 // 单个文件默认 1GB // 文件名为起始偏移量 private final MappedFileQueue mappedFileQueue; public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 获取当前写入文件 MappedFile mappedFile = mappedFileQueue.getLastMappedFile(); // 写入消息 AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback); // 刷盘策略 handleDiskFlush(result, msg); // 主从同步 handleHA(result, msg); return new PutMessageResult(PutMessageStatus.PUT_OK, result); } } // 2. ConsumeQueue:消费队列 class ConsumeQueue { // 存储格式:8字节 CommitLog 偏移量 + 4字节消息大小 + 8字节 Tag HashCode // 单个文件 30W 条记录,约 5.72MB private final MappedFileQueue mappedFileQueue; public void putMessagePositionInfo( long offset, // CommitLog 偏移量 int size, // 消息大小 long tagsCode, // Tag HashCode long cqOffset) { // ConsumeQueue 偏移量 // 写入索引 this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); // 更新最大偏移量 this.maxPhysicOffset = offset + size; } } // 3. IndexFile:消息索引 class IndexFile { // 支持按 Key 或时间区间查询 // 存储格式:Header(40B) + Slot(500W*4B) + Index(2000W*20B) // 单个文件约 400MB public void putKey(String key, long phyOffset, long storeTimestamp) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; // 存储索引 // ... } } } 3.3 Broker 刷盘机制 public class FlushDiskStrategy { // 同步刷盘 class SyncFlush { public void flush() { // 写入 PageCache 后立即刷盘 // 优点:数据可靠性高 // 缺点:性能差 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 等待刷盘完成 boolean flushResult = service.waitForFlush(timeout); if (!flushResult) { log.error("Sync flush timeout"); throw new RuntimeException("Flush timeout"); } } } } // 异步刷盘(默认) class AsyncFlush { public void flush() { // 写入 PageCache 后立即返回 // 后台线程定时刷盘 // 优点:性能高 // 缺点:可能丢失数据(机器宕机) // 定时刷盘(默认 500ms) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 刷盘条件: // 1. 距上次刷盘超过 10s // 2. 脏页超过 4 页 // 3. 收到关闭信号 if (System.currentTimeMillis() - lastFlushTime > 10000 || dirtyPages > 4) { mappedFile.flush(0); lastFlushTime = System.currentTimeMillis(); dirtyPages = 0; } } }, 500, 500, TimeUnit.MILLISECONDS); } } } 3.4 Broker 主从同步 public class BrokerHA { // Master Broker class HAService { // 接受 Slave 连接 private AcceptSocketService acceptSocketService; // 管理多个 Slave 连接 private List<HAConnection> connectionList; public void start() { this.acceptSocketService.start(); this.groupTransferService.start(); this.haClient.start(); } // 等待 Slave 同步 public boolean waitForSlaveSync(long masterPutWhere) { // 同步复制模式 if (messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) { // 等待至少一个 Slave 同步完成 for (HAConnection conn : connectionList) { if (conn.getSlaveAckOffset() >= masterPutWhere) { return true; } } return false; } // 异步复制模式:直接返回 return true; } } // Slave Broker class HAClient { private SocketChannel socketChannel; public void run() { while (!this.isStopped()) { try { // 连接 Master connectMaster(); // 上报同步进度 reportSlaveMaxOffset(); // 接收数据 boolean ok = processReadEvent(); // 处理数据 dispatchReadRequest(); } catch (Exception e) { log.error("HAClient error", e); } } } } } 四、组件交互流程 4.1 完整的消息流转 消息完整流转过程: ┌─────────────┐ │ Producer │ └──────┬──────┘ │ 1. Send Message ↓ ┌─────────────┐ │ NameServer │ <- 2. Get Route Info └──────┬──────┘ │ 3. Route Info ↓ ┌─────────────┐ │ Broker │ │ ┌─────────┐ │ │ │CommitLog│ │ <- 4. Store Message │ └─────────┘ │ │ ┌─────────┐ │ │ │ConsumeQ │ │ <- 5. Build Index │ └─────────┘ │ └──────┬──────┘ │ 6. Pull Message ↓ ┌─────────────┐ │ Consumer │ └─────────────┘ 4.2 心跳机制 public class HeartbeatMechanism { // Producer/Consumer → Broker class ClientHeartbeat { // 每 30 秒发送一次心跳 private final long heartbeatInterval = 30000; public void sendHeartbeatToAllBroker() { HeartbeatData heartbeatData = prepareHeartbeatData(); for (Entry<String, HashMap<Long, String>> entry : brokerAddrTable.entrySet()) { String brokerName = entry.getKey(); for (Entry<Long, String> innerEntry : entry.getValue().entrySet()) { String addr = innerEntry.getValue(); // 发送心跳 this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); } } } } // Broker → NameServer class BrokerHeartbeat { // 每 30 秒注册一次 private final long registerInterval = 30000; public void registerBrokerAll() { RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 注册到所有 NameServer for (String namesrvAddr : namesrvAddrList) { registerBroker(namesrvAddr, requestBody); } } } } 五、配置优化建议 5.1 Producer 优化 // 性能优化配置 producer.setSendMsgTimeout(3000); // 发送超时 producer.setCompressMsgBodyOverHowmuch(4096); // 消息压缩阈值 producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小 producer.setRetryTimesWhenSendFailed(2); // 重试次数 producer.setSendLatencyFaultEnable(true); // 故障规避 5.2 Consumer 优化 // 消费优化配置 consumer.setConsumeThreadMin(20); // 最小消费线程数 consumer.setConsumeThreadMax(64); // 最大消费线程数 consumer.setConsumeMessageBatchMaxSize(1); // 批量消费数量 consumer.setPullBatchSize(32); // 批量拉取数量 consumer.setPullInterval(0); // 拉取间隔 consumer.setConsumeConcurrentlyMaxSpan(2000); // 消费进度延迟阈值 5.3 Broker 优化 # broker.conf 优化配置 # 刷盘策略 flushDiskType=ASYNC_FLUSH # 同步刷盘超时时间 syncFlushTimeout=5000 # 消息最大大小 maxMessageSize=65536 # 发送线程池大小 sendMessageThreadPoolNums=128 # 拉取线程池大小 pullMessageThreadPoolNums=128 # Broker 角色 brokerRole=ASYNC_MASTER 六、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门04:快速上手 - Docker一键部署与第一条消息

引言:5分钟上手 RocketMQ 不需要复杂的环境配置,不需要编译源码,通过 Docker,我们可以在 5 分钟内搭建一个完整的 RocketMQ 环境,并成功发送第一条消息。 让我们开始这段激动人心的旅程! 一、环境准备 1.1 前置要求 # 检查 Docker 版本(需要 20.10+) docker --version # Docker version 24.0.5, build ced0996 # 检查 Docker Compose 版本(需要 2.0+) docker-compose --version # Docker Compose version v2.20.2 # 检查系统资源 # 建议:4GB+ 内存,10GB+ 磁盘空间 docker system info | grep -E "Memory|Storage" 1.2 创建项目目录 # 创建项目目录 mkdir -p ~/rocketmq-demo cd ~/rocketmq-demo # 创建必要的子目录 mkdir -p data/namesrv/logs data/broker/logs data/broker/store mkdir -p conf 二、Docker Compose 部署 2.1 编写 docker-compose.yml # docker-compose.yml version: '3.8' services: # NameServer 服务 namesrv: image: apache/rocketmq:5.1.3 container_name: rocketmq-namesrv ports: - "9876:9876" environment: JAVA_OPT: "-Duser.home=/opt" JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m" volumes: - ./data/namesrv/logs:/opt/logs command: sh mqnamesrv networks: - rocketmq-net # Broker 服务 broker: image: apache/rocketmq:5.1.3 container_name: rocketmq-broker ports: - "10909:10909" - "10911:10911" - "10912:10912" environment: JAVA_OPT_EXT: "-Xms1G -Xmx1G -Xmn512m" volumes: - ./data/broker/logs:/opt/logs - ./data/broker/store:/opt/store - ./conf/broker.conf:/opt/conf/broker.conf command: sh mqbroker -n namesrv:9876 -c /opt/conf/broker.conf depends_on: - namesrv networks: - rocketmq-net # 控制台服务 dashboard: image: apacherocketmq/rocketmq-dashboard:latest container_name: rocketmq-dashboard ports: - "8080:8080" environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - namesrv - broker networks: - rocketmq-net networks: rocketmq-net: driver: bridge 2.2 配置 Broker # conf/broker.conf cat > conf/broker.conf << 'EOF' # 集群名称 brokerClusterName = DefaultCluster # broker 名称 brokerName = broker-a # 0 表示 Master,>0 表示 Slave brokerId = 0 # 删除文件时间点,默认凌晨 4 点 deleteWhen = 04 # 文件保留时间,默认 48 小时 fileReservedTime = 48 # Broker 的角色 brokerRole = ASYNC_MASTER # 刷盘方式 flushDiskType = ASYNC_FLUSH # 存储路径 storePathRootDir = /opt/store # 队列存储路径 storePathCommitLog = /opt/store/commitlog # 自动创建 Topic autoCreateTopicEnable = true # 自动创建订阅组 autoCreateSubscriptionGroup = true # Broker 监听端口 listenPort = 10911 # 是否允许 Broker 自动创建 Topic brokerIP1 = broker EOF 2.3 启动服务 # 启动所有服务 docker-compose up -d # 查看服务状态 docker-compose ps # 输出示例: # NAME IMAGE STATUS # rocketmq-namesrv apache/rocketmq:5.1.3 Up 30 seconds # rocketmq-broker apache/rocketmq:5.1.3 Up 20 seconds # rocketmq-dashboard apacherocketmq/rocketmq-dashboard:latest Up 10 seconds # 查看日志 docker-compose logs -f broker 三、发送第一条消息 3.1 创建 Java 项目 <!-- pom.xml --> <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>rocketmq-demo</artifactId> <version>1.0.0</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- RocketMQ 客户端 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.5</version> </dependency> <!-- 日志 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.11</version> </dependency> </dependencies> </project> 3.2 编写生产者代码 // QuickProducer.java package com.example.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class QuickProducer { public static void main(String[] args) throws Exception { // 1. 创建生产者,指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("quick_producer_group"); // 2. 指定 NameServer 地址 producer.setNamesrvAddr("localhost:9876"); // 3. 启动生产者 producer.start(); System.out.println("Producer Started."); // 4. 发送消息 for (int i = 0; i < 10; i++) { // 创建消息对象 Message msg = new Message( "QuickStartTopic", // Topic "TagA", // Tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // 发送消息 SendResult sendResult = producer.send(msg); // 打印发送结果 System.out.printf("%s%n", sendResult); Thread.sleep(1000); } // 5. 关闭生产者 producer.shutdown(); System.out.println("Producer Shutdown."); } } 3.3 编写消费者代码 // QuickConsumer.java package com.example.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class QuickConsumer { public static void main(String[] args) throws Exception { // 1. 创建消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quick_consumer_group"); // 2. 指定 NameServer 地址 consumer.setNamesrvAddr("localhost:9876"); // 3. 订阅 Topic 和 Tag consumer.subscribe("QuickStartTopic", "*"); // 4. 设置消费模式(默认集群模式) consumer.setMessageModel(MessageModel.CLUSTERING); // 5. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { System.out.printf("Consume message: %s%n", new String(message.getBody())); } // 返回消费状态:成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 6. 启动消费者 consumer.start(); System.out.println("Consumer Started."); // 保持运行 Thread.sleep(Long.MAX_VALUE); } } 四、运行测试 4.1 运行步骤 # 1. 先启动消费者(新终端) mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickConsumer" # 输出: # Consumer Started. # 等待消息... # 2. 启动生产者(新终端) mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickProducer" # 输出: # Producer Started. # SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F0000000000000000, ... # SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F00000000000000B8, ... # ... # Producer Shutdown. # 3. 查看消费者输出 # Consume message: Hello RocketMQ 0 # Consume message: Hello RocketMQ 1 # ... 4.2 使用控制台查看 打开浏览器访问:http://localhost:8080 ...

2025-11-13 · maneng

RocketMQ入门03:RocketMQ初识 - 阿里开源消息中间件的前世今生

引言:一个技术选型的故事 2010年,淘宝的技术团队面临一个棘手的问题: 双11大促期间,订单量瞬间飙升100倍,原有的 ActiveMQ 集群直接崩溃。几百万用户的订单消息堆积,下游系统无法处理,整个交易链路几近瘫痪。 这不是简单的扩容能解决的问题。他们需要一个全新的消息中间件,一个真正理解电商业务、能扛住双11洪峰的消息系统。 这就是 RocketMQ 诞生的开始。 一、RocketMQ 的前世:从 Notify 到 MetaQ 1.1 第一代:Notify(2007-2010) 背景问题: - 淘宝业务快速增长,系统解耦需求迫切 - 使用 ActiveMQ,遇到性能瓶颈 - 运维成本高,稳定性不足 Notify 1.0 特点: - 基于 ActiveMQ 5.1 改造 - 增加了消息存储优化 - 简单的集群支持 - 问题:Java 性能瓶颈,10K 消息就卡顿 1.2 第二代:MetaQ(2010-2012) // MetaQ 的设计转变 // 从 JMS 规范 → 自定义协议 // 从 随机读写 → 顺序写入 public class MetaQDesign { // 核心创新:完全顺序写入 CommitLog commitLog; // 顺序写入 ConsumeQueue consumeQueue; // 消费索引 // 借鉴 Kafka 的设计 // 但针对电商场景优化 - 支持事务消息 - 支持定时消息 - 支持消息查询 } MetaQ 的突破: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...