RocketMQ进阶04:顺序消息实战 - 订单状态流转的有序保证

引言:订单状态机 订单状态必须按照固定流程流转: 待支付 → 已支付 → 已发货 → 已完成 使用顺序消息保证状态流转的正确性。 实现代码 // Producer:发送订单状态变更消息 public class OrderStatusProducer { public void changeStatus(Long orderId, String newStatus) { Message msg = new Message("order_status_topic", newStatus.getBytes()); // 使用orderId作为orderKey,保证同一订单消息有序 producer.send(msg, (mqs, msg1, arg) -> { int index = (int) (orderId % mqs.size()); return mqs.get(index); }, orderId); } } // Consumer:顺序处理状态变更 @RocketMQMessageListener( topic = "order_status_topic", consumerGroup = "order_status_group", consumeMode = ConsumeMode.ORDERLY // 顺序消费 ) public class OrderStatusConsumer implements RocketMQListener<String> { @Override public void onMessage(String newStatus) { // 验证状态流转合法性 if (isValidTransition(currentStatus, newStatus)) { updateOrderStatus(orderId, newStatus); } else { log.error("非法状态流转"); } } } 核心要点 orderKey选择:使用orderId保证同一订单有序 状态验证:Consumer端验证状态流转合法性 异常处理:非法状态流转记录日志并告警 本文关键词:顺序消息实战 订单状态 状态机 有序流转

2025-11-14 · maneng

RocketMQ进阶03:顺序消息原理 - 局部有序与全局有序的设计权衡

引言:为什么需要顺序消息? 场景:订单状态流转 创建订单 → 支付订单 → 发货 → 确认收货 如果消息乱序: 支付 → 创建 → 发货 ❌ 业务错误 RocketMQ 提供顺序消息保证消息按照发送顺序消费。 一、顺序消息类型 1.1 全局有序 vs 局部有序 ┌──────────────────────────────────────────────┐ │ 全局有序(Global Order) │ ├──────────────────────────────────────────────┤ │ 特点:所有消息严格按照FIFO顺序 │ │ 实现:单Queue + 单Consumer线程 │ │ 性能:低(吞吐量受限) │ │ 场景:极少使用 │ │ │ │ Queue0: Msg1 → Msg2 → Msg3 → Msg4 │ │ │ └──────────────────────────────────────────────┘ ┌──────────────────────────────────────────────┐ │ 局部有序(Partition Order) │ ├──────────────────────────────────────────────┤ │ 特点:相同Key的消息有序 │ │ 实现:相同Key路由到同一Queue │ │ 性能:高(并发消费) │ │ 场景:常用(订单、用户维度有序) │ │ │ │ Queue0: Order1-Msg1 → Order1-Msg2 │ │ Queue1: Order2-Msg1 → Order2-Msg2 │ │ Queue2: Order3-Msg1 → Order3-Msg2 │ │ │ └──────────────────────────────────────────────┘ 1.2 实现原理 Producer端保证: 1. 相同orderKey的消息发送到同一个Queue 2. 使用MessageQueueSelector选择Queue Broker端保证: 1. 单个Queue内部FIFO存储 2. CommitLog顺序写 Consumer端保证: 1. 单个Queue只被一个Consumer实例消费 2. 单个Queue内的消息单线程顺序消费 二、代码实现 2.1 顺序发送 public class OrderedProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group"); producer.setNamesrvAddr("localhost:9876"); producer.start(); // 模拟订单消息 List<OrderStep> orderSteps = buildOrders(); for (OrderStep step : orderSteps) { Message msg = new Message( "order_topic", step.getTags(), step.toString().getBytes() ); // 顺序发送:相同orderId的消息发送到同一个Queue SendResult sendResult = producer.send( msg, new MessageQueueSelector() { @Override public MessageQueue select( List<MessageQueue> mqs, Message msg, Object arg ) { Long orderId = (Long) arg; // Hash取模,确保相同orderId路由到同一Queue int index = (int) (orderId % mqs.size()); return mqs.get(index); } }, step.getOrderId() // orderKey ); System.out.printf("发送:%s, Queue=%d%n", step, sendResult.getMessageQueue().getQueueId()); } producer.shutdown(); } // 构建订单步骤 private static List<OrderStep> buildOrders() { List<OrderStep> steps = new ArrayList<>(); // 订单1001 steps.add(new OrderStep(1001L, "创建")); steps.add(new OrderStep(1001L, "支付")); steps.add(new OrderStep(1001L, "发货")); // 订单1002 steps.add(new OrderStep(1002L, "创建")); steps.add(new OrderStep(1002L, "支付")); // 订单1003 steps.add(new OrderStep(1003L, "创建")); return steps; } } class OrderStep { private Long orderId; private String status; public OrderStep(Long orderId, String status) { this.orderId = orderId; this.status = status; } public Long getOrderId() { return orderId; } public String getTags() { return status; } @Override public String toString() { return "Order=" + orderId + ", Status=" + status; } } 2.2 顺序消费 public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); consumer.subscribe("order_topic", "*"); // 注册顺序消息监听器 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage( List<MessageExt> msgs, ConsumeOrderlyContext context ) { // 单线程顺序消费 for (MessageExt msg : msgs) { System.out.printf( "消费:Thread=%s, Queue=%d, Msg=%s%n", Thread.currentThread().getName(), msg.getQueueId(), new String(msg.getBody()) ); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("顺序消费启动成功"); } } 输出示例: ...

2025-11-14 · maneng

RocketMQ进阶02:事务消息实战 - 订单支付场景完整实现

引言:真实业务场景 业务需求:用户支付成功后,需要完成以下操作: 支付服务:记录支付流水 订单服务:更新订单状态 积分服务:增加用户积分 通知服务:发送支付成功短信 要求:保证数据一致性,支付成功后其他操作必须成功。 一、系统架构设计 1.1 整体架构 ┌──────────────────────────────────────────────────────┐ │ 订单支付系统架构 │ ├──────────────────────────────────────────────────────┤ │ │ │ 用户端 │ │ │ │ │ │ 1. 发起支付请求 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 支付服务(Producer) │ │ │ │ - 调用第三方支付 │ │ │ │ - 记录支付流水(本地事务) │ │ │ │ - 发送事务消息 │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ 事务消息 │ │ ▼ │ │ ┌─────────────────────────────────────────────┐ │ │ │ RocketMQ Broker │ │ │ └─────────────────────────────────────────────┘ │ │ │ │ │ │ │ ▼ ▼ ▼ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ 订单服务 │ │ 积分服务 │ │ 通知服务 │ │ │ │ Consumer │ │ Consumer │ │ Consumer │ │ │ │ │ │ │ │ │ │ │ │ 更新订单 │ │ 增加积分 │ │ 发送短信 │ │ │ │ 状态 │ │ │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────────┘ 1.2 消息流转 支付成功 → 发送Half消息 → 记录支付流水(本地事务) ↓ 事务提交(Commit) ↓ 消息变为可消费状态 ↓ ┌─────────────┼─────────────┐ ▼ ▼ ▼ 订单服务 积分服务 通知服务 更新状态 增加积分 发送短信 二、代码实现 2.1 数据库设计 -- 支付流水表 CREATE TABLE payment_record ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, pay_time DATETIME NOT NULL, status VARCHAR(20) NOT NULL, -- SUCCESS, FAILED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, INDEX idx_order_id (order_id) ); -- 订单表 CREATE TABLE orders ( id BIGINT PRIMARY KEY AUTO_INCREMENT, order_id VARCHAR(64) NOT NULL UNIQUE, user_id BIGINT NOT NULL, amount DECIMAL(10,2) NOT NULL, status VARCHAR(20) NOT NULL, -- CREATED, PAID, CANCELLED created_at DATETIME DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); -- 积分表 CREATE TABLE user_points ( user_id BIGINT PRIMARY KEY, points INT NOT NULL DEFAULT 0, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ); 2.2 支付服务实现 @Service public class PaymentService { @Autowired private TransactionMQProducer transactionProducer; @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 处理支付请求 */ public PaymentResult processPay(PaymentRequest request) { String orderId = request.getOrderId(); BigDecimal amount = request.getAmount(); // 1. 调用第三方支付接口 ThirdPartyPayResult payResult = callThirdPartyPay(orderId, amount); if (!payResult.isSuccess()) { return PaymentResult.failed("支付失败:" + payResult.getMessage()); } // 2. 构建事务消息 PaymentEvent event = new PaymentEvent(); event.setOrderId(orderId); event.setUserId(request.getUserId()); event.setAmount(amount); event.setPayTime(new Date()); Message message = new Message( "payment_success_topic", "pay", orderId, // Key JSON.toJSONString(event).getBytes(StandardCharsets.UTF_8) ); // 3. 发送事务消息 try { TransactionSendResult sendResult = transactionProducer.sendMessageInTransaction( message, event // 传递给executeLocalTransaction ); if (sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE) { return PaymentResult.success("支付成功"); } else { return PaymentResult.failed("支付记录失败"); } } catch (Exception e) { log.error("发送事务消息失败", e); return PaymentResult.failed("系统异常"); } } /** * 调用第三方支付(模拟) */ private ThirdPartyPayResult callThirdPartyPay(String orderId, BigDecimal amount) { // 实际调用支付宝、微信等支付接口 log.info("调用第三方支付:订单={}, 金额={}", orderId, amount); return ThirdPartyPayResult.success(); } } /** * 事务消息监听器 */ @Component public class PaymentTransactionListener implements TransactionListener { @Autowired private PaymentRecordMapper paymentRecordMapper; /** * 执行本地事务:记录支付流水 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { PaymentEvent event = (PaymentEvent) arg; String orderId = event.getOrderId(); try { // 1. 插入支付记录 PaymentRecord record = new PaymentRecord(); record.setOrderId(orderId); record.setUserId(event.getUserId()); record.setAmount(event.getAmount()); record.setPayTime(event.getPayTime()); record.setStatus("SUCCESS"); int rows = paymentRecordMapper.insert(record); if (rows > 0) { log.info("支付记录成功:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.error("支付记录失败:{}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (DuplicateKeyException e) { // 重复记录,说明已经成功 log.warn("支付记录重复:{}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error("支付记录异常:{}", orderId, e); // 返回UNKNOW,等待回查 return LocalTransactionState.UNKNOW; } } /** * 回查事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = msg.getKeys(); try { // 查询支付记录是否存在 PaymentRecord record = paymentRecordMapper.selectByOrderId(orderId); if (record != null && "SUCCESS".equals(record.getStatus())) { log.info("回查成功:支付记录存在,订单={}", orderId); return LocalTransactionState.COMMIT_MESSAGE; } else { log.warn("回查失败:支付记录不存在,订单={}", orderId); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { log.error("回查异常:{}", orderId, e); return LocalTransactionState.UNKNOW; } } } 2.3 订单服务实现 @Service public class OrderConsumerService { @Autowired private OrderMapper orderMapper; /** * 消费支付成功消息 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "order_consumer_group" ) public class OrderMessageListener implements RocketMQListener<String> { @Override public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); log.info("收到支付成功消息:{}", orderId); try { // 1. 幂等性检查 Order order = orderMapper.selectByOrderId(orderId); if (order == null) { log.error("订单不存在:{}", orderId); return; } if ("PAID".equals(order.getStatus())) { log.warn("订单已支付,跳过处理:{}", orderId); return; // 幂等性保证 } // 2. 更新订单状态 order.setStatus("PAID"); order.setUpdatedAt(new Date()); int rows = orderMapper.updateById(order); if (rows > 0) { log.info("订单状态更新成功:{}", orderId); } else { log.error("订单状态更新失败:{}", orderId); throw new RuntimeException("更新失败"); } } catch (Exception e) { log.error("处理支付消息异常:{}", orderId, e); throw new RuntimeException(e); // 触发重试 } } } } 2.4 积分服务实现 @Service public class PointsConsumerService { @Autowired private UserPointsMapper userPointsMapper; @Autowired private PointsRecordMapper pointsRecordMapper; /** * 消费支付成功消息,增加积分 */ @RocketMQMessageListener( topic = "payment_success_topic", consumerGroup = "points_consumer_group" ) public class PointsMessageListener implements RocketMQListener<String> { @Override @Transactional public void onMessage(String message) { PaymentEvent event = JSON.parseObject(message, PaymentEvent.class); String orderId = event.getOrderId(); Long userId = event.getUserId(); log.info("收到支付成功消息,增加积分:用户={}, 订单={}", userId, orderId); try { // 1. 幂等性检查:是否已经增加过积分 PointsRecord existing = pointsRecordMapper.selectByOrderId(orderId); if (existing != null) { log.warn("积分已增加,跳过处理:{}", orderId); return; } // 2. 计算积分(1元=1积分) int points = event.getAmount().intValue(); // 3. 增加积分 int rows = userPointsMapper.increasePoints(userId, points); if (rows == 0) { // 用户不存在,创建记录 UserPoints userPoints = new UserPoints(); userPoints.setUserId(userId); userPoints.setPoints(points); userPointsMapper.insert(userPoints); } // 4. 记录积分变更 PointsRecord record = new PointsRecord(); record.setUserId(userId); record.setOrderId(orderId); record.setPoints(points); record.setReason("支付获得"); pointsRecordMapper.insert(record); log.info("积分增加成功:用户={}, 积分={}", userId, points); } catch (DuplicateKeyException e) { // 重复处理,直接返回 log.warn("积分记录重复:{}", orderId); } catch (Exception e) { log.error("积分增加异常:{}", orderId, e); throw new RuntimeException(e); } } } } 三、测试验证 3.1 正常流程测试 @SpringBootTest public class PaymentTransactionTest { @Autowired private PaymentService paymentService; @Test public void testNormalPayment() throws InterruptedException { // 1. 发起支付 PaymentRequest request = new PaymentRequest(); request.setOrderId("ORDER_001"); request.setUserId(10001L); request.setAmount(new BigDecimal("99.00")); PaymentResult result = paymentService.processPay(request); // 2. 验证支付结果 assertTrue(result.isSuccess()); // 3. 等待消息消费(实际生产环境通过监控确认) Thread.sleep(5000); // 4. 验证订单状态 Order order = orderMapper.selectByOrderId("ORDER_001"); assertEquals("PAID", order.getStatus()); // 5. 验证积分 UserPoints points = userPointsMapper.selectByUserId(10001L); assertTrue(points.getPoints() >= 99); } } 3.2 异常场景测试 @Test public void testPaymentFailure() { // 场景1:支付失败 // 预期:不记录支付流水,不发送消息 // 场景2:记录支付流水失败 // 预期:回滚消息,不通知下游服务 // 场景3:Producer宕机 // 预期:回查机制保证消息最终发送 // 场景4:Consumer消费失败 // 预期:自动重试,直到成功 } 四、最佳实践总结 4.1 关键要点 1. 幂等性设计 - 使用唯一键约束 - 消费前检查是否已处理 - 数据库层面保证 2. 本地事务设计 - 尽量简单 - 避免远程调用 - 快速返回 3. 回查逻辑设计 - 通过数据库查询事务状态 - 不依赖缓存 - 保证幂等 4. 异常处理 - 明确的Commit/Rollback - 未知异常返回UNKNOW - 记录详细日志 4.2 注意事项 ❌ 避免: - 本地事务中调用RPC - 本地事务执行时间过长 - 回查逻辑依赖不可靠数据源 ✅ 推荐: - 本地事务只操作数据库 - 控制本地事务在1秒内 - 回查直接查询数据库 五、总结 本文通过订单支付场景,完整展示了事务消息的实战应用: ...

2025-11-14 · maneng

RocketMQ进阶01:事务消息原理与实现 - 分布式事务的终极方案

引言:分布式事务的困境 在分布式系统中,跨服务的事务一致性是一个经典难题: 典型场景:电商下单扣库存 订单服务:创建订单 ✅ 库存服务:扣减库存 ❌(网络超时) 问题:订单已创建,但库存未扣减 → 数据不一致 RocketMQ 的事务消息提供了一种优雅的解决方案。 一、什么是事务消息? 1.1 核心概念 ┌────────────────────────────────────────────┐ │ 事务消息 vs 普通消息 │ ├────────────────────────────────────────────┤ │ │ │ 普通消息: │ │ 发送 → 立即可消费 │ │ │ │ 事务消息: │ │ 发送 → 暂不可消费(Half消息) │ │ → 执行本地事务 │ │ → 提交/回滚 │ │ → 可消费/删除 │ │ │ └────────────────────────────────────────────┘ 1.2 应用场景 场景1:订单创建 + 扣库存 - 订单服务:创建订单(本地事务) - 发送事务消息 → 库存服务扣库存 场景2:支付成功 + 积分增加 - 支付服务:记录支付(本地事务) - 发送事务消息 → 积分服务增加积分 场景3:注册用户 + 发送邮件 - 用户服务:创建用户(本地事务) - 发送事务消息 → 邮件服务发送欢迎邮件 二、事务消息原理 2.1 两阶段提交流程 ┌────────────────────────────────────────────────────────┐ │ 事务消息完整流程 │ ├────────────────────────────────────────────────────────┤ │ │ │ Producer Broker Consumer │ │ │ │ │ │ │ │ 1. 发送Half消息 │ │ │ │ ├───────────────────▶│ │ │ │ │ │ 存储Half消息 │ │ │ │ │ (不可消费) │ │ │ │ 2. 返回成功 │ │ │ │ ◀────────────────────┤ │ │ │ │ │ │ │ │ │ 3. 执行本地事务 │ │ │ │ │ (创建订单) │ │ │ │ │ │ │ │ │ │ 4. Commit/Rollback │ │ │ │ ├───────────────────▶│ │ │ │ │ │ │ │ │ │ │ 如果Commit: │ │ │ │ │ - 转为正常消息 │ │ │ │ │ - 可被消费 │ │ │ │ ├──────────────────▶│ │ │ │ │ │ 5. 消费 │ │ │ │ │ (扣库存) │ │ │ │ │ │ │ │ │ 如果Rollback: │ │ │ │ │ - 删除Half消息 │ │ │ │ │ - 不可消费 │ │ │ │ │ │ │ └────────────────────────────────────────────────────────┘ 2.2 事务回查机制 问题:Producer 执行完本地事务后,来不及发送 Commit 就宕机了怎么办? 解决:Broker 定期回查事务状态 ┌────────────────────────────────────────────┐ │ 事务回查流程 │ ├────────────────────────────────────────────┤ │ │ │ Broker Producer │ │ │ │ │ │ │ 1. 检测到超时的Half消息 │ │ │ │ │ │ │ │ 2. 发起事务状态回查 │ │ │ ├────────────────────────▶│ │ │ │ │ │ │ │ │ 3. 查询本地 │ │ │ │ 事务状态 │ │ │ │ │ │ │ 4. 返回事务状态 │ │ │ │ (COMMIT/ROLLBACK/ │ │ │ │ UNKNOWN) │ │ │ ◀────────────────────────┤ │ │ │ │ │ │ │ 5. 根据状态处理Half消息 │ │ │ │ │ │ └────────────────────────────────────────────┘ 三、代码实现 3.1 Producer 端实现 public class TransactionProducerExample { public static void main(String[] args) throws Exception { // 1. 创建事务消息生产者 TransactionMQProducer producer = new TransactionMQProducer("transaction_producer_group"); producer.setNamesrvAddr("localhost:9876"); // 2. 设置事务监听器 producer.setTransactionListener(new TransactionListener() { /** * 执行本地事务 * @param msg 消息 * @param arg 用户参数 * @return 本地事务执行结果 */ @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderId = new String(msg.getBody()); System.out.println("执行本地事务,创建订单:" + orderId); try { // 模拟创建订单 boolean success = createOrder(orderId); if (success) { // 本地事务成功 → Commit System.out.println("订单创建成功,提交事务消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { // 本地事务失败 → Rollback System.out.println("订单创建失败,回滚事务消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } catch (Exception e) { // 异常 → 未知状态,等待回查 System.out.println("订单创建异常,等待回查:" + e.getMessage()); return LocalTransactionState.UNKNOW; } } /** * 事务状态回查 * @param msg 消息 * @return 事务状态 */ @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId = new String(msg.getBody()); System.out.println("回查事务状态,订单ID:" + orderId); // 查询订单是否创建成功 boolean exists = checkOrderExists(orderId); if (exists) { // 订单存在 → Commit System.out.println("订单已存在,提交事务消息"); return LocalTransactionState.COMMIT_MESSAGE; } else { // 订单不存在 → Rollback System.out.println("订单不存在,回滚事务消息"); return LocalTransactionState.ROLLBACK_MESSAGE; } } }); // 3. 启动生产者 producer.start(); // 4. 发送事务消息 String orderId = "ORDER_" + System.currentTimeMillis(); Message msg = new Message( "order_topic", "create", orderId.getBytes() ); TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null); System.out.println("发送结果:" + sendResult.getSendStatus()); System.out.println("本地事务状态:" + sendResult.getLocalTransactionState()); // 等待回查 Thread.sleep(60000); producer.shutdown(); } // 模拟创建订单 private static boolean createOrder(String orderId) { // 实际业务逻辑:插入订单表 System.out.println("INSERT INTO orders VALUES ('" + orderId + "', ...)"); return true; // 假设成功 } // 检查订单是否存在 private static boolean checkOrderExists(String orderId) { // 实际业务逻辑:查询订单表 System.out.println("SELECT * FROM orders WHERE id = '" + orderId + "'"); return true; // 假设存在 } } 3.2 Consumer 端实现 public class TransactionConsumerExample { public static void main(String[] args) throws Exception { // 1. 创建消费者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("inventory_consumer_group"); consumer.setNamesrvAddr("localhost:9876"); // 2. 订阅Topic consumer.subscribe("order_topic", "*"); // 3. 注册消息监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context ) { for (MessageExt msg : msgs) { String orderId = new String(msg.getBody()); System.out.println("收到订单创建消息:" + orderId); try { // 扣减库存 boolean success = deductInventory(orderId); if (success) { System.out.println("库存扣减成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } else { System.out.println("库存扣减失败,重试"); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } catch (Exception e) { System.out.println("库存扣减异常,重试:" + e.getMessage()); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 4. 启动消费者 consumer.start(); System.out.println("库存服务启动成功"); } // 扣减库存 private static boolean deductInventory(String orderId) { // 实际业务逻辑:扣减库存 System.out.println("UPDATE inventory SET stock = stock - 1 WHERE order_id = '" + orderId + "'"); return true; } } 四、存储机制 4.1 Half消息存储 // Half消息存储在特殊Topic中 public static final String SYSTEM_TRANSHALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC"; // 消息发送到Broker时 public PutMessageResult putMessage(MessageExtBrokerInner msgInner) { // 如果是事务消息 if (MessageSysFlag.TRANSACTION_PREPARED_TYPE == tranType) { // 修改Topic为Half Topic msgInner.setTopic(SYSTEM_TRANSHALF_TOPIC); msgInner.setQueueId(0); } // 存储到CommitLog PutMessageResult result = this.commitLog.putMessage(msgInner); return result; } 4.2 Op消息存储 // Op消息用于标记Half消息的状态 public static final String SYSTEM_TRANSOP_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC"; // Commit/Rollback时写入Op消息 public void putOpMessage(MessageExtBrokerInner msgInner, String opType) { msgInner.setTopic(SYSTEM_TRANSOP_TOPIC); msgInner.setBody(opType.getBytes()); // 存储Op消息 this.commitLog.putMessage(msgInner); } // Op消息的作用: // 1. 记录Half消息已被Commit或Rollback // 2. 避免重复回查 4.3 事务状态存储 Half消息 + Op消息 组合判断事务状态: ┌──────────────────────────────────────────┐ │ 事务状态判断逻辑 │ ├──────────────────────────────────────────┤ │ Half消息存在 + Op消息不存在 │ │ → 事务未完成,需要回查 │ │ │ │ Half消息存在 + Op消息=COMMIT │ │ → 事务已提交,消息可消费 │ │ │ │ Half消息存在 + Op消息=ROLLBACK │ │ → 事务已回滚,消息已删除 │ └──────────────────────────────────────────┘ 五、回查机制实现 5.1 回查触发条件 public class TransactionalMessageCheckService { // 回查配置 private static final long TRANSACTION_TIMEOUT = 6000; // 6秒超时 private static final int MAX_CHECK_TIMES = 15; // 最多回查15次 // 定时扫描Half消息 public void check() { try { // 1. 从Half Topic读取消息 GetResult getResult = getHalfMsg(queueId, offset, nums); for (MessageExt msgExt : getResult.getMsg()) { // 2. 检查是否有对应的Op消息 if (isOpMessageExist(msgExt)) { continue; // 已处理,跳过 } // 3. 检查超时时间 long currentTime = System.currentTimeMillis(); long tranTime = msgExt.getStoreTimestamp(); if (currentTime - tranTime < TRANSACTION_TIMEOUT) { continue; // 未超时,跳过 } // 4. 检查回查次数 int checkTimes = msgExt.getReconsumeTimes(); if (checkTimes >= MAX_CHECK_TIMES) { // 超过最大回查次数,强制回滚 this.resolveDiscardMsg(msgExt); continue; } // 5. 发起回查 this.sendCheckMessage(msgExt); } } catch (Exception e) { log.error("Check transaction error", e); } } // 发送回查请求 private void sendCheckMessage(MessageExt msgExt) { // 构建回查请求 CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader(); requestHeader.setCommitLogOffset(msgExt.getCommitLogOffset()); requestHeader.setTranStateTableOffset(msgExt.getQueueOffset()); // 发送到Producer this.brokerController.getBroker2Client().checkProducerTransactionState( msgExt.getBornHostString(), requestHeader, msgExt ); } } 5.2 回查响应处理 // Producer端处理回查请求 public class ClientRemotingProcessor implements NettyRequestProcessor { @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) { switch (request.getCode()) { case RequestCode.CHECK_TRANSACTION_STATE: return this.checkTransactionState(ctx, request); default: break; } return null; } // 回查事务状态 private RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) { // 1. 解析请求 CheckTransactionStateRequestHeader requestHeader = (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader( CheckTransactionStateRequestHeader.class); // 2. 获取消息 MessageExt message = MessageDecoder.decode(request.getBody()); // 3. 调用用户的TransactionListener TransactionListener transactionListener = this.getTransactionListener(); if (transactionListener != null) { LocalTransactionState localTransactionState = transactionListener.checkLocalTransaction(message); // 4. 返回事务状态 EndTransactionRequestHeader responseHeader = new EndTransactionRequestHeader(); responseHeader.setTransactionId(requestHeader.getTransactionId()); responseHeader.setCommitLogOffset(requestHeader.getCommitLogOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: responseHeader.setCommitOrRollback( MessageSysFlag.TRANSACTION_NOT_TYPE); break; } // 5. 发送响应 this.brokerController.getBroker2Client().endTransactionOneway( message.getBrokerAddr(), responseHeader, "Check transaction state from broker" ); } return null; } } 六、最佳实践 6.1 幂等性保证 // 问题:消息可能被重复消费,如何保证幂等? // 方案1:唯一键约束 public boolean deductInventory(String orderId) { try { // 使用orderId作为唯一键 jdbcTemplate.update( "INSERT INTO inventory_log (order_id, status) VALUES (?, 'DEDUCTED')", orderId ); // 扣减库存 jdbcTemplate.update( "UPDATE inventory SET stock = stock - 1 WHERE product_id = ?", productId ); return true; } catch (DuplicateKeyException e) { // 重复消费,直接返回成功 return true; } } // 方案2:分布式锁 public boolean deductInventory(String orderId) { String lockKey = "inventory:lock:" + orderId; if (redisLock.tryLock(lockKey, 10, TimeUnit.SECONDS)) { try { // 检查是否已处理 if (isProcessed(orderId)) { return true; } // 扣减库存 updateInventory(orderId); // 标记已处理 markAsProcessed(orderId); return true; } finally { redisLock.unlock(lockKey); } } return false; } 6.2 异常处理 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { // 执行本地事务 executeBusinessLogic(msg); return LocalTransactionState.COMMIT_MESSAGE; } catch (BusinessException e) { // 业务异常 → 明确回滚 log.error("Business exception, rollback transaction", e); return LocalTransactionState.ROLLBACK_MESSAGE; } catch (Exception e) { // 未知异常 → 返回UNKNOW,等待回查 log.error("Unknown exception, wait for check", e); return LocalTransactionState.UNKNOW; } } 6.3 性能优化 # Producer配置 # 事务回查最大次数 transactionCheckMax=15 # 回查间隔时间(毫秒) transactionCheckInterval=6000 # 事务消息过期时间(小时) transactionTimeOut=6 # 异步发送(提高吞吐量) sendMsgTimeout=3000 retryTimesWhenSendFailed=2 七、总结与思考 7.1 核心要点 两阶段提交:Half消息 + 本地事务 + Commit/Rollback 回查机制:保证事务最终一致性 存储设计:Half Topic + Op Topic 组合判断状态 幂等性:Consumer端需要保证消息幂等处理 7.2 思考题 为什么需要Op消息? ...

2025-11-14 · maneng

RocketMQ架构08:消息路由与负载均衡 - 智能路由的奥秘

引言:智能路由的艺术 RocketMQ 的路由系统就像交通指挥系统,负责: Producer 找 Broker:发送消息到哪个 Broker? Consumer 找 Broker:从哪个 Broker 拉取消息? 故障转移:Broker 宕机后如何自动切换? 负载均衡:如何均匀分配请求? 今天我们深入剖析这套智能路由系统的设计。 一、路由信息管理 1.1 路由表结构 public class TopicRouteData { // Queue 数据列表 private List<QueueData> queueDatas; // Broker 数据列表 private List<BrokerData> brokerDatas; // 过滤服务器表 private HashMap<String, List<String>> filterServerTable; } // Queue 数据 public class QueueData { private String brokerName; // Broker 名称 private int readQueueNums; // 读队列数 private int writeQueueNums; // 写队列数 private int perm; // 权限 private int topicSynFlag; // 同步标志 } // Broker 数据 public class BrokerData { private String cluster; // 集群名 private String brokerName; // Broker 名称 private HashMap<Long, String> brokerAddrs; // Broker 地址表 // Key: brokerId (0=Master, >0=Slave) // Value: broker 地址 } 路由表示例: ...

2025-11-14 · maneng

RocketMQ架构07:网络通信模型与Remoting模块 - 基于Netty的高性能通信

引言:高性能通信的基石 RocketMQ 的网络通信模块(Remoting)是整个系统的神经网络,负责: Producer → Broker:发送消息 Consumer → Broker:拉取消息 Broker → NameServer:注册与心跳 Broker → Broker:主从同步 今天我们深入剖析这个基于 Netty 的高性能通信模块。 一、Remoting 模块架构 1.1 整体架构 ┌────────────────────────────────────────────────┐ │ Remoting 模块架构 │ ├────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────┐ │ │ │ RemotingServer (服务端) │ │ │ │ - NettyRemotingServer │ │ │ │ - 接收客户端请求 │ │ │ └──────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────┐ │ │ │ RemotingClient (客户端) │ │ │ │ - NettyRemotingClient │ │ │ │ - 发送请求到服务端 │ │ │ └──────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────┐ │ │ │ Netty 网络层 │ │ │ │ - EventLoopGroup │ │ │ │ - Channel Pipeline │ │ │ │ - 编解码器 │ │ │ └──────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────┘ 1.2 核心组件 // 1. RemotingCommand - 通信协议对象 public class RemotingCommand { private int code; // 请求/响应码 private LanguageCode language; // 语言 private int version; // 版本 private int opaque; // 请求ID private int flag; // 标志位 private String remark; // 备注 private HashMap<String, String> extFields; // 扩展字段 private transient byte[] body; // 消息体 } // 2. NettyRequestProcessor - 请求处理器 public interface NettyRequestProcessor { RemotingCommand processRequest( ChannelHandlerContext ctx, RemotingCommand request ) throws Exception; } // 3. ResponseFuture - 异步响应 public class ResponseFuture { private final int opaque; private final long timeoutMillis; private final InvokeCallback invokeCallback; private final CountDownLatch countDownLatch; private volatile RemotingCommand responseCommand; } 二、请求类型与调用方式 2.1 三种调用方式 ┌─────────────────────────────────────────────┐ │ 三种调用方式对比 │ ├─────────────────────────────────────────────┤ │ │ │ 1. 同步调用 (Sync) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (阻塞等待) │ Process │ │ │ │ │ │ │ ◀─── Response ──────── │ │ │ │ │ │ │ 特点:等待响应,吞吐量低 │ │ │ ├─────────────────────────────────────────────┤ │ │ │ 2. 异步调用 (Async) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (立即返回) │ Process │ │ │ │ │ │ │ ◀─── Response ──────── │ │ │ │ (回调处理) │ │ │ │ │ 特点:非阻塞,吞吐量高 │ │ │ ├─────────────────────────────────────────────┤ │ │ │ 3. 单向调用 (Oneway) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (不等响应) │ Process │ │ │ │ │ │ │ │ 特点:无响应,最高吞吐量 │ │ │ └─────────────────────────────────────────────┘ 2.2 代码示例 // 1. 同步调用 public RemotingCommand invokeSyncImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis ) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { // 创建 ResponseFuture final ResponseFuture responseFuture = new ResponseFuture( channel, opaque, timeoutMillis, null, null ); this.responseTable.put(opaque, responseFuture); // 发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); } }); // 阻塞等待响应 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(); } else { throw new RemotingSendRequestException(); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } // 2. 异步调用 public void invokeAsyncImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback ) throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException { final int opaque = request.getOpaque(); // 创建 ResponseFuture(带回调) final ResponseFuture responseFuture = new ResponseFuture( channel, opaque, timeoutMillis, invokeCallback, null ); this.responseTable.put(opaque, responseFuture); // 发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } // 失败立即回调 responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); responseTable.remove(opaque); responseFuture.executeInvokeCallback(); } }); } // 3. 单向调用 public void invokeOnewayImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis ) throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException { // 设置单向标志 request.markOnewayRPC(); // 直接发送,不等响应 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()) { log.warn("send request failed. channel={}", channel); } } }); } 三、请求码与处理器映射 3.1 常见请求码 public class RequestCode { // Producer 相关 public static final int SEND_MESSAGE = 10; public static final int SEND_MESSAGE_V2 = 310; public static final int SEND_BATCH_MESSAGE = 320; // Consumer 相关 public static final int PULL_MESSAGE = 11; public static final int QUERY_MESSAGE = 12; public static final int QUERY_MESSAGE_BY_KEY = 13; // Broker 相关 public static final int REGISTER_BROKER = 103; public static final int UNREGISTER_BROKER = 104; public static final int HEART_BEAT = 34; // 管理相关 public static final int GET_ALL_TOPIC_CONFIG = 21; public static final int UPDATE_BROKER_CONFIG = 25; } 3.2 处理器注册 public class BrokerController { public void registerProcessor() { // 1. 发送消息处理器 SendMessageProcessor sendProcessor = new SendMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor ); // 2. 拉取消息处理器 PullMessageProcessor pullProcessor = new PullMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.PULL_MESSAGE, pullProcessor, this.pullMessageExecutor ); // 3. 查询消息处理器 QueryMessageProcessor queryProcessor = new QueryMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor ); // 4. 心跳处理器 ClientManageProcessor clientManageProcessor = new ClientManageProcessor(this); this.remotingServer.registerProcessor( RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor ); // 5. 默认处理器 AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); this.remotingServer.registerDefaultProcessor( adminProcessor, this.adminBrokerExecutor ); } } 四、编解码协议 4.1 协议格式 ┌─────────────────────────────────────────────┐ │ RocketMQ 通信协议格式 │ ├─────────────────────────────────────────────┤ │ │ │ 消息总长度(4字节) │ │ ┌──────────────────────────────┐ │ │ │ Total Length │ │ │ └──────────────────────────────┘ │ │ │ │ 序列化类型 + 头部长度(4字节) │ │ ┌──────────────────────────────┐ │ │ │ [1字节] [3字节] │ │ │ │ SerType Header Length │ │ │ └──────────────────────────────┘ │ │ │ │ 头部数据(变长) │ │ ┌──────────────────────────────┐ │ │ │ Header Data │ │ │ │ (JSON 或 RocketMQ 私有格式) │ │ │ └──────────────────────────────┘ │ │ │ │ 消息体(变长) │ │ ┌──────────────────────────────┐ │ │ │ Body Data │ │ │ └──────────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ 4.2 编码器实现 public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } } } // RemotingCommand 编码 public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1. 计算总长度 int length = 4; // 总长度字段本身 // 2. 序列化头部 byte[] headerData; headerData = this.headerEncode(); length += headerData.length; length += bodyLength; // 3. 分配缓冲区 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 4. 写入总长度 result.putInt(length); // 5. 写入序列化类型 + 头部长度 result.put(markProtocolType(headerData.length, SerializeType.JSON)); // 6. 写入头部数据 result.put(headerData); result.flip(); return result; } 五、线程模型 5.1 Netty 线程模型 ┌─────────────────────────────────────────────┐ │ Netty Reactor 线程模型 │ ├─────────────────────────────────────────────┤ │ │ │ Boss Group (1个线程) │ │ ┌──────────────────────────────┐ │ │ │ - 接收新连接 │ │ │ │ - 注册到 Worker Group │ │ │ └──────────────────────────────┘ │ │ ↓ │ │ Worker Group (多个线程) │ │ ┌──────────────────────────────┐ │ │ │ - I/O 读写 │ │ │ │ - 编解码 │ │ │ │ - 分发到业务线程池 │ │ │ └──────────────────────────────┘ │ │ ↓ │ │ 业务线程池 (可配置) │ │ ┌──────────────────────────────┐ │ │ │ - SendMessageExecutor │ │ │ │ - PullMessageExecutor │ │ │ │ - QueryMessageExecutor │ │ │ │ - HeartbeatExecutor │ │ │ └──────────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ 5.2 线程池配置 // Broker 启动时初始化线程池 public class BrokerController { private void initialize() { // 1. 发送消息线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), // 核心线程数:默认16 this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_") ); // 2. 拉取消息线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), // 默认16+核心数 this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_") ); // 3. 查询消息线程池 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), // 默认8+核心数 this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_") ); } } // 配置建议 # broker.conf sendMessageThreadPoolNums=16 # 发送消息线程 pullMessageThreadPoolNums=32 # 拉取消息线程 queryMessageThreadPoolNums=16 # 查询消息线程 六、性能优化 6.1 零拷贝 // 使用 FileRegion 实现零拷贝传输 public class TransferMsgByHeap { public void transferData(SelectMappedBufferResult selectMappedBufferResult, Channel channel) { ByteBuffer byteBuffer = selectMappedBufferResult.getByteBuffer(); // 方式1:堆内存传输(有拷贝) byte[] data = new byte[byteBuffer.remaining()]; byteBuffer.get(data); channel.writeAndFlush(Unpooled.wrappedBuffer(data)); // 方式2:零拷贝传输(推荐) FileRegion fileRegion = new DefaultFileRegion( selectMappedBufferResult.getMappedFile().getFileChannel(), selectMappedBufferResult.getStartOffset(), selectMappedBufferResult.getSize() ); channel.writeAndFlush(fileRegion); } } 6.2 批量发送 // 生产者批量发送 DefaultMQProducer producer = new DefaultMQProducer("group"); producer.start(); List<Message> messages = new ArrayList<>(); for (int i = 0; i < 100; i++) { Message msg = new Message("Topic", "Tag", ("Hello" + i).getBytes()); messages.add(msg); } // 一次网络调用发送100条 SendResult result = producer.send(messages); 6.3 连接复用 // Client 连接池复用 public class NettyRemotingClient { // Channel 缓存表 private final ConcurrentMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); // 获取或创建 Channel private Channel getAndCreateChannel(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } return this.createChannel(addr); } // 优势:减少连接建立开销 } 七、总结与思考 7.1 核心要点 Netty 基础:RocketMQ 基于 Netty 构建网络通信 三种调用:同步、异步、单向,满足不同场景 请求处理器:请求码映射到具体处理器 自定义协议:高效的二进制协议 线程模型:Reactor + 业务线程池 7.2 思考题 为什么需要三种调用方式? ...

2025-11-14 · maneng

RocketMQ架构06:索引机制详解 - ConsumeQueue与IndexFile的巧妙设计

引言:索引的魔法 如果说 CommitLog 是一本厚厚的字典,那么 ConsumeQueue 和 IndexFile 就是目录和索引。它们让 RocketMQ 能够: 秒级定位:从百万条消息中快速找到目标 轻量高效:索引占用空间极小 多维查询:支持按 Queue、Key、时间查询 今天我们深入剖析这两种索引的巧妙设计。 一、为什么需要索引? 1.1 没有索引的困境 场景:Consumer 想消费 TopicA 的消息 方案1:遍历 CommitLog(❌) ┌────────────────────────────────────┐ │ CommitLog(所有Topic混存) │ ├────────────────────────────────────┤ │ TopicA-Msg1 │ │ TopicB-Msg1 │ ← 需要跳过 │ TopicC-Msg1 │ ← 需要跳过 │ TopicA-Msg2 │ │ TopicB-Msg2 │ ← 需要跳过 │ ... │ └────────────────────────────────────┘ 问题: 1. 需要扫描所有消息 → 慢 2. 无法按 Queue 过滤 → 低效 3. 无法快速定位 Offset → 不可用 1.2 RocketMQ 的索引方案 ┌─────────────────────────────────────────────┐ │ 双层索引架构 │ ├─────────────────────────────────────────────┤ │ │ │ ConsumeQueue(消费索引) │ │ - 按 Topic-Queue 组织 │ │ - 存储消息在 CommitLog 的位置 │ │ - 支持顺序消费 │ │ │ │ IndexFile(查询索引) │ │ - 按 Key/时间 组织 │ │ - Hash 索引结构 │ │ - 支持随机查询 │ │ │ └─────────────────────────────────────────────┘ 二、ConsumeQueue:消费索引 2.1 文件组织结构 $HOME/store/consumequeue/ ├── TopicA/ # Topic 名称 │ ├── 0/ # Queue ID = 0 │ │ ├── 00000000000000000000 # 第1个文件 │ │ ├── 00000000000600000000 # 第2个文件 │ │ └── ... │ ├── 1/ # Queue ID = 1 │ │ └── ... │ └── ... ├── TopicB/ │ └── ... └── ... 文件大小:600万字节(30万条索引 × 20字节) 文件名:该文件第一条索引的逻辑偏移量 2.2 索引格式 ┌────────────────────────────────────┐ │ 单条索引格式(20字节) │ ├────────────────────────────────────┤ │ CommitLog Offset (8字节) │ ← 消息在 CommitLog 的物理位置 │ Size (4字节) │ ← 消息大小 │ Tag HashCode (8字节) │ ← Tag 哈希值(用于过滤) └────────────────────────────────────┘ 实际示例: ...

2025-11-14 · maneng

RocketMQ架构05:CommitLog深度剖析 - 顺序写的艺术

引言:顺序写的魔力 CommitLog 是 RocketMQ 高性能的核心秘密。它用顺序写实现了: 单机 10 万+ TPS:远超数据库的随机写 亚毫秒级延迟:消息写入延迟 < 1ms 零拷贝读取:MMAP 直接访问磁盘文件 今天我们从源码级别剖析 CommitLog 的实现细节。 一、为什么顺序写这么快? 1.1 随机写 vs 顺序写 ┌──────────────────────────────────────────┐ │ 磁盘写入性能对比 │ ├──────────────────────────────────────────┤ │ │ │ 随机写(数据库): │ │ ┌───┐ ┌───┐ ┌───┐ │ │ │ A │ → │ C │ → │ B │ │ │ └───┘ └───┘ └───┘ │ │ 磁盘块100 磁盘块500 磁盘块200 │ │ │ │ 问题: │ │ - 磁头需要频繁移动 │ │ - IOPS 约 100-200/s │ │ │ ├──────────────────────────────────────────┤ │ │ │ 顺序写(RocketMQ): │ │ ┌───┬───┬───┬───┬───┐ │ │ │ A │ B │ C │ D │ E │ ... │ │ └───┴───┴───┴───┴───┘ │ │ CommitLog 文件 │ │ │ │ 优势: │ │ - 磁头连续移动 │ │ - 吞吐量 约 600MB/s(机械硬盘) │ │ - 吞吐量 约 2GB/s(SSD) │ │ │ └──────────────────────────────────────────┘ 性能差距: ...

2025-11-14 · maneng

RocketMQ架构04:存储引擎深度剖析 - 高性能消息存储的奥秘

引言:存储引擎的设计哲学 RocketMQ 的存储引擎是其高性能的核心。它用极简的设计实现了: 百万级 TPS:单机支持百万级消息吞吐 毫秒级延迟:消息存储延迟 < 1ms TB 级存储:单 Broker 可存储数 TB 消息 零数据丢失:通过刷盘策略保证可靠性 今天我们从第一性原理出发,逐步理解这个存储引擎的巧妙设计。 一、为什么需要这样的存储模型? 1.1 传统数据库存储的问题 方案1:为每个 Queue 建一张表 -- TopicA 的 Queue0 CREATE TABLE topic_a_queue_0 ( offset BIGINT PRIMARY KEY, message BLOB, store_time TIMESTAMP ); -- TopicA 的 Queue1 CREATE TABLE topic_a_queue_1 (...); ... 问题: 1. 表数量爆炸:1000个Topic × 4个Queue = 4000张表 2. 随机写入:不同表的写入是随机I/O → 性能差 3. 数据分散:难以统一管理和备份 1.2 RocketMQ 的解决方案 核心思想:写入集中化 + 读取索引化 ┌─────────────────────────────────────────────────┐ │ RocketMQ 存储模型 │ ├─────────────────────────────────────────────────┤ │ │ │ 所有消息 → 统一写入 CommitLog(顺序写) │ │ ↓ │ │ 异步构建 ConsumeQueue(索引) │ │ ↓ │ │ Consumer 根据索引快速定位 │ │ │ └─────────────────────────────────────────────────┘ 优势: ...

2025-11-13 · maneng

RocketMQ架构03:Broker架构与核心组件 - 消息中枢的内部世界

引言:消息系统的心脏 如果说 NameServer 是 RocketMQ 的"大脑"(路由中心),那么 Broker 就是"心脏"(消息中枢)。它负责: 消息存储:持久化所有消息 消息处理:接收生产者消息,推送给消费者 高可用保障:主从同步、故障转移 性能优化:零拷贝、顺序写、内存映射 今天我们从第一性原理出发,逐步理解 Broker 的内部世界。 一、Broker 的核心职责 1.1 从需求出发 假设我们要设计一个消息存储节点,需要解决哪些问题? 基础需求: 1. 接收消息 → 需要网络通信模块 2. 存储消息 → 需要存储引擎 3. 分发消息 → 需要索引和查询机制 4. 保证可靠 → 需要主从同步 5. 高性能 → 需要优化 I/O Broker 就是围绕这5个核心需求设计的。 1.2 Broker 的三重身份 ┌─────────────────────────────────────┐ │ Broker 的三重身份 │ ├─────────────────────────────────────┤ │ 1. 消息接收器(Producer 视角) │ │ - 接收消息请求 │ │ - 验证权限 │ │ - 返回存储结果 │ ├─────────────────────────────────────┤ │ 2. 消息存储库(系统视角) │ │ - 持久化消息 │ │ - 管理索引 │ │ - 定期清理 │ ├─────────────────────────────────────┤ │ 3. 消息分发器(Consumer 视角) │ │ - 根据订阅关系推送消息 │ │ - 管理消费进度 │ │ - 支持消息重试 │ └─────────────────────────────────────┘ 二、Broker 架构全景 2.1 核心组件架构图 ┌────────────────────────────────────────────────────────┐ │ Broker 节点 │ ├────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Remoting 模块(网络通信层) │ │ │ │ - NettyServer(接收请求) │ │ │ │ - RequestProcessor(请求处理器) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Message Store 模块(存储引擎) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ CommitLog │ │ConsumeQueue│ │ IndexFile │ │ │ │ │ │ 消息主存储 │ │ 消费索引 │ │ 查询索引 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ HA 模块(高可用) │ │ │ │ - HAService(主从同步) │ │ │ │ - CommitLogDispatcher(消息分发) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ 其他核心模块 │ │ │ │ - TopicConfigManager(Topic 配置管理) │ │ │ │ - ConsumerOffsetManager(消费进度管理) │ │ │ │ - ScheduleMessageService(延迟消息) │ │ │ │ - TransactionalMessageService(事务消息) │ │ │ └──────────────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────────────┘ 2.2 各模块职责详解 1️⃣ Remoting 模块(网络通信) // 核心职责 1. 接收网络请求(基于 Netty) 2. 请求路由与分发 3. 编解码(序列化/反序列化) 4. 返回响应结果 // 处理的请求类型 - SEND_MESSAGE // 发送消息 - PULL_MESSAGE // 拉取消息 - QUERY_MESSAGE // 查询消息 - HEART_BEAT // 心跳 - REGISTER_BROKER // Broker 注册 关键设计: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...