引言:一个精妙的协调机制
假设你经营一家餐厅,有10个服务员(Consumer),需要服务100桌客人(Message Queue):
- 如何公平分配?每个服务员负责10桌?
- 如果有服务员请假,谁来接管他的桌子?
- 如果来了新服务员,如何重新分配?
- 如何记住每桌的服务进度?
这就是 RocketMQ Consumer Group 要解决的问题。
一、Consumer Group 核心概念
1.1 什么是 Consumer Group
public class ConsumerGroupConcept {
// Consumer Group 定义
class ConsumerGroup {
String groupName; // 组名(全局唯一)
List<String> consumerIdList; // 组内消费者列表
SubscriptionData subscriptionData; // 订阅信息
ConsumeType consumeType; // 消费模式
MessageModel messageModel; // 消息模式(集群/广播)
ConsumeFromWhere consumeFromWhere; // 从何处开始消费
}
// 核心规则
class CoreRules {
// 1. 同一 Group 内的 Consumer 订阅关系必须一致
// 2. 同一 Group 内的 Consumer 均分消息
// 3. 不同 Group 相互独立,都能收到全量消息
// 4. Group 是消费进度管理的单位
}
}
1.2 Consumer Group 的设计目标
public class DesignGoals {
// 目标1:负载均衡
void loadBalancing() {
// 10个 Queue,5个 Consumer
// 每个 Consumer 负责 2个 Queue
// 自动、公平、动态
}
// 目标2:高可用
void highAvailability() {
// Consumer 宕机,其他 Consumer 接管
// 无消息丢失
// 自动故障转移
}
// 目标3:水平扩展
void horizontalScaling() {
// 动态增加 Consumer
// 自动重新分配
// 无需停机
}
// 目标4:消费进度管理
void progressManagement() {
// 统一管理消费位点
// 支持重置进度
// 持久化存储
}
}
二、Rebalance 机制详解
2.1 什么是 Rebalance
public class RebalanceMechanism {
// Rebalance:重新分配 Queue 给 Consumer
class RebalanceDefinition {
// 触发时机:
// 1. Consumer 上线/下线
// 2. Queue 数量变化
// 3. 订阅关系变化
// 目标:
// 保证每个 Queue 只被一个 Consumer 消费
// 尽可能均匀分配
}
// Rebalance 流程
class RebalanceFlow {
public void doRebalance() {
// 1. 获取 Topic 的所有 Queue
List<MessageQueue> mqAll = getTopicQueues(topic);
// 2. 获取 Consumer Group 的所有 Consumer
List<String> cidAll = getConsumerIdList(topic, group);
// 3. 排序保证一致性
Collections.sort(mqAll);
Collections.sort(cidAll);
// 4. 执行分配算法
List<MessageQueue> allocateResult = strategy.allocate(
consumerGroup,
currentCID,
mqAll,
cidAll
);
// 5. 更新本地队列
updateProcessQueueTable(allocateResult);
}
}
}
2.2 Rebalance 触发流程
Rebalance 触发流程:
Consumer 加入:
┌──────────┐ 注册 ┌──────────┐
│Consumer1 │ ───────────────> │ Broker │
└──────────┘ └──────────┘
│
通知其他Consumer
↓
┌──────────┐ ┌──────────┐
│Consumer2 │ <──── Rebalance ─│Consumer3 │
└──────────┘ └──────────┘
时间线:
T0: Consumer1 加入 Group
T1: Broker 通知所有 Consumer
T2: 所有 Consumer 开始 Rebalance
T3: 完成 Queue 重新分配
2.3 Rebalance 实现细节
public class RebalanceImplementation {
// Consumer 端 Rebalance 服务
class RebalanceService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
// 默认每 20 秒执行一次
this.waitForRunning(20000);
this.doRebalance();
}
}
private void doRebalance() {
// 对所有订阅的 Topic 执行 Rebalance
for (Map.Entry<String, SubscriptionData> entry :
subscriptionInner.entrySet()) {
String topic = entry.getKey();
rebalanceByTopic(topic);
}
}
private void rebalanceByTopic(String topic) {
// 1. 获取 Topic 的队列信息
Set<MessageQueue> mqSet = topicSubscribeInfoTable.get(topic);
// 2. 获取同组的所有 Consumer ID
List<String> cidAll = mQClientFactory.findConsumerIdList(
topic, consumerGroup
);
// 3. 执行分配
if (mqSet != null && cidAll != null) {
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
try {
allocateResult = strategy.allocate(
this.consumerGroup,
this.mQClientFactory.getClientId(),
new ArrayList<>(mqSet),
cidAll
);
} catch (Throwable e) {
log.error("AllocateMessageQueueStrategy.allocate Exception", e);
}
// 4. 更新分配结果
updateProcessQueueTableInRebalance(topic, allocateResult);
}
}
}
// 更新本地队列表
private void updateProcessQueueTableInRebalance(String topic, List<MessageQueue> mqSet) {
// 移除不再分配给自己的队列
for (MessageQueue mq : processQueueTable.keySet()) {
if (!mqSet.contains(mq)) {
ProcessQueue pq = processQueueTable.remove(mq);
if (pq != null) {
pq.setDropped(true); // 标记为丢弃
log.info("Drop queue: {}", mq);
}
}
}
// 添加新分配的队列
for (MessageQueue mq : mqSet) {
if (!processQueueTable.containsKey(mq)) {
ProcessQueue pq = new ProcessQueue();
processQueueTable.put(mq, pq);
// 创建拉取请求
PullRequest pullRequest = new PullRequest();
pullRequest.setMessageQueue(mq);
pullRequest.setProcessQueue(pq);
pullRequest.setNextOffset(computePullFromWhere(mq));
// 立即执行拉取
pullMessageService.executePullRequestImmediately(pullRequest);
log.info("Add new queue: {}", mq);
}
}
}
}
三、负载均衡策略
3.1 内置分配策略
public class AllocationStrategies {
// 1. 平均分配策略(默认)
class AllocateMessageQueueAveragely {
/*
示例:8个Queue,3个Consumer
Consumer0: Q0, Q1, Q2
Consumer1: Q3, Q4, Q5
Consumer2: Q6, Q7
*/
public List<MessageQueue> allocate(String group, String currentCID,
List<MessageQueue> mqAll,
List<String> cidAll) {
int index = cidAll.indexOf(currentCID);
int mod = mqAll.size() % cidAll.size();
int averageSize = (mqAll.size() <= cidAll.size()) ? 1 :
(mod > 0 && index < mod) ?
mqAll.size() / cidAll.size() + 1 :
mqAll.size() / cidAll.size();
int startIndex = (mod > 0 && index < mod) ?
index * averageSize :
index * averageSize + mod;
int range = Math.min(averageSize, mqAll.size() - startIndex);
return mqAll.subList(startIndex, startIndex + range);
}
}
// 2. 环形平均分配
class AllocateMessageQueueAveragelyByCircle {
/*
示例:8个Queue,3个Consumer
Consumer0: Q0, Q3, Q6
Consumer1: Q1, Q4, Q7
Consumer2: Q2, Q5
*/
public List<MessageQueue> allocate(String group, String currentCID,
List<MessageQueue> mqAll,
List<String> cidAll) {
int index = cidAll.indexOf(currentCID);
List<MessageQueue> result = new ArrayList<>();
for (int i = index; i < mqAll.size(); i += cidAll.size()) {
result.add(mqAll.get(i));
}
return result;
}
}
// 3. 一致性Hash分配
class AllocateMessageQueueConsistentHash {
/*
使用一致性Hash算法
优点:Consumer变化时,影响的Queue最少
*/
private final int virtualNodeCnt;
private final HashFunction hashFunc;
public List<MessageQueue> allocate(String group, String currentCID,
List<MessageQueue> mqAll,
List<String> cidAll) {
// 构建Hash环
TreeMap<Long, String> hashRing = new TreeMap<>();
for (String cid : cidAll) {
for (int i = 0; i < virtualNodeCnt; i++) {
long hash = hashFunc.hash(cid + "#" + i);
hashRing.put(hash, cid);
}
}
// 分配Queue
List<MessageQueue> result = new ArrayList<>();
for (MessageQueue mq : mqAll) {
long hash = hashFunc.hash(mq.toString());
Map.Entry<Long, String> entry = hashRing.ceilingEntry(hash);
if (entry == null) {
entry = hashRing.firstEntry();
}
if (currentCID.equals(entry.getValue())) {
result.add(mq);
}
}
return result;
}
}
// 4. 机房就近分配
class AllocateMessageQueueByMachineRoom {
/*
根据机房分配,优先同机房
减少跨机房流量
*/
public List<MessageQueue> allocate(String group, String currentCID,
List<MessageQueue> mqAll,
List<String> cidAll) {
// 获取当前Consumer的机房
String currentRoom = getMachineRoom(currentCID);
// 同机房的Queue
List<MessageQueue> sameRoomQueues = new ArrayList<>();
// 其他机房的Queue
List<MessageQueue> diffRoomQueues = new ArrayList<>();
for (MessageQueue mq : mqAll) {
if (currentRoom.equals(getMachineRoom(mq.getBrokerName()))) {
sameRoomQueues.add(mq);
} else {
diffRoomQueues.add(mq);
}
}
// 优先分配同机房Queue
List<MessageQueue> result = new ArrayList<>();
result.addAll(allocateQueues(sameRoomQueues, currentCID, cidAll));
// 如果还需要更多Queue,分配其他机房的
if (result.size() < getTargetQueueNum(mqAll.size(), cidAll.size())) {
result.addAll(allocateQueues(diffRoomQueues, currentCID, cidAll));
}
return result;
}
}
// 5. 自定义分配策略
class CustomAllocationStrategy implements AllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(String group, String currentCID,
List<MessageQueue> mqAll,
List<String> cidAll) {
// 根据业务需求自定义
// 例如:VIP Consumer 分配更多Queue
if (isVipConsumer(currentCID)) {
// VIP分配60%的Queue
return allocateForVip(mqAll, currentCID, cidAll);
} else {
// 普通Consumer平分剩余40%
return allocateForNormal(mqAll, currentCID, cidAll);
}
}
}
}
3.2 策略选择指南
public class StrategySelectionGuide {
public void selectStrategy() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 场景1:默认场景
// 使用平均分配,简单公平
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueAveragely()
);
// 场景2:Consumer频繁变动
// 使用一致性Hash,减少重分配
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueConsistentHash()
);
// 场景3:多机房部署
// 使用机房就近,减少跨机房流量
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueByMachineRoom()
);
// 场景4:特殊业务需求
// 自定义策略
consumer.setAllocateMessageQueueStrategy(
new CustomAllocationStrategy()
);
}
}
四、消费进度管理
4.1 消费进度存储
public class ConsumeProgressStorage {
// 集群模式:进度存储在 Broker
class ClusterModeProgress {
// 存储位置:{ROCKETMQ_HOME}/store/config/consumerOffset.json
/*
{
"offsetTable": {
"ORDER_TOPIC@ORDER_GROUP": {
"0": 123456, // Queue0 的消费进度
"1": 234567, // Queue1 的消费进度
"2": 345678, // Queue2 的消费进度
"3": 456789 // Queue3 的消费进度
}
}
}
*/
// Broker 端定期持久化(默认5秒)
class ConsumerOffsetManager {
private ConcurrentHashMap<String/* topic@group */,
ConcurrentHashMap<Integer, Long>> offsetTable;
public void commitOffset(String group, String topic, int queueId, long offset) {
String key = topic + "@" + group;
ConcurrentHashMap<Integer, Long> map = offsetTable.get(key);
if (map != null) {
Long oldOffset = map.put(queueId, offset);
if (oldOffset == null || offset > oldOffset) {
// 更新成功
}
}
}
}
}
// 广播模式:进度存储在本地
class BroadcastModeProgress {
// 存储位置:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json
class LocalFileOffsetStore {
private String storePath;
private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable;
public void persistAll() {
// 持久化到本地文件
String jsonString = JSON.toJSONString(offsetTable);
MixAll.string2File(jsonString, storePath);
}
public void load() {
// 从本地文件加载
String content = MixAll.file2String(storePath);
if (content != null) {
offsetTable = JSON.parseObject(content,
new TypeReference<ConcurrentHashMap<MessageQueue, AtomicLong>>(){});
}
}
}
}
}
4.2 消费进度提交
public class ProgressCommit {
// Consumer 端进度提交
class ConsumerProgressCommit {
// 定时提交(默认5秒)
class ScheduledCommit {
private ScheduledExecutorService scheduledExecutorService;
public void start() {
scheduledExecutorService.scheduleAtFixedRate(() -> {
try {
persistConsumerOffset();
} catch (Exception e) {
log.error("persistConsumerOffset error", e);
}
}, 1000, 5000, TimeUnit.MILLISECONDS);
}
private void persistConsumerOffset() {
for (Map.Entry<MessageQueue, ProcessQueue> entry :
processQueueTable.entrySet()) {
MessageQueue mq = entry.getKey();
ProcessQueue pq = entry.getValue();
if (pq.isDropped()) {
continue;
}
// 获取消费进度
long offset = pq.getConsumeOffset();
// 提交到 Broker
offsetStore.updateOffset(mq, offset, false);
}
// 批量提交
offsetStore.persistAll();
}
}
// 关闭时提交
class ShutdownCommit {
public void shutdown() {
// 停止定时任务
scheduledExecutorService.shutdown();
// 最后一次提交
persistConsumerOffset();
// 等待提交完成
offsetStore.persistAll();
}
}
}
}
4.3 进度重置
public class ProgressReset {
// 重置消费进度的方式
class ResetMethods {
// 1. 通过管理工具重置
void resetByAdmin() {
// 重置到最早
// sh mqadmin resetOffset -g GROUP -t TOPIC -o 0
// 重置到最新
// sh mqadmin resetOffset -g GROUP -t TOPIC -o -1
// 重置到指定时间
// sh mqadmin resetOffset -g GROUP -t TOPIC -o 2023-11-13#10:00:00
}
// 2. 通过代码重置
void resetByCode() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 设置从哪里开始消费
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// CONSUME_FROM_FIRST_OFFSET: 从最早消息
// CONSUME_FROM_LAST_OFFSET: 从最新消息
// CONSUME_FROM_TIMESTAMP: 从指定时间
// 设置消费时间点
consumer.setConsumeTimestamp("2023-11-13 10:00:00");
}
// 3. 跳过堆积消息
void skipBacklog() {
// 获取最大偏移量
long maxOffset = consumer.maxOffset(mq);
// 更新到最大偏移量
consumer.updateConsumeOffset(mq, maxOffset);
}
}
}
五、Rebalance 过程中的问题
5.1 消息重复消费
public class DuplicateConsumption {
// 问题:Rebalance 导致消息重复
class Problem {
/*
场景:
1. Consumer1 正在处理 Queue1 的消息
2. Rebalance 发生,Queue1 分配给 Consumer2
3. Consumer1 还没提交进度就失去了 Queue1
4. Consumer2 从旧进度开始消费,导致重复
*/
}
// 解决方案
class Solution {
// 1. 幂等消费
class IdempotentConsumer {
private Set<String> processedIds = new ConcurrentHashSet<>();
public ConsumeStatus consumeMessage(List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
String bizId = msg.getKeys(); // 使用业务ID
if (!processedIds.add(bizId)) {
// 已处理,跳过
log.info("Message already processed: {}", bizId);
continue;
}
// 处理消息
processMessage(msg);
}
return ConsumeStatus.CONSUME_SUCCESS;
}
}
// 2. 减少 Rebalance 频率
class ReduceRebalance {
void configure() {
// 增加心跳间隔
consumer.setHeartbeatBrokerInterval(30000); // 30秒
// 使用一致性Hash
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueConsistentHash()
);
}
}
}
}
5.2 消费停顿
public class ConsumePause {
// 问题:Rebalance 期间消费暂停
class Problem {
/*
Rebalance 过程:
1. 停止当前消费
2. 等待其他 Consumer Rebalance
3. 重新分配Queue
4. 恢复消费
期间有短暂停顿
*/
}
// 优化方案
class Optimization {
// 1. 减少 Rebalance 时间
void reduceRebalanceTime() {
// 减少 Consumer 数量变化
// 使用容器编排,批量上下线
}
// 2. 平滑迁移
class SmoothMigration {
void migrate() {
// 先启动新 Consumer
startNewConsumers();
// 等待 Rebalance 完成
Thread.sleep(30000);
// 再停止旧 Consumer
stopOldConsumers();
}
}
// 3. 预分配策略
void preAllocate() {
// 预先规划好 Queue 分配
// 使用自定义分配策略
consumer.setAllocateMessageQueueStrategy(
new StaticAllocationStrategy()
);
}
}
}
六、监控和运维
6.1 消费组监控
public class ConsumerGroupMonitoring {
// 监控指标
class Metrics {
// 1. 消费进度监控
void monitorProgress() {
// 消费延迟
long lag = maxOffset - consumerOffset;
// 消费速度
double tps = (currentOffset - lastOffset) / interval;
// 预计消费完成时间
double eta = lag / tps;
}
// 2. Rebalance 监控
void monitorRebalance() {
// Rebalance 次数
int rebalanceCount;
// Rebalance 耗时
long rebalanceDuration;
// Queue 分配变化
Map<String, List<MessageQueue>> allocationHistory;
}
// 3. Consumer 健康度
void monitorHealth() {
// 在线 Consumer 数量
int onlineConsumerCount;
// 消费线程池状态
ThreadPoolExecutor executor = getConsumeExecutor();
int activeCount = executor.getActiveCount();
int queueSize = executor.getQueue().size();
}
}
// 运维命令
class AdminCommands {
// 查看消费进度
void checkProgress() {
// sh mqadmin consumerProgress -n localhost:9876 -g ORDER_GROUP
}
// 查看消费组列表
void listConsumerGroups() {
// sh mqadmin consumerGroupList -n localhost:9876
}
// 查看消费组详情
void consumerGroupDetail() {
// sh mqadmin consumerConnection -n localhost:9876 -g ORDER_GROUP
}
}
}
6.2 问题诊断
public class ProblemDiagnosis {
// 常见问题诊断
class CommonProblems {
// 1. 消费速度慢
void slowConsumption() {
// 检查点:
// - Consumer 数量是否足够
// - 消费线程池是否饱和
// - 单条消息处理时间
// - 网络延迟
// 解决:
// - 增加 Consumer 实例
// - 增加消费线程数
// - 优化消费逻辑
// - 批量消费
}
// 2. 消费不均衡
void imbalancedConsumption() {
// 检查点:
// - Queue 分配是否均匀
// - Consumer 处理能力是否一致
// - 是否有热点 Queue
// 解决:
// - 调整分配策略
// - 增加 Queue 数量
// - 使用一致性 Hash
}
// 3. Rebalance 频繁
void frequentRebalance() {
// 检查点:
// - Consumer 是否频繁上下线
// - 网络是否稳定
// - 心跳是否超时
// 解决:
// - 增加心跳间隔
// - 优化部署策略
// - 使用容器编排
}
}
}
七、最佳实践
7.1 Consumer Group 设计原则
public class DesignPrinciples {
// 1. 合理设置 Consumer 数量
class ConsumerCount {
void calculate() {
// Consumer 数量 <= Queue 数量
// 否则有 Consumer 空闲
// 建议:
int queueCount = 16;
int consumerCount = Math.min(queueCount, 8); // 不超过 Queue 数量
}
}
// 2. 订阅关系一致性
class SubscriptionConsistency {
// 同一 Group 内所有 Consumer 必须:
// - 订阅相同的 Topic
// - 使用相同的 Tag 过滤
// - 使用相同的消费模式
// ❌ 错误示例
void wrongExample() {
// Consumer1
consumer1.subscribe("TOPIC", "TagA");
// Consumer2(同组但不同Tag)
consumer2.subscribe("TOPIC", "TagB"); // 错误!
}
// ✅ 正确示例
void correctExample() {
// 所有 Consumer 订阅一致
consumer.subscribe("TOPIC", "TagA || TagB");
}
}
// 3. 消费进度管理
class ProgressManagement {
void bestPractice() {
// 定期监控消费延迟
monitorConsumerLag();
// 设置合理的提交间隔
consumer.setAutoCommitIntervalMillis(5000);
// 优雅关闭
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
consumer.shutdown();
}));
}
}
}
7.2 性能优化
public class PerformanceOptimization {
// 1. 消费并发度调优
void optimizeConcurrency() {
// 根据消息处理时间调整
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 批量消费
consumer.setConsumeMessageBatchMaxSize(10);
}
// 2. 拉取优化
void optimizePull() {
// 拉取批次大小
consumer.setPullBatchSize(32);
// 拉取间隔(0表示不间断)
consumer.setPullInterval(0);
// 流控阈值
consumer.setPullThresholdForQueue(1000);
}
// 3. Rebalance 优化
void optimizeRebalance() {
// 使用合适的分配策略
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueConsistentHash()
);
// 减少 Rebalance 触发
consumer.setConsumeFromWhere(
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
);
}
}
八、总结
通过本篇,我们深入理解了:
- Consumer Group 机制:如何实现负载均衡和高可用
- Rebalance 原理:Queue 如何在 Consumer 间动态分配
- 分配策略:不同策略的特点和适用场景
- 消费进度管理:进度如何存储、提交和重置
- 问题与优化:常见问题的诊断和解决
Consumer Group 是 RocketMQ 的精髓之一,它用简单的概念实现了复杂的分布式协调。
下一篇预告
基础篇的最后一篇,我们将通过 SpringBoot 集成实战,把前面学到的知识串联起来,构建一个生产级的消息系统。
思考与实践:
- 为什么同一 Consumer Group 内的订阅关系必须一致?
- Rebalance 过程中如何保证消息不丢失?
- 在你的业务中,应该选择哪种分配策略?
深入理解原理,才能更好地应用!