引言:消息系统的心脏
如果说 NameServer 是 RocketMQ 的"大脑"(路由中心),那么 Broker 就是"心脏"(消息中枢)。它负责:
- 消息存储:持久化所有消息
- 消息处理:接收生产者消息,推送给消费者
- 高可用保障:主从同步、故障转移
- 性能优化:零拷贝、顺序写、内存映射
今天我们从第一性原理出发,逐步理解 Broker 的内部世界。
一、Broker 的核心职责
1.1 从需求出发
假设我们要设计一个消息存储节点,需要解决哪些问题?
基础需求:
1. 接收消息 → 需要网络通信模块
2. 存储消息 → 需要存储引擎
3. 分发消息 → 需要索引和查询机制
4. 保证可靠 → 需要主从同步
5. 高性能 → 需要优化 I/O
Broker 就是围绕这5个核心需求设计的。
1.2 Broker 的三重身份
┌─────────────────────────────────────┐
│ Broker 的三重身份 │
├─────────────────────────────────────┤
│ 1. 消息接收器(Producer 视角) │
│ - 接收消息请求 │
│ - 验证权限 │
│ - 返回存储结果 │
├─────────────────────────────────────┤
│ 2. 消息存储库(系统视角) │
│ - 持久化消息 │
│ - 管理索引 │
│ - 定期清理 │
├─────────────────────────────────────┤
│ 3. 消息分发器(Consumer 视角) │
│ - 根据订阅关系推送消息 │
│ - 管理消费进度 │
│ - 支持消息重试 │
└─────────────────────────────────────┘
二、Broker 架构全景
2.1 核心组件架构图
┌────────────────────────────────────────────────────────┐
│ Broker 节点 │
├────────────────────────────────────────────────────────┤
│ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Remoting 模块(网络通信层) │ │
│ │ - NettyServer(接收请求) │ │
│ │ - RequestProcessor(请求处理器) │ │
│ └──────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ Message Store 模块(存储引擎) │ │
│ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │
│ │ │ CommitLog │ │ConsumeQueue│ │ IndexFile │ │ │
│ │ │ 消息主存储 │ │ 消费索引 │ │ 查询索引 │ │ │
│ │ └────────────┘ └────────────┘ └────────────┘ │ │
│ └──────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ HA 模块(高可用) │ │
│ │ - HAService(主从同步) │ │
│ │ - CommitLogDispatcher(消息分发) │ │
│ └──────────────────────────────────────────────────┘ │
│ ↓ │
│ ┌──────────────────────────────────────────────────┐ │
│ │ 其他核心模块 │ │
│ │ - TopicConfigManager(Topic 配置管理) │ │
│ │ - ConsumerOffsetManager(消费进度管理) │ │
│ │ - ScheduleMessageService(延迟消息) │ │
│ │ - TransactionalMessageService(事务消息) │ │
│ └──────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────┘
2.2 各模块职责详解
1️⃣ Remoting 模块(网络通信)
// 核心职责
1. 接收网络请求(基于 Netty)
2. 请求路由与分发
3. 编解码(序列化/反序列化)
4. 返回响应结果
// 处理的请求类型
- SEND_MESSAGE // 发送消息
- PULL_MESSAGE // 拉取消息
- QUERY_MESSAGE // 查询消息
- HEART_BEAT // 心跳
- REGISTER_BROKER // Broker 注册
关键设计:
- 基于 Netty 的高性能异步通信
- 请求处理器模式(每种请求对应一个处理器)
- 支持同步/异步/单向调用
2️⃣ Message Store 模块(存储引擎)
这是 Broker 最核心的模块,我们将在后续文章深入讲解。
核心文件结构:
$HOME/store/
├── commitlog/ # 消息主存储(所有消息)
│ ├── 00000000000000000000
│ ├── 00000000001073741824
│ └── ...
├── consumequeue/ # 消费索引(按 Topic-Queue 组织)
│ ├── TopicA/
│ │ ├── 0/
│ │ ├── 1/
│ │ └── ...
│ └── TopicB/
│ └── ...
└── index/ # 查询索引(按 Key/时间)
└── ...
关键设计:
- CommitLog 顺序写(性能优化)
- ConsumeQueue 轻量级索引(快速定位)
- 内存映射文件(MMAP,零拷贝)
3️⃣ HA 模块(高可用)
主 Broker 从 Broker
│ │
│ 1. 消息写入 CommitLog │
│────────────────────────────▶│
│ │ 2. 同步复制
│ 3. 返回主从同步位置 │
│◀────────────────────────────│
│ │
主从同步策略:
- 同步复制:主 Broker 等待从 Broker 确认后才返回成功
- 异步复制:主 Broker 写入成功立即返回,后台异步同步
4️⃣ 其他核心模块
| 模块 | 职责 |
|---|---|
| TopicConfigManager | Topic 和 Queue 配置管理 |
| ConsumerOffsetManager | 消费进度持久化 |
| ScheduleMessageService | 延迟消息调度 |
| TransactionalMessageService | 事务消息协调 |
| FilterServerManager | 消息过滤服务 |
三、消息处理全流程
3.1 生产者发送消息流程
Producer Broker 存储引擎
│ │ │
│ 1. 发送消息请求 │ │
├─────────────────────▶│ │
│ │ 2. 验证权限/Topic存在 │
│ ├───────────────────────▶│
│ │ │ 3. 写入 CommitLog
│ │ │ 4. 构建索引
│ │ 5. 返回存储位置 │
│ ◀────────────────────────┤
│ 6. 返回发送结果 │ │
◀─────────────────────┤ │
│ │ │
详细步骤:
// 伪代码演示
public SendResult handleSendMessage(SendMessageRequest request) {
// 1. 验证 Topic 配置
TopicConfig topicConfig = topicConfigManager.selectTopicConfig(request.getTopic());
if (topicConfig == null) {
throw new TopicNotExistException();
}
// 2. 选择写入的 Queue
int queueId = selectQueue(topicConfig);
// 3. 构建消息对象
MessageExtBrokerInner msgInner = buildMessage(request, queueId);
// 4. 写入存储引擎
PutMessageResult result = messageStore.putMessage(msgInner);
// 5. 返回结果
return new SendResult(
result.getMessageId(),
queueId,
result.getQueueOffset()
);
}
3.2 消费者拉取消息流程
Consumer Broker 存储引擎
│ │ │
│ 1. 拉取消息请求 │ │
│ (Topic, QueueId, │ │
│ Offset) │ │
├─────────────────────▶│ │
│ │ 2. 查询 ConsumeQueue │
│ ├───────────────────────▶│
│ │ │ 3. 获取消息位置
│ │ 4. 查询 CommitLog │
│ ├───────────────────────▶│
│ │ │ 5. 读取消息内容
│ │ 6. 返回消息列表 │
│ ◀────────────────────────┤
│ 7. 返回消息 │ │
◀─────────────────────┤ │
│ │ │
详细步骤:
// 伪代码演示
public PullResult handlePullMessage(PullMessageRequest request) {
// 1. 查询 ConsumeQueue(轻量级索引)
ConsumeQueue consumeQueue = findConsumeQueue(
request.getTopic(),
request.getQueueId()
);
// 2. 根据 Offset 查询消息位置
List<SelectMappedBufferResult> bufferResults =
consumeQueue.getMessages(request.getOffset(), request.getMaxMsgNums());
// 3. 从 CommitLog 读取消息内容
List<MessageExt> messageList = new ArrayList<>();
for (SelectMappedBufferResult bufferResult : bufferResults) {
MessageExt msg = MessageDecoder.decode(bufferResult.getByteBuffer());
messageList.add(msg);
}
// 4. 返回结果
return new PullResult(
PullStatus.FOUND,
nextBeginOffset,
messageList
);
}
四、主从同步机制
4.1 为什么需要主从同步?
单点故障问题:
场景:只有一个 Broker
问题:Broker 宕机 → 消息无法发送/消费 → 业务中断
解决方案:主从架构
主 Broker(可读可写) + 从 Broker(只读)
- 主 Broker 宕机 → 从 Broker 继续提供消费服务
- 数据实时同步 → 数据不丢失
4.2 主从同步流程
┌─────────────────────────────────────────────────┐
│ 主从同步流程 │
├─────────────────────────────────────────────────┤
│ │
│ 主 Broker 从 Broker │
│ │ │ │
│ │ 1. 消息写入 CommitLog │ │
│ │ (offset: 1000) │ │
│ │ │ │
│ │ 2. HAService 通知从节点 │ │
│ │────────────────────────────▶│ │
│ │ │ 3. 拉取数据 │
│ │ │ │
│ │ 4. 返回消息数据 │ │
│ │◀────────────────────────────│ │
│ │ │ 5. 写入本地 │
│ │ │ CommitLog │
│ │ │ │
│ │ 6. 报告同步位置 (1000) │ │
│ │◀────────────────────────────│ │
│ │ │ │
└─────────────────────────────────────────────────┘
4.3 同步复制 vs 异步复制
| 对比项 | 同步复制 | 异步复制 |
|---|---|---|
| 可靠性 | 高(从库确认后才返回) | 中(可能丢失未同步的消息) |
| 性能 | 低(等待从库确认) | 高(立即返回) |
| 适用场景 | 金融、支付等高可靠场景 | 日志、监控等高吞吐场景 |
配置方式:
# Broker 配置文件
brokerRole=SYNC_MASTER # 同步主节点
# brokerRole=ASYNC_MASTER # 异步主节点
# brokerRole=SLAVE # 从节点
五、Broker 与 NameServer 的交互
5.1 Broker 启动注册
Broker NameServer
│ │
│ 1. 启动完成 │
│ │
│ 2. 注册请求 │
│ (brokerName, address, │
│ topicConfig) │
├────────────────────────▶│
│ │ 3. 更新路由表
│ 4. 注册成功 │
◀────────────────────────┤
│ │
│ 5. 每30秒心跳 │
├────────────────────────▶│
│ │
注册信息包含:
{
"brokerName": "broker-a",
"brokerAddr": "192.168.1.10:10911",
"cluster": "DefaultCluster",
"haServerAddr": "192.168.1.10:10912", // 主从同步地址
"topicConfigTable": {
"TopicA": {...},
"TopicB": {...}
}
}
5.2 心跳保活机制
┌────────────────────────────────────┐
│ Broker 心跳机制 │
├────────────────────────────────────┤
│ 1. Broker 每30秒发送心跳 │
│ 2. NameServer 记录最后心跳时间 │
│ 3. 超过120秒未心跳 → 剔除该 Broker │
│ 4. Producer/Consumer 定期更新路由 │
└────────────────────────────────────┘
六、Broker 的性能优化
6.1 核心优化技术
1️⃣ 顺序写 CommitLog
传统随机写(慢):
Msg1 → Disk Block 100
Msg2 → Disk Block 500
Msg3 → Disk Block 200
性能:约 100-200 IOPS
RocketMQ 顺序写(快):
Msg1 → Msg2 → Msg3 → ... → CommitLog
性能:约 600MB/s(机械硬盘)
2️⃣ 内存映射文件(MMAP)
// 零拷贝读取
MappedByteBuffer mappedByteBuffer =
fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
// 优势:
// - 减少用户态/内核态切换
// - 减少数据拷贝次数
// - 利用 OS 的 PageCache
3️⃣ 异步刷盘
同步刷盘:每条消息都 fsync() → 慢但可靠
异步刷盘:批量刷盘(默认500ms) → 快但可能丢失
配置:
# 刷盘策略
flushDiskType=ASYNC_FLUSH # 异步刷盘(默认)
# flushDiskType=SYNC_FLUSH # 同步刷盘
七、实战:Broker 集群部署
7.1 主从部署架构
┌────────────────────────────────────────┐
│ Broker 集群(双主双从) │
├────────────────────────────────────────┤
│ │
│ broker-a (MASTER) broker-a-s (SLAVE)│
│ 192.168.1.10 192.168.1.11 │
│ │ │ │
│ └────────────────────┘ │
│ 主从同步 │
│ │
│ broker-b (MASTER) broker-b-s (SLAVE)│
│ 192.168.1.20 192.168.1.21 │
│ │ │ │
│ └────────────────────┘ │
│ 主从同步 │
│ │
└────────────────────────────────────────┘
7.2 配置示例
主节点配置 (broker-a.properties):
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0 # 0 表示 Master
brokerRole=SYNC_MASTER # 同步主节点
deleteWhen=04 # 凌晨4点删除过期文件
fileReservedTime=48 # 文件保留48小时
flushDiskType=ASYNC_FLUSH # 异步刷盘
从节点配置 (broker-a-s.properties):
brokerClusterName=DefaultCluster
brokerName=broker-a # 与主节点相同
brokerId=1 # 非0表示 Slave
brokerRole=SLAVE # 从节点
deleteWhen=04
fileReservedTime=48
flushDiskType=ASYNC_FLUSH
八、总结与思考
8.1 核心要点
- Broker 是消息中枢,负责存储、处理、分发消息
- 模块化设计:Remoting、Store、HA 各司其职
- 主从同步保障高可用,支持同步/异步复制
- 性能优化:顺序写、MMAP、异步刷盘
- 与 NameServer 协作:注册、心跳、路由更新
8.2 思考题
为什么 CommitLog 使用顺序写而不是随机写?
- 提示:机械硬盘的特性
主从同步如果网络延迟很高会怎样?
- 提示:思考同步复制的性能影响
如果主 Broker 宕机,从 Broker 能否自动切换为主?
- 提示:RocketMQ 4.x vs 5.x 的区别
8.3 下一步学习
下一篇我们将深入 存储引擎,详细剖析 CommitLog、ConsumeQueue、IndexFile 的实现原理。
参考资料
本文关键词:RocketMQ Broker 消息存储 主从同步 高可用 性能优化
专题导航:上一篇:NameServer深度剖析 | 下一篇:存储引擎深度剖析