引言:索引的魔法
如果说 CommitLog 是一本厚厚的字典,那么 ConsumeQueue 和 IndexFile 就是目录和索引。它们让 RocketMQ 能够:
- 秒级定位:从百万条消息中快速找到目标
- 轻量高效:索引占用空间极小
- 多维查询:支持按 Queue、Key、时间查询
今天我们深入剖析这两种索引的巧妙设计。
一、为什么需要索引?
1.1 没有索引的困境
场景:Consumer 想消费 TopicA 的消息
方案1:遍历 CommitLog(❌)
┌────────────────────────────────────┐
│ CommitLog(所有Topic混存) │
├────────────────────────────────────┤
│ TopicA-Msg1 │
│ TopicB-Msg1 │ ← 需要跳过
│ TopicC-Msg1 │ ← 需要跳过
│ TopicA-Msg2 │
│ TopicB-Msg2 │ ← 需要跳过
│ ... │
└────────────────────────────────────┘
问题:
1. 需要扫描所有消息 → 慢
2. 无法按 Queue 过滤 → 低效
3. 无法快速定位 Offset → 不可用
1.2 RocketMQ 的索引方案
┌─────────────────────────────────────────────┐
│ 双层索引架构 │
├─────────────────────────────────────────────┤
│ │
│ ConsumeQueue(消费索引) │
│ - 按 Topic-Queue 组织 │
│ - 存储消息在 CommitLog 的位置 │
│ - 支持顺序消费 │
│ │
│ IndexFile(查询索引) │
│ - 按 Key/时间 组织 │
│ - Hash 索引结构 │
│ - 支持随机查询 │
│ │
└─────────────────────────────────────────────┘
二、ConsumeQueue:消费索引
2.1 文件组织结构
$HOME/store/consumequeue/
├── TopicA/ # Topic 名称
│ ├── 0/ # Queue ID = 0
│ │ ├── 00000000000000000000 # 第1个文件
│ │ ├── 00000000000600000000 # 第2个文件
│ │ └── ...
│ ├── 1/ # Queue ID = 1
│ │ └── ...
│ └── ...
├── TopicB/
│ └── ...
└── ...
文件大小:600万字节(30万条索引 × 20字节)
文件名:该文件第一条索引的逻辑偏移量
2.2 索引格式
┌────────────────────────────────────┐
│ 单条索引格式(20字节) │
├────────────────────────────────────┤
│ CommitLog Offset (8字节) │ ← 消息在 CommitLog 的物理位置
│ Size (4字节) │ ← 消息大小
│ Tag HashCode (8字节) │ ← Tag 哈希值(用于过滤)
└────────────────────────────────────┘
实际示例:
TopicA-Queue0 的 ConsumeQueue:
Index 0: [CommitLog Offset: 0 , Size: 156, TagHash: 0x12345678]
Index 1: [CommitLog Offset: 156 , Size: 200, TagHash: 0x87654321]
Index 2: [CommitLog Offset: 356 , Size: 180, TagHash: 0xabcdef00]
...
Index 299999: [CommitLog Offset: 1073741824, Size: 150, ...]
文件总大小:300000 × 20 = 6,000,000 字节
2.3 ConsumeQueue 的构建流程
┌────────────────────────────────────────────┐
│ ConsumeQueue 异步构建流程 │
├────────────────────────────────────────────┤
│ │
│ 1. 消息写入 CommitLog │
│ ↓ │
│ 2. ReputMessageService 监听 │
│ ↓ │
│ 3. 从 CommitLog 读取消息 │
│ ↓ │
│ 4. 解析消息(Topic, QueueId, Offset...) │
│ ↓ │
│ 5. 写入对应的 ConsumeQueue │
│ (TopicA/Queue0/xxx) │
│ ↓ │
│ 6. 更新最大偏移量 │
│ │
└────────────────────────────────────────────┘
源码解析:
class ReputMessageService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
try {
Thread.sleep(1);
// 1. 从 CommitLog 读取数据
SelectMappedBufferResult result =
CommitLog.this.getData(reputFromOffset);
if (result != null) {
try {
this.reputFromOffset = result.getStartOffset();
// 2. 遍历消息
for (int readSize = 0; readSize < result.getSize() && !this.isStopped();) {
// 3. 解析消息
DispatchRequest dispatchRequest =
CommitLog.this.checkMessageAndReturnSize(
result.getByteBuffer(), false, false);
if (dispatchRequest.isSuccess()) {
// 4. 分发到 ConsumeQueue 和 IndexFile
CommitLog.this.doDispatch(dispatchRequest);
// 更新已处理位置
this.reputFromOffset += dispatchRequest.getMsgSize();
readSize += dispatchRequest.getMsgSize();
}
}
} finally {
result.release();
}
}
} catch (Exception e) {
log.error("ReputMessageService exception", e);
}
}
}
}
// 分发到 ConsumeQueue
public void doDispatch(DispatchRequest req) {
// 1. 构建 ConsumeQueue
for (CommitLogDispatcher dispatcher : this.dispatcherList) {
dispatcher.dispatch(req);
}
}
class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
@Override
public void dispatch(DispatchRequest request) {
final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
// 非事务消息或已提交的事务消息
DefaultMessageStore.this.putMessagePositionInfo(request);
break;
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
// 事务消息不写入 ConsumeQueue
break;
}
}
}
// 写入 ConsumeQueue
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
ConsumeQueue cq = this.findConsumeQueue(
dispatchRequest.getTopic(),
dispatchRequest.getQueueId()
);
// 写入 20 字节索引
cq.putMessagePositionInfoWrapper(
dispatchRequest.getCommitLogOffset(), // CommitLog 偏移
dispatchRequest.getMsgSize(), // 消息大小
dispatchRequest.getTagsCode(), // Tag 哈希
dispatchRequest.getConsumeQueueOffset() // ConsumeQueue 偏移
);
}
2.4 Consumer 如何使用 ConsumeQueue
// 拉取消息流程
public GetMessageResult getMessage(
final String topic,
final int queueId,
final long offset, // 从哪个偏移量开始
final int maxMsgNums // 最多拉取多少条
) {
// 1. 找到对应的 ConsumeQueue
ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
// 2. 从 ConsumeQueue 读取索引(20字节 × N条)
SelectMappedBufferResult bufferConsumeQueue =
consumeQueue.getIndexBuffer(offset);
long nextBeginOffset = offset;
long minOffset = consumeQueue.getMinOffsetInQueue();
long maxOffset = consumeQueue.getMaxOffsetInQueue();
if (maxOffset == 0) {
// 队列为空
return GetMessageStatus.NO_MESSAGE_IN_QUEUE;
}
// 3. 遍历索引,读取消息
for (int i = 0; i < bufferConsumeQueue.getSize() && i < maxMsgNums * 20; i += 20) {
// 读取 CommitLog Offset
long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
// 读取消息大小
int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
// 读取 Tag HashCode
long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
// 4. 从 CommitLog 读取完整消息
SelectMappedBufferResult selectResult =
this.commitLog.getMessage(offsetPy, sizePy);
if (selectResult != null) {
this.messageList.add(selectResult);
nextBeginOffset++;
}
}
return new GetMessageResult(
GetMessageStatus.FOUND,
nextBeginOffset,
minOffset,
maxOffset,
this.messageList
);
}
2.5 ConsumeQueue 的优势
┌────────────────────────────────────────────┐
│ ConsumeQueue 的设计优势 │
├────────────────────────────────────────────┤
│ 1. 轻量级:每条索引只有 20 字节 │
│ - 1 亿条消息 = 2GB 索引 │
│ │
│ 2. 顺序读:Consumer 顺序消费性能高 │
│ - 利用 PageCache 预读 │
│ │
│ 3. Tag 过滤:在索引层面过滤,减少 I/O │
│ - 避免读取不需要的消息 │
│ │
│ 4. 独立管理:每个 Topic-Queue 独立文件 │
│ - 易于扩展和管理 │
└────────────────────────────────────────────┘
三、IndexFile:查询索引
3.1 应用场景
场景1:根据 Message Key 查询
- 订单系统:根据订单号查询消息
- 日志系统:根据请求ID查询轨迹
场景2:根据时间范围查询
- 问题排查:查询某个时间段的消息
- 数据分析:统计某段时间的消息量
场景3:消息重放
- 根据业务 Key 重新消费历史消息
3.2 文件结构
$HOME/store/index/
├── 20251114000000000 # 文件名 = 创建时间戳
├── 20251114010000000
└── ...
单个文件大小:约 420MB
- Header: 40 字节
- Hash Slots: 500万 × 4字节 = 20MB
- Index Items: 2000万 × 20字节 = 400MB
3.3 Hash 索引结构
┌─────────────────────────────────────────────────┐
│ IndexFile 文件结构 │
├─────────────────────────────────────────────────┤
│ │
│ Header(40字节) │
│ ┌───────────────────────────────────────────┐ │
│ │ beginTimestamp (8字节) │ │ ← 第一条消息时间
│ │ endTimestamp (8字节) │ │ ← 最后一条消息时间
│ │ beginPhyOffset (8字节) │ │ ← 第一条消息物理偏移
│ │ endPhyOffset (8字节) │ │ ← 最后一条消息物理偏移
│ │ hashSlotCount (4字节) │ │ ← Hash槽数量(500万)
│ │ indexCount (4字节) │ │ ← 索引条目数量
│ └───────────────────────────────────────────┘ │
│ │
│ Hash Slots(500万个,每个4字节) │
│ ┌───────────────────────────────────────────┐ │
│ │ Slot 0: Index Position (4字节) │ │ ← 链表头指针
│ │ Slot 1: Index Position │ │
│ │ ... │ │
│ │ Slot 4999999: Index Position │ │
│ └───────────────────────────────────────────┘ │
│ │
│ Index Items(2000万个,每个20字节) │
│ ┌───────────────────────────────────────────┐ │
│ │ Item 0: │ │
│ │ keyHash (4字节) │ │ ← Key的哈希值
│ │ phyOffset (8字节) │ │ ← CommitLog 偏移
│ │ timeDiff (4字节) │ │ ← 时间差
│ │ nextIndexOffset (4字节) │ │ ← 下一个索引位置
│ ├───────────────────────────────────────────┤ │
│ │ Item 1: ... │ │
│ │ ... │ │
│ │ Item 19999999: ... │ │
│ └───────────────────────────────────────────┘ │
│ │
└─────────────────────────────────────────────────┘
3.4 索引构建流程
public class IndexService {
// 构建索引
public void buildIndex(DispatchRequest req) {
// 1. 获取当前 IndexFile
IndexFile indexFile = this.indexFileList.getLastIndexFile();
if (indexFile != null && indexFile.isWriteFull()) {
// 文件写满,创建新文件
indexFile = this.indexFileList.createIndexFile();
}
if (indexFile != null) {
long endPhyOffset = req.getCommitLogOffset() + req.getMsgSize();
// 2. 提取消息 Key
final String keys = req.getKeys(); // 可能有多个 Key(空格分隔)
if (keys != null && keys.length() > 0) {
String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);
for (String key : keyArray) {
if (key.length() > 0) {
// 3. 写入索引
boolean success = indexFile.putKey(
key, // Message Key
req.getCommitLogOffset(), // 物理偏移
req.getStoreTimestamp() // 存储时间
);
if (!success) {
log.error("putKey failed");
}
}
}
}
// 4. 更新文件头信息
if (indexFile.getEndTimestamp() < req.getStoreTimestamp()) {
indexFile.updateByteBuffer(endPhyOffset, req.getStoreTimestamp());
}
}
}
}
3.5 索引写入细节
public class IndexFile {
// 写入索引
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
// 1. 检查是否写满
if (this.indexHeader.getIndexCount() >= this.indexNum) {
log.warn("Index file is full");
return false;
}
// 2. 计算 Key 的哈希值
int keyHash = indexKeyHashMethod(key);
// 3. 计算 Hash 槽位置
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
// 4. 读取槽中的值(链表头)
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
// 5. 计算时间差
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000; // 转换为秒
// 6. 计算当前索引的位置
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE
+ this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 7. 写入索引条目(20字节)
this.mappedByteBuffer.putInt(absIndexPos, keyHash); // Key 哈希
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); // 物理偏移
this.mappedByteBuffer.putInt(absIndexPos + 12, (int) timeDiff); // 时间差
this.mappedByteBuffer.putInt(absIndexPos + 16, slotValue); // 下一个索引
// 8. 更新 Hash 槽(指向新索引)
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
// 9. 更新索引计数
this.indexHeader.incIndexCount();
return true;
}
// 哈希算法
public int indexKeyHashMethod(final String key) {
int keyHash = key.hashCode();
int keyHashPositive = Math.abs(keyHash);
if (keyHashPositive < 0) {
keyHashPositive = 0;
}
return keyHashPositive;
}
}
3.6 索引查询流程
public class IndexService {
// 根据 Key 查询消息
public QueryOffsetResult queryOffset(String topic, String key,
int maxNum, long begin, long end) {
List<Long> phyOffsets = new ArrayList<>(maxNum);
// 1. 遍历所有 IndexFile(按时间范围过滤)
for (IndexFile indexFile : this.indexFileList) {
if (indexFile.getEndTimestamp() < begin) {
continue; // 文件太旧,跳过
}
if (indexFile.getBeginTimestamp() > end) {
break; // 文件太新,终止
}
// 2. 在单个文件中查询
List<Long> offsets = indexFile.selectPhyOffset(
Collections.singletonList(key.hashCode()),
maxNum,
begin,
end
);
phyOffsets.addAll(offsets);
if (phyOffsets.size() >= maxNum) {
break;
}
}
return new QueryOffsetResult(phyOffsets);
}
}
// 在单个 IndexFile 中查询
public void selectPhyOffset(final List<Long> phyOffsets, final String key,
int maxNum, final long begin, final long end) {
// 1. 计算 Hash 槽位置
int keyHash = indexKeyHashMethod(key);
int slotPos = keyHash % this.hashSlotNum;
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
// 2. 读取链表头
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
// 槽为空
return;
}
// 3. 遍历链表
for (int nextIndexToRead = slotValue; ; ) {
if (phyOffsets.size() >= maxNum) {
break;
}
// 读取索引条目
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE
+ this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiffRead = this.mappedByteBuffer.getInt(absIndexPos + 12);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 16);
// 检查时间范围
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiffRead * 1000;
if (timeRead >= begin && timeRead <= end) {
// 检查 Hash 是否匹配
if (keyHashRead == keyHash) {
phyOffsets.add(phyOffsetRead);
}
}
// 继续遍历链表
if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount()) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
3.7 查询示例
// 根据 Message Key 查询消息
public void queryMessageByKey() {
// 1. 查询索引,获取 CommitLog 偏移列表
QueryOffsetResult result = indexService.queryOffset(
"order-topic",
"ORDER_12345", // Message Key
64, // 最多返回64条
System.currentTimeMillis() - 3600000, // 1小时前
System.currentTimeMillis() // 现在
);
// 2. 根据物理偏移读取消息
List<MessageExt> messages = new ArrayList<>();
for (Long phyOffset : result.getPhyOffsets()) {
MessageExt msg = commitLog.lookMessageByOffset(phyOffset);
if (msg != null) {
messages.add(msg);
}
}
// 3. 过滤出真正匹配的消息(Hash 可能冲突)
for (MessageExt msg : messages) {
String keys = msg.getKeys();
if (keys != null && keys.contains("ORDER_12345")) {
System.out.println("Found: " + msg);
}
}
}
四、两种索引对比
4.1 功能对比
| 特性 | ConsumeQueue | IndexFile |
|---|---|---|
| 用途 | 消费索引 | 查询索引 |
| 组织方式 | 按 Topic-Queue | 按 Key/时间 |
| 访问模式 | 顺序访问 | 随机访问 |
| 查询方式 | 按偏移量 | 按 Key/时间范围 |
| 索引大小 | 20字节/条 | 20字节/条 |
| 构建方式 | 异步(必须) | 异步(可选) |
| 是否必需 | 是 | 否 |
4.2 适用场景
┌────────────────────────────────────────────┐
│ 使用场景对比 │
├────────────────────────────────────────────┤
│ │
│ ConsumeQueue: │
│ ✅ 消费者拉取消息(主要用途) │
│ ✅ 顺序消费 │
│ ✅ 按 Queue 分区消费 │
│ │
│ IndexFile: │
│ ✅ 根据 Message Key 查询 │
│ ✅ 根据时间范围查询 │
│ ✅ 问题排查和数据分析 │
│ ✅ 消息重放 │
│ │
└────────────────────────────────────────────┘
五、性能优化
5.1 ConsumeQueue 优化
1️⃣ 预读优化
// 启用 ConsumeQueue 预热
public void warmUp() {
ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
for (int i = 0; i < fileSize; i += OS_PAGE_SIZE) {
byteBuffer.get(i); // 触发 PageCache 加载
}
}
2️⃣ 批量读取
// 一次读取多条索引
int maxMsgNums = 32; // 一次读取32条
for (int i = 0; i < maxMsgNums * 20; i += 20) {
// 读取索引
}
// 优势:减少系统调用次数
5.2 IndexFile 优化
1️⃣ 合理设置 Hash 槽数量
# broker.conf
maxHashSlotNum=5000000 # 500万(默认)
maxIndexNum=20000000 # 2000万(默认)
# 选择建议:
# - Hash 槽数量 = 索引数量 / 4
# - 保持较低的冲突率
2️⃣ 限制索引文件大小
# 单个索引文件最大条目数
maxIndexNum=20000000
# 超过后自动创建新文件
# 避免单文件过大影响查询性能
六、监控与排查
6.1 关键指标
# 查看 ConsumeQueue 状态
ls -lh $HOME/store/consumequeue/{topic}/{queueId}/
# 查看 IndexFile 状态
ls -lh $HOME/store/index/
# 查看索引构建延迟
tail -f logs/rocketmqlogs/broker.log | grep "reput"
6.2 常见问题
问题1:ConsumeQueue 构建延迟
# 现象:Consumer 拉不到最新消息
# 排查:
# 1. 检查 ReputMessageService 是否正常
grep "ReputMessageService" logs/rocketmqlogs/broker.log
# 2. 检查 CommitLog 和 ConsumeQueue 的偏移量差距
# CommitLog 最大偏移 - ConsumeQueue 最大偏移
# 解决:
# - 检查磁盘 I/O 是否过载
# - 增加 Broker 内存
问题2:IndexFile 查询慢
# 原因:
# 1. Hash 冲突严重
# 2. 索引文件过多
# 3. 时间范围过大
# 优化:
# 1. 增加 Hash 槽数量
# 2. 缩小时间范围
# 3. 使用更精确的 Key
七、总结与思考
7.1 核心要点
- ConsumeQueue:轻量级消费索引,按 Topic-Queue 组织
- IndexFile:Hash 索引,支持 Key 和时间查询
- 异步构建:索引异步构建,不影响消息写入性能
- 双层索引:ConsumeQueue + IndexFile,覆盖不同查询场景
7.2 思考题
为什么 ConsumeQueue 只存 20 字节索引?
- 提示:考虑存储空间和查询效率
IndexFile 的 Hash 冲突如何处理?
- 提示:链表法
如果 ConsumeQueue 构建失败会怎样?
- 提示:Consumer 无法消费
7.3 下一步学习
下一篇我们将剖析 网络通信模型与 Remoting 模块,理解 RocketMQ 如何实现高性能网络通信。
参考资料
本文关键词:ConsumeQueue IndexFile 索引 Hash索引 消息查询
专题导航:上一篇:CommitLog深度剖析 | 下一篇:网络通信模型