为什么RocketMQ能支撑双11万亿级消息?如何使用事务消息实现分布式事务?如何保证消息的全局顺序?
本文深度剖析RocketMQ的核心特性和实战应用。
一、RocketMQ核心架构
1.1 核心组件
Producer → NameServer → Broker → Consumer
↓ ↓
路由信息 消息存储
与Kafka的区别:
| 组件 | RocketMQ | Kafka |
|---|---|---|
| 注册中心 | NameServer(自研,轻量级) | ZooKeeper / KRaft |
| 存储结构 | CommitLog + ConsumeQueue | Partition日志文件 |
| 消息模型 | 点对点 + 发布订阅 | 发布订阅 |
| 特殊功能 | 事务消息、延迟消息、顺序消息 | 流式计算、消息回溯 |
1.2 存储架构
RocketMQ的三层存储:
1. CommitLog(所有消息)
├─ Message 1(Topic A)
├─ Message 2(Topic B)
├─ Message 3(Topic A)
...
2. ConsumeQueue(消息索引)
Topic A
├─ Queue 0
│ ├─ Offset 0 → CommitLog位置100
│ ├─ Offset 1 → CommitLog位置300
│ ...
└─ Queue 1
...
3. IndexFile(消息检索)
Key → CommitLog位置
优势:
- ✅ 顺序写CommitLog(高性能)
- ✅ 多个Topic共享CommitLog(不浪费磁盘)
- ✅ ConsumeQueue轻量级(只存索引,不存消息)
二、事务消息:分布式事务的终极解决方案
2.1 问题场景:订单与支付的一致性
/**
* 传统方案的问题
*/
public class TraditionalTransaction {
@Transactional
public void createOrder(OrderRequest request) {
// 1. 创建订单(本地事务)
Order order = orderService.create(request);
// 2. 发送消息到支付服务(远程调用)
// 问题:如果发送消息失败怎么办?
// - 订单已创建
// - 支付消息未发送
// - 数据不一致!
rabbitTemplate.send("payment.queue", new PaymentEvent(order));
}
}
两个难题:
- 先提交事务,后发送消息:消息发送失败,订单已创建(数据不一致)
- 先发送消息,后提交事务:消息发送成功,事务回滚(数据不一致)
2.2 RocketMQ事务消息
设计思想:二阶段提交 + 事务回查
阶段1:发送半消息(Half Message)
↓ 消息不可见
执行本地事务
↓ 成功 or 失败
阶段2:提交/回滚消息
↓ 提交:消息可见
↓ 回滚:删除消息
实现示例:
/**
* RocketMQ事务消息示例
*/
public class TransactionMessageExample {
public static void main(String[] args) throws Exception {
// 1. 创建事务监听器
TransactionListener transactionListener = new TransactionListenerImpl();
// 2. 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(transactionListener);
producer.start();
// 3. 发送事务消息
String orderId = "ORD" + System.currentTimeMillis();
Message message = new Message(
"payment_topic",
"payment_tag",
orderId,
("支付订单:" + orderId).getBytes()
);
// 发送半消息
TransactionSendResult result = producer.sendMessageInTransaction(
message,
orderId // 本地事务参数
);
System.out.println("发送结果: " + result.getSendStatus());
Thread.sleep(60000);
producer.shutdown();
}
/**
* 事务监听器实现
*/
static class TransactionListenerImpl implements TransactionListener {
private final Map<String, LocalTransactionState> localTransactionMap = new ConcurrentHashMap<>();
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderId = (String) arg;
try {
// 执行本地事务(创建订单)
System.out.println("执行本地事务,订单ID: " + orderId);
// 模拟创建订单
boolean success = createOrder(orderId);
if (success) {
// 本地事务成功,提交消息
localTransactionMap.put(orderId, LocalTransactionState.COMMIT_MESSAGE);
return LocalTransactionState.COMMIT_MESSAGE;
} else {
// 本地事务失败,回滚消息
localTransactionMap.put(orderId, LocalTransactionState.ROLLBACK_MESSAGE);
return LocalTransactionState.ROLLBACK_MESSAGE;
}
} catch (Exception e) {
// 本地事务状态未知,等待回查
System.err.println("本地事务异常: " + e.getMessage());
localTransactionMap.put(orderId, LocalTransactionState.UNKNOW);
return LocalTransactionState.UNKNOW;
}
}
/**
* 事务回查(当本地事务状态未知时,Broker会定期回查)
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getKeys();
System.out.println("事务回查,订单ID: " + orderId);
// 查询本地事务状态
LocalTransactionState state = localTransactionMap.get(orderId);
if (state != null) {
return state;
}
// 查询数据库,判断订单是否创建成功
boolean orderExists = checkOrderExists(orderId);
if (orderExists) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
private boolean createOrder(String orderId) {
// 模拟创建订单
System.out.println("创建订单: " + orderId);
// 模拟随机成功/失败
return Math.random() > 0.2;
}
private boolean checkOrderExists(String orderId) {
// 查询数据库,判断订单是否存在
return localTransactionMap.containsKey(orderId);
}
}
}
执行流程:
1. Producer发送半消息到Broker
└─ 半消息不可见,Consumer无法消费
2. Broker返回发送成功
└─ Producer执行本地事务(创建订单)
3. 根据本地事务结果,提交或回滚消息
├─ 成功:Commit → 消息可见,Consumer可以消费
├─ 失败:Rollback → 删除半消息
└─ 未知:UNKNOW → Broker定期回查(最多15次)
4. 如果15次回查都失败,消息进入死信队列
核心优势:
RocketMQ事务消息保证了本地事务和消息发送的最终一致性
三、顺序消息:保证消息的严格顺序
3.1 问题场景:订单状态机
订单状态流转:
创建订单 → 支付订单 → 发货 → 完成
问题:如果消息乱序
支付订单(后到达)
创建订单(先到达)
↓
状态机错误!
3.2 顺序消息实现
全局顺序:所有消息严格顺序(性能低,很少使用) 分区顺序:同一分区内的消息严格顺序(推荐)
/**
* RocketMQ顺序消息示例
*/
public class OrderlyMessageExample {
/**
* 发送顺序消息
*/
public static void sendOrderlyMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("orderly_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String orderId = "ORD123";
// 发送3条顺序消息
String[] events = {"创建订单", "支付订单", "发货"};
for (String event : events) {
Message message = new Message(
"order_topic",
"order_tag",
(orderId + ":" + event).getBytes()
);
// 发送顺序消息(指定MessageQueueSelector)
SendResult result = producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 根据订单ID选择队列(同一订单ID进入同一队列)
String orderId = (String) arg;
int index = orderId.hashCode() % mqs.size();
return mqs.get(Math.abs(index));
}
},
orderId // 订单ID(决定队列)
);
System.out.println("发送顺序消息: " + event + " → Queue: " + result.getMessageQueue().getQueueId());
}
producer.shutdown();
}
/**
* 消费顺序消息
*/
public static void consumeOrderlyMessage() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order_topic", "*");
// 注册顺序消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
for (MessageExt msg : msgs) {
String content = new String(msg.getBody());
System.out.println("消费顺序消息: " + content
+ ", QueueId: " + msg.getQueueId()
+ ", Thread: " + Thread.currentThread().getName());
// 模拟处理
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 返回成功
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("顺序消费者已启动");
Thread.sleep(60000);
}
public static void main(String[] args) throws Exception {
// 启动消费者
new Thread(() -> {
try {
consumeOrderlyMessage();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
Thread.sleep(1000);
// 发送消息
sendOrderlyMessage();
}
}
输出示例:
发送顺序消息: 创建订单 → Queue: 2
发送顺序消息: 支付订单 → Queue: 2
发送顺序消息: 发货 → Queue: 2
消费顺序消息: ORD123:创建订单, QueueId: 2, Thread: ConsumeMessageThread_1
消费顺序消息: ORD123:支付订单, QueueId: 2, Thread: ConsumeMessageThread_1
消费顺序消息: ORD123:发货, QueueId: 2, Thread: ConsumeMessageThread_1
核心原理:
同一订单ID → Hash → 同一Queue → 单线程消费 → 保证顺序
四、延迟消息:定时任务的最佳实践
4.1 问题场景:订单超时自动取消
用户下单 → 30分钟内未支付 → 自动取消订单
传统方案的问题:
// 方案1:定时任务扫描数据库(性能差)
@Scheduled(fixedDelay = 60000)
public void cancelTimeoutOrders() {
// 每分钟扫描一次数据库
List<Order> orders = orderRepository.findTimeoutOrders();
for (Order order : orders) {
orderService.cancel(order);
}
}
问题:
- 数据库压力大(每分钟扫描)
- 实时性差(最多延迟1分钟)
- 扩展性差(数据库成为瓶颈)
4.2 RocketMQ延迟消息
延迟级别(18个级别):
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
实现示例:
/**
* RocketMQ延迟消息示例:订单超时取消
*/
public class DelayMessageExample {
/**
* 发送延迟消息
*/
public static void sendDelayMessage() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("delay_producer_group");
producer.setNamesrvAddr("localhost:9876");
producer.start();
String orderId = "ORD" + System.currentTimeMillis();
Message message = new Message(
"order_timeout_topic",
"timeout_tag",
orderId.getBytes()
);
// 设置延迟级别:14 = 10分钟
message.setDelayTimeLevel(14);
SendResult result = producer.send(message);
System.out.println("发送延迟消息: 订单ID=" + orderId + ", 10分钟后执行");
System.out.println("发送结果: " + result.getSendStatus());
producer.shutdown();
}
/**
* 消费延迟消息
*/
public static void consumeDelayMessage() throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer_group");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("order_timeout_topic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
String orderId = new String(msg.getBody());
System.out.println("收到延迟消息: 订单ID=" + orderId);
// 检查订单状态
Order order = orderService.getOrder(orderId);
if (order.getStatus() == OrderStatus.UNPAID) {
// 订单未支付,自动取消
orderService.cancel(orderId);
System.out.println("订单已自动取消: " + orderId);
} else {
System.out.println("订单已支付,无需取消: " + orderId);
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("延迟消息消费者已启动");
}
public static void main(String[] args) throws Exception {
// 启动消费者
consumeDelayMessage();
Thread.sleep(1000);
// 发送延迟消息
sendDelayMessage();
// 等待10分钟
Thread.sleep(600000);
}
}
延迟级别映射:
// RocketMQ预定义的18个延迟级别
private String messageDelayLevel =
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
// 使用方式
message.setDelayTimeLevel(1); // 延迟1秒
message.setDelayTimeLevel(5); // 延迟1分钟
message.setDelayTimeLevel(14); // 延迟10分钟
message.setDelayTimeLevel(16); // 延迟30分钟
message.setDelayTimeLevel(17); // 延迟1小时
message.setDelayTimeLevel(18); // 延迟2小时
五、消息过滤:Tag与SQL92
5.1 Tag过滤
// 生产者:发送带Tag的消息
Message message = new Message(
"order_topic",
"VIP_USER", // Tag:VIP用户
"订单内容".getBytes()
);
producer.send(message);
// 消费者:只消费VIP_USER的消息
consumer.subscribe("order_topic", "VIP_USER");
5.2 SQL92过滤(更灵活)
// 生产者:发送带属性的消息
Message message = new Message("order_topic", "订单内容".getBytes());
message.putUserProperty("amount", "100");
message.putUserProperty("city", "Beijing");
producer.send(message);
// 消费者:使用SQL92过滤
consumer.subscribe(
"order_topic",
MessageSelector.bySql("amount > 50 AND city = 'Beijing'")
);
六、总结
RocketMQ的核心特性:
- 事务消息:分布式事务的最佳实践
- 顺序消息:保证消息的严格顺序
- 延迟消息:定时任务的优雅实现
- 消息过滤:Tag和SQL92灵活过滤
下一篇将深入探讨消息可靠性与分布式事务的完整解决方案。
作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(5/6)