引言:一个有趣的悖论

当你使用 RocketMQ 的 Push 模式时,你可能会惊讶地发现:

  • API 名称是 DefaultMQPushConsumer
  • 但实际上是 Consumer 在主动拉取消息
  • 官方文档称之为"Push 模式"
  • 但本质上是"长轮询 Pull"

这不是设计缺陷,而是精心权衡的结果。让我们揭开这个"伪 Push"的神秘面纱。

一、Push vs Pull 的本质区别

1.1 理论上的 Push 和 Pull

public class PushVsPullTheory {

    // 真正的 Push 模式
    class TruePushMode {
        // Broker 主动推送消息给 Consumer
        // 特点:
        // 1. Broker 维护 Consumer 列表
        // 2. 有新消息立即推送
        // 3. Broker 控制推送速率
        // 4. Consumer 被动接收

        void brokerPush() {
            // Broker 端代码
            for (Consumer consumer : consumers) {
                consumer.receive(message);  // 主动调用
            }
        }
    }

    // 真正的 Pull 模式
    class TruePullMode {
        // Consumer 主动从 Broker 拉取消息
        // 特点:
        // 1. Consumer 控制拉取时机
        // 2. Consumer 控制拉取速率
        // 3. Broker 被动响应
        // 4. 可能存在消息延迟

        void consumerPull() {
            // Consumer 端代码
            while (running) {
                Message msg = broker.pull();  // 主动拉取
                process(msg);
            }
        }
    }
}

1.2 两种模式的优劣对比

Push 模式的优劣:
┌─────────────────┬──────────────────────────────┐
│     优点        │           缺点               │
├─────────────────┼──────────────────────────────┤
│ 实时性高        │ Broker 需要维护推送状态      │
│ 消息延迟小      │ 难以处理消费速率差异        │
│ 使用简单        │ 可能造成 Consumer 过载       │
│                 │ Broker 成为性能瓶颈          │
└─────────────────┴──────────────────────────────┘

Pull 模式的优劣:
┌─────────────────┬──────────────────────────────┐
│     优点        │           缺点               │
├─────────────────┼──────────────────────────────┤
│ Consumer 自主控制│ 可能产生消息延迟            │
│ 流量可控        │ 频繁轮询浪费资源            │
│ Broker 无状态   │ 需要管理消费进度            │
│ 适合批量处理    │ 实现相对复杂                │
└─────────────────┴──────────────────────────────┘

二、RocketMQ 的 Push 模式真相

2.1 Push 模式的内部实现

public class RocketMQPushMode {

    // DefaultMQPushConsumer 的秘密:底层是 Pull
    class PushConsumerImplementation {

        private PullMessageService pullMessageService;

        // 启动后自动开始拉取
        public void start() {
            // 1. 启动 Rebalance 服务
            rebalanceService.start();

            // 2. 启动拉取服务
            pullMessageService.start();

            // 3. 立即触发拉取
            pullMessageService.executePullRequestImmediately(pullRequest);
        }

        // 核心:PullMessageService
        class PullMessageService extends ServiceThread {
            @Override
            public void run() {
                while (!this.isStopped()) {
                    PullRequest pullRequest = pullRequestQueue.take();
                    pullMessage(pullRequest);
                }
            }

            private void pullMessage(PullRequest pullRequest) {
                // 实际进行拉取操作
                PullResult pullResult = pullAPIWrapper.pullKernelImpl(
                    pullRequest.getMessageQueue(),
                    subExpression,
                    pullRequest.getNextOffset(),
                    maxNums,
                    sysFlag,
                    0,  // 没有消息时挂起时间
                    brokerSuspendMaxTimeMillis,  // 30秒
                    timeoutMillis,
                    CommunicationMode.ASYNC,
                    pullCallback
                );
            }
        }
    }
}

2.2 Push 模式的工作流程

Push 模式完整流程:
┌──────────────┐
│   Consumer   │
└──────┬───────┘
       │ 1. 发送拉取请求
       ↓
┌──────────────┐
│    Broker    │
├──────────────┤
│ 有消息?     │
│  ├─是→ 立即返回
│  └─否→ Hold住请求
└──────┬───────┘
       │ 2. 返回消息或超时
       ↓
┌──────────────┐
│   Consumer   │
│ 处理消息     │
│ 继续拉取     │
└──────────────┘

2.3 为什么要"伪装"成 Push?

public class WhyFakePush {

    // 原因1:使用体验
    class UserExperience {
        // Push API 更简单
        void pushAPI() {
            consumer.registerMessageListener(listener);
            consumer.start();  // 就这么简单
        }

        // Pull API 更复杂
        void pullAPI() {
            while (running) {
                PullResult result = consumer.pull(mq, offset);
                process(result);
                updateOffset(offset);
            }
        }
    }

    // 原因2:兼顾优点
    class CombineAdvantages {
        // 获得 Push 的优点:
        // - 接近实时(长轮询)
        // - 使用简单

        // 获得 Pull 的优点:
        // - 流量可控
        // - Broker 无状态
        // - Consumer 自主控制
    }

    // 原因3:避免缺点
    class AvoidDisadvantages {
        // 避免 Push 的缺点:
        // - Broker 不需要维护推送状态
        // - 不会造成 Consumer 过载

        // 避免 Pull 的缺点:
        // - 无消息时不会空轮询
        // - 有消息时立即返回
    }
}

三、长轮询机制详解

3.1 长轮询的核心原理

public class LongPollingMechanism {

    // Broker 端长轮询实现
    class BrokerLongPolling {

        // 处理拉取请求
        public RemotingCommand processRequest(RemotingCommand request) {
            // 1. 查询消息
            GetMessageResult getMessageResult = messageStore.getMessage(
                group, topic, queueId, offset, maxMsgNums, filters
            );

            // 2. 判断是否有消息
            if (getMessageResult.getMessageCount() > 0) {
                // 有消息,立即返回
                return buildResponse(getMessageResult);
            } else {
                // 没有消息,进入长轮询
                return longPolling(request);
            }
        }

        // 长轮询处理
        private RemotingCommand longPolling(RemotingCommand request) {
            // 1. 创建 Hold 请求
            PullRequest pullRequest = new PullRequest(
                request,
                channel,
                pollingTimeMills,  // 挂起时间
                System.currentTimeMillis()
            );

            // 2. 放入 Hold 队列
            pullRequestHoldService.suspendPullRequest(topic, queueId, pullRequest);

            // 3. 不立即返回,等待唤醒或超时
            return null;  // 暂不返回
        }
    }

    // Hold 服务
    class PullRequestHoldService extends ServiceThread {

        // Hold 请求的容器
        private ConcurrentHashMap<String, ManyPullRequest> pullRequestTable;

        @Override
        public void run() {
            while (!stopped) {
                // 每 5 秒检查一次
                waitForRunning(5000);

                // 检查所有 Hold 的请求
                for (ManyPullRequest mpr : pullRequestTable.values()) {
                    List<PullRequest> requestList = mpr.cloneListAndClear();

                    for (PullRequest request : requestList) {
                        // 检查是否有新消息
                        long offset = messageStore.getMaxOffset(topic, queueId);

                        if (offset > request.getPullFromThisOffset()) {
                            // 有新消息,唤醒请求
                            wakeupRequest(request);
                        } else if (System.currentTimeMillis() - request.getSuspendTimestamp() > maxWaitTimeMillis) {
                            // 超时,返回空结果
                            timeoutRequest(request);
                        } else {
                            // 继续 Hold
                            mpr.addPullRequest(request);
                        }
                    }
                }
            }
        }

        // 新消息到达时的通知
        public void notifyMessageArriving(String topic, int queueId) {
            ManyPullRequest mpr = pullRequestTable.get(buildKey(topic, queueId));
            if (mpr != null) {
                List<PullRequest> requestList = mpr.cloneListAndClear();
                for (PullRequest request : requestList) {
                    // 立即唤醒等待的请求
                    wakeupRequest(request);
                }
            }
        }
    }
}

3.2 长轮询的时序图

长轮询时序图:

Consumer                    Broker                    MessageStore
    │                          │                           │
    │──────Pull Request───────>│                           │
    │                          │                           │
    │                          │────Check Message────────>│
    │                          │                           │
    │                          │<───No Message────────────│
    │                          │                           │
    │                    (Hold Request)                    │
    │                          │                           │
    │                          │<────New Message Arrive────│
    │                          │                           │
    │                    (Wake up Request)                 │
    │                          │                           │
    │<─────Return Messages─────│                           │
    │                          │                           │

时间轴:
0s   :Consumer 发送拉取请求
0s   :Broker 检查没有消息,Hold 住请求
15s  :新消息到达,Broker 唤醒请求
15s  :Broker 返回消息给 Consumer

3.3 长轮询的参数配置

public class LongPollingConfig {

    // Consumer 端配置
    class ConsumerConfig {
        // 长轮询模式下的挂起时间(默认 30 秒)
        consumer.setBrokerSuspendMaxTimeMillis(30000);

        // 拉取间隔(毫秒)
        consumer.setPullInterval(0);  // 0 表示连续拉取

        // 拉取批量大小
        consumer.setPullBatchSize(32);

        // 单次拉取的最大消息数
        consumer.setConsumeMessageBatchMaxSize(1);
    }

    // Broker 端配置
    class BrokerConfig {
        // 长轮询等待时间(默认 30 秒)
        longPollingEnable = true;

        // Hold 请求的检查间隔(默认 5 秒)
        waitTimeMillsInPullHoldService = 5000;

        // 短轮询等待时间(默认 1 秒)
        shortPollingTimeMills = 1000;
    }
}

四、Pull 模式的使用场景

4.1 Pull 模式的实现

public class PullModeImplementation {

    public void pullConsumer() throws Exception {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();
        consumer.setNamesrvAddr("localhost:9876");
        consumer.setConsumerGroup("PULL_CONSUMER_GROUP");
        consumer.start();

        // 获取消息队列
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC");

        for (MessageQueue mq : mqs) {
            // 从存储获取消费进度
            long offset = loadOffset(mq);

            while (true) {
                try {
                    // 拉取消息
                    PullResult pullResult = consumer.pullBlockIfNotFound(
                        mq,           // 消息队列
                        null,         // 过滤表达式
                        offset,       // 拉取偏移量
                        32           // 最大消息数
                    );

                    // 处理拉取结果
                    switch (pullResult.getMsgFoundList()) {
                        case FOUND:
                            // 处理消息
                            for (MessageExt msg : pullResult.getMsgFoundList()) {
                                processMessage(msg);
                            }
                            // 更新偏移量
                            offset = pullResult.getNextBeginOffset();
                            // 持久化偏移量
                            saveOffset(mq, offset);
                            break;

                        case NO_NEW_MSG:
                            // 没有新消息
                            Thread.sleep(1000);
                            break;

                        case OFFSET_ILLEGAL:
                            // 偏移量非法
                            offset = consumer.minOffset(mq);
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

4.2 Pull 模式的适用场景

public class PullModeScenarios {

    // 场景1:批量处理
    class BatchProcessing {
        void batchConsume() {
            // 积累一定数量后批量处理
            List<MessageExt> batch = new ArrayList<>();

            while (batch.size() < 1000) {
                PullResult result = consumer.pull(mq, offset, 100);
                batch.addAll(result.getMsgFoundList());
            }

            // 批量处理
            batchProcess(batch);
        }
    }

    // 场景2:定时处理
    class ScheduledProcessing {
        void scheduledConsume() {
            // 每小时处理一次
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

            scheduler.scheduleAtFixedRate(() -> {
                PullResult result = consumer.pull(mq, offset, Integer.MAX_VALUE);
                process(result.getMsgFoundList());
            }, 0, 1, TimeUnit.HOURS);
        }
    }

    // 场景3:流量控制
    class FlowControl {
        void controlledConsume() {
            RateLimiter rateLimiter = RateLimiter.create(100);  // 100 TPS

            while (running) {
                rateLimiter.acquire();  // 限流
                PullResult result = consumer.pull(mq, offset, 1);
                process(result.getMsgFoundList());
            }
        }
    }

    // 场景4:自定义消费逻辑
    class CustomLogic {
        void customConsume() {
            // 根据业务状态决定是否消费
            if (isBusinessReady()) {
                PullResult result = consumer.pull(mq, offset, 32);
                process(result.getMsgFoundList());
            } else {
                // 暂停消费
                Thread.sleep(5000);
            }
        }
    }
}

五、Push 和 Pull 的性能对比

5.1 性能测试

public class PerformanceComparison {

    // Push 模式性能特点
    class PushPerformance {
        // 优点:
        // - 低延迟(长轮询,有消息立即返回)
        // - 高吞吐(连续拉取,无空闲)
        // - 资源利用率高(无空轮询)

        // 缺点:
        // - Consumer 压力大(连续处理)
        // - 流量不可控(Broker 推送速率)
    }

    // Pull 模式性能特点
    class PullPerformance {
        // 优点:
        // - 流量可控(Consumer 控制)
        // - 压力可调(可以暂停)
        // - 批量优化(一次拉取多条)

        // 缺点:
        // - 可能有延迟(拉取间隔)
        // - 可能空轮询(浪费资源)
    }

    // 性能数据对比
    class PerformanceData {
        /*
        测试环境:3 Broker,10 Producer,10 Consumer
        消息大小:1KB

        指标         Push模式    Pull模式(1s间隔)  Pull模式(连续)
        ----------------------------------------------------------------
        延迟(ms)     10-50      1000-2000        50-100
        TPS          100,000    50,000           80,000
        CPU使用率    85%        60%              75%
        网络开销     低         高(空轮询)        中
        */
    }
}

5.2 选择建议

public class SelectionGuide {

    // 选择 Push 模式的场景
    class WhenToUsePush {
        // ✅ 实时性要求高
        // ✅ 消息量稳定
        // ✅ 希望简单使用
        // ✅ Consumer 处理能力强

        void example() {
            // 订单处理、支付通知、实时推送
        }
    }

    // 选择 Pull 模式的场景
    class WhenToUsePull {
        // ✅ 需要批量处理
        // ✅ 定时处理
        // ✅ 流量控制
        // ✅ 自定义消费策略

        void example() {
            // 数据同步、报表生成、批量导入
        }
    }

    // 混合使用
    class HybridUsage {
        // 核心业务使用 Push(实时性)
        // 非核心业务使用 Pull(可控性)

        void example() {
            // Push: 订单、支付
            // Pull: 日志、统计
        }
    }
}

六、高级特性

6.1 Push 模式的流控

public class PushFlowControl {

    public void flowControl() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();

        // 流控参数
        consumer.setPullThresholdForQueue(1000);      // 单队列最大消息数
        consumer.setPullThresholdSizeForQueue(100);   // 单队列最大消息大小(MB)
        consumer.setPullThresholdForTopic(10000);     // Topic 最大消息数
        consumer.setPullThresholdSizeForTopic(1000);  // Topic 最大消息大小(MB)

        // 消费并发控制
        consumer.setConsumeThreadMin(20);
        consumer.setConsumeThreadMax(64);

        // 批量消费
        consumer.setConsumeMessageBatchMaxSize(10);
    }
}

6.2 Pull 模式的优化

public class PullOptimization {

    // 优化1:使用 PullConsumer 代替 DefaultMQPullConsumer
    public void optimizedPull() {
        // RocketMQ 5.0 新 API
        PullConsumer pullConsumer = PullConsumer.newBuilder()
            .setConsumerGroup("GROUP")
            .setNamesrvAddr("localhost:9876")
            .build();

        // 异步拉取
        CompletableFuture<PullResult> future = pullConsumer.pullAsync(
            mq, offset, maxNums
        );

        future.thenAccept(result -> {
            process(result.getMsgFoundList());
        });
    }

    // 优化2:批量拉取
    public void batchPull() {
        // 一次拉取更多消息,减少网络开销
        PullResult result = consumer.pull(mq, offset, 1000);  // 拉取 1000 条
    }

    // 优化3:并行处理
    public void parallelPull() {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (MessageQueue mq : queues) {
            executor.submit(() -> {
                while (running) {
                    PullResult result = consumer.pull(mq, offset, 32);
                    process(result);
                }
            });
        }
    }
}

七、常见问题

7.1 Push 模式消费堆积

// 问题:Push 模式下消息堆积
// 原因:消费速度 < 生产速度

// 解决方案:
class PushBacklogSolution {
    void solution() {
        // 1. 增加消费者实例
        // 水平扩展

        // 2. 增加消费线程
        consumer.setConsumeThreadMin(40);
        consumer.setConsumeThreadMax(80);

        // 3. 批量消费
        consumer.setConsumeMessageBatchMaxSize(20);

        // 4. 优化消费逻辑
        // 异步处理、批量入库等
    }
}

7.2 Pull 模式偏移量管理

// 问题:Pull 模式偏移量丢失
// 原因:没有正确持久化偏移量

// 解决方案:
class PullOffsetManagement {
    private OffsetStore offsetStore;

    void manageOffset() {
        // 1. 定期持久化
        scheduledExecutor.scheduleAtFixedRate(() -> {
            offsetStore.persistAll(offsetTable);
        }, 5, 5, TimeUnit.SECONDS);

        // 2. 优雅关闭时持久化
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            offsetStore.persistAll(offsetTable);
        }));

        // 3. 使用 RocketMQ 提供的 OffsetStore
        consumer.setOffsetStore(new RemoteBrokerOffsetStore());
    }
}

八、总结

通过本篇,我们深入理解了:

  1. Push vs Pull 的本质:推送 vs 拉取的根本区别
  2. RocketMQ 的 Push 真相:基于长轮询的"伪 Push"
  3. 长轮询机制:如何实现低延迟又不浪费资源
  4. 使用场景:如何根据业务选择合适的模式

RocketMQ 的 Push 模式设计,是工程实践的典范:既提供了简单的使用体验,又保证了系统的可控性和高性能。

下一篇预告

理解了消费模式的本质,下一篇我们将深入探讨消费组机制:负载均衡、Rebalance、消费进度管理的实现细节。


思考与实践

  1. 为什么 RocketMQ 不实现真正的 Push,而要用长轮询?
  2. 在什么场景下,Pull 模式比 Push 模式更合适?
  3. 如何监控和调优长轮询的性能?

通过实践加深理解,我们下篇见!