RocketMQ架构07:网络通信模型与Remoting模块 - 基于Netty的高性能通信
引言:高性能通信的基石 RocketMQ 的网络通信模块(Remoting)是整个系统的神经网络,负责: Producer → Broker:发送消息 Consumer → Broker:拉取消息 Broker → NameServer:注册与心跳 Broker → Broker:主从同步 今天我们深入剖析这个基于 Netty 的高性能通信模块。 一、Remoting 模块架构 1.1 整体架构 ┌────────────────────────────────────────────────┐ │ Remoting 模块架构 │ ├────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────┐ │ │ │ RemotingServer (服务端) │ │ │ │ - NettyRemotingServer │ │ │ │ - 接收客户端请求 │ │ │ └──────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────┐ │ │ │ RemotingClient (客户端) │ │ │ │ - NettyRemotingClient │ │ │ │ - 发送请求到服务端 │ │ │ └──────────────────────────────────────────┘ │ │ ↕ │ │ ┌──────────────────────────────────────────┐ │ │ │ Netty 网络层 │ │ │ │ - EventLoopGroup │ │ │ │ - Channel Pipeline │ │ │ │ - 编解码器 │ │ │ └──────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────┘ 1.2 核心组件 // 1. RemotingCommand - 通信协议对象 public class RemotingCommand { private int code; // 请求/响应码 private LanguageCode language; // 语言 private int version; // 版本 private int opaque; // 请求ID private int flag; // 标志位 private String remark; // 备注 private HashMap<String, String> extFields; // 扩展字段 private transient byte[] body; // 消息体 } // 2. NettyRequestProcessor - 请求处理器 public interface NettyRequestProcessor { RemotingCommand processRequest( ChannelHandlerContext ctx, RemotingCommand request ) throws Exception; } // 3. ResponseFuture - 异步响应 public class ResponseFuture { private final int opaque; private final long timeoutMillis; private final InvokeCallback invokeCallback; private final CountDownLatch countDownLatch; private volatile RemotingCommand responseCommand; } 二、请求类型与调用方式 2.1 三种调用方式 ┌─────────────────────────────────────────────┐ │ 三种调用方式对比 │ ├─────────────────────────────────────────────┤ │ │ │ 1. 同步调用 (Sync) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (阻塞等待) │ Process │ │ │ │ │ │ │ ◀─── Response ──────── │ │ │ │ │ │ │ 特点:等待响应,吞吐量低 │ │ │ ├─────────────────────────────────────────────┤ │ │ │ 2. 异步调用 (Async) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (立即返回) │ Process │ │ │ │ │ │ │ ◀─── Response ──────── │ │ │ │ (回调处理) │ │ │ │ │ 特点:非阻塞,吞吐量高 │ │ │ ├─────────────────────────────────────────────┤ │ │ │ 3. 单向调用 (Oneway) │ │ Client Server │ │ │ │ │ │ │ ──── Request ─────────▶ │ │ │ │ (不等响应) │ Process │ │ │ │ │ │ │ │ 特点:无响应,最高吞吐量 │ │ │ └─────────────────────────────────────────────┘ 2.2 代码示例 // 1. 同步调用 public RemotingCommand invokeSyncImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis ) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { // 创建 ResponseFuture final ResponseFuture responseFuture = new ResponseFuture( channel, opaque, timeoutMillis, null, null ); this.responseTable.put(opaque, responseFuture); // 发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); } }); // 阻塞等待响应 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(); } else { throw new RemotingSendRequestException(); } } return responseCommand; } finally { this.responseTable.remove(opaque); } } // 2. 异步调用 public void invokeAsyncImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis, final InvokeCallback invokeCallback ) throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException { final int opaque = request.getOpaque(); // 创建 ResponseFuture(带回调) final ResponseFuture responseFuture = new ResponseFuture( channel, opaque, timeoutMillis, invokeCallback, null ); this.responseTable.put(opaque, responseFuture); // 发送请求 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } // 失败立即回调 responseFuture.setSendRequestOK(false); responseFuture.putResponse(null); responseTable.remove(opaque); responseFuture.executeInvokeCallback(); } }); } // 3. 单向调用 public void invokeOnewayImpl( final Channel channel, final RemotingCommand request, final long timeoutMillis ) throws InterruptedException, RemotingTooMuchRequestException, RemotingSendRequestException { // 设置单向标志 request.markOnewayRPC(); // 直接发送,不等响应 channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (!f.isSuccess()) { log.warn("send request failed. channel={}", channel); } } }); } 三、请求码与处理器映射 3.1 常见请求码 public class RequestCode { // Producer 相关 public static final int SEND_MESSAGE = 10; public static final int SEND_MESSAGE_V2 = 310; public static final int SEND_BATCH_MESSAGE = 320; // Consumer 相关 public static final int PULL_MESSAGE = 11; public static final int QUERY_MESSAGE = 12; public static final int QUERY_MESSAGE_BY_KEY = 13; // Broker 相关 public static final int REGISTER_BROKER = 103; public static final int UNREGISTER_BROKER = 104; public static final int HEART_BEAT = 34; // 管理相关 public static final int GET_ALL_TOPIC_CONFIG = 21; public static final int UPDATE_BROKER_CONFIG = 25; } 3.2 处理器注册 public class BrokerController { public void registerProcessor() { // 1. 发送消息处理器 SendMessageProcessor sendProcessor = new SendMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor ); // 2. 拉取消息处理器 PullMessageProcessor pullProcessor = new PullMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.PULL_MESSAGE, pullProcessor, this.pullMessageExecutor ); // 3. 查询消息处理器 QueryMessageProcessor queryProcessor = new QueryMessageProcessor(this); this.remotingServer.registerProcessor( RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor ); // 4. 心跳处理器 ClientManageProcessor clientManageProcessor = new ClientManageProcessor(this); this.remotingServer.registerProcessor( RequestCode.HEART_BEAT, clientManageProcessor, this.heartbeatExecutor ); // 5. 默认处理器 AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this); this.remotingServer.registerDefaultProcessor( adminProcessor, this.adminBrokerExecutor ); } } 四、编解码协议 4.1 协议格式 ┌─────────────────────────────────────────────┐ │ RocketMQ 通信协议格式 │ ├─────────────────────────────────────────────┤ │ │ │ 消息总长度(4字节) │ │ ┌──────────────────────────────┐ │ │ │ Total Length │ │ │ └──────────────────────────────┘ │ │ │ │ 序列化类型 + 头部长度(4字节) │ │ ┌──────────────────────────────┐ │ │ │ [1字节] [3字节] │ │ │ │ SerType Header Length │ │ │ └──────────────────────────────┘ │ │ │ │ 头部数据(变长) │ │ ┌──────────────────────────────┐ │ │ │ Header Data │ │ │ │ (JSON 或 RocketMQ 私有格式) │ │ │ └──────────────────────────────┘ │ │ │ │ 消息体(变长) │ │ ┌──────────────────────────────┐ │ │ │ Body Data │ │ │ └──────────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ 4.2 编码器实现 public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> { @Override public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out) throws Exception { try { ByteBuffer header = remotingCommand.encodeHeader(); out.writeBytes(header); byte[] body = remotingCommand.getBody(); if (body != null) { out.writeBytes(body); } } catch (Exception e) { log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e); if (remotingCommand != null) { log.error(remotingCommand.toString()); } RemotingUtil.closeChannel(ctx.channel()); } } } // RemotingCommand 编码 public ByteBuffer encodeHeader() { return encodeHeader(this.body != null ? this.body.length : 0); } public ByteBuffer encodeHeader(final int bodyLength) { // 1. 计算总长度 int length = 4; // 总长度字段本身 // 2. 序列化头部 byte[] headerData; headerData = this.headerEncode(); length += headerData.length; length += bodyLength; // 3. 分配缓冲区 ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength); // 4. 写入总长度 result.putInt(length); // 5. 写入序列化类型 + 头部长度 result.put(markProtocolType(headerData.length, SerializeType.JSON)); // 6. 写入头部数据 result.put(headerData); result.flip(); return result; } 五、线程模型 5.1 Netty 线程模型 ┌─────────────────────────────────────────────┐ │ Netty Reactor 线程模型 │ ├─────────────────────────────────────────────┤ │ │ │ Boss Group (1个线程) │ │ ┌──────────────────────────────┐ │ │ │ - 接收新连接 │ │ │ │ - 注册到 Worker Group │ │ │ └──────────────────────────────┘ │ │ ↓ │ │ Worker Group (多个线程) │ │ ┌──────────────────────────────┐ │ │ │ - I/O 读写 │ │ │ │ - 编解码 │ │ │ │ - 分发到业务线程池 │ │ │ └──────────────────────────────┘ │ │ ↓ │ │ 业务线程池 (可配置) │ │ ┌──────────────────────────────┐ │ │ │ - SendMessageExecutor │ │ │ │ - PullMessageExecutor │ │ │ │ - QueryMessageExecutor │ │ │ │ - HeartbeatExecutor │ │ │ └──────────────────────────────┘ │ │ │ └─────────────────────────────────────────────┘ 5.2 线程池配置 // Broker 启动时初始化线程池 public class BrokerController { private void initialize() { // 1. 发送消息线程池 this.sendMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getSendMessageThreadPoolNums(), // 核心线程数:默认16 this.brokerConfig.getSendMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.sendThreadPoolQueue, new ThreadFactoryImpl("SendMessageThread_") ); // 2. 拉取消息线程池 this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getPullMessageThreadPoolNums(), // 默认16+核心数 this.brokerConfig.getPullMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.pullThreadPoolQueue, new ThreadFactoryImpl("PullMessageThread_") ); // 3. 查询消息线程池 this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor( this.brokerConfig.getQueryMessageThreadPoolNums(), // 默认8+核心数 this.brokerConfig.getQueryMessageThreadPoolNums(), 1000 * 60, TimeUnit.MILLISECONDS, this.queryThreadPoolQueue, new ThreadFactoryImpl("QueryMessageThread_") ); } } // 配置建议 # broker.conf sendMessageThreadPoolNums=16 # 发送消息线程 pullMessageThreadPoolNums=32 # 拉取消息线程 queryMessageThreadPoolNums=16 # 查询消息线程 六、性能优化 6.1 零拷贝 // 使用 FileRegion 实现零拷贝传输 public class TransferMsgByHeap { public void transferData(SelectMappedBufferResult selectMappedBufferResult, Channel channel) { ByteBuffer byteBuffer = selectMappedBufferResult.getByteBuffer(); // 方式1:堆内存传输(有拷贝) byte[] data = new byte[byteBuffer.remaining()]; byteBuffer.get(data); channel.writeAndFlush(Unpooled.wrappedBuffer(data)); // 方式2:零拷贝传输(推荐) FileRegion fileRegion = new DefaultFileRegion( selectMappedBufferResult.getMappedFile().getFileChannel(), selectMappedBufferResult.getStartOffset(), selectMappedBufferResult.getSize() ); channel.writeAndFlush(fileRegion); } } 6.2 批量发送 // 生产者批量发送 DefaultMQProducer producer = new DefaultMQProducer("group"); producer.start(); List<Message> messages = new ArrayList<>(); for (int i = 0; i < 100; i++) { Message msg = new Message("Topic", "Tag", ("Hello" + i).getBytes()); messages.add(msg); } // 一次网络调用发送100条 SendResult result = producer.send(messages); 6.3 连接复用 // Client 连接池复用 public class NettyRemotingClient { // Channel 缓存表 private final ConcurrentMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>(); // 获取或创建 Channel private Channel getAndCreateChannel(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { return cw.getChannel(); } return this.createChannel(addr); } // 优势:减少连接建立开销 } 七、总结与思考 7.1 核心要点 Netty 基础:RocketMQ 基于 Netty 构建网络通信 三种调用:同步、异步、单向,满足不同场景 请求处理器:请求码映射到具体处理器 自定义协议:高效的二进制协议 线程模型:Reactor + 业务线程池 7.2 思考题 为什么需要三种调用方式? ...