为什么需要Producer和Consumer?为什么需要Topic和Queue?什么是"至少一次"和"恰好一次"?

本文从零开始,通过手写代码和渐进式演进,深度拆解消息中间件的核心概念。


一、消息模型演进:从点对点到发布订阅

1.1 Level 0:最简模型(内存队列)

场景:单机应用,生产者和消费者在同一个进程。

/**
 * 最简单的消息队列:内存队列
 * 使用Java BlockingQueue实现
 */
public class SimpleMessageQueue {

    // 内存队列(线程安全)
    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000);

    /**
     * 生产者:发送消息
     */
    public void send(Message message) throws InterruptedException {
        queue.put(message);
        System.out.println("发送消息: " + message);
    }

    /**
     * 消费者:接收消息
     */
    public Message receive() throws InterruptedException {
        Message message = queue.take();
        System.out.println("接收消息: " + message);
        return message;
    }

    /**
     * 消息实体
     */
    @Data
    @AllArgsConstructor
    public static class Message {
        private String id;
        private String content;
        private long timestamp;
    }

    /**
     * 测试代码
     */
    public static void main(String[] args) {
        SimpleMessageQueue mq = new SimpleMessageQueue();

        // 生产者线程
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message(
                            "MSG-" + i,
                            "Hello " + i,
                            System.currentTimeMillis()
                    );
                    mq.send(msg);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 消费者线程
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    mq.receive();
                    Thread.sleep(200);  // 消费速度慢于生产速度
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
    }
}

输出示例

发送消息: Message(id=MSG-0, content=Hello 0, timestamp=1698765432100)
发送消息: Message(id=MSG-1, content=Hello 1, timestamp=1698765432200)
接收消息: Message(id=MSG-0, content=Hello 0, timestamp=1698765432100)
发送消息: Message(id=MSG-2, content=Hello 2, timestamp=1698765432300)
接收消息: Message(id=MSG-1, content=Hello 1, timestamp=1698765432200)
...

特点

  • ✅ 简单易用
  • ✅ 线程安全(BlockingQueue)
  • ✅ 自动阻塞(队列满时生产者阻塞,队列空时消费者阻塞)
  • ❌ 进程内(应用重启消息丢失)
  • ❌ 无持久化
  • ❌ 单消费者

1.2 Level 1:点对点模型(Queue)

特点:一条消息只能被一个消费者消费。

/**
 * 点对点模型:多个消费者竞争消费消息
 */
public class PointToPointQueue {

    private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000);
    private final AtomicLong messageCount = new AtomicLong(0);

    /**
     * 生产者:发送消息到队列
     */
    public void send(String content) throws InterruptedException {
        Message message = new Message(
                "MSG-" + messageCount.incrementAndGet(),
                content,
                System.currentTimeMillis()
        );
        queue.put(message);
        System.out.println("[Producer] 发送消息: " + message.getId());
    }

    /**
     * 消费者:从队列接收消息
     * 多个消费者竞争消费,每条消息只被一个消费者处理
     */
    public Message receive(String consumerName) throws InterruptedException {
        Message message = queue.take();
        System.out.println("[" + consumerName + "] 接收消息: " + message.getId());
        return message;
    }

    /**
     * 测试:多个消费者竞争消费
     */
    public static void main(String[] args) {
        PointToPointQueue mq = new PointToPointQueue();

        // 1个生产者
        Thread producer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    mq.send("Message " + i);
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // 3个消费者(竞争消费)
        for (int i = 1; i <= 3; i++) {
            final String consumerName = "Consumer-" + i;
            new Thread(() -> {
                while (true) {
                    try {
                        mq.receive(consumerName);
                        Thread.sleep(300);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }).start();
        }

        producer.start();
    }
}

输出示例

[Producer] 发送消息: MSG-1
[Consumer-1] 接收消息: MSG-1  ← Consumer-1消费
[Producer] 发送消息: MSG-2
[Consumer-2] 接收消息: MSG-2  ← Consumer-2消费
[Producer] 发送消息: MSG-3
[Consumer-3] 接收消息: MSG-3  ← Consumer-3消费
[Producer] 发送消息: MSG-4
[Consumer-1] 接收消息: MSG-4  ← Consumer-1再次消费
...

特点

  • ✅ 负载均衡:多个消费者分担负载
  • ✅ 水平扩展:增加消费者可以提升吞吐量
  • ❌ 一条消息只能被一个消费者处理

适用场景

  • 任务队列(每个任务只需要执行一次)
  • 订单处理(每个订单只需要处理一次)
  • 邮件发送(每封邮件只需要发送一次)

1.3 Level 2:发布订阅模型(Topic)

特点:一条消息可以被多个订阅者消费。

/**
 * 发布订阅模型:一条消息被多个订阅者消费
 */
public class PubSubTopic {

    // Topic → 多个订阅者
    private final Map<String, List<Subscriber>> topicSubscribers = new ConcurrentHashMap<>();

    /**
     * 订阅者接口
     */
    public interface Subscriber {
        void onMessage(Message message);
    }

    /**
     * 订阅Topic
     */
    public void subscribe(String topic, Subscriber subscriber) {
        topicSubscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>())
                .add(subscriber);
        System.out.println("[" + subscriber.getClass().getSimpleName() + "] 订阅Topic: " + topic);
    }

    /**
     * 发布消息到Topic
     */
    public void publish(String topic, String content) {
        Message message = new Message(
                "MSG-" + System.nanoTime(),
                content,
                System.currentTimeMillis()
        );

        System.out.println("[Publisher] 发布消息到Topic: " + topic + ", 消息: " + message.getId());

        // 通知所有订阅者
        List<Subscriber> subscribers = topicSubscribers.get(topic);
        if (subscribers != null) {
            for (Subscriber subscriber : subscribers) {
                // 异步通知(实际MQ会用线程池)
                new Thread(() -> subscriber.onMessage(message)).start();
            }
        }
    }

    /**
     * 测试:多个订阅者订阅同一个Topic
     */
    public static void main(String[] args) throws InterruptedException {
        PubSubTopic topic = new PubSubTopic();

        // 订阅者1:库存服务
        topic.subscribe("order.created", message -> {
            System.out.println("[InventoryService] 接收消息: " + message.getId() + ", 扣减库存");
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 订阅者2:积分服务
        topic.subscribe("order.created", message -> {
            System.out.println("[PointsService] 接收消息: " + message.getId() + ", 增加积分");
            try {
                Thread.sleep(150);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 订阅者3:通知服务
        topic.subscribe("order.created", message -> {
            System.out.println("[NotificationService] 接收消息: " + message.getId() + ", 发送通知");
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // 发布消息
        for (int i = 1; i <= 3; i++) {
            topic.publish("order.created", "订单-" + i + "已创建");
            Thread.sleep(500);
        }

        Thread.sleep(2000);
    }
}

输出示例

[InventoryService] 订阅Topic: order.created
[PointsService] 订阅Topic: order.created
[NotificationService] 订阅Topic: order.created

[Publisher] 发布消息到Topic: order.created, 消息: MSG-1234567890
[InventoryService] 接收消息: MSG-1234567890, 扣减库存
[PointsService] 接收消息: MSG-1234567890, 增加积分
[NotificationService] 接收消息: MSG-1234567890, 发送通知

[Publisher] 发布消息到Topic: order.created, 消息: MSG-1234567891
[InventoryService] 接收消息: MSG-1234567891, 扣减库存
[PointsService] 接收消息: MSG-1234567891, 增加积分
[NotificationService] 接收消息: MSG-1234567891, 发送通知
...

特点

  • ✅ 一条消息可以被多个订阅者消费
  • ✅ 新增订阅者无需修改发布者
  • ✅ 完全解耦:发布者不知道订阅者是谁

适用场景

  • 事件通知(订单创建事件通知多个服务)
  • 日志收集(应用日志发送到多个分析系统)
  • 数据同步(数据库变更同步到搜索引擎、缓存)

1.4 对比:点对点 vs 发布订阅

特性点对点(Queue)发布订阅(Topic)
消费者数量多个消费者竞争消费多个订阅者都会收到
消息消费一条消息只被一个消费者处理一条消息被所有订阅者处理
消息持久化消息消费后删除消息消费后保留(Kafka)
负载均衡自动负载均衡每个订阅者独立处理
适用场景任务队列、订单处理事件通知、日志收集
典型MQRabbitMQ QueueKafka Topic

二、核心概念详解

2.1 Producer(生产者)

职责

  1. 创建消息
  2. 发送消息到Broker
  3. 等待确认(可选)
/**
 * 生产者实现(支持同步/异步发送)
 */
public class Producer {

    private final MessageBroker broker;

    public Producer(MessageBroker broker) {
        this.broker = broker;
    }

    /**
     * 同步发送:等待Broker确认
     */
    public SendResult sendSync(String topic, String content) {
        Message message = new Message(generateId(), content, System.currentTimeMillis());

        try {
            // 发送消息并等待确认
            boolean success = broker.send(topic, message);

            if (success) {
                return SendResult.success(message.getId());
            } else {
                return SendResult.failure(message.getId(), "Broker拒绝");
            }
        } catch (Exception e) {
            return SendResult.failure(message.getId(), e.getMessage());
        }
    }

    /**
     * 异步发送:立即返回,通过回调通知结果
     */
    public void sendAsync(String topic, String content, SendCallback callback) {
        Message message = new Message(generateId(), content, System.currentTimeMillis());

        // 异步发送
        new Thread(() -> {
            try {
                boolean success = broker.send(topic, message);
                if (success) {
                    callback.onSuccess(SendResult.success(message.getId()));
                } else {
                    callback.onException(new Exception("Broker拒绝"));
                }
            } catch (Exception e) {
                callback.onException(e);
            }
        }).start();
    }

    /**
     * 批量发送:提升吞吐量
     */
    public List<SendResult> sendBatch(String topic, List<String> contents) {
        List<Message> messages = contents.stream()
                .map(content -> new Message(generateId(), content, System.currentTimeMillis()))
                .collect(Collectors.toList());

        return broker.sendBatch(topic, messages);
    }

    private String generateId() {
        return "MSG-" + System.nanoTime();
    }

    /**
     * 发送回调接口
     */
    public interface SendCallback {
        void onSuccess(SendResult result);
        void onException(Exception e);
    }

    /**
     * 发送结果
     */
    @Data
    @AllArgsConstructor
    public static class SendResult {
        private boolean success;
        private String messageId;
        private String errorMessage;

        public static SendResult success(String messageId) {
            return new SendResult(true, messageId, null);
        }

        public static SendResult failure(String messageId, String errorMessage) {
            return new SendResult(false, messageId, errorMessage);
        }
    }
}

2.2 Consumer(消费者)

职责

  1. 从Broker拉取消息(Pull)或接收推送(Push)
  2. 处理消息
  3. 发送确认(ACK)
/**
 * 消费者实现(支持Push/Pull模式)
 */
public abstract class Consumer {

    private final MessageBroker broker;
    private final String consumerGroup;
    private final Set<String> subscribedTopics = new ConcurrentHashSet<>();

    public Consumer(MessageBroker broker, String consumerGroup) {
        this.broker = broker;
        this.consumerGroup = consumerGroup;
    }

    /**
     * 订阅Topic
     */
    public void subscribe(String topic) {
        subscribedTopics.add(topic);
        System.out.println("[" + consumerGroup + "] 订阅Topic: " + topic);
    }

    /**
     * Pull模式:消费者主动拉取消息
     */
    public void pullMessages() {
        for (String topic : subscribedTopics) {
            List<Message> messages = broker.pull(topic, consumerGroup, 10);

            for (Message message : messages) {
                try {
                    // 处理消息
                    handleMessage(message);

                    // 手动ACK
                    broker.ack(topic, message.getId(), consumerGroup);

                } catch (Exception e) {
                    System.err.println("[" + consumerGroup + "] 处理消息失败: " + e.getMessage());
                    // 不ACK,消息会重新投递
                }
            }
        }
    }

    /**
     * Push模式:Broker主动推送消息
     */
    public void onPushMessage(Message message) {
        try {
            // 处理消息
            handleMessage(message);

            // 自动ACK(由框架处理)
        } catch (Exception e) {
            System.err.println("[" + consumerGroup + "] 处理消息失败: " + e.getMessage());
            throw e;  // 抛出异常,框架会重试
        }
    }

    /**
     * 子类实现:处理消息的业务逻辑
     */
    protected abstract void handleMessage(Message message) throws Exception;
}

2.3 Broker(消息代理)

职责

  1. 接收生产者发送的消息
  2. 存储消息(内存/磁盘)
  3. 将消息分发给消费者
  4. 管理消费进度
/**
 * 消息代理(简化版)
 */
public class MessageBroker {

    // Topic → 消息队列
    private final Map<String, BlockingQueue<Message>> topicQueues = new ConcurrentHashMap<>();

    // Topic → ConsumerGroup → 消费进度
    private final Map<String, Map<String, Long>> consumerOffsets = new ConcurrentHashMap<>();

    /**
     * 发送消息到Topic
     */
    public boolean send(String topic, Message message) {
        BlockingQueue<Message> queue = topicQueues.computeIfAbsent(
                topic,
                k -> new LinkedBlockingQueue<>(10000)
        );

        try {
            queue.put(message);
            System.out.println("[Broker] 接收消息: " + message.getId() + " → Topic: " + topic);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * 批量发送消息
     */
    public List<Producer.SendResult> sendBatch(String topic, List<Message> messages) {
        return messages.stream()
                .map(msg -> {
                    boolean success = send(topic, msg);
                    return success
                            ? Producer.SendResult.success(msg.getId())
                            : Producer.SendResult.failure(msg.getId(), "发送失败");
                })
                .collect(Collectors.toList());
    }

    /**
     * 消费者拉取消息
     */
    public List<Message> pull(String topic, String consumerGroup, int maxCount) {
        BlockingQueue<Message> queue = topicQueues.get(topic);
        if (queue == null) {
            return Collections.emptyList();
        }

        List<Message> messages = new ArrayList<>();
        queue.drainTo(messages, maxCount);

        System.out.println("[Broker] " + consumerGroup + " 拉取 " + messages.size() + " 条消息");
        return messages;
    }

    /**
     * 消费者确认消息
     */
    public void ack(String topic, String messageId, String consumerGroup) {
        // 简化处理:更新消费进度
        System.out.println("[Broker] " + consumerGroup + " 确认消息: " + messageId);
    }
}

三、消息传递语义

3.1 至多一次(At Most Once)

语义:消息可能丢失,但绝不重复。

实现方式

生产者 → Broker(不确认)
Broker → 消费者(不确认)

特点

  • ✅ 性能最高(无需等待确认)
  • ❌ 可靠性最低(消息可能丢失)

适用场景

  • 日志收集(允许少量日志丢失)
  • 性能监控(允许少量数据丢失)

3.2 至少一次(At Least Once)

语义:消息不会丢失,但可能重复。

实现方式

生产者 → Broker(等待确认,失败重试)
Broker → 消费者(手动ACK,失败重试)

特点

  • ✅ 可靠性高(消息不丢失)
  • ❌ 可能重复(需要消费者幂等处理)

适用场景

  • 订单处理(不能丢失订单,幂等处理避免重复)
  • 支付通知(不能丢失支付,幂等处理避免重复扣款)

幂等处理示例

/**
 * 幂等消费者:防止消息重复处理
 */
public class IdempotentConsumer extends Consumer {

    // 已处理的消息ID(使用Redis或数据库)
    private final Set<String> processedMessageIds = new ConcurrentHashSet<>();

    @Override
    protected void handleMessage(Message message) throws Exception {
        // 检查是否已处理
        if (processedMessageIds.contains(message.getId())) {
            System.out.println("[IdempotentConsumer] 消息已处理,跳过: " + message.getId());
            return;
        }

        // 处理消息
        System.out.println("[IdempotentConsumer] 处理消息: " + message.getId());
        processBusinessLogic(message);

        // 记录已处理
        processedMessageIds.add(message.getId());
    }

    private void processBusinessLogic(Message message) {
        // 业务逻辑
    }
}

3.3 恰好一次(Exactly Once)

语义:消息不会丢失,也不会重复。

实现方式

生产者 → Broker(事务/幂等)
Broker → 消费者(事务/幂等)

实现方案

  1. 事务消息(RocketMQ)
  2. 幂等生产者 + 幂等消费者(Kafka)
  3. 分布式事务(2PC、TCC)

特点

  • ✅ 可靠性最高(消息不丢失、不重复)
  • ❌ 性能最低(需要额外的事务开销)
  • ❌ 实现复杂(需要分布式事务支持)

适用场景

  • 金融交易(严格的一致性要求)
  • 账户余额(不能重复扣款)

四、总结

本文深入剖析了消息中间件的核心概念:

  1. 消息模型

    • 点对点(Queue):一条消息只被一个消费者处理
    • 发布订阅(Topic):一条消息被多个订阅者处理
  2. 核心组件

    • Producer:创建和发送消息
    • Consumer:接收和处理消息
    • Broker:存储和分发消息
  3. 消息传递语义

    • 至多一次:性能最高,可能丢失
    • 至少一次:不会丢失,可能重复
    • 恰好一次:既不丢失也不重复,实现复杂

下一篇将深入RabbitMQ的架构与原理。


作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(2/6) 字数:约8,000字