RocketMQ架构03:Broker架构与核心组件 - 消息中枢的内部世界

引言:消息系统的心脏 如果说 NameServer 是 RocketMQ 的"大脑"(路由中心),那么 Broker 就是"心脏"(消息中枢)。它负责: 消息存储:持久化所有消息 消息处理:接收生产者消息,推送给消费者 高可用保障:主从同步、故障转移 性能优化:零拷贝、顺序写、内存映射 今天我们从第一性原理出发,逐步理解 Broker 的内部世界。 一、Broker 的核心职责 1.1 从需求出发 假设我们要设计一个消息存储节点,需要解决哪些问题? 基础需求: 1. 接收消息 → 需要网络通信模块 2. 存储消息 → 需要存储引擎 3. 分发消息 → 需要索引和查询机制 4. 保证可靠 → 需要主从同步 5. 高性能 → 需要优化 I/O Broker 就是围绕这5个核心需求设计的。 1.2 Broker 的三重身份 ┌─────────────────────────────────────┐ │ Broker 的三重身份 │ ├─────────────────────────────────────┤ │ 1. 消息接收器(Producer 视角) │ │ - 接收消息请求 │ │ - 验证权限 │ │ - 返回存储结果 │ ├─────────────────────────────────────┤ │ 2. 消息存储库(系统视角) │ │ - 持久化消息 │ │ - 管理索引 │ │ - 定期清理 │ ├─────────────────────────────────────┤ │ 3. 消息分发器(Consumer 视角) │ │ - 根据订阅关系推送消息 │ │ - 管理消费进度 │ │ - 支持消息重试 │ └─────────────────────────────────────┘ 二、Broker 架构全景 2.1 核心组件架构图 ┌────────────────────────────────────────────────────────┐ │ Broker 节点 │ ├────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Remoting 模块(网络通信层) │ │ │ │ - NettyServer(接收请求) │ │ │ │ - RequestProcessor(请求处理器) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Message Store 模块(存储引擎) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ CommitLog │ │ConsumeQueue│ │ IndexFile │ │ │ │ │ │ 消息主存储 │ │ 消费索引 │ │ 查询索引 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ HA 模块(高可用) │ │ │ │ - HAService(主从同步) │ │ │ │ - CommitLogDispatcher(消息分发) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ 其他核心模块 │ │ │ │ - TopicConfigManager(Topic 配置管理) │ │ │ │ - ConsumerOffsetManager(消费进度管理) │ │ │ │ - ScheduleMessageService(延迟消息) │ │ │ │ - TransactionalMessageService(事务消息) │ │ │ └──────────────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────────────┘ 2.2 各模块职责详解 1️⃣ Remoting 模块(网络通信) // 核心职责 1. 接收网络请求(基于 Netty) 2. 请求路由与分发 3. 编解码(序列化/反序列化) 4. 返回响应结果 // 处理的请求类型 - SEND_MESSAGE // 发送消息 - PULL_MESSAGE // 拉取消息 - QUERY_MESSAGE // 查询消息 - HEART_BEAT // 心跳 - REGISTER_BROKER // Broker 注册 关键设计: ...

2025-11-13 · maneng

RocketMQ架构01:整体架构设计 - NameServer、Broker、Producer、Consumer

引言:一个精妙的分布式系统 如果把 RocketMQ 比作一座现代化城市,那么: NameServer 是城市规划局,管理所有地图信息 Broker 是物流仓库,存储和转运货物 Producer 是发货方,把货物送到仓库 Consumer 是收货方,从仓库取货 今天,我们从整体视角理解这座"消息城市"的运作机制。 一、RocketMQ 整体架构 1.1 核心组件 RocketMQ 架构全景图: ┌────────────────────────────────────────────┐ │ NameServer Cluster │ │ (路由注册中心、服务发现、轻量级协调器) │ └────────┬───────────────────┬───────────────┘ │ │ 注册/心跳 │ │ 获取路由 │ │ ┌────────────▼──────┐ ┌────▼────────────────┐ │ │ │ │ │ Broker Cluster │ │ Producer/Consumer │ │ │ │ │ │ ┌───────────────┐ │ │ 应用程序 │ │ │ Master Broker │ │◄────┤ │ │ │ - CommitLog │ │ 读写 │ │ │ │ - ConsumeQ │ │ 消息 │ │ │ │ - IndexFile │ │ │ │ │ └───────┬───────┘ │ └─────────────────────┘ │ │ 主从同步 │ │ ┌───────▼───────┐ │ │ │ Slave Broker │ │ │ │ - CommitLog │ │ │ │ - ConsumeQ │ │ │ │ - IndexFile │ │ │ └───────────────┘ │ └───────────────────┘ 1.2 组件职责矩阵 组件职责对比: ┌──────────────┬────────────────────────────────────────┐ │ 组件 │ 核心职责 │ ├──────────────┼────────────────────────────────────────┤ │ NameServer │ - 路由信息管理(Topic → Broker 映射) │ │ │ - Broker 注册与心跳检测 │ │ │ - 提供路由信息查询 │ │ │ - 无状态、对等集群 │ ├──────────────┼────────────────────────────────────────┤ │ Broker │ - 消息存储(CommitLog) │ │ │ - 消息索引(ConsumeQueue、IndexFile) │ │ │ - 消息查询与消费 │ │ │ - 高可用(主从复制、Dledger) │ ├──────────────┼────────────────────────────────────────┤ │ Producer │ - 消息生产 │ │ │ - 消息发送(同步、异步、单向) │ │ │ - 负载均衡(选择Broker和Queue) │ │ │ - 故障规避 │ ├──────────────┼────────────────────────────────────────┤ │ Consumer │ - 消息消费 │ │ │ - 消息拉取(长轮询) │ │ │ - 负载均衡(Rebalance) │ │ │ - 消费进度管理 │ └──────────────┴────────────────────────────────────────┘ 二、NameServer 详解 2.1 为什么需要 NameServer public class WhyNameServer { // 问题1:Producer/Consumer 如何知道 Broker 在哪里? class ServiceDiscovery { // 没有 NameServer: // - 硬编码 Broker 地址(不灵活) // - 无法动态感知 Broker 变化 // 有了 NameServer: // - 动态获取 Broker 列表 // - 自动感知 Broker 上下线 } // 问题2:Topic 分布在哪些 Broker 上? class TopicRoute { // NameServer 维护 Topic 路由信息 class TopicRouteData { List<QueueData> queueDatas; // Queue 分布 List<BrokerData> brokerDatas; // Broker 信息 } } // 问题3:为什么不用 ZooKeeper? class WhyNotZooKeeper { /* ZooKeeper 的问题: 1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先) 2. 重量级依赖 - NameServer 更轻量 3. 运维复杂 - NameServer 无状态更简单 4. 性能开销 - NameServer 内存操作更快 NameServer 的优势: 1. 完全无状态 2. 对等集群(任意节点提供服务) 3. 内存操作(无磁盘 IO) 4. 简单高效 */ } } 2.2 NameServer 核心功能 public class NameServerCore { // 1. 路由信息管理 class RouteInfoManager { // Topic → Broker 映射 private HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Broker 名称 → Broker 数据 private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 地址 → 集群信息 private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // 集群名称 → Broker 名称列表 private HashMap<String/* clusterName */, Set<String>> clusterAddrTable; // 过滤服务器 private HashMap<String/* brokerAddr */, List<String>> filterServerTable; } // 2. Broker 注册 public RegisterBrokerResult registerBroker( String clusterName, String brokerAddr, String brokerName, long brokerId, String haServerAddr, TopicConfigSerializeWrapper topicConfigWrapper) { // 创建或更新 Broker 数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } brokerData.getBrokerAddrs().put(brokerId, brokerAddr); // 更新 Topic 路由信息 if (null != topicConfigWrapper) { for (Map.Entry<String, TopicConfig> entry : topicConfigWrapper.getTopicConfigTable().entrySet()) { createAndUpdateQueueData(brokerName, entry.getValue()); } } // 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); return new RegisterBrokerResult(...); } // 3. 路由查询 public TopicRouteData getTopicRouteData(String topic) { TopicRouteData topicRouteData = new TopicRouteData(); // 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); // 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } // 组装路由数据 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (brokerData != null) { topicRouteData.getBrokerDatas().add(brokerData.clone()); } } topicRouteData.setQueueDatas(queueDataList); return topicRouteData; } // 4. Broker 心跳检测 public void scanNotActiveBroker() { long timeoutMillis = 120000; // 2 分钟超时 for (Map.Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeoutMillis) < System.currentTimeMillis()) { // Broker 超时,移除 RemotingUtil.closeChannel(entry.getValue().getChannel()); this.brokerLiveTable.remove(entry.getKey()); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); log.warn("Broker timeout: {}", entry.getKey()); } } } } 2.3 NameServer 集群 public class NameServerCluster { // NameServer 特点 class Characteristics { /* 1. 完全无状态 - 不持久化任何数据 - 所有数据在内存中 - 重启后数据丢失(从 Broker 重新获取) 2. 对等部署 - 所有节点功能完全一致 - 无主从之分 - 无需选举 3. 不互相通信 - NameServer 之间无任何通信 - 数据可能短暂不一致 - 最终一致即可 4. Broker 全量注册 - Broker 向所有 NameServer 注册 - 保证数据冗余 */ } // 部署架构 class DeploymentArchitecture { /* NameServer 集群(推荐3个节点): ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ └────────────────┼────────────────┘ │ ┌───────▼────────┐ │ │ │ Broker 集群 │ │ (全量注册) │ │ │ └────────────────┘ */ } } 三、Broker 详解 3.1 Broker 核心架构 Broker 内部架构: ┌─────────────────────────────────────────────────────┐ │ Broker │ │ ┌────────────────────────────────────────────────┐│ │ │ Remoting 通信层 ││ │ │ (接收 Producer/Consumer 请求) ││ │ └────────┬───────────────────────────────────────┘│ │ │ │ │ ┌────────▼────────────────────────────┐ │ │ │ BrokerController │ │ │ │ - 处理器注册 │ │ │ │ - 定时任务管理 │ │ │ │ - 消息处理 │ │ │ └────────┬────────────────────────────┘ │ │ │ │ │ ┌────────▼─────────────┬───────────────────┐ │ │ │ MessageStore │ HAService │ │ │ │ ┌────────────────┐ │ (主从复制) │ │ │ │ │ CommitLog │ │ │ │ │ │ │ (消息存储) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ ConsumeQueue │ │ │ │ │ │ │ (消费索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ IndexFile │ │ │ │ │ │ │ (消息索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ └──────────────────────┴───────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 定时任务 │ │ │ │ - 持久化 ConsumeQueue │ │ │ │ - 持久化 CommitLog │ │ │ │ - 清理过期文件 │ │ │ │ - 统计信息 │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ 3.2 Broker 核心职责 public class BrokerCore { // 1. 消息接收与存储 class MessageReceive { public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 写入 CommitLog PutMessageResult result = this.commitLog.putMessage(msg); // 触发刷盘 handleDiskFlush(result, msg); // 触发主从同步 handleHA(result, msg); return result; } } // 2. 消息查询 class MessageQuery { // 按消息 ID 查询 public MessageExt lookMessageByOffset(long commitLogOffset) { return this.commitLog.lookMessageByOffset(commitLogOffset); } // 按 Key 查询 public QueryMessageResult queryMessage(String topic, String key, int maxNum) { return this.indexService.queryMessage(topic, key, maxNum); } // 按时间查询 public QueryMessageResult queryMessageByTime(String topic, long begin, long end) { return this.messageStore.queryMessage(topic, begin, end, maxNum); } } // 3. 消息消费 class MessageConsume { public GetMessageResult getMessage( String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) { // 从 ConsumeQueue 查找消息位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 根据 offset 获取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); // 从 CommitLog 读取消息内容 List<MessageExt> messageList = new ArrayList<>(); for (int i = 0; i < maxMsgNums; i++) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); MessageExt msg = lookMessageByOffset(offsetPy, sizePy); // 消息过滤 if (match(subscriptionData, msg)) { messageList.add(msg); } } return new GetMessageResult(messageList); } } // 4. Topic 创建 class TopicManagement { public void createTopic(String topic, int queueNums) { TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); topicConfig.setPerm(6); // 读写权限 this.topicConfigTable.put(topic, topicConfig); // 创建 ConsumeQueue for (int i = 0; i < queueNums; i++) { ConsumeQueue logic = new ConsumeQueue(topic, i); this.consumeQueueTable.put(buildKey(topic, i), logic); } } } // 5. 消费进度管理 class ConsumerOffsetManagement { private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { map.put(queueId, offset); } } public long queryOffset(String group, String topic, int queueId) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long offset = map.get(queueId); return offset != null ? offset : -1; } return -1; } } } 3.3 Broker 高可用 public class BrokerHA { // 主从架构 class MasterSlave { /* 主从模式: ┌──────────────┐ 同步 ┌──────────────┐ │ Master Broker│ ──────────> │ Slave Broker │ │ - 读写 │ │ - 只读 │ └──────────────┘ └──────────────┘ 同步方式: 1. 同步复制(SYNC_MASTER): - Master 等待 Slave 同步完成才返回 - 可靠性高,性能稍差 2. 异步复制(ASYNC_MASTER): - Master 立即返回,后台异步同步 - 性能高,可能丢失少量数据 */ } // Dledger 模式 class DledgerMode { /* 基于 Raft 的自动选主: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│ └──────────┘ └──────────┘ └──────────┘ 特点: 1. 自动故障转移 2. 无需人工干预 3. 保证数据一致性 4. 至少3个节点 */ } } 四、Producer 工作原理 4.1 Producer 架构 public class ProducerArchitecture { // Producer 核心组件 class ProducerComponents { private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信API private TopicPublishInfo publishInfo; // Topic路由信息 private SendMessageHook sendHook; // 发送钩子 } // 发送流程 public SendResult send(Message msg) { // 1. 查找 Topic 路由信息 TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 2. 选择消息队列 MessageQueue mq = selectOneMessageQueue(publishInfo); // 3. 发送消息 SendResult result = sendKernelImpl(msg, mq); // 4. 更新故障规避信息 updateFaultItem(mq.getBrokerName(), result); return result; } } 4.2 Producer 负载均衡 public class ProducerLoadBalance { // 队列选择策略 public MessageQueue selectQueue(TopicPublishInfo tpInfo) { // 1. 简单轮询 int index = sendWhichQueue.incrementAndGet() % queueSize; return tpInfo.getMessageQueueList().get(index); // 2. 故障规避 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { return mq; } // 3. 选择延迟最小的 return latencyFaultTolerance.pickOneAtLeast(); } } 五、Consumer 工作原理 5.1 Consumer 架构 public class ConsumerArchitecture { // Consumer 核心组件 class ConsumerComponents { private RebalanceService rebalanceService; // 负载均衡服务 private PullMessageService pullMessageService; // 拉取服务 private ConsumeMessageService consumeService; // 消费服务 private OffsetStore offsetStore; // 进度存储 } // 消费流程 public void start() { // 1. 启动负载均衡 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 启动消费服务 consumeService.start(); // 4. 注册到 Broker registerConsumer(); } } 5.2 Consumer 负载均衡(Rebalance) public class ConsumerRebalance { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取同组的所有 Consumer List<String> cidAll = getConsumerList(group); // 3. 排序(保证一致性视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配 List<MessageQueue> allocated = allocateStrategy.allocate( group, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueue(allocated); } } 六、完整消息流转 6.1 发送消息完整流程 消息发送完整流程: ┌───────────┐ │ Producer │ └─────┬─────┘ │1. 查询路由 ▼ ┌───────────┐ │NameServer │ 返回路由信息(Broker列表、Queue信息) └─────┬─────┘ │ ▼ ┌───────────┐ │ Producer │ 2. 选择 Broker 和 Queue └─────┬─────┘ │3. 发送消息 ▼ ┌───────────┐ │ Broker │ 4. 写入 CommitLog │ ┌───────┐ │ │ │CommitLog│ │ └───┬───┘ │ │ │5. 异步构建 ConsumeQueue │ ┌───▼──────┐ │ │ConsumeQ │ │ └──────────┘ │ │6. 返回结果 └─────┼─────┘ ▼ ┌───────────┐ │ Producer │ 7. 收到发送结果 └───────────┘ 6.2 消费消息完整流程 消息消费完整流程: ┌───────────┐ │ Consumer │ └─────┬─────┘ │1. 注册到 Broker ▼ ┌───────────┐ │ Broker │ 保存 Consumer 信息 └─────┬─────┘ │2. Rebalance 分配 Queue ▼ ┌───────────┐ │ Consumer │ 3. 发起长轮询拉取 └─────┬─────┘ │ ▼ ┌───────────┐ │ Broker │ 4. 从 ConsumeQueue 查找 │ ┌───────┐ │ 5. 从 CommitLog 读取消息 │ │ConsumeQ│ │ │ └───┬───┘ │ │ │ │ │ ┌───▼────┐│ │ │CommitLog││ │ └────────┘│ │ │6. 返回消息 └─────┼─────┘ ▼ ┌───────────┐ │ Consumer │ 7. 处理消息 └─────┬─────┘ 8. 更新消费进度 │ ▼ ┌───────────┐ │ Broker │ 9. 保存消费进度 └───────────┘ 七、设计亮点与思考 7.1 设计亮点 public class DesignHighlights { // 1. NameServer 的轻量化设计 class LightweightNameServer { /* - 无状态,内存操作 - 对等部署,无需选举 - 简单高效 - AP 而非 CP */ } // 2. CommitLog 的顺序写 class SequentialWrite { /* - 所有消息顺序写入同一个文件 - 充分利用磁盘顺序写性能 - 避免随机 IO */ } // 3. ConsumeQueue 的索引设计 class IndexDesign { /* - 轻量级索引(20字节) - 快速定位消息位置 - 支持消费进度管理 */ } // 4. 长轮询的伪 Push class LongPolling { /* - Consumer 主动拉取 - Broker Hold 请求 - 兼顾实时性和可控性 */ } } 7.2 架构权衡 public class ArchitectureTradeoffs { // 权衡1:性能 vs 可靠性 class PerformanceVsReliability { // 异步刷盘 → 高性能,可能丢消息 // 同步刷盘 → 零丢失,性能下降 // 异步复制 → 高性能,可能丢消息 // 同步复制 → 高可靠,性能下降 } // 权衡2:一致性 vs 可用性 class ConsistencyVsAvailability { // NameServer 选择 AP // - 可能短暂不一致 // - 但保证高可用 // Dledger 模式选择 CP // - 保证数据一致 // - 需要多数节点存活 } // 权衡3:复杂度 vs 功能 class ComplexityVsFeature { // RocketMQ 比 Kafka 复杂 // - 但功能更丰富(事务、延迟、顺序) // RocketMQ 比 RabbitMQ 简单 // - 但性能更高 } } 八、总结 通过本篇,我们从整体视角理解了 RocketMQ 的架构: ...

2025-11-13 · maneng

RocketMQ入门05:核心概念详解(上)- Producer、Consumer、Broker

引言:理解核心,掌握全局 如果把 RocketMQ 比作一个物流系统,那么: Producer 是发货方,负责打包发送包裹 Broker 是物流中心,负责存储转运包裹 Consumer 是收货方,负责签收处理包裹 理解这三个核心组件,就掌握了 RocketMQ 的精髓。 一、Producer(生产者)详解 1.1 Producer 架构 // Producer 内部结构 public class ProducerArchitecture { // 核心组件 private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信层 private TopicPublishInfo topicPublishInfo; // 路由信息 private SendMessageHook sendHook; // 发送钩子 private DefaultMQProducerImpl impl; // 核心实现 } Producer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQProducer │ <- API 层 ├──────────────────────────────────┤ │ DefaultMQProducerImpl │ <- 核心实现 ├──────────────────────────────────┤ │ ┌──────────┬─────────────────┐ │ │ │路由管理 │ 消息发送器 │ │ │ ├──────────┼─────────────────┤ │ │ │故障规避 │ 异步发送处理 │ │ │ ├──────────┼─────────────────┤ │ │ │负载均衡 │ 事务消息处理 │ │ │ └──────────┴─────────────────┘ │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 1.2 Producer 生命周期 public class ProducerLifecycle { // 1. 创建阶段 public void create() { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试次数 producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数 producer.setSendMsgTimeout(3000); // 发送超时时间 } // 2. 启动阶段 public void start() throws MQClientException { producer.start(); // 内部流程: // a. 检查配置 // b. 获取 NameServer 地址 // c. 启动定时任务(路由更新、心跳) // d. 启动消息发送线程池 // e. 注册 Producer 到所有 Broker } // 3. 运行阶段 public void running() throws Exception { // 获取路由信息 TopicRouteData routeData = producer.getDefaultMQProducerImpl() .getmQClientFactory() .getMQClientAPIImpl() .getTopicRouteInfoFromNameServer("TopicTest", 3000); // 选择消息队列 MessageQueue mq = selectMessageQueue(routeData); // 发送消息 SendResult result = producer.send(message, mq); } // 4. 关闭阶段 public void shutdown() { producer.shutdown(); // 内部流程: // a. 注销 Producer // b. 关闭网络连接 // c. 清理资源 // d. 停止定时任务 } } 1.3 消息发送流程 // 消息发送核心流程 public class SendMessageFlow { public SendResult sendMessage(Message msg) { // 1. 验证消息 Validators.checkMessage(msg, producer); // 2. 获取路由信息 TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 3. 选择消息队列(负载均衡) MessageQueue mq = selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 4. 消息发送前处理 msg.setBody(compress(msg.getBody())); // 压缩 msg.setProperty("UNIQ_KEY", MessageClientIDSetter.createUniqID()); // 唯一ID // 5. 发送消息 SendResult sendResult = sendKernelImpl( msg, mq, communicationMode, // SYNC, ASYNC, ONEWAY sendCallback, timeout ); // 6. 发送后处理(钩子) if (hasSendMessageHook()) { sendMessageHook.sendMessageAfter(context); } return sendResult; } } 1.4 负载均衡策略 public class ProducerLoadBalance { // 默认策略:轮询 + 故障规避 public MessageQueue selectOneMessageQueue(TopicPublishInfo tpInfo, String lastBrokerName) { // 启用故障规避 if (this.sendLatencyFaultEnable) { // 1. 优先选择延迟小的 Broker for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int index = Math.abs(sendWhichQueue.incrementAndGet()) % queueSize; MessageQueue mq = tpInfo.getMessageQueueList().get(index); // 检查 Broker 是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 避免连续发送到同一个 Broker if (null == lastBrokerName || !mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 2. 如果都不可用,选择延迟最小的 MessageQueue mq = latencyFaultTolerance.pickOneAtLeast(); return mq; } // 不启用故障规避:简单轮询 return selectOneMessageQueue(); } // 故障延迟统计 class LatencyStats { // 根据发送耗时计算 Broker 不可用时长 // 耗时:50ms, 100ms, 550ms, 1000ms, 2000ms, 3000ms, 15000ms // 规避:0ms, 0ms, 30s, 60s, 120s, 180s, 600s private final long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private final long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; } } 二、Consumer(消费者)详解 2.1 Consumer 架构 Consumer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQPushConsumer │ <- Push 模式 API │ DefaultMQPullConsumer │ <- Pull 模式 API ├──────────────────────────────────┤ │ ┌──────────────────────────┐ │ │ │ 消费者核心实现 │ │ │ ├──────────────────────────┤ │ │ │ Rebalance 负载均衡 │ │ │ │ ProcessQueue 处理队列 │ │ │ │ ConsumeMessageService │ │ │ │ 消费进度管理 │ │ │ └──────────────────────────┘ │ ├──────────────────────────────────┤ │ PullMessage 长轮询服务 │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 2.2 Push vs Pull 模式 // Push 模式(推荐) public class PushConsumerDemo { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup"); // Push 模式本质:长轮询拉取 // 1. Consumer 主动拉取消息 // 2. 如果没有消息,Broker hold 住请求 // 3. 有新消息时,立即返回 // 4. 看起来像是 Broker 推送 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> messages, ConsumeConcurrentlyContext context) { // 处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } // Pull 模式(特殊场景) public class PullConsumerDemo { public void consume() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroup"); consumer.start(); // 手动拉取消息 Set<MessageQueue> mqs = consumer.fetchMessageQueuesInBalance("TopicTest"); for (MessageQueue mq : mqs) { PullResult pullResult = consumer.pullBlockIfNotFound( mq, // 消息队列 null, // 过滤表达式 getOffset(mq), // 消费位点 32 // 最大拉取数量 ); // 处理消息 for (MessageExt msg : pullResult.getMsgFoundList()) { processMessage(msg); } // 更新消费位点 updateOffset(mq, pullResult.getNextBeginOffset()); } } } 2.3 消费者 Rebalance 机制 public class ConsumerRebalance { // Rebalance 触发条件: // 1. Consumer 数量变化(上线/下线) // 2. Topic 的 Queue 数量变化 // 3. 消费者订阅关系变化 public void rebalanceProcess() { // 1. 获取 Topic 的所有 Queue Set<MessageQueue> mqSet = topicSubscribeInfoTable.get(topic); // 2. 获取消费组的所有 Consumer List<String> cidAll = findConsumerIdList(topic, consumerGroup); // 3. 排序(保证所有 Consumer 看到一致的视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 分配策略 AllocateMessageQueueStrategy strategy = getAllocateMessageQueueStrategy(); List<MessageQueue> allocateResult = strategy.allocate( consumerGroup, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueueTableInRebalance(topic, allocateResult); } // 分配策略示例:平均分配 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); // 计算分配数量 int averageSize = (mod > 0 && index < mod) ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size(); // 计算起始位置 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 分配队列 return mqAll.subList(startIndex, Math.min(startIndex + averageSize, mqAll.size())); } } } 三、Broker 详解 3.1 Broker 架构 Broker 完整架构: ┌────────────────────────────────────────┐ │ 客户端请求 │ ├────────────────────────────────────────┤ │ Remoting 通信层 │ ├────────────────────────────────────────┤ │ ┌──────────────┬─────────────────┐ │ │ │ Processor │ 业务处理器 │ │ │ ├──────────────┼─────────────────┤ │ │ │SendMessage │ 接收消息 │ │ │ │PullMessage │ 拉取消息 │ │ │ │QueryMessage │ 查询消息 │ │ │ │AdminRequest │ 管理请求 │ │ │ └──────────────┴─────────────────┘ │ ├────────────────────────────────────────┤ │ Store 存储层 │ │ ┌──────────────────────────────┐ │ │ │ CommitLog(消息存储) │ │ │ ├──────────────────────────────┤ │ │ │ ConsumeQueue(消费索引) │ │ │ ├──────────────────────────────┤ │ │ │ IndexFile(消息索引) │ │ │ └──────────────────────────────┘ │ ├────────────────────────────────────────┤ │ HA 高可用模块 │ ├────────────────────────────────────────┤ │ Schedule 定时任务 │ └────────────────────────────────────────┘ 3.2 Broker 消息存储 public class BrokerStorage { // 1. CommitLog:消息主体存储 class CommitLog { // 所有消息顺序写入 // 单个文件默认 1GB // 文件名为起始偏移量 private final MappedFileQueue mappedFileQueue; public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 获取当前写入文件 MappedFile mappedFile = mappedFileQueue.getLastMappedFile(); // 写入消息 AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback); // 刷盘策略 handleDiskFlush(result, msg); // 主从同步 handleHA(result, msg); return new PutMessageResult(PutMessageStatus.PUT_OK, result); } } // 2. ConsumeQueue:消费队列 class ConsumeQueue { // 存储格式:8字节 CommitLog 偏移量 + 4字节消息大小 + 8字节 Tag HashCode // 单个文件 30W 条记录,约 5.72MB private final MappedFileQueue mappedFileQueue; public void putMessagePositionInfo( long offset, // CommitLog 偏移量 int size, // 消息大小 long tagsCode, // Tag HashCode long cqOffset) { // ConsumeQueue 偏移量 // 写入索引 this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); // 更新最大偏移量 this.maxPhysicOffset = offset + size; } } // 3. IndexFile:消息索引 class IndexFile { // 支持按 Key 或时间区间查询 // 存储格式:Header(40B) + Slot(500W*4B) + Index(2000W*20B) // 单个文件约 400MB public void putKey(String key, long phyOffset, long storeTimestamp) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; // 存储索引 // ... } } } 3.3 Broker 刷盘机制 public class FlushDiskStrategy { // 同步刷盘 class SyncFlush { public void flush() { // 写入 PageCache 后立即刷盘 // 优点:数据可靠性高 // 缺点:性能差 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 等待刷盘完成 boolean flushResult = service.waitForFlush(timeout); if (!flushResult) { log.error("Sync flush timeout"); throw new RuntimeException("Flush timeout"); } } } } // 异步刷盘(默认) class AsyncFlush { public void flush() { // 写入 PageCache 后立即返回 // 后台线程定时刷盘 // 优点:性能高 // 缺点:可能丢失数据(机器宕机) // 定时刷盘(默认 500ms) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 刷盘条件: // 1. 距上次刷盘超过 10s // 2. 脏页超过 4 页 // 3. 收到关闭信号 if (System.currentTimeMillis() - lastFlushTime > 10000 || dirtyPages > 4) { mappedFile.flush(0); lastFlushTime = System.currentTimeMillis(); dirtyPages = 0; } } }, 500, 500, TimeUnit.MILLISECONDS); } } } 3.4 Broker 主从同步 public class BrokerHA { // Master Broker class HAService { // 接受 Slave 连接 private AcceptSocketService acceptSocketService; // 管理多个 Slave 连接 private List<HAConnection> connectionList; public void start() { this.acceptSocketService.start(); this.groupTransferService.start(); this.haClient.start(); } // 等待 Slave 同步 public boolean waitForSlaveSync(long masterPutWhere) { // 同步复制模式 if (messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) { // 等待至少一个 Slave 同步完成 for (HAConnection conn : connectionList) { if (conn.getSlaveAckOffset() >= masterPutWhere) { return true; } } return false; } // 异步复制模式:直接返回 return true; } } // Slave Broker class HAClient { private SocketChannel socketChannel; public void run() { while (!this.isStopped()) { try { // 连接 Master connectMaster(); // 上报同步进度 reportSlaveMaxOffset(); // 接收数据 boolean ok = processReadEvent(); // 处理数据 dispatchReadRequest(); } catch (Exception e) { log.error("HAClient error", e); } } } } } 四、组件交互流程 4.1 完整的消息流转 消息完整流转过程: ┌─────────────┐ │ Producer │ └──────┬──────┘ │ 1. Send Message ↓ ┌─────────────┐ │ NameServer │ <- 2. Get Route Info └──────┬──────┘ │ 3. Route Info ↓ ┌─────────────┐ │ Broker │ │ ┌─────────┐ │ │ │CommitLog│ │ <- 4. Store Message │ └─────────┘ │ │ ┌─────────┐ │ │ │ConsumeQ │ │ <- 5. Build Index │ └─────────┘ │ └──────┬──────┘ │ 6. Pull Message ↓ ┌─────────────┐ │ Consumer │ └─────────────┘ 4.2 心跳机制 public class HeartbeatMechanism { // Producer/Consumer → Broker class ClientHeartbeat { // 每 30 秒发送一次心跳 private final long heartbeatInterval = 30000; public void sendHeartbeatToAllBroker() { HeartbeatData heartbeatData = prepareHeartbeatData(); for (Entry<String, HashMap<Long, String>> entry : brokerAddrTable.entrySet()) { String brokerName = entry.getKey(); for (Entry<Long, String> innerEntry : entry.getValue().entrySet()) { String addr = innerEntry.getValue(); // 发送心跳 this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); } } } } // Broker → NameServer class BrokerHeartbeat { // 每 30 秒注册一次 private final long registerInterval = 30000; public void registerBrokerAll() { RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 注册到所有 NameServer for (String namesrvAddr : namesrvAddrList) { registerBroker(namesrvAddr, requestBody); } } } } 五、配置优化建议 5.1 Producer 优化 // 性能优化配置 producer.setSendMsgTimeout(3000); // 发送超时 producer.setCompressMsgBodyOverHowmuch(4096); // 消息压缩阈值 producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小 producer.setRetryTimesWhenSendFailed(2); // 重试次数 producer.setSendLatencyFaultEnable(true); // 故障规避 5.2 Consumer 优化 // 消费优化配置 consumer.setConsumeThreadMin(20); // 最小消费线程数 consumer.setConsumeThreadMax(64); // 最大消费线程数 consumer.setConsumeMessageBatchMaxSize(1); // 批量消费数量 consumer.setPullBatchSize(32); // 批量拉取数量 consumer.setPullInterval(0); // 拉取间隔 consumer.setConsumeConcurrentlyMaxSpan(2000); // 消费进度延迟阈值 5.3 Broker 优化 # broker.conf 优化配置 # 刷盘策略 flushDiskType=ASYNC_FLUSH # 同步刷盘超时时间 syncFlushTimeout=5000 # 消息最大大小 maxMessageSize=65536 # 发送线程池大小 sendMessageThreadPoolNums=128 # 拉取线程池大小 pullMessageThreadPoolNums=128 # Broker 角色 brokerRole=ASYNC_MASTER 六、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...