引言:消息的"地址系统"
如果把 RocketMQ 比作快递系统,那么:
- Topic 是省份(大的分类)
- Queue 是城市(负载均衡单元)
- Tag 是街道(消息过滤)
- Key 是门牌号(精确查找)
理解这套"地址系统",才能高效地组织和管理消息。
一、Topic(主题)- 消息的逻辑分类
1.1 Topic 是什么?
// Topic 是消息的一级分类,是逻辑概念
public class TopicConcept {
// Topic 的本质:消息类型的逻辑分组
String ORDER_TOPIC = "ORDER_TOPIC"; // 订单消息
String PAYMENT_TOPIC = "PAYMENT_TOPIC"; // 支付消息
String LOGISTICS_TOPIC = "LOGISTICS_TOPIC"; // 物流消息
// 一个 Topic 包含多个 Queue
class TopicStructure {
String topicName;
int readQueueNums = 4; // 读队列数量
int writeQueueNums = 4; // 写队列数量
int perm = 6; // 权限:2-写 4-读 6-读写
// Topic 在多个 Broker 上的分布
Map<String/* brokerName */, List<MessageQueue>> brokerQueues;
}
}
1.2 Topic 的设计原则
public class TopicDesignPrinciples {
// 原则1:按业务领域划分
class BusinessDomain {
// ✅ 推荐:清晰的业务边界
String TRADE_TOPIC = "TRADE_TOPIC";
String USER_TOPIC = "USER_TOPIC";
String INVENTORY_TOPIC = "INVENTORY_TOPIC";
// ❌ 不推荐:过于宽泛
String ALL_MESSAGE_TOPIC = "ALL_MESSAGE_TOPIC";
}
// 原则2:考虑消息特性
class MessageCharacteristics {
// 按消息量级区分
String HIGH_TPS_TOPIC = "LOG_TOPIC"; // 高吞吐
String LOW_TPS_TOPIC = "ALERT_TOPIC"; // 低吞吐
// 按消息大小区分
String BIG_MSG_TOPIC = "FILE_TOPIC"; // 大消息
String SMALL_MSG_TOPIC = "EVENT_TOPIC"; // 小消息
// 按重要程度区分
String CRITICAL_TOPIC = "PAYMENT_TOPIC"; // 核心业务
String NORMAL_TOPIC = "STAT_TOPIC"; // 一般业务
}
// 原则3:隔离不同优先级
class PriorityIsolation {
// 不同优先级使用不同 Topic
String ORDER_HIGH_PRIORITY = "ORDER_HIGH_TOPIC";
String ORDER_NORMAL_PRIORITY = "ORDER_NORMAL_TOPIC";
String ORDER_LOW_PRIORITY = "ORDER_LOW_TOPIC";
}
}
1.3 Topic 创建和管理
# 自动创建(不推荐生产使用)
autoCreateTopicEnable=true
defaultTopicQueueNums=4
# 手动创建(推荐)
sh mqadmin updateTopic \
-n localhost:9876 \
-t ORDER_TOPIC \
-c DefaultCluster \
-r 8 \ # 读队列数量
-w 8 \ # 写队列数量
-p 6 # 权限
# 查看 Topic 列表
sh mqadmin topicList -n localhost:9876
# 查看 Topic 详情
sh mqadmin topicStatus -n localhost:9876 -t ORDER_TOPIC
二、Queue(队列)- 负载均衡的基本单元
2.1 Queue 的本质
public class QueueConcept {
// Queue 是 Topic 的物理分片
class MessageQueue {
private String topic; // 所属 Topic
private String brokerName; // 所在 Broker
private int queueId; // 队列 ID
@Override
public String toString() {
// ORDER_TOPIC@broker-a@0
return topic + "@" + brokerName + "@" + queueId;
}
}
// Topic 与 Queue 的关系
class TopicQueueRelation {
// 1个 Topic = N 个 Queue
// 例如:ORDER_TOPIC 有 8 个 Queue
// broker-a: queue0, queue1, queue2, queue3
// broker-b: queue0, queue1, queue2, queue3
Map<String, Integer> distribution = new HashMap<>();
{
distribution.put("broker-a", 4);
distribution.put("broker-b", 4);
// Total: 8 Queues
}
}
}
2.2 Queue 的负载均衡
public class QueueLoadBalance {
// 生产者端:选择 Queue 发送
public MessageQueue selectQueue() {
List<MessageQueue> queues = getQueues("ORDER_TOPIC");
// 策略1:轮询(默认)
int index = sendWhichQueue.incrementAndGet() % queues.size();
return queues.get(index);
// 策略2:根据业务 Key 路由
String orderId = "12345";
int index = orderId.hashCode() % queues.size();
return queues.get(index);
// 策略3:最小延迟
return selectQueueByLatency(queues);
}
// 消费者端:分配 Queue 消费
public List<MessageQueue> allocateQueues() {
// 假设:8个 Queue,3个 Consumer
// Consumer0: queue0, queue1, queue2
// Consumer1: queue3, queue4, queue5
// Consumer2: queue6, queue7
return AllocateStrategy.allocate(
allQueues,
currentConsumerId,
allConsumerIds
);
}
}
2.3 Queue 数量设计
public class QueueNumberDesign {
// Queue 数量考虑因素
class Factors {
// 1. 并发度
// Queue 数量 = 预期 TPS / 单 Queue TPS
// 例如:10万 TPS / 1万 TPS = 10个 Queue
// 2. Consumer 数量
// Queue 数量 >= Consumer 数量
// 否则有 Consumer 闲置
// 3. Broker 数量
// Queue 数量 = Broker 数量 × 单 Broker Queue 数
// 例如:2个 Broker × 4 = 8个 Queue
}
// 动态调整 Queue 数量
public void adjustQueueNumber() {
// 增加 Queue(在线扩容)
updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=16);
// 减少 Queue(需谨慎)
// 1. 先减少写队列
updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=8);
// 2. 等待消费完成
waitForConsume();
// 3. 再减少读队列
updateTopic("ORDER_TOPIC", readQueues=8, writeQueues=8);
}
}
三、Tag(标签)- 消息的二级分类
3.1 Tag 的作用
public class TagUsage {
// Tag 用于同一 Topic 下的消息细分
public void sendWithTag() {
// 订单 Topic 下的不同消息类型
Message createOrder = new Message("ORDER_TOPIC", "CREATE", body);
Message payOrder = new Message("ORDER_TOPIC", "PAY", body);
Message cancelOrder = new Message("ORDER_TOPIC", "CANCEL", body);
Message refundOrder = new Message("ORDER_TOPIC", "REFUND", body);
}
// Tag vs Topic 的选择
class TagVsTopic {
// 使用 Tag 的场景:
// 1. 消息类型相关性强
// 2. 需要统一管理
// 3. 消费者可能订阅多种类型
// ✅ 推荐:相关业务用 Tag 区分
void goodPractice() {
// 订单的不同状态用 Tag
new Message("ORDER_TOPIC", "CREATED", body);
new Message("ORDER_TOPIC", "PAID", body);
new Message("ORDER_TOPIC", "SHIPPED", body);
}
// ❌ 不推荐:无关业务用同一 Topic
void badPractice() {
// 订单和用户混在一起
new Message("BUSINESS_TOPIC", "ORDER_CREATE", body);
new Message("BUSINESS_TOPIC", "USER_REGISTER", body);
}
}
}
3.2 Tag 过滤机制
public class TagFiltering {
// 消费者订阅
public void subscribeWithTag() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 订阅所有 Tag
consumer.subscribe("ORDER_TOPIC", "*");
// 订阅单个 Tag
consumer.subscribe("ORDER_TOPIC", "CREATE");
// 订阅多个 Tag(|| 分隔)
consumer.subscribe("ORDER_TOPIC", "CREATE || PAY || CANCEL");
// SQL92 表达式(需要 Broker 开启)
consumer.subscribe("ORDER_TOPIC",
MessageSelector.bySql("amount > 1000 AND type = 'GOLD'"));
}
// Tag 过滤原理
class FilterPrinciple {
// 1. Broker 端过滤(粗过滤)
// 根据 Tag HashCode 快速过滤
// 在 ConsumeQueue 中存储了 Tag HashCode
// 2. Consumer 端过滤(精过滤)
// 再次验证 Tag 字符串
// 防止 Hash 冲突导致的误判
public boolean filterMessage(MessageExt msg, String subscribeTags) {
// Broker 端:HashCode 匹配
long tagCode = msg.getTagsCode();
if (!isMatched(tagCode, subscribeTagCodes)) {
return false;
}
// Consumer 端:字符串匹配
String msgTag = msg.getTags();
return isTagMatched(msgTag, subscribeTags);
}
}
}
3.3 Tag 最佳实践
public class TagBestPractices {
// 1. Tag 命名规范
class TagNaming {
// 使用大写字母和下划线
String GOOD_TAG = "ORDER_CREATED";
// 避免特殊字符
String BAD_TAG = "order-created@2023"; // 避免
// 保持简短有意义
String CONCISE = "PAY_SUCCESS";
String VERBOSE = "ORDER_PAYMENT_SUCCESS_WITH_DISCOUNT"; // 太长
}
// 2. Tag 使用策略
class TagStrategy {
// 按业务流程设计
String[] ORDER_FLOW = {
"CREATED", // 订单创建
"PAID", // 支付完成
"SHIPPED", // 已发货
"RECEIVED", // 已收货
"COMPLETED" // 已完成
};
// 按事件类型设计
String[] EVENT_TYPE = {
"INSERT", // 新增
"UPDATE", // 更新
"DELETE" // 删除
};
}
}
四、Key - 消息的业务标识
4.1 Key 的用途
public class KeyUsage {
// Key 用于消息的业务标识和查询
public void sendWithKey() {
Message msg = new Message("ORDER_TOPIC", "CREATE", body);
// 设置业务唯一标识
msg.setKeys("ORDER_12345");
// 设置多个 Key(空格分隔)
msg.setKeys("ORDER_12345 USER_67890");
producer.send(msg);
}
// Key 的主要用途
class KeyPurpose {
// 1. 消息查询
void queryByKey() {
// 通过 Key 查询消息
QueryResult result =
mqadmin.queryMsgByKey("ORDER_TOPIC", "ORDER_12345");
}
// 2. 去重判断
void deduplication(String key) {
if (processedKeys.contains(key)) {
// 已处理,跳过
return;
}
processMessage();
processedKeys.add(key);
}
// 3. 业务追踪
void traceOrder(String orderId) {
// 追踪订单的所有消息
List<Message> orderMessages =
queryAllMessagesByKey(orderId);
}
}
}
4.2 Key 的索引机制
public class KeyIndexing {
// IndexFile 结构
class IndexFile {
// 通过 Key 建立索引
// 存储格式:Key Hash -> Physical Offset
private IndexHeader header; // 文件头
private SlotTable slotTable; // Hash 槽
private IndexLinkedList indexes; // 索引链表
public void putKey(String key, long offset) {
int keyHash = hash(key);
int slotPos = keyHash % 500_0000; // 500万个槽
// 存储索引
IndexItem item = new IndexItem();
item.setKeyHash(keyHash);
item.setPhyOffset(offset);
item.setTimeDiff(storeTimestamp - beginTimestamp);
// 处理 Hash 冲突(链表)
item.setSlotValue(slotTable.get(slotPos));
slotTable.put(slotPos, indexes.add(item));
}
}
}
4.3 Key 设计建议
public class KeyDesignSuggestions {
// 1. Key 的选择
class KeySelection {
// ✅ 好的 Key:业务唯一标识
String orderId = "ORDER_2023110901234";
String userId = "USER_123456";
String transactionId = "TXN_9876543210";
// ❌ 不好的 Key:无业务含义
String uuid = UUID.randomUUID().toString();
String timestamp = String.valueOf(System.currentTimeMillis());
}
// 2. 复合 Key
class CompositeKey {
public String generateKey(Order order) {
// 组合多个维度
return String.format("%s_%s_%s",
order.getOrderId(),
order.getUserId(),
order.getCreateTime()
);
}
}
// 3. Key 规范
class KeySpecification {
// 长度控制
static final int MAX_KEY_LENGTH = 128;
// 字符限制
static final String KEY_PATTERN = "^[a-zA-Z0-9_-]+$";
// 避免特殊字符
public String sanitizeKey(String key) {
return key.replaceAll("[^a-zA-Z0-9_-]", "_");
}
}
}
五、消息属性(Properties)
5.1 系统属性 vs 用户属性
public class MessageProperties {
// 系统属性(RocketMQ 内部使用)
class SystemProperties {
String UNIQ_KEY = "__UNIQ_KEY__"; // 消息唯一标识
String MAX_OFFSET = "__MAX_OFFSET__"; // 最大偏移量
String MIN_OFFSET = "__MIN_OFFSET__"; // 最小偏移量
String TRANSACTION_ID = "__TRANSACTION_ID__"; // 事务ID
String DELAY_LEVEL = "__DELAY_LEVEL__"; // 延迟级别
}
// 用户自定义属性
public void setUserProperties() {
Message msg = new Message("TOPIC", "TAG", body);
// 添加业务属性
msg.putUserProperty("orderType", "ONLINE");
msg.putUserProperty("paymentMethod", "CREDIT_CARD");
msg.putUserProperty("region", "EAST");
msg.putUserProperty("vipLevel", "GOLD");
// SQL92 过滤时使用
consumer.subscribe("TOPIC",
MessageSelector.bySql("vipLevel = 'GOLD' AND region = 'EAST'"));
}
}
5.2 属性的应用场景
public class PropertiesUseCases {
// 1. 消息路由
public void routeByProperty(Message msg) {
String region = msg.getUserProperty("region");
// 根据地区路由到不同集群
if ("NORTH".equals(region)) {
sendToNorthCluster(msg);
} else {
sendToSouthCluster(msg);
}
}
// 2. 监控统计
public void monitoring(Message msg) {
// 统计不同类型订单
String orderType = msg.getUserProperty("orderType");
metrics.increment("order.count", "type", orderType);
}
// 3. 灰度发布
public void grayRelease(Message msg) {
msg.putUserProperty("version", "2.0");
msg.putUserProperty("gray", "true");
msg.putUserProperty("percentage", "10"); // 10% 流量
}
}
六、组合使用示例
6.1 电商订单场景
public class ECommerceExample {
public void processOrder(Order order) {
// Topic:按业务领域
String topic = "ORDER_TOPIC";
// Tag:按订单状态
String tag = getOrderTag(order.getStatus());
// Key:订单号(用于查询)
String key = order.getOrderId();
// 构造消息
Message msg = new Message(topic, tag, key,
JSON.toJSONBytes(order));
// 添加属性(用于过滤和统计)
msg.putUserProperty("userId", order.getUserId());
msg.putUserProperty("amount", String.valueOf(order.getAmount()));
msg.putUserProperty("payMethod", order.getPayMethod());
msg.putUserProperty("orderType", order.getType());
// 发送消息
SendResult result = producer.send(msg);
}
private String getOrderTag(OrderStatus status) {
switch (status) {
case CREATED: return "ORDER_CREATED";
case PAID: return "ORDER_PAID";
case SHIPPED: return "ORDER_SHIPPED";
case COMPLETED: return "ORDER_COMPLETED";
case CANCELLED: return "ORDER_CANCELLED";
default: return "ORDER_UNKNOWN";
}
}
}
6.2 消费者订阅策略
public class ConsumerStrategy {
// 场景1:订阅所有订单消息
class AllOrderConsumer {
public void subscribe() {
consumer.subscribe("ORDER_TOPIC", "*");
}
}
// 场景2:只处理支付相关
class PaymentConsumer {
public void subscribe() {
consumer.subscribe("ORDER_TOPIC", "ORDER_PAID || ORDER_REFUND");
}
}
// 场景3:VIP 订单处理
class VipOrderConsumer {
public void subscribe() {
consumer.subscribe("ORDER_TOPIC",
MessageSelector.bySql("orderType = 'VIP' AND amount > 1000"));
}
}
// 场景4:特定用户订单
class UserOrderConsumer {
public void consumeMessage(MessageExt msg) {
String userId = msg.getUserProperty("userId");
if ("VIP_USER_123".equals(userId)) {
// 特殊处理 VIP 用户订单
handleVipOrder(msg);
} else {
// 普通处理
handleNormalOrder(msg);
}
}
}
}
七、设计原则总结
7.1 层级设计原则
消息层级设计:
┌─────────────────────────────────┐
│ Topic │ <- 业务大类
├─────────────────────────────────┤
│ Queue1 Queue2 Queue3 │ <- 负载均衡
├─────────────────────────────────┤
│ Tag1 Tag2 Tag3 │ <- 消息子类
├─────────────────────────────────┤
│ Key (业务ID) │ <- 消息标识
├─────────────────────────────────┤
│ Properties (属性) │ <- 扩展信息
└─────────────────────────────────┘
7.2 最佳实践建议
public class BestPractices {
// 1. Topic 设计:粗粒度
// 一个业务领域一个 Topic
// 避免创建过多 Topic
// 2. Queue 设计:适度
// Queue 数量 = max(预期并发数, Consumer数量)
// 一般 8-16 个即可
// 3. Tag 设计:细粒度
// 同一 Topic 下的不同消息类型
// 便于消费者灵活订阅
// 4. Key 设计:唯一性
// 使用业务唯一标识
// 便于消息追踪和查询
// 5. Properties:扩展性
// 存储额外的业务信息
// 支持 SQL92 复杂过滤
}
八、常见问题
8.1 Topic 过多的问题
// 问题:Topic 过多导致的影响
// 1. 路由信息膨胀
// 2. 内存占用增加
// 3. 管理复杂度上升
// 解决方案:合并相关 Topic,用 Tag 区分
// Before: USER_REGISTER_TOPIC, USER_UPDATE_TOPIC, USER_DELETE_TOPIC
// After: USER_TOPIC + Tags(REGISTER, UPDATE, DELETE)
8.2 Queue 数量不当
// 问题:Queue 太少
// - Consumer 空闲
// - 并发度不够
// 问题:Queue 太多
// - 管理开销大
// - Rebalance 频繁
// 解决:动态调整
// 监控 Queue 积压和 Consumer 负载
// 根据实际情况调整 Queue 数量
九、总结
理解了 Topic、Queue、Tag、Key 这些核心概念,我们就掌握了 RocketMQ 的消息组织方式:
- Topic:业务领域的划分,粗粒度
- Queue:负载均衡的单元,适度即可
- Tag:消息类型的细分,灵活过滤
- Key:业务标识,方便追踪
合理使用这些概念,能让我们的消息系统更加清晰、高效、易维护。
下一篇预告
掌握了消息的组织方式,下一篇我们将深入探讨 RocketMQ 的消息模型:点对点模式 vs 发布订阅模式的实现原理和使用场景。
实践练习:
- 设计一个电商系统的 Topic/Tag 结构
- 计算你的业务场景需要多少 Queue
- 思考什么样的业务字段适合做 Key
在实践中加深理解,我们下篇见!