引言:索引的魔法

如果说 CommitLog 是一本厚厚的字典,那么 ConsumeQueue 和 IndexFile 就是目录和索引。它们让 RocketMQ 能够:

  • 秒级定位:从百万条消息中快速找到目标
  • 轻量高效:索引占用空间极小
  • 多维查询:支持按 Queue、Key、时间查询

今天我们深入剖析这两种索引的巧妙设计。


一、为什么需要索引?

1.1 没有索引的困境

场景:Consumer 想消费 TopicA 的消息

方案1:遍历 CommitLog(❌)
┌────────────────────────────────────┐
│ CommitLog(所有Topic混存)          │
├────────────────────────────────────┤
│ TopicA-Msg1                        │
│ TopicB-Msg1                        │ ← 需要跳过
│ TopicC-Msg1                        │ ← 需要跳过
│ TopicA-Msg2                        │
│ TopicB-Msg2                        │ ← 需要跳过
│ ...                                │
└────────────────────────────────────┘

问题:
1. 需要扫描所有消息 → 慢
2. 无法按 Queue 过滤 → 低效
3. 无法快速定位 Offset → 不可用

1.2 RocketMQ 的索引方案

┌─────────────────────────────────────────────┐
│            双层索引架构                      │
├─────────────────────────────────────────────┤
│                                              │
│  ConsumeQueue(消费索引)                     │
│  - 按 Topic-Queue 组织                       │
│  - 存储消息在 CommitLog 的位置               │
│  - 支持顺序消费                              │
│                                              │
│  IndexFile(查询索引)                        │
│  - 按 Key/时间 组织                          │
│  - Hash 索引结构                             │
│  - 支持随机查询                              │
│                                              │
└─────────────────────────────────────────────┘

二、ConsumeQueue:消费索引

2.1 文件组织结构

$HOME/store/consumequeue/
├── TopicA/                    # Topic 名称
│   ├── 0/                     # Queue ID = 0
│   │   ├── 00000000000000000000    # 第1个文件
│   │   ├── 00000000000600000000    # 第2个文件
│   │   └── ...
│   ├── 1/                     # Queue ID = 1
│   │   └── ...
│   └── ...
├── TopicB/
│   └── ...
└── ...

文件大小:600万字节(30万条索引 × 20字节)
文件名:该文件第一条索引的逻辑偏移量

2.2 索引格式

┌────────────────────────────────────┐
│      单条索引格式(20字节)         │
├────────────────────────────────────┤
│ CommitLog Offset   (8字节)         │ ← 消息在 CommitLog 的物理位置
│ Size               (4字节)         │ ← 消息大小
│ Tag HashCode       (8字节)         │ ← Tag 哈希值(用于过滤)
└────────────────────────────────────┘

实际示例

TopicA-Queue0 的 ConsumeQueue:

Index 0: [CommitLog Offset: 0      , Size: 156, TagHash: 0x12345678]
Index 1: [CommitLog Offset: 156    , Size: 200, TagHash: 0x87654321]
Index 2: [CommitLog Offset: 356    , Size: 180, TagHash: 0xabcdef00]
...
Index 299999: [CommitLog Offset: 1073741824, Size: 150, ...]

文件总大小:300000 × 20 = 6,000,000 字节

2.3 ConsumeQueue 的构建流程

┌────────────────────────────────────────────┐
│       ConsumeQueue 异步构建流程            │
├────────────────────────────────────────────┤
│                                             │
│  1. 消息写入 CommitLog                      │
│           ↓                                 │
│  2. ReputMessageService 监听                │
│           ↓                                 │
│  3. 从 CommitLog 读取消息                   │
│           ↓                                 │
│  4. 解析消息(Topic, QueueId, Offset...)   │
│           ↓                                 │
│  5. 写入对应的 ConsumeQueue                 │
│     (TopicA/Queue0/xxx)                     │
│           ↓                                 │
│  6. 更新最大偏移量                          │
│                                             │
└────────────────────────────────────────────┘

源码解析

class ReputMessageService extends ServiceThread {

    @Override
    public void run() {
        while (!this.isStopped()) {
            try {
                Thread.sleep(1);

                // 1. 从 CommitLog 读取数据
                SelectMappedBufferResult result =
                    CommitLog.this.getData(reputFromOffset);

                if (result != null) {
                    try {
                        this.reputFromOffset = result.getStartOffset();

                        // 2. 遍历消息
                        for (int readSize = 0; readSize < result.getSize() && !this.isStopped();) {
                            // 3. 解析消息
                            DispatchRequest dispatchRequest =
                                CommitLog.this.checkMessageAndReturnSize(
                                    result.getByteBuffer(), false, false);

                            if (dispatchRequest.isSuccess()) {
                                // 4. 分发到 ConsumeQueue 和 IndexFile
                                CommitLog.this.doDispatch(dispatchRequest);

                                // 更新已处理位置
                                this.reputFromOffset += dispatchRequest.getMsgSize();
                                readSize += dispatchRequest.getMsgSize();
                            }
                        }
                    } finally {
                        result.release();
                    }
                }
            } catch (Exception e) {
                log.error("ReputMessageService exception", e);
            }
        }
    }
}

// 分发到 ConsumeQueue
public void doDispatch(DispatchRequest req) {
    // 1. 构建 ConsumeQueue
    for (CommitLogDispatcher dispatcher : this.dispatcherList) {
        dispatcher.dispatch(req);
    }
}

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

    @Override
    public void dispatch(DispatchRequest request) {
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());

        switch (tranType) {
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // 非事务消息或已提交的事务消息
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                // 事务消息不写入 ConsumeQueue
                break;
        }
    }
}

// 写入 ConsumeQueue
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(
        dispatchRequest.getTopic(),
        dispatchRequest.getQueueId()
    );

    // 写入 20 字节索引
    cq.putMessagePositionInfoWrapper(
        dispatchRequest.getCommitLogOffset(),  // CommitLog 偏移
        dispatchRequest.getMsgSize(),          // 消息大小
        dispatchRequest.getTagsCode(),         // Tag 哈希
        dispatchRequest.getConsumeQueueOffset() // ConsumeQueue 偏移
    );
}

2.4 Consumer 如何使用 ConsumeQueue

// 拉取消息流程
public GetMessageResult getMessage(
    final String topic,
    final int queueId,
    final long offset,      // 从哪个偏移量开始
    final int maxMsgNums    // 最多拉取多少条
) {
    // 1. 找到对应的 ConsumeQueue
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);

    // 2. 从 ConsumeQueue 读取索引(20字节 × N条)
    SelectMappedBufferResult bufferConsumeQueue =
        consumeQueue.getIndexBuffer(offset);

    long nextBeginOffset = offset;
    long minOffset = consumeQueue.getMinOffsetInQueue();
    long maxOffset = consumeQueue.getMaxOffsetInQueue();

    if (maxOffset == 0) {
        // 队列为空
        return GetMessageStatus.NO_MESSAGE_IN_QUEUE;
    }

    // 3. 遍历索引,读取消息
    for (int i = 0; i < bufferConsumeQueue.getSize() && i < maxMsgNums * 20; i += 20) {
        // 读取 CommitLog Offset
        long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
        // 读取消息大小
        int sizePy = bufferConsumeQueue.getByteBuffer().getInt();
        // 读取 Tag HashCode
        long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();

        // 4. 从 CommitLog 读取完整消息
        SelectMappedBufferResult selectResult =
            this.commitLog.getMessage(offsetPy, sizePy);

        if (selectResult != null) {
            this.messageList.add(selectResult);
            nextBeginOffset++;
        }
    }

    return new GetMessageResult(
        GetMessageStatus.FOUND,
        nextBeginOffset,
        minOffset,
        maxOffset,
        this.messageList
    );
}

2.5 ConsumeQueue 的优势

┌────────────────────────────────────────────┐
│        ConsumeQueue 的设计优势             │
├────────────────────────────────────────────┤
│ 1. 轻量级:每条索引只有 20 字节            │
│    - 1 亿条消息 = 2GB 索引                 │
│                                             │
│ 2. 顺序读:Consumer 顺序消费性能高         │
│    - 利用 PageCache 预读                   │
│                                             │
│ 3. Tag 过滤:在索引层面过滤,减少 I/O      │
│    - 避免读取不需要的消息                  │
│                                             │
│ 4. 独立管理:每个 Topic-Queue 独立文件     │
│    - 易于扩展和管理                        │
└────────────────────────────────────────────┘

三、IndexFile:查询索引

3.1 应用场景

场景1:根据 Message Key 查询
- 订单系统:根据订单号查询消息
- 日志系统:根据请求ID查询轨迹

场景2:根据时间范围查询
- 问题排查:查询某个时间段的消息
- 数据分析:统计某段时间的消息量

场景3:消息重放
- 根据业务 Key 重新消费历史消息

3.2 文件结构

$HOME/store/index/
├── 20251114000000000      # 文件名 = 创建时间戳
├── 20251114010000000
└── ...

单个文件大小:约 420MB
- Header:        40 字节
- Hash Slots:    500万 × 4字节 = 20MB
- Index Items:   2000万 × 20字节 = 400MB

3.3 Hash 索引结构

┌─────────────────────────────────────────────────┐
│              IndexFile 文件结构                  │
├─────────────────────────────────────────────────┤
│                                                  │
│  Header(40字节)                                │
│  ┌───────────────────────────────────────────┐  │
│  │ beginTimestamp        (8字节)             │  │  ← 第一条消息时间
│  │ endTimestamp          (8字节)             │  │  ← 最后一条消息时间
│  │ beginPhyOffset        (8字节)             │  │  ← 第一条消息物理偏移
│  │ endPhyOffset          (8字节)             │  │  ← 最后一条消息物理偏移
│  │ hashSlotCount         (4字节)             │  │  ← Hash槽数量(500万)
│  │ indexCount            (4字节)             │  │  ← 索引条目数量
│  └───────────────────────────────────────────┘  │
│                                                  │
│  Hash Slots(500万个,每个4字节)                │
│  ┌───────────────────────────────────────────┐  │
│  │ Slot 0:  Index Position (4字节)           │  │  ← 链表头指针
│  │ Slot 1:  Index Position                   │  │
│  │ ...                                       │  │
│  │ Slot 4999999:  Index Position             │  │
│  └───────────────────────────────────────────┘  │
│                                                  │
│  Index Items(2000万个,每个20字节)             │
│  ┌───────────────────────────────────────────┐  │
│  │ Item 0:                                   │  │
│  │   keyHash          (4字节)                │  │  ← Key的哈希值
│  │   phyOffset        (8字节)                │  │  ← CommitLog 偏移
│  │   timeDiff         (4字节)                │  │  ← 时间差
│  │   nextIndexOffset  (4字节)                │  │  ← 下一个索引位置
│  ├───────────────────────────────────────────┤  │
│  │ Item 1: ...                               │  │
│  │ ...                                       │  │
│  │ Item 19999999: ...                        │  │
│  └───────────────────────────────────────────┘  │
│                                                  │
└─────────────────────────────────────────────────┘

3.4 索引构建流程

public class IndexService {

    // 构建索引
    public void buildIndex(DispatchRequest req) {
        // 1. 获取当前 IndexFile
        IndexFile indexFile = this.indexFileList.getLastIndexFile();

        if (indexFile != null && indexFile.isWriteFull()) {
            // 文件写满,创建新文件
            indexFile = this.indexFileList.createIndexFile();
        }

        if (indexFile != null) {
            long endPhyOffset = req.getCommitLogOffset() + req.getMsgSize();

            // 2. 提取消息 Key
            final String keys = req.getKeys();  // 可能有多个 Key(空格分隔)
            if (keys != null && keys.length() > 0) {
                String[] keyArray = keys.split(MessageConst.KEY_SEPARATOR);

                for (String key : keyArray) {
                    if (key.length() > 0) {
                        // 3. 写入索引
                        boolean success = indexFile.putKey(
                            key,                           // Message Key
                            req.getCommitLogOffset(),      // 物理偏移
                            req.getStoreTimestamp()        // 存储时间
                        );

                        if (!success) {
                            log.error("putKey failed");
                        }
                    }
                }
            }

            // 4. 更新文件头信息
            if (indexFile.getEndTimestamp() < req.getStoreTimestamp()) {
                indexFile.updateByteBuffer(endPhyOffset, req.getStoreTimestamp());
            }
        }
    }
}

3.5 索引写入细节

public class IndexFile {

    // 写入索引
    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // 1. 检查是否写满
        if (this.indexHeader.getIndexCount() >= this.indexNum) {
            log.warn("Index file is full");
            return false;
        }

        // 2. 计算 Key 的哈希值
        int keyHash = indexKeyHashMethod(key);

        // 3. 计算 Hash 槽位置
        int slotPos = keyHash % this.hashSlotNum;
        int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

        // 4. 读取槽中的值(链表头)
        int slotValue = this.mappedByteBuffer.getInt(absSlotPos);

        if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
            slotValue = invalidIndex;
        }

        // 5. 计算时间差
        long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
        timeDiff = timeDiff / 1000;  // 转换为秒

        // 6. 计算当前索引的位置
        int absIndexPos = IndexHeader.INDEX_HEADER_SIZE
            + this.hashSlotNum * hashSlotSize
            + this.indexHeader.getIndexCount() * indexSize;

        // 7. 写入索引条目(20字节)
        this.mappedByteBuffer.putInt(absIndexPos, keyHash);        // Key 哈希
        this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); // 物理偏移
        this.mappedByteBuffer.putInt(absIndexPos + 12, (int) timeDiff); // 时间差
        this.mappedByteBuffer.putInt(absIndexPos + 16, slotValue); // 下一个索引

        // 8. 更新 Hash 槽(指向新索引)
        this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

        // 9. 更新索引计数
        this.indexHeader.incIndexCount();

        return true;
    }

    // 哈希算法
    public int indexKeyHashMethod(final String key) {
        int keyHash = key.hashCode();
        int keyHashPositive = Math.abs(keyHash);
        if (keyHashPositive < 0) {
            keyHashPositive = 0;
        }
        return keyHashPositive;
    }
}

3.6 索引查询流程

public class IndexService {

    // 根据 Key 查询消息
    public QueryOffsetResult queryOffset(String topic, String key,
                                        int maxNum, long begin, long end) {
        List<Long> phyOffsets = new ArrayList<>(maxNum);

        // 1. 遍历所有 IndexFile(按时间范围过滤)
        for (IndexFile indexFile : this.indexFileList) {
            if (indexFile.getEndTimestamp() < begin) {
                continue;  // 文件太旧,跳过
            }
            if (indexFile.getBeginTimestamp() > end) {
                break;     // 文件太新,终止
            }

            // 2. 在单个文件中查询
            List<Long> offsets = indexFile.selectPhyOffset(
                Collections.singletonList(key.hashCode()),
                maxNum,
                begin,
                end
            );

            phyOffsets.addAll(offsets);

            if (phyOffsets.size() >= maxNum) {
                break;
            }
        }

        return new QueryOffsetResult(phyOffsets);
    }
}

// 在单个 IndexFile 中查询
public void selectPhyOffset(final List<Long> phyOffsets, final String key,
                            int maxNum, final long begin, final long end) {
    // 1. 计算 Hash 槽位置
    int keyHash = indexKeyHashMethod(key);
    int slotPos = keyHash % this.hashSlotNum;
    int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

    // 2. 读取链表头
    int slotValue = this.mappedByteBuffer.getInt(absSlotPos);

    if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
        // 槽为空
        return;
    }

    // 3. 遍历链表
    for (int nextIndexToRead = slotValue; ; ) {
        if (phyOffsets.size() >= maxNum) {
            break;
        }

        // 读取索引条目
        int absIndexPos = IndexHeader.INDEX_HEADER_SIZE
            + this.hashSlotNum * hashSlotSize
            + nextIndexToRead * indexSize;

        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
        long timeDiffRead = this.mappedByteBuffer.getInt(absIndexPos + 12);
        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 16);

        // 检查时间范围
        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiffRead * 1000;
        if (timeRead >= begin && timeRead <= end) {
            // 检查 Hash 是否匹配
            if (keyHashRead == keyHash) {
                phyOffsets.add(phyOffsetRead);
            }
        }

        // 继续遍历链表
        if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount()) {
            break;
        }

        nextIndexToRead = prevIndexRead;
    }
}

3.7 查询示例

// 根据 Message Key 查询消息
public void queryMessageByKey() {
    // 1. 查询索引,获取 CommitLog 偏移列表
    QueryOffsetResult result = indexService.queryOffset(
        "order-topic",
        "ORDER_12345",     // Message Key
        64,                // 最多返回64条
        System.currentTimeMillis() - 3600000,  // 1小时前
        System.currentTimeMillis()              // 现在
    );

    // 2. 根据物理偏移读取消息
    List<MessageExt> messages = new ArrayList<>();
    for (Long phyOffset : result.getPhyOffsets()) {
        MessageExt msg = commitLog.lookMessageByOffset(phyOffset);
        if (msg != null) {
            messages.add(msg);
        }
    }

    // 3. 过滤出真正匹配的消息(Hash 可能冲突)
    for (MessageExt msg : messages) {
        String keys = msg.getKeys();
        if (keys != null && keys.contains("ORDER_12345")) {
            System.out.println("Found: " + msg);
        }
    }
}

四、两种索引对比

4.1 功能对比

特性ConsumeQueueIndexFile
用途消费索引查询索引
组织方式按 Topic-Queue按 Key/时间
访问模式顺序访问随机访问
查询方式按偏移量按 Key/时间范围
索引大小20字节/条20字节/条
构建方式异步(必须)异步(可选)
是否必需

4.2 适用场景

┌────────────────────────────────────────────┐
│            使用场景对比                     │
├────────────────────────────────────────────┤
│                                             │
│  ConsumeQueue:                             │
│  ✅ 消费者拉取消息(主要用途)               │
│  ✅ 顺序消费                                │
│  ✅ 按 Queue 分区消费                       │
│                                             │
│  IndexFile:                                │
│  ✅ 根据 Message Key 查询                   │
│  ✅ 根据时间范围查询                        │
│  ✅ 问题排查和数据分析                      │
│  ✅ 消息重放                                │
│                                             │
└────────────────────────────────────────────┘

五、性能优化

5.1 ConsumeQueue 优化

1️⃣ 预读优化

// 启用 ConsumeQueue 预热
public void warmUp() {
    ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
    for (int i = 0; i < fileSize; i += OS_PAGE_SIZE) {
        byteBuffer.get(i);  // 触发 PageCache 加载
    }
}

2️⃣ 批量读取

// 一次读取多条索引
int maxMsgNums = 32;  // 一次读取32条
for (int i = 0; i < maxMsgNums * 20; i += 20) {
    // 读取索引
}

// 优势:减少系统调用次数

5.2 IndexFile 优化

1️⃣ 合理设置 Hash 槽数量

# broker.conf
maxHashSlotNum=5000000      # 500万(默认)
maxIndexNum=20000000        # 2000万(默认)

# 选择建议:
# - Hash 槽数量 = 索引数量 / 4
# - 保持较低的冲突率

2️⃣ 限制索引文件大小

# 单个索引文件最大条目数
maxIndexNum=20000000

# 超过后自动创建新文件
# 避免单文件过大影响查询性能

六、监控与排查

6.1 关键指标

# 查看 ConsumeQueue 状态
ls -lh $HOME/store/consumequeue/{topic}/{queueId}/

# 查看 IndexFile 状态
ls -lh $HOME/store/index/

# 查看索引构建延迟
tail -f logs/rocketmqlogs/broker.log | grep "reput"

6.2 常见问题

问题1:ConsumeQueue 构建延迟

# 现象:Consumer 拉不到最新消息

# 排查:
# 1. 检查 ReputMessageService 是否正常
grep "ReputMessageService" logs/rocketmqlogs/broker.log

# 2. 检查 CommitLog 和 ConsumeQueue 的偏移量差距
# CommitLog 最大偏移 - ConsumeQueue 最大偏移

# 解决:
# - 检查磁盘 I/O 是否过载
# - 增加 Broker 内存

问题2:IndexFile 查询慢

# 原因:
# 1. Hash 冲突严重
# 2. 索引文件过多
# 3. 时间范围过大

# 优化:
# 1. 增加 Hash 槽数量
# 2. 缩小时间范围
# 3. 使用更精确的 Key

七、总结与思考

7.1 核心要点

  1. ConsumeQueue:轻量级消费索引,按 Topic-Queue 组织
  2. IndexFile:Hash 索引,支持 Key 和时间查询
  3. 异步构建:索引异步构建,不影响消息写入性能
  4. 双层索引:ConsumeQueue + IndexFile,覆盖不同查询场景

7.2 思考题

  1. 为什么 ConsumeQueue 只存 20 字节索引?

    • 提示:考虑存储空间和查询效率
  2. IndexFile 的 Hash 冲突如何处理?

    • 提示:链表法
  3. 如果 ConsumeQueue 构建失败会怎样?

    • 提示:Consumer 无法消费

7.3 下一步学习

下一篇我们将剖析 网络通信模型与 Remoting 模块,理解 RocketMQ 如何实现高性能网络通信。


参考资料


本文关键词ConsumeQueue IndexFile 索引 Hash索引 消息查询

专题导航上一篇:CommitLog深度剖析 | 下一篇:网络通信模型