引言:一个有趣的悖论
当你使用 RocketMQ 的 Push 模式时,你可能会惊讶地发现:
- API 名称是
DefaultMQPushConsumer - 但实际上是 Consumer 在主动拉取消息
- 官方文档称之为"Push 模式"
- 但本质上是"长轮询 Pull"
这不是设计缺陷,而是精心权衡的结果。让我们揭开这个"伪 Push"的神秘面纱。
一、Push vs Pull 的本质区别
1.1 理论上的 Push 和 Pull
public class PushVsPullTheory {
// 真正的 Push 模式
class TruePushMode {
// Broker 主动推送消息给 Consumer
// 特点:
// 1. Broker 维护 Consumer 列表
// 2. 有新消息立即推送
// 3. Broker 控制推送速率
// 4. Consumer 被动接收
void brokerPush() {
// Broker 端代码
for (Consumer consumer : consumers) {
consumer.receive(message); // 主动调用
}
}
}
// 真正的 Pull 模式
class TruePullMode {
// Consumer 主动从 Broker 拉取消息
// 特点:
// 1. Consumer 控制拉取时机
// 2. Consumer 控制拉取速率
// 3. Broker 被动响应
// 4. 可能存在消息延迟
void consumerPull() {
// Consumer 端代码
while (running) {
Message msg = broker.pull(); // 主动拉取
process(msg);
}
}
}
}
1.2 两种模式的优劣对比
Push 模式的优劣:
┌─────────────────┬──────────────────────────────┐
│ 优点 │ 缺点 │
├─────────────────┼──────────────────────────────┤
│ 实时性高 │ Broker 需要维护推送状态 │
│ 消息延迟小 │ 难以处理消费速率差异 │
│ 使用简单 │ 可能造成 Consumer 过载 │
│ │ Broker 成为性能瓶颈 │
└─────────────────┴──────────────────────────────┘
Pull 模式的优劣:
┌─────────────────┬──────────────────────────────┐
│ 优点 │ 缺点 │
├─────────────────┼──────────────────────────────┤
│ Consumer 自主控制│ 可能产生消息延迟 │
│ 流量可控 │ 频繁轮询浪费资源 │
│ Broker 无状态 │ 需要管理消费进度 │
│ 适合批量处理 │ 实现相对复杂 │
└─────────────────┴──────────────────────────────┘
二、RocketMQ 的 Push 模式真相
2.1 Push 模式的内部实现
public class RocketMQPushMode {
// DefaultMQPushConsumer 的秘密:底层是 Pull
class PushConsumerImplementation {
private PullMessageService pullMessageService;
// 启动后自动开始拉取
public void start() {
// 1. 启动 Rebalance 服务
rebalanceService.start();
// 2. 启动拉取服务
pullMessageService.start();
// 3. 立即触发拉取
pullMessageService.executePullRequestImmediately(pullRequest);
}
// 核心:PullMessageService
class PullMessageService extends ServiceThread {
@Override
public void run() {
while (!this.isStopped()) {
PullRequest pullRequest = pullRequestQueue.take();
pullMessage(pullRequest);
}
}
private void pullMessage(PullRequest pullRequest) {
// 实际进行拉取操作
PullResult pullResult = pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
pullRequest.getNextOffset(),
maxNums,
sysFlag,
0, // 没有消息时挂起时间
brokerSuspendMaxTimeMillis, // 30秒
timeoutMillis,
CommunicationMode.ASYNC,
pullCallback
);
}
}
}
}
2.2 Push 模式的工作流程
Push 模式完整流程:
┌──────────────┐
│ Consumer │
└──────┬───────┘
│ 1. 发送拉取请求
↓
┌──────────────┐
│ Broker │
├──────────────┤
│ 有消息? │
│ ├─是→ 立即返回
│ └─否→ Hold住请求
└──────┬───────┘
│ 2. 返回消息或超时
↓
┌──────────────┐
│ Consumer │
│ 处理消息 │
│ 继续拉取 │
└──────────────┘
2.3 为什么要"伪装"成 Push?
public class WhyFakePush {
// 原因1:使用体验
class UserExperience {
// Push API 更简单
void pushAPI() {
consumer.registerMessageListener(listener);
consumer.start(); // 就这么简单
}
// Pull API 更复杂
void pullAPI() {
while (running) {
PullResult result = consumer.pull(mq, offset);
process(result);
updateOffset(offset);
}
}
}
// 原因2:兼顾优点
class CombineAdvantages {
// 获得 Push 的优点:
// - 接近实时(长轮询)
// - 使用简单
// 获得 Pull 的优点:
// - 流量可控
// - Broker 无状态
// - Consumer 自主控制
}
// 原因3:避免缺点
class AvoidDisadvantages {
// 避免 Push 的缺点:
// - Broker 不需要维护推送状态
// - 不会造成 Consumer 过载
// 避免 Pull 的缺点:
// - 无消息时不会空轮询
// - 有消息时立即返回
}
}
三、长轮询机制详解
3.1 长轮询的核心原理
public class LongPollingMechanism {
// Broker 端长轮询实现
class BrokerLongPolling {
// 处理拉取请求
public RemotingCommand processRequest(RemotingCommand request) {
// 1. 查询消息
GetMessageResult getMessageResult = messageStore.getMessage(
group, topic, queueId, offset, maxMsgNums, filters
);
// 2. 判断是否有消息
if (getMessageResult.getMessageCount() > 0) {
// 有消息,立即返回
return buildResponse(getMessageResult);
} else {
// 没有消息,进入长轮询
return longPolling(request);
}
}
// 长轮询处理
private RemotingCommand longPolling(RemotingCommand request) {
// 1. 创建 Hold 请求
PullRequest pullRequest = new PullRequest(
request,
channel,
pollingTimeMills, // 挂起时间
System.currentTimeMillis()
);
// 2. 放入 Hold 队列
pullRequestHoldService.suspendPullRequest(topic, queueId, pullRequest);
// 3. 不立即返回,等待唤醒或超时
return null; // 暂不返回
}
}
// Hold 服务
class PullRequestHoldService extends ServiceThread {
// Hold 请求的容器
private ConcurrentHashMap<String, ManyPullRequest> pullRequestTable;
@Override
public void run() {
while (!stopped) {
// 每 5 秒检查一次
waitForRunning(5000);
// 检查所有 Hold 的请求
for (ManyPullRequest mpr : pullRequestTable.values()) {
List<PullRequest> requestList = mpr.cloneListAndClear();
for (PullRequest request : requestList) {
// 检查是否有新消息
long offset = messageStore.getMaxOffset(topic, queueId);
if (offset > request.getPullFromThisOffset()) {
// 有新消息,唤醒请求
wakeupRequest(request);
} else if (System.currentTimeMillis() - request.getSuspendTimestamp() > maxWaitTimeMillis) {
// 超时,返回空结果
timeoutRequest(request);
} else {
// 继续 Hold
mpr.addPullRequest(request);
}
}
}
}
}
// 新消息到达时的通知
public void notifyMessageArriving(String topic, int queueId) {
ManyPullRequest mpr = pullRequestTable.get(buildKey(topic, queueId));
if (mpr != null) {
List<PullRequest> requestList = mpr.cloneListAndClear();
for (PullRequest request : requestList) {
// 立即唤醒等待的请求
wakeupRequest(request);
}
}
}
}
}
3.2 长轮询的时序图
长轮询时序图:
Consumer Broker MessageStore
│ │ │
│──────Pull Request───────>│ │
│ │ │
│ │────Check Message────────>│
│ │ │
│ │<───No Message────────────│
│ │ │
│ (Hold Request) │
│ │ │
│ │<────New Message Arrive────│
│ │ │
│ (Wake up Request) │
│ │ │
│<─────Return Messages─────│ │
│ │ │
时间轴:
0s :Consumer 发送拉取请求
0s :Broker 检查没有消息,Hold 住请求
15s :新消息到达,Broker 唤醒请求
15s :Broker 返回消息给 Consumer
3.3 长轮询的参数配置
public class LongPollingConfig {
// Consumer 端配置
class ConsumerConfig {
// 长轮询模式下的挂起时间(默认 30 秒)
consumer.setBrokerSuspendMaxTimeMillis(30000);
// 拉取间隔(毫秒)
consumer.setPullInterval(0); // 0 表示连续拉取
// 拉取批量大小
consumer.setPullBatchSize(32);
// 单次拉取的最大消息数
consumer.setConsumeMessageBatchMaxSize(1);
}
// Broker 端配置
class BrokerConfig {
// 长轮询等待时间(默认 30 秒)
longPollingEnable = true;
// Hold 请求的检查间隔(默认 5 秒)
waitTimeMillsInPullHoldService = 5000;
// 短轮询等待时间(默认 1 秒)
shortPollingTimeMills = 1000;
}
}
四、Pull 模式的使用场景
4.1 Pull 模式的实现
public class PullModeImplementation {
public void pullConsumer() throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer();
consumer.setNamesrvAddr("localhost:9876");
consumer.setConsumerGroup("PULL_CONSUMER_GROUP");
consumer.start();
// 获取消息队列
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TOPIC");
for (MessageQueue mq : mqs) {
// 从存储获取消费进度
long offset = loadOffset(mq);
while (true) {
try {
// 拉取消息
PullResult pullResult = consumer.pullBlockIfNotFound(
mq, // 消息队列
null, // 过滤表达式
offset, // 拉取偏移量
32 // 最大消息数
);
// 处理拉取结果
switch (pullResult.getMsgFoundList()) {
case FOUND:
// 处理消息
for (MessageExt msg : pullResult.getMsgFoundList()) {
processMessage(msg);
}
// 更新偏移量
offset = pullResult.getNextBeginOffset();
// 持久化偏移量
saveOffset(mq, offset);
break;
case NO_NEW_MSG:
// 没有新消息
Thread.sleep(1000);
break;
case OFFSET_ILLEGAL:
// 偏移量非法
offset = consumer.minOffset(mq);
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
4.2 Pull 模式的适用场景
public class PullModeScenarios {
// 场景1:批量处理
class BatchProcessing {
void batchConsume() {
// 积累一定数量后批量处理
List<MessageExt> batch = new ArrayList<>();
while (batch.size() < 1000) {
PullResult result = consumer.pull(mq, offset, 100);
batch.addAll(result.getMsgFoundList());
}
// 批量处理
batchProcess(batch);
}
}
// 场景2:定时处理
class ScheduledProcessing {
void scheduledConsume() {
// 每小时处理一次
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
PullResult result = consumer.pull(mq, offset, Integer.MAX_VALUE);
process(result.getMsgFoundList());
}, 0, 1, TimeUnit.HOURS);
}
}
// 场景3:流量控制
class FlowControl {
void controlledConsume() {
RateLimiter rateLimiter = RateLimiter.create(100); // 100 TPS
while (running) {
rateLimiter.acquire(); // 限流
PullResult result = consumer.pull(mq, offset, 1);
process(result.getMsgFoundList());
}
}
}
// 场景4:自定义消费逻辑
class CustomLogic {
void customConsume() {
// 根据业务状态决定是否消费
if (isBusinessReady()) {
PullResult result = consumer.pull(mq, offset, 32);
process(result.getMsgFoundList());
} else {
// 暂停消费
Thread.sleep(5000);
}
}
}
}
五、Push 和 Pull 的性能对比
5.1 性能测试
public class PerformanceComparison {
// Push 模式性能特点
class PushPerformance {
// 优点:
// - 低延迟(长轮询,有消息立即返回)
// - 高吞吐(连续拉取,无空闲)
// - 资源利用率高(无空轮询)
// 缺点:
// - Consumer 压力大(连续处理)
// - 流量不可控(Broker 推送速率)
}
// Pull 模式性能特点
class PullPerformance {
// 优点:
// - 流量可控(Consumer 控制)
// - 压力可调(可以暂停)
// - 批量优化(一次拉取多条)
// 缺点:
// - 可能有延迟(拉取间隔)
// - 可能空轮询(浪费资源)
}
// 性能数据对比
class PerformanceData {
/*
测试环境:3 Broker,10 Producer,10 Consumer
消息大小:1KB
指标 Push模式 Pull模式(1s间隔) Pull模式(连续)
----------------------------------------------------------------
延迟(ms) 10-50 1000-2000 50-100
TPS 100,000 50,000 80,000
CPU使用率 85% 60% 75%
网络开销 低 高(空轮询) 中
*/
}
}
5.2 选择建议
public class SelectionGuide {
// 选择 Push 模式的场景
class WhenToUsePush {
// ✅ 实时性要求高
// ✅ 消息量稳定
// ✅ 希望简单使用
// ✅ Consumer 处理能力强
void example() {
// 订单处理、支付通知、实时推送
}
}
// 选择 Pull 模式的场景
class WhenToUsePull {
// ✅ 需要批量处理
// ✅ 定时处理
// ✅ 流量控制
// ✅ 自定义消费策略
void example() {
// 数据同步、报表生成、批量导入
}
}
// 混合使用
class HybridUsage {
// 核心业务使用 Push(实时性)
// 非核心业务使用 Pull(可控性)
void example() {
// Push: 订单、支付
// Pull: 日志、统计
}
}
}
六、高级特性
6.1 Push 模式的流控
public class PushFlowControl {
public void flowControl() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
// 流控参数
consumer.setPullThresholdForQueue(1000); // 单队列最大消息数
consumer.setPullThresholdSizeForQueue(100); // 单队列最大消息大小(MB)
consumer.setPullThresholdForTopic(10000); // Topic 最大消息数
consumer.setPullThresholdSizeForTopic(1000); // Topic 最大消息大小(MB)
// 消费并发控制
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 批量消费
consumer.setConsumeMessageBatchMaxSize(10);
}
}
6.2 Pull 模式的优化
public class PullOptimization {
// 优化1:使用 PullConsumer 代替 DefaultMQPullConsumer
public void optimizedPull() {
// RocketMQ 5.0 新 API
PullConsumer pullConsumer = PullConsumer.newBuilder()
.setConsumerGroup("GROUP")
.setNamesrvAddr("localhost:9876")
.build();
// 异步拉取
CompletableFuture<PullResult> future = pullConsumer.pullAsync(
mq, offset, maxNums
);
future.thenAccept(result -> {
process(result.getMsgFoundList());
});
}
// 优化2:批量拉取
public void batchPull() {
// 一次拉取更多消息,减少网络开销
PullResult result = consumer.pull(mq, offset, 1000); // 拉取 1000 条
}
// 优化3:并行处理
public void parallelPull() {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (MessageQueue mq : queues) {
executor.submit(() -> {
while (running) {
PullResult result = consumer.pull(mq, offset, 32);
process(result);
}
});
}
}
}
七、常见问题
7.1 Push 模式消费堆积
// 问题:Push 模式下消息堆积
// 原因:消费速度 < 生产速度
// 解决方案:
class PushBacklogSolution {
void solution() {
// 1. 增加消费者实例
// 水平扩展
// 2. 增加消费线程
consumer.setConsumeThreadMin(40);
consumer.setConsumeThreadMax(80);
// 3. 批量消费
consumer.setConsumeMessageBatchMaxSize(20);
// 4. 优化消费逻辑
// 异步处理、批量入库等
}
}
7.2 Pull 模式偏移量管理
// 问题:Pull 模式偏移量丢失
// 原因:没有正确持久化偏移量
// 解决方案:
class PullOffsetManagement {
private OffsetStore offsetStore;
void manageOffset() {
// 1. 定期持久化
scheduledExecutor.scheduleAtFixedRate(() -> {
offsetStore.persistAll(offsetTable);
}, 5, 5, TimeUnit.SECONDS);
// 2. 优雅关闭时持久化
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
offsetStore.persistAll(offsetTable);
}));
// 3. 使用 RocketMQ 提供的 OffsetStore
consumer.setOffsetStore(new RemoteBrokerOffsetStore());
}
}
八、总结
通过本篇,我们深入理解了:
- Push vs Pull 的本质:推送 vs 拉取的根本区别
- RocketMQ 的 Push 真相:基于长轮询的"伪 Push"
- 长轮询机制:如何实现低延迟又不浪费资源
- 使用场景:如何根据业务选择合适的模式
RocketMQ 的 Push 模式设计,是工程实践的典范:既提供了简单的使用体验,又保证了系统的可控性和高性能。
下一篇预告
理解了消费模式的本质,下一篇我们将深入探讨消费组机制:负载均衡、Rebalance、消费进度管理的实现细节。
思考与实践:
- 为什么 RocketMQ 不实现真正的 Push,而要用长轮询?
- 在什么场景下,Pull 模式比 Push 模式更合适?
- 如何监控和调优长轮询的性能?
通过实践加深理解,我们下篇见!