为什么需要Producer和Consumer?为什么需要Topic和Queue?什么是"至少一次"和"恰好一次"?
本文从零开始,通过手写代码和渐进式演进,深度拆解消息中间件的核心概念。
一、消息模型演进:从点对点到发布订阅
1.1 Level 0:最简模型(内存队列)
场景:单机应用,生产者和消费者在同一个进程。
/**
* 最简单的消息队列:内存队列
* 使用Java BlockingQueue实现
*/
public class SimpleMessageQueue {
// 内存队列(线程安全)
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000);
/**
* 生产者:发送消息
*/
public void send(Message message) throws InterruptedException {
queue.put(message);
System.out.println("发送消息: " + message);
}
/**
* 消费者:接收消息
*/
public Message receive() throws InterruptedException {
Message message = queue.take();
System.out.println("接收消息: " + message);
return message;
}
/**
* 消息实体
*/
@Data
@AllArgsConstructor
public static class Message {
private String id;
private String content;
private long timestamp;
}
/**
* 测试代码
*/
public static void main(String[] args) {
SimpleMessageQueue mq = new SimpleMessageQueue();
// 生产者线程
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message(
"MSG-" + i,
"Hello " + i,
System.currentTimeMillis()
);
mq.send(msg);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 消费者线程
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
mq.receive();
Thread.sleep(200); // 消费速度慢于生产速度
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
producer.start();
consumer.start();
}
}
输出示例:
发送消息: Message(id=MSG-0, content=Hello 0, timestamp=1698765432100)
发送消息: Message(id=MSG-1, content=Hello 1, timestamp=1698765432200)
接收消息: Message(id=MSG-0, content=Hello 0, timestamp=1698765432100)
发送消息: Message(id=MSG-2, content=Hello 2, timestamp=1698765432300)
接收消息: Message(id=MSG-1, content=Hello 1, timestamp=1698765432200)
...
特点:
- ✅ 简单易用
- ✅ 线程安全(BlockingQueue)
- ✅ 自动阻塞(队列满时生产者阻塞,队列空时消费者阻塞)
- ❌ 进程内(应用重启消息丢失)
- ❌ 无持久化
- ❌ 单消费者
1.2 Level 1:点对点模型(Queue)
特点:一条消息只能被一个消费者消费。
/**
* 点对点模型:多个消费者竞争消费消息
*/
public class PointToPointQueue {
private final BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000);
private final AtomicLong messageCount = new AtomicLong(0);
/**
* 生产者:发送消息到队列
*/
public void send(String content) throws InterruptedException {
Message message = new Message(
"MSG-" + messageCount.incrementAndGet(),
content,
System.currentTimeMillis()
);
queue.put(message);
System.out.println("[Producer] 发送消息: " + message.getId());
}
/**
* 消费者:从队列接收消息
* 多个消费者竞争消费,每条消息只被一个消费者处理
*/
public Message receive(String consumerName) throws InterruptedException {
Message message = queue.take();
System.out.println("[" + consumerName + "] 接收消息: " + message.getId());
return message;
}
/**
* 测试:多个消费者竞争消费
*/
public static void main(String[] args) {
PointToPointQueue mq = new PointToPointQueue();
// 1个生产者
Thread producer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
mq.send("Message " + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
// 3个消费者(竞争消费)
for (int i = 1; i <= 3; i++) {
final String consumerName = "Consumer-" + i;
new Thread(() -> {
while (true) {
try {
mq.receive(consumerName);
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
producer.start();
}
}
输出示例:
[Producer] 发送消息: MSG-1
[Consumer-1] 接收消息: MSG-1 ← Consumer-1消费
[Producer] 发送消息: MSG-2
[Consumer-2] 接收消息: MSG-2 ← Consumer-2消费
[Producer] 发送消息: MSG-3
[Consumer-3] 接收消息: MSG-3 ← Consumer-3消费
[Producer] 发送消息: MSG-4
[Consumer-1] 接收消息: MSG-4 ← Consumer-1再次消费
...
特点:
- ✅ 负载均衡:多个消费者分担负载
- ✅ 水平扩展:增加消费者可以提升吞吐量
- ❌ 一条消息只能被一个消费者处理
适用场景:
- 任务队列(每个任务只需要执行一次)
- 订单处理(每个订单只需要处理一次)
- 邮件发送(每封邮件只需要发送一次)
1.3 Level 2:发布订阅模型(Topic)
特点:一条消息可以被多个订阅者消费。
/**
* 发布订阅模型:一条消息被多个订阅者消费
*/
public class PubSubTopic {
// Topic → 多个订阅者
private final Map<String, List<Subscriber>> topicSubscribers = new ConcurrentHashMap<>();
/**
* 订阅者接口
*/
public interface Subscriber {
void onMessage(Message message);
}
/**
* 订阅Topic
*/
public void subscribe(String topic, Subscriber subscriber) {
topicSubscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>())
.add(subscriber);
System.out.println("[" + subscriber.getClass().getSimpleName() + "] 订阅Topic: " + topic);
}
/**
* 发布消息到Topic
*/
public void publish(String topic, String content) {
Message message = new Message(
"MSG-" + System.nanoTime(),
content,
System.currentTimeMillis()
);
System.out.println("[Publisher] 发布消息到Topic: " + topic + ", 消息: " + message.getId());
// 通知所有订阅者
List<Subscriber> subscribers = topicSubscribers.get(topic);
if (subscribers != null) {
for (Subscriber subscriber : subscribers) {
// 异步通知(实际MQ会用线程池)
new Thread(() -> subscriber.onMessage(message)).start();
}
}
}
/**
* 测试:多个订阅者订阅同一个Topic
*/
public static void main(String[] args) throws InterruptedException {
PubSubTopic topic = new PubSubTopic();
// 订阅者1:库存服务
topic.subscribe("order.created", message -> {
System.out.println("[InventoryService] 接收消息: " + message.getId() + ", 扣减库存");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 订阅者2:积分服务
topic.subscribe("order.created", message -> {
System.out.println("[PointsService] 接收消息: " + message.getId() + ", 增加积分");
try {
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 订阅者3:通知服务
topic.subscribe("order.created", message -> {
System.out.println("[NotificationService] 接收消息: " + message.getId() + ", 发送通知");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 发布消息
for (int i = 1; i <= 3; i++) {
topic.publish("order.created", "订单-" + i + "已创建");
Thread.sleep(500);
}
Thread.sleep(2000);
}
}
输出示例:
[InventoryService] 订阅Topic: order.created
[PointsService] 订阅Topic: order.created
[NotificationService] 订阅Topic: order.created
[Publisher] 发布消息到Topic: order.created, 消息: MSG-1234567890
[InventoryService] 接收消息: MSG-1234567890, 扣减库存
[PointsService] 接收消息: MSG-1234567890, 增加积分
[NotificationService] 接收消息: MSG-1234567890, 发送通知
[Publisher] 发布消息到Topic: order.created, 消息: MSG-1234567891
[InventoryService] 接收消息: MSG-1234567891, 扣减库存
[PointsService] 接收消息: MSG-1234567891, 增加积分
[NotificationService] 接收消息: MSG-1234567891, 发送通知
...
特点:
- ✅ 一条消息可以被多个订阅者消费
- ✅ 新增订阅者无需修改发布者
- ✅ 完全解耦:发布者不知道订阅者是谁
适用场景:
- 事件通知(订单创建事件通知多个服务)
- 日志收集(应用日志发送到多个分析系统)
- 数据同步(数据库变更同步到搜索引擎、缓存)
1.4 对比:点对点 vs 发布订阅
| 特性 | 点对点(Queue) | 发布订阅(Topic) |
|---|---|---|
| 消费者数量 | 多个消费者竞争消费 | 多个订阅者都会收到 |
| 消息消费 | 一条消息只被一个消费者处理 | 一条消息被所有订阅者处理 |
| 消息持久化 | 消息消费后删除 | 消息消费后保留(Kafka) |
| 负载均衡 | 自动负载均衡 | 每个订阅者独立处理 |
| 适用场景 | 任务队列、订单处理 | 事件通知、日志收集 |
| 典型MQ | RabbitMQ Queue | Kafka Topic |
二、核心概念详解
2.1 Producer(生产者)
职责:
- 创建消息
- 发送消息到Broker
- 等待确认(可选)
/**
* 生产者实现(支持同步/异步发送)
*/
public class Producer {
private final MessageBroker broker;
public Producer(MessageBroker broker) {
this.broker = broker;
}
/**
* 同步发送:等待Broker确认
*/
public SendResult sendSync(String topic, String content) {
Message message = new Message(generateId(), content, System.currentTimeMillis());
try {
// 发送消息并等待确认
boolean success = broker.send(topic, message);
if (success) {
return SendResult.success(message.getId());
} else {
return SendResult.failure(message.getId(), "Broker拒绝");
}
} catch (Exception e) {
return SendResult.failure(message.getId(), e.getMessage());
}
}
/**
* 异步发送:立即返回,通过回调通知结果
*/
public void sendAsync(String topic, String content, SendCallback callback) {
Message message = new Message(generateId(), content, System.currentTimeMillis());
// 异步发送
new Thread(() -> {
try {
boolean success = broker.send(topic, message);
if (success) {
callback.onSuccess(SendResult.success(message.getId()));
} else {
callback.onException(new Exception("Broker拒绝"));
}
} catch (Exception e) {
callback.onException(e);
}
}).start();
}
/**
* 批量发送:提升吞吐量
*/
public List<SendResult> sendBatch(String topic, List<String> contents) {
List<Message> messages = contents.stream()
.map(content -> new Message(generateId(), content, System.currentTimeMillis()))
.collect(Collectors.toList());
return broker.sendBatch(topic, messages);
}
private String generateId() {
return "MSG-" + System.nanoTime();
}
/**
* 发送回调接口
*/
public interface SendCallback {
void onSuccess(SendResult result);
void onException(Exception e);
}
/**
* 发送结果
*/
@Data
@AllArgsConstructor
public static class SendResult {
private boolean success;
private String messageId;
private String errorMessage;
public static SendResult success(String messageId) {
return new SendResult(true, messageId, null);
}
public static SendResult failure(String messageId, String errorMessage) {
return new SendResult(false, messageId, errorMessage);
}
}
}
2.2 Consumer(消费者)
职责:
- 从Broker拉取消息(Pull)或接收推送(Push)
- 处理消息
- 发送确认(ACK)
/**
* 消费者实现(支持Push/Pull模式)
*/
public abstract class Consumer {
private final MessageBroker broker;
private final String consumerGroup;
private final Set<String> subscribedTopics = new ConcurrentHashSet<>();
public Consumer(MessageBroker broker, String consumerGroup) {
this.broker = broker;
this.consumerGroup = consumerGroup;
}
/**
* 订阅Topic
*/
public void subscribe(String topic) {
subscribedTopics.add(topic);
System.out.println("[" + consumerGroup + "] 订阅Topic: " + topic);
}
/**
* Pull模式:消费者主动拉取消息
*/
public void pullMessages() {
for (String topic : subscribedTopics) {
List<Message> messages = broker.pull(topic, consumerGroup, 10);
for (Message message : messages) {
try {
// 处理消息
handleMessage(message);
// 手动ACK
broker.ack(topic, message.getId(), consumerGroup);
} catch (Exception e) {
System.err.println("[" + consumerGroup + "] 处理消息失败: " + e.getMessage());
// 不ACK,消息会重新投递
}
}
}
}
/**
* Push模式:Broker主动推送消息
*/
public void onPushMessage(Message message) {
try {
// 处理消息
handleMessage(message);
// 自动ACK(由框架处理)
} catch (Exception e) {
System.err.println("[" + consumerGroup + "] 处理消息失败: " + e.getMessage());
throw e; // 抛出异常,框架会重试
}
}
/**
* 子类实现:处理消息的业务逻辑
*/
protected abstract void handleMessage(Message message) throws Exception;
}
2.3 Broker(消息代理)
职责:
- 接收生产者发送的消息
- 存储消息(内存/磁盘)
- 将消息分发给消费者
- 管理消费进度
/**
* 消息代理(简化版)
*/
public class MessageBroker {
// Topic → 消息队列
private final Map<String, BlockingQueue<Message>> topicQueues = new ConcurrentHashMap<>();
// Topic → ConsumerGroup → 消费进度
private final Map<String, Map<String, Long>> consumerOffsets = new ConcurrentHashMap<>();
/**
* 发送消息到Topic
*/
public boolean send(String topic, Message message) {
BlockingQueue<Message> queue = topicQueues.computeIfAbsent(
topic,
k -> new LinkedBlockingQueue<>(10000)
);
try {
queue.put(message);
System.out.println("[Broker] 接收消息: " + message.getId() + " → Topic: " + topic);
return true;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
/**
* 批量发送消息
*/
public List<Producer.SendResult> sendBatch(String topic, List<Message> messages) {
return messages.stream()
.map(msg -> {
boolean success = send(topic, msg);
return success
? Producer.SendResult.success(msg.getId())
: Producer.SendResult.failure(msg.getId(), "发送失败");
})
.collect(Collectors.toList());
}
/**
* 消费者拉取消息
*/
public List<Message> pull(String topic, String consumerGroup, int maxCount) {
BlockingQueue<Message> queue = topicQueues.get(topic);
if (queue == null) {
return Collections.emptyList();
}
List<Message> messages = new ArrayList<>();
queue.drainTo(messages, maxCount);
System.out.println("[Broker] " + consumerGroup + " 拉取 " + messages.size() + " 条消息");
return messages;
}
/**
* 消费者确认消息
*/
public void ack(String topic, String messageId, String consumerGroup) {
// 简化处理:更新消费进度
System.out.println("[Broker] " + consumerGroup + " 确认消息: " + messageId);
}
}
三、消息传递语义
3.1 至多一次(At Most Once)
语义:消息可能丢失,但绝不重复。
实现方式:
生产者 → Broker(不确认)
Broker → 消费者(不确认)
特点:
- ✅ 性能最高(无需等待确认)
- ❌ 可靠性最低(消息可能丢失)
适用场景:
- 日志收集(允许少量日志丢失)
- 性能监控(允许少量数据丢失)
3.2 至少一次(At Least Once)
语义:消息不会丢失,但可能重复。
实现方式:
生产者 → Broker(等待确认,失败重试)
Broker → 消费者(手动ACK,失败重试)
特点:
- ✅ 可靠性高(消息不丢失)
- ❌ 可能重复(需要消费者幂等处理)
适用场景:
- 订单处理(不能丢失订单,幂等处理避免重复)
- 支付通知(不能丢失支付,幂等处理避免重复扣款)
幂等处理示例:
/**
* 幂等消费者:防止消息重复处理
*/
public class IdempotentConsumer extends Consumer {
// 已处理的消息ID(使用Redis或数据库)
private final Set<String> processedMessageIds = new ConcurrentHashSet<>();
@Override
protected void handleMessage(Message message) throws Exception {
// 检查是否已处理
if (processedMessageIds.contains(message.getId())) {
System.out.println("[IdempotentConsumer] 消息已处理,跳过: " + message.getId());
return;
}
// 处理消息
System.out.println("[IdempotentConsumer] 处理消息: " + message.getId());
processBusinessLogic(message);
// 记录已处理
processedMessageIds.add(message.getId());
}
private void processBusinessLogic(Message message) {
// 业务逻辑
}
}
3.3 恰好一次(Exactly Once)
语义:消息不会丢失,也不会重复。
实现方式:
生产者 → Broker(事务/幂等)
Broker → 消费者(事务/幂等)
实现方案:
- 事务消息(RocketMQ)
- 幂等生产者 + 幂等消费者(Kafka)
- 分布式事务(2PC、TCC)
特点:
- ✅ 可靠性最高(消息不丢失、不重复)
- ❌ 性能最低(需要额外的事务开销)
- ❌ 实现复杂(需要分布式事务支持)
适用场景:
- 金融交易(严格的一致性要求)
- 账户余额(不能重复扣款)
四、总结
本文深入剖析了消息中间件的核心概念:
消息模型:
- 点对点(Queue):一条消息只被一个消费者处理
- 发布订阅(Topic):一条消息被多个订阅者处理
核心组件:
- Producer:创建和发送消息
- Consumer:接收和处理消息
- Broker:存储和分发消息
消息传递语义:
- 至多一次:性能最高,可能丢失
- 至少一次:不会丢失,可能重复
- 恰好一次:既不丢失也不重复,实现复杂
下一篇将深入RabbitMQ的架构与原理。
作者:如月说客 创建时间:2025-11-03 系列文章:消息中间件第一性原理(2/6) 字数:约8,000字