RabbitMQ深度解析:AMQP协议与可靠性保证
为什么RabbitMQ需要Exchange?为什么有4种Exchange类型?如何保证消息100%不丢失? 本文深度拆解RabbitMQ的核心设计,建立系统性理解。 一、RabbitMQ核心架构 1.1 核心组件 Producer → Exchange → Binding → Queue → Consumer ↓ ↓ 路由规则 消息存储 核心概念: Producer(生产者):发送消息到Exchange Exchange(交换机):接收消息并路由到Queue Binding(绑定):Exchange和Queue之间的路由规则 Queue(队列):存储消息,等待Consumer消费 Consumer(消费者):从Queue接收消息 为什么需要Exchange? 关键洞察:Exchange实现了路由解耦,生产者不需要知道消息发送到哪个Queue 对比设计: // 没有Exchange(直接发送到Queue) producer.send("order.queue", message); // 生产者需要知道Queue名称 // 有Exchange(通过Exchange路由) producer.send("order.exchange", "order.created", message); // 生产者只需要知道Exchange和Routing Key // 由Exchange决定路由到哪个Queue 1.2 四种Exchange类型 类型1:Direct Exchange(直接交换机) 路由规则:Routing Key完全匹配 /** * Direct Exchange示例:订单系统 */ public class DirectExchangeExample { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 1. 声明Exchange channel.exchangeDeclare( "order.direct.exchange", // Exchange名称 BuiltinExchangeType.DIRECT, // 类型:Direct true, // durable:持久化 false, // autoDelete:不自动删除 null // arguments ); // 2. 声明Queue channel.queueDeclare("order.create.queue", true, false, false, null); channel.queueDeclare("order.cancel.queue", true, false, false, null); // 3. 绑定Exchange和Queue channel.queueBind( "order.create.queue", // Queue名称 "order.direct.exchange", // Exchange名称 "order.create" // Routing Key ); channel.queueBind( "order.cancel.queue", "order.direct.exchange", "order.cancel" ); // 4. 发送消息 String message1 = "创建订单:ORD123"; channel.basicPublish( "order.direct.exchange", // Exchange "order.create", // Routing Key null, message1.getBytes() ); System.out.println("发送消息: " + message1 + " → order.create"); String message2 = "取消订单:ORD124"; channel.basicPublish( "order.direct.exchange", "order.cancel", // 不同的Routing Key null, message2.getBytes() ); System.out.println("发送消息: " + message2 + " → order.cancel"); } } } 路由示意图: ...