引言:一个精妙的分布式系统
如果把 RocketMQ 比作一座现代化城市,那么:
- NameServer 是城市规划局,管理所有地图信息
- Broker 是物流仓库,存储和转运货物
- Producer 是发货方,把货物送到仓库
- Consumer 是收货方,从仓库取货
今天,我们从整体视角理解这座"消息城市"的运作机制。
一、RocketMQ 整体架构
1.1 核心组件
RocketMQ 架构全景图:
┌────────────────────────────────────────────┐
│ NameServer Cluster │
│ (路由注册中心、服务发现、轻量级协调器) │
└────────┬───────────────────┬───────────────┘
│ │
注册/心跳 │ │ 获取路由
│ │
┌────────────▼──────┐ ┌────▼────────────────┐
│ │ │ │
│ Broker Cluster │ │ Producer/Consumer │
│ │ │ │
│ ┌───────────────┐ │ │ 应用程序 │
│ │ Master Broker │ │◄────┤ │
│ │ - CommitLog │ │ 读写 │ │
│ │ - ConsumeQ │ │ 消息 │ │
│ │ - IndexFile │ │ │ │
│ └───────┬───────┘ │ └─────────────────────┘
│ │ 主从同步 │
│ ┌───────▼───────┐ │
│ │ Slave Broker │ │
│ │ - CommitLog │ │
│ │ - ConsumeQ │ │
│ │ - IndexFile │ │
│ └───────────────┘ │
└───────────────────┘
1.2 组件职责矩阵
组件职责对比:
┌──────────────┬────────────────────────────────────────┐
│ 组件 │ 核心职责 │
├──────────────┼────────────────────────────────────────┤
│ NameServer │ - 路由信息管理(Topic → Broker 映射) │
│ │ - Broker 注册与心跳检测 │
│ │ - 提供路由信息查询 │
│ │ - 无状态、对等集群 │
├──────────────┼────────────────────────────────────────┤
│ Broker │ - 消息存储(CommitLog) │
│ │ - 消息索引(ConsumeQueue、IndexFile) │
│ │ - 消息查询与消费 │
│ │ - 高可用(主从复制、Dledger) │
├──────────────┼────────────────────────────────────────┤
│ Producer │ - 消息生产 │
│ │ - 消息发送(同步、异步、单向) │
│ │ - 负载均衡(选择Broker和Queue) │
│ │ - 故障规避 │
├──────────────┼────────────────────────────────────────┤
│ Consumer │ - 消息消费 │
│ │ - 消息拉取(长轮询) │
│ │ - 负载均衡(Rebalance) │
│ │ - 消费进度管理 │
└──────────────┴────────────────────────────────────────┘
二、NameServer 详解
2.1 为什么需要 NameServer
public class WhyNameServer {
// 问题1:Producer/Consumer 如何知道 Broker 在哪里?
class ServiceDiscovery {
// 没有 NameServer:
// - 硬编码 Broker 地址(不灵活)
// - 无法动态感知 Broker 变化
// 有了 NameServer:
// - 动态获取 Broker 列表
// - 自动感知 Broker 上下线
}
// 问题2:Topic 分布在哪些 Broker 上?
class TopicRoute {
// NameServer 维护 Topic 路由信息
class TopicRouteData {
List<QueueData> queueDatas; // Queue 分布
List<BrokerData> brokerDatas; // Broker 信息
}
}
// 问题3:为什么不用 ZooKeeper?
class WhyNotZooKeeper {
/*
ZooKeeper 的问题:
1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先)
2. 重量级依赖 - NameServer 更轻量
3. 运维复杂 - NameServer 无状态更简单
4. 性能开销 - NameServer 内存操作更快
NameServer 的优势:
1. 完全无状态
2. 对等集群(任意节点提供服务)
3. 内存操作(无磁盘 IO)
4. 简单高效
*/
}
}
2.2 NameServer 核心功能
public class NameServerCore {
// 1. 路由信息管理
class RouteInfoManager {
// Topic → Broker 映射
private HashMap<String/* topic */, List<QueueData>> topicQueueTable;
// Broker 名称 → Broker 数据
private HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
// Broker 地址 → 集群信息
private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// 集群名称 → Broker 名称列表
private HashMap<String/* clusterName */, Set<String>> clusterAddrTable;
// 过滤服务器
private HashMap<String/* brokerAddr */, List<String>> filterServerTable;
}
// 2. Broker 注册
public RegisterBrokerResult registerBroker(
String clusterName,
String brokerAddr,
String brokerName,
long brokerId,
String haServerAddr,
TopicConfigSerializeWrapper topicConfigWrapper) {
// 创建或更新 Broker 数据
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
brokerData = new BrokerData(clusterName, brokerName, new HashMap<>());
this.brokerAddrTable.put(brokerName, brokerData);
}
brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
// 更新 Topic 路由信息
if (null != topicConfigWrapper) {
for (Map.Entry<String, TopicConfig> entry :
topicConfigWrapper.getTopicConfigTable().entrySet()) {
createAndUpdateQueueData(brokerName, entry.getValue());
}
}
// 更新 Broker 存活信息
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(
brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr
)
);
return new RegisterBrokerResult(...);
}
// 3. 路由查询
public TopicRouteData getTopicRouteData(String topic) {
TopicRouteData topicRouteData = new TopicRouteData();
// 查找 Topic 的 Queue 信息
List<QueueData> queueDataList = this.topicQueueTable.get(topic);
// 查找对应的 Broker 信息
Set<String> brokerNameSet = new HashSet<>();
for (QueueData qd : queueDataList) {
brokerNameSet.add(qd.getBrokerName());
}
// 组装路由数据
for (String brokerName : brokerNameSet) {
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (brokerData != null) {
topicRouteData.getBrokerDatas().add(brokerData.clone());
}
}
topicRouteData.setQueueDatas(queueDataList);
return topicRouteData;
}
// 4. Broker 心跳检测
public void scanNotActiveBroker() {
long timeoutMillis = 120000; // 2 分钟超时
for (Map.Entry<String, BrokerLiveInfo> entry :
this.brokerLiveTable.entrySet()) {
long last = entry.getValue().getLastUpdateTimestamp();
if ((last + timeoutMillis) < System.currentTimeMillis()) {
// Broker 超时,移除
RemotingUtil.closeChannel(entry.getValue().getChannel());
this.brokerLiveTable.remove(entry.getKey());
// 清理路由信息
this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel());
log.warn("Broker timeout: {}", entry.getKey());
}
}
}
}
2.3 NameServer 集群
public class NameServerCluster {
// NameServer 特点
class Characteristics {
/*
1. 完全无状态
- 不持久化任何数据
- 所有数据在内存中
- 重启后数据丢失(从 Broker 重新获取)
2. 对等部署
- 所有节点功能完全一致
- 无主从之分
- 无需选举
3. 不互相通信
- NameServer 之间无任何通信
- 数据可能短暂不一致
- 最终一致即可
4. Broker 全量注册
- Broker 向所有 NameServer 注册
- 保证数据冗余
*/
}
// 部署架构
class DeploymentArchitecture {
/*
NameServer 集群(推荐3个节点):
┌─────────────┐ ┌─────────────┐ ┌─────────────┐
│ NameServer1 │ │ NameServer2 │ │ NameServer3 │
└──────┬──────┘ └──────┬──────┘ └──────┬──────┘
│ │ │
└────────────────┼────────────────┘
│
┌───────▼────────┐
│ │
│ Broker 集群 │
│ (全量注册) │
│ │
└────────────────┘
*/
}
}
三、Broker 详解
3.1 Broker 核心架构
Broker 内部架构:
┌─────────────────────────────────────────────────────┐
│ Broker │
│ ┌────────────────────────────────────────────────┐│
│ │ Remoting 通信层 ││
│ │ (接收 Producer/Consumer 请求) ││
│ └────────┬───────────────────────────────────────┘│
│ │ │
│ ┌────────▼────────────────────────────┐ │
│ │ BrokerController │ │
│ │ - 处理器注册 │ │
│ │ - 定时任务管理 │ │
│ │ - 消息处理 │ │
│ └────────┬────────────────────────────┘ │
│ │ │
│ ┌────────▼─────────────┬───────────────────┐ │
│ │ MessageStore │ HAService │ │
│ │ ┌────────────────┐ │ (主从复制) │ │
│ │ │ CommitLog │ │ │ │
│ │ │ (消息存储) │ │ │ │
│ │ └────────────────┘ │ │ │
│ │ ┌────────────────┐ │ │ │
│ │ │ ConsumeQueue │ │ │ │
│ │ │ (消费索引) │ │ │ │
│ │ └────────────────┘ │ │ │
│ │ ┌────────────────┐ │ │ │
│ │ │ IndexFile │ │ │ │
│ │ │ (消息索引) │ │ │ │
│ │ └────────────────┘ │ │ │
│ └──────────────────────┴───────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────┐ │
│ │ 定时任务 │ │
│ │ - 持久化 ConsumeQueue │ │
│ │ - 持久化 CommitLog │ │
│ │ - 清理过期文件 │ │
│ │ - 统计信息 │ │
│ └─────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘
3.2 Broker 核心职责
public class BrokerCore {
// 1. 消息接收与存储
class MessageReceive {
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
// 写入 CommitLog
PutMessageResult result = this.commitLog.putMessage(msg);
// 触发刷盘
handleDiskFlush(result, msg);
// 触发主从同步
handleHA(result, msg);
return result;
}
}
// 2. 消息查询
class MessageQuery {
// 按消息 ID 查询
public MessageExt lookMessageByOffset(long commitLogOffset) {
return this.commitLog.lookMessageByOffset(commitLogOffset);
}
// 按 Key 查询
public QueryMessageResult queryMessage(String topic, String key, int maxNum) {
return this.indexService.queryMessage(topic, key, maxNum);
}
// 按时间查询
public QueryMessageResult queryMessageByTime(String topic, long begin, long end) {
return this.messageStore.queryMessage(topic, begin, end, maxNum);
}
}
// 3. 消息消费
class MessageConsume {
public GetMessageResult getMessage(
String group,
String topic,
int queueId,
long offset,
int maxMsgNums,
SubscriptionData subscriptionData) {
// 从 ConsumeQueue 查找消息位置
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
// 根据 offset 获取消息
SelectMappedBufferResult bufferConsumeQueue =
consumeQueue.getIndexBuffer(offset);
// 从 CommitLog 读取消息内容
List<MessageExt> messageList = new ArrayList<>();
for (int i = 0; i < maxMsgNums; i++) {
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
MessageExt msg = lookMessageByOffset(offsetPy, sizePy);
// 消息过滤
if (match(subscriptionData, msg)) {
messageList.add(msg);
}
}
return new GetMessageResult(messageList);
}
}
// 4. Topic 创建
class TopicManagement {
public void createTopic(String topic, int queueNums) {
TopicConfig topicConfig = new TopicConfig(topic);
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
topicConfig.setPerm(6); // 读写权限
this.topicConfigTable.put(topic, topicConfig);
// 创建 ConsumeQueue
for (int i = 0; i < queueNums; i++) {
ConsumeQueue logic = new ConsumeQueue(topic, i);
this.consumeQueueTable.put(buildKey(topic, i), logic);
}
}
}
// 5. 消费进度管理
class ConsumerOffsetManagement {
private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable;
public void commitOffset(String group, String topic, int queueId, long offset) {
String key = topic + "@" + group;
ConcurrentHashMap<Integer, Long> map = offsetTable.get(key);
if (map != null) {
map.put(queueId, offset);
}
}
public long queryOffset(String group, String topic, int queueId) {
String key = topic + "@" + group;
ConcurrentHashMap<Integer, Long> map = offsetTable.get(key);
if (map != null) {
Long offset = map.get(queueId);
return offset != null ? offset : -1;
}
return -1;
}
}
}
3.3 Broker 高可用
public class BrokerHA {
// 主从架构
class MasterSlave {
/*
主从模式:
┌──────────────┐ 同步 ┌──────────────┐
│ Master Broker│ ──────────> │ Slave Broker │
│ - 读写 │ │ - 只读 │
└──────────────┘ └──────────────┘
同步方式:
1. 同步复制(SYNC_MASTER):
- Master 等待 Slave 同步完成才返回
- 可靠性高,性能稍差
2. 异步复制(ASYNC_MASTER):
- Master 立即返回,后台异步同步
- 性能高,可能丢失少量数据
*/
}
// Dledger 模式
class DledgerMode {
/*
基于 Raft 的自动选主:
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Broker 1 │ │ Broker 2 │ │ Broker 3 │
│ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│
└──────────┘ └──────────┘ └──────────┘
特点:
1. 自动故障转移
2. 无需人工干预
3. 保证数据一致性
4. 至少3个节点
*/
}
}
四、Producer 工作原理
4.1 Producer 架构
public class ProducerArchitecture {
// Producer 核心组件
class ProducerComponents {
private MQClientInstance clientInstance; // 客户端实例
private MQClientAPIImpl mqClientAPI; // 通信API
private TopicPublishInfo publishInfo; // Topic路由信息
private SendMessageHook sendHook; // 发送钩子
}
// 发送流程
public SendResult send(Message msg) {
// 1. 查找 Topic 路由信息
TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic());
// 2. 选择消息队列
MessageQueue mq = selectOneMessageQueue(publishInfo);
// 3. 发送消息
SendResult result = sendKernelImpl(msg, mq);
// 4. 更新故障规避信息
updateFaultItem(mq.getBrokerName(), result);
return result;
}
}
4.2 Producer 负载均衡
public class ProducerLoadBalance {
// 队列选择策略
public MessageQueue selectQueue(TopicPublishInfo tpInfo) {
// 1. 简单轮询
int index = sendWhichQueue.incrementAndGet() % queueSize;
return tpInfo.getMessageQueueList().get(index);
// 2. 故障规避
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
return mq;
}
// 3. 选择延迟最小的
return latencyFaultTolerance.pickOneAtLeast();
}
}
五、Consumer 工作原理
5.1 Consumer 架构
public class ConsumerArchitecture {
// Consumer 核心组件
class ConsumerComponents {
private RebalanceService rebalanceService; // 负载均衡服务
private PullMessageService pullMessageService; // 拉取服务
private ConsumeMessageService consumeService; // 消费服务
private OffsetStore offsetStore; // 进度存储
}
// 消费流程
public void start() {
// 1. 启动负载均衡
rebalanceService.start();
// 2. 启动拉取服务
pullMessageService.start();
// 3. 启动消费服务
consumeService.start();
// 4. 注册到 Broker
registerConsumer();
}
}
5.2 Consumer 负载均衡(Rebalance)
public class ConsumerRebalance {
public void doRebalance() {
// 1. 获取 Topic 的所有 Queue
List<MessageQueue> mqAll = getTopicQueues(topic);
// 2. 获取同组的所有 Consumer
List<String> cidAll = getConsumerList(group);
// 3. 排序(保证一致性视图)
Collections.sort(mqAll);
Collections.sort(cidAll);
// 4. 执行分配
List<MessageQueue> allocated = allocateStrategy.allocate(
group, currentCID, mqAll, cidAll
);
// 5. 更新本地队列
updateProcessQueue(allocated);
}
}
六、完整消息流转
6.1 发送消息完整流程
消息发送完整流程:
┌───────────┐
│ Producer │
└─────┬─────┘
│1. 查询路由
▼
┌───────────┐
│NameServer │ 返回路由信息(Broker列表、Queue信息)
└─────┬─────┘
│
▼
┌───────────┐
│ Producer │ 2. 选择 Broker 和 Queue
└─────┬─────┘
│3. 发送消息
▼
┌───────────┐
│ Broker │ 4. 写入 CommitLog
│ ┌───────┐ │
│ │CommitLog│
│ └───┬───┘ │
│ │5. 异步构建 ConsumeQueue
│ ┌───▼──────┐
│ │ConsumeQ │
│ └──────────┘
│ │6. 返回结果
└─────┼─────┘
▼
┌───────────┐
│ Producer │ 7. 收到发送结果
└───────────┘
6.2 消费消息完整流程
消息消费完整流程:
┌───────────┐
│ Consumer │
└─────┬─────┘
│1. 注册到 Broker
▼
┌───────────┐
│ Broker │ 保存 Consumer 信息
└─────┬─────┘
│2. Rebalance 分配 Queue
▼
┌───────────┐
│ Consumer │ 3. 发起长轮询拉取
└─────┬─────┘
│
▼
┌───────────┐
│ Broker │ 4. 从 ConsumeQueue 查找
│ ┌───────┐ │ 5. 从 CommitLog 读取消息
│ │ConsumeQ│ │
│ └───┬───┘ │
│ │ │
│ ┌───▼────┐│
│ │CommitLog││
│ └────────┘│
│ │6. 返回消息
└─────┼─────┘
▼
┌───────────┐
│ Consumer │ 7. 处理消息
└─────┬─────┘ 8. 更新消费进度
│
▼
┌───────────┐
│ Broker │ 9. 保存消费进度
└───────────┘
七、设计亮点与思考
7.1 设计亮点
public class DesignHighlights {
// 1. NameServer 的轻量化设计
class LightweightNameServer {
/*
- 无状态,内存操作
- 对等部署,无需选举
- 简单高效
- AP 而非 CP
*/
}
// 2. CommitLog 的顺序写
class SequentialWrite {
/*
- 所有消息顺序写入同一个文件
- 充分利用磁盘顺序写性能
- 避免随机 IO
*/
}
// 3. ConsumeQueue 的索引设计
class IndexDesign {
/*
- 轻量级索引(20字节)
- 快速定位消息位置
- 支持消费进度管理
*/
}
// 4. 长轮询的伪 Push
class LongPolling {
/*
- Consumer 主动拉取
- Broker Hold 请求
- 兼顾实时性和可控性
*/
}
}
7.2 架构权衡
public class ArchitectureTradeoffs {
// 权衡1:性能 vs 可靠性
class PerformanceVsReliability {
// 异步刷盘 → 高性能,可能丢消息
// 同步刷盘 → 零丢失,性能下降
// 异步复制 → 高性能,可能丢消息
// 同步复制 → 高可靠,性能下降
}
// 权衡2:一致性 vs 可用性
class ConsistencyVsAvailability {
// NameServer 选择 AP
// - 可能短暂不一致
// - 但保证高可用
// Dledger 模式选择 CP
// - 保证数据一致
// - 需要多数节点存活
}
// 权衡3:复杂度 vs 功能
class ComplexityVsFeature {
// RocketMQ 比 Kafka 复杂
// - 但功能更丰富(事务、延迟、顺序)
// RocketMQ 比 RabbitMQ 简单
// - 但性能更高
}
}
八、总结
通过本篇,我们从整体视角理解了 RocketMQ 的架构:
- NameServer:轻量级路由中心,无状态高可用
- Broker:消息存储核心,顺序写高性能
- Producer:消息生产者,负载均衡与故障规避
- Consumer:消息消费者,长轮询与 Rebalance
RocketMQ 的架构设计充分体现了分布式系统的智慧,每个设计决策都有其深刻的考量。
下一篇预告
理解了整体架构,下一篇我们将深入 NameServer,探索它为什么能如此轻量高效,以及它的实现细节。
深入思考:
- 为什么 RocketMQ 不像 Kafka 那样让 Broker 依赖 ZooKeeper?
- CommitLog 顺序写为什么比随机写快这么多?
- 如果 NameServer 全部宕机,系统还能工作吗?
思考这些问题,深化理解!