RocketMQ入门08:消费模式剖析 - Push vs Pull的权衡

引言:一个有趣的悖论 当你使用 RocketMQ 的 Push 模式时,你可能会惊讶地发现: API 名称是 DefaultMQPushConsumer 但实际上是 Consumer 在主动拉取消息 官方文档称之为"Push 模式" 但本质上是"长轮询 Pull" 这不是设计缺陷,而是精心权衡的结果。让我们揭开这个"伪 Push"的神秘面纱。 一、Push vs Pull 的本质区别 1.1 理论上的 Push 和 Pull public class PushVsPullTheory { // 真正的 Push 模式 class TruePushMode { // Broker 主动推送消息给 Consumer // 特点: // 1. Broker 维护 Consumer 列表 // 2. 有新消息立即推送 // 3. Broker 控制推送速率 // 4. Consumer 被动接收 void brokerPush() { // Broker 端代码 for (Consumer consumer : consumers) { consumer.receive(message); // 主动调用 } } } // 真正的 Pull 模式 class TruePullMode { // Consumer 主动从 Broker 拉取消息 // 特点: // 1. Consumer 控制拉取时机 // 2. Consumer 控制拉取速率 // 3. Broker 被动响应 // 4. 可能存在消息延迟 void consumerPull() { // Consumer 端代码 while (running) { Message msg = broker.pull(); // 主动拉取 process(msg); } } } } 1.2 两种模式的优劣对比 Push 模式的优劣: ┌─────────────────┬──────────────────────────────┐ │ 优点 │ 缺点 │ ├─────────────────┼──────────────────────────────┤ │ 实时性高 │ Broker 需要维护推送状态 │ │ 消息延迟小 │ 难以处理消费速率差异 │ │ 使用简单 │ 可能造成 Consumer 过载 │ │ │ Broker 成为性能瓶颈 │ └─────────────────┴──────────────────────────────┘ Pull 模式的优劣: ┌─────────────────┬──────────────────────────────┐ │ 优点 │ 缺点 │ ├─────────────────┼──────────────────────────────┤ │ Consumer 自主控制│ 可能产生消息延迟 │ │ 流量可控 │ 频繁轮询浪费资源 │ │ Broker 无状态 │ 需要管理消费进度 │ │ 适合批量处理 │ 实现相对复杂 │ └─────────────────┴──────────────────────────────┘ 二、RocketMQ 的 Push 模式真相 2.1 Push 模式的内部实现 public class RocketMQPushMode { // DefaultMQPushConsumer 的秘密:底层是 Pull class PushConsumerImplementation { private PullMessageService pullMessageService; // 启动后自动开始拉取 public void start() { // 1. 启动 Rebalance 服务 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 立即触发拉取 pullMessageService.executePullRequestImmediately(pullRequest); } // 核心:PullMessageService class PullMessageService extends ServiceThread { @Override public void run() { while (!this.isStopped()) { PullRequest pullRequest = pullRequestQueue.take(); pullMessage(pullRequest); } } private void pullMessage(PullRequest pullRequest) { // 实际进行拉取操作 PullResult pullResult = pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, pullRequest.getNextOffset(), maxNums, sysFlag, 0, // 没有消息时挂起时间 brokerSuspendMaxTimeMillis, // 30秒 timeoutMillis, CommunicationMode.ASYNC, pullCallback ); } } } } 2.2 Push 模式的工作流程 Push 模式完整流程: ┌──────────────┐ │ Consumer │ └──────┬───────┘ │ 1. 发送拉取请求 ↓ ┌──────────────┐ │ Broker │ ├──────────────┤ │ 有消息? │ │ ├─是→ 立即返回 │ └─否→ Hold住请求 └──────┬───────┘ │ 2. 返回消息或超时 ↓ ┌──────────────┐ │ Consumer │ │ 处理消息 │ │ 继续拉取 │ └──────────────┘ 2.3 为什么要"伪装"成 Push? public class WhyFakePush { // 原因1:使用体验 class UserExperience { // Push API 更简单 void pushAPI() { consumer.registerMessageListener(listener); consumer.start(); // 就这么简单 } // Pull API 更复杂 void pullAPI() { while (running) { PullResult result = consumer.pull(mq, offset); process(result); updateOffset(offset); } } } // 原因2:兼顾优点 class CombineAdvantages { // 获得 Push 的优点: // - 接近实时(长轮询) // - 使用简单 // 获得 Pull 的优点: // - 流量可控 // - Broker 无状态 // - Consumer 自主控制 } // 原因3:避免缺点 class AvoidDisadvantages { // 避免 Push 的缺点: // - Broker 不需要维护推送状态 // - 不会造成 Consumer 过载 // 避免 Pull 的缺点: // - 无消息时不会空轮询 // - 有消息时立即返回 } } 三、长轮询机制详解 3.1 长轮询的核心原理 public class LongPollingMechanism { // Broker 端长轮询实现 class BrokerLongPolling { // 处理拉取请求 public RemotingCommand processRequest(RemotingCommand request) { // 1. 查询消息 GetMessageResult getMessageResult = messageStore.getMessage( group, topic, queueId, offset, maxMsgNums, filters ); // 2. 判断是否有消息 if (getMessageResult.getMessageCount() > 0) { // 有消息,立即返回 return buildResponse(getMessageResult); } else { // 没有消息,进入长轮询 return longPolling(request); } } // 长轮询处理 private RemotingCommand longPolling(RemotingCommand request) { // 1. 创建 Hold 请求 PullRequest pullRequest = new PullRequest( request, channel, pollingTimeMills, // 挂起时间 System.currentTimeMillis() ); // 2. 放入 Hold 队列 pullRequestHoldService.suspendPullRequest(topic, queueId, pullRequest); // 3. 不立即返回,等待唤醒或超时 return null; // 暂不返回 } } // Hold 服务 class PullRequestHoldService extends ServiceThread { // Hold 请求的容器 private ConcurrentHashMap<String, ManyPullRequest> pullRequestTable; @Override public void run() { while (!stopped) { // 每 5 秒检查一次 waitForRunning(5000); // 检查所有 Hold 的请求 for (ManyPullRequest mpr : pullRequestTable.values()) { List<PullRequest> requestList = mpr.cloneListAndClear(); for (PullRequest request : requestList) { // 检查是否有新消息 long offset = messageStore.getMaxOffset(topic, queueId); if (offset > request.getPullFromThisOffset()) { // 有新消息,唤醒请求 wakeupRequest(request); } else if (System.currentTimeMillis() - request.getSuspendTimestamp() > maxWaitTimeMillis) { // 超时,返回空结果 timeoutRequest(request); } else { // 继续 Hold mpr.addPullRequest(request); } } } } } // 新消息到达时的通知 public void notifyMessageArriving(String topic, int queueId) { ManyPullRequest mpr = pullRequestTable.get(buildKey(topic, queueId)); if (mpr != null) { List<PullRequest> requestList = mpr.cloneListAndClear(); for (PullRequest request : requestList) { // 立即唤醒等待的请求 wakeupRequest(request); } } } } } 3.2 长轮询的时序图 长轮询时序图: Consumer Broker MessageStore │ │ │ │──────Pull Request───────>│ │ │ │ │ │ │────Check Message────────>│ │ │ │ │ │<───No Message────────────│ │ │ │ │ (Hold Request) │ │ │ │ │ │<────New Message Arrive────│ │ │ │ │ (Wake up Request) │ │ │ │ │<─────Return Messages─────│ │ │ │ │ 时间轴: 0s :Consumer 发送拉取请求 0s :Broker 检查没有消息,Hold 住请求 15s :新消息到达,Broker 唤醒请求 15s :Broker 返回消息给 Consumer 3.3 长轮询的参数配置 public class LongPollingConfig { // Consumer 端配置 class ConsumerConfig { // 长轮询模式下的挂起时间(默认 30 秒) consumer.setBrokerSuspendMaxTimeMillis(30000); // 拉取间隔(毫秒) consumer.setPullInterval(0); // 0 表示连续拉取 // 拉取批量大小 consumer.setPullBatchSize(32); // 单次拉取的最大消息数 consumer.setConsumeMessageBatchMaxSize(1); } // Broker 端配置 class BrokerConfig { // 长轮询等待时间(默认 30 秒) longPollingEnable = true; // Hold 请求的检查间隔(默认 5 秒) waitTimeMillsInPullHoldService = 5000; // 短轮询等待时间(默认 1 秒) shortPollingTimeMills = 1000; } } 四、Pull 模式的使用场景 4.1 Pull 模式的实现 public class PullModeImplementation { public void pullConsumer() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(); consumer.setNamesrvAddr("localhost:9876"); consumer.setConsumerGroup("PULL_CONSUMER_GROUP"); consumer.start(); // 获取消息队列 Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC"); for (MessageQueue mq : mqs) { // 从存储获取消费进度 long offset = loadOffset(mq); while (true) { try { // 拉取消息 PullResult pullResult = consumer.pullBlockIfNotFound( mq, // 消息队列 null, // 过滤表达式 offset, // 拉取偏移量 32 // 最大消息数 ); // 处理拉取结果 switch (pullResult.getMsgFoundList()) { case FOUND: // 处理消息 for (MessageExt msg : pullResult.getMsgFoundList()) { processMessage(msg); } // 更新偏移量 offset = pullResult.getNextBeginOffset(); // 持久化偏移量 saveOffset(mq, offset); break; case NO_NEW_MSG: // 没有新消息 Thread.sleep(1000); break; case OFFSET_ILLEGAL: // 偏移量非法 offset = consumer.minOffset(mq); break; } } catch (Exception e) { e.printStackTrace(); } } } } } 4.2 Pull 模式的适用场景 public class PullModeScenarios { // 场景1:批量处理 class BatchProcessing { void batchConsume() { // 积累一定数量后批量处理 List<MessageExt> batch = new ArrayList<>(); while (batch.size() < 1000) { PullResult result = consumer.pull(mq, offset, 100); batch.addAll(result.getMsgFoundList()); } // 批量处理 batchProcess(batch); } } // 场景2:定时处理 class ScheduledProcessing { void scheduledConsume() { // 每小时处理一次 ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { PullResult result = consumer.pull(mq, offset, Integer.MAX_VALUE); process(result.getMsgFoundList()); }, 0, 1, TimeUnit.HOURS); } } // 场景3:流量控制 class FlowControl { void controlledConsume() { RateLimiter rateLimiter = RateLimiter.create(100); // 100 TPS while (running) { rateLimiter.acquire(); // 限流 PullResult result = consumer.pull(mq, offset, 1); process(result.getMsgFoundList()); } } } // 场景4:自定义消费逻辑 class CustomLogic { void customConsume() { // 根据业务状态决定是否消费 if (isBusinessReady()) { PullResult result = consumer.pull(mq, offset, 32); process(result.getMsgFoundList()); } else { // 暂停消费 Thread.sleep(5000); } } } } 五、Push 和 Pull 的性能对比 5.1 性能测试 public class PerformanceComparison { // Push 模式性能特点 class PushPerformance { // 优点: // - 低延迟(长轮询,有消息立即返回) // - 高吞吐(连续拉取,无空闲) // - 资源利用率高(无空轮询) // 缺点: // - Consumer 压力大(连续处理) // - 流量不可控(Broker 推送速率) } // Pull 模式性能特点 class PullPerformance { // 优点: // - 流量可控(Consumer 控制) // - 压力可调(可以暂停) // - 批量优化(一次拉取多条) // 缺点: // - 可能有延迟(拉取间隔) // - 可能空轮询(浪费资源) } // 性能数据对比 class PerformanceData { /* 测试环境:3 Broker,10 Producer,10 Consumer 消息大小:1KB 指标 Push模式 Pull模式(1s间隔) Pull模式(连续) ---------------------------------------------------------------- 延迟(ms) 10-50 1000-2000 50-100 TPS 100,000 50,000 80,000 CPU使用率 85% 60% 75% 网络开销 低 高(空轮询) 中 */ } } 5.2 选择建议 public class SelectionGuide { // 选择 Push 模式的场景 class WhenToUsePush { // ✅ 实时性要求高 // ✅ 消息量稳定 // ✅ 希望简单使用 // ✅ Consumer 处理能力强 void example() { // 订单处理、支付通知、实时推送 } } // 选择 Pull 模式的场景 class WhenToUsePull { // ✅ 需要批量处理 // ✅ 定时处理 // ✅ 流量控制 // ✅ 自定义消费策略 void example() { // 数据同步、报表生成、批量导入 } } // 混合使用 class HybridUsage { // 核心业务使用 Push(实时性) // 非核心业务使用 Pull(可控性) void example() { // Push: 订单、支付 // Pull: 日志、统计 } } } 六、高级特性 6.1 Push 模式的流控 public class PushFlowControl { public void flowControl() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 流控参数 consumer.setPullThresholdForQueue(1000); // 单队列最大消息数 consumer.setPullThresholdSizeForQueue(100); // 单队列最大消息大小(MB) consumer.setPullThresholdForTopic(10000); // Topic 最大消息数 consumer.setPullThresholdSizeForTopic(1000); // Topic 最大消息大小(MB) // 消费并发控制 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); } } 6.2 Pull 模式的优化 public class PullOptimization { // 优化1:使用 PullConsumer 代替 DefaultMQPullConsumer public void optimizedPull() { // RocketMQ 5.0 新 API PullConsumer pullConsumer = PullConsumer.newBuilder() .setConsumerGroup("GROUP") .setNamesrvAddr("localhost:9876") .build(); // 异步拉取 CompletableFuture<PullResult> future = pullConsumer.pullAsync( mq, offset, maxNums ); future.thenAccept(result -> { process(result.getMsgFoundList()); }); } // 优化2:批量拉取 public void batchPull() { // 一次拉取更多消息,减少网络开销 PullResult result = consumer.pull(mq, offset, 1000); // 拉取 1000 条 } // 优化3:并行处理 public void parallelPull() { ExecutorService executor = Executors.newFixedThreadPool(10); for (MessageQueue mq : queues) { executor.submit(() -> { while (running) { PullResult result = consumer.pull(mq, offset, 32); process(result); } }); } } } 七、常见问题 7.1 Push 模式消费堆积 // 问题:Push 模式下消息堆积 // 原因:消费速度 < 生产速度 // 解决方案: class PushBacklogSolution { void solution() { // 1. 增加消费者实例 // 水平扩展 // 2. 增加消费线程 consumer.setConsumeThreadMin(40); consumer.setConsumeThreadMax(80); // 3. 批量消费 consumer.setConsumeMessageBatchMaxSize(20); // 4. 优化消费逻辑 // 异步处理、批量入库等 } } 7.2 Pull 模式偏移量管理 // 问题:Pull 模式偏移量丢失 // 原因:没有正确持久化偏移量 // 解决方案: class PullOffsetManagement { private OffsetStore offsetStore; void manageOffset() { // 1. 定期持久化 scheduledExecutor.scheduleAtFixedRate(() -> { offsetStore.persistAll(offsetTable); }, 5, 5, TimeUnit.SECONDS); // 2. 优雅关闭时持久化 Runtime.getRuntime().addShutdownHook(new Thread(() -> { offsetStore.persistAll(offsetTable); })); // 3. 使用 RocketMQ 提供的 OffsetStore consumer.setOffsetStore(new RemoteBrokerOffsetStore()); } } 八、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...