RocketMQ入门07:消息模型深入 - 点对点vs发布订阅

引言:一个模型,两种模式 传统消息系统通常要在两种模型中二选一: JMS:明确区分 Queue(点对点)和 Topic(发布订阅) AMQP:通过 Exchange 和 Binding 实现不同模式 Kafka:只有 Topic,通过 Consumer Group 实现不同语义 而 RocketMQ 的独特之处在于:用一套模型,同时支持两种模式。 让我们深入理解这是如何实现的。 一、两种基本消息模型 1.1 点对点模型(Point-to-Point) public class P2PModel { // 点对点模型特征 class Characteristics { // 1. 一条消息只能被一个消费者消费 // 2. 消息被消费后从队列中删除 // 3. 多个消费者竞争消息 // 4. 适合任务分发场景 } // 传统 JMS Queue 实现 class TraditionalQueue { Queue<Message> queue = new LinkedBlockingQueue<>(); // 生产者发送 public void send(Message msg) { queue.offer(msg); } // 消费者接收(竞争) public Message receive() { return queue.poll(); // 取出即删除 } } } 点对点模型示意图: ┌──────────┐ Producer ──> Queue ──> Consumer1 (获得消息) └──────────┘ ──> Consumer2 (没有获得) ──> Consumer3 (没有获得) 特点:消息被其中一个消费者消费后,其他消费者无法再消费 1.2 发布订阅模型(Publish-Subscribe) public class PubSubModel { // 发布订阅模型特征 class Characteristics { // 1. 一条消息可以被多个订阅者消费 // 2. 每个订阅者都能收到完整消息副本 // 3. 消息广播给所有订阅者 // 4. 适合事件通知场景 } // 传统 JMS Topic 实现 class TraditionalTopic { List<Subscriber> subscribers = new ArrayList<>(); // 发布消息 public void publish(Message msg) { // 广播给所有订阅者 for (Subscriber sub : subscribers) { sub.onMessage(msg.copy()); } } // 订阅主题 public void subscribe(Subscriber sub) { subscribers.add(sub); } } } 发布订阅模型示意图: ┌──> Consumer1 (收到消息) ┌────────┐ │ Producer ──> Topic ──┼──> Consumer2 (收到消息) └────────┘ │ └──> Consumer3 (收到消息) 特点:每个订阅者都能收到消息的完整副本 二、RocketMQ 的统一模型 2.1 核心设计:Consumer Group public class RocketMQModel { // RocketMQ 的秘密:通过 Consumer Group 实现两种模式 class ConsumerGroup { String groupName; List<Consumer> consumers; ConsumeMode mode; // 集群消费 or 广播消费 } // 关键规则: // 1. 同一 Consumer Group 内的消费者【分摊】消息(点对点) // 2. 不同 Consumer Group 都能收到【全量】消息(发布订阅) } 2.2 模式切换的魔法 public class ModelSwitching { // 场景1:实现点对点模式 public void p2pMode() { // 所有消费者使用【相同】的 Consumer Group String GROUP = "SAME_GROUP"; Consumer consumer1 = new DefaultMQPushConsumer(GROUP); Consumer consumer2 = new DefaultMQPushConsumer(GROUP); Consumer consumer3 = new DefaultMQPushConsumer(GROUP); // 结果:消息被分摊消费,每条消息只被其中一个消费 } // 场景2:实现发布订阅模式 public void pubSubMode() { // 每个消费者使用【不同】的 Consumer Group Consumer consumer1 = new DefaultMQPushConsumer("GROUP_A"); Consumer consumer2 = new DefaultMQPushConsumer("GROUP_B"); Consumer consumer3 = new DefaultMQPushConsumer("GROUP_C"); // 结果:每个 Group 都收到全量消息 } // 场景3:混合模式 public void mixedMode() { // 订单服务组(2个实例) Consumer order1 = new DefaultMQPushConsumer("ORDER_GROUP"); Consumer order2 = new DefaultMQPushConsumer("ORDER_GROUP"); // 库存服务组(3个实例) Consumer stock1 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock2 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock3 = new DefaultMQPushConsumer("STOCK_GROUP"); // 结果: // - ORDER_GROUP 内部:order1 和 order2 分摊消息 // - STOCK_GROUP 内部:stock1/2/3 分摊消息 // - 两个 GROUP 之间:都能收到全量消息 } } 三、集群消费 vs 广播消费 3.1 集群消费(默认) public class ClusterConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置集群消费模式(默认) consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TOPIC", "*"); } // 集群消费特点 class ClusteringCharacteristics { // 1. 同组内消费者分摊消息 // 2. 消费进度存储在 Broker // 3. 支持消费失败重试 // 4. 适合负载均衡场景 } // 消费进度管理 class ProgressManagement { // Broker 端统一管理消费进度 // 路径:{ROCKETMQ_HOME}/store/config/consumerOffset.json { "offsetTable": { "TOPIC@GROUP": { "0": 12345, // Queue0 消费到 12345 "1": 23456, // Queue1 消费到 23456 "2": 34567, // Queue2 消费到 34567 "3": 45678 // Queue3 消费到 45678 } } } } } 集群消费示意图: Queue0 ──> Consumer1 ─┐ Queue1 ──> Consumer1 ─┤ Topic ──> ├─> GROUP_A (共享进度) Queue2 ──> Consumer2 ─┤ Queue3 ──> Consumer2 ─┘ GROUP_A 的消费进度存储在 Broker,所有成员共享 3.2 广播消费 public class BroadcastConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置广播消费模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TOPIC", "*"); } // 广播消费特点 class BroadcastingCharacteristics { // 1. 每个消费者都消费全量消息 // 2. 消费进度存储在本地 // 3. 不支持消费失败重试 // 4. 适合本地缓存更新场景 } // 消费进度管理 class LocalProgress { // 本地文件存储消费进度 // 路径:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json { "offsetTable": { "TOPIC": { "0": 99999, // 本实例 Queue0 进度 "1": 88888, // 本实例 Queue1 进度 "2": 77777, // 本实例 Queue2 进度 "3": 66666 // 本实例 Queue3 进度 } } } } } 广播消费示意图: ┌─> Consumer1 (消费全部消息,独立进度) │ Topic ──> ────┼─> Consumer2 (消费全部消息,独立进度) │ └─> Consumer3 (消费全部消息,独立进度) 每个 Consumer 独立维护自己的消费进度 四、消息分配策略 4.1 集群模式下的分配策略 public class AllocationStrategy { // 1. 平均分配(默认) class AllocateMessageQueueAveragely { // 假设:4个Queue,2个Consumer // Consumer1: Queue0, Queue1 // Consumer2: Queue2, Queue3 // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue1, Queue2 // Consumer2: Queue3, Queue4 } // 2. 环形分配 class AllocateMessageQueueAveragelyByCircle { // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue2, Queue4 // Consumer2: Queue1, Queue3 } // 3. 机房就近分配 class AllocateMessageQueueByMachineRoom { // 根据 Consumer 和 Broker 的机房位置分配 // 优先分配同机房的 Queue } // 4. 一致性Hash分配 class AllocateMessageQueueConsistentHash { // 使用一致性Hash算法 // Consumer 上下线对其他 Consumer 影响最小 } // 5. 自定义分配 class CustomAllocateStrategy implements AllocateMessageQueueStrategy { @Override public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 自定义分配逻辑 // 比如:VIP Consumer 分配更多 Queue if (isVipConsumer(currentCID)) { return allocateMoreQueues(mqAll, currentCID); } return allocateNormalQueues(mqAll, currentCID); } } } 4.2 分配策略选择 public class StrategySelection { public void selectStrategy() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 设置分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueAveragely() ); // 根据场景选择 class ScenarioBasedSelection { // 场景1:消费者能力相同 → 平均分配 // 场景2:跨机房部署 → 机房就近 // 场景3:Consumer频繁上下线 → 一致性Hash // 场景4:Consumer能力不同 → 自定义策略 } } } 五、实战场景分析 5.1 订单处理系统 public class OrderProcessingSystem { // 需求:订单需要被多个系统处理,但每个系统只处理一次 // 订单服务(2个实例,负载均衡) class OrderService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("ORDER_SERVICE_GROUP"); Consumer instance2 = createConsumer("ORDER_SERVICE_GROUP"); // 结果:订单在两个实例间负载均衡 } } // 库存服务(3个实例,负载均衡) class StockService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance2 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance3 = createConsumer("STOCK_SERVICE_GROUP"); // 结果:订单在三个实例间负载均衡 } } // 积分服务(1个实例) class PointService { void startConsumer() { Consumer instance = createConsumer("POINT_SERVICE_GROUP"); // 结果:所有订单都由这个实例处理 } } // 效果: // 1. 每个订单被 ORDER_SERVICE_GROUP 处理一次 // 2. 每个订单被 STOCK_SERVICE_GROUP 处理一次 // 3. 每个订单被 POINT_SERVICE_GROUP 处理一次 } 5.2 配置更新系统 public class ConfigUpdateSystem { // 需求:配置更新需要通知所有服务实例 class ConfigUpdateConsumer { void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONFIG_GROUP"); // 使用广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("CONFIG_UPDATE_TOPIC", "*"); consumer.registerMessageListener((List<MessageExt> msgs, context) -> { for (MessageExt msg : msgs) { // 更新本地配置 updateLocalConfig(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); } } // 效果: // 1. 所有服务实例都收到配置更新 // 2. 每个实例独立更新自己的配置 // 3. 某个实例重启不影响其他实例 } 5.3 日志收集系统 public class LogCollectionSystem { // 需求:大量日志需要收集,要求高吞吐 class LogCollector { void setup() { // 创建多个消费者,使用同一个 Group String GROUP = "LOG_COLLECTOR_GROUP"; // 部署10个消费者实例 for (int i = 0; i < 10; i++) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP); consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(30); consumer.setPullBatchSize(64); // 批量拉取 consumer.subscribe("LOG_TOPIC", "*"); consumer.start(); } // 效果: // 1. 10个实例并行消费,提高吞吐量 // 2. 自动负载均衡 // 3. 某个实例故障,其他实例自动接管 } } } 六、模型对比与选择 6.1 不同MQ产品的模型对比 消息模型对比: ┌─────────────┬──────────────┬──────────────┬──────────────┐ │ 产品 │ 模型 │ 实现方式 │ 特点 │ ├─────────────┼──────────────┼──────────────┼──────────────┤ │ RabbitMQ │ AMQP │ Exchange │ 灵活但复杂 │ │ ActiveMQ │ JMS │ Queue/Topic │ 标准但死板 │ │ Kafka │ 发布订阅 │ Partition │ 简单但单一 │ │ RocketMQ │ 统一模型 │ ConsumerGroup│ 灵活且简单 │ └─────────────┴──────────────┴──────────────┴──────────────┘ 6.2 选择建议 public class ModelSelectionGuide { // 使用集群消费(点对点)的场景 class UseClusterMode { // ✅ 任务分发 // ✅ 负载均衡 // ✅ 高可用(故障转移) // ✅ 需要消费确认和重试 void example() { // 订单处理、支付处理、数据同步 } } // 使用广播消费的场景 class UseBroadcastMode { // ✅ 本地缓存更新 // ✅ 配置刷新 // ✅ 全量数据同步 // ✅ 实时通知 void example() { // 配置更新、缓存刷新、实时推送 } } // 混合使用的场景 class UseMixedMode { // 不同业务使用不同 Group // 同一业务的多实例使用相同 Group void example() { // 电商系统:订单服务组、库存服务组、积分服务组 // 每个组内负载均衡,组间相互独立 } } } 七、高级特性 7.1 消费进度重置 public class ConsumeProgressReset { // 重置消费进度的场景 // 1. 需要重新消费历史消息 // 2. 跳过大量积压消息 // 3. 修复消费进度异常 public void resetProgress() { // 方式1:通过管理工具 // sh mqadmin resetOffset -n localhost:9876 -g GROUP -t TOPIC -o 0 // 方式2:通过代码 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // CONSUME_FROM_FIRST_OFFSET: 从最早消息开始 // CONSUME_FROM_LAST_OFFSET: 从最新消息开始 // CONSUME_FROM_TIMESTAMP: 从指定时间开始 } } 7.2 消费并行度调优 public class ConsumerConcurrency { public void tuneConcurrency() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 并发消费 consumer.setConsumeThreadMin(20); // 最小线程数 consumer.setConsumeThreadMax(64); // 最大线程数 // 顺序消费(单线程) consumer.setConsumeThreadMax(1); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); // 批量大小 consumer.setPullBatchSize(32); // 拉取批量 // 限流 consumer.setPullThresholdForQueue(1000); // 队列级别流控 consumer.setPullThresholdForTopic(5000); // Topic级别流控 } } 八、常见问题 8.1 消息重复消费 // 问题:同一条消息被消费多次 // 原因: // 1. 消费超时导致重试 // 2. Rebalance 导致重新分配 // 3. 消费者异常退出 // 解决方案:幂等处理 class IdempotentConsumer { private Set<String> processedMsgIds = new ConcurrentHashSet<>(); public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) { for (MessageExt msg : msgs) { String msgId = msg.getMsgId(); // 幂等检查 if (!processedMsgIds.add(msgId)) { // 已处理过,跳过 continue; } // 处理消息 processMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } 8.2 消费不均衡 // 问题:某些 Consumer 消费很多,某些很少 // 原因: // 1. Queue 数量 < Consumer 数量 // 2. 消息分布不均 // 3. Consumer 处理能力不同 // 解决方案: class BalanceSolution { void solution() { // 1. 调整 Queue 数量 // Queue数量 >= Consumer数量 // 2. 使用合适的分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 3. 监控并动态调整 // 监控每个 Consumer 的消费TPS // 动态调整线程池大小 } } 九、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门06:核心概念详解(下)- Topic、Queue、Tag、Key

引言:消息的"地址系统" 如果把 RocketMQ 比作快递系统,那么: Topic 是省份(大的分类) Queue 是城市(负载均衡单元) Tag 是街道(消息过滤) Key 是门牌号(精确查找) 理解这套"地址系统",才能高效地组织和管理消息。 一、Topic(主题)- 消息的逻辑分类 1.1 Topic 是什么? // Topic 是消息的一级分类,是逻辑概念 public class TopicConcept { // Topic 的本质:消息类型的逻辑分组 String ORDER_TOPIC = "ORDER_TOPIC"; // 订单消息 String PAYMENT_TOPIC = "PAYMENT_TOPIC"; // 支付消息 String LOGISTICS_TOPIC = "LOGISTICS_TOPIC"; // 物流消息 // 一个 Topic 包含多个 Queue class TopicStructure { String topicName; int readQueueNums = 4; // 读队列数量 int writeQueueNums = 4; // 写队列数量 int perm = 6; // 权限:2-写 4-读 6-读写 // Topic 在多个 Broker 上的分布 Map<String/* brokerName */, List<MessageQueue>> brokerQueues; } } 1.2 Topic 的设计原则 public class TopicDesignPrinciples { // 原则1:按业务领域划分 class BusinessDomain { // ✅ 推荐:清晰的业务边界 String TRADE_TOPIC = "TRADE_TOPIC"; String USER_TOPIC = "USER_TOPIC"; String INVENTORY_TOPIC = "INVENTORY_TOPIC"; // ❌ 不推荐:过于宽泛 String ALL_MESSAGE_TOPIC = "ALL_MESSAGE_TOPIC"; } // 原则2:考虑消息特性 class MessageCharacteristics { // 按消息量级区分 String HIGH_TPS_TOPIC = "LOG_TOPIC"; // 高吞吐 String LOW_TPS_TOPIC = "ALERT_TOPIC"; // 低吞吐 // 按消息大小区分 String BIG_MSG_TOPIC = "FILE_TOPIC"; // 大消息 String SMALL_MSG_TOPIC = "EVENT_TOPIC"; // 小消息 // 按重要程度区分 String CRITICAL_TOPIC = "PAYMENT_TOPIC"; // 核心业务 String NORMAL_TOPIC = "STAT_TOPIC"; // 一般业务 } // 原则3:隔离不同优先级 class PriorityIsolation { // 不同优先级使用不同 Topic String ORDER_HIGH_PRIORITY = "ORDER_HIGH_TOPIC"; String ORDER_NORMAL_PRIORITY = "ORDER_NORMAL_TOPIC"; String ORDER_LOW_PRIORITY = "ORDER_LOW_TOPIC"; } } 1.3 Topic 创建和管理 # 自动创建(不推荐生产使用) autoCreateTopicEnable=true defaultTopicQueueNums=4 # 手动创建(推荐) sh mqadmin updateTopic \ -n localhost:9876 \ -t ORDER_TOPIC \ -c DefaultCluster \ -r 8 \ # 读队列数量 -w 8 \ # 写队列数量 -p 6 # 权限 # 查看 Topic 列表 sh mqadmin topicList -n localhost:9876 # 查看 Topic 详情 sh mqadmin topicStatus -n localhost:9876 -t ORDER_TOPIC 二、Queue(队列)- 负载均衡的基本单元 2.1 Queue 的本质 public class QueueConcept { // Queue 是 Topic 的物理分片 class MessageQueue { private String topic; // 所属 Topic private String brokerName; // 所在 Broker private int queueId; // 队列 ID @Override public String toString() { // ORDER_TOPIC@broker-a@0 return topic + "@" + brokerName + "@" + queueId; } } // Topic 与 Queue 的关系 class TopicQueueRelation { // 1个 Topic = N 个 Queue // 例如:ORDER_TOPIC 有 8 个 Queue // broker-a: queue0, queue1, queue2, queue3 // broker-b: queue0, queue1, queue2, queue3 Map<String, Integer> distribution = new HashMap<>(); { distribution.put("broker-a", 4); distribution.put("broker-b", 4); // Total: 8 Queues } } } 2.2 Queue 的负载均衡 public class QueueLoadBalance { // 生产者端:选择 Queue 发送 public MessageQueue selectQueue() { List<MessageQueue> queues = getQueues("ORDER_TOPIC"); // 策略1:轮询(默认) int index = sendWhichQueue.incrementAndGet() % queues.size(); return queues.get(index); // 策略2:根据业务 Key 路由 String orderId = "12345"; int index = orderId.hashCode() % queues.size(); return queues.get(index); // 策略3:最小延迟 return selectQueueByLatency(queues); } // 消费者端:分配 Queue 消费 public List<MessageQueue> allocateQueues() { // 假设:8个 Queue,3个 Consumer // Consumer0: queue0, queue1, queue2 // Consumer1: queue3, queue4, queue5 // Consumer2: queue6, queue7 return AllocateStrategy.allocate( allQueues, currentConsumerId, allConsumerIds ); } } 2.3 Queue 数量设计 public class QueueNumberDesign { // Queue 数量考虑因素 class Factors { // 1. 并发度 // Queue 数量 = 预期 TPS / 单 Queue TPS // 例如:10万 TPS / 1万 TPS = 10个 Queue // 2. Consumer 数量 // Queue 数量 >= Consumer 数量 // 否则有 Consumer 闲置 // 3. Broker 数量 // Queue 数量 = Broker 数量 × 单 Broker Queue 数 // 例如:2个 Broker × 4 = 8个 Queue } // 动态调整 Queue 数量 public void adjustQueueNumber() { // 增加 Queue(在线扩容) updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=16); // 减少 Queue(需谨慎) // 1. 先减少写队列 updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=8); // 2. 等待消费完成 waitForConsume(); // 3. 再减少读队列 updateTopic("ORDER_TOPIC", readQueues=8, writeQueues=8); } } 三、Tag(标签)- 消息的二级分类 3.1 Tag 的作用 public class TagUsage { // Tag 用于同一 Topic 下的消息细分 public void sendWithTag() { // 订单 Topic 下的不同消息类型 Message createOrder = new Message("ORDER_TOPIC", "CREATE", body); Message payOrder = new Message("ORDER_TOPIC", "PAY", body); Message cancelOrder = new Message("ORDER_TOPIC", "CANCEL", body); Message refundOrder = new Message("ORDER_TOPIC", "REFUND", body); } // Tag vs Topic 的选择 class TagVsTopic { // 使用 Tag 的场景: // 1. 消息类型相关性强 // 2. 需要统一管理 // 3. 消费者可能订阅多种类型 // ✅ 推荐:相关业务用 Tag 区分 void goodPractice() { // 订单的不同状态用 Tag new Message("ORDER_TOPIC", "CREATED", body); new Message("ORDER_TOPIC", "PAID", body); new Message("ORDER_TOPIC", "SHIPPED", body); } // ❌ 不推荐:无关业务用同一 Topic void badPractice() { // 订单和用户混在一起 new Message("BUSINESS_TOPIC", "ORDER_CREATE", body); new Message("BUSINESS_TOPIC", "USER_REGISTER", body); } } } 3.2 Tag 过滤机制 public class TagFiltering { // 消费者订阅 public void subscribeWithTag() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 订阅所有 Tag consumer.subscribe("ORDER_TOPIC", "*"); // 订阅单个 Tag consumer.subscribe("ORDER_TOPIC", "CREATE"); // 订阅多个 Tag(|| 分隔) consumer.subscribe("ORDER_TOPIC", "CREATE || PAY || CANCEL"); // SQL92 表达式(需要 Broker 开启) consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("amount > 1000 AND type = 'GOLD'")); } // Tag 过滤原理 class FilterPrinciple { // 1. Broker 端过滤(粗过滤) // 根据 Tag HashCode 快速过滤 // 在 ConsumeQueue 中存储了 Tag HashCode // 2. Consumer 端过滤(精过滤) // 再次验证 Tag 字符串 // 防止 Hash 冲突导致的误判 public boolean filterMessage(MessageExt msg, String subscribeTags) { // Broker 端:HashCode 匹配 long tagCode = msg.getTagsCode(); if (!isMatched(tagCode, subscribeTagCodes)) { return false; } // Consumer 端:字符串匹配 String msgTag = msg.getTags(); return isTagMatched(msgTag, subscribeTags); } } } 3.3 Tag 最佳实践 public class TagBestPractices { // 1. Tag 命名规范 class TagNaming { // 使用大写字母和下划线 String GOOD_TAG = "ORDER_CREATED"; // 避免特殊字符 String BAD_TAG = "order-created@2023"; // 避免 // 保持简短有意义 String CONCISE = "PAY_SUCCESS"; String VERBOSE = "ORDER_PAYMENT_SUCCESS_WITH_DISCOUNT"; // 太长 } // 2. Tag 使用策略 class TagStrategy { // 按业务流程设计 String[] ORDER_FLOW = { "CREATED", // 订单创建 "PAID", // 支付完成 "SHIPPED", // 已发货 "RECEIVED", // 已收货 "COMPLETED" // 已完成 }; // 按事件类型设计 String[] EVENT_TYPE = { "INSERT", // 新增 "UPDATE", // 更新 "DELETE" // 删除 }; } } 四、Key - 消息的业务标识 4.1 Key 的用途 public class KeyUsage { // Key 用于消息的业务标识和查询 public void sendWithKey() { Message msg = new Message("ORDER_TOPIC", "CREATE", body); // 设置业务唯一标识 msg.setKeys("ORDER_12345"); // 设置多个 Key(空格分隔) msg.setKeys("ORDER_12345 USER_67890"); producer.send(msg); } // Key 的主要用途 class KeyPurpose { // 1. 消息查询 void queryByKey() { // 通过 Key 查询消息 QueryResult result = mqadmin.queryMsgByKey("ORDER_TOPIC", "ORDER_12345"); } // 2. 去重判断 void deduplication(String key) { if (processedKeys.contains(key)) { // 已处理,跳过 return; } processMessage(); processedKeys.add(key); } // 3. 业务追踪 void traceOrder(String orderId) { // 追踪订单的所有消息 List<Message> orderMessages = queryAllMessagesByKey(orderId); } } } 4.2 Key 的索引机制 public class KeyIndexing { // IndexFile 结构 class IndexFile { // 通过 Key 建立索引 // 存储格式:Key Hash -> Physical Offset private IndexHeader header; // 文件头 private SlotTable slotTable; // Hash 槽 private IndexLinkedList indexes; // 索引链表 public void putKey(String key, long offset) { int keyHash = hash(key); int slotPos = keyHash % 500_0000; // 500万个槽 // 存储索引 IndexItem item = new IndexItem(); item.setKeyHash(keyHash); item.setPhyOffset(offset); item.setTimeDiff(storeTimestamp - beginTimestamp); // 处理 Hash 冲突(链表) item.setSlotValue(slotTable.get(slotPos)); slotTable.put(slotPos, indexes.add(item)); } } } 4.3 Key 设计建议 public class KeyDesignSuggestions { // 1. Key 的选择 class KeySelection { // ✅ 好的 Key:业务唯一标识 String orderId = "ORDER_2023110901234"; String userId = "USER_123456"; String transactionId = "TXN_9876543210"; // ❌ 不好的 Key:无业务含义 String uuid = UUID.randomUUID().toString(); String timestamp = String.valueOf(System.currentTimeMillis()); } // 2. 复合 Key class CompositeKey { public String generateKey(Order order) { // 组合多个维度 return String.format("%s_%s_%s", order.getOrderId(), order.getUserId(), order.getCreateTime() ); } } // 3. Key 规范 class KeySpecification { // 长度控制 static final int MAX_KEY_LENGTH = 128; // 字符限制 static final String KEY_PATTERN = "^[a-zA-Z0-9_-]+$"; // 避免特殊字符 public String sanitizeKey(String key) { return key.replaceAll("[^a-zA-Z0-9_-]", "_"); } } } 五、消息属性(Properties) 5.1 系统属性 vs 用户属性 public class MessageProperties { // 系统属性(RocketMQ 内部使用) class SystemProperties { String UNIQ_KEY = "__UNIQ_KEY__"; // 消息唯一标识 String MAX_OFFSET = "__MAX_OFFSET__"; // 最大偏移量 String MIN_OFFSET = "__MIN_OFFSET__"; // 最小偏移量 String TRANSACTION_ID = "__TRANSACTION_ID__"; // 事务ID String DELAY_LEVEL = "__DELAY_LEVEL__"; // 延迟级别 } // 用户自定义属性 public void setUserProperties() { Message msg = new Message("TOPIC", "TAG", body); // 添加业务属性 msg.putUserProperty("orderType", "ONLINE"); msg.putUserProperty("paymentMethod", "CREDIT_CARD"); msg.putUserProperty("region", "EAST"); msg.putUserProperty("vipLevel", "GOLD"); // SQL92 过滤时使用 consumer.subscribe("TOPIC", MessageSelector.bySql("vipLevel = 'GOLD' AND region = 'EAST'")); } } 5.2 属性的应用场景 public class PropertiesUseCases { // 1. 消息路由 public void routeByProperty(Message msg) { String region = msg.getUserProperty("region"); // 根据地区路由到不同集群 if ("NORTH".equals(region)) { sendToNorthCluster(msg); } else { sendToSouthCluster(msg); } } // 2. 监控统计 public void monitoring(Message msg) { // 统计不同类型订单 String orderType = msg.getUserProperty("orderType"); metrics.increment("order.count", "type", orderType); } // 3. 灰度发布 public void grayRelease(Message msg) { msg.putUserProperty("version", "2.0"); msg.putUserProperty("gray", "true"); msg.putUserProperty("percentage", "10"); // 10% 流量 } } 六、组合使用示例 6.1 电商订单场景 public class ECommerceExample { public void processOrder(Order order) { // Topic:按业务领域 String topic = "ORDER_TOPIC"; // Tag:按订单状态 String tag = getOrderTag(order.getStatus()); // Key:订单号(用于查询) String key = order.getOrderId(); // 构造消息 Message msg = new Message(topic, tag, key, JSON.toJSONBytes(order)); // 添加属性(用于过滤和统计) msg.putUserProperty("userId", order.getUserId()); msg.putUserProperty("amount", String.valueOf(order.getAmount())); msg.putUserProperty("payMethod", order.getPayMethod()); msg.putUserProperty("orderType", order.getType()); // 发送消息 SendResult result = producer.send(msg); } private String getOrderTag(OrderStatus status) { switch (status) { case CREATED: return "ORDER_CREATED"; case PAID: return "ORDER_PAID"; case SHIPPED: return "ORDER_SHIPPED"; case COMPLETED: return "ORDER_COMPLETED"; case CANCELLED: return "ORDER_CANCELLED"; default: return "ORDER_UNKNOWN"; } } } 6.2 消费者订阅策略 public class ConsumerStrategy { // 场景1:订阅所有订单消息 class AllOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "*"); } } // 场景2:只处理支付相关 class PaymentConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "ORDER_PAID || ORDER_REFUND"); } } // 场景3:VIP 订单处理 class VipOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("orderType = 'VIP' AND amount > 1000")); } } // 场景4:特定用户订单 class UserOrderConsumer { public void consumeMessage(MessageExt msg) { String userId = msg.getUserProperty("userId"); if ("VIP_USER_123".equals(userId)) { // 特殊处理 VIP 用户订单 handleVipOrder(msg); } else { // 普通处理 handleNormalOrder(msg); } } } } 七、设计原则总结 7.1 层级设计原则 消息层级设计: ┌─────────────────────────────────┐ │ Topic │ <- 业务大类 ├─────────────────────────────────┤ │ Queue1 Queue2 Queue3 │ <- 负载均衡 ├─────────────────────────────────┤ │ Tag1 Tag2 Tag3 │ <- 消息子类 ├─────────────────────────────────┤ │ Key (业务ID) │ <- 消息标识 ├─────────────────────────────────┤ │ Properties (属性) │ <- 扩展信息 └─────────────────────────────────┘ 7.2 最佳实践建议 public class BestPractices { // 1. Topic 设计:粗粒度 // 一个业务领域一个 Topic // 避免创建过多 Topic // 2. Queue 设计:适度 // Queue 数量 = max(预期并发数, Consumer数量) // 一般 8-16 个即可 // 3. Tag 设计:细粒度 // 同一 Topic 下的不同消息类型 // 便于消费者灵活订阅 // 4. Key 设计:唯一性 // 使用业务唯一标识 // 便于消息追踪和查询 // 5. Properties:扩展性 // 存储额外的业务信息 // 支持 SQL92 复杂过滤 } 八、常见问题 8.1 Topic 过多的问题 // 问题:Topic 过多导致的影响 // 1. 路由信息膨胀 // 2. 内存占用增加 // 3. 管理复杂度上升 // 解决方案:合并相关 Topic,用 Tag 区分 // Before: USER_REGISTER_TOPIC, USER_UPDATE_TOPIC, USER_DELETE_TOPIC // After: USER_TOPIC + Tags(REGISTER, UPDATE, DELETE) 8.2 Queue 数量不当 // 问题:Queue 太少 // - Consumer 空闲 // - 并发度不够 // 问题:Queue 太多 // - 管理开销大 // - Rebalance 频繁 // 解决:动态调整 // 监控 Queue 积压和 Consumer 负载 // 根据实际情况调整 Queue 数量 九、总结 理解了 Topic、Queue、Tag、Key 这些核心概念,我们就掌握了 RocketMQ 的消息组织方式: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...