RocketMQ入门02:消息队列的本质与演进

引言:从一个简单的队列开始 在计算机科学中,队列(Queue)是最基础的数据结构之一。它遵循 FIFO(First In First Out)原则,就像排队买票一样,先来的先处理。 // 最简单的队列实现 public class SimpleQueue<T> { private LinkedList<T> items = new LinkedList<>(); public void enqueue(T item) { items.addLast(item); // 入队 } public T dequeue() { return items.removeFirst(); // 出队 } } 这个简单的数据结构,竟然是现代消息队列的起源。让我们看看它是如何一步步演进的。 一、演进阶段1:进程内队列 1.1 最初的需求 在单进程程序中,当我们需要解耦生产者和消费者时,最简单的方式就是使用内存队列: // 生产者-消费者模式 public class InMemoryMessageQueue { private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(); // 生产者线程 class Producer extends Thread { public void run() { while (true) { Message msg = generateMessage(); queue.put(msg); // 阻塞式入队 } } } // 消费者线程 class Consumer extends Thread { public void run() { while (true) { Message msg = queue.take(); // 阻塞式出队 processMessage(msg); } } } } 特点: ...

2025-11-13 · maneng

RocketMQ入门01:为什么需要消息队列

引言:从一个电商下单场景说起 想象这样一个场景:用户在你的电商平台下单购买了一件商品。看似简单的一次点击,背后却触发了一系列复杂的业务流程: 扣减库存 生成订单 扣减用户积分 发送短信通知 发送邮件通知 增加商家销量统计 记录用户行为日志 触发推荐系统更新 如果采用传统的同步调用方式,会是什么样子? 一、同步通信的痛点 1.1 性能瓶颈 // 传统同步调用方式 public OrderResult createOrder(OrderRequest request) { // 核心业务:100ms Order order = orderService.create(request); // 100ms inventoryService.deduct(request.getSkuId()); // 50ms // 非核心业务:累计 500ms+ pointService.deduct(request.getUserId()); // 100ms smsService.send(request.getPhone()); // 200ms emailService.send(request.getEmail()); // 150ms statisticsService.record(order); // 50ms logService.log(order); // 30ms recommendService.update(request.getUserId()); // 100ms return new OrderResult(order); } // 总耗时:730ms+ 问题分析: 用户需要等待 730ms+ 才能看到下单结果 其中只有前 150ms 是核心业务 580ms 都在等待非核心业务完成 1.2 高耦合问题 // 每增加一个下游系统,都要修改订单服务代码 public OrderResult createOrder(OrderRequest request) { // ... 原有代码 // 新需求:增加优惠券系统通知 couponService.notify(order); // 又要改订单服务! // 新需求:增加大数据分析 bigDataService.collect(order); // 又要改订单服务! } 问题分析: ...

2025-11-13 · maneng

消息中间件第一性原理:为什么需要异步通信?

为什么一个订单创建需要900ms?为什么下游服务故障会导致订单无法创建?为什么秒杀活动会让系统崩溃? 本文从一个真实的电商订单场景出发,用第一性原理思维深度拆解:为什么我们需要消息中间件? 一、引子:订单处理的两种实现 让我们从一个最常见的业务场景开始:用户下单。 场景背景 一个典型的电商订单创建流程包含以下步骤: 创建订单:保存订单到数据库 扣减库存:调用库存服务扣减商品库存 增加积分:调用积分服务为用户增加积分 发送通知:调用通知服务发送订单确认短信/邮件 创建物流:调用物流服务创建配送单 看起来很简单,但在实际生产环境中,这个流程会遇到很多挑战。让我们对比两种实现方式。 1.1 场景A:同步实现(直接调用) 这是大多数初学者的第一反应:依次调用所有服务。 /** * 订单服务 - 同步实现 * 问题:串行等待,响应时间长,系统强耦合 */ @Service @Slf4j public class OrderServiceSync { @Autowired private OrderRepository orderRepository; @Autowired private InventoryServiceClient inventoryService; @Autowired private PointsServiceClient pointsService; @Autowired private NotificationServiceClient notificationService; @Autowired private LogisticsServiceClient logisticsService; /** * 创建订单 - 同步方式 * 总耗时:900ms */ @Transactional public Order createOrder(OrderRequest request) { long startTime = System.currentTimeMillis(); try { // 1. 创建订单(核心业务逻辑,50ms) log.info("开始创建订单,用户ID: {}", request.getUserId()); Order order = buildOrder(request); orderRepository.save(order); log.info("订单创建成功,订单号: {}", order.getOrderNo()); // 2. 同步调用库存服务(网络调用,200ms) log.info("开始扣减库存"); long inventoryStart = System.currentTimeMillis(); try { InventoryDeductRequest inventoryRequest = InventoryDeductRequest.builder() .productId(request.getProductId()) .quantity(request.getQuantity()) .orderId(order.getId()) .build(); inventoryService.deduct(inventoryRequest); log.info("库存扣减成功,耗时: {}ms", System.currentTimeMillis() - inventoryStart); } catch (FeignException e) { log.error("库存扣减失败", e); throw new BusinessException("库存不足或服务不可用"); } // 3. 同步调用积分服务(网络调用,150ms) log.info("开始增加积分"); long pointsStart = System.currentTimeMillis(); try { PointsAddRequest pointsRequest = PointsAddRequest.builder() .userId(request.getUserId()) .points((int) (order.getAmount() / 10)) // 消费10元得1积分 .orderId(order.getId()) .build(); pointsService.add(pointsRequest); log.info("积分增加成功,耗时: {}ms", System.currentTimeMillis() - pointsStart); } catch (FeignException e) { // 积分服务失败不影响订单创建,只记录日志 // 但用户还是要等待150ms log.error("积分增加失败,将稍后重试", e); } // 4. 同步调用通知服务(网络调用 + 第三方API,300ms) log.info("开始发送通知"); long notificationStart = System.currentTimeMillis(); try { NotificationRequest notificationRequest = NotificationRequest.builder() .userId(request.getUserId()) .type(NotificationType.ORDER_CREATED) .content("您的订单" + order.getOrderNo() + "已创建成功") .build(); notificationService.send(notificationRequest); log.info("通知发送成功,耗时: {}ms", System.currentTimeMillis() - notificationStart); } catch (FeignException e) { // 通知失败不影响订单创建,但用户还是要等待300ms log.error("通知发送失败,将稍后重试", e); } // 5. 同步调用物流服务(网络调用,250ms) log.info("开始创建物流单"); long logisticsStart = System.currentTimeMillis(); try { LogisticsCreateRequest logisticsRequest = LogisticsCreateRequest.builder() .orderId(order.getId()) .address(request.getAddress()) .build(); logisticsService.create(logisticsRequest); log.info("物流单创建成功,耗时: {}ms", System.currentTimeMillis() - logisticsStart); } catch (FeignException e) { // 物流失败不影响订单创建,但用户还是要等待250ms log.error("物流单创建失败,将稍后重试", e); } // 记录总耗时 long totalTime = System.currentTimeMillis() - startTime; log.info("订单创建完成,总耗时: {}ms", totalTime); return order; } catch (Exception e) { log.error("订单创建失败", e); throw e; } } private Order buildOrder(OrderRequest request) { Order order = new Order(); order.setOrderNo(generateOrderNo()); order.setUserId(request.getUserId()); order.setProductId(request.getProductId()); order.setQuantity(request.getQuantity()); order.setAmount(calculateAmount(request)); order.setStatus(OrderStatus.PENDING_PAYMENT); order.setCreateTime(LocalDateTime.now()); return order; } private String generateOrderNo() { return "ORD" + System.currentTimeMillis(); } private BigDecimal calculateAmount(OrderRequest request) { // 简化处理,实际应该查询商品价格 return new BigDecimal("100.00").multiply(new BigDecimal(request.getQuantity())); } } 日志输出示例: ...

2025-11-03 · maneng

RocketMQ实战指南:阿里巴巴的分布式消息方案

为什么RocketMQ能支撑双11万亿级消息?如何使用事务消息实现分布式事务?如何保证消息的全局顺序? 本文深度剖析RocketMQ的核心特性和实战应用。 一、RocketMQ核心架构 1.1 核心组件 Producer → NameServer → Broker → Consumer ↓ ↓ 路由信息 消息存储 与Kafka的区别: 组件 RocketMQ Kafka 注册中心 NameServer(自研,轻量级) ZooKeeper / KRaft 存储结构 CommitLog + ConsumeQueue Partition日志文件 消息模型 点对点 + 发布订阅 发布订阅 特殊功能 事务消息、延迟消息、顺序消息 流式计算、消息回溯 1.2 存储架构 RocketMQ的三层存储: 1. CommitLog(所有消息) ├─ Message 1(Topic A) ├─ Message 2(Topic B) ├─ Message 3(Topic A) ... 2. ConsumeQueue(消息索引) Topic A ├─ Queue 0 │ ├─ Offset 0 → CommitLog位置100 │ ├─ Offset 1 → CommitLog位置300 │ ... └─ Queue 1 ... 3. IndexFile(消息检索) Key → CommitLog位置 优势: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...