RocketMQ进阶07:消息重试与死信队列 - 失败处理的最佳实践

引言:消费失败怎么办? Consumer消费失败时,RocketMQ自动重试: 消费失败 → 延迟重试 → 多次重试 → 死信队列 重试机制 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) { try { // 处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { // 返回RECONSUME_LATER触发重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } // 重试间隔:10s, 30s, 1m, 2m, 3m...最多16次 死信队列 16次重试失败 → 进入死信队列(%DLQ%消费组名) 死信队列处理 // 监听死信队列 @RocketMQMessageListener( topic = "%DLQ%my_consumer_group", consumerGroup = "dlq_handler_group" ) public class DLQHandler implements RocketMQListener<String> { @Override public void onMessage(String msg) { // 人工处理或记录日志 log.error("死信消息:{}", msg); } } 本文关键词:消息重试 死信队列 DLQ 失败处理

2025-11-14 · maneng

RabbitMQ深度解析:AMQP协议与可靠性保证

为什么RabbitMQ需要Exchange?为什么有4种Exchange类型?如何保证消息100%不丢失? 本文深度拆解RabbitMQ的核心设计,建立系统性理解。 一、RabbitMQ核心架构 1.1 核心组件 Producer → Exchange → Binding → Queue → Consumer ↓ ↓ 路由规则 消息存储 核心概念: Producer(生产者):发送消息到Exchange Exchange(交换机):接收消息并路由到Queue Binding(绑定):Exchange和Queue之间的路由规则 Queue(队列):存储消息,等待Consumer消费 Consumer(消费者):从Queue接收消息 为什么需要Exchange? 关键洞察:Exchange实现了路由解耦,生产者不需要知道消息发送到哪个Queue 对比设计: // 没有Exchange(直接发送到Queue) producer.send("order.queue", message); // 生产者需要知道Queue名称 // 有Exchange(通过Exchange路由) producer.send("order.exchange", "order.created", message); // 生产者只需要知道Exchange和Routing Key // 由Exchange决定路由到哪个Queue 1.2 四种Exchange类型 类型1:Direct Exchange(直接交换机) 路由规则:Routing Key完全匹配 /** * Direct Exchange示例:订单系统 */ public class DirectExchangeExample { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 1. 声明Exchange channel.exchangeDeclare( "order.direct.exchange", // Exchange名称 BuiltinExchangeType.DIRECT, // 类型:Direct true, // durable:持久化 false, // autoDelete:不自动删除 null // arguments ); // 2. 声明Queue channel.queueDeclare("order.create.queue", true, false, false, null); channel.queueDeclare("order.cancel.queue", true, false, false, null); // 3. 绑定Exchange和Queue channel.queueBind( "order.create.queue", // Queue名称 "order.direct.exchange", // Exchange名称 "order.create" // Routing Key ); channel.queueBind( "order.cancel.queue", "order.direct.exchange", "order.cancel" ); // 4. 发送消息 String message1 = "创建订单:ORD123"; channel.basicPublish( "order.direct.exchange", // Exchange "order.create", // Routing Key null, message1.getBytes() ); System.out.println("发送消息: " + message1 + " → order.create"); String message2 = "取消订单:ORD124"; channel.basicPublish( "order.direct.exchange", "order.cancel", // 不同的Routing Key null, message2.getBytes() ); System.out.println("发送消息: " + message2 + " → order.cancel"); } } } 路由示意图: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...