RocketMQ入门06:核心概念详解(下)- Topic、Queue、Tag、Key
引言:消息的"地址系统" 如果把 RocketMQ 比作快递系统,那么: Topic 是省份(大的分类) Queue 是城市(负载均衡单元) Tag 是街道(消息过滤) Key 是门牌号(精确查找) 理解这套"地址系统",才能高效地组织和管理消息。 一、Topic(主题)- 消息的逻辑分类 1.1 Topic 是什么? // Topic 是消息的一级分类,是逻辑概念 public class TopicConcept { // Topic 的本质:消息类型的逻辑分组 String ORDER_TOPIC = "ORDER_TOPIC"; // 订单消息 String PAYMENT_TOPIC = "PAYMENT_TOPIC"; // 支付消息 String LOGISTICS_TOPIC = "LOGISTICS_TOPIC"; // 物流消息 // 一个 Topic 包含多个 Queue class TopicStructure { String topicName; int readQueueNums = 4; // 读队列数量 int writeQueueNums = 4; // 写队列数量 int perm = 6; // 权限:2-写 4-读 6-读写 // Topic 在多个 Broker 上的分布 Map<String/* brokerName */, List<MessageQueue>> brokerQueues; } } 1.2 Topic 的设计原则 public class TopicDesignPrinciples { // 原则1:按业务领域划分 class BusinessDomain { // ✅ 推荐:清晰的业务边界 String TRADE_TOPIC = "TRADE_TOPIC"; String USER_TOPIC = "USER_TOPIC"; String INVENTORY_TOPIC = "INVENTORY_TOPIC"; // ❌ 不推荐:过于宽泛 String ALL_MESSAGE_TOPIC = "ALL_MESSAGE_TOPIC"; } // 原则2:考虑消息特性 class MessageCharacteristics { // 按消息量级区分 String HIGH_TPS_TOPIC = "LOG_TOPIC"; // 高吞吐 String LOW_TPS_TOPIC = "ALERT_TOPIC"; // 低吞吐 // 按消息大小区分 String BIG_MSG_TOPIC = "FILE_TOPIC"; // 大消息 String SMALL_MSG_TOPIC = "EVENT_TOPIC"; // 小消息 // 按重要程度区分 String CRITICAL_TOPIC = "PAYMENT_TOPIC"; // 核心业务 String NORMAL_TOPIC = "STAT_TOPIC"; // 一般业务 } // 原则3:隔离不同优先级 class PriorityIsolation { // 不同优先级使用不同 Topic String ORDER_HIGH_PRIORITY = "ORDER_HIGH_TOPIC"; String ORDER_NORMAL_PRIORITY = "ORDER_NORMAL_TOPIC"; String ORDER_LOW_PRIORITY = "ORDER_LOW_TOPIC"; } } 1.3 Topic 创建和管理 # 自动创建(不推荐生产使用) autoCreateTopicEnable=true defaultTopicQueueNums=4 # 手动创建(推荐) sh mqadmin updateTopic \ -n localhost:9876 \ -t ORDER_TOPIC \ -c DefaultCluster \ -r 8 \ # 读队列数量 -w 8 \ # 写队列数量 -p 6 # 权限 # 查看 Topic 列表 sh mqadmin topicList -n localhost:9876 # 查看 Topic 详情 sh mqadmin topicStatus -n localhost:9876 -t ORDER_TOPIC 二、Queue(队列)- 负载均衡的基本单元 2.1 Queue 的本质 public class QueueConcept { // Queue 是 Topic 的物理分片 class MessageQueue { private String topic; // 所属 Topic private String brokerName; // 所在 Broker private int queueId; // 队列 ID @Override public String toString() { // ORDER_TOPIC@broker-a@0 return topic + "@" + brokerName + "@" + queueId; } } // Topic 与 Queue 的关系 class TopicQueueRelation { // 1个 Topic = N 个 Queue // 例如:ORDER_TOPIC 有 8 个 Queue // broker-a: queue0, queue1, queue2, queue3 // broker-b: queue0, queue1, queue2, queue3 Map<String, Integer> distribution = new HashMap<>(); { distribution.put("broker-a", 4); distribution.put("broker-b", 4); // Total: 8 Queues } } } 2.2 Queue 的负载均衡 public class QueueLoadBalance { // 生产者端:选择 Queue 发送 public MessageQueue selectQueue() { List<MessageQueue> queues = getQueues("ORDER_TOPIC"); // 策略1:轮询(默认) int index = sendWhichQueue.incrementAndGet() % queues.size(); return queues.get(index); // 策略2:根据业务 Key 路由 String orderId = "12345"; int index = orderId.hashCode() % queues.size(); return queues.get(index); // 策略3:最小延迟 return selectQueueByLatency(queues); } // 消费者端:分配 Queue 消费 public List<MessageQueue> allocateQueues() { // 假设:8个 Queue,3个 Consumer // Consumer0: queue0, queue1, queue2 // Consumer1: queue3, queue4, queue5 // Consumer2: queue6, queue7 return AllocateStrategy.allocate( allQueues, currentConsumerId, allConsumerIds ); } } 2.3 Queue 数量设计 public class QueueNumberDesign { // Queue 数量考虑因素 class Factors { // 1. 并发度 // Queue 数量 = 预期 TPS / 单 Queue TPS // 例如:10万 TPS / 1万 TPS = 10个 Queue // 2. Consumer 数量 // Queue 数量 >= Consumer 数量 // 否则有 Consumer 闲置 // 3. Broker 数量 // Queue 数量 = Broker 数量 × 单 Broker Queue 数 // 例如:2个 Broker × 4 = 8个 Queue } // 动态调整 Queue 数量 public void adjustQueueNumber() { // 增加 Queue(在线扩容) updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=16); // 减少 Queue(需谨慎) // 1. 先减少写队列 updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=8); // 2. 等待消费完成 waitForConsume(); // 3. 再减少读队列 updateTopic("ORDER_TOPIC", readQueues=8, writeQueues=8); } } 三、Tag(标签)- 消息的二级分类 3.1 Tag 的作用 public class TagUsage { // Tag 用于同一 Topic 下的消息细分 public void sendWithTag() { // 订单 Topic 下的不同消息类型 Message createOrder = new Message("ORDER_TOPIC", "CREATE", body); Message payOrder = new Message("ORDER_TOPIC", "PAY", body); Message cancelOrder = new Message("ORDER_TOPIC", "CANCEL", body); Message refundOrder = new Message("ORDER_TOPIC", "REFUND", body); } // Tag vs Topic 的选择 class TagVsTopic { // 使用 Tag 的场景: // 1. 消息类型相关性强 // 2. 需要统一管理 // 3. 消费者可能订阅多种类型 // ✅ 推荐:相关业务用 Tag 区分 void goodPractice() { // 订单的不同状态用 Tag new Message("ORDER_TOPIC", "CREATED", body); new Message("ORDER_TOPIC", "PAID", body); new Message("ORDER_TOPIC", "SHIPPED", body); } // ❌ 不推荐:无关业务用同一 Topic void badPractice() { // 订单和用户混在一起 new Message("BUSINESS_TOPIC", "ORDER_CREATE", body); new Message("BUSINESS_TOPIC", "USER_REGISTER", body); } } } 3.2 Tag 过滤机制 public class TagFiltering { // 消费者订阅 public void subscribeWithTag() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 订阅所有 Tag consumer.subscribe("ORDER_TOPIC", "*"); // 订阅单个 Tag consumer.subscribe("ORDER_TOPIC", "CREATE"); // 订阅多个 Tag(|| 分隔) consumer.subscribe("ORDER_TOPIC", "CREATE || PAY || CANCEL"); // SQL92 表达式(需要 Broker 开启) consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("amount > 1000 AND type = 'GOLD'")); } // Tag 过滤原理 class FilterPrinciple { // 1. Broker 端过滤(粗过滤) // 根据 Tag HashCode 快速过滤 // 在 ConsumeQueue 中存储了 Tag HashCode // 2. Consumer 端过滤(精过滤) // 再次验证 Tag 字符串 // 防止 Hash 冲突导致的误判 public boolean filterMessage(MessageExt msg, String subscribeTags) { // Broker 端:HashCode 匹配 long tagCode = msg.getTagsCode(); if (!isMatched(tagCode, subscribeTagCodes)) { return false; } // Consumer 端:字符串匹配 String msgTag = msg.getTags(); return isTagMatched(msgTag, subscribeTags); } } } 3.3 Tag 最佳实践 public class TagBestPractices { // 1. Tag 命名规范 class TagNaming { // 使用大写字母和下划线 String GOOD_TAG = "ORDER_CREATED"; // 避免特殊字符 String BAD_TAG = "order-created@2023"; // 避免 // 保持简短有意义 String CONCISE = "PAY_SUCCESS"; String VERBOSE = "ORDER_PAYMENT_SUCCESS_WITH_DISCOUNT"; // 太长 } // 2. Tag 使用策略 class TagStrategy { // 按业务流程设计 String[] ORDER_FLOW = { "CREATED", // 订单创建 "PAID", // 支付完成 "SHIPPED", // 已发货 "RECEIVED", // 已收货 "COMPLETED" // 已完成 }; // 按事件类型设计 String[] EVENT_TYPE = { "INSERT", // 新增 "UPDATE", // 更新 "DELETE" // 删除 }; } } 四、Key - 消息的业务标识 4.1 Key 的用途 public class KeyUsage { // Key 用于消息的业务标识和查询 public void sendWithKey() { Message msg = new Message("ORDER_TOPIC", "CREATE", body); // 设置业务唯一标识 msg.setKeys("ORDER_12345"); // 设置多个 Key(空格分隔) msg.setKeys("ORDER_12345 USER_67890"); producer.send(msg); } // Key 的主要用途 class KeyPurpose { // 1. 消息查询 void queryByKey() { // 通过 Key 查询消息 QueryResult result = mqadmin.queryMsgByKey("ORDER_TOPIC", "ORDER_12345"); } // 2. 去重判断 void deduplication(String key) { if (processedKeys.contains(key)) { // 已处理,跳过 return; } processMessage(); processedKeys.add(key); } // 3. 业务追踪 void traceOrder(String orderId) { // 追踪订单的所有消息 List<Message> orderMessages = queryAllMessagesByKey(orderId); } } } 4.2 Key 的索引机制 public class KeyIndexing { // IndexFile 结构 class IndexFile { // 通过 Key 建立索引 // 存储格式:Key Hash -> Physical Offset private IndexHeader header; // 文件头 private SlotTable slotTable; // Hash 槽 private IndexLinkedList indexes; // 索引链表 public void putKey(String key, long offset) { int keyHash = hash(key); int slotPos = keyHash % 500_0000; // 500万个槽 // 存储索引 IndexItem item = new IndexItem(); item.setKeyHash(keyHash); item.setPhyOffset(offset); item.setTimeDiff(storeTimestamp - beginTimestamp); // 处理 Hash 冲突(链表) item.setSlotValue(slotTable.get(slotPos)); slotTable.put(slotPos, indexes.add(item)); } } } 4.3 Key 设计建议 public class KeyDesignSuggestions { // 1. Key 的选择 class KeySelection { // ✅ 好的 Key:业务唯一标识 String orderId = "ORDER_2023110901234"; String userId = "USER_123456"; String transactionId = "TXN_9876543210"; // ❌ 不好的 Key:无业务含义 String uuid = UUID.randomUUID().toString(); String timestamp = String.valueOf(System.currentTimeMillis()); } // 2. 复合 Key class CompositeKey { public String generateKey(Order order) { // 组合多个维度 return String.format("%s_%s_%s", order.getOrderId(), order.getUserId(), order.getCreateTime() ); } } // 3. Key 规范 class KeySpecification { // 长度控制 static final int MAX_KEY_LENGTH = 128; // 字符限制 static final String KEY_PATTERN = "^[a-zA-Z0-9_-]+$"; // 避免特殊字符 public String sanitizeKey(String key) { return key.replaceAll("[^a-zA-Z0-9_-]", "_"); } } } 五、消息属性(Properties) 5.1 系统属性 vs 用户属性 public class MessageProperties { // 系统属性(RocketMQ 内部使用) class SystemProperties { String UNIQ_KEY = "__UNIQ_KEY__"; // 消息唯一标识 String MAX_OFFSET = "__MAX_OFFSET__"; // 最大偏移量 String MIN_OFFSET = "__MIN_OFFSET__"; // 最小偏移量 String TRANSACTION_ID = "__TRANSACTION_ID__"; // 事务ID String DELAY_LEVEL = "__DELAY_LEVEL__"; // 延迟级别 } // 用户自定义属性 public void setUserProperties() { Message msg = new Message("TOPIC", "TAG", body); // 添加业务属性 msg.putUserProperty("orderType", "ONLINE"); msg.putUserProperty("paymentMethod", "CREDIT_CARD"); msg.putUserProperty("region", "EAST"); msg.putUserProperty("vipLevel", "GOLD"); // SQL92 过滤时使用 consumer.subscribe("TOPIC", MessageSelector.bySql("vipLevel = 'GOLD' AND region = 'EAST'")); } } 5.2 属性的应用场景 public class PropertiesUseCases { // 1. 消息路由 public void routeByProperty(Message msg) { String region = msg.getUserProperty("region"); // 根据地区路由到不同集群 if ("NORTH".equals(region)) { sendToNorthCluster(msg); } else { sendToSouthCluster(msg); } } // 2. 监控统计 public void monitoring(Message msg) { // 统计不同类型订单 String orderType = msg.getUserProperty("orderType"); metrics.increment("order.count", "type", orderType); } // 3. 灰度发布 public void grayRelease(Message msg) { msg.putUserProperty("version", "2.0"); msg.putUserProperty("gray", "true"); msg.putUserProperty("percentage", "10"); // 10% 流量 } } 六、组合使用示例 6.1 电商订单场景 public class ECommerceExample { public void processOrder(Order order) { // Topic:按业务领域 String topic = "ORDER_TOPIC"; // Tag:按订单状态 String tag = getOrderTag(order.getStatus()); // Key:订单号(用于查询) String key = order.getOrderId(); // 构造消息 Message msg = new Message(topic, tag, key, JSON.toJSONBytes(order)); // 添加属性(用于过滤和统计) msg.putUserProperty("userId", order.getUserId()); msg.putUserProperty("amount", String.valueOf(order.getAmount())); msg.putUserProperty("payMethod", order.getPayMethod()); msg.putUserProperty("orderType", order.getType()); // 发送消息 SendResult result = producer.send(msg); } private String getOrderTag(OrderStatus status) { switch (status) { case CREATED: return "ORDER_CREATED"; case PAID: return "ORDER_PAID"; case SHIPPED: return "ORDER_SHIPPED"; case COMPLETED: return "ORDER_COMPLETED"; case CANCELLED: return "ORDER_CANCELLED"; default: return "ORDER_UNKNOWN"; } } } 6.2 消费者订阅策略 public class ConsumerStrategy { // 场景1:订阅所有订单消息 class AllOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "*"); } } // 场景2:只处理支付相关 class PaymentConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "ORDER_PAID || ORDER_REFUND"); } } // 场景3:VIP 订单处理 class VipOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("orderType = 'VIP' AND amount > 1000")); } } // 场景4:特定用户订单 class UserOrderConsumer { public void consumeMessage(MessageExt msg) { String userId = msg.getUserProperty("userId"); if ("VIP_USER_123".equals(userId)) { // 特殊处理 VIP 用户订单 handleVipOrder(msg); } else { // 普通处理 handleNormalOrder(msg); } } } } 七、设计原则总结 7.1 层级设计原则 消息层级设计: ┌─────────────────────────────────┐ │ Topic │ <- 业务大类 ├─────────────────────────────────┤ │ Queue1 Queue2 Queue3 │ <- 负载均衡 ├─────────────────────────────────┤ │ Tag1 Tag2 Tag3 │ <- 消息子类 ├─────────────────────────────────┤ │ Key (业务ID) │ <- 消息标识 ├─────────────────────────────────┤ │ Properties (属性) │ <- 扩展信息 └─────────────────────────────────┘ 7.2 最佳实践建议 public class BestPractices { // 1. Topic 设计:粗粒度 // 一个业务领域一个 Topic // 避免创建过多 Topic // 2. Queue 设计:适度 // Queue 数量 = max(预期并发数, Consumer数量) // 一般 8-16 个即可 // 3. Tag 设计:细粒度 // 同一 Topic 下的不同消息类型 // 便于消费者灵活订阅 // 4. Key 设计:唯一性 // 使用业务唯一标识 // 便于消息追踪和查询 // 5. Properties:扩展性 // 存储额外的业务信息 // 支持 SQL92 复杂过滤 } 八、常见问题 8.1 Topic 过多的问题 // 问题:Topic 过多导致的影响 // 1. 路由信息膨胀 // 2. 内存占用增加 // 3. 管理复杂度上升 // 解决方案:合并相关 Topic,用 Tag 区分 // Before: USER_REGISTER_TOPIC, USER_UPDATE_TOPIC, USER_DELETE_TOPIC // After: USER_TOPIC + Tags(REGISTER, UPDATE, DELETE) 8.2 Queue 数量不当 // 问题:Queue 太少 // - Consumer 空闲 // - 并发度不够 // 问题:Queue 太多 // - 管理开销大 // - Rebalance 频繁 // 解决:动态调整 // 监控 Queue 积压和 Consumer 负载 // 根据实际情况调整 Queue 数量 九、总结 理解了 Topic、Queue、Tag、Key 这些核心概念,我们就掌握了 RocketMQ 的消息组织方式: ...