如何保证消息100%不丢失?如何处理消息重复?如何实现最终一致性?
本文系统性拆解消息可靠性的核心问题和解决方案。
一、CAP与BASE理论
1.1 CAP理论
CAP三者只能选其二:
C(Consistency):一致性
└─ 所有节点在同一时间看到相同的数据
A(Availability):可用性
└─ 每个请求都能收到响应(成功或失败)
P(Partition Tolerance):分区容错性
└─ 系统在网络分区时仍能继续工作
在分布式系统中,P是必选项(网络分区不可避免)
因此只能在C和A之间权衡:
├─ CP:牺牲可用性,保证一致性(如ZooKeeper)
└─ AP:牺牲一致性,保证可用性(如Cassandra)
消息队列的选择:
RabbitMQ(CP倾向):
- 镜像队列:主从同步复制
- 牺牲可用性:主节点故障时,等待新主选举
Kafka(CA倾向,实际AP):
- ISR机制:异步复制
- 牺牲强一致性:允许短暂的数据不一致
RocketMQ(CP倾向):
- 主从同步/异步可配置
- 牺牲可用性:主节点故障时,可能短暂不可用
1.2 BASE理论
BASE理论:对CAP的补充,是AP的实践方案
BA(Basically Available):基本可用
└─ 系统在出现故障时,允许损失部分可用性
└─ 例如:响应时间增加、功能降级
S(Soft State):软状态
└─ 允许系统中的数据存在中间状态
└─ 例如:订单状态从"创建"到"已支付"有延迟
E(Eventually Consistent):最终一致性
└─ 系统中的所有数据副本,在一段时间后最终能达到一致
└─ 例如:订单创建后,库存最终会扣减
核心思想:
牺牲强一致性,换取高可用性和高性能
二、消息可靠性的三个维度
2.1 生产端:确保消息发送成功
问题场景:
Producer → MQ(网络故障)
↓
消息丢失
解决方案:
方案1:同步发送 + 重试
/**
* 同步发送:等待确认
*/
public class ProducerReliability {
public void sendWithRetry(Message message) {
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
try {
// 同步发送,等待确认
SendResult result = producer.send(message);
if (result.getSendStatus() == SendStatus.SEND_OK) {
System.out.println("消息发送成功: " + message.getKeys());
return;
} else {
System.err.println("消息发送失败: " + result.getSendStatus());
}
} catch (Exception e) {
System.err.println("消息发送异常: " + e.getMessage());
}
retryCount++;
System.out.println("重试发送,第" + retryCount + "次");
// 指数退避
try {
Thread.sleep((long) Math.pow(2, retryCount) * 1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
// 重试失败,记录到数据库,稍后补偿
saveToFailureLog(message);
}
private void saveToFailureLog(Message message) {
// 记录到数据库或本地文件
System.err.println("消息发送失败,已记录到失败日志");
}
}
方案2:本地消息表(最可靠)
/**
* 本地消息表:保证本地事务和消息发送的最终一致性
*/
@Service
public class LocalMessageTableService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private LocalMessageRepository messageRepository;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 创建订单(本地事务 + 消息表)
*/
@Transactional
public void createOrder(OrderRequest request) {
// 1. 创建订单(本地事务)
Order order = new Order();
order.setOrderNo(generateOrderNo());
order.setUserId(request.getUserId());
order.setStatus(OrderStatus.PENDING_PAYMENT);
orderRepository.save(order);
// 2. 保存消息到本地表(同一事务)
LocalMessage message = new LocalMessage();
message.setTopic("order.created");
message.setContent(JSON.toJSONString(order));
message.setStatus(MessageStatus.PENDING);
messageRepository.save(message);
// 提交事务
}
/**
* 定时任务:扫描未发送的消息
*/
@Scheduled(fixedDelay = 5000)
public void sendPendingMessages() {
List<LocalMessage> messages = messageRepository.findPendingMessages();
for (LocalMessage message : messages) {
try {
// 发送消息到MQ
rocketMQTemplate.syncSend(
message.getTopic(),
message.getContent()
);
// 标记为已发送
message.setStatus(MessageStatus.SENT);
messageRepository.save(message);
} catch (Exception e) {
// 发送失败,下次重试
System.err.println("消息发送失败: " + e.getMessage());
}
}
}
private String generateOrderNo() {
return "ORD" + System.currentTimeMillis();
}
}
本地消息表的优势:
保证了本地事务和消息发送的最终一致性
- 本地事务成功 → 消息一定会发送(定时任务保证)
- 本地事务失败 → 消息不会发送(事务回滚)
2.2 存储端:确保消息不丢失
问题场景:
MQ收到消息 → Broker宕机
↓
消息丢失
解决方案:
方案1:消息持久化
// RabbitMQ持久化
channel.exchangeDeclare("exchange", "direct", true); // Exchange持久化
channel.queueDeclare("queue", true, false, false, null); // Queue持久化
channel.basicPublish(
"exchange",
"routingKey",
MessageProperties.PERSISTENT_TEXT_PLAIN, // 消息持久化
message.getBytes()
);
// RocketMQ持久化(默认持久化)
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
Message message = new Message("topic", "tag", "content".getBytes());
producer.send(message); // 默认持久化到磁盘
// Kafka持久化(配置副本)
props.put("acks", "all"); // 等待所有副本确认
props.put("min.insync.replicas", 2); // 最少2个副本同步
方案2:集群高可用
RabbitMQ镜像队列:
├─ Master(Broker 1)
├─ Mirror(Broker 2)
└─ Mirror(Broker 3)
Kafka副本机制:
Partition 0
├─ Leader(Broker 1)
├─ Follower(Broker 2)
└─ Follower(Broker 3)
RocketMQ主从复制:
Topic: order
├─ Master(Broker 1)
└─ Slave(Broker 2)
2.3 消费端:确保消息被正确处理
问题场景:
Consumer接收消息 → 处理失败 → ACK
↓
消息丢失
解决方案:
方案1:手动ACK
/**
* RabbitMQ手动ACK
*/
channel.basicConsume("queue", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) {
String message = new String(body);
try {
// 处理消息
processMessage(message);
// 手动ACK
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (Exception e) {
try {
// 拒绝消息,重新入队
channel.basicNack(envelope.getDeliveryTag(), false, true);
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
});
方案2:重试机制
/**
* RocketMQ重试机制
*/
@RocketMQMessageListener(
topic = "order_topic",
consumerGroup = "order_consumer_group",
maxReconsumeTimes = 3 // 最多重试3次
)
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
try {
// 处理消息
processMessage(message);
} catch (Exception e) {
// 抛出异常,触发重试
throw new RuntimeException("处理失败", e);
}
}
private void processMessage(String message) {
// 业务逻辑
}
}
三、消息重复:幂等性设计
3.1 为什么会消息重复?
场景1:生产者重试
Producer发送消息 → Broker收到 → ACK丢失 → Producer重试
↓
消息重复
场景2:消费者重试
Consumer处理成功 → ACK失败 → 重新投递
↓
消息重复
场景3:Broker故障
Broker宕机 → 消息未持久化 → 重新发送
↓
消息重复
核心结论:
在分布式系统中,消息重复不可避免(至少一次语义)
3.2 幂等性设计
定义:多次执行产生的结果与一次执行的结果相同
f(f(x)) = f(x)
方案1:唯一ID去重
/**
* 使用Redis实现唯一ID去重
*/
@Service
public class IdempotentService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 幂等处理消息
*/
public boolean processMessage(Message message) {
String messageId = message.getMessageId();
// 检查是否已处理
String key = "message:processed:" + messageId;
Boolean success = redisTemplate.opsForValue().setIfAbsent(
key,
"1",
Duration.ofDays(7) // 7天过期
);
if (Boolean.FALSE.equals(success)) {
// 消息已处理,跳过
System.out.println("消息已处理,跳过: " + messageId);
return false;
}
try {
// 处理消息
doProcess(message);
System.out.println("消息处理成功: " + messageId);
return true;
} catch (Exception e) {
// 处理失败,删除标记
redisTemplate.delete(key);
throw e;
}
}
private void doProcess(Message message) {
// 业务逻辑
}
}
方案2:数据库唯一索引
/**
* 使用数据库唯一索引实现幂等
*/
@Service
public class IdempotentOrderService {
@Autowired
private OrderRepository orderRepository;
/**
* 创建订单(幂等)
*/
public void createOrder(OrderRequest request) {
Order order = new Order();
order.setOrderNo(request.getOrderNo()); // 订单号(唯一索引)
order.setUserId(request.getUserId());
order.setStatus(OrderStatus.PENDING_PAYMENT);
try {
orderRepository.save(order);
System.out.println("订单创建成功: " + order.getOrderNo());
} catch (DuplicateKeyException e) {
// 订单号重复,说明已创建
System.out.println("订单已存在,跳过: " + request.getOrderNo());
}
}
}
方案3:业务逻辑幂等
/**
* 业务逻辑天然幂等
*/
@Service
public class IdempotentBusinessService {
/**
* 更新订单状态(幂等)
*/
public void updateOrderStatus(String orderNo, OrderStatus newStatus) {
// SQL: UPDATE orders SET status = ? WHERE order_no = ? AND status != ?
int rows = orderRepository.updateStatus(orderNo, newStatus);
if (rows > 0) {
System.out.println("订单状态更新成功: " + orderNo);
} else {
System.out.println("订单状态未变更(可能已经是目标状态): " + orderNo);
}
}
/**
* 账户余额扣减(幂等)
*/
public void deductBalance(String userId, BigDecimal amount, String transactionId) {
// SQL: UPDATE accounts
// SET balance = balance - ?
// WHERE user_id = ?
// AND balance >= ?
// AND NOT EXISTS (
// SELECT 1 FROM transactions WHERE transaction_id = ?
// )
// 原理:
// 1. 检查余额是否充足
// 2. 检查交易ID是否已存在
// 3. 扣减余额并记录交易ID
// 4. 多次执行只会成功一次
}
}
四、消息乱序:顺序性保证
4.1 为什么会消息乱序?
场景1:多分区
Message 1 → Partition 0 → Consumer 1
Message 2 → Partition 1 → Consumer 2(先处理完)
↓
乱序
场景2:多消费者
Message 1 → Consumer 1(慢)
Message 2 → Consumer 2(快)
↓
乱序
场景3:重试
Message 1 → 处理成功
Message 2 → 处理失败 → 重试 → 后处理
↓
乱序
4.2 顺序性保证方案
方案1:单分区 + 单消费者(性能低)
// RocketMQ顺序消息
producer.send(
message,
new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 同一Key进入同一Queue
String key = (String) arg;
int index = key.hashCode() % mqs.size();
return mqs.get(Math.abs(index));
}
},
orderId // 订单ID
);
// 单线程消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 单线程处理,保证顺序
}
});
方案2:版本号 + 乐观锁
/**
* 使用版本号保证顺序
*/
@Service
public class OrderSequenceService {
/**
* 更新订单状态(带版本号)
*/
public void updateOrderStatus(String orderNo, OrderStatus newStatus, int expectedVersion) {
// SQL: UPDATE orders
// SET status = ?, version = version + 1
// WHERE order_no = ?
// AND version = ?
int rows = orderRepository.updateWithVersion(orderNo, newStatus, expectedVersion);
if (rows > 0) {
System.out.println("订单状态更新成功");
} else {
System.err.println("订单状态更新失败(版本号不匹配)");
// 可能是乱序消息,丢弃
}
}
}
五、分布式事务的四种方案
5.1 方案1:2PC(两阶段提交)
阶段1:准备(Prepare)
├─ 协调者:发送Prepare请求
├─ 参与者1:执行本地事务,返回YES/NO
└─ 参与者2:执行本地事务,返回YES/NO
阶段2:提交/回滚(Commit/Rollback)
├─ 所有参与者返回YES → 协调者发送Commit
└─ 任何参与者返回NO → 协调者发送Rollback
优势:强一致性
劣势:性能差、阻塞、单点故障
5.2 方案2:TCC(Try-Confirm-Cancel)
Try:预留资源
├─ 订单服务:创建订单(状态:预创建)
└─ 库存服务:锁定库存
Confirm:确认提交
├─ 订单服务:订单状态 → 已创建
└─ 库存服务:扣减库存
Cancel:取消回滚
├─ 订单服务:删除订单
└─ 库存服务:释放库存
优势:性能好、不阻塞
劣势:业务侵入性强
5.3 方案3:Saga
正向流程:
订单服务 → 库存服务 → 积分服务 → 通知服务
补偿流程(任一步骤失败):
订单服务 ← 库存服务 ← 积分服务 ← 通知服务
(取消订单)(释放库存)(回滚积分)
优势:适合长事务、微服务
劣势:需要实现补偿逻辑
5.4 方案4:本地消息表(最佳实践)
订单服务:
├─ 创建订单(本地事务)
├─ 保存消息到本地表(同一事务)
└─ 定时任务发送消息
库存服务:
├─ 消费消息
├─ 扣减库存(幂等)
└─ 返回ACK
优势:简单、可靠、最终一致
劣势:延迟高
六、总结
本文系统性拆解了消息可靠性与分布式事务:
- 理论基础:CAP与BASE理论
- 消息可靠性三个维度:
- 生产端:同步发送 + 重试 + 本地消息表
- 存储端:持久化 + 集群高可用
- 消费端:手动ACK + 重试机制
- 消息重复:幂等性设计(唯一ID、数据库唯一索引、业务逻辑幂等)
- 消息乱序:顺序性保证(单分区、版本号)
- 分布式事务:四种方案(2PC、TCC、Saga、本地消息表)
核心原则:
在分布式系统中,追求强一致性会牺牲性能和可用性
最佳实践:BASE理论 + 最终一致性 + 幂等设计
作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(6/6)
至此,消息中间件第一性原理系列全部完成。感谢阅读!