如何保证消息100%不丢失?如何处理消息重复?如何实现最终一致性?

本文系统性拆解消息可靠性的核心问题和解决方案。


一、CAP与BASE理论

1.1 CAP理论

CAP三者只能选其二

C(Consistency):一致性
    └─ 所有节点在同一时间看到相同的数据

A(Availability):可用性
    └─ 每个请求都能收到响应(成功或失败)

P(Partition Tolerance):分区容错性
    └─ 系统在网络分区时仍能继续工作

在分布式系统中,P是必选项(网络分区不可避免)
因此只能在C和A之间权衡:
├─ CP:牺牲可用性,保证一致性(如ZooKeeper)
└─ AP:牺牲一致性,保证可用性(如Cassandra)

消息队列的选择

RabbitMQ(CP倾向):
- 镜像队列:主从同步复制
- 牺牲可用性:主节点故障时,等待新主选举

Kafka(CA倾向,实际AP):
- ISR机制:异步复制
- 牺牲强一致性:允许短暂的数据不一致

RocketMQ(CP倾向):
- 主从同步/异步可配置
- 牺牲可用性:主节点故障时,可能短暂不可用

1.2 BASE理论

BASE理论:对CAP的补充,是AP的实践方案

BA(Basically Available):基本可用
    └─ 系统在出现故障时,允许损失部分可用性
    └─ 例如:响应时间增加、功能降级

S(Soft State):软状态
    └─ 允许系统中的数据存在中间状态
    └─ 例如:订单状态从"创建"到"已支付"有延迟

E(Eventually Consistent):最终一致性
    └─ 系统中的所有数据副本,在一段时间后最终能达到一致
    └─ 例如:订单创建后,库存最终会扣减

核心思想

牺牲强一致性,换取高可用性和高性能


二、消息可靠性的三个维度

2.1 生产端:确保消息发送成功

问题场景

Producer → MQ(网络故障)
    ↓
消息丢失

解决方案

方案1:同步发送 + 重试

/**
 * 同步发送:等待确认
 */
public class ProducerReliability {

    public void sendWithRetry(Message message) {
        int maxRetries = 3;
        int retryCount = 0;

        while (retryCount < maxRetries) {
            try {
                // 同步发送,等待确认
                SendResult result = producer.send(message);

                if (result.getSendStatus() == SendStatus.SEND_OK) {
                    System.out.println("消息发送成功: " + message.getKeys());
                    return;
                } else {
                    System.err.println("消息发送失败: " + result.getSendStatus());
                }

            } catch (Exception e) {
                System.err.println("消息发送异常: " + e.getMessage());
            }

            retryCount++;
            System.out.println("重试发送,第" + retryCount + "次");

            // 指数退避
            try {
                Thread.sleep((long) Math.pow(2, retryCount) * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        // 重试失败,记录到数据库,稍后补偿
        saveToFailureLog(message);
    }

    private void saveToFailureLog(Message message) {
        // 记录到数据库或本地文件
        System.err.println("消息发送失败,已记录到失败日志");
    }
}

方案2:本地消息表(最可靠)

/**
 * 本地消息表:保证本地事务和消息发送的最终一致性
 */
@Service
public class LocalMessageTableService {

    @Autowired
    private OrderRepository orderRepository;

    @Autowired
    private LocalMessageRepository messageRepository;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 创建订单(本地事务 + 消息表)
     */
    @Transactional
    public void createOrder(OrderRequest request) {
        // 1. 创建订单(本地事务)
        Order order = new Order();
        order.setOrderNo(generateOrderNo());
        order.setUserId(request.getUserId());
        order.setStatus(OrderStatus.PENDING_PAYMENT);
        orderRepository.save(order);

        // 2. 保存消息到本地表(同一事务)
        LocalMessage message = new LocalMessage();
        message.setTopic("order.created");
        message.setContent(JSON.toJSONString(order));
        message.setStatus(MessageStatus.PENDING);
        messageRepository.save(message);

        // 提交事务
    }

    /**
     * 定时任务:扫描未发送的消息
     */
    @Scheduled(fixedDelay = 5000)
    public void sendPendingMessages() {
        List<LocalMessage> messages = messageRepository.findPendingMessages();

        for (LocalMessage message : messages) {
            try {
                // 发送消息到MQ
                rocketMQTemplate.syncSend(
                        message.getTopic(),
                        message.getContent()
                );

                // 标记为已发送
                message.setStatus(MessageStatus.SENT);
                messageRepository.save(message);

            } catch (Exception e) {
                // 发送失败,下次重试
                System.err.println("消息发送失败: " + e.getMessage());
            }
        }
    }

    private String generateOrderNo() {
        return "ORD" + System.currentTimeMillis();
    }
}

本地消息表的优势

保证了本地事务和消息发送的最终一致性

  • 本地事务成功 → 消息一定会发送(定时任务保证)
  • 本地事务失败 → 消息不会发送(事务回滚)

2.2 存储端:确保消息不丢失

问题场景

MQ收到消息 → Broker宕机
    ↓
消息丢失

解决方案

方案1:消息持久化

// RabbitMQ持久化
channel.exchangeDeclare("exchange", "direct", true);  // Exchange持久化
channel.queueDeclare("queue", true, false, false, null);  // Queue持久化
channel.basicPublish(
    "exchange",
    "routingKey",
    MessageProperties.PERSISTENT_TEXT_PLAIN,  // 消息持久化
    message.getBytes()
);

// RocketMQ持久化(默认持久化)
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
Message message = new Message("topic", "tag", "content".getBytes());
producer.send(message);  // 默认持久化到磁盘

// Kafka持久化(配置副本)
props.put("acks", "all");  // 等待所有副本确认
props.put("min.insync.replicas", 2);  // 最少2个副本同步

方案2:集群高可用

RabbitMQ镜像队列:
    ├─ Master(Broker 1)
    ├─ Mirror(Broker 2)
    └─ Mirror(Broker 3)

Kafka副本机制:
    Partition 0
        ├─ Leader(Broker 1)
        ├─ Follower(Broker 2)
        └─ Follower(Broker 3)

RocketMQ主从复制:
    Topic: order
        ├─ Master(Broker 1)
        └─ Slave(Broker 2)

2.3 消费端:确保消息被正确处理

问题场景

Consumer接收消息 → 处理失败 → ACK
    ↓
消息丢失

解决方案

方案1:手动ACK

/**
 * RabbitMQ手动ACK
 */
channel.basicConsume("queue", false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) {

        String message = new String(body);

        try {
            // 处理消息
            processMessage(message);

            // 手动ACK
            channel.basicAck(envelope.getDeliveryTag(), false);

        } catch (Exception e) {
            try {
                // 拒绝消息,重新入队
                channel.basicNack(envelope.getDeliveryTag(), false, true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
});

方案2:重试机制

/**
 * RocketMQ重试机制
 */
@RocketMQMessageListener(
    topic = "order_topic",
    consumerGroup = "order_consumer_group",
    maxReconsumeTimes = 3  // 最多重试3次
)
public class OrderConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        try {
            // 处理消息
            processMessage(message);

        } catch (Exception e) {
            // 抛出异常,触发重试
            throw new RuntimeException("处理失败", e);
        }
    }

    private void processMessage(String message) {
        // 业务逻辑
    }
}

三、消息重复:幂等性设计

3.1 为什么会消息重复?

场景1:生产者重试
Producer发送消息 → Broker收到 → ACK丢失 → Producer重试
    ↓
消息重复

场景2:消费者重试
Consumer处理成功 → ACK失败 → 重新投递
    ↓
消息重复

场景3:Broker故障
Broker宕机 → 消息未持久化 → 重新发送
    ↓
消息重复

核心结论

在分布式系统中,消息重复不可避免(至少一次语义)


3.2 幂等性设计

定义:多次执行产生的结果与一次执行的结果相同

f(f(x)) = f(x)

方案1:唯一ID去重

/**
 * 使用Redis实现唯一ID去重
 */
@Service
public class IdempotentService {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    /**
     * 幂等处理消息
     */
    public boolean processMessage(Message message) {
        String messageId = message.getMessageId();

        // 检查是否已处理
        String key = "message:processed:" + messageId;
        Boolean success = redisTemplate.opsForValue().setIfAbsent(
                key,
                "1",
                Duration.ofDays(7)  // 7天过期
        );

        if (Boolean.FALSE.equals(success)) {
            // 消息已处理,跳过
            System.out.println("消息已处理,跳过: " + messageId);
            return false;
        }

        try {
            // 处理消息
            doProcess(message);

            System.out.println("消息处理成功: " + messageId);
            return true;

        } catch (Exception e) {
            // 处理失败,删除标记
            redisTemplate.delete(key);
            throw e;
        }
    }

    private void doProcess(Message message) {
        // 业务逻辑
    }
}

方案2:数据库唯一索引

/**
 * 使用数据库唯一索引实现幂等
 */
@Service
public class IdempotentOrderService {

    @Autowired
    private OrderRepository orderRepository;

    /**
     * 创建订单(幂等)
     */
    public void createOrder(OrderRequest request) {
        Order order = new Order();
        order.setOrderNo(request.getOrderNo());  // 订单号(唯一索引)
        order.setUserId(request.getUserId());
        order.setStatus(OrderStatus.PENDING_PAYMENT);

        try {
            orderRepository.save(order);
            System.out.println("订单创建成功: " + order.getOrderNo());

        } catch (DuplicateKeyException e) {
            // 订单号重复,说明已创建
            System.out.println("订单已存在,跳过: " + request.getOrderNo());
        }
    }
}

方案3:业务逻辑幂等

/**
 * 业务逻辑天然幂等
 */
@Service
public class IdempotentBusinessService {

    /**
     * 更新订单状态(幂等)
     */
    public void updateOrderStatus(String orderNo, OrderStatus newStatus) {
        // SQL: UPDATE orders SET status = ? WHERE order_no = ? AND status != ?
        int rows = orderRepository.updateStatus(orderNo, newStatus);

        if (rows > 0) {
            System.out.println("订单状态更新成功: " + orderNo);
        } else {
            System.out.println("订单状态未变更(可能已经是目标状态): " + orderNo);
        }
    }

    /**
     * 账户余额扣减(幂等)
     */
    public void deductBalance(String userId, BigDecimal amount, String transactionId) {
        // SQL: UPDATE accounts
        //      SET balance = balance - ?
        //      WHERE user_id = ?
        //        AND balance >= ?
        //        AND NOT EXISTS (
        //            SELECT 1 FROM transactions WHERE transaction_id = ?
        //        )

        // 原理:
        // 1. 检查余额是否充足
        // 2. 检查交易ID是否已存在
        // 3. 扣减余额并记录交易ID
        // 4. 多次执行只会成功一次
    }
}

四、消息乱序:顺序性保证

4.1 为什么会消息乱序?

场景1:多分区
Message 1 → Partition 0 → Consumer 1
Message 2 → Partition 1 → Consumer 2(先处理完)
    ↓
乱序

场景2:多消费者
Message 1 → Consumer 1(慢)
Message 2 → Consumer 2(快)
    ↓
乱序

场景3:重试
Message 1 → 处理成功
Message 2 → 处理失败 → 重试 → 后处理
    ↓
乱序

4.2 顺序性保证方案

方案1:单分区 + 单消费者(性能低)

// RocketMQ顺序消息
producer.send(
    message,
    new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            // 同一Key进入同一Queue
            String key = (String) arg;
            int index = key.hashCode() % mqs.size();
            return mqs.get(Math.abs(index));
        }
    },
    orderId  // 订单ID
);

// 单线程消费
consumer.registerMessageListener(new MessageListenerOrderly() {
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
        // 单线程处理,保证顺序
    }
});

方案2:版本号 + 乐观锁

/**
 * 使用版本号保证顺序
 */
@Service
public class OrderSequenceService {

    /**
     * 更新订单状态(带版本号)
     */
    public void updateOrderStatus(String orderNo, OrderStatus newStatus, int expectedVersion) {
        // SQL: UPDATE orders
        //      SET status = ?, version = version + 1
        //      WHERE order_no = ?
        //        AND version = ?

        int rows = orderRepository.updateWithVersion(orderNo, newStatus, expectedVersion);

        if (rows > 0) {
            System.out.println("订单状态更新成功");
        } else {
            System.err.println("订单状态更新失败(版本号不匹配)");
            // 可能是乱序消息,丢弃
        }
    }
}

五、分布式事务的四种方案

5.1 方案1:2PC(两阶段提交)

阶段1:准备(Prepare)
    ├─ 协调者:发送Prepare请求
    ├─ 参与者1:执行本地事务,返回YES/NO
    └─ 参与者2:执行本地事务,返回YES/NO

阶段2:提交/回滚(Commit/Rollback)
    ├─ 所有参与者返回YES → 协调者发送Commit
    └─ 任何参与者返回NO → 协调者发送Rollback

优势:强一致性
劣势:性能差、阻塞、单点故障

5.2 方案2:TCC(Try-Confirm-Cancel)

Try:预留资源
    ├─ 订单服务:创建订单(状态:预创建)
    └─ 库存服务:锁定库存

Confirm:确认提交
    ├─ 订单服务:订单状态 → 已创建
    └─ 库存服务:扣减库存

Cancel:取消回滚
    ├─ 订单服务:删除订单
    └─ 库存服务:释放库存

优势:性能好、不阻塞
劣势:业务侵入性强

5.3 方案3:Saga

正向流程:
    订单服务 → 库存服务 → 积分服务 → 通知服务

补偿流程(任一步骤失败):
    订单服务 ← 库存服务 ← 积分服务 ← 通知服务
    (取消订单)(释放库存)(回滚积分)

优势:适合长事务、微服务
劣势:需要实现补偿逻辑

5.4 方案4:本地消息表(最佳实践)

订单服务:
├─ 创建订单(本地事务)
├─ 保存消息到本地表(同一事务)
└─ 定时任务发送消息

库存服务:
├─ 消费消息
├─ 扣减库存(幂等)
└─ 返回ACK

优势:简单、可靠、最终一致
劣势:延迟高

六、总结

本文系统性拆解了消息可靠性与分布式事务:

  1. 理论基础:CAP与BASE理论
  2. 消息可靠性三个维度
    • 生产端:同步发送 + 重试 + 本地消息表
    • 存储端:持久化 + 集群高可用
    • 消费端:手动ACK + 重试机制
  3. 消息重复:幂等性设计(唯一ID、数据库唯一索引、业务逻辑幂等)
  4. 消息乱序:顺序性保证(单分区、版本号)
  5. 分布式事务:四种方案(2PC、TCC、Saga、本地消息表)

核心原则

在分布式系统中,追求强一致性会牺牲性能和可用性

最佳实践:BASE理论 + 最终一致性 + 幂等设计


作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(6/6)

至此,消息中间件第一性原理系列全部完成。感谢阅读!