引言:一个模型,两种模式
传统消息系统通常要在两种模型中二选一:
- JMS:明确区分 Queue(点对点)和 Topic(发布订阅)
- AMQP:通过 Exchange 和 Binding 实现不同模式
- Kafka:只有 Topic,通过 Consumer Group 实现不同语义
而 RocketMQ 的独特之处在于:用一套模型,同时支持两种模式。
让我们深入理解这是如何实现的。
一、两种基本消息模型
1.1 点对点模型(Point-to-Point)
public class P2PModel {
// 点对点模型特征
class Characteristics {
// 1. 一条消息只能被一个消费者消费
// 2. 消息被消费后从队列中删除
// 3. 多个消费者竞争消息
// 4. 适合任务分发场景
}
// 传统 JMS Queue 实现
class TraditionalQueue {
Queue<Message> queue = new LinkedBlockingQueue<>();
// 生产者发送
public void send(Message msg) {
queue.offer(msg);
}
// 消费者接收(竞争)
public Message receive() {
return queue.poll(); // 取出即删除
}
}
}
点对点模型示意图:
┌──────────┐
Producer ──> Queue ──> Consumer1 (获得消息)
└──────────┘
──> Consumer2 (没有获得)
──> Consumer3 (没有获得)
特点:消息被其中一个消费者消费后,其他消费者无法再消费
1.2 发布订阅模型(Publish-Subscribe)
public class PubSubModel {
// 发布订阅模型特征
class Characteristics {
// 1. 一条消息可以被多个订阅者消费
// 2. 每个订阅者都能收到完整消息副本
// 3. 消息广播给所有订阅者
// 4. 适合事件通知场景
}
// 传统 JMS Topic 实现
class TraditionalTopic {
List<Subscriber> subscribers = new ArrayList<>();
// 发布消息
public void publish(Message msg) {
// 广播给所有订阅者
for (Subscriber sub : subscribers) {
sub.onMessage(msg.copy());
}
}
// 订阅主题
public void subscribe(Subscriber sub) {
subscribers.add(sub);
}
}
}
发布订阅模型示意图:
┌──> Consumer1 (收到消息)
┌────────┐ │
Producer ──> Topic ──┼──> Consumer2 (收到消息)
└────────┘ │
└──> Consumer3 (收到消息)
特点:每个订阅者都能收到消息的完整副本
二、RocketMQ 的统一模型
2.1 核心设计:Consumer Group
public class RocketMQModel {
// RocketMQ 的秘密:通过 Consumer Group 实现两种模式
class ConsumerGroup {
String groupName;
List<Consumer> consumers;
ConsumeMode mode; // 集群消费 or 广播消费
}
// 关键规则:
// 1. 同一 Consumer Group 内的消费者【分摊】消息(点对点)
// 2. 不同 Consumer Group 都能收到【全量】消息(发布订阅)
}
2.2 模式切换的魔法
public class ModelSwitching {
// 场景1:实现点对点模式
public void p2pMode() {
// 所有消费者使用【相同】的 Consumer Group
String GROUP = "SAME_GROUP";
Consumer consumer1 = new DefaultMQPushConsumer(GROUP);
Consumer consumer2 = new DefaultMQPushConsumer(GROUP);
Consumer consumer3 = new DefaultMQPushConsumer(GROUP);
// 结果:消息被分摊消费,每条消息只被其中一个消费
}
// 场景2:实现发布订阅模式
public void pubSubMode() {
// 每个消费者使用【不同】的 Consumer Group
Consumer consumer1 = new DefaultMQPushConsumer("GROUP_A");
Consumer consumer2 = new DefaultMQPushConsumer("GROUP_B");
Consumer consumer3 = new DefaultMQPushConsumer("GROUP_C");
// 结果:每个 Group 都收到全量消息
}
// 场景3:混合模式
public void mixedMode() {
// 订单服务组(2个实例)
Consumer order1 = new DefaultMQPushConsumer("ORDER_GROUP");
Consumer order2 = new DefaultMQPushConsumer("ORDER_GROUP");
// 库存服务组(3个实例)
Consumer stock1 = new DefaultMQPushConsumer("STOCK_GROUP");
Consumer stock2 = new DefaultMQPushConsumer("STOCK_GROUP");
Consumer stock3 = new DefaultMQPushConsumer("STOCK_GROUP");
// 结果:
// - ORDER_GROUP 内部:order1 和 order2 分摊消息
// - STOCK_GROUP 内部:stock1/2/3 分摊消息
// - 两个 GROUP 之间:都能收到全量消息
}
}
三、集群消费 vs 广播消费
3.1 集群消费(默认)
public class ClusterConsumeMode {
public void setup() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP");
// 设置集群消费模式(默认)
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TOPIC", "*");
}
// 集群消费特点
class ClusteringCharacteristics {
// 1. 同组内消费者分摊消息
// 2. 消费进度存储在 Broker
// 3. 支持消费失败重试
// 4. 适合负载均衡场景
}
// 消费进度管理
class ProgressManagement {
// Broker 端统一管理消费进度
// 路径:{ROCKETMQ_HOME}/store/config/consumerOffset.json
{
"offsetTable": {
"TOPIC@GROUP": {
"0": 12345, // Queue0 消费到 12345
"1": 23456, // Queue1 消费到 23456
"2": 34567, // Queue2 消费到 34567
"3": 45678 // Queue3 消费到 45678
}
}
}
}
}
集群消费示意图:
Queue0 ──> Consumer1 ─┐
Queue1 ──> Consumer1 ─┤
Topic ──> ├─> GROUP_A (共享进度)
Queue2 ──> Consumer2 ─┤
Queue3 ──> Consumer2 ─┘
GROUP_A 的消费进度存储在 Broker,所有成员共享
3.2 广播消费
public class BroadcastConsumeMode {
public void setup() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP");
// 设置广播消费模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("TOPIC", "*");
}
// 广播消费特点
class BroadcastingCharacteristics {
// 1. 每个消费者都消费全量消息
// 2. 消费进度存储在本地
// 3. 不支持消费失败重试
// 4. 适合本地缓存更新场景
}
// 消费进度管理
class LocalProgress {
// 本地文件存储消费进度
// 路径:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json
{
"offsetTable": {
"TOPIC": {
"0": 99999, // 本实例 Queue0 进度
"1": 88888, // 本实例 Queue1 进度
"2": 77777, // 本实例 Queue2 进度
"3": 66666 // 本实例 Queue3 进度
}
}
}
}
}
广播消费示意图:
┌─> Consumer1 (消费全部消息,独立进度)
│
Topic ──> ────┼─> Consumer2 (消费全部消息,独立进度)
│
└─> Consumer3 (消费全部消息,独立进度)
每个 Consumer 独立维护自己的消费进度
四、消息分配策略
4.1 集群模式下的分配策略
public class AllocationStrategy {
// 1. 平均分配(默认)
class AllocateMessageQueueAveragely {
// 假设:4个Queue,2个Consumer
// Consumer1: Queue0, Queue1
// Consumer2: Queue2, Queue3
// 假设:5个Queue,2个Consumer
// Consumer1: Queue0, Queue1, Queue2
// Consumer2: Queue3, Queue4
}
// 2. 环形分配
class AllocateMessageQueueAveragelyByCircle {
// 假设:5个Queue,2个Consumer
// Consumer1: Queue0, Queue2, Queue4
// Consumer2: Queue1, Queue3
}
// 3. 机房就近分配
class AllocateMessageQueueByMachineRoom {
// 根据 Consumer 和 Broker 的机房位置分配
// 优先分配同机房的 Queue
}
// 4. 一致性Hash分配
class AllocateMessageQueueConsistentHash {
// 使用一致性Hash算法
// Consumer 上下线对其他 Consumer 影响最小
}
// 5. 自定义分配
class CustomAllocateStrategy implements AllocateMessageQueueStrategy {
@Override
public List<MessageQueue> allocate(
String consumerGroup,
String currentCID,
List<MessageQueue> mqAll,
List<String> cidAll) {
// 自定义分配逻辑
// 比如:VIP Consumer 分配更多 Queue
if (isVipConsumer(currentCID)) {
return allocateMoreQueues(mqAll, currentCID);
}
return allocateNormalQueues(mqAll, currentCID);
}
}
}
4.2 分配策略选择
public class StrategySelection {
public void selectStrategy() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 设置分配策略
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueAveragely()
);
// 根据场景选择
class ScenarioBasedSelection {
// 场景1:消费者能力相同 → 平均分配
// 场景2:跨机房部署 → 机房就近
// 场景3:Consumer频繁上下线 → 一致性Hash
// 场景4:Consumer能力不同 → 自定义策略
}
}
}
五、实战场景分析
5.1 订单处理系统
public class OrderProcessingSystem {
// 需求:订单需要被多个系统处理,但每个系统只处理一次
// 订单服务(2个实例,负载均衡)
class OrderService {
void startConsumers() {
// 使用相同 Group,实现负载均衡
Consumer instance1 = createConsumer("ORDER_SERVICE_GROUP");
Consumer instance2 = createConsumer("ORDER_SERVICE_GROUP");
// 结果:订单在两个实例间负载均衡
}
}
// 库存服务(3个实例,负载均衡)
class StockService {
void startConsumers() {
// 使用相同 Group,实现负载均衡
Consumer instance1 = createConsumer("STOCK_SERVICE_GROUP");
Consumer instance2 = createConsumer("STOCK_SERVICE_GROUP");
Consumer instance3 = createConsumer("STOCK_SERVICE_GROUP");
// 结果:订单在三个实例间负载均衡
}
}
// 积分服务(1个实例)
class PointService {
void startConsumer() {
Consumer instance = createConsumer("POINT_SERVICE_GROUP");
// 结果:所有订单都由这个实例处理
}
}
// 效果:
// 1. 每个订单被 ORDER_SERVICE_GROUP 处理一次
// 2. 每个订单被 STOCK_SERVICE_GROUP 处理一次
// 3. 每个订单被 POINT_SERVICE_GROUP 处理一次
}
5.2 配置更新系统
public class ConfigUpdateSystem {
// 需求:配置更新需要通知所有服务实例
class ConfigUpdateConsumer {
void setup() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONFIG_GROUP");
// 使用广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.subscribe("CONFIG_UPDATE_TOPIC", "*");
consumer.registerMessageListener((List<MessageExt> msgs, context) -> {
for (MessageExt msg : msgs) {
// 更新本地配置
updateLocalConfig(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
}
}
// 效果:
// 1. 所有服务实例都收到配置更新
// 2. 每个实例独立更新自己的配置
// 3. 某个实例重启不影响其他实例
}
5.3 日志收集系统
public class LogCollectionSystem {
// 需求:大量日志需要收集,要求高吞吐
class LogCollector {
void setup() {
// 创建多个消费者,使用同一个 Group
String GROUP = "LOG_COLLECTOR_GROUP";
// 部署10个消费者实例
for (int i = 0; i < 10; i++) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(30);
consumer.setPullBatchSize(64); // 批量拉取
consumer.subscribe("LOG_TOPIC", "*");
consumer.start();
}
// 效果:
// 1. 10个实例并行消费,提高吞吐量
// 2. 自动负载均衡
// 3. 某个实例故障,其他实例自动接管
}
}
}
六、模型对比与选择
6.1 不同MQ产品的模型对比
消息模型对比:
┌─────────────┬──────────────┬──────────────┬──────────────┐
│ 产品 │ 模型 │ 实现方式 │ 特点 │
├─────────────┼──────────────┼──────────────┼──────────────┤
│ RabbitMQ │ AMQP │ Exchange │ 灵活但复杂 │
│ ActiveMQ │ JMS │ Queue/Topic │ 标准但死板 │
│ Kafka │ 发布订阅 │ Partition │ 简单但单一 │
│ RocketMQ │ 统一模型 │ ConsumerGroup│ 灵活且简单 │
└─────────────┴──────────────┴──────────────┴──────────────┘
6.2 选择建议
public class ModelSelectionGuide {
// 使用集群消费(点对点)的场景
class UseClusterMode {
// ✅ 任务分发
// ✅ 负载均衡
// ✅ 高可用(故障转移)
// ✅ 需要消费确认和重试
void example() {
// 订单处理、支付处理、数据同步
}
}
// 使用广播消费的场景
class UseBroadcastMode {
// ✅ 本地缓存更新
// ✅ 配置刷新
// ✅ 全量数据同步
// ✅ 实时通知
void example() {
// 配置更新、缓存刷新、实时推送
}
}
// 混合使用的场景
class UseMixedMode {
// 不同业务使用不同 Group
// 同一业务的多实例使用相同 Group
void example() {
// 电商系统:订单服务组、库存服务组、积分服务组
// 每个组内负载均衡,组间相互独立
}
}
}
七、高级特性
7.1 消费进度重置
public class ConsumeProgressReset {
// 重置消费进度的场景
// 1. 需要重新消费历史消息
// 2. 跳过大量积压消息
// 3. 修复消费进度异常
public void resetProgress() {
// 方式1:通过管理工具
// sh mqadmin resetOffset -n localhost:9876 -g GROUP -t TOPIC -o 0
// 方式2:通过代码
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
// CONSUME_FROM_FIRST_OFFSET: 从最早消息开始
// CONSUME_FROM_LAST_OFFSET: 从最新消息开始
// CONSUME_FROM_TIMESTAMP: 从指定时间开始
}
}
7.2 消费并行度调优
public class ConsumerConcurrency {
public void tuneConcurrency() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 并发消费
consumer.setConsumeThreadMin(20); // 最小线程数
consumer.setConsumeThreadMax(64); // 最大线程数
// 顺序消费(单线程)
consumer.setConsumeThreadMax(1);
// 批量消费
consumer.setConsumeMessageBatchMaxSize(10); // 批量大小
consumer.setPullBatchSize(32); // 拉取批量
// 限流
consumer.setPullThresholdForQueue(1000); // 队列级别流控
consumer.setPullThresholdForTopic(5000); // Topic级别流控
}
}
八、常见问题
8.1 消息重复消费
// 问题:同一条消息被消费多次
// 原因:
// 1. 消费超时导致重试
// 2. Rebalance 导致重新分配
// 3. 消费者异常退出
// 解决方案:幂等处理
class IdempotentConsumer {
private Set<String> processedMsgIds = new ConcurrentHashSet<>();
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) {
for (MessageExt msg : msgs) {
String msgId = msg.getMsgId();
// 幂等检查
if (!processedMsgIds.add(msgId)) {
// 已处理过,跳过
continue;
}
// 处理消息
processMessage(msg);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
8.2 消费不均衡
// 问题:某些 Consumer 消费很多,某些很少
// 原因:
// 1. Queue 数量 < Consumer 数量
// 2. 消息分布不均
// 3. Consumer 处理能力不同
// 解决方案:
class BalanceSolution {
void solution() {
// 1. 调整 Queue 数量
// Queue数量 >= Consumer数量
// 2. 使用合适的分配策略
consumer.setAllocateMessageQueueStrategy(
new AllocateMessageQueueConsistentHash()
);
// 3. 监控并动态调整
// 监控每个 Consumer 的消费TPS
// 动态调整线程池大小
}
}
九、总结
通过本篇,我们深入理解了:
- 两种基本模型:点对点 vs 发布订阅的本质区别
- RocketMQ 的创新:通过 Consumer Group 统一两种模型
- 集群 vs 广播:两种消费模式的特点和适用场景
- 实战应用:如何根据业务选择合适的模型
RocketMQ 的消息模型设计充分体现了"简单即美"的设计哲学,用最简单的概念实现了最灵活的功能。
下一篇预告
理解了消息模型,下一篇我们将深入剖析消费模式:Push vs Pull 的实现原理,以及 RocketMQ 为什么选择"伪 Push"的设计。
思考与实践:
- 为什么 RocketMQ 不像 JMS 那样明确区分 Queue 和 Topic?
- 在你的业务场景中,哪些适合集群消费,哪些适合广播消费?
- 如何设计 Consumer Group 来满足复杂的业务需求?
动手实践,在理解中成长!