消息可靠性与分布式事务:从理论到实践
如何保证消息100%不丢失?如何处理消息重复?如何实现最终一致性? 本文系统性拆解消息可靠性的核心问题和解决方案。 一、CAP与BASE理论 1.1 CAP理论 CAP三者只能选其二: C(Consistency):一致性 └─ 所有节点在同一时间看到相同的数据 A(Availability):可用性 └─ 每个请求都能收到响应(成功或失败) P(Partition Tolerance):分区容错性 └─ 系统在网络分区时仍能继续工作 在分布式系统中,P是必选项(网络分区不可避免) 因此只能在C和A之间权衡: ├─ CP:牺牲可用性,保证一致性(如ZooKeeper) └─ AP:牺牲一致性,保证可用性(如Cassandra) 消息队列的选择: RabbitMQ(CP倾向): - 镜像队列:主从同步复制 - 牺牲可用性:主节点故障时,等待新主选举 Kafka(CA倾向,实际AP): - ISR机制:异步复制 - 牺牲强一致性:允许短暂的数据不一致 RocketMQ(CP倾向): - 主从同步/异步可配置 - 牺牲可用性:主节点故障时,可能短暂不可用 1.2 BASE理论 BASE理论:对CAP的补充,是AP的实践方案 BA(Basically Available):基本可用 └─ 系统在出现故障时,允许损失部分可用性 └─ 例如:响应时间增加、功能降级 S(Soft State):软状态 └─ 允许系统中的数据存在中间状态 └─ 例如:订单状态从"创建"到"已支付"有延迟 E(Eventually Consistent):最终一致性 └─ 系统中的所有数据副本,在一段时间后最终能达到一致 └─ 例如:订单创建后,库存最终会扣减 核心思想: 牺牲强一致性,换取高可用性和高性能 二、消息可靠性的三个维度 2.1 生产端:确保消息发送成功 问题场景: Producer → MQ(网络故障) ↓ 消息丢失 解决方案: 方案1:同步发送 + 重试 /** * 同步发送:等待确认 */ public class ProducerReliability { public void sendWithRetry(Message message) { int maxRetries = 3; int retryCount = 0; while (retryCount < maxRetries) { try { // 同步发送,等待确认 SendResult result = producer.send(message); if (result.getSendStatus() == SendStatus.SEND_OK) { System.out.println("消息发送成功: " + message.getKeys()); return; } else { System.err.println("消息发送失败: " + result.getSendStatus()); } } catch (Exception e) { System.err.println("消息发送异常: " + e.getMessage()); } retryCount++; System.out.println("重试发送,第" + retryCount + "次"); // 指数退避 try { Thread.sleep((long) Math.pow(2, retryCount) * 1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 重试失败,记录到数据库,稍后补偿 saveToFailureLog(message); } private void saveToFailureLog(Message message) { // 记录到数据库或本地文件 System.err.println("消息发送失败,已记录到失败日志"); } } 方案2:本地消息表(最可靠) /** * 本地消息表:保证本地事务和消息发送的最终一致性 */ @Service public class LocalMessageTableService { @Autowired private OrderRepository orderRepository; @Autowired private LocalMessageRepository messageRepository; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 创建订单(本地事务 + 消息表) */ @Transactional public void createOrder(OrderRequest request) { // 1. 创建订单(本地事务) Order order = new Order(); order.setOrderNo(generateOrderNo()); order.setUserId(request.getUserId()); order.setStatus(OrderStatus.PENDING_PAYMENT); orderRepository.save(order); // 2. 保存消息到本地表(同一事务) LocalMessage message = new LocalMessage(); message.setTopic("order.created"); message.setContent(JSON.toJSONString(order)); message.setStatus(MessageStatus.PENDING); messageRepository.save(message); // 提交事务 } /** * 定时任务:扫描未发送的消息 */ @Scheduled(fixedDelay = 5000) public void sendPendingMessages() { List<LocalMessage> messages = messageRepository.findPendingMessages(); for (LocalMessage message : messages) { try { // 发送消息到MQ rocketMQTemplate.syncSend( message.getTopic(), message.getContent() ); // 标记为已发送 message.setStatus(MessageStatus.SENT); messageRepository.save(message); } catch (Exception e) { // 发送失败,下次重试 System.err.println("消息发送失败: " + e.getMessage()); } } } private String generateOrderNo() { return "ORD" + System.currentTimeMillis(); } } 本地消息表的优势: ...