RocketMQ入门08:消费模式剖析 - Push vs Pull的权衡

引言:一个有趣的悖论 当你使用 RocketMQ 的 Push 模式时,你可能会惊讶地发现: API 名称是 DefaultMQPushConsumer 但实际上是 Consumer 在主动拉取消息 官方文档称之为"Push 模式" 但本质上是"长轮询 Pull" 这不是设计缺陷,而是精心权衡的结果。让我们揭开这个"伪 Push"的神秘面纱。 一、Push vs Pull 的本质区别 1.1 理论上的 Push 和 Pull public class PushVsPullTheory { // 真正的 Push 模式 class TruePushMode { // Broker 主动推送消息给 Consumer // 特点: // 1. Broker 维护 Consumer 列表 // 2. 有新消息立即推送 // 3. Broker 控制推送速率 // 4. Consumer 被动接收 void brokerPush() { // Broker 端代码 for (Consumer consumer : consumers) { consumer.receive(message); // 主动调用 } } } // 真正的 Pull 模式 class TruePullMode { // Consumer 主动从 Broker 拉取消息 // 特点: // 1. Consumer 控制拉取时机 // 2. Consumer 控制拉取速率 // 3. Broker 被动响应 // 4. 可能存在消息延迟 void consumerPull() { // Consumer 端代码 while (running) { Message msg = broker.pull(); // 主动拉取 process(msg); } } } } 1.2 两种模式的优劣对比 Push 模式的优劣: ┌─────────────────┬──────────────────────────────┐ │ 优点 │ 缺点 │ ├─────────────────┼──────────────────────────────┤ │ 实时性高 │ Broker 需要维护推送状态 │ │ 消息延迟小 │ 难以处理消费速率差异 │ │ 使用简单 │ 可能造成 Consumer 过载 │ │ │ Broker 成为性能瓶颈 │ └─────────────────┴──────────────────────────────┘ Pull 模式的优劣: ┌─────────────────┬──────────────────────────────┐ │ 优点 │ 缺点 │ ├─────────────────┼──────────────────────────────┤ │ Consumer 自主控制│ 可能产生消息延迟 │ │ 流量可控 │ 频繁轮询浪费资源 │ │ Broker 无状态 │ 需要管理消费进度 │ │ 适合批量处理 │ 实现相对复杂 │ └─────────────────┴──────────────────────────────┘ 二、RocketMQ 的 Push 模式真相 2.1 Push 模式的内部实现 public class RocketMQPushMode { // DefaultMQPushConsumer 的秘密:底层是 Pull class PushConsumerImplementation { private PullMessageService pullMessageService; // 启动后自动开始拉取 public void start() { // 1. 启动 Rebalance 服务 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 立即触发拉取 pullMessageService.executePullRequestImmediately(pullRequest); } // 核心:PullMessageService class PullMessageService extends ServiceThread { @Override public void run() { while (!this.isStopped()) { PullRequest pullRequest = pullRequestQueue.take(); pullMessage(pullRequest); } } private void pullMessage(PullRequest pullRequest) { // 实际进行拉取操作 PullResult pullResult = pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, pullRequest.getNextOffset(), maxNums, sysFlag, 0, // 没有消息时挂起时间 brokerSuspendMaxTimeMillis, // 30秒 timeoutMillis, CommunicationMode.ASYNC, pullCallback ); } } } } 2.2 Push 模式的工作流程 Push 模式完整流程: ┌──────────────┐ │ Consumer │ └──────┬───────┘ │ 1. 发送拉取请求 ↓ ┌──────────────┐ │ Broker │ ├──────────────┤ │ 有消息? │ │ ├─是→ 立即返回 │ └─否→ Hold住请求 └──────┬───────┘ │ 2. 返回消息或超时 ↓ ┌──────────────┐ │ Consumer │ │ 处理消息 │ │ 继续拉取 │ └──────────────┘ 2.3 为什么要"伪装"成 Push? public class WhyFakePush { // 原因1:使用体验 class UserExperience { // Push API 更简单 void pushAPI() { consumer.registerMessageListener(listener); consumer.start(); // 就这么简单 } // Pull API 更复杂 void pullAPI() { while (running) { PullResult result = consumer.pull(mq, offset); process(result); updateOffset(offset); } } } // 原因2:兼顾优点 class CombineAdvantages { // 获得 Push 的优点: // - 接近实时(长轮询) // - 使用简单 // 获得 Pull 的优点: // - 流量可控 // - Broker 无状态 // - Consumer 自主控制 } // 原因3:避免缺点 class AvoidDisadvantages { // 避免 Push 的缺点: // - Broker 不需要维护推送状态 // - 不会造成 Consumer 过载 // 避免 Pull 的缺点: // - 无消息时不会空轮询 // - 有消息时立即返回 } } 三、长轮询机制详解 3.1 长轮询的核心原理 public class LongPollingMechanism { // Broker 端长轮询实现 class BrokerLongPolling { // 处理拉取请求 public RemotingCommand processRequest(RemotingCommand request) { // 1. 查询消息 GetMessageResult getMessageResult = messageStore.getMessage( group, topic, queueId, offset, maxMsgNums, filters ); // 2. 判断是否有消息 if (getMessageResult.getMessageCount() > 0) { // 有消息,立即返回 return buildResponse(getMessageResult); } else { // 没有消息,进入长轮询 return longPolling(request); } } // 长轮询处理 private RemotingCommand longPolling(RemotingCommand request) { // 1. 创建 Hold 请求 PullRequest pullRequest = new PullRequest( request, channel, pollingTimeMills, // 挂起时间 System.currentTimeMillis() ); // 2. 放入 Hold 队列 pullRequestHoldService.suspendPullRequest(topic, queueId, pullRequest); // 3. 不立即返回,等待唤醒或超时 return null; // 暂不返回 } } // Hold 服务 class PullRequestHoldService extends ServiceThread { // Hold 请求的容器 private ConcurrentHashMap<String, ManyPullRequest> pullRequestTable; @Override public void run() { while (!stopped) { // 每 5 秒检查一次 waitForRunning(5000); // 检查所有 Hold 的请求 for (ManyPullRequest mpr : pullRequestTable.values()) { List<PullRequest> requestList = mpr.cloneListAndClear(); for (PullRequest request : requestList) { // 检查是否有新消息 long offset = messageStore.getMaxOffset(topic, queueId); if (offset > request.getPullFromThisOffset()) { // 有新消息,唤醒请求 wakeupRequest(request); } else if (System.currentTimeMillis() - request.getSuspendTimestamp() > maxWaitTimeMillis) { // 超时,返回空结果 timeoutRequest(request); } else { // 继续 Hold mpr.addPullRequest(request); } } } } } // 新消息到达时的通知 public void notifyMessageArriving(String topic, int queueId) { ManyPullRequest mpr = pullRequestTable.get(buildKey(topic, queueId)); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); for (PullRequest request : requestList) { // 立即唤醒等待的请求 wakeupRequest(request); } } } } } 3.2 长轮询的时序图 长轮询时序图: Consumer Broker MessageStore │ │ │ │──────Pull Request───────>│ │ │ │ │ │ │────Check Message────────>│ │ │ │ │ │<───No Message────────────│ │ │ │ │ (Hold Request) │ │ │ │ │ │<────New Message Arrive────│ │ │ │ │ (Wake up Request) │ │ │ │ │<─────Return Messages─────│ │ │ │ │ 时间轴: 0s :Consumer 发送拉取请求 0s :Broker 检查没有消息,Hold 住请求 15s :新消息到达,Broker 唤醒请求 15s :Broker 返回消息给 Consumer 3.3 长轮询的参数配置 public class LongPollingConfig { // Consumer 端配置 class ConsumerConfig { // 长轮询模式下的挂起时间(默认 30 秒) consumer.setBrokerSuspendMaxTimeMillis(30000); // 拉取间隔(毫秒) consumer.setPullInterval(0); // 0 表示连续拉取 // 拉取批量大小 consumer.setPullBatchSize(32); // 单次拉取的最大消息数 consumer.setConsumeMessageBatchMaxSize(1); } // Broker 端配置 class BrokerConfig { // 长轮询等待时间(默认 30 秒) longPollingEnable = true; // Hold 请求的检查间隔(默认 5 秒) waitTimeMillsInPullHoldService = 5000; // 短轮询等待时间(默认 1 秒) shortPollingTimeMills = 1000; } } 四、Pull 模式的使用场景 4.1 Pull 模式的实现 public class PullModeImplementation { public void pullConsumer() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumerGroup("PULL_CONSUMER_GROUP"); consumer.start(); // 获取消息队列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC"); for (MessageQueue mq : mqs) { // 从存储获取消费进度 long offset = loadOffset(mq); while (true) { try { // 拉取消息 PullResult pullResult = consumer.pullBlockIfNotFound( mq, // 消息队列 null, // 过滤表达式 offset, // 拉取偏移量 32 // 最大消息数 ); // 处理拉取结果 switch (pullResult.getMsgFoundList()) { case FOUND: // 处理消息 for (MessageExt msg : pullResult.getMsgFoundList()) { processMessage(msg); } // 更新偏移量 offset = pullResult.getNextBeginOffset(); // 持久化偏移量 saveOffset(mq, offset); break; case NO_NEW_MSG: // 没有新消息 Thread.sleep(1000); break; case OFFSET_ILLEGAL: // 偏移量非法 offset = consumer.minOffset(mq); break; } } catch (Exception e) { e.printStackTrace(); } } } } } 4.2 Pull 模式的适用场景 public class PullModeScenarios { // 场景1:批量处理 class BatchProcessing { void batchConsume() { // 积累一定数量后批量处理 List<MessageExt> batch = new ArrayList<>(); while (batch.size() < 1000) { PullResult result = consumer.pull(mq, offset, 100); batch.addAll(result.getMsgFoundList()); } // 批量处理 batchProcess(batch); } } // 场景2:定时处理 class ScheduledProcessing { void scheduledConsume() { // 每小时处理一次 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { PullResult result = consumer.pull(mq, offset, Integer.MAX_VALUE); process(result.getMsgFoundList()); }, 0, 1, TimeUnit.HOURS); } } // 场景3:流量控制 class FlowControl { void controlledConsume() { RateLimiter rateLimiter = RateLimiter.create(100); // 100 TPS while (running) { rateLimiter.acquire(); // 限流 PullResult result = consumer.pull(mq, offset, 1); process(result.getMsgFoundList()); } } } // 场景4:自定义消费逻辑 class CustomLogic { void customConsume() { // 根据业务状态决定是否消费 if (isBusinessReady()) { PullResult result = consumer.pull(mq, offset, 32); process(result.getMsgFoundList()); } else { // 暂停消费 Thread.sleep(5000); } } } } 五、Push 和 Pull 的性能对比 5.1 性能测试 public class PerformanceComparison { // Push 模式性能特点 class PushPerformance { // 优点: // - 低延迟(长轮询,有消息立即返回) // - 高吞吐(连续拉取,无空闲) // - 资源利用率高(无空轮询) // 缺点: // - Consumer 压力大(连续处理) // - 流量不可控(Broker 推送速率) } // Pull 模式性能特点 class PullPerformance { // 优点: // - 流量可控(Consumer 控制) // - 压力可调(可以暂停) // - 批量优化(一次拉取多条) // 缺点: // - 可能有延迟(拉取间隔) // - 可能空轮询(浪费资源) } // 性能数据对比 class PerformanceData { /* 测试环境:3 Broker,10 Producer,10 Consumer 消息大小:1KB 指标 Push模式 Pull模式(1s间隔) Pull模式(连续) ---------------------------------------------------------------- 延迟(ms) 10-50 1000-2000 50-100 TPS 100,000 50,000 80,000 CPU使用率 85% 60% 75% 网络开销 低 高(空轮询) 中 */ } } 5.2 选择建议 public class SelectionGuide { // 选择 Push 模式的场景 class WhenToUsePush { // ✅ 实时性要求高 // ✅ 消息量稳定 // ✅ 希望简单使用 // ✅ Consumer 处理能力强 void example() { // 订单处理、支付通知、实时推送 } } // 选择 Pull 模式的场景 class WhenToUsePull { // ✅ 需要批量处理 // ✅ 定时处理 // ✅ 流量控制 // ✅ 自定义消费策略 void example() { // 数据同步、报表生成、批量导入 } } // 混合使用 class HybridUsage { // 核心业务使用 Push(实时性) // 非核心业务使用 Pull(可控性) void example() { // Push: 订单、支付 // Pull: 日志、统计 } } } 六、高级特性 6.1 Push 模式的流控 public class PushFlowControl { public void flowControl() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 流控参数 consumer.setPullThresholdForQueue(1000); // 单队列最大消息数 consumer.setPullThresholdSizeForQueue(100); // 单队列最大消息大小(MB) consumer.setPullThresholdForTopic(10000); // Topic 最大消息数 consumer.setPullThresholdSizeForTopic(1000); // Topic 最大消息大小(MB) // 消费并发控制 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); } } 6.2 Pull 模式的优化 public class PullOptimization { // 优化1:使用 PullConsumer 代替 DefaultMQPullConsumer public void optimizedPull() { // RocketMQ 5.0 新 API PullConsumer pullConsumer = PullConsumer.newBuilder() .setConsumerGroup("GROUP") .setNamesrvAddr("localhost:9876") .build(); // 异步拉取 CompletableFuture<PullResult> future = pullConsumer.pullAsync( mq, offset, maxNums ); future.thenAccept(result -> { process(result.getMsgFoundList()); }); } // 优化2:批量拉取 public void batchPull() { // 一次拉取更多消息,减少网络开销 PullResult result = consumer.pull(mq, offset, 1000); // 拉取 1000 条 } // 优化3:并行处理 public void parallelPull() { ExecutorService executor = Executors.newFixedThreadPool(10); for (MessageQueue mq : queues) { executor.submit(() -> { while (running) { PullResult result = consumer.pull(mq, offset, 32); process(result); } }); } } } 七、常见问题 7.1 Push 模式消费堆积 // 问题:Push 模式下消息堆积 // 原因:消费速度 < 生产速度 // 解决方案: class PushBacklogSolution { void solution() { // 1. 增加消费者实例 // 水平扩展 // 2. 增加消费线程 consumer.setConsumeThreadMin(40); consumer.setConsumeThreadMax(80); // 3. 批量消费 consumer.setConsumeMessageBatchMaxSize(20); // 4. 优化消费逻辑 // 异步处理、批量入库等 } } 7.2 Pull 模式偏移量管理 // 问题:Pull 模式偏移量丢失 // 原因:没有正确持久化偏移量 // 解决方案: class PullOffsetManagement { private OffsetStore offsetStore; void manageOffset() { // 1. 定期持久化 scheduledExecutor.scheduleAtFixedRate(() -> { offsetStore.persistAll(offsetTable); }, 5, 5, TimeUnit.SECONDS); // 2. 优雅关闭时持久化 Runtime.getRuntime().addShutdownHook(new Thread(() -> { offsetStore.persistAll(offsetTable); })); // 3. 使用 RocketMQ 提供的 OffsetStore consumer.setOffsetStore(new RemoteBrokerOffsetStore()); } } 八、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门07:消息模型深入 - 点对点vs发布订阅

引言:一个模型,两种模式 传统消息系统通常要在两种模型中二选一: JMS:明确区分 Queue(点对点)和 Topic(发布订阅) AMQP:通过 Exchange 和 Binding 实现不同模式 Kafka:只有 Topic,通过 Consumer Group 实现不同语义 而 RocketMQ 的独特之处在于:用一套模型,同时支持两种模式。 让我们深入理解这是如何实现的。 一、两种基本消息模型 1.1 点对点模型(Point-to-Point) public class P2PModel { // 点对点模型特征 class Characteristics { // 1. 一条消息只能被一个消费者消费 // 2. 消息被消费后从队列中删除 // 3. 多个消费者竞争消息 // 4. 适合任务分发场景 } // 传统 JMS Queue 实现 class TraditionalQueue { Queue<Message> queue = new LinkedBlockingQueue<>(); // 生产者发送 public void send(Message msg) { queue.offer(msg); } // 消费者接收(竞争) public Message receive() { return queue.poll(); // 取出即删除 } } } 点对点模型示意图: ┌──────────┐ Producer ──> Queue ──> Consumer1 (获得消息) └──────────┘ ──> Consumer2 (没有获得) ──> Consumer3 (没有获得) 特点:消息被其中一个消费者消费后,其他消费者无法再消费 1.2 发布订阅模型(Publish-Subscribe) public class PubSubModel { // 发布订阅模型特征 class Characteristics { // 1. 一条消息可以被多个订阅者消费 // 2. 每个订阅者都能收到完整消息副本 // 3. 消息广播给所有订阅者 // 4. 适合事件通知场景 } // 传统 JMS Topic 实现 class TraditionalTopic { List<Subscriber> subscribers = new ArrayList<>(); // 发布消息 public void publish(Message msg) { // 广播给所有订阅者 for (Subscriber sub : subscribers) { sub.onMessage(msg.copy()); } } // 订阅主题 public void subscribe(Subscriber sub) { subscribers.add(sub); } } } 发布订阅模型示意图: ┌──> Consumer1 (收到消息) ┌────────┐ │ Producer ──> Topic ──┼──> Consumer2 (收到消息) └────────┘ │ └──> Consumer3 (收到消息) 特点:每个订阅者都能收到消息的完整副本 二、RocketMQ 的统一模型 2.1 核心设计:Consumer Group public class RocketMQModel { // RocketMQ 的秘密:通过 Consumer Group 实现两种模式 class ConsumerGroup { String groupName; List<Consumer> consumers; ConsumeMode mode; // 集群消费 or 广播消费 } // 关键规则: // 1. 同一 Consumer Group 内的消费者【分摊】消息(点对点) // 2. 不同 Consumer Group 都能收到【全量】消息(发布订阅) } 2.2 模式切换的魔法 public class ModelSwitching { // 场景1:实现点对点模式 public void p2pMode() { // 所有消费者使用【相同】的 Consumer Group String GROUP = "SAME_GROUP"; Consumer consumer1 = new DefaultMQPushConsumer(GROUP); Consumer consumer2 = new DefaultMQPushConsumer(GROUP); Consumer consumer3 = new DefaultMQPushConsumer(GROUP); // 结果:消息被分摊消费,每条消息只被其中一个消费 } // 场景2:实现发布订阅模式 public void pubSubMode() { // 每个消费者使用【不同】的 Consumer Group Consumer consumer1 = new DefaultMQPushConsumer("GROUP_A"); Consumer consumer2 = new DefaultMQPushConsumer("GROUP_B"); Consumer consumer3 = new DefaultMQPushConsumer("GROUP_C"); // 结果:每个 Group 都收到全量消息 } // 场景3:混合模式 public void mixedMode() { // 订单服务组(2个实例) Consumer order1 = new DefaultMQPushConsumer("ORDER_GROUP"); Consumer order2 = new DefaultMQPushConsumer("ORDER_GROUP"); // 库存服务组(3个实例) Consumer stock1 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock2 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock3 = new DefaultMQPushConsumer("STOCK_GROUP"); // 结果: // - ORDER_GROUP 内部:order1 和 order2 分摊消息 // - STOCK_GROUP 内部:stock1/2/3 分摊消息 // - 两个 GROUP 之间:都能收到全量消息 } } 三、集群消费 vs 广播消费 3.1 集群消费(默认) public class ClusterConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置集群消费模式(默认) consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TOPIC", "*"); } // 集群消费特点 class ClusteringCharacteristics { // 1. 同组内消费者分摊消息 // 2. 消费进度存储在 Broker // 3. 支持消费失败重试 // 4. 适合负载均衡场景 } // 消费进度管理 class ProgressManagement { // Broker 端统一管理消费进度 // 路径:{ROCKETMQ_HOME}/store/config/consumerOffset.json { "offsetTable": { "TOPIC@GROUP": { "0": 12345, // Queue0 消费到 12345 "1": 23456, // Queue1 消费到 23456 "2": 34567, // Queue2 消费到 34567 "3": 45678 // Queue3 消费到 45678 } } } } } 集群消费示意图: Queue0 ──> Consumer1 ─┐ Queue1 ──> Consumer1 ─┤ Topic ──> ├─> GROUP_A (共享进度) Queue2 ──> Consumer2 ─┤ Queue3 ──> Consumer2 ─┘ GROUP_A 的消费进度存储在 Broker,所有成员共享 3.2 广播消费 public class BroadcastConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置广播消费模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TOPIC", "*"); } // 广播消费特点 class BroadcastingCharacteristics { // 1. 每个消费者都消费全量消息 // 2. 消费进度存储在本地 // 3. 不支持消费失败重试 // 4. 适合本地缓存更新场景 } // 消费进度管理 class LocalProgress { // 本地文件存储消费进度 // 路径:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json { "offsetTable": { "TOPIC": { "0": 99999, // 本实例 Queue0 进度 "1": 88888, // 本实例 Queue1 进度 "2": 77777, // 本实例 Queue2 进度 "3": 66666 // 本实例 Queue3 进度 } } } } } 广播消费示意图: ┌─> Consumer1 (消费全部消息,独立进度) │ Topic ──> ────┼─> Consumer2 (消费全部消息,独立进度) │ └─> Consumer3 (消费全部消息,独立进度) 每个 Consumer 独立维护自己的消费进度 四、消息分配策略 4.1 集群模式下的分配策略 public class AllocationStrategy { // 1. 平均分配(默认) class AllocateMessageQueueAveragely { // 假设:4个Queue,2个Consumer // Consumer1: Queue0, Queue1 // Consumer2: Queue2, Queue3 // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue1, Queue2 // Consumer2: Queue3, Queue4 } // 2. 环形分配 class AllocateMessageQueueAveragelyByCircle { // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue2, Queue4 // Consumer2: Queue1, Queue3 } // 3. 机房就近分配 class AllocateMessageQueueByMachineRoom { // 根据 Consumer 和 Broker 的机房位置分配 // 优先分配同机房的 Queue } // 4. 一致性Hash分配 class AllocateMessageQueueConsistentHash { // 使用一致性Hash算法 // Consumer 上下线对其他 Consumer 影响最小 } // 5. 自定义分配 class CustomAllocateStrategy implements AllocateMessageQueueStrategy { @Override public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 自定义分配逻辑 // 比如:VIP Consumer 分配更多 Queue if (isVipConsumer(currentCID)) { return allocateMoreQueues(mqAll, currentCID); } return allocateNormalQueues(mqAll, currentCID); } } } 4.2 分配策略选择 public class StrategySelection { public void selectStrategy() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 设置分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueAveragely() ); // 根据场景选择 class ScenarioBasedSelection { // 场景1:消费者能力相同 → 平均分配 // 场景2:跨机房部署 → 机房就近 // 场景3:Consumer频繁上下线 → 一致性Hash // 场景4:Consumer能力不同 → 自定义策略 } } } 五、实战场景分析 5.1 订单处理系统 public class OrderProcessingSystem { // 需求:订单需要被多个系统处理,但每个系统只处理一次 // 订单服务(2个实例,负载均衡) class OrderService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("ORDER_SERVICE_GROUP"); Consumer instance2 = createConsumer("ORDER_SERVICE_GROUP"); // 结果:订单在两个实例间负载均衡 } } // 库存服务(3个实例,负载均衡) class StockService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance2 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance3 = createConsumer("STOCK_SERVICE_GROUP"); // 结果:订单在三个实例间负载均衡 } } // 积分服务(1个实例) class PointService { void startConsumer() { Consumer instance = createConsumer("POINT_SERVICE_GROUP"); // 结果:所有订单都由这个实例处理 } } // 效果: // 1. 每个订单被 ORDER_SERVICE_GROUP 处理一次 // 2. 每个订单被 STOCK_SERVICE_GROUP 处理一次 // 3. 每个订单被 POINT_SERVICE_GROUP 处理一次 } 5.2 配置更新系统 public class ConfigUpdateSystem { // 需求:配置更新需要通知所有服务实例 class ConfigUpdateConsumer { void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONFIG_GROUP"); // 使用广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("CONFIG_UPDATE_TOPIC", "*"); consumer.registerMessageListener((List<MessageExt> msgs, context) -> { for (MessageExt msg : msgs) { // 更新本地配置 updateLocalConfig(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); } } // 效果: // 1. 所有服务实例都收到配置更新 // 2. 每个实例独立更新自己的配置 // 3. 某个实例重启不影响其他实例 } 5.3 日志收集系统 public class LogCollectionSystem { // 需求:大量日志需要收集,要求高吞吐 class LogCollector { void setup() { // 创建多个消费者,使用同一个 Group String GROUP = "LOG_COLLECTOR_GROUP"; // 部署10个消费者实例 for (int i = 0; i < 10; i++) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP); consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(30); consumer.setPullBatchSize(64); // 批量拉取 consumer.subscribe("LOG_TOPIC", "*"); consumer.start(); } // 效果: // 1. 10个实例并行消费,提高吞吐量 // 2. 自动负载均衡 // 3. 某个实例故障,其他实例自动接管 } } } 六、模型对比与选择 6.1 不同MQ产品的模型对比 消息模型对比: ┌─────────────┬──────────────┬──────────────┬──────────────┐ │ 产品 │ 模型 │ 实现方式 │ 特点 │ ├─────────────┼──────────────┼──────────────┼──────────────┤ │ RabbitMQ │ AMQP │ Exchange │ 灵活但复杂 │ │ ActiveMQ │ JMS │ Queue/Topic │ 标准但死板 │ │ Kafka │ 发布订阅 │ Partition │ 简单但单一 │ │ RocketMQ │ 统一模型 │ ConsumerGroup│ 灵活且简单 │ └─────────────┴──────────────┴──────────────┴──────────────┘ 6.2 选择建议 public class ModelSelectionGuide { // 使用集群消费(点对点)的场景 class UseClusterMode { // ✅ 任务分发 // ✅ 负载均衡 // ✅ 高可用(故障转移) // ✅ 需要消费确认和重试 void example() { // 订单处理、支付处理、数据同步 } } // 使用广播消费的场景 class UseBroadcastMode { // ✅ 本地缓存更新 // ✅ 配置刷新 // ✅ 全量数据同步 // ✅ 实时通知 void example() { // 配置更新、缓存刷新、实时推送 } } // 混合使用的场景 class UseMixedMode { // 不同业务使用不同 Group // 同一业务的多实例使用相同 Group void example() { // 电商系统:订单服务组、库存服务组、积分服务组 // 每个组内负载均衡,组间相互独立 } } } 七、高级特性 7.1 消费进度重置 public class ConsumeProgressReset { // 重置消费进度的场景 // 1. 需要重新消费历史消息 // 2. 跳过大量积压消息 // 3. 修复消费进度异常 public void resetProgress() { // 方式1:通过管理工具 // sh mqadmin resetOffset -n localhost:9876 -g GROUP -t TOPIC -o 0 // 方式2:通过代码 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // CONSUME_FROM_FIRST_OFFSET: 从最早消息开始 // CONSUME_FROM_LAST_OFFSET: 从最新消息开始 // CONSUME_FROM_TIMESTAMP: 从指定时间开始 } } 7.2 消费并行度调优 public class ConsumerConcurrency { public void tuneConcurrency() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 并发消费 consumer.setConsumeThreadMin(20); // 最小线程数 consumer.setConsumeThreadMax(64); // 最大线程数 // 顺序消费(单线程) consumer.setConsumeThreadMax(1); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); // 批量大小 consumer.setPullBatchSize(32); // 拉取批量 // 限流 consumer.setPullThresholdForQueue(1000); // 队列级别流控 consumer.setPullThresholdForTopic(5000); // Topic级别流控 } } 八、常见问题 8.1 消息重复消费 // 问题:同一条消息被消费多次 // 原因: // 1. 消费超时导致重试 // 2. Rebalance 导致重新分配 // 3. 消费者异常退出 // 解决方案:幂等处理 class IdempotentConsumer { private Set<String> processedMsgIds = new ConcurrentHashSet<>(); public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) { for (MessageExt msg : msgs) { String msgId = msg.getMsgId(); // 幂等检查 if (!processedMsgIds.add(msgId)) { // 已处理过,跳过 continue; } // 处理消息 processMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } 8.2 消费不均衡 // 问题:某些 Consumer 消费很多,某些很少 // 原因: // 1. Queue 数量 < Consumer 数量 // 2. 消息分布不均 // 3. Consumer 处理能力不同 // 解决方案: class BalanceSolution { void solution() { // 1. 调整 Queue 数量 // Queue数量 >= Consumer数量 // 2. 使用合适的分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 3. 监控并动态调整 // 监控每个 Consumer 的消费TPS // 动态调整线程池大小 } } 九、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门06:核心概念详解(下)- Topic、Queue、Tag、Key

引言:消息的"地址系统" 如果把 RocketMQ 比作快递系统,那么: Topic 是省份(大的分类) Queue 是城市(负载均衡单元) Tag 是街道(消息过滤) Key 是门牌号(精确查找) 理解这套"地址系统",才能高效地组织和管理消息。 一、Topic(主题)- 消息的逻辑分类 1.1 Topic 是什么? // Topic 是消息的一级分类,是逻辑概念 public class TopicConcept { // Topic 的本质:消息类型的逻辑分组 String ORDER_TOPIC = "ORDER_TOPIC"; // 订单消息 String PAYMENT_TOPIC = "PAYMENT_TOPIC"; // 支付消息 String LOGISTICS_TOPIC = "LOGISTICS_TOPIC"; // 物流消息 // 一个 Topic 包含多个 Queue class TopicStructure { String topicName; int readQueueNums = 4; // 读队列数量 int writeQueueNums = 4; // 写队列数量 int perm = 6; // 权限:2-写 4-读 6-读写 // Topic 在多个 Broker 上的分布 Map<String/* brokerName */, List<MessageQueue>> brokerQueues; } } 1.2 Topic 的设计原则 public class TopicDesignPrinciples { // 原则1:按业务领域划分 class BusinessDomain { // ✅ 推荐:清晰的业务边界 String TRADE_TOPIC = "TRADE_TOPIC"; String USER_TOPIC = "USER_TOPIC"; String INVENTORY_TOPIC = "INVENTORY_TOPIC"; // ❌ 不推荐:过于宽泛 String ALL_MESSAGE_TOPIC = "ALL_MESSAGE_TOPIC"; } // 原则2:考虑消息特性 class MessageCharacteristics { // 按消息量级区分 String HIGH_TPS_TOPIC = "LOG_TOPIC"; // 高吞吐 String LOW_TPS_TOPIC = "ALERT_TOPIC"; // 低吞吐 // 按消息大小区分 String BIG_MSG_TOPIC = "FILE_TOPIC"; // 大消息 String SMALL_MSG_TOPIC = "EVENT_TOPIC"; // 小消息 // 按重要程度区分 String CRITICAL_TOPIC = "PAYMENT_TOPIC"; // 核心业务 String NORMAL_TOPIC = "STAT_TOPIC"; // 一般业务 } // 原则3:隔离不同优先级 class PriorityIsolation { // 不同优先级使用不同 Topic String ORDER_HIGH_PRIORITY = "ORDER_HIGH_TOPIC"; String ORDER_NORMAL_PRIORITY = "ORDER_NORMAL_TOPIC"; String ORDER_LOW_PRIORITY = "ORDER_LOW_TOPIC"; } } 1.3 Topic 创建和管理 # 自动创建(不推荐生产使用) autoCreateTopicEnable=true defaultTopicQueueNums=4 # 手动创建(推荐) sh mqadmin updateTopic \ -n localhost:9876 \ -t ORDER_TOPIC \ -c DefaultCluster \ -r 8 \ # 读队列数量 -w 8 \ # 写队列数量 -p 6 # 权限 # 查看 Topic 列表 sh mqadmin topicList -n localhost:9876 # 查看 Topic 详情 sh mqadmin topicStatus -n localhost:9876 -t ORDER_TOPIC 二、Queue(队列)- 负载均衡的基本单元 2.1 Queue 的本质 public class QueueConcept { // Queue 是 Topic 的物理分片 class MessageQueue { private String topic; // 所属 Topic private String brokerName; // 所在 Broker private int queueId; // 队列 ID @Override public String toString() { // ORDER_TOPIC@broker-a@0 return topic + "@" + brokerName + "@" + queueId; } } // Topic 与 Queue 的关系 class TopicQueueRelation { // 1个 Topic = N 个 Queue // 例如:ORDER_TOPIC 有 8 个 Queue // broker-a: queue0, queue1, queue2, queue3 // broker-b: queue0, queue1, queue2, queue3 Map<String, Integer> distribution = new HashMap<>(); { distribution.put("broker-a", 4); distribution.put("broker-b", 4); // Total: 8 Queues } } } 2.2 Queue 的负载均衡 public class QueueLoadBalance { // 生产者端:选择 Queue 发送 public MessageQueue selectQueue() { List<MessageQueue> queues = getQueues("ORDER_TOPIC"); // 策略1:轮询(默认) int index = sendWhichQueue.incrementAndGet() % queues.size(); return queues.get(index); // 策略2:根据业务 Key 路由 String orderId = "12345"; int index = orderId.hashCode() % queues.size(); return queues.get(index); // 策略3:最小延迟 return selectQueueByLatency(queues); } // 消费者端:分配 Queue 消费 public List<MessageQueue> allocateQueues() { // 假设:8个 Queue,3个 Consumer // Consumer0: queue0, queue1, queue2 // Consumer1: queue3, queue4, queue5 // Consumer2: queue6, queue7 return AllocateStrategy.allocate( allQueues, currentConsumerId, allConsumerIds ); } } 2.3 Queue 数量设计 public class QueueNumberDesign { // Queue 数量考虑因素 class Factors { // 1. 并发度 // Queue 数量 = 预期 TPS / 单 Queue TPS // 例如:10万 TPS / 1万 TPS = 10个 Queue // 2. Consumer 数量 // Queue 数量 >= Consumer 数量 // 否则有 Consumer 闲置 // 3. Broker 数量 // Queue 数量 = Broker 数量 × 单 Broker Queue 数 // 例如:2个 Broker × 4 = 8个 Queue } // 动态调整 Queue 数量 public void adjustQueueNumber() { // 增加 Queue(在线扩容) updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=16); // 减少 Queue(需谨慎) // 1. 先减少写队列 updateTopic("ORDER_TOPIC", readQueues=16, writeQueues=8); // 2. 等待消费完成 waitForConsume(); // 3. 再减少读队列 updateTopic("ORDER_TOPIC", readQueues=8, writeQueues=8); } } 三、Tag(标签)- 消息的二级分类 3.1 Tag 的作用 public class TagUsage { // Tag 用于同一 Topic 下的消息细分 public void sendWithTag() { // 订单 Topic 下的不同消息类型 Message createOrder = new Message("ORDER_TOPIC", "CREATE", body); Message payOrder = new Message("ORDER_TOPIC", "PAY", body); Message cancelOrder = new Message("ORDER_TOPIC", "CANCEL", body); Message refundOrder = new Message("ORDER_TOPIC", "REFUND", body); } // Tag vs Topic 的选择 class TagVsTopic { // 使用 Tag 的场景: // 1. 消息类型相关性强 // 2. 需要统一管理 // 3. 消费者可能订阅多种类型 // ✅ 推荐:相关业务用 Tag 区分 void goodPractice() { // 订单的不同状态用 Tag new Message("ORDER_TOPIC", "CREATED", body); new Message("ORDER_TOPIC", "PAID", body); new Message("ORDER_TOPIC", "SHIPPED", body); } // ❌ 不推荐:无关业务用同一 Topic void badPractice() { // 订单和用户混在一起 new Message("BUSINESS_TOPIC", "ORDER_CREATE", body); new Message("BUSINESS_TOPIC", "USER_REGISTER", body); } } } 3.2 Tag 过滤机制 public class TagFiltering { // 消费者订阅 public void subscribeWithTag() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 订阅所有 Tag consumer.subscribe("ORDER_TOPIC", "*"); // 订阅单个 Tag consumer.subscribe("ORDER_TOPIC", "CREATE"); // 订阅多个 Tag(|| 分隔) consumer.subscribe("ORDER_TOPIC", "CREATE || PAY || CANCEL"); // SQL92 表达式(需要 Broker 开启) consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("amount > 1000 AND type = 'GOLD'")); } // Tag 过滤原理 class FilterPrinciple { // 1. Broker 端过滤(粗过滤) // 根据 Tag HashCode 快速过滤 // 在 ConsumeQueue 中存储了 Tag HashCode // 2. Consumer 端过滤(精过滤) // 再次验证 Tag 字符串 // 防止 Hash 冲突导致的误判 public boolean filterMessage(MessageExt msg, String subscribeTags) { // Broker 端:HashCode 匹配 long tagCode = msg.getTagsCode(); if (!isMatched(tagCode, subscribeTagCodes)) { return false; } // Consumer 端:字符串匹配 String msgTag = msg.getTags(); return isTagMatched(msgTag, subscribeTags); } } } 3.3 Tag 最佳实践 public class TagBestPractices { // 1. Tag 命名规范 class TagNaming { // 使用大写字母和下划线 String GOOD_TAG = "ORDER_CREATED"; // 避免特殊字符 String BAD_TAG = "order-created@2023"; // 避免 // 保持简短有意义 String CONCISE = "PAY_SUCCESS"; String VERBOSE = "ORDER_PAYMENT_SUCCESS_WITH_DISCOUNT"; // 太长 } // 2. Tag 使用策略 class TagStrategy { // 按业务流程设计 String[] ORDER_FLOW = { "CREATED", // 订单创建 "PAID", // 支付完成 "SHIPPED", // 已发货 "RECEIVED", // 已收货 "COMPLETED" // 已完成 }; // 按事件类型设计 String[] EVENT_TYPE = { "INSERT", // 新增 "UPDATE", // 更新 "DELETE" // 删除 }; } } 四、Key - 消息的业务标识 4.1 Key 的用途 public class KeyUsage { // Key 用于消息的业务标识和查询 public void sendWithKey() { Message msg = new Message("ORDER_TOPIC", "CREATE", body); // 设置业务唯一标识 msg.setKeys("ORDER_12345"); // 设置多个 Key(空格分隔) msg.setKeys("ORDER_12345 USER_67890"); producer.send(msg); } // Key 的主要用途 class KeyPurpose { // 1. 消息查询 void queryByKey() { // 通过 Key 查询消息 QueryResult result = mqadmin.queryMsgByKey("ORDER_TOPIC", "ORDER_12345"); } // 2. 去重判断 void deduplication(String key) { if (processedKeys.contains(key)) { // 已处理,跳过 return; } processMessage(); processedKeys.add(key); } // 3. 业务追踪 void traceOrder(String orderId) { // 追踪订单的所有消息 List<Message> orderMessages = queryAllMessagesByKey(orderId); } } } 4.2 Key 的索引机制 public class KeyIndexing { // IndexFile 结构 class IndexFile { // 通过 Key 建立索引 // 存储格式:Key Hash -> Physical Offset private IndexHeader header; // 文件头 private SlotTable slotTable; // Hash 槽 private IndexLinkedList indexes; // 索引链表 public void putKey(String key, long offset) { int keyHash = hash(key); int slotPos = keyHash % 500_0000; // 500万个槽 // 存储索引 IndexItem item = new IndexItem(); item.setKeyHash(keyHash); item.setPhyOffset(offset); item.setTimeDiff(storeTimestamp - beginTimestamp); // 处理 Hash 冲突(链表) item.setSlotValue(slotTable.get(slotPos)); slotTable.put(slotPos, indexes.add(item)); } } } 4.3 Key 设计建议 public class KeyDesignSuggestions { // 1. Key 的选择 class KeySelection { // ✅ 好的 Key:业务唯一标识 String orderId = "ORDER_2023110901234"; String userId = "USER_123456"; String transactionId = "TXN_9876543210"; // ❌ 不好的 Key:无业务含义 String uuid = UUID.randomUUID().toString(); String timestamp = String.valueOf(System.currentTimeMillis()); } // 2. 复合 Key class CompositeKey { public String generateKey(Order order) { // 组合多个维度 return String.format("%s_%s_%s", order.getOrderId(), order.getUserId(), order.getCreateTime() ); } } // 3. Key 规范 class KeySpecification { // 长度控制 static final int MAX_KEY_LENGTH = 128; // 字符限制 static final String KEY_PATTERN = "^[a-zA-Z0-9_-]+$"; // 避免特殊字符 public String sanitizeKey(String key) { return key.replaceAll("[^a-zA-Z0-9_-]", "_"); } } } 五、消息属性(Properties) 5.1 系统属性 vs 用户属性 public class MessageProperties { // 系统属性(RocketMQ 内部使用) class SystemProperties { String UNIQ_KEY = "__UNIQ_KEY__"; // 消息唯一标识 String MAX_OFFSET = "__MAX_OFFSET__"; // 最大偏移量 String MIN_OFFSET = "__MIN_OFFSET__"; // 最小偏移量 String TRANSACTION_ID = "__TRANSACTION_ID__"; // 事务ID String DELAY_LEVEL = "__DELAY_LEVEL__"; // 延迟级别 } // 用户自定义属性 public void setUserProperties() { Message msg = new Message("TOPIC", "TAG", body); // 添加业务属性 msg.putUserProperty("orderType", "ONLINE"); msg.putUserProperty("paymentMethod", "CREDIT_CARD"); msg.putUserProperty("region", "EAST"); msg.putUserProperty("vipLevel", "GOLD"); // SQL92 过滤时使用 consumer.subscribe("TOPIC", MessageSelector.bySql("vipLevel = 'GOLD' AND region = 'EAST'")); } } 5.2 属性的应用场景 public class PropertiesUseCases { // 1. 消息路由 public void routeByProperty(Message msg) { String region = msg.getUserProperty("region"); // 根据地区路由到不同集群 if ("NORTH".equals(region)) { sendToNorthCluster(msg); } else { sendToSouthCluster(msg); } } // 2. 监控统计 public void monitoring(Message msg) { // 统计不同类型订单 String orderType = msg.getUserProperty("orderType"); metrics.increment("order.count", "type", orderType); } // 3. 灰度发布 public void grayRelease(Message msg) { msg.putUserProperty("version", "2.0"); msg.putUserProperty("gray", "true"); msg.putUserProperty("percentage", "10"); // 10% 流量 } } 六、组合使用示例 6.1 电商订单场景 public class ECommerceExample { public void processOrder(Order order) { // Topic:按业务领域 String topic = "ORDER_TOPIC"; // Tag:按订单状态 String tag = getOrderTag(order.getStatus()); // Key:订单号(用于查询) String key = order.getOrderId(); // 构造消息 Message msg = new Message(topic, tag, key, JSON.toJSONBytes(order)); // 添加属性(用于过滤和统计) msg.putUserProperty("userId", order.getUserId()); msg.putUserProperty("amount", String.valueOf(order.getAmount())); msg.putUserProperty("payMethod", order.getPayMethod()); msg.putUserProperty("orderType", order.getType()); // 发送消息 SendResult result = producer.send(msg); } private String getOrderTag(OrderStatus status) { switch (status) { case CREATED: return "ORDER_CREATED"; case PAID: return "ORDER_PAID"; case SHIPPED: return "ORDER_SHIPPED"; case COMPLETED: return "ORDER_COMPLETED"; case CANCELLED: return "ORDER_CANCELLED"; default: return "ORDER_UNKNOWN"; } } } 6.2 消费者订阅策略 public class ConsumerStrategy { // 场景1:订阅所有订单消息 class AllOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "*"); } } // 场景2:只处理支付相关 class PaymentConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", "ORDER_PAID || ORDER_REFUND"); } } // 场景3:VIP 订单处理 class VipOrderConsumer { public void subscribe() { consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("orderType = 'VIP' AND amount > 1000")); } } // 场景4:特定用户订单 class UserOrderConsumer { public void consumeMessage(MessageExt msg) { String userId = msg.getUserProperty("userId"); if ("VIP_USER_123".equals(userId)) { // 特殊处理 VIP 用户订单 handleVipOrder(msg); } else { // 普通处理 handleNormalOrder(msg); } } } } 七、设计原则总结 7.1 层级设计原则 消息层级设计: ┌─────────────────────────────────┐ │ Topic │ <- 业务大类 ├─────────────────────────────────┤ │ Queue1 Queue2 Queue3 │ <- 负载均衡 ├─────────────────────────────────┤ │ Tag1 Tag2 Tag3 │ <- 消息子类 ├─────────────────────────────────┤ │ Key (业务ID) │ <- 消息标识 ├─────────────────────────────────┤ │ Properties (属性) │ <- 扩展信息 └─────────────────────────────────┘ 7.2 最佳实践建议 public class BestPractices { // 1. Topic 设计:粗粒度 // 一个业务领域一个 Topic // 避免创建过多 Topic // 2. Queue 设计:适度 // Queue 数量 = max(预期并发数, Consumer数量) // 一般 8-16 个即可 // 3. Tag 设计:细粒度 // 同一 Topic 下的不同消息类型 // 便于消费者灵活订阅 // 4. Key 设计:唯一性 // 使用业务唯一标识 // 便于消息追踪和查询 // 5. Properties:扩展性 // 存储额外的业务信息 // 支持 SQL92 复杂过滤 } 八、常见问题 8.1 Topic 过多的问题 // 问题:Topic 过多导致的影响 // 1. 路由信息膨胀 // 2. 内存占用增加 // 3. 管理复杂度上升 // 解决方案:合并相关 Topic,用 Tag 区分 // Before: USER_REGISTER_TOPIC, USER_UPDATE_TOPIC, USER_DELETE_TOPIC // After: USER_TOPIC + Tags(REGISTER, UPDATE, DELETE) 8.2 Queue 数量不当 // 问题:Queue 太少 // - Consumer 空闲 // - 并发度不够 // 问题:Queue 太多 // - 管理开销大 // - Rebalance 频繁 // 解决:动态调整 // 监控 Queue 积压和 Consumer 负载 // 根据实际情况调整 Queue 数量 九、总结 理解了 Topic、Queue、Tag、Key 这些核心概念,我们就掌握了 RocketMQ 的消息组织方式: ...

2025-11-13 · maneng

RocketMQ入门05:核心概念详解(上)- Producer、Consumer、Broker

引言:理解核心,掌握全局 如果把 RocketMQ 比作一个物流系统,那么: Producer 是发货方,负责打包发送包裹 Broker 是物流中心,负责存储转运包裹 Consumer 是收货方,负责签收处理包裹 理解这三个核心组件,就掌握了 RocketMQ 的精髓。 一、Producer(生产者)详解 1.1 Producer 架构 // Producer 内部结构 public class ProducerArchitecture { // 核心组件 private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信层 private TopicPublishInfo topicPublishInfo; // 路由信息 private SendMessageHook sendHook; // 发送钩子 private DefaultMQProducerImpl impl; // 核心实现 } Producer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQProducer │ <- API 层 ├──────────────────────────────────┤ │ DefaultMQProducerImpl │ <- 核心实现 ├──────────────────────────────────┤ │ ┌──────────┬─────────────────┐ │ │ │路由管理 │ 消息发送器 │ │ │ ├──────────┼─────────────────┤ │ │ │故障规避 │ 异步发送处理 │ │ │ ├──────────┼─────────────────┤ │ │ │负载均衡 │ 事务消息处理 │ │ │ └──────────┴─────────────────┘ │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 1.2 Producer 生命周期 public class ProducerLifecycle { // 1. 创建阶段 public void create() { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试次数 producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数 producer.setSendMsgTimeout(3000); // 发送超时时间 } // 2. 启动阶段 public void start() throws MQClientException { producer.start(); // 内部流程: // a. 检查配置 // b. 获取 NameServer 地址 // c. 启动定时任务(路由更新、心跳) // d. 启动消息发送线程池 // e. 注册 Producer 到所有 Broker } // 3. 运行阶段 public void running() throws Exception { // 获取路由信息 TopicRouteData routeData = producer.getDefaultMQProducerImpl() .getmQClientFactory() .getMQClientAPIImpl() .getTopicRouteInfoFromNameServer("TopicTest", 3000); // 选择消息队列 MessageQueue mq = selectMessageQueue(routeData); // 发送消息 SendResult result = producer.send(message, mq); } // 4. 关闭阶段 public void shutdown() { producer.shutdown(); // 内部流程: // a. 注销 Producer // b. 关闭网络连接 // c. 清理资源 // d. 停止定时任务 } } 1.3 消息发送流程 // 消息发送核心流程 public class SendMessageFlow { public SendResult sendMessage(Message msg) { // 1. 验证消息 Validators.checkMessage(msg, producer); // 2. 获取路由信息 TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 3. 选择消息队列(负载均衡) MessageQueue mq = selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 4. 消息发送前处理 msg.setBody(compress(msg.getBody())); // 压缩 msg.setProperty("UNIQ_KEY", MessageClientIDSetter.createUniqID()); // 唯一ID // 5. 发送消息 SendResult sendResult = sendKernelImpl( msg, mq, communicationMode, // SYNC, ASYNC, ONEWAY sendCallback, timeout ); // 6. 发送后处理(钩子) if (hasSendMessageHook()) { sendMessageHook.sendMessageAfter(context); } return sendResult; } } 1.4 负载均衡策略 public class ProducerLoadBalance { // 默认策略:轮询 + 故障规避 public MessageQueue selectOneMessageQueue(TopicPublishInfo tpInfo, String lastBrokerName) { // 启用故障规避 if (this.sendLatencyFaultEnable) { // 1. 优先选择延迟小的 Broker for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int index = Math.abs(sendWhichQueue.incrementAndGet()) % queueSize; MessageQueue mq = tpInfo.getMessageQueueList().get(index); // 检查 Broker 是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 避免连续发送到同一个 Broker if (null == lastBrokerName || !mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 2. 如果都不可用,选择延迟最小的 MessageQueue mq = latencyFaultTolerance.pickOneAtLeast(); return mq; } // 不启用故障规避:简单轮询 return selectOneMessageQueue(); } // 故障延迟统计 class LatencyStats { // 根据发送耗时计算 Broker 不可用时长 // 耗时:50ms, 100ms, 550ms, 1000ms, 2000ms, 3000ms, 15000ms // 规避:0ms, 0ms, 30s, 60s, 120s, 180s, 600s private final long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private final long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; } } 二、Consumer(消费者)详解 2.1 Consumer 架构 Consumer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQPushConsumer │ <- Push 模式 API │ DefaultMQPullConsumer │ <- Pull 模式 API ├──────────────────────────────────┤ │ ┌──────────────────────────┐ │ │ │ 消费者核心实现 │ │ │ ├──────────────────────────┤ │ │ │ Rebalance 负载均衡 │ │ │ │ ProcessQueue 处理队列 │ │ │ │ ConsumeMessageService │ │ │ │ 消费进度管理 │ │ │ └──────────────────────────┘ │ ├──────────────────────────────────┤ │ PullMessage 长轮询服务 │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 2.2 Push vs Pull 模式 // Push 模式(推荐) public class PushConsumerDemo { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup"); // Push 模式本质:长轮询拉取 // 1. Consumer 主动拉取消息 // 2. 如果没有消息,Broker hold 住请求 // 3. 有新消息时,立即返回 // 4. 看起来像是 Broker 推送 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> messages, ConsumeConcurrentlyContext context) { // 处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } // Pull 模式(特殊场景) public class PullConsumerDemo { public void consume() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroup"); consumer.start(); // 手动拉取消息 Set<MessageQueue> mqs = consumer.fetchMessageQueuesInBalance("TopicTest"); for (MessageQueue mq : mqs) { PullResult pullResult = consumer.pullBlockIfNotFound( mq, // 消息队列 null, // 过滤表达式 getOffset(mq), // 消费位点 32 // 最大拉取数量 ); // 处理消息 for (MessageExt msg : pullResult.getMsgFoundList()) { processMessage(msg); } // 更新消费位点 updateOffset(mq, pullResult.getNextBeginOffset()); } } } 2.3 消费者 Rebalance 机制 public class ConsumerRebalance { // Rebalance 触发条件: // 1. Consumer 数量变化(上线/下线) // 2. Topic 的 Queue 数量变化 // 3. 消费者订阅关系变化 public void rebalanceProcess() { // 1. 获取 Topic 的所有 Queue Set<MessageQueue> mqSet = topicSubscribeInfoTable.get(topic); // 2. 获取消费组的所有 Consumer List<String> cidAll = findConsumerIdList(topic, consumerGroup); // 3. 排序(保证所有 Consumer 看到一致的视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 分配策略 AllocateMessageQueueStrategy strategy = getAllocateMessageQueueStrategy(); List<MessageQueue> allocateResult = strategy.allocate( consumerGroup, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueueTableInRebalance(topic, allocateResult); } // 分配策略示例:平均分配 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); // 计算分配数量 int averageSize = (mod > 0 && index < mod) ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size(); // 计算起始位置 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 分配队列 return mqAll.subList(startIndex, Math.min(startIndex + averageSize, mqAll.size())); } } } 三、Broker 详解 3.1 Broker 架构 Broker 完整架构: ┌────────────────────────────────────────┐ │ 客户端请求 │ ├────────────────────────────────────────┤ │ Remoting 通信层 │ ├────────────────────────────────────────┤ │ ┌──────────────┬─────────────────┐ │ │ │ Processor │ 业务处理器 │ │ │ ├──────────────┼─────────────────┤ │ │ │SendMessage │ 接收消息 │ │ │ │PullMessage │ 拉取消息 │ │ │ │QueryMessage │ 查询消息 │ │ │ │AdminRequest │ 管理请求 │ │ │ └──────────────┴─────────────────┘ │ ├────────────────────────────────────────┤ │ Store 存储层 │ │ ┌──────────────────────────────┐ │ │ │ CommitLog(消息存储) │ │ │ ├──────────────────────────────┤ │ │ │ ConsumeQueue(消费索引) │ │ │ ├──────────────────────────────┤ │ │ │ IndexFile(消息索引) │ │ │ └──────────────────────────────┘ │ ├────────────────────────────────────────┤ │ HA 高可用模块 │ ├────────────────────────────────────────┤ │ Schedule 定时任务 │ └────────────────────────────────────────┘ 3.2 Broker 消息存储 public class BrokerStorage { // 1. CommitLog:消息主体存储 class CommitLog { // 所有消息顺序写入 // 单个文件默认 1GB // 文件名为起始偏移量 private final MappedFileQueue mappedFileQueue; public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 获取当前写入文件 MappedFile mappedFile = mappedFileQueue.getLastMappedFile(); // 写入消息 AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback); // 刷盘策略 handleDiskFlush(result, msg); // 主从同步 handleHA(result, msg); return new PutMessageResult(PutMessageStatus.PUT_OK, result); } } // 2. ConsumeQueue:消费队列 class ConsumeQueue { // 存储格式:8字节 CommitLog 偏移量 + 4字节消息大小 + 8字节 Tag HashCode // 单个文件 30W 条记录,约 5.72MB private final MappedFileQueue mappedFileQueue; public void putMessagePositionInfo( long offset, // CommitLog 偏移量 int size, // 消息大小 long tagsCode, // Tag HashCode long cqOffset) { // ConsumeQueue 偏移量 // 写入索引 this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); // 更新最大偏移量 this.maxPhysicOffset = offset + size; } } // 3. IndexFile:消息索引 class IndexFile { // 支持按 Key 或时间区间查询 // 存储格式:Header(40B) + Slot(500W*4B) + Index(2000W*20B) // 单个文件约 400MB public void putKey(String key, long phyOffset, long storeTimestamp) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; // 存储索引 // ... } } } 3.3 Broker 刷盘机制 public class FlushDiskStrategy { // 同步刷盘 class SyncFlush { public void flush() { // 写入 PageCache 后立即刷盘 // 优点:数据可靠性高 // 缺点:性能差 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 等待刷盘完成 boolean flushResult = service.waitForFlush(timeout); if (!flushResult) { log.error("Sync flush timeout"); throw new RuntimeException("Flush timeout"); } } } } // 异步刷盘(默认) class AsyncFlush { public void flush() { // 写入 PageCache 后立即返回 // 后台线程定时刷盘 // 优点:性能高 // 缺点:可能丢失数据(机器宕机) // 定时刷盘(默认 500ms) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 刷盘条件: // 1. 距上次刷盘超过 10s // 2. 脏页超过 4 页 // 3. 收到关闭信号 if (System.currentTimeMillis() - lastFlushTime > 10000 || dirtyPages > 4) { mappedFile.flush(0); lastFlushTime = System.currentTimeMillis(); dirtyPages = 0; } } }, 500, 500, TimeUnit.MILLISECONDS); } } } 3.4 Broker 主从同步 public class BrokerHA { // Master Broker class HAService { // 接受 Slave 连接 private AcceptSocketService acceptSocketService; // 管理多个 Slave 连接 private List<HAConnection> connectionList; public void start() { this.acceptSocketService.start(); this.groupTransferService.start(); this.haClient.start(); } // 等待 Slave 同步 public boolean waitForSlaveSync(long masterPutWhere) { // 同步复制模式 if (messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) { // 等待至少一个 Slave 同步完成 for (HAConnection conn : connectionList) { if (conn.getSlaveAckOffset() >= masterPutWhere) { return true; } } return false; } // 异步复制模式:直接返回 return true; } } // Slave Broker class HAClient { private SocketChannel socketChannel; public void run() { while (!this.isStopped()) { try { // 连接 Master connectMaster(); // 上报同步进度 reportSlaveMaxOffset(); // 接收数据 boolean ok = processReadEvent(); // 处理数据 dispatchReadRequest(); } catch (Exception e) { log.error("HAClient error", e); } } } } } 四、组件交互流程 4.1 完整的消息流转 消息完整流转过程: ┌─────────────┐ │ Producer │ └──────┬──────┘ │ 1. Send Message ↓ ┌─────────────┐ │ NameServer │ <- 2. Get Route Info └──────┬──────┘ │ 3. Route Info ↓ ┌─────────────┐ │ Broker │ │ ┌─────────┐ │ │ │CommitLog│ │ <- 4. Store Message │ └─────────┘ │ │ ┌─────────┐ │ │ │ConsumeQ │ │ <- 5. Build Index │ └─────────┘ │ └──────┬──────┘ │ 6. Pull Message ↓ ┌─────────────┐ │ Consumer │ └─────────────┘ 4.2 心跳机制 public class HeartbeatMechanism { // Producer/Consumer → Broker class ClientHeartbeat { // 每 30 秒发送一次心跳 private final long heartbeatInterval = 30000; public void sendHeartbeatToAllBroker() { HeartbeatData heartbeatData = prepareHeartbeatData(); for (Entry<String, HashMap<Long, String>> entry : brokerAddrTable.entrySet()) { String brokerName = entry.getKey(); for (Entry<Long, String> innerEntry : entry.getValue().entrySet()) { String addr = innerEntry.getValue(); // 发送心跳 this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); } } } } // Broker → NameServer class BrokerHeartbeat { // 每 30 秒注册一次 private final long registerInterval = 30000; public void registerBrokerAll() { RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 注册到所有 NameServer for (String namesrvAddr : namesrvAddrList) { registerBroker(namesrvAddr, requestBody); } } } } 五、配置优化建议 5.1 Producer 优化 // 性能优化配置 producer.setSendMsgTimeout(3000); // 发送超时 producer.setCompressMsgBodyOverHowmuch(4096); // 消息压缩阈值 producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小 producer.setRetryTimesWhenSendFailed(2); // 重试次数 producer.setSendLatencyFaultEnable(true); // 故障规避 5.2 Consumer 优化 // 消费优化配置 consumer.setConsumeThreadMin(20); // 最小消费线程数 consumer.setConsumeThreadMax(64); // 最大消费线程数 consumer.setConsumeMessageBatchMaxSize(1); // 批量消费数量 consumer.setPullBatchSize(32); // 批量拉取数量 consumer.setPullInterval(0); // 拉取间隔 consumer.setConsumeConcurrentlyMaxSpan(2000); // 消费进度延迟阈值 5.3 Broker 优化 # broker.conf 优化配置 # 刷盘策略 flushDiskType=ASYNC_FLUSH # 同步刷盘超时时间 syncFlushTimeout=5000 # 消息最大大小 maxMessageSize=65536 # 发送线程池大小 sendMessageThreadPoolNums=128 # 拉取线程池大小 pullMessageThreadPoolNums=128 # Broker 角色 brokerRole=ASYNC_MASTER 六、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门04:快速上手 - Docker一键部署与第一条消息

引言:5分钟上手 RocketMQ 不需要复杂的环境配置,不需要编译源码,通过 Docker,我们可以在 5 分钟内搭建一个完整的 RocketMQ 环境,并成功发送第一条消息。 让我们开始这段激动人心的旅程! 一、环境准备 1.1 前置要求 # 检查 Docker 版本(需要 20.10+) docker --version # Docker version 24.0.5, build ced0996 # 检查 Docker Compose 版本(需要 2.0+) docker-compose --version # Docker Compose version v2.20.2 # 检查系统资源 # 建议:4GB+ 内存,10GB+ 磁盘空间 docker system info | grep -E "Memory|Storage" 1.2 创建项目目录 # 创建项目目录 mkdir -p ~/rocketmq-demo cd ~/rocketmq-demo # 创建必要的子目录 mkdir -p data/namesrv/logs data/broker/logs data/broker/store mkdir -p conf 二、Docker Compose 部署 2.1 编写 docker-compose.yml # docker-compose.yml version: '3.8' services: # NameServer 服务 namesrv: image: apache/rocketmq:5.1.3 container_name: rocketmq-namesrv ports: - "9876:9876" environment: JAVA_OPT: "-Duser.home=/opt" JAVA_OPT_EXT: "-Xms512M -Xmx512M -Xmn128m" volumes: - ./data/namesrv/logs:/opt/logs command: sh mqnamesrv networks: - rocketmq-net # Broker 服务 broker: image: apache/rocketmq:5.1.3 container_name: rocketmq-broker ports: - "10909:10909" - "10911:10911" - "10912:10912" environment: JAVA_OPT_EXT: "-Xms1G -Xmx1G -Xmn512m" volumes: - ./data/broker/logs:/opt/logs - ./data/broker/store:/opt/store - ./conf/broker.conf:/opt/conf/broker.conf command: sh mqbroker -n namesrv:9876 -c /opt/conf/broker.conf depends_on: - namesrv networks: - rocketmq-net # 控制台服务 dashboard: image: apacherocketmq/rocketmq-dashboard:latest container_name: rocketmq-dashboard ports: - "8080:8080" environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=namesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" depends_on: - namesrv - broker networks: - rocketmq-net networks: rocketmq-net: driver: bridge 2.2 配置 Broker # conf/broker.conf cat > conf/broker.conf << 'EOF' # 集群名称 brokerClusterName = DefaultCluster # broker 名称 brokerName = broker-a # 0 表示 Master,>0 表示 Slave brokerId = 0 # 删除文件时间点,默认凌晨 4 点 deleteWhen = 04 # 文件保留时间,默认 48 小时 fileReservedTime = 48 # Broker 的角色 brokerRole = ASYNC_MASTER # 刷盘方式 flushDiskType = ASYNC_FLUSH # 存储路径 storePathRootDir = /opt/store # 队列存储路径 storePathCommitLog = /opt/store/commitlog # 自动创建 Topic autoCreateTopicEnable = true # 自动创建订阅组 autoCreateSubscriptionGroup = true # Broker 监听端口 listenPort = 10911 # 是否允许 Broker 自动创建 Topic brokerIP1 = broker EOF 2.3 启动服务 # 启动所有服务 docker-compose up -d # 查看服务状态 docker-compose ps # 输出示例: # NAME IMAGE STATUS # rocketmq-namesrv apache/rocketmq:5.1.3 Up 30 seconds # rocketmq-broker apache/rocketmq:5.1.3 Up 20 seconds # rocketmq-dashboard apacherocketmq/rocketmq-dashboard:latest Up 10 seconds # 查看日志 docker-compose logs -f broker 三、发送第一条消息 3.1 创建 Java 项目 <!-- pom.xml --> <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>rocketmq-demo</artifactId> <version>1.0.0</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- RocketMQ 客户端 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client-java</artifactId> <version>5.0.5</version> </dependency> <!-- 日志 --> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.11</version> </dependency> </dependencies> </project> 3.2 编写生产者代码 // QuickProducer.java package com.example.rocketmq; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.common.RemotingHelper; public class QuickProducer { public static void main(String[] args) throws Exception { // 1. 创建生产者,指定生产者组名 DefaultMQProducer producer = new DefaultMQProducer("quick_producer_group"); // 2. 指定 NameServer 地址 producer.setNamesrvAddr("localhost:9876"); // 3. 启动生产者 producer.start(); System.out.println("Producer Started."); // 4. 发送消息 for (int i = 0; i < 10; i++) { // 创建消息对象 Message msg = new Message( "QuickStartTopic", // Topic "TagA", // Tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // 发送消息 SendResult sendResult = producer.send(msg); // 打印发送结果 System.out.printf("%s%n", sendResult); Thread.sleep(1000); } // 5. 关闭生产者 producer.shutdown(); System.out.println("Producer Shutdown."); } } 3.3 编写消费者代码 // QuickConsumer.java package com.example.rocketmq; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import java.util.List; public class QuickConsumer { public static void main(String[] args) throws Exception { // 1. 创建消费者,指定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quick_consumer_group"); // 2. 指定 NameServer 地址 consumer.setNamesrvAddr("localhost:9876"); // 3. 订阅 Topic 和 Tag consumer.subscribe("QuickStartTopic", "*"); // 4. 设置消费模式(默认集群模式) consumer.setMessageModel(MessageModel.CLUSTERING); // 5. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> messages, ConsumeConcurrentlyContext context) { for (MessageExt message : messages) { System.out.printf("Consume message: %s%n", new String(message.getBody())); } // 返回消费状态:成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 6. 启动消费者 consumer.start(); System.out.println("Consumer Started."); // 保持运行 Thread.sleep(Long.MAX_VALUE); } } 四、运行测试 4.1 运行步骤 # 1. 先启动消费者(新终端) mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickConsumer" # 输出: # Consumer Started. # 等待消息... # 2. 启动生产者(新终端) mvn compile exec:java -Dexec.mainClass="com.example.rocketmq.QuickProducer" # 输出: # Producer Started. # SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F0000000000000000, ... # SendResult [sendStatus=SEND_OK, msgId=7F00000100002A9F00000000000000B8, ... # ... # Producer Shutdown. # 3. 查看消费者输出 # Consume message: Hello RocketMQ 0 # Consume message: Hello RocketMQ 1 # ... 4.2 使用控制台查看 打开浏览器访问:http://localhost:8080 ...

2025-11-13 · maneng

RocketMQ入门03:RocketMQ初识 - 阿里开源消息中间件的前世今生

引言:一个技术选型的故事 2010年,淘宝的技术团队面临一个棘手的问题: 双11大促期间,订单量瞬间飙升100倍,原有的 ActiveMQ 集群直接崩溃。几百万用户的订单消息堆积,下游系统无法处理,整个交易链路几近瘫痪。 这不是简单的扩容能解决的问题。他们需要一个全新的消息中间件,一个真正理解电商业务、能扛住双11洪峰的消息系统。 这就是 RocketMQ 诞生的开始。 一、RocketMQ 的前世:从 Notify 到 MetaQ 1.1 第一代:Notify(2007-2010) 背景问题: - 淘宝业务快速增长,系统解耦需求迫切 - 使用 ActiveMQ,遇到性能瓶颈 - 运维成本高,稳定性不足 Notify 1.0 特点: - 基于 ActiveMQ 5.1 改造 - 增加了消息存储优化 - 简单的集群支持 - 问题:Java 性能瓶颈,10K 消息就卡顿 1.2 第二代:MetaQ(2010-2012) // MetaQ 的设计转变 // 从 JMS 规范 → 自定义协议 // 从 随机读写 → 顺序写入 public class MetaQDesign { // 核心创新:完全顺序写入 CommitLog commitLog; // 顺序写入 ConsumeQueue consumeQueue; // 消费索引 // 借鉴 Kafka 的设计 // 但针对电商场景优化 - 支持事务消息 - 支持定时消息 - 支持消息查询 } MetaQ 的突破: ...

2025-11-13 · maneng

RocketMQ入门02:消息队列的本质与演进

引言:从一个简单的队列开始 在计算机科学中,队列(Queue)是最基础的数据结构之一。它遵循 FIFO(First In First Out)原则,就像排队买票一样,先来的先处理。 // 最简单的队列实现 public class SimpleQueue<T> { private LinkedList<T> items = new LinkedList<>(); public void enqueue(T item) { items.addLast(item); // 入队 } public T dequeue() { return items.removeFirst(); // 出队 } } 这个简单的数据结构,竟然是现代消息队列的起源。让我们看看它是如何一步步演进的。 一、演进阶段1:进程内队列 1.1 最初的需求 在单进程程序中,当我们需要解耦生产者和消费者时,最简单的方式就是使用内存队列: // 生产者-消费者模式 public class InMemoryMessageQueue { private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); // 生产者线程 class Producer extends Thread { public void run() { while (true) { Message msg = generateMessage(); queue.put(msg); // 阻塞式入队 } } } // 消费者线程 class Consumer extends Thread { public void run() { while (true) { Message msg = queue.take(); // 阻塞式出队 processMessage(msg); } } } } 特点: ...

2025-11-13 · maneng

RocketMQ入门01:为什么需要消息队列

引言:从一个电商下单场景说起 想象这样一个场景:用户在你的电商平台下单购买了一件商品。看似简单的一次点击,背后却触发了一系列复杂的业务流程: 扣减库存 生成订单 扣减用户积分 发送短信通知 发送邮件通知 增加商家销量统计 记录用户行为日志 触发推荐系统更新 如果采用传统的同步调用方式,会是什么样子? 一、同步通信的痛点 1.1 性能瓶颈 // 传统同步调用方式 public OrderResult createOrder(OrderRequest request) { // 核心业务:100ms Order order = orderService.create(request); // 100ms inventoryService.deduct(request.getSkuId()); // 50ms // 非核心业务:累计 500ms+ pointService.deduct(request.getUserId()); // 100ms smsService.send(request.getPhone()); // 200ms emailService.send(request.getEmail()); // 150ms statisticsService.record(order); // 50ms logService.log(order); // 30ms recommendService.update(request.getUserId()); // 100ms return new OrderResult(order); } // 总耗时:730ms+ 问题分析: 用户需要等待 730ms+ 才能看到下单结果 其中只有前 150ms 是核心业务 580ms 都在等待非核心业务完成 1.2 高耦合问题 // 每增加一个下游系统,都要修改订单服务代码 public OrderResult createOrder(OrderRequest request) { // ... 原有代码 // 新需求:增加优惠券系统通知 couponService.notify(order); // 又要改订单服务! // 新需求:增加大数据分析 bigDataService.collect(order); // 又要改订单服务! } 问题分析: ...

2025-11-13 · maneng

消息中间件第一性原理:为什么需要异步通信?

为什么一个订单创建需要900ms?为什么下游服务故障会导致订单无法创建?为什么秒杀活动会让系统崩溃? 本文从一个真实的电商订单场景出发,用第一性原理思维深度拆解:为什么我们需要消息中间件? 一、引子:订单处理的两种实现 让我们从一个最常见的业务场景开始:用户下单。 场景背景 一个典型的电商订单创建流程包含以下步骤: 创建订单:保存订单到数据库 扣减库存:调用库存服务扣减商品库存 增加积分:调用积分服务为用户增加积分 发送通知:调用通知服务发送订单确认短信/邮件 创建物流:调用物流服务创建配送单 看起来很简单,但在实际生产环境中,这个流程会遇到很多挑战。让我们对比两种实现方式。 1.1 场景A:同步实现(直接调用) 这是大多数初学者的第一反应:依次调用所有服务。 /** * 订单服务 - 同步实现 * 问题:串行等待,响应时间长,系统强耦合 */ @Service @Slf4j public class OrderServiceSync { @Autowired private OrderRepository orderRepository; @Autowired private InventoryServiceClient inventoryService; @Autowired private PointsServiceClient pointsService; @Autowired private NotificationServiceClient notificationService; @Autowired private LogisticsServiceClient logisticsService; /** * 创建订单 - 同步方式 * 总耗时:900ms */ @Transactional public Order createOrder(OrderRequest request) { long startTime = System.currentTimeMillis(); try { // 1. 创建订单(核心业务逻辑,50ms) log.info("开始创建订单,用户ID: {}", request.getUserId()); Order order = buildOrder(request); orderRepository.save(order); log.info("订单创建成功,订单号: {}", order.getOrderNo()); // 2. 同步调用库存服务(网络调用,200ms) log.info("开始扣减库存"); long inventoryStart = System.currentTimeMillis(); try { InventoryDeductRequest inventoryRequest = InventoryDeductRequest.builder() .productId(request.getProductId()) .quantity(request.getQuantity()) .orderId(order.getId()) .build(); inventoryService.deduct(inventoryRequest); log.info("库存扣减成功,耗时: {}ms", System.currentTimeMillis() - inventoryStart); } catch (FeignException e) { log.error("库存扣减失败", e); throw new BusinessException("库存不足或服务不可用"); } // 3. 同步调用积分服务(网络调用,150ms) log.info("开始增加积分"); long pointsStart = System.currentTimeMillis(); try { PointsAddRequest pointsRequest = PointsAddRequest.builder() .userId(request.getUserId()) .points((int) (order.getAmount() / 10)) // 消费10元得1积分 .orderId(order.getId()) .build(); pointsService.add(pointsRequest); log.info("积分增加成功,耗时: {}ms", System.currentTimeMillis() - pointsStart); } catch (FeignException e) { // 积分服务失败不影响订单创建,只记录日志 // 但用户还是要等待150ms log.error("积分增加失败,将稍后重试", e); } // 4. 同步调用通知服务(网络调用 + 第三方API,300ms) log.info("开始发送通知"); long notificationStart = System.currentTimeMillis(); try { NotificationRequest notificationRequest = NotificationRequest.builder() .userId(request.getUserId()) .type(NotificationType.ORDER_CREATED) .content("您的订单" + order.getOrderNo() + "已创建成功") .build(); notificationService.send(notificationRequest); log.info("通知发送成功,耗时: {}ms", System.currentTimeMillis() - notificationStart); } catch (FeignException e) { // 通知失败不影响订单创建,但用户还是要等待300ms log.error("通知发送失败,将稍后重试", e); } // 5. 同步调用物流服务(网络调用,250ms) log.info("开始创建物流单"); long logisticsStart = System.currentTimeMillis(); try { LogisticsCreateRequest logisticsRequest = LogisticsCreateRequest.builder() .orderId(order.getId()) .address(request.getAddress()) .build(); logisticsService.create(logisticsRequest); log.info("物流单创建成功,耗时: {}ms", System.currentTimeMillis() - logisticsStart); } catch (FeignException e) { // 物流失败不影响订单创建,但用户还是要等待250ms log.error("物流单创建失败,将稍后重试", e); } // 记录总耗时 long totalTime = System.currentTimeMillis() - startTime; log.info("订单创建完成,总耗时: {}ms", totalTime); return order; } catch (Exception e) { log.error("订单创建失败", e); throw e; } } private Order buildOrder(OrderRequest request) { Order order = new Order(); order.setOrderNo(generateOrderNo()); order.setUserId(request.getUserId()); order.setProductId(request.getProductId()); order.setQuantity(request.getQuantity()); order.setAmount(calculateAmount(request)); order.setStatus(OrderStatus.PENDING_PAYMENT); order.setCreateTime(LocalDateTime.now()); return order; } private String generateOrderNo() { return "ORD" + System.currentTimeMillis(); } private BigDecimal calculateAmount(OrderRequest request) { // 简化处理,实际应该查询商品价格 return new BigDecimal("100.00").multiply(new BigDecimal(request.getQuantity())); } } 日志输出示例: ...

2025-11-03 · maneng

RocketMQ实战指南:阿里巴巴的分布式消息方案

为什么RocketMQ能支撑双11万亿级消息?如何使用事务消息实现分布式事务?如何保证消息的全局顺序? 本文深度剖析RocketMQ的核心特性和实战应用。 一、RocketMQ核心架构 1.1 核心组件 Producer → NameServer → Broker → Consumer ↓ ↓ 路由信息 消息存储 与Kafka的区别: 组件 RocketMQ Kafka 注册中心 NameServer(自研,轻量级) ZooKeeper / KRaft 存储结构 CommitLog + ConsumeQueue Partition日志文件 消息模型 点对点 + 发布订阅 发布订阅 特殊功能 事务消息、延迟消息、顺序消息 流式计算、消息回溯 1.2 存储架构 RocketMQ的三层存储: 1. CommitLog(所有消息) ├─ Message 1(Topic A) ├─ Message 2(Topic B) ├─ Message 3(Topic A) ... 2. ConsumeQueue(消息索引) Topic A ├─ Queue 0 │ ├─ Offset 0 → CommitLog位置100 │ ├─ Offset 1 → CommitLog位置300 │ ... └─ Queue 1 ... 3. IndexFile(消息检索) Key → CommitLog位置 优势: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...