消息中间件第一性原理:为什么需要异步通信?

为什么一个订单创建需要900ms?为什么下游服务故障会导致订单无法创建?为什么秒杀活动会让系统崩溃? 本文从一个真实的电商订单场景出发,用第一性原理思维深度拆解:为什么我们需要消息中间件? 一、引子:订单处理的两种实现 让我们从一个最常见的业务场景开始:用户下单。 场景背景 一个典型的电商订单创建流程包含以下步骤: 创建订单:保存订单到数据库 扣减库存:调用库存服务扣减商品库存 增加积分:调用积分服务为用户增加积分 发送通知:调用通知服务发送订单确认短信/邮件 创建物流:调用物流服务创建配送单 看起来很简单,但在实际生产环境中,这个流程会遇到很多挑战。让我们对比两种实现方式。 1.1 场景A:同步实现(直接调用) 这是大多数初学者的第一反应:依次调用所有服务。 /** * 订单服务 - 同步实现 * 问题:串行等待,响应时间长,系统强耦合 */ @Service @Slf4j public class OrderServiceSync { @Autowired private OrderRepository orderRepository; @Autowired private InventoryServiceClient inventoryService; @Autowired private PointsServiceClient pointsService; @Autowired private NotificationServiceClient notificationService; @Autowired private LogisticsServiceClient logisticsService; /** * 创建订单 - 同步方式 * 总耗时:900ms */ @Transactional public Order createOrder(OrderRequest request) { long startTime = System.currentTimeMillis(); try { // 1. 创建订单(核心业务逻辑,50ms) log.info("开始创建订单,用户ID: {}", request.getUserId()); Order order = buildOrder(request); orderRepository.save(order); log.info("订单创建成功,订单号: {}", order.getOrderNo()); // 2. 同步调用库存服务(网络调用,200ms) log.info("开始扣减库存"); long inventoryStart = System.currentTimeMillis(); try { InventoryDeductRequest inventoryRequest = InventoryDeductRequest.builder() .productId(request.getProductId()) .quantity(request.getQuantity()) .orderId(order.getId()) .build(); inventoryService.deduct(inventoryRequest); log.info("库存扣减成功,耗时: {}ms", System.currentTimeMillis() - inventoryStart); } catch (FeignException e) { log.error("库存扣减失败", e); throw new BusinessException("库存不足或服务不可用"); } // 3. 同步调用积分服务(网络调用,150ms) log.info("开始增加积分"); long pointsStart = System.currentTimeMillis(); try { PointsAddRequest pointsRequest = PointsAddRequest.builder() .userId(request.getUserId()) .points((int) (order.getAmount() / 10)) // 消费10元得1积分 .orderId(order.getId()) .build(); pointsService.add(pointsRequest); log.info("积分增加成功,耗时: {}ms", System.currentTimeMillis() - pointsStart); } catch (FeignException e) { // 积分服务失败不影响订单创建,只记录日志 // 但用户还是要等待150ms log.error("积分增加失败,将稍后重试", e); } // 4. 同步调用通知服务(网络调用 + 第三方API,300ms) log.info("开始发送通知"); long notificationStart = System.currentTimeMillis(); try { NotificationRequest notificationRequest = NotificationRequest.builder() .userId(request.getUserId()) .type(NotificationType.ORDER_CREATED) .content("您的订单" + order.getOrderNo() + "已创建成功") .build(); notificationService.send(notificationRequest); log.info("通知发送成功,耗时: {}ms", System.currentTimeMillis() - notificationStart); } catch (FeignException e) { // 通知失败不影响订单创建,但用户还是要等待300ms log.error("通知发送失败,将稍后重试", e); } // 5. 同步调用物流服务(网络调用,250ms) log.info("开始创建物流单"); long logisticsStart = System.currentTimeMillis(); try { LogisticsCreateRequest logisticsRequest = LogisticsCreateRequest.builder() .orderId(order.getId()) .address(request.getAddress()) .build(); logisticsService.create(logisticsRequest); log.info("物流单创建成功,耗时: {}ms", System.currentTimeMillis() - logisticsStart); } catch (FeignException e) { // 物流失败不影响订单创建,但用户还是要等待250ms log.error("物流单创建失败,将稍后重试", e); } // 记录总耗时 long totalTime = System.currentTimeMillis() - startTime; log.info("订单创建完成,总耗时: {}ms", totalTime); return order; } catch (Exception e) { log.error("订单创建失败", e); throw e; } } private Order buildOrder(OrderRequest request) { Order order = new Order(); order.setOrderNo(generateOrderNo()); order.setUserId(request.getUserId()); order.setProductId(request.getProductId()); order.setQuantity(request.getQuantity()); order.setAmount(calculateAmount(request)); order.setStatus(OrderStatus.PENDING_PAYMENT); order.setCreateTime(LocalDateTime.now()); return order; } private String generateOrderNo() { return "ORD" + System.currentTimeMillis(); } private BigDecimal calculateAmount(OrderRequest request) { // 简化处理,实际应该查询商品价格 return new BigDecimal("100.00").multiply(new BigDecimal(request.getQuantity())); } } 日志输出示例: ...

2025-11-03 · maneng

消息中间件核心概念:从生产消费到发布订阅

为什么需要Producer和Consumer?为什么需要Topic和Queue?什么是"至少一次"和"恰好一次"? 本文从零开始,通过手写代码和渐进式演进,深度拆解消息中间件的核心概念。 一、消息模型演进:从点对点到发布订阅 1.1 Level 0:最简模型(内存队列) 场景:单机应用,生产者和消费者在同一个进程。 /** * 最简单的消息队列:内存队列 * 使用Java BlockingQueue实现 */ public class SimpleMessageQueue { // 内存队列(线程安全) private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000); /** * 生产者:发送消息 */ public void send(Message message) throws InterruptedException { queue.put(message); System.out.println("发送消息: " + message); } /** * 消费者:接收消息 */ public Message receive() throws InterruptedException { Message message = queue.take(); System.out.println("接收消息: " + message); return message; } /** * 消息实体 */ @Data @AllArgsConstructor public static class Message { private String id; private String content; private long timestamp; } /** * 测试代码 */ public static void main(String[] args) { SimpleMessageQueue mq = new SimpleMessageQueue(); // 生产者线程 Thread producer = new Thread(() -> { for (int i = 0; i < 10; i++) { try { Message msg = new Message( "MSG-" + i, "Hello " + i, System.currentTimeMillis() ); mq.send(msg); Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); // 消费者线程 Thread consumer = new Thread(() -> { for (int i = 0; i < 10; i++) { try { mq.receive(); Thread.sleep(200); // 消费速度慢于生产速度 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); producer.start(); consumer.start(); } } 输出示例: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...