RocketMQ架构08:消息路由与负载均衡 - 智能路由的奥秘

引言:智能路由的艺术 RocketMQ 的路由系统就像交通指挥系统,负责: Producer 找 Broker:发送消息到哪个 Broker? Consumer 找 Broker:从哪个 Broker 拉取消息? 故障转移:Broker 宕机后如何自动切换? 负载均衡:如何均匀分配请求? 今天我们深入剖析这套智能路由系统的设计。 一、路由信息管理 1.1 路由表结构 public class TopicRouteData { // Queue 数据列表 private List<QueueData> queueDatas; // Broker 数据列表 private List<BrokerData> brokerDatas; // 过滤服务器表 private HashMap<String, List<String>> filterServerTable; } // Queue 数据 public class QueueData { private String brokerName; // Broker 名称 private int readQueueNums; // 读队列数 private int writeQueueNums; // 写队列数 private int perm; // 权限 private int topicSynFlag; // 同步标志 } // Broker 数据 public class BrokerData { private String cluster; // 集群名 private String brokerName; // Broker 名称 private HashMap<Long, String> brokerAddrs; // Broker 地址表 // Key: brokerId (0=Master, >0=Slave) // Value: broker 地址 } 路由表示例: ...

2025-11-14 · maneng

RocketMQ架构07:网络通信模型与Remoting模块 - 基于Netty的高性能通信

引言:高性能通信的基石 RocketMQ 的网络通信模块(Remoting)是整个系统的神经网络,负责: Producer → Broker:发送消息 Consumer → Broker:拉取消息 Broker → NameServer:注册与心跳 Broker → Broker:主从同步 今天我们深入剖析这个基于 Netty 的高性能通信模块。 一、Remoting 模块架构 1.1 整体架构 ┌────────────────────────────────────────────────┐ │ Remoting 模块架构 │ ├────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────┐ │ │ │ RemotingServer (服务端) │ │ │ │ - NettyRemotingServer │ │ │ │ - 接收客户端请求 │ │ │ └──────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────┐ │ │ │ RemotingClient (客户端) │ │ │ │ - NettyRemotingClient │ │ │ │ - 发送请求到服务端 │ │ │ └──────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────┐ │ │ │ Netty 网络层 │ │ │ │ - EventLoopGroup │ │ │ │ - Channel Pipeline │ │ │ │ - 编解码器 │ │ │ └──────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────┘ 1.2 核心组件 // 1. RemotingCommand - 通信协议对象 public class RemotingCommand { private int code; // 请求/响应码 private LanguageCode language; // 语言 private int version; // 版本 private int opaque; // 请求ID private int flag; // 标志位 private String remark; // 备注 private HashMap<String, String> extFields; // 扩展字段 private transient byte[] body; // 消息体 } // 2. NettyRequestProcessor - 请求处理器 public interface NettyRequestProcessor { RemotingCommand processRequest( ChannelHandlerContext ctx, RemotingCommand request ) throws Exception; } // 3. ResponseFuture - 异步响应 public class ResponseFuture { private final int opaque; private final long timeoutMillis; private final InvokeCallback invokeCallback; private final CountDownLatch countDownLatch; private volatile RemotingCommand responseCommand; } 二、请求类型与调用方式 2.1 三种调用方式 ┌─────────────────────────────────────────────┐ │ 三种调用方式对比 │ ├─────────────────────────────────────────────┤ │ │ │ 1. 同步调用 (Sync) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (阻塞等待) │ Process │ │ │ │ │ │ │ ◀─── Response ──────── │ │ │ │ │ │ │ 特点:等待响应,吞吐量低 │ │ │ ├─────────────────────────────────────────────┤ │ │ │ 2. 异步调用 (Async) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (立即返回) │ Process │ │ │ │ │ │ │ ◀─── Response ──────── │ │ │ │ (回调处理) │ │ │ │ │ 特点:非阻塞,吞吐量高 │ │ │ ├─────────────────────────────────────────────┤ │ │ │ 3. 单向调用 (Oneway) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (不等响应) │ Process │ │ │ │ │ │ │ │ 特点:无响应,最高吞吐量 │ │ │ └─────────────────────────────────────────────┘ 2.2 代码示例 // 1. 同步调用 public RemotingCommand invokeSyncImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis ) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { // 创建 ResponseFuture final ResponseFuture responseFuture = new ResponseFuture( channel, opaque, timeoutMillis, null, null ); this.responseTable.put(opaque, responseFuture); // 发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); } }); // 阻塞等待响应 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(); } else { throw new RemotingSendRequestException(); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } // 2. 异步调用 public void invokeAsyncImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback ) throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException { final int opaque = request.getOpaque(); // 创建 ResponseFuture(带回调) final ResponseFuture responseFuture = new ResponseFuture( channel, opaque, timeoutMillis, invokeCallback, null ); this.responseTable.put(opaque, responseFuture); // 发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } // 失败立即回调 responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); responseTable.remove(opaque); responseFuture.executeInvokeCallback(); } }); } // 3. 单向调用 public void invokeOnewayImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis ) throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException { // 设置单向标志 request.markOnewayRPC(); // 直接发送,不等响应 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()) { log.warn("send request failed. channel={}", channel); } } }); } 三、请求码与处理器映射 3.1 常见请求码 public class RequestCode { // Producer 相关 public static final int SEND_MESSAGE = 10; public static final int SEND_MESSAGE_V2 = 310; public static final int SEND_BATCH_MESSAGE = 320; // Consumer 相关 public static final int PULL_MESSAGE = 11; public static final int QUERY_MESSAGE = 12; public static final int QUERY_MESSAGE_BY_KEY = 13; // Broker 相关 public static final int REGISTER_BROKER = 103; public static final int UNREGISTER_BROKER = 104; public static final int HEART_BEAT = 34; // 管理相关 public static final int GET_ALL_TOPIC_CONFIG = 21; public static final int UPDATE_BROKER_CONFIG = 25; } 3.2 处理器注册 public class BrokerController { public void registerProcessor() { // 1. 发送消息处理器 SendMessageProcessor sendProcessor = new SendMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor ); // 2. 拉取消息处理器 PullMessageProcessor pullProcessor = new PullMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.PULL_MESSAGE, pullProcessor, this.pullMessageExecutor ); // 3. 查询消息处理器 QueryMessageProcessor queryProcessor = new QueryMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor ); // 4. 心跳处理器 ClientManageProcessor clientManageProcessor = new ClientManageProcessor(this); this.remotingServer.registerProcessor( RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor ); // 5. 默认处理器 AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); this.remotingServer.registerDefaultProcessor( adminProcessor, this.adminBrokerExecutor ); } } 四、编解码协议 4.1 协议格式 ┌─────────────────────────────────────────────┐ │ RocketMQ 通信协议格式 │ ├─────────────────────────────────────────────┤ │ │ │ 消息总长度(4字节) │ │ ┌──────────────────────────────┐ │ │ │ Total Length │ │ │ └──────────────────────────────┘ │ │ │ │ 序列化类型 + 头部长度(4字节) │ │ ┌──────────────────────────────┐ │ │ │ [1字节] [3字节] │ │ │ │ SerType Header Length │ │ │ └──────────────────────────────┘ │ │ │ │ 头部数据(变长) │ │ ┌──────────────────────────────┐ │ │ │ Header Data │ │ │ │ (JSON 或 RocketMQ 私有格式) │ │ │ └──────────────────────────────┘ │ │ │ │ 消息体(变长) │ │ ┌──────────────────────────────┐ │ │ │ Body Data │ │ │ └──────────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ 4.2 编码器实现 public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } } } // RemotingCommand 编码 public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1. 计算总长度 int length = 4; // 总长度字段本身 // 2. 序列化头部 byte[] headerData; headerData = this.headerEncode(); length += headerData.length; length += bodyLength; // 3. 分配缓冲区 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 4. 写入总长度 result.putInt(length); // 5. 写入序列化类型 + 头部长度 result.put(markProtocolType(headerData.length, SerializeType.JSON)); // 6. 写入头部数据 result.put(headerData); result.flip(); return result; } 五、线程模型 5.1 Netty 线程模型 ┌─────────────────────────────────────────────┐ │ Netty Reactor 线程模型 │ ├─────────────────────────────────────────────┤ │ │ │ Boss Group (1个线程) │ │ ┌──────────────────────────────┐ │ │ │ - 接收新连接 │ │ │ │ - 注册到 Worker Group │ │ │ └──────────────────────────────┘ │ │ ↓ │ │ Worker Group (多个线程) │ │ ┌──────────────────────────────┐ │ │ │ - I/O 读写 │ │ │ │ - 编解码 │ │ │ │ - 分发到业务线程池 │ │ │ └──────────────────────────────┘ │ │ ↓ │ │ 业务线程池 (可配置) │ │ ┌──────────────────────────────┐ │ │ │ - SendMessageExecutor │ │ │ │ - PullMessageExecutor │ │ │ │ - QueryMessageExecutor │ │ │ │ - HeartbeatExecutor │ │ │ └──────────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ 5.2 线程池配置 // Broker 启动时初始化线程池 public class BrokerController { private void initialize() { // 1. 发送消息线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), // 核心线程数:默认16 this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_") ); // 2. 拉取消息线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), // 默认16+核心数 this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_") ); // 3. 查询消息线程池 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), // 默认8+核心数 this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_") ); } } // 配置建议 # broker.conf sendMessageThreadPoolNums=16 # 发送消息线程 pullMessageThreadPoolNums=32 # 拉取消息线程 queryMessageThreadPoolNums=16 # 查询消息线程 六、性能优化 6.1 零拷贝 // 使用 FileRegion 实现零拷贝传输 public class TransferMsgByHeap { public void transferData(SelectMappedBufferResult selectMappedBufferResult, Channel channel) { ByteBuffer byteBuffer = selectMappedBufferResult.getByteBuffer(); // 方式1:堆内存传输(有拷贝) byte[] data = new byte[byteBuffer.remaining()]; byteBuffer.get(data); channel.writeAndFlush(Unpooled.wrappedBuffer(data)); // 方式2:零拷贝传输(推荐) FileRegion fileRegion = new DefaultFileRegion( selectMappedBufferResult.getMappedFile().getFileChannel(), selectMappedBufferResult.getStartOffset(), selectMappedBufferResult.getSize() ); channel.writeAndFlush(fileRegion); } } 6.2 批量发送 // 生产者批量发送 DefaultMQProducer producer = new DefaultMQProducer("group"); producer.start(); List<Message> messages = new ArrayList<>(); for (int i = 0; i < 100; i++) { Message msg = new Message("Topic", "Tag", ("Hello" + i).getBytes()); messages.add(msg); } // 一次网络调用发送100条 SendResult result = producer.send(messages); 6.3 连接复用 // Client 连接池复用 public class NettyRemotingClient { // Channel 缓存表 private final ConcurrentMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); // 获取或创建 Channel private Channel getAndCreateChannel(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } return this.createChannel(addr); } // 优势:减少连接建立开销 } 七、总结与思考 7.1 核心要点 Netty 基础:RocketMQ 基于 Netty 构建网络通信 三种调用:同步、异步、单向,满足不同场景 请求处理器:请求码映射到具体处理器 自定义协议:高效的二进制协议 线程模型:Reactor + 业务线程池 7.2 思考题 为什么需要三种调用方式? ...

2025-11-14 · maneng

RocketMQ架构06:索引机制详解 - ConsumeQueue与IndexFile的巧妙设计

引言:索引的魔法 如果说 CommitLog 是一本厚厚的字典,那么 ConsumeQueue 和 IndexFile 就是目录和索引。它们让 RocketMQ 能够: 秒级定位:从百万条消息中快速找到目标 轻量高效:索引占用空间极小 多维查询:支持按 Queue、Key、时间查询 今天我们深入剖析这两种索引的巧妙设计。 一、为什么需要索引? 1.1 没有索引的困境 场景:Consumer 想消费 TopicA 的消息 方案1:遍历 CommitLog(❌) ┌────────────────────────────────────┐ │ CommitLog(所有Topic混存) │ ├────────────────────────────────────┤ │ TopicA-Msg1 │ │ TopicB-Msg1 │ ← 需要跳过 │ TopicC-Msg1 │ ← 需要跳过 │ TopicA-Msg2 │ │ TopicB-Msg2 │ ← 需要跳过 │ ... │ └────────────────────────────────────┘ 问题: 1. 需要扫描所有消息 → 慢 2. 无法按 Queue 过滤 → 低效 3. 无法快速定位 Offset → 不可用 1.2 RocketMQ 的索引方案 ┌─────────────────────────────────────────────┐ │ 双层索引架构 │ ├─────────────────────────────────────────────┤ │ │ │ ConsumeQueue(消费索引) │ │ - 按 Topic-Queue 组织 │ │ - 存储消息在 CommitLog 的位置 │ │ - 支持顺序消费 │ │ │ │ IndexFile(查询索引) │ │ - 按 Key/时间 组织 │ │ - Hash 索引结构 │ │ - 支持随机查询 │ │ │ └─────────────────────────────────────────────┘ 二、ConsumeQueue:消费索引 2.1 文件组织结构 $HOME/store/consumequeue/ ├── TopicA/ # Topic 名称 │ ├── 0/ # Queue ID = 0 │ │ ├── 00000000000000000000 # 第1个文件 │ │ ├── 00000000000600000000 # 第2个文件 │ │ └── ... │ ├── 1/ # Queue ID = 1 │ │ └── ... │ └── ... ├── TopicB/ │ └── ... └── ... 文件大小:600万字节(30万条索引 × 20字节) 文件名:该文件第一条索引的逻辑偏移量 2.2 索引格式 ┌────────────────────────────────────┐ │ 单条索引格式(20字节) │ ├────────────────────────────────────┤ │ CommitLog Offset (8字节) │ ← 消息在 CommitLog 的物理位置 │ Size (4字节) │ ← 消息大小 │ Tag HashCode (8字节) │ ← Tag 哈希值(用于过滤) └────────────────────────────────────┘ 实际示例: ...

2025-11-14 · maneng

RocketMQ架构05:CommitLog深度剖析 - 顺序写的艺术

引言:顺序写的魔力 CommitLog 是 RocketMQ 高性能的核心秘密。它用顺序写实现了: 单机 10 万+ TPS:远超数据库的随机写 亚毫秒级延迟:消息写入延迟 < 1ms 零拷贝读取:MMAP 直接访问磁盘文件 今天我们从源码级别剖析 CommitLog 的实现细节。 一、为什么顺序写这么快? 1.1 随机写 vs 顺序写 ┌──────────────────────────────────────────┐ │ 磁盘写入性能对比 │ ├──────────────────────────────────────────┤ │ │ │ 随机写(数据库): │ │ ┌───┐ ┌───┐ ┌───┐ │ │ │ A │ → │ C │ → │ B │ │ │ └───┘ └───┘ └───┘ │ │ 磁盘块100 磁盘块500 磁盘块200 │ │ │ │ 问题: │ │ - 磁头需要频繁移动 │ │ - IOPS 约 100-200/s │ │ │ ├──────────────────────────────────────────┤ │ │ │ 顺序写(RocketMQ): │ │ ┌───┬───┬───┬───┬───┐ │ │ │ A │ B │ C │ D │ E │ ... │ │ └───┴───┴───┴───┴───┘ │ │ CommitLog 文件 │ │ │ │ 优势: │ │ - 磁头连续移动 │ │ - 吞吐量 约 600MB/s(机械硬盘) │ │ - 吞吐量 约 2GB/s(SSD) │ │ │ └──────────────────────────────────────────┘ 性能差距: ...

2025-11-14 · maneng

RocketMQ架构04:存储引擎深度剖析 - 高性能消息存储的奥秘

引言:存储引擎的设计哲学 RocketMQ 的存储引擎是其高性能的核心。它用极简的设计实现了: 百万级 TPS:单机支持百万级消息吞吐 毫秒级延迟:消息存储延迟 < 1ms TB 级存储:单 Broker 可存储数 TB 消息 零数据丢失:通过刷盘策略保证可靠性 今天我们从第一性原理出发,逐步理解这个存储引擎的巧妙设计。 一、为什么需要这样的存储模型? 1.1 传统数据库存储的问题 方案1:为每个 Queue 建一张表 -- TopicA 的 Queue0 CREATE TABLE topic_a_queue_0 ( offset BIGINT PRIMARY KEY, message BLOB, store_time TIMESTAMP ); -- TopicA 的 Queue1 CREATE TABLE topic_a_queue_1 (...); ... 问题: 1. 表数量爆炸:1000个Topic × 4个Queue = 4000张表 2. 随机写入:不同表的写入是随机I/O → 性能差 3. 数据分散:难以统一管理和备份 1.2 RocketMQ 的解决方案 核心思想:写入集中化 + 读取索引化 ┌─────────────────────────────────────────────────┐ │ RocketMQ 存储模型 │ ├─────────────────────────────────────────────────┤ │ │ │ 所有消息 → 统一写入 CommitLog(顺序写) │ │ ↓ │ │ 异步构建 ConsumeQueue(索引) │ │ ↓ │ │ Consumer 根据索引快速定位 │ │ │ └─────────────────────────────────────────────────┘ 优势: ...

2025-11-13 · maneng

RocketMQ架构03:Broker架构与核心组件 - 消息中枢的内部世界

引言:消息系统的心脏 如果说 NameServer 是 RocketMQ 的"大脑"(路由中心),那么 Broker 就是"心脏"(消息中枢)。它负责: 消息存储:持久化所有消息 消息处理:接收生产者消息,推送给消费者 高可用保障:主从同步、故障转移 性能优化:零拷贝、顺序写、内存映射 今天我们从第一性原理出发,逐步理解 Broker 的内部世界。 一、Broker 的核心职责 1.1 从需求出发 假设我们要设计一个消息存储节点,需要解决哪些问题? 基础需求: 1. 接收消息 → 需要网络通信模块 2. 存储消息 → 需要存储引擎 3. 分发消息 → 需要索引和查询机制 4. 保证可靠 → 需要主从同步 5. 高性能 → 需要优化 I/O Broker 就是围绕这5个核心需求设计的。 1.2 Broker 的三重身份 ┌─────────────────────────────────────┐ │ Broker 的三重身份 │ ├─────────────────────────────────────┤ │ 1. 消息接收器(Producer 视角) │ │ - 接收消息请求 │ │ - 验证权限 │ │ - 返回存储结果 │ ├─────────────────────────────────────┤ │ 2. 消息存储库(系统视角) │ │ - 持久化消息 │ │ - 管理索引 │ │ - 定期清理 │ ├─────────────────────────────────────┤ │ 3. 消息分发器(Consumer 视角) │ │ - 根据订阅关系推送消息 │ │ - 管理消费进度 │ │ - 支持消息重试 │ └─────────────────────────────────────┘ 二、Broker 架构全景 2.1 核心组件架构图 ┌────────────────────────────────────────────────────────┐ │ Broker 节点 │ ├────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Remoting 模块(网络通信层) │ │ │ │ - NettyServer(接收请求) │ │ │ │ - RequestProcessor(请求处理器) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Message Store 模块(存储引擎) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ CommitLog │ │ConsumeQueue│ │ IndexFile │ │ │ │ │ │ 消息主存储 │ │ 消费索引 │ │ 查询索引 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ HA 模块(高可用) │ │ │ │ - HAService(主从同步) │ │ │ │ - CommitLogDispatcher(消息分发) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ 其他核心模块 │ │ │ │ - TopicConfigManager(Topic 配置管理) │ │ │ │ - ConsumerOffsetManager(消费进度管理) │ │ │ │ - ScheduleMessageService(延迟消息) │ │ │ │ - TransactionalMessageService(事务消息) │ │ │ └──────────────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────────────┘ 2.2 各模块职责详解 1️⃣ Remoting 模块(网络通信) // 核心职责 1. 接收网络请求(基于 Netty) 2. 请求路由与分发 3. 编解码(序列化/反序列化) 4. 返回响应结果 // 处理的请求类型 - SEND_MESSAGE // 发送消息 - PULL_MESSAGE // 拉取消息 - QUERY_MESSAGE // 查询消息 - HEART_BEAT // 心跳 - REGISTER_BROKER // Broker 注册 关键设计: ...

2025-11-13 · maneng

RocketMQ架构02:NameServer深度剖析 - 轻量级注册中心的设计哲学

引言:简单而不简陋 NameServer 是 RocketMQ 中最"简单"的组件,但简单不代表简陋。它用最少的代码实现了最核心的功能: 无状态、无持久化 不依赖任何第三方组件 完全内存操作 对等部署、无主从之分 今天,我们深入理解这个"简单而强大"的组件。 一、NameServer 核心数据结构 1.1 路由信息管理 public class RouteInfoManager { // 1. Topic → QueueData 映射 // 记录每个 Topic 在哪些 Broker 上有哪些 Queue private final HashMap<String/* topic */, List<QueueData>> topicQueueTable; // QueueData 结构 class QueueData { private String brokerName; // Broker 名称 private int readQueueNums; // 读队列数量 private int writeQueueNums; // 写队列数量 private int perm; // 权限(2=写 4=读 6=读写) private int topicSynFlag; // 同步标识 } // 2. Broker 名称 → BrokerData 映射 // 记录 Broker 的集群信息和地址 private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // BrokerData 结构 class BrokerData { private String cluster; // 所属集群 private String brokerName; // Broker 名称 private HashMap<Long/* brokerId */, String/* address */> brokerAddrs; // brokerId: 0=Master, >0=Slave } // 3. Broker 地址 → BrokerLiveInfo 映射 // 记录 Broker 的存活信息 private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // BrokerLiveInfo 结构 class BrokerLiveInfo { private long lastUpdateTimestamp; // 最后更新时间 private DataVersion dataVersion; // 数据版本 private Channel channel; // Netty Channel private String haServerAddr; // HA 服务地址 } // 4. 集群名称 → Broker 名称集合 private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable; // 5. Broker 地址 → 过滤服务器列表 private final HashMap<String/* brokerAddr */, List<String/* filter server */>> filterServerTable; // 读写锁保护 private final ReadWriteLock lock = new ReentrantReadWriteLock(); } 1.2 数据结构关系图 数据结构关系: topicQueueTable ┌──────────────────────────────────────┐ │ Topic1 → [QueueData1, QueueData2] │ │ QueueData1: brokerName=broker-a │ │ QueueData2: brokerName=broker-b │ └──────────────────────────────────────┘ │ ▼ brokerAddrTable ┌──────────────────────────────────────┐ │ broker-a → BrokerData │ │ cluster: DefaultCluster │ │ brokerAddrs: │ │ 0 → 192.168.1.100:10911 (Master)│ │ 1 → 192.168.1.101:10911 (Slave) │ └──────────────────────────────────────┘ │ ▼ brokerLiveTable ┌──────────────────────────────────────┐ │ 192.168.1.100:10911 → BrokerLiveInfo│ │ lastUpdateTimestamp: 1699999999000 │ │ channel: [Netty Channel] │ └──────────────────────────────────────┘ 二、Broker 注册流程 2.1 注册请求处理 public class BrokerRegistration { public RegisterBrokerResult registerBroker( final String clusterName, final String brokerAddr, final String brokerName, final long brokerId, final String haServerAddr, final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList, final Channel channel) { RegisterBrokerResult result = new RegisterBrokerResult(); try { // 获取写锁 this.lock.writeLock().lockInterruptibly(); // 1. 更新集群信息 Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); // 2. 更新 Broker 地址表 boolean registerFirst = false; BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } // 更新 Broker 地址 Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); String oldAddr = brokerAddrsMap.put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); // 3. 更新 Topic 配置 if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { // 只有 Master 才更新 Topic 配置 if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 4. 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); if (null == prevBrokerLiveInfo) { log.info("new broker registered: {}", brokerAddr); } // 5. 更新过滤服务器列表 if (filterServerList != null && !filterServerList.isEmpty()) { this.filterServerTable.put(brokerAddr, filterServerList); } // 6. 如果是 Slave,返回 Master 信息 if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo masterLiveInfo = this.brokerLiveTable.get(masterAddr); if (masterLiveInfo != null) { result.setHaServerAddr(masterLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } catch (Exception e) { log.error("registerBroker Exception", e); } finally { this.lock.writeLock().unlock(); } return result; } // 创建或更新 Queue 数据 private void createAndUpdateQueueData(String brokerName, TopicConfig topicConfig) { QueueData queueData = new QueueData(); queueData.setBrokerName(brokerName); queueData.setWriteQueueNums(topicConfig.getWriteQueueNums()); queueData.setReadQueueNums(topicConfig.getReadQueueNums()); queueData.setPerm(topicConfig.getPerm()); queueData.setTopicSynFlag(topicConfig.getTopicSysFlag()); List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName()); if (null == queueDataList) { queueDataList = new LinkedList<>(); this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList); log.info("new topic registered: {}", topicConfig.getTopicName()); } else { // 移除旧的数据 queueDataList.removeIf(qd -> qd.getBrokerName().equals(brokerName)); } queueDataList.add(queueData); } } 2.2 Broker 心跳机制 public class BrokerHeartbeat { // Broker 端:定时发送心跳 class BrokerSide { // 每 30 秒向所有 NameServer 注册一次 private final ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { // 向所有 NameServer 注册 this.registerBrokerAll(true, false, true); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } }, 1000, 30000, TimeUnit.MILLISECONDS); } private void registerBrokerAll( final boolean checkOrderConfig, boolean oneway, boolean forceRegister) { // 准备注册信息 TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); // 向每个 NameServer 注册 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null && nameServerAddressList.size() > 0) { for (String namesrvAddr : nameServerAddressList) { try { RegisterBrokerResult result = this.registerBroker( namesrvAddr, oneway, timeoutMills, topicConfigWrapper ); if (result != null) { // 更新 Master 地址(Slave 使用) if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(result.getHaServerAddr()); } } } catch (Exception e) { log.warn("registerBroker to {} failed", namesrvAddr, e); } } } } } // NameServer 端:检测 Broker 超时 class NameServerSide { // 每 10 秒扫描一次 public void scanNotActiveBroker() { long timeout = 120000; // 2 分钟超时 Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, BrokerLiveInfo> entry = it.next(); long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeout) < System.currentTimeMillis()) { // Broker 超时 RemotingUtil.closeChannel(entry.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", entry.getKey(), timeout); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); } } } // 清理路由信息 public void onChannelDestroy(String remoteAddr, Channel channel) { String brokerAddrFound = null; if (channel != null) { try { this.lock.readLock().lockInterruptibly(); // 查找 Broker 地址 for (Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { if (entry.getValue().getChannel() == channel) { brokerAddrFound = entry.getKey(); break; } } } finally { this.lock.readLock().unlock(); } } if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } if (brokerAddrFound != null) { // 移除 Broker this.removeBroker(brokerAddrFound); } } } } 三、路由查询 3.1 获取 Topic 路由信息 public class RouteQuery { public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; try { // 获取读锁 this.lock.readLock().lockInterruptibly(); // 1. 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; // 2. 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { BrokerData brokerDataClone = new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); topicRouteData.getBrokerDatas().add(brokerDataClone); foundBrokerData = true; // 3. 查找过滤服务器 for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List<String> filterServerList = this.filterServerTable.get(brokerAddr); if (filterServerList != null) { topicRouteData.getFilterServerTable().put(brokerAddr, filterServerList); } } } } } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } finally { this.lock.readLock().unlock(); } if (foundQueueData && foundBrokerData) { return topicRouteData; } return null; } // TopicRouteData 结构 class TopicRouteData { private String orderTopicConf; // 顺序消息配置 private List<QueueData> queueDatas; // Queue 信息列表 private List<BrokerData> brokerDatas; // Broker 信息列表 private HashMap<String, List<String>> filterServerTable; // 过滤服务器 } } 3.2 Client 端路由更新 public class ClientRouteUpdate { // Producer/Consumer 定时更新路由信息 class RouteUpdateTask { // 每 30 秒更新一次 private ScheduledExecutorService scheduledExecutorService; public void start() { this.scheduledExecutorService.scheduleAtFixedRate(() -> { try { this.updateTopicRouteInfoFromNameServer(); } catch (Exception e) { log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e); } }, 10, 30000, TimeUnit.MILLISECONDS); } private void updateTopicRouteInfoFromNameServer() { // 获取所有订阅的 Topic Set<String> topicSet = new HashSet<>(); // Producer 的 Topic topicSet.addAll(this.producerTable.keySet()); // Consumer 的 Topic for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { topicSet.addAll(entry.getValue().subscriptions()); } // 更新每个 Topic 的路由信息 for (String topic : topicSet) { this.updateTopicRouteInfoFromNameServer(topic); } } public boolean updateTopicRouteInfoFromNameServer(final String topic) { try { // 从 NameServer 获取路由信息 TopicRouteData topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 3000); if (topicRouteData != null) { // 对比版本,判断是否需要更新 TopicRouteData old = this.topicRouteTable.get(topic); boolean changed = topicRouteDataChanged(old, topicRouteData); if (changed) { // 更新本地路由表 this.topicRouteTable.put(topic, topicRouteData); // 更新 Producer 的路由信息 for (Entry<String, MQProducerInner> entry : this.producerTable.entrySet()) { entry.getValue().updateTopicPublishInfo(topic, topicRouteData); } // 更新 Consumer 的路由信息 for (Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { entry.getValue().updateTopicSubscribeInfo(topic, topicRouteData); } log.info("topicRouteTable.put: Topic={}, RouteData={}", topic, topicRouteData); return true; } } } catch (Exception e) { log.warn("updateTopicRouteInfoFromNameServer Exception", e); } return false; } // 判断路由信息是否变化 private boolean topicRouteDataChanged(TopicRouteData olddata, TopicRouteData nowdata) { if (olddata == null || nowdata == null) { return true; } TopicRouteData old = olddata.cloneTopicRouteData(); TopicRouteData now = nowdata.cloneTopicRouteData(); Collections.sort(old.getQueueDatas()); Collections.sort(old.getBrokerDatas()); Collections.sort(now.getQueueDatas()); Collections.sort(now.getBrokerDatas()); return !old.equals(now); } } } 四、NameServer 高可用设计 4.1 对等部署架构 NameServer 集群架构: ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (独立运行) │ │ (独立运行) │ │ (独立运行) │ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │ │ │ └──────────────────┼──────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Broker 集群 │ │ (向所有NS注册) │ │ │ └──────────────────────┘ │ ┌───────────▼──────────┐ │ │ │ Producer/Consumer │ │ (随机选择NS查询) │ │ │ └──────────────────────┘ 特点: 1. NameServer 之间无任何通信 2. 每个 NameServer 都是完整功能的独立节点 3. Broker 向所有 NameServer 注册 4. Client 随机选择 NameServer 查询 4.2 数据一致性保证 public class ConsistencyGuarantee { // 最终一致性 class EventualConsistency { /* NameServer 不保证强一致性,但保证最终一致性 场景1:Broker 注册 ┌─────────────────────────────────────────┐ │ T0: Broker 向 NS1 注册 │ │ T1: Broker 向 NS2 注册 │ │ T2: Broker 向 NS3 注册 │ │ │ │ 在 T0-T2 期间,三个 NS 数据不一致 │ │ T2 后,数据最终一致 │ └─────────────────────────────────────────┘ 场景2:Broker 宕机 ┌─────────────────────────────────────────┐ │ T0: Broker 宕机 │ │ T1: NS1 检测到超时(2分钟) │ │ T2: NS2 检测到超时 │ │ T3: NS3 检测到超时 │ │ │ │ 在检测到之前,可能有短暂不一致 │ │ 但影响有限(Client 会自动切换 Broker) │ └─────────────────────────────────────────┘ */ } // 为什么可以接受不一致? class WhyAcceptableInconsistency { /* 1. Client 有容错机制 - 如果从 NS1 获取的 Broker 不可用 - Client 会自动尝试其他 Broker 2. Broker 定时注册 - 每 30 秒注册一次 - 很快会同步到所有 NS 3. 影响范围小 - 只影响短时间的查询 - 不影响已建立的连接 4. 简化设计 - 无需复杂的一致性协议 - 性能更高 */ } } 4.3 故障处理 public class FailureHandling { // 单个 NameServer 故障 class SingleNameServerFailure { /* 场景:NS1 宕机 ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ │ (宕机) │ │ (正常) │ │ (正常) │ └──────────────┘ └──────┬───────┘ └──────┬───────┘ │ │ ┌───────────────┴──────────────────┘ │ ┌───────▼──────────┐ │ Broker 集群 │ │ - 向NS2/NS3注册 │ │ - NS1注册失败 │ └──────────────────┘ 影响: 1. Broker 向 NS1 注册失败(不影响NS2/NS3) 2. Client 如果查询 NS1 会失败,自动重试其他 NS 3. 总体可用性不受影响 */ } // 全部 NameServer 故障 class AllNameServerFailure { /* 场景:所有 NS 宕机 影响: 1. Broker 无法注册新的 Topic 2. Client 无法获取新的路由信息 3. 但已有连接仍可正常工作! 为什么还能工作? - Producer/Consumer 本地缓存了路由信息 - Broker 地址不会频繁变化 - 短时间内(几小时)仍可正常使用 恢复: - NameServer 恢复后 - Broker 重新注册 - Client 更新路由 - 自动恢复正常 */ } // Client 容错机制 class ClientFaultTolerance { public TopicRouteData getTopicRouteData(String topic) { // 尝试从不同的 NameServer 获取 List<String> nameServerList = this.remotingClient.getNameServerAddressList(); for (String nameServer : nameServerList) { try { TopicRouteData result = this.getTopicRouteDataFromNameServer(topic, nameServer, 3000); if (result != null) { return result; } } catch (Exception e) { log.warn("getTopicRouteData from {} failed", nameServer, e); // 继续尝试下一个 NameServer } } throw new MQClientException("No available NameServer"); } } } 五、性能优化 5.1 内存优化 public class MemoryOptimization { // 数据结构优化 class DataStructure { // 使用 HashMap 而非 ConcurrentHashMap // 通过读写锁控制并发,减少锁竞争 private final HashMap<String, List<QueueData>> topicQueueTable; private final ReadWriteLock lock = new ReentrantReadWriteLock(); // 读操作 public List<QueueData> getQueueData(String topic) { try { this.lock.readLock().lockInterruptibly(); return this.topicQueueTable.get(topic); } finally { this.lock.readLock().unlock(); } } // 写操作 public void putQueueData(String topic, List<QueueData> queueDataList) { try { this.lock.writeLock().lockInterruptibly(); this.topicQueueTable.put(topic, queueDataList); } finally { this.lock.writeLock().unlock(); } } } // 避免不必要的对象创建 class ObjectReuse { // 复用 BrokerData 对象 public BrokerData cloneBrokerData(BrokerData brokerData) { // 只克隆必要的字段 return new BrokerData( brokerData.getCluster(), brokerData.getBrokerName(), (HashMap<Long, String>) brokerData.getBrokerAddrs().clone() ); } } } 5.2 网络优化 public class NetworkOptimization { // 批量处理 class BatchProcessing { // Broker 一次注册所有 Topic // 而不是每个 Topic 单独注册 public void registerBroker(TopicConfigSerializeWrapper topicConfigWrapper) { // 包含所有 Topic 的配置 ConcurrentMap<String, TopicConfig> topicConfigTable = topicConfigWrapper.getTopicConfigTable(); // 一次性更新所有 Topic for (Map.Entry<String, TopicConfig> entry : topicConfigTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } // 连接复用 class ConnectionReuse { // Broker 和 NameServer 之间保持长连接 // 避免频繁建立连接 private Channel channel; // Netty Channel public void keepAlive() { // 定时发送心跳保持连接 this.scheduledExecutorService.scheduleAtFixedRate(() -> { this.registerBrokerAll(true, false, true); }, 1000, 30000, TimeUnit.MILLISECONDS); } } } 六、与 ZooKeeper 对比 6.1 设计理念对比 对比维度: ┌──────────────┬──────────────────┬──────────────────┐ │ 维度 │ NameServer │ ZooKeeper │ ├──────────────┼──────────────────┼──────────────────┤ │ CAP 选择 │ AP(可用性优先) │ CP(一致性优先) │ │ 数据持久化 │ 无(纯内存) │ 有(磁盘) │ │ 节点通信 │ 无 │ 有(Paxos/Raft) │ │ 部署复杂度 │ 低 │ 高 │ │ 运维复杂度 │ 低 │ 高 │ │ 性能 │ 高(内存操作) │ 中(磁盘IO) │ │ 一致性保证 │ 最终一致 │ 强一致 │ │ 故障恢复 │ 快 │ 慢(需要选举) │ └──────────────┴──────────────────┴──────────────────┘ 6.2 为什么 RocketMQ 选择 NameServer? public class WhyNameServer { class Reasons { /* 1. 需求不同 - RocketMQ 不需要强一致性 - 路由信息短暂不一致可以接受 - Client 有容错机制 2. 简化部署 - 无需额外依赖 - 降低运维成本 - 减少故障点 3. 性能优势 - 纯内存操作,性能更高 - 无磁盘 IO - 无网络同步开销 4. 可用性优势 - 无需选举,故障恢复快 - 任意节点可服务 - 整体可用性更高 5. 成本考虑 - 更少的机器资源 - 更低的学习成本 - 更少的运维工作 */ } } 七、最佳实践 7.1 部署建议 public class DeploymentBestPractices { // 1. 节点数量 class NodeCount { /* 推荐配置: - 生产环境:3 个节点 - 测试环境:1 个节点即可 原因: - 3 个节点足够高可用 - 过多节点增加 Broker 负担(需向每个节点注册) - 节省资源 */ } // 2. 硬件配置 class HardwareRequirement { /* 推荐配置: - CPU: 2 核即可 - 内存: 4GB 即可 - 磁盘: 无特殊要求(不持久化) - 网络: 千兆网卡 NameServer 非常轻量,硬件需求很低 */ } // 3. 网络规划 class NetworkPlanning { /* 建议: - 与 Broker 在同一机房 - 使用内网通信 - 避免跨公网访问 - 确保网络稳定性 */ } // 4. 启动参数 class StartupParameters { String startCommand = "nohup sh mqnamesrv " + "-Xms2g -Xmx2g -Xmn1g " + "-XX:+UseG1GC " + "-XX:G1HeapRegionSize=16m " + "-XX:+PrintGCDetails " + "-XX:+PrintGCDateStamps " + "-Xloggc:/dev/shm/mq_gc_%p.log &"; } } 7.2 监控建议 public class MonitoringBestPractices { // 1. 关键指标 class KeyMetrics { long registeredBrokerCount; // 注册的 Broker 数量 long registeredTopicCount; // 注册的 Topic 数量 long activeConnectionCount; // 活跃连接数 double routeQueryQPS; // 路由查询 QPS double brokerRegisterQPS; // Broker 注册 QPS } // 2. 告警规则 class AlertRules { /* 建议告警: 1. Broker 数量突然减少(可能宕机) 2. 路由查询失败率 > 1% 3. 内存使用率 > 80% 4. JVM GC 频繁 */ } } 八、总结 NameServer 的设计充分体现了"大道至简"的哲学: ...

2025-11-13 · maneng

RocketMQ架构01:整体架构设计 - NameServer、Broker、Producer、Consumer

引言:一个精妙的分布式系统 如果把 RocketMQ 比作一座现代化城市,那么: NameServer 是城市规划局,管理所有地图信息 Broker 是物流仓库,存储和转运货物 Producer 是发货方,把货物送到仓库 Consumer 是收货方,从仓库取货 今天,我们从整体视角理解这座"消息城市"的运作机制。 一、RocketMQ 整体架构 1.1 核心组件 RocketMQ 架构全景图: ┌────────────────────────────────────────────┐ │ NameServer Cluster │ │ (路由注册中心、服务发现、轻量级协调器) │ └────────┬───────────────────┬───────────────┘ │ │ 注册/心跳 │ │ 获取路由 │ │ ┌────────────▼──────┐ ┌────▼────────────────┐ │ │ │ │ │ Broker Cluster │ │ Producer/Consumer │ │ │ │ │ │ ┌───────────────┐ │ │ 应用程序 │ │ │ Master Broker │ │◄────┤ │ │ │ - CommitLog │ │ 读写 │ │ │ │ - ConsumeQ │ │ 消息 │ │ │ │ - IndexFile │ │ │ │ │ └───────┬───────┘ │ └─────────────────────┘ │ │ 主从同步 │ │ ┌───────▼───────┐ │ │ │ Slave Broker │ │ │ │ - CommitLog │ │ │ │ - ConsumeQ │ │ │ │ - IndexFile │ │ │ └───────────────┘ │ └───────────────────┘ 1.2 组件职责矩阵 组件职责对比: ┌──────────────┬────────────────────────────────────────┐ │ 组件 │ 核心职责 │ ├──────────────┼────────────────────────────────────────┤ │ NameServer │ - 路由信息管理(Topic → Broker 映射) │ │ │ - Broker 注册与心跳检测 │ │ │ - 提供路由信息查询 │ │ │ - 无状态、对等集群 │ ├──────────────┼────────────────────────────────────────┤ │ Broker │ - 消息存储(CommitLog) │ │ │ - 消息索引(ConsumeQueue、IndexFile) │ │ │ - 消息查询与消费 │ │ │ - 高可用(主从复制、Dledger) │ ├──────────────┼────────────────────────────────────────┤ │ Producer │ - 消息生产 │ │ │ - 消息发送(同步、异步、单向) │ │ │ - 负载均衡(选择Broker和Queue) │ │ │ - 故障规避 │ ├──────────────┼────────────────────────────────────────┤ │ Consumer │ - 消息消费 │ │ │ - 消息拉取(长轮询) │ │ │ - 负载均衡(Rebalance) │ │ │ - 消费进度管理 │ └──────────────┴────────────────────────────────────────┘ 二、NameServer 详解 2.1 为什么需要 NameServer public class WhyNameServer { // 问题1:Producer/Consumer 如何知道 Broker 在哪里? class ServiceDiscovery { // 没有 NameServer: // - 硬编码 Broker 地址(不灵活) // - 无法动态感知 Broker 变化 // 有了 NameServer: // - 动态获取 Broker 列表 // - 自动感知 Broker 上下线 } // 问题2:Topic 分布在哪些 Broker 上? class TopicRoute { // NameServer 维护 Topic 路由信息 class TopicRouteData { List<QueueData> queueDatas; // Queue 分布 List<BrokerData> brokerDatas; // Broker 信息 } } // 问题3:为什么不用 ZooKeeper? class WhyNotZooKeeper { /* ZooKeeper 的问题: 1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先) 2. 重量级依赖 - NameServer 更轻量 3. 运维复杂 - NameServer 无状态更简单 4. 性能开销 - NameServer 内存操作更快 NameServer 的优势: 1. 完全无状态 2. 对等集群(任意节点提供服务) 3. 内存操作(无磁盘 IO) 4. 简单高效 */ } } 2.2 NameServer 核心功能 public class NameServerCore { // 1. 路由信息管理 class RouteInfoManager { // Topic → Broker 映射 private HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Broker 名称 → Broker 数据 private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 地址 → 集群信息 private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // 集群名称 → Broker 名称列表 private HashMap<String/* clusterName */, Set<String>> clusterAddrTable; // 过滤服务器 private HashMap<String/* brokerAddr */, List<String>> filterServerTable; } // 2. Broker 注册 public RegisterBrokerResult registerBroker( String clusterName, String brokerAddr, String brokerName, long brokerId, String haServerAddr, TopicConfigSerializeWrapper topicConfigWrapper) { // 创建或更新 Broker 数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } brokerData.getBrokerAddrs().put(brokerId, brokerAddr); // 更新 Topic 路由信息 if (null != topicConfigWrapper) { for (Map.Entry<String, TopicConfig> entry : topicConfigWrapper.getTopicConfigTable().entrySet()) { createAndUpdateQueueData(brokerName, entry.getValue()); } } // 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); return new RegisterBrokerResult(...); } // 3. 路由查询 public TopicRouteData getTopicRouteData(String topic) { TopicRouteData topicRouteData = new TopicRouteData(); // 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); // 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } // 组装路由数据 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (brokerData != null) { topicRouteData.getBrokerDatas().add(brokerData.clone()); } } topicRouteData.setQueueDatas(queueDataList); return topicRouteData; } // 4. Broker 心跳检测 public void scanNotActiveBroker() { long timeoutMillis = 120000; // 2 分钟超时 for (Map.Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeoutMillis) < System.currentTimeMillis()) { // Broker 超时,移除 RemotingUtil.closeChannel(entry.getValue().getChannel()); this.brokerLiveTable.remove(entry.getKey()); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); log.warn("Broker timeout: {}", entry.getKey()); } } } } 2.3 NameServer 集群 public class NameServerCluster { // NameServer 特点 class Characteristics { /* 1. 完全无状态 - 不持久化任何数据 - 所有数据在内存中 - 重启后数据丢失(从 Broker 重新获取) 2. 对等部署 - 所有节点功能完全一致 - 无主从之分 - 无需选举 3. 不互相通信 - NameServer 之间无任何通信 - 数据可能短暂不一致 - 最终一致即可 4. Broker 全量注册 - Broker 向所有 NameServer 注册 - 保证数据冗余 */ } // 部署架构 class DeploymentArchitecture { /* NameServer 集群(推荐3个节点): ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ └────────────────┼────────────────┘ │ ┌───────▼────────┐ │ │ │ Broker 集群 │ │ (全量注册) │ │ │ └────────────────┘ */ } } 三、Broker 详解 3.1 Broker 核心架构 Broker 内部架构: ┌─────────────────────────────────────────────────────┐ │ Broker │ │ ┌────────────────────────────────────────────────┐│ │ │ Remoting 通信层 ││ │ │ (接收 Producer/Consumer 请求) ││ │ └────────┬───────────────────────────────────────┘│ │ │ │ │ ┌────────▼────────────────────────────┐ │ │ │ BrokerController │ │ │ │ - 处理器注册 │ │ │ │ - 定时任务管理 │ │ │ │ - 消息处理 │ │ │ └────────┬────────────────────────────┘ │ │ │ │ │ ┌────────▼─────────────┬───────────────────┐ │ │ │ MessageStore │ HAService │ │ │ │ ┌────────────────┐ │ (主从复制) │ │ │ │ │ CommitLog │ │ │ │ │ │ │ (消息存储) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ ConsumeQueue │ │ │ │ │ │ │ (消费索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ IndexFile │ │ │ │ │ │ │ (消息索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ └──────────────────────┴───────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 定时任务 │ │ │ │ - 持久化 ConsumeQueue │ │ │ │ - 持久化 CommitLog │ │ │ │ - 清理过期文件 │ │ │ │ - 统计信息 │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ 3.2 Broker 核心职责 public class BrokerCore { // 1. 消息接收与存储 class MessageReceive { public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 写入 CommitLog PutMessageResult result = this.commitLog.putMessage(msg); // 触发刷盘 handleDiskFlush(result, msg); // 触发主从同步 handleHA(result, msg); return result; } } // 2. 消息查询 class MessageQuery { // 按消息 ID 查询 public MessageExt lookMessageByOffset(long commitLogOffset) { return this.commitLog.lookMessageByOffset(commitLogOffset); } // 按 Key 查询 public QueryMessageResult queryMessage(String topic, String key, int maxNum) { return this.indexService.queryMessage(topic, key, maxNum); } // 按时间查询 public QueryMessageResult queryMessageByTime(String topic, long begin, long end) { return this.messageStore.queryMessage(topic, begin, end, maxNum); } } // 3. 消息消费 class MessageConsume { public GetMessageResult getMessage( String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) { // 从 ConsumeQueue 查找消息位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 根据 offset 获取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); // 从 CommitLog 读取消息内容 List<MessageExt> messageList = new ArrayList<>(); for (int i = 0; i < maxMsgNums; i++) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); MessageExt msg = lookMessageByOffset(offsetPy, sizePy); // 消息过滤 if (match(subscriptionData, msg)) { messageList.add(msg); } } return new GetMessageResult(messageList); } } // 4. Topic 创建 class TopicManagement { public void createTopic(String topic, int queueNums) { TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); topicConfig.setPerm(6); // 读写权限 this.topicConfigTable.put(topic, topicConfig); // 创建 ConsumeQueue for (int i = 0; i < queueNums; i++) { ConsumeQueue logic = new ConsumeQueue(topic, i); this.consumeQueueTable.put(buildKey(topic, i), logic); } } } // 5. 消费进度管理 class ConsumerOffsetManagement { private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { map.put(queueId, offset); } } public long queryOffset(String group, String topic, int queueId) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long offset = map.get(queueId); return offset != null ? offset : -1; } return -1; } } } 3.3 Broker 高可用 public class BrokerHA { // 主从架构 class MasterSlave { /* 主从模式: ┌──────────────┐ 同步 ┌──────────────┐ │ Master Broker│ ──────────> │ Slave Broker │ │ - 读写 │ │ - 只读 │ └──────────────┘ └──────────────┘ 同步方式: 1. 同步复制(SYNC_MASTER): - Master 等待 Slave 同步完成才返回 - 可靠性高,性能稍差 2. 异步复制(ASYNC_MASTER): - Master 立即返回,后台异步同步 - 性能高,可能丢失少量数据 */ } // Dledger 模式 class DledgerMode { /* 基于 Raft 的自动选主: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│ └──────────┘ └──────────┘ └──────────┘ 特点: 1. 自动故障转移 2. 无需人工干预 3. 保证数据一致性 4. 至少3个节点 */ } } 四、Producer 工作原理 4.1 Producer 架构 public class ProducerArchitecture { // Producer 核心组件 class ProducerComponents { private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信API private TopicPublishInfo publishInfo; // Topic路由信息 private SendMessageHook sendHook; // 发送钩子 } // 发送流程 public SendResult send(Message msg) { // 1. 查找 Topic 路由信息 TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 2. 选择消息队列 MessageQueue mq = selectOneMessageQueue(publishInfo); // 3. 发送消息 SendResult result = sendKernelImpl(msg, mq); // 4. 更新故障规避信息 updateFaultItem(mq.getBrokerName(), result); return result; } } 4.2 Producer 负载均衡 public class ProducerLoadBalance { // 队列选择策略 public MessageQueue selectQueue(TopicPublishInfo tpInfo) { // 1. 简单轮询 int index = sendWhichQueue.incrementAndGet() % queueSize; return tpInfo.getMessageQueueList().get(index); // 2. 故障规避 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { return mq; } // 3. 选择延迟最小的 return latencyFaultTolerance.pickOneAtLeast(); } } 五、Consumer 工作原理 5.1 Consumer 架构 public class ConsumerArchitecture { // Consumer 核心组件 class ConsumerComponents { private RebalanceService rebalanceService; // 负载均衡服务 private PullMessageService pullMessageService; // 拉取服务 private ConsumeMessageService consumeService; // 消费服务 private OffsetStore offsetStore; // 进度存储 } // 消费流程 public void start() { // 1. 启动负载均衡 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 启动消费服务 consumeService.start(); // 4. 注册到 Broker registerConsumer(); } } 5.2 Consumer 负载均衡(Rebalance) public class ConsumerRebalance { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取同组的所有 Consumer List<String> cidAll = getConsumerList(group); // 3. 排序(保证一致性视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配 List<MessageQueue> allocated = allocateStrategy.allocate( group, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueue(allocated); } } 六、完整消息流转 6.1 发送消息完整流程 消息发送完整流程: ┌───────────┐ │ Producer │ └─────┬─────┘ │1. 查询路由 ▼ ┌───────────┐ │NameServer │ 返回路由信息(Broker列表、Queue信息) └─────┬─────┘ │ ▼ ┌───────────┐ │ Producer │ 2. 选择 Broker 和 Queue └─────┬─────┘ │3. 发送消息 ▼ ┌───────────┐ │ Broker │ 4. 写入 CommitLog │ ┌───────┐ │ │ │CommitLog│ │ └───┬───┘ │ │ │5. 异步构建 ConsumeQueue │ ┌───▼──────┐ │ │ConsumeQ │ │ └──────────┘ │ │6. 返回结果 └─────┼─────┘ ▼ ┌───────────┐ │ Producer │ 7. 收到发送结果 └───────────┘ 6.2 消费消息完整流程 消息消费完整流程: ┌───────────┐ │ Consumer │ └─────┬─────┘ │1. 注册到 Broker ▼ ┌───────────┐ │ Broker │ 保存 Consumer 信息 └─────┬─────┘ │2. Rebalance 分配 Queue ▼ ┌───────────┐ │ Consumer │ 3. 发起长轮询拉取 └─────┬─────┘ │ ▼ ┌───────────┐ │ Broker │ 4. 从 ConsumeQueue 查找 │ ┌───────┐ │ 5. 从 CommitLog 读取消息 │ │ConsumeQ│ │ │ └───┬───┘ │ │ │ │ │ ┌───▼────┐│ │ │CommitLog││ │ └────────┘│ │ │6. 返回消息 └─────┼─────┘ ▼ ┌───────────┐ │ Consumer │ 7. 处理消息 └─────┬─────┘ 8. 更新消费进度 │ ▼ ┌───────────┐ │ Broker │ 9. 保存消费进度 └───────────┘ 七、设计亮点与思考 7.1 设计亮点 public class DesignHighlights { // 1. NameServer 的轻量化设计 class LightweightNameServer { /* - 无状态,内存操作 - 对等部署,无需选举 - 简单高效 - AP 而非 CP */ } // 2. CommitLog 的顺序写 class SequentialWrite { /* - 所有消息顺序写入同一个文件 - 充分利用磁盘顺序写性能 - 避免随机 IO */ } // 3. ConsumeQueue 的索引设计 class IndexDesign { /* - 轻量级索引(20字节) - 快速定位消息位置 - 支持消费进度管理 */ } // 4. 长轮询的伪 Push class LongPolling { /* - Consumer 主动拉取 - Broker Hold 请求 - 兼顾实时性和可控性 */ } } 7.2 架构权衡 public class ArchitectureTradeoffs { // 权衡1:性能 vs 可靠性 class PerformanceVsReliability { // 异步刷盘 → 高性能,可能丢消息 // 同步刷盘 → 零丢失,性能下降 // 异步复制 → 高性能,可能丢消息 // 同步复制 → 高可靠,性能下降 } // 权衡2:一致性 vs 可用性 class ConsistencyVsAvailability { // NameServer 选择 AP // - 可能短暂不一致 // - 但保证高可用 // Dledger 模式选择 CP // - 保证数据一致 // - 需要多数节点存活 } // 权衡3:复杂度 vs 功能 class ComplexityVsFeature { // RocketMQ 比 Kafka 复杂 // - 但功能更丰富(事务、延迟、顺序) // RocketMQ 比 RabbitMQ 简单 // - 但性能更高 } } 八、总结 通过本篇,我们从整体视角理解了 RocketMQ 的架构: ...

2025-11-13 · maneng

RocketMQ入门10:SpringBoot集成实战 - 从配置到生产级应用

引言:将知识转化为实践 前面9篇,我们深入学习了 RocketMQ 的各个方面。现在,让我们通过一个完整的 SpringBoot 项目,把这些知识串联起来,构建一个真正的生产级应用。 这个项目将包含: 完整的项目结构 各种消息类型的实现 错误处理和重试机制 监控和告警 性能优化 部署方案 一、项目搭建 1.1 项目结构 rocketmq-springboot-demo/ ├── pom.xml ├── src/main/java/com/example/rocketmq/ │ ├── RocketMQApplication.java # 启动类 │ ├── config/ │ │ ├── RocketMQConfig.java # RocketMQ配置 │ │ ├── ProducerConfig.java # 生产者配置 │ │ └── ConsumerConfig.java # 消费者配置 │ ├── producer/ │ │ ├── OrderProducer.java # 订单消息生产者 │ │ ├── TransactionProducer.java # 事务消息生产者 │ │ └── DelayProducer.java # 延迟消息生产者 │ ├── consumer/ │ │ ├── OrderConsumer.java # 订单消息消费者 │ │ ├── TransactionConsumer.java # 事务消息消费者 │ │ └── DelayConsumer.java # 延迟消息消费者 │ ├── message/ │ │ ├── OrderMessage.java # 订单消息实体 │ │ └── MessageWrapper.java # 消息包装器 │ ├── service/ │ │ ├── OrderService.java # 订单服务 │ │ └── MessageService.java # 消息服务 │ ├── utils/ │ │ ├── MessageBuilder.java # 消息构建工具 │ │ └── TraceUtils.java # 链路追踪工具 │ └── listener/ │ ├── TransactionListenerImpl.java # 事务监听器 │ └── MessageListenerImpl.java # 消息监听器 ├── src/main/resources/ │ ├── application.yml # 配置文件 │ ├── application-dev.yml # 开发环境配置 │ ├── application-prod.yml # 生产环境配置 │ └── logback-spring.xml # 日志配置 └── src/test/java/ # 测试类 1.2 Maven 依赖 <!-- pom.xml --> <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.7.14</version> </parent> <groupId>com.example</groupId> <artifactId>rocketmq-springboot-demo</artifactId> <version>1.0.0</version> <properties> <java.version>8</java.version> <rocketmq-spring.version>2.2.3</rocketmq-spring.version> </properties> <dependencies> <!-- Spring Boot Starter --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- RocketMQ Spring Boot Starter --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>${rocketmq-spring.version}</version> </dependency> <!-- 数据库相关 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <!-- Redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 监控 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-prometheus</artifactId> </dependency> <!-- 工具类 --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.34</version> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.20</version> </dependency> <!-- 测试 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> </project> 1.3 配置文件 # application.yml spring: application: name: rocketmq-springboot-demo # RocketMQ 配置 rocketmq: name-server: ${ROCKETMQ_NAMESRV:localhost:9876} producer: group: ${spring.application.name}_producer_group send-message-timeout: 3000 compress-message-body-threshold: 4096 max-message-size: 4194304 retry-times-when-send-failed: 2 retry-times-when-send-async-failed: 2 retry-next-server: true # 自定义配置 consumer: groups: order-consumer-group: topics: ORDER_TOPIC consumer-group: ${spring.application.name}_order_consumer consume-thread-max: 64 consume-thread-min: 20 transaction-consumer-group: topics: TRANSACTION_TOPIC consumer-group: ${spring.application.name}_transaction_consumer # 应用配置 app: rocketmq: # 是否启用事务消息 transaction-enabled: true # 是否启用消息轨迹 trace-enabled: true # 消息轨迹Topic trace-topic: RMQ_SYS_TRACE_TOPIC # ACL配置 acl-enabled: false access-key: ${ROCKETMQ_ACCESS_KEY:} secret-key: ${ROCKETMQ_SECRET_KEY:} # 监控配置 management: endpoints: web: exposure: include: health,info,metrics,prometheus metrics: export: prometheus: enabled: true 二、核心配置类 2.1 RocketMQ 配置 // RocketMQConfig.java @Configuration @EnableConfigurationProperties(RocketMQProperties.class) @Slf4j public class RocketMQConfig { @Autowired private RocketMQProperties rocketMQProperties; /** * 配置消息轨迹 */ @Bean @ConditionalOnProperty(prefix = "app.rocketmq", name = "trace-enabled", havingValue = "true") public DefaultMQProducer tracedProducer() { DefaultMQProducer producer = new DefaultMQProducer( rocketMQProperties.getProducer().getGroup(), true, // 启用消息轨迹 rocketMQProperties.getTraceTopic() ); configureProducer(producer); return producer; } /** * 配置事务消息生产者 */ @Bean @ConditionalOnProperty(prefix = "app.rocketmq", name = "transaction-enabled", havingValue = "true") public TransactionMQProducer transactionProducer( @Autowired TransactionListener transactionListener) { TransactionMQProducer producer = new TransactionMQProducer( rocketMQProperties.getProducer().getGroup() + "_TX" ); configureProducer(producer); // 设置事务监听器 producer.setTransactionListener(transactionListener); // 设置事务回查线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> { Thread thread = new Thread(r); thread.setName("transaction-check-thread"); return thread; } ); producer.setExecutorService(executor); return producer; } /** * 通用生产者配置 */ private void configureProducer(DefaultMQProducer producer) { producer.setNamesrvAddr(rocketMQProperties.getNameServer()); producer.setSendMsgTimeout(rocketMQProperties.getProducer().getSendMessageTimeout()); producer.setCompressMsgBodyOverHowmuch( rocketMQProperties.getProducer().getCompressMessageBodyThreshold() ); producer.setMaxMessageSize(rocketMQProperties.getProducer().getMaxMessageSize()); producer.setRetryTimesWhenSendFailed( rocketMQProperties.getProducer().getRetryTimesWhenSendFailed() ); // 配置ACL if (rocketMQProperties.isAclEnabled()) { producer.setAccessKey(rocketMQProperties.getAccessKey()); producer.setSecretKey(rocketMQProperties.getSecretKey()); } } /** * 消息发送模板 */ @Bean public RocketMQTemplate rocketMQTemplate(RocketMQMessageConverter converter) { RocketMQTemplate template = new RocketMQTemplate(); template.setMessageConverter(converter); return template; } /** * 自定义消息转换器 */ @Bean public RocketMQMessageConverter rocketMQMessageConverter() { return new CustomMessageConverter(); } } 2.2 自定义消息转换器 // CustomMessageConverter.java @Component public class CustomMessageConverter extends CompositeMessageConverter { private static final String ENCODING = "UTF-8"; @Override public org.springframework.messaging.Message<?> toMessage( Object payload, MessageHeaders headers) { // 添加追踪信息 Map<String, Object> enrichedHeaders = new HashMap<>(headers); enrichedHeaders.put("traceId", TraceUtils.getTraceId()); enrichedHeaders.put("timestamp", System.currentTimeMillis()); return super.toMessage(payload, new MessageHeaders(enrichedHeaders)); } @Override public Object fromMessage(org.springframework.messaging.Message<?> message, Class<?> targetClass) { // 自定义反序列化逻辑 if (targetClass == OrderMessage.class) { return JSON.parseObject( new String((byte[]) message.getPayload(), StandardCharsets.UTF_8), OrderMessage.class ); } return super.fromMessage(message, targetClass); } } 三、消息生产者实现 3.1 通用消息服务 // MessageService.java @Service @Slf4j public class MessageService { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送普通消息 */ public SendResult sendMessage(String topic, String tag, Object message) { String destination = buildDestination(topic, tag); try { Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .setHeader("businessType", message.getClass().getSimpleName()) .build(); SendResult result = rocketMQTemplate.syncSend(destination, springMessage); log.info("消息发送成功: destination={}, msgId={}, sendStatus={}", destination, result.getMsgId(), result.getSendStatus()); // 记录监控指标 recordMetrics(topic, tag, true, 0); return result; } catch (Exception e) { log.error("消息发送失败: destination={}, error={}", destination, e.getMessage()); recordMetrics(topic, tag, false, 1); throw new RuntimeException("消息发送失败", e); } } /** * 发送延迟消息 */ public SendResult sendDelayMessage(String topic, String tag, Object message, int delayLevel) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); // delayLevel: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h SendResult result = rocketMQTemplate.syncSend(destination, springMessage, rocketMQTemplate.getProducer().getSendMsgTimeout(), delayLevel); log.info("延迟消息发送成功: destination={}, delayLevel={}, msgId={}", destination, delayLevel, result.getMsgId()); return result; } /** * 发送顺序消息 */ public SendResult sendOrderlyMessage(String topic, String tag, Object message, String hashKey) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); SendResult result = rocketMQTemplate.syncSendOrderly( destination, springMessage, hashKey); log.info("顺序消息发送成功: destination={}, hashKey={}, msgId={}", destination, hashKey, result.getMsgId()); return result; } /** * 发送事务消息 */ public TransactionSendResult sendTransactionMessage( String topic, String tag, Object message, Object arg) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .setHeader("transactionId", UUID.randomUUID().toString()) .build(); TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( destination, springMessage, arg); log.info("事务消息发送成功: destination={}, transactionId={}, sendStatus={}", destination, result.getTransactionId(), result.getSendStatus()); return result; } /** * 批量发送消息 */ public SendResult sendBatchMessage(String topic, String tag, List<?> messages) { String destination = buildDestination(topic, tag); List<Message> rocketMQMessages = messages.stream() .map(msg -> { Message message = new Message(); message.setTopic(topic); message.setTags(tag); message.setKeys(generateKey(msg)); message.setBody(JSON.toJSONBytes(msg)); return message; }) .collect(Collectors.toList()); SendResult result = rocketMQTemplate.syncSend(destination, rocketMQMessages); log.info("批量消息发送成功: destination={}, count={}, msgId={}", destination, messages.size(), result.getMsgId()); return result; } /** * 异步发送消息 */ public void sendAsyncMessage(String topic, String tag, Object message, SendCallback sendCallback) { String destination = buildDestination(topic, tag); Message<Object> springMessage = MessageBuilder .withPayload(message) .setHeader(RocketMQHeaders.KEYS, generateKey(message)) .build(); rocketMQTemplate.asyncSend(destination, springMessage, sendCallback); log.info("异步消息已提交: destination={}", destination); } private String buildDestination(String topic, String tag) { return StringUtils.isBlank(tag) ? topic : topic + ":" + tag; } private String generateKey(Object message) { if (message instanceof BaseMessage) { return ((BaseMessage) message).getBusinessId(); } return UUID.randomUUID().toString(); } private void recordMetrics(String topic, String tag, boolean success, int failCount) { // 记录Prometheus指标 if (success) { Metrics.counter("rocketmq.message.sent", "topic", topic, "tag", tag, "status", "success" ).increment(); } else { Metrics.counter("rocketmq.message.sent", "topic", topic, "tag", tag, "status", "fail" ).increment(failCount); } } } 3.2 订单消息生产者 // OrderProducer.java @Component @Slf4j public class OrderProducer { @Autowired private MessageService messageService; /** * 发送订单创建消息 */ public void sendOrderCreateMessage(Order order) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .amount(order.getAmount()) .status(OrderStatus.CREATED) .createTime(new Date()) .build(); messageService.sendMessage("ORDER_TOPIC", "ORDER_CREATE", message); } /** * 发送订单支付消息(事务消息) */ public void sendOrderPaymentMessage(Order order, PaymentInfo paymentInfo) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .amount(order.getAmount()) .status(OrderStatus.PAID) .paymentInfo(paymentInfo) .build(); // 发送事务消息,确保订单更新和消息发送的原子性 messageService.sendTransactionMessage( "ORDER_TOPIC", "ORDER_PAYMENT", message, order // 传递订单对象作为事务回查参数 ); } /** * 发送订单状态变更消息(顺序消息) */ public void sendOrderStatusMessage(Order order, OrderStatus newStatus) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .status(newStatus) .updateTime(new Date()) .build(); // 使用订单ID作为顺序key,确保同一订单的消息顺序 messageService.sendOrderlyMessage( "ORDER_TOPIC", "ORDER_STATUS_" + newStatus.name(), message, order.getOrderId() ); } /** * 发送订单超时取消消息(延迟消息) */ public void sendOrderTimeoutMessage(Order order) { OrderMessage message = OrderMessage.builder() .orderId(order.getOrderId()) .userId(order.getUserId()) .status(OrderStatus.TIMEOUT) .build(); // 30分钟后发送超时消息(延迟级别16 = 30m) messageService.sendDelayMessage( "ORDER_TOPIC", "ORDER_TIMEOUT", message, 16 ); } } 四、消息消费者实现 4.1 订单消息消费者 // OrderConsumer.java @Component @Slf4j @RocketMQMessageListener( topic = "ORDER_TOPIC", consumerGroup = "order_consumer_group", messageModel = MessageModel.CLUSTERING, consumeMode = ConsumeMode.CONCURRENTLY, consumeThreadMax = 64, consumeThreadMin = 20 ) public class OrderConsumer implements RocketMQListener<OrderMessage>, RocketMQPushConsumerLifecycleListener { @Autowired private OrderService orderService; @Autowired private RedisTemplate<String, Object> redisTemplate; private final AtomicLong consumeCount = new AtomicLong(0); private final AtomicLong errorCount = new AtomicLong(0); @Override public void onMessage(OrderMessage message) { long count = consumeCount.incrementAndGet(); log.info("消费订单消息: orderId={}, status={}, count={}", message.getOrderId(), message.getStatus(), count); try { // 幂等性检查 if (isDuplicate(message)) { log.warn("重复消息,跳过处理: orderId={}", message.getOrderId()); return; } // 处理消息 processMessage(message); // 记录已处理 markAsProcessed(message); // 记录监控指标 recordSuccessMetrics(message); } catch (Exception e) { errorCount.incrementAndGet(); log.error("消息处理失败: orderId={}, error={}", message.getOrderId(), e.getMessage(), e); // 记录错误指标 recordErrorMetrics(message); // 重新抛出异常,触发重试 throw new RuntimeException(e); } } /** * 幂等性检查 */ private boolean isDuplicate(OrderMessage message) { String key = "msg:processed:" + message.getOrderId(); Boolean result = redisTemplate.opsForValue().setIfAbsent( key, "1", 24, TimeUnit.HOURS); return !Boolean.TRUE.equals(result); } /** * 处理消息 */ private void processMessage(OrderMessage message) { switch (message.getStatus()) { case CREATED: orderService.handleOrderCreated(message); break; case PAID: orderService.handleOrderPaid(message); break; case SHIPPED: orderService.handleOrderShipped(message); break; case COMPLETED: orderService.handleOrderCompleted(message); break; case CANCELLED: orderService.handleOrderCancelled(message); break; case TIMEOUT: orderService.handleOrderTimeout(message); break; default: log.warn("未知订单状态: {}", message.getStatus()); } } /** * 标记消息已处理 */ private void markAsProcessed(OrderMessage message) { String key = "msg:processed:" + message.getOrderId(); redisTemplate.opsForValue().set(key, JSON.toJSONString(message), 7, TimeUnit.DAYS); } /** * 记录成功指标 */ private void recordSuccessMetrics(OrderMessage message) { Metrics.counter("order.message.consumed", "status", message.getStatus().name(), "result", "success" ).increment(); } /** * 记录错误指标 */ private void recordErrorMetrics(OrderMessage message) { Metrics.counter("order.message.consumed", "status", message.getStatus().name(), "result", "error" ).increment(); } @Override public void prepareStart(DefaultMQPushConsumer consumer) { // 消费者启动前的准备工作 log.info("订单消费者准备启动: group={}", consumer.getConsumerGroup()); // 设置消费位点 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); // 设置消费超时(默认15分钟) consumer.setConsumeTimeout(15); // 设置最大重试次数 consumer.setMaxReconsumeTimes(3); // 设置消息过滤 try { consumer.subscribe("ORDER_TOPIC", MessageSelector.bySql("status in ('PAID', 'SHIPPED')")); } catch (Exception e) { log.error("设置消息过滤失败", e); } } } 4.2 事务消息监听器 // TransactionListenerImpl.java @Component @Slf4j public class TransactionListenerImpl implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Autowired private TransactionLogService transactionLogService; /** * 执行本地事务 */ @Override public RocketMQLocalTransactionState executeLocalTransaction( Message msg, Object arg) { String transactionId = msg.getHeaders().get("transactionId", String.class); log.info("执行本地事务: transactionId={}", transactionId); try { // 解析消息 OrderMessage orderMessage = JSON.parseObject( new String((byte[]) msg.getPayload()), OrderMessage.class ); // 执行本地事务 Order order = (Order) arg; orderService.updateOrderStatus(order, OrderStatus.PAID); // 记录事务日志 transactionLogService.save( transactionId, orderMessage.getOrderId(), TransactionStatus.COMMIT ); log.info("本地事务执行成功: orderId={}", orderMessage.getOrderId()); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务执行失败: transactionId={}", transactionId, e); // 记录失败 transactionLogService.save( transactionId, null, TransactionStatus.ROLLBACK ); return RocketMQLocalTransactionState.ROLLBACK; } } /** * 事务回查 */ @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { String transactionId = msg.getHeaders().get("transactionId", String.class); log.info("事务回查: transactionId={}", transactionId); // 查询事务日志 TransactionLog log = transactionLogService.findByTransactionId(transactionId); if (log == null) { log.warn("事务日志不存在: transactionId={}", transactionId); return RocketMQLocalTransactionState.UNKNOWN; } // 根据事务状态返回 switch (log.getStatus()) { case COMMIT: return RocketMQLocalTransactionState.COMMIT; case ROLLBACK: return RocketMQLocalTransactionState.ROLLBACK; default: return RocketMQLocalTransactionState.UNKNOWN; } } } 五、错误处理和重试 5.1 消费失败处理 // RetryMessageHandler.java @Component @Slf4j public class RetryMessageHandler { @Autowired private MessageService messageService; @Autowired private RedisTemplate<String, Object> redisTemplate; /** * 处理消费失败的消息 */ public void handleConsumeError(Message message, Exception error) { String messageId = message.getHeaders().get(RocketMQHeaders.MESSAGE_ID, String.class); String topic = message.getHeaders().get(RocketMQHeaders.TOPIC, String.class); // 记录重试次数 String retryKey = "retry:count:" + messageId; Long retryCount = redisTemplate.opsForValue().increment(retryKey); redisTemplate.expire(retryKey, 1, TimeUnit.DAYS); log.error("消息消费失败: messageId={}, topic={}, retryCount={}, error={}", messageId, topic, retryCount, error.getMessage()); // 根据重试次数决定处理策略 if (retryCount <= 3) { // 延迟重试 retryWithDelay(message, retryCount); } else if (retryCount <= 5) { // 发送到重试队列 sendToRetryQueue(message); } else { // 发送到死信队列 sendToDeadLetterQueue(message, error); } } /** * 延迟重试 */ private void retryWithDelay(Message message, Long retryCount) { // 计算延迟级别:第1次10s,第2次30s,第3次1m int delayLevel = retryCount.intValue() * 2 + 1; messageService.sendDelayMessage( message.getHeaders().get(RocketMQHeaders.TOPIC, String.class), "RETRY", message.getPayload(), delayLevel ); log.info("消息已发送到延迟重试队列: messageId={}, delayLevel={}", message.getHeaders().get(RocketMQHeaders.MESSAGE_ID), delayLevel); } /** * 发送到重试队列 */ private void sendToRetryQueue(Message message) { String retryTopic = "%RETRY%" + message.getHeaders().get(RocketMQHeaders.CONSUMER_GROUP); messageService.sendMessage(retryTopic, "RETRY", message.getPayload()); log.info("消息已发送到重试队列: topic={}", retryTopic); } /** * 发送到死信队列 */ private void sendToDeadLetterQueue(Message message, Exception error) { String dlqTopic = "%DLQ%" + message.getHeaders().get(RocketMQHeaders.CONSUMER_GROUP); // 添加错误信息 Map<String, Object> headers = new HashMap<>(message.getHeaders()); headers.put("errorMessage", error.getMessage()); headers.put("errorTime", new Date()); Message<Object> dlqMessage = MessageBuilder .withPayload(message.getPayload()) .copyHeaders(headers) .build(); messageService.sendMessage(dlqTopic, "DLQ", dlqMessage); log.error("消息已发送到死信队列: topic={}", dlqTopic); // 发送告警 sendAlert(message, error); } /** * 发送告警 */ private void sendAlert(Message message, Exception error) { // 发送告警通知(邮件、短信、钉钉等) String alertMessage = String.format( "消息处理失败告警\n" + "Topic: %s\n" + "MessageId: %s\n" + "Error: %s\n" + "Time: %s", message.getHeaders().get(RocketMQHeaders.TOPIC), message.getHeaders().get(RocketMQHeaders.MESSAGE_ID), error.getMessage(), new Date() ); // 这里可以集成告警系统 log.error("告警: {}", alertMessage); } } 六、监控和运维 6.1 健康检查 // RocketMQHealthIndicator.java @Component public class RocketMQHealthIndicator implements HealthIndicator { @Autowired private RocketMQTemplate rocketMQTemplate; @Override public Health health() { try { // 检查 NameServer 连接 DefaultMQProducer producer = rocketMQTemplate.getProducer(); producer.getDefaultMQProducerImpl().getmQClientFactory() .getMQClientAPIImpl().getNameServerAddressList(); // 检查 Broker 连接 Collection<String> topics = producer.getDefaultMQProducerImpl() .getmQClientFactory().getTopicRouteTable().keySet(); return Health.up() .withDetail("nameServer", producer.getNamesrvAddr()) .withDetail("producerGroup", producer.getProducerGroup()) .withDetail("topics", topics) .build(); } catch (Exception e) { return Health.down() .withDetail("error", e.getMessage()) .build(); } } } 6.2 监控指标收集 // RocketMQMetrics.java @Component @Slf4j public class RocketMQMetrics { private final MeterRegistry meterRegistry; private final Map<String, AtomicLong> counters = new ConcurrentHashMap<>(); public RocketMQMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; initMetrics(); } private void initMetrics() { // 发送成功计数 Gauge.builder("rocketmq.producer.send.success", counters, map -> map.getOrDefault("send.success", new AtomicLong()).get()) .register(meterRegistry); // 发送失败计数 Gauge.builder("rocketmq.producer.send.failure", counters, map -> map.getOrDefault("send.failure", new AtomicLong()).get()) .register(meterRegistry); // 消费成功计数 Gauge.builder("rocketmq.consumer.consume.success", counters, map -> map.getOrDefault("consume.success", new AtomicLong()).get()) .register(meterRegistry); // 消费失败计数 Gauge.builder("rocketmq.consumer.consume.failure", counters, map -> map.getOrDefault("consume.failure", new AtomicLong()).get()) .register(meterRegistry); // 消息堆积数 Gauge.builder("rocketmq.consumer.lag", this, RocketMQMetrics::getConsumerLag) .register(meterRegistry); } public void recordSendSuccess() { counters.computeIfAbsent("send.success", k -> new AtomicLong()).incrementAndGet(); } public void recordSendFailure() { counters.computeIfAbsent("send.failure", k -> new AtomicLong()).incrementAndGet(); } public void recordConsumeSuccess() { counters.computeIfAbsent("consume.success", k -> new AtomicLong()).incrementAndGet(); } public void recordConsumeFailure() { counters.computeIfAbsent("consume.failure", k -> new AtomicLong()).incrementAndGet(); } private double getConsumerLag() { // 实现获取消费延迟的逻辑 return 0.0; } } 七、部署和运维 7.1 Docker 部署 # docker-compose.yml version: '3.8' services: app: build: . container_name: rocketmq-springboot-demo environment: - SPRING_PROFILES_ACTIVE=prod - ROCKETMQ_NAMESRV=rocketmq-namesrv:9876 - JAVA_OPTS=-Xms1G -Xmx1G -XX:+UseG1GC ports: - "8080:8080" - "9090:9090" # Prometheus metrics depends_on: - rocketmq-namesrv - rocketmq-broker - redis - mysql networks: - rocketmq-network rocketmq-namesrv: image: apache/rocketmq:4.9.4 container_name: rocketmq-namesrv command: sh mqnamesrv ports: - "9876:9876" networks: - rocketmq-network rocketmq-broker: image: apache/rocketmq:4.9.4 container_name: rocketmq-broker command: sh mqbroker -n rocketmq-namesrv:9876 ports: - "10909:10909" - "10911:10911" networks: - rocketmq-network redis: image: redis:6.2 container_name: redis ports: - "6379:6379" networks: - rocketmq-network mysql: image: mysql:8.0 container_name: mysql environment: - MYSQL_ROOT_PASSWORD=root123 - MYSQL_DATABASE=rocketmq_demo ports: - "3306:3306" networks: - rocketmq-network networks: rocketmq-network: driver: bridge 7.2 Kubernetes 部署 # deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: rocketmq-springboot-demo namespace: default spec: replicas: 3 selector: matchLabels: app: rocketmq-springboot-demo template: metadata: labels: app: rocketmq-springboot-demo spec: containers: - name: app image: rocketmq-springboot-demo:latest ports: - containerPort: 8080 - containerPort: 9090 env: - name: SPRING_PROFILES_ACTIVE value: "prod" - name: ROCKETMQ_NAMESRV value: "rocketmq-namesrv-service:9876" - name: JAVA_OPTS value: "-Xms1G -Xmx1G -XX:+UseG1GC" resources: requests: memory: "1Gi" cpu: "500m" limits: memory: "2Gi" cpu: "1000m" livenessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 60 periodSeconds: 10 readinessProbe: httpGet: path: /actuator/health port: 8080 initialDelaySeconds: 30 periodSeconds: 5 --- apiVersion: v1 kind: Service metadata: name: rocketmq-springboot-demo-service spec: selector: app: rocketmq-springboot-demo ports: - name: http port: 8080 targetPort: 8080 - name: metrics port: 9090 targetPort: 9090 type: LoadBalancer 八、总结 通过这个完整的 SpringBoot 集成示例,我们实现了: ...

2025-11-13 · maneng

RocketMQ入门09:消费组机制 - 负载均衡与消息分发

引言:一个精妙的协调机制 假设你经营一家餐厅,有10个服务员(Consumer),需要服务100桌客人(Message Queue): 如何公平分配?每个服务员负责10桌? 如果有服务员请假,谁来接管他的桌子? 如果来了新服务员,如何重新分配? 如何记住每桌的服务进度? 这就是 RocketMQ Consumer Group 要解决的问题。 一、Consumer Group 核心概念 1.1 什么是 Consumer Group public class ConsumerGroupConcept { // Consumer Group 定义 class ConsumerGroup { String groupName; // 组名(全局唯一) List<String> consumerIdList; // 组内消费者列表 SubscriptionData subscriptionData; // 订阅信息 ConsumeType consumeType; // 消费模式 MessageModel messageModel; // 消息模式(集群/广播) ConsumeFromWhere consumeFromWhere; // 从何处开始消费 } // 核心规则 class CoreRules { // 1. 同一 Group 内的 Consumer 订阅关系必须一致 // 2. 同一 Group 内的 Consumer 均分消息 // 3. 不同 Group 相互独立,都能收到全量消息 // 4. Group 是消费进度管理的单位 } } 1.2 Consumer Group 的设计目标 public class DesignGoals { // 目标1:负载均衡 void loadBalancing() { // 10个 Queue,5个 Consumer // 每个 Consumer 负责 2个 Queue // 自动、公平、动态 } // 目标2:高可用 void highAvailability() { // Consumer 宕机,其他 Consumer 接管 // 无消息丢失 // 自动故障转移 } // 目标3:水平扩展 void horizontalScaling() { // 动态增加 Consumer // 自动重新分配 // 无需停机 } // 目标4:消费进度管理 void progressManagement() { // 统一管理消费位点 // 支持重置进度 // 持久化存储 } } 二、Rebalance 机制详解 2.1 什么是 Rebalance public class RebalanceMechanism { // Rebalance:重新分配 Queue 给 Consumer class RebalanceDefinition { // 触发时机: // 1. Consumer 上线/下线 // 2. Queue 数量变化 // 3. 订阅关系变化 // 目标: // 保证每个 Queue 只被一个 Consumer 消费 // 尽可能均匀分配 } // Rebalance 流程 class RebalanceFlow { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取 Consumer Group 的所有 Consumer List<String> cidAll = getConsumerIdList(topic, group); // 3. 排序保证一致性 Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配算法 List<MessageQueue> allocateResult = strategy.allocate( consumerGroup, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueueTable(allocateResult); } } } 2.2 Rebalance 触发流程 Rebalance 触发流程: Consumer 加入: ┌──────────┐ 注册 ┌──────────┐ │Consumer1 │ ───────────────> │ Broker │ └──────────┘ └──────────┘ │ 通知其他Consumer ↓ ┌──────────┐ ┌──────────┐ │Consumer2 │ <──── Rebalance ─│Consumer3 │ └──────────┘ └──────────┘ 时间线: T0: Consumer1 加入 Group T1: Broker 通知所有 Consumer T2: 所有 Consumer 开始 Rebalance T3: 完成 Queue 重新分配 2.3 Rebalance 实现细节 public class RebalanceImplementation { // Consumer 端 Rebalance 服务 class RebalanceService extends ServiceThread { @Override public void run() { while (!this.isStopped()) { // 默认每 20 秒执行一次 this.waitForRunning(20000); this.doRebalance(); } } private void doRebalance() { // 对所有订阅的 Topic 执行 Rebalance for (Map.Entry<String, SubscriptionData> entry : subscriptionInner.entrySet()) { String topic = entry.getKey(); rebalanceByTopic(topic); } } private void rebalanceByTopic(String topic) { // 1. 获取 Topic 的队列信息 Set<MessageQueue> mqSet = topicSubscribeInfoTable.get(topic); // 2. 获取同组的所有 Consumer ID List<String> cidAll = mQClientFactory.findConsumerIdList( topic, consumerGroup ); // 3. 执行分配 if (mqSet != null && cidAll != null) { AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), new ArrayList<>(mqSet), cidAll ); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception", e); } // 4. 更新分配结果 updateProcessQueueTableInRebalance(topic, allocateResult); } } } // 更新本地队列表 private void updateProcessQueueTableInRebalance(String topic, List<MessageQueue> mqSet) { // 移除不再分配给自己的队列 for (MessageQueue mq : processQueueTable.keySet()) { if (!mqSet.contains(mq)) { ProcessQueue pq = processQueueTable.remove(mq); if (pq != null) { pq.setDropped(true); // 标记为丢弃 log.info("Drop queue: {}", mq); } } } // 添加新分配的队列 for (MessageQueue mq : mqSet) { if (!processQueueTable.containsKey(mq)) { ProcessQueue pq = new ProcessQueue(); processQueueTable.put(mq, pq); // 创建拉取请求 PullRequest pullRequest = new PullRequest(); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequest.setNextOffset(computePullFromWhere(mq)); // 立即执行拉取 pullMessageService.executePullRequestImmediately(pullRequest); log.info("Add new queue: {}", mq); } } } } 三、负载均衡策略 3.1 内置分配策略 public class AllocationStrategies { // 1. 平均分配策略(默认) class AllocateMessageQueueAveragely { /* 示例:8个Queue,3个Consumer Consumer0: Q0, Q1, Q2 Consumer1: Q3, Q4, Q5 Consumer2: Q6, Q7 */ public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); int averageSize = (mqAll.size() <= cidAll.size()) ? 1 : (mod > 0 && index < mod) ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size(); int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; int range = Math.min(averageSize, mqAll.size() - startIndex); return mqAll.subList(startIndex, startIndex + range); } } // 2. 环形平均分配 class AllocateMessageQueueAveragelyByCircle { /* 示例:8个Queue,3个Consumer Consumer0: Q0, Q3, Q6 Consumer1: Q1, Q4, Q7 Consumer2: Q2, Q5 */ public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); List<MessageQueue> result = new ArrayList<>(); for (int i = index; i < mqAll.size(); i += cidAll.size()) { result.add(mqAll.get(i)); } return result; } } // 3. 一致性Hash分配 class AllocateMessageQueueConsistentHash { /* 使用一致性Hash算法 优点:Consumer变化时,影响的Queue最少 */ private final int virtualNodeCnt; private final HashFunction hashFunc; public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 构建Hash环 TreeMap<Long, String> hashRing = new TreeMap<>(); for (String cid : cidAll) { for (int i = 0; i < virtualNodeCnt; i++) { long hash = hashFunc.hash(cid + "#" + i); hashRing.put(hash, cid); } } // 分配Queue List<MessageQueue> result = new ArrayList<>(); for (MessageQueue mq : mqAll) { long hash = hashFunc.hash(mq.toString()); Map.Entry<Long, String> entry = hashRing.ceilingEntry(hash); if (entry == null) { entry = hashRing.firstEntry(); } if (currentCID.equals(entry.getValue())) { result.add(mq); } } return result; } } // 4. 机房就近分配 class AllocateMessageQueueByMachineRoom { /* 根据机房分配,优先同机房 减少跨机房流量 */ public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 获取当前Consumer的机房 String currentRoom = getMachineRoom(currentCID); // 同机房的Queue List<MessageQueue> sameRoomQueues = new ArrayList<>(); // 其他机房的Queue List<MessageQueue> diffRoomQueues = new ArrayList<>(); for (MessageQueue mq : mqAll) { if (currentRoom.equals(getMachineRoom(mq.getBrokerName()))) { sameRoomQueues.add(mq); } else { diffRoomQueues.add(mq); } } // 优先分配同机房Queue List<MessageQueue> result = new ArrayList<>(); result.addAll(allocateQueues(sameRoomQueues, currentCID, cidAll)); // 如果还需要更多Queue,分配其他机房的 if (result.size() < getTargetQueueNum(mqAll.size(), cidAll.size())) { result.addAll(allocateQueues(diffRoomQueues, currentCID, cidAll)); } return result; } } // 5. 自定义分配策略 class CustomAllocationStrategy implements AllocateMessageQueueStrategy { @Override public List<MessageQueue> allocate(String group, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 根据业务需求自定义 // 例如:VIP Consumer 分配更多Queue if (isVipConsumer(currentCID)) { // VIP分配60%的Queue return allocateForVip(mqAll, currentCID, cidAll); } else { // 普通Consumer平分剩余40% return allocateForNormal(mqAll, currentCID, cidAll); } } } } 3.2 策略选择指南 public class StrategySelectionGuide { public void selectStrategy() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 场景1:默认场景 // 使用平均分配,简单公平 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueAveragely() ); // 场景2:Consumer频繁变动 // 使用一致性Hash,减少重分配 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 场景3:多机房部署 // 使用机房就近,减少跨机房流量 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueByMachineRoom() ); // 场景4:特殊业务需求 // 自定义策略 consumer.setAllocateMessageQueueStrategy( new CustomAllocationStrategy() ); } } 四、消费进度管理 4.1 消费进度存储 public class ConsumeProgressStorage { // 集群模式:进度存储在 Broker class ClusterModeProgress { // 存储位置:{ROCKETMQ_HOME}/store/config/consumerOffset.json /* { "offsetTable": { "ORDER_TOPIC@ORDER_GROUP": { "0": 123456, // Queue0 的消费进度 "1": 234567, // Queue1 的消费进度 "2": 345678, // Queue2 的消费进度 "3": 456789 // Queue3 的消费进度 } } } */ // Broker 端定期持久化(默认5秒) class ConsumerOffsetManager { private ConcurrentHashMap<String/* topic@group */, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long oldOffset = map.put(queueId, offset); if (oldOffset == null || offset > oldOffset) { // 更新成功 } } } } } // 广播模式:进度存储在本地 class BroadcastModeProgress { // 存储位置:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json class LocalFileOffsetStore { private String storePath; private ConcurrentHashMap<MessageQueue, AtomicLong> offsetTable; public void persistAll() { // 持久化到本地文件 String jsonString = JSON.toJSONString(offsetTable); MixAll.string2File(jsonString, storePath); } public void load() { // 从本地文件加载 String content = MixAll.file2String(storePath); if (content != null) { offsetTable = JSON.parseObject(content, new TypeReference<ConcurrentHashMap<MessageQueue, AtomicLong>>(){}); } } } } } 4.2 消费进度提交 public class ProgressCommit { // Consumer 端进度提交 class ConsumerProgressCommit { // 定时提交(默认5秒) class ScheduledCommit { private ScheduledExecutorService scheduledExecutorService; public void start() { scheduledExecutorService.scheduleAtFixedRate(() -> { try { persistConsumerOffset(); } catch (Exception e) { log.error("persistConsumerOffset error", e); } }, 1000, 5000, TimeUnit.MILLISECONDS); } private void persistConsumerOffset() { for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) { MessageQueue mq = entry.getKey(); ProcessQueue pq = entry.getValue(); if (pq.isDropped()) { continue; } // 获取消费进度 long offset = pq.getConsumeOffset(); // 提交到 Broker offsetStore.updateOffset(mq, offset, false); } // 批量提交 offsetStore.persistAll(); } } // 关闭时提交 class ShutdownCommit { public void shutdown() { // 停止定时任务 scheduledExecutorService.shutdown(); // 最后一次提交 persistConsumerOffset(); // 等待提交完成 offsetStore.persistAll(); } } } } 4.3 进度重置 public class ProgressReset { // 重置消费进度的方式 class ResetMethods { // 1. 通过管理工具重置 void resetByAdmin() { // 重置到最早 // sh mqadmin resetOffset -g GROUP -t TOPIC -o 0 // 重置到最新 // sh mqadmin resetOffset -g GROUP -t TOPIC -o -1 // 重置到指定时间 // sh mqadmin resetOffset -g GROUP -t TOPIC -o 2023-11-13#10:00:00 } // 2. 通过代码重置 void resetByCode() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 设置从哪里开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // CONSUME_FROM_FIRST_OFFSET: 从最早消息 // CONSUME_FROM_LAST_OFFSET: 从最新消息 // CONSUME_FROM_TIMESTAMP: 从指定时间 // 设置消费时间点 consumer.setConsumeTimestamp("2023-11-13 10:00:00"); } // 3. 跳过堆积消息 void skipBacklog() { // 获取最大偏移量 long maxOffset = consumer.maxOffset(mq); // 更新到最大偏移量 consumer.updateConsumeOffset(mq, maxOffset); } } } 五、Rebalance 过程中的问题 5.1 消息重复消费 public class DuplicateConsumption { // 问题:Rebalance 导致消息重复 class Problem { /* 场景: 1. Consumer1 正在处理 Queue1 的消息 2. Rebalance 发生,Queue1 分配给 Consumer2 3. Consumer1 还没提交进度就失去了 Queue1 4. Consumer2 从旧进度开始消费,导致重复 */ } // 解决方案 class Solution { // 1. 幂等消费 class IdempotentConsumer { private Set<String> processedIds = new ConcurrentHashSet<>(); public ConsumeStatus consumeMessage(List<MessageExt> msgs) { for (MessageExt msg : msgs) { String bizId = msg.getKeys(); // 使用业务ID if (!processedIds.add(bizId)) { // 已处理,跳过 log.info("Message already processed: {}", bizId); continue; } // 处理消息 processMessage(msg); } return ConsumeStatus.CONSUME_SUCCESS; } } // 2. 减少 Rebalance 频率 class ReduceRebalance { void configure() { // 增加心跳间隔 consumer.setHeartbeatBrokerInterval(30000); // 30秒 // 使用一致性Hash consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); } } } } 5.2 消费停顿 public class ConsumePause { // 问题:Rebalance 期间消费暂停 class Problem { /* Rebalance 过程: 1. 停止当前消费 2. 等待其他 Consumer Rebalance 3. 重新分配Queue 4. 恢复消费 期间有短暂停顿 */ } // 优化方案 class Optimization { // 1. 减少 Rebalance 时间 void reduceRebalanceTime() { // 减少 Consumer 数量变化 // 使用容器编排,批量上下线 } // 2. 平滑迁移 class SmoothMigration { void migrate() { // 先启动新 Consumer startNewConsumers(); // 等待 Rebalance 完成 Thread.sleep(30000); // 再停止旧 Consumer stopOldConsumers(); } } // 3. 预分配策略 void preAllocate() { // 预先规划好 Queue 分配 // 使用自定义分配策略 consumer.setAllocateMessageQueueStrategy( new StaticAllocationStrategy() ); } } } 六、监控和运维 6.1 消费组监控 public class ConsumerGroupMonitoring { // 监控指标 class Metrics { // 1. 消费进度监控 void monitorProgress() { // 消费延迟 long lag = maxOffset - consumerOffset; // 消费速度 double tps = (currentOffset - lastOffset) / interval; // 预计消费完成时间 double eta = lag / tps; } // 2. Rebalance 监控 void monitorRebalance() { // Rebalance 次数 int rebalanceCount; // Rebalance 耗时 long rebalanceDuration; // Queue 分配变化 Map<String, List<MessageQueue>> allocationHistory; } // 3. Consumer 健康度 void monitorHealth() { // 在线 Consumer 数量 int onlineConsumerCount; // 消费线程池状态 ThreadPoolExecutor executor = getConsumeExecutor(); int activeCount = executor.getActiveCount(); int queueSize = executor.getQueue().size(); } } // 运维命令 class AdminCommands { // 查看消费进度 void checkProgress() { // sh mqadmin consumerProgress -n localhost:9876 -g ORDER_GROUP } // 查看消费组列表 void listConsumerGroups() { // sh mqadmin consumerGroupList -n localhost:9876 } // 查看消费组详情 void consumerGroupDetail() { // sh mqadmin consumerConnection -n localhost:9876 -g ORDER_GROUP } } } 6.2 问题诊断 public class ProblemDiagnosis { // 常见问题诊断 class CommonProblems { // 1. 消费速度慢 void slowConsumption() { // 检查点: // - Consumer 数量是否足够 // - 消费线程池是否饱和 // - 单条消息处理时间 // - 网络延迟 // 解决: // - 增加 Consumer 实例 // - 增加消费线程数 // - 优化消费逻辑 // - 批量消费 } // 2. 消费不均衡 void imbalancedConsumption() { // 检查点: // - Queue 分配是否均匀 // - Consumer 处理能力是否一致 // - 是否有热点 Queue // 解决: // - 调整分配策略 // - 增加 Queue 数量 // - 使用一致性 Hash } // 3. Rebalance 频繁 void frequentRebalance() { // 检查点: // - Consumer 是否频繁上下线 // - 网络是否稳定 // - 心跳是否超时 // 解决: // - 增加心跳间隔 // - 优化部署策略 // - 使用容器编排 } } } 七、最佳实践 7.1 Consumer Group 设计原则 public class DesignPrinciples { // 1. 合理设置 Consumer 数量 class ConsumerCount { void calculate() { // Consumer 数量 <= Queue 数量 // 否则有 Consumer 空闲 // 建议: int queueCount = 16; int consumerCount = Math.min(queueCount, 8); // 不超过 Queue 数量 } } // 2. 订阅关系一致性 class SubscriptionConsistency { // 同一 Group 内所有 Consumer 必须: // - 订阅相同的 Topic // - 使用相同的 Tag 过滤 // - 使用相同的消费模式 // ❌ 错误示例 void wrongExample() { // Consumer1 consumer1.subscribe("TOPIC", "TagA"); // Consumer2(同组但不同Tag) consumer2.subscribe("TOPIC", "TagB"); // 错误! } // ✅ 正确示例 void correctExample() { // 所有 Consumer 订阅一致 consumer.subscribe("TOPIC", "TagA || TagB"); } } // 3. 消费进度管理 class ProgressManagement { void bestPractice() { // 定期监控消费延迟 monitorConsumerLag(); // 设置合理的提交间隔 consumer.setAutoCommitIntervalMillis(5000); // 优雅关闭 Runtime.getRuntime().addShutdownHook(new Thread(() -> { consumer.shutdown(); })); } } } 7.2 性能优化 public class PerformanceOptimization { // 1. 消费并发度调优 void optimizeConcurrency() { // 根据消息处理时间调整 consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(64); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); } // 2. 拉取优化 void optimizePull() { // 拉取批次大小 consumer.setPullBatchSize(32); // 拉取间隔(0表示不间断) consumer.setPullInterval(0); // 流控阈值 consumer.setPullThresholdForQueue(1000); } // 3. Rebalance 优化 void optimizeRebalance() { // 使用合适的分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 减少 Rebalance 触发 consumer.setConsumeFromWhere( ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET ); } } 八、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...