消息中间件核心概念:从生产消费到发布订阅

为什么需要Producer和Consumer?为什么需要Topic和Queue?什么是"至少一次"和"恰好一次"? 本文从零开始,通过手写代码和渐进式演进,深度拆解消息中间件的核心概念。 一、消息模型演进:从点对点到发布订阅 1.1 Level 0:最简模型(内存队列) 场景:单机应用,生产者和消费者在同一个进程。 /** * 最简单的消息队列:内存队列 * 使用Java BlockingQueue实现 */ public class SimpleMessageQueue { // 内存队列(线程安全) private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000); /** * 生产者:发送消息 */ public void send(Message message) throws InterruptedException { queue.put(message); System.out.println("发送消息: " + message); } /** * 消费者:接收消息 */ public Message receive() throws InterruptedException { Message message = queue.take(); System.out.println("接收消息: " + message); return message; } /** * 消息实体 */ @Data @AllArgsConstructor public static class Message { private String id; private String content; private long timestamp; } /** * 测试代码 */ public static void main(String[] args) { SimpleMessageQueue mq = new SimpleMessageQueue(); // 生产者线程 Thread producer = new Thread(() -> { for (int i = 0; i < 10; i++) { try { Message msg = new Message( "MSG-" + i, "Hello " + i, System.currentTimeMillis() ); mq.send(msg); Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); // 消费者线程 Thread consumer = new Thread(() -> { for (int i = 0; i < 10; i++) { try { mq.receive(); Thread.sleep(200); // 消费速度慢于生产速度 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }); producer.start(); consumer.start(); } } 输出示例: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...