生产级最佳实践:Java并发编程完整指南

一、核心原则 1.1 安全第一 // 1. 线程安全的优先级 // 正确性 > 性能 > 可读性 // ❌ 错误:追求性能,忽略安全 public class UnsafeCounter { private int count = 0; public void increment() { count++; // 非原子操作,线程不安全 } } // ✅ 正确:优先保证线程安全 public class SafeCounter { private final AtomicInteger count = new AtomicInteger(0); public void increment() { count.incrementAndGet(); // 原子操作,线程安全 } } 1.2 最小化同步范围 // ❌ 错误:同步范围过大 public synchronized void process() { prepareData(); // 无需同步 accessSharedData(); // 需要同步 cleanup(); // 无需同步 } // ✅ 正确:最小化同步范围 public void process() { prepareData(); synchronized (lock) { accessSharedData(); // 只同步必要代码 } cleanup(); } 1.3 不变性优于锁 // ❌ 复杂:使用锁保护可变对象 public class MutablePoint { private int x, y; public synchronized void setX(int x) { this.x = x; } public synchronized int getX() { return x; } } // ✅ 简单:使用不变对象 public final class ImmutablePoint { private final int x, y; public ImmutablePoint(int x, int y) { this.x = x; this.y = y; } public int getX() { return x; // 无需同步 } } 二、线程与线程池最佳实践 2.1 务必使用线程池 // ❌ 错误:直接创建线程 for (int i = 0; i < 10000; i++) { new Thread(() -> task()).start(); // 线程数爆炸 } // ✅ 正确:使用线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, // 核心线程数、最大线程数 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder() .setNameFormat("business-pool-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i < 10000; i++) { executor.submit(() -> task()); } 2.2 线程池参数计算 // CPU密集型任务 int cpuCount = Runtime.getRuntime().availableProcessors(); int corePoolSize = cpuCount + 1; // I/O密集型任务 int corePoolSize = cpuCount * 2; // 混合型任务(推荐公式) // corePoolSize = N * (1 + WT/ST) // N = CPU核心数 // WT = 等待时间 // ST = 计算时间 int corePoolSize = cpuCount * (1 + waitTime / computeTime); // 队列容量 int queueCapacity = peakQPS * avgExecutionTime; // 最大线程数 int maximumPoolSize = corePoolSize * 2; 2.3 线程命名 // ✅ 使用ThreadFactory自定义线程名 ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("business-pool-%d") .setDaemon(false) .setPriority(Thread.NORM_PRIORITY) .setUncaughtExceptionHandler((t, e) -> { log.error("线程异常:" + t.getName(), e); }) .build(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy() ); 三、锁的最佳实践 3.1 锁的选择 // 1. 读多写少:StampedLock StampedLock sl = new StampedLock(); long stamp = sl.tryOptimisticRead(); // 读取数据 if (!sl.validate(stamp)) { stamp = sl.readLock(); try { // 重新读取 } finally { sl.unlockRead(stamp); } } // 2. 公平性要求:ReentrantLock(true) Lock lock = new ReentrantLock(true); // 3. 简单场景:synchronized synchronized (lock) { // 业务逻辑 } // 4. 高并发计数:LongAdder LongAdder counter = new LongAdder(); counter.increment(); 3.2 锁的粒度 // ❌ 粗粒度锁:性能差 public synchronized void process(String key, String value) { map.put(key, value); // 所有key共用一个锁 } // ✅ 细粒度锁:性能好 private final ConcurrentHashMap<String, Lock> lockMap = new ConcurrentHashMap<>(); public void process(String key, String value) { Lock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); lock.lock(); try { map.put(key, value); // 每个key独立锁 } finally { lock.unlock(); } } 3.3 避免死锁 // ✅ 固定加锁顺序 public void transfer(Account from, Account to, int amount) { Account first, second; if (from.getId() < to.getId()) { first = from; second = to; } else { first = to; second = from; } synchronized (first) { synchronized (second) { // 转账逻辑 } } } // ✅ 使用tryLock超时 if (lock1.tryLock(1, TimeUnit.SECONDS)) { try { if (lock2.tryLock(1, TimeUnit.SECONDS)) { try { // 业务逻辑 } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } 四、并发集合最佳实践 4.1 集合选择 场景 推荐集合 理由 读多写少 CopyOnWriteArrayList 读无锁,性能高 高并发Map ConcurrentHashMap 分段锁,性能好 生产者-消费者 BlockingQueue 自带阻塞,简化代码 优先级队列 PriorityBlockingQueue 支持优先级 延迟队列 DelayQueue 支持延迟 4.2 ConcurrentHashMap正确用法 ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); // ❌ 错误:组合操作不原子 if (!map.containsKey(key)) { map.put(key, value); // 线程不安全 } // ✅ 正确:使用原子方法 map.putIfAbsent(key, value); // ✅ 原子更新 map.compute(key, (k, v) -> (v == null ? 0 : v) + 1); // ✅ 原子替换 map.replace(key, oldValue, newValue); 五、异步编程最佳实践 5.1 CompletableFuture // ✅ 务必指定线程池 ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() -> { return queryData(); }, executor) // 指定线程池 .thenApplyAsync(data -> { return processData(data); }, executor) .exceptionally(ex -> { log.error("异步任务失败", ex); return defaultValue; }) .thenAccept(result -> { log.info("结果:{}", result); }); // ✅ 并行查询 CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> queryUser(), executor); CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> queryOrders(), executor); CompletableFuture.allOf(userFuture, orderFuture) .thenApply(v -> { UserInfo user = userFuture.join(); List<Order> orders = orderFuture.join(); return new UserDetailDTO(user, orders); }); 5.2 超时控制 // ✅ 设置超时 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return slowQuery(); }, executor); try { String result = future.get(1, TimeUnit.SECONDS); } catch (TimeoutException e) { log.error("查询超时"); return defaultValue; } // ✅ Java 9+:orTimeout future.orTimeout(1, TimeUnit.SECONDS) .exceptionally(ex -> defaultValue); 六、生产环境配置 6.1 JVM参数 # 通用配置 -Xms4g -Xmx4g # 堆内存 -Xss1m # 线程栈 -XX:+UseG1GC # 使用G1垃圾回收器 -XX:MaxGCPauseMillis=200 # GC暂停时间目标 -XX:ParallelGCThreads=8 # 并行GC线程数 -XX:ConcGCThreads=2 # 并发GC线程数 # 偏向锁(低竞争场景) -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 # 容器环境 -XX:ActiveProcessorCount=2 # 显式指定CPU核心数 -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 # 监控与调试 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/heapdump.hprof -Xloggc:/var/log/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps 6.2 Spring Boot配置 # application.yml server: tomcat: threads: max: 200 # 最大线程数 min-spare: 10 # 最小空闲线程数 accept-count: 100 # 等待队列长度 max-connections: 10000 # 最大连接数 spring: task: execution: pool: core-size: 10 max-size: 20 queue-capacity: 1000 thread-name-prefix: async-pool- 七、监控与告警 7.1 核心监控指标 // 1. 线程池监控 public class ThreadPoolMonitor { @Scheduled(fixedRate = 5000) public void monitor() { // 活跃线程数 int activeCount = executor.getActiveCount(); // 队列大小 int queueSize = executor.getQueue().size(); // 线程利用率 double threadUtilization = (double) activeCount / executor.getPoolSize() * 100; // 队列使用率 double queueUtilization = (double) queueSize / queueCapacity * 100; // 告警 if (threadUtilization > 90) { alert("线程池利用率过高:" + threadUtilization + "%"); } if (queueUtilization > 80) { alert("队列积压严重:" + queueUtilization + "%"); } } } 7.2 Prometheus + Grafana // 使用Micrometer导出指标 MeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); // 监控线程池 ExecutorServiceMetrics.monitor(registry, executor, "business-pool", "app", "my-app"); // 访问指标 // http://localhost:8080/actuator/prometheus 八、问题排查清单 8.1 CPU 100% 1. top -Hp <pid> # 找到CPU高的线程ID 2. printf "%x\n" <tid> # 转换为16进制 3. jstack <pid> | grep "0x<hex-id>" # 查看堆栈 4. 定位代码 → 修复问题 8.2 死锁 1. jstack <pid> | grep "Found one Java-level deadlock" 2. 分析等待链 3. 修复锁顺序 8.3 线程泄漏 1. jstack <pid> | wc -l # 查看线程数 2. jmap -dump:file=heap.hprof <pid> 3. MAT分析Thread对象 4. 定位泄漏点 九、代码审查检查项 9.1 必查项 是否使用线程池? 线程池是否指定线程名? 线程池是否配置拒绝策略? 是否有死锁风险? 是否正确处理中断? 是否有资源泄漏? 异常是否被正确捕获? 是否使用了正确的锁? 锁的粒度是否合理? 是否使用了线程安全的集合? 9.2 性能优化项 读多写少场景是否使用读写锁? 高并发计数是否使用LongAdder? 是否使用了不变对象? 是否最小化同步范围? 是否避免了锁竞争? 十、核心知识图谱 10.1 并发基础 线程基础 ├── 线程创建(Thread、Runnable、Callable) ├── 线程状态(NEW、RUNNABLE、BLOCKED、WAITING、TERMINATED) ├── 线程中断(interrupt、isInterrupted、interrupted) └── 线程通信(wait、notify、notifyAll) 10.2 内存模型 JMM(Java Memory Model) ├── 可见性(volatile、synchronized) ├── 有序性(happens-before) ├── 原子性(AtomicInteger、synchronized) └── CPU缓存(缓存一致性、伪共享) 10.3 同步工具 锁 ├── synchronized(偏向锁、轻量级锁、重量级锁) ├── ReentrantLock(公平锁、非公平锁、可中断) ├── ReadWriteLock(读锁、写锁) └── StampedLock(乐观读、悲观读、写锁) 原子类 ├── AtomicInteger(CAS) ├── LongAdder(分段累加) └── AtomicReference(对象原子操作) 并发集合 ├── ConcurrentHashMap(分段锁、CAS) ├── CopyOnWriteArrayList(读写分离) └── BlockingQueue(阻塞队列) 同步器 ├── CountDownLatch(倒计时) ├── CyclicBarrier(循环栅栏) ├── Semaphore(信号量) └── Phaser(多阶段同步) 10.4 线程池 ThreadPoolExecutor ├── 核心参数(corePoolSize、maximumPoolSize、keepAliveTime) ├── 工作队列(ArrayBlockingQueue、LinkedBlockingQueue) ├── 拒绝策略(AbortPolicy、CallerRunsPolicy) └── 线程工厂(ThreadFactory) 特殊线程池 ├── ScheduledThreadPoolExecutor(定时任务) ├── ForkJoinPool(工作窃取) └── CompletableFuture(异步编程) 十一、学习路径总结 11.1 基础篇(1-10篇) 为什么需要并发 进程与线程 线程生命周期 线程创建方式 线程中断机制 线程通信 线程安全问题 可见性、有序性、原子性 CPU缓存与多核 JMM与happens-before 11.2 原理篇(11-18篇) happens-before规则 volatile原理 synchronized原理 synchronized优化 原子类AtomicInteger CAS与ABA Lock与ReentrantLock ReadWriteLock读写锁 11.3 工具篇(19-28篇) 线程池原理 ThreadPoolExecutor详解 线程池最佳实践 BlockingQueue阻塞队列 ConcurrentHashMap CountDownLatch与CyclicBarrier Semaphore与Exchanger Phaser多阶段同步 CopyOnWriteArrayList CompletableFuture异步编程 11.4 实战篇(29-33篇) CompletableFuture实战 ForkJoinPool工作窃取 无锁编程与LongAdder StampedLock性能优化 并发设计模式 11.5 排查篇(34-39篇) 死锁的产生与排查 线程池监控与调优 JVM线程相关参数 并发问题排查工具 JMH性能测试 生产级最佳实践(本篇) 总结 Java并发编程是一个复杂但重要的技术领域,掌握并发编程需要: ...

2025-11-20 · maneng

集群流控的架构设计

架构设计 ┌────────────────────────────────┐ │ Token Server │ │ ├─ TokenService (令牌服务) │ │ ├─ FlowStatistics (统计) │ │ └─ ClusterFlowRuleChecker │ └────────┬───────────────────────┘ │ Netty通信 ┌────┴───┬────┬────┐ ↓ ↓ ↓ ↓ Client1 Client2 Client3 Client4 Token Server核心 TokenService public class TokenService { // 资源ID → FlowRule private final Map<Long, FlowRule> ruleMap = new ConcurrentHashMap<>(); // 资源ID → 统计数据 private final Map<Long, ClusterMetric> metricMap = new ConcurrentHashMap<>(); // 请求令牌 public TokenResult requestToken(Long ruleId, int acquireCount, boolean prioritized) { if (ruleId == null) { return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); } FlowRule rule = ruleMap.get(ruleId); if (rule == null) { return new TokenResult(TokenResultStatus.NO_RULE_EXISTS); } // 获取统计数据 ClusterMetric metric = metricMap.computeIfAbsent(ruleId, k -> new ClusterMetric()); // 检查是否允许通过 boolean pass = canPass(rule, metric, acquireCount); if (pass) { metric.add(acquireCount); return new TokenResult(TokenResultStatus.OK); } else { return new TokenResult(TokenResultStatus.BLOCKED); } } private boolean canPass(FlowRule rule, ClusterMetric metric, int acquireCount) { double count = rule.getCount(); long currentCount = metric.getCurrentCount(); return currentCount + acquireCount <= count; } } ClusterMetric public class ClusterMetric { private final LeapArray<MetricBucket> data; public ClusterMetric() { this.data = new BucketLeapArray(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); } public long getCurrentCount() { data.currentWindow(); long pass = 0; List<MetricBucket> list = data.values(); for (MetricBucket bucket : list) { pass += bucket.pass(); } return pass; } public void add(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); } } Token Client核心 ClusterFlowChecker public class ClusterFlowChecker { public static TokenResult acquireClusterToken(FlowRule rule, int acquireCount, boolean prioritized) { // 获取集群客户端 ClusterTokenClient client = TokenClientProvider.getClient(); if (client == null) { // 客户端未初始化,降级为本地限流 return null; } long flowId = rule.getClusterConfig().getFlowId(); // 请求令牌 TokenResult result = null; try { result = client.requestToken(flowId, acquireCount, prioritized); } catch (Exception ex) { // 请求失败,降级为本地限流 RecordLog.warn("[ClusterFlowChecker] Request cluster token failed", ex); } return result; } } TokenResult public class TokenResult { private Integer status; // 状态码 private int remaining; // 剩余令牌 private int waitInMs; // 等待时间 private Map<String, String> attachments; // 附加信息 public TokenResult(Integer status) { this.status = status; } public boolean isPass() { return status == TokenResultStatus.OK; } public boolean isBlocked() { return status == TokenResultStatus.BLOCKED; } public boolean shouldWait() { return status == TokenResultStatus.SHOULD_WAIT; } } public final class TokenResultStatus { public static final int OK = 0; public static final int BLOCKED = 1; public static final int SHOULD_WAIT = 2; public static final int NO_RULE_EXISTS = 3; public static final int BAD_REQUEST = 4; public static final int FAIL = 5; } Netty通信 Server端 public class TokenServer { private final int port; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; public void start() throws Exception { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new Decoder()) .addLast(new Encoder()) .addLast(new TokenServerHandler()); } }); ChannelFuture f = b.bind(port).sync(); f.channel().closeFuture().sync(); } public void stop() { if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } } } public class TokenServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof TokenRequest) { TokenRequest request = (TokenRequest) msg; // 请求令牌 TokenResult result = TokenService.requestToken( request.getFlowId(), request.getAcquireCount(), request.isPriority() ); // 响应 TokenResponse response = new TokenResponse(); response.setStatus(result.getStatus()); response.setRemaining(result.getRemaining()); ctx.writeAndFlush(response); } } } Client端 public class TokenClient { private EventLoopGroup group; private Bootstrap bootstrap; private Channel channel; public void start(String host, int port) throws Exception { group = new NioEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline() .addLast(new Encoder()) .addLast(new Decoder()) .addLast(new TokenClientHandler()); } }); channel = bootstrap.connect(host, port).sync().channel(); } public TokenResult requestToken(Long flowId, int acquireCount, boolean prioritized) { TokenRequest request = new TokenRequest(); request.setFlowId(flowId); request.setAcquireCount(acquireCount); request.setPriority(prioritized); // 发送请求 channel.writeAndFlush(request); // 等待响应(简化处理) return waitForResponse(); } } 失败降级 public class ClusterFlowSlot extends AbstractLinkedProcessorSlot<DefaultNode> { @Override public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, Object... args) throws Throwable { FlowRule rule = getRuleFor(resourceWrapper); if (rule != null && rule.isClusterMode()) { // 集群模式 TokenResult result = ClusterFlowChecker.acquireClusterToken(rule, count, false); if (result == null || result.getStatus() == TokenResultStatus.FAIL) { // Token Server不可用,降级为本地限流 fallbackToLocalOrPass(context, rule, node, count, args); } else if (result.isBlocked()) { // 集群限流拒绝 throw new FlowException(resourceWrapper.getName(), rule); } } else { // 本地模式 checkLocalFlow(context, resourceWrapper, node, count, args); } fireEntry(context, resourceWrapper, node, count, args); } private void fallbackToLocalOrPass(Context context, FlowRule rule, DefaultNode node, int count, Object... args) throws BlockException { if (rule.getClusterConfig().isFallbackToLocalWhenFail()) { // 降级为本地限流 checkLocalFlow(context, null, node, count, args); } else { // 直接通过 // Do nothing } } } 动态切换 Server选举 public class ServerElection { private final NacosNamingService namingService; private volatile boolean isLeader = false; public void elect() { try { // 注册临时节点 Instance instance = new Instance(); instance.setIp(NetUtil.getLocalHost()); instance.setPort(ClusterConstants.DEFAULT_TOKEN_SERVER_PORT); instance.setHealthy(true); instance.setWeight(1.0); namingService.registerInstance( ClusterConstants.TOKEN_SERVER_SERVICE_NAME, instance ); // 监听节点变化 namingService.subscribe( ClusterConstants.TOKEN_SERVER_SERVICE_NAME, event -> { if (event instanceof NamingEvent) { List<Instance> instances = ((NamingEvent) event).getInstances(); checkAndUpdateLeader(instances); } } ); } catch (Exception e) { RecordLog.warn("Election failed", e); } } private void checkAndUpdateLeader(List<Instance> instances) { if (instances.isEmpty()) { isLeader = false; return; } // 选择第一个实例为Leader Instance leader = instances.get(0); isLeader = isCurrentInstance(leader); if (isLeader) { startTokenServer(); } else { stopTokenServer(); connectToLeader(leader); } } } 总结 集群流控架构核心: ...

2025-11-20 · maneng

Sentinel核心架构设计全景

整体架构 ┌────────────────────────────────────────┐ │ Sentinel Client │ ├────────────────────────────────────────┤ │ API Layer (SphU/SphO/Entry) │ ├────────────────────────────────────────┤ │ Slot Chain (责任链模式) │ │ ├─ NodeSelectorSlot │ │ ├─ ClusterBuilderSlot │ │ ├─ StatisticSlot │ │ ├─ AuthoritySlot │ │ ├─ SystemSlot │ │ ├─ FlowSlot │ │ └─ DegradeSlot │ ├────────────────────────────────────────┤ │ Metrics (滑动窗口统计) │ ├────────────────────────────────────────┤ │ Rule Manager (规则管理) │ └────────────────────────────────────────┘ 核心组件 1. Entry(资源入口) public abstract class Entry implements AutoCloseable { private final long createTime; private Node curNode; private Node originNode; private Throwable error; protected Entry(ResourceWrapper resourceWrapper, ProcessorSlot<Object> chain, Context context) { this.resourceWrapper = resourceWrapper; this.chain = chain; this.context = context; this.createTime = TimeUtil.currentTimeMillis(); } @Override public void close() { if (chain != null) { chain.exit(context, resourceWrapper, count, args); } } } 作用: ...

2025-11-20 · maneng

RocketMQ架构03:Broker架构与核心组件 - 消息中枢的内部世界

引言:消息系统的心脏 如果说 NameServer 是 RocketMQ 的"大脑"(路由中心),那么 Broker 就是"心脏"(消息中枢)。它负责: 消息存储:持久化所有消息 消息处理:接收生产者消息,推送给消费者 高可用保障:主从同步、故障转移 性能优化:零拷贝、顺序写、内存映射 今天我们从第一性原理出发,逐步理解 Broker 的内部世界。 一、Broker 的核心职责 1.1 从需求出发 假设我们要设计一个消息存储节点,需要解决哪些问题? 基础需求: 1. 接收消息 → 需要网络通信模块 2. 存储消息 → 需要存储引擎 3. 分发消息 → 需要索引和查询机制 4. 保证可靠 → 需要主从同步 5. 高性能 → 需要优化 I/O Broker 就是围绕这5个核心需求设计的。 1.2 Broker 的三重身份 ┌─────────────────────────────────────┐ │ Broker 的三重身份 │ ├─────────────────────────────────────┤ │ 1. 消息接收器(Producer 视角) │ │ - 接收消息请求 │ │ - 验证权限 │ │ - 返回存储结果 │ ├─────────────────────────────────────┤ │ 2. 消息存储库(系统视角) │ │ - 持久化消息 │ │ - 管理索引 │ │ - 定期清理 │ ├─────────────────────────────────────┤ │ 3. 消息分发器(Consumer 视角) │ │ - 根据订阅关系推送消息 │ │ - 管理消费进度 │ │ - 支持消息重试 │ └─────────────────────────────────────┘ 二、Broker 架构全景 2.1 核心组件架构图 ┌────────────────────────────────────────────────────────┐ │ Broker 节点 │ ├────────────────────────────────────────────────────────┤ │ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Remoting 模块(网络通信层) │ │ │ │ - NettyServer(接收请求) │ │ │ │ - RequestProcessor(请求处理器) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ Message Store 模块(存储引擎) │ │ │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ │ │ CommitLog │ │ConsumeQueue│ │ IndexFile │ │ │ │ │ │ 消息主存储 │ │ 消费索引 │ │ 查询索引 │ │ │ │ │ └────────────┘ └────────────┘ └────────────┘ │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ HA 模块(高可用) │ │ │ │ - HAService(主从同步) │ │ │ │ - CommitLogDispatcher(消息分发) │ │ │ └──────────────────────────────────────────────────┘ │ │ ↓ │ │ ┌──────────────────────────────────────────────────┐ │ │ │ 其他核心模块 │ │ │ │ - TopicConfigManager(Topic 配置管理) │ │ │ │ - ConsumerOffsetManager(消费进度管理) │ │ │ │ - ScheduleMessageService(延迟消息) │ │ │ │ - TransactionalMessageService(事务消息) │ │ │ └──────────────────────────────────────────────────┘ │ │ │ └────────────────────────────────────────────────────────┘ 2.2 各模块职责详解 1️⃣ Remoting 模块(网络通信) // 核心职责 1. 接收网络请求(基于 Netty) 2. 请求路由与分发 3. 编解码(序列化/反序列化) 4. 返回响应结果 // 处理的请求类型 - SEND_MESSAGE // 发送消息 - PULL_MESSAGE // 拉取消息 - QUERY_MESSAGE // 查询消息 - HEART_BEAT // 心跳 - REGISTER_BROKER // Broker 注册 关键设计: ...

2025-11-13 · maneng

RocketMQ架构01:整体架构设计 - NameServer、Broker、Producer、Consumer

引言:一个精妙的分布式系统 如果把 RocketMQ 比作一座现代化城市,那么: NameServer 是城市规划局,管理所有地图信息 Broker 是物流仓库,存储和转运货物 Producer 是发货方,把货物送到仓库 Consumer 是收货方,从仓库取货 今天,我们从整体视角理解这座"消息城市"的运作机制。 一、RocketMQ 整体架构 1.1 核心组件 RocketMQ 架构全景图: ┌────────────────────────────────────────────┐ │ NameServer Cluster │ │ (路由注册中心、服务发现、轻量级协调器) │ └────────┬───────────────────┬───────────────┘ │ │ 注册/心跳 │ │ 获取路由 │ │ ┌────────────▼──────┐ ┌────▼────────────────┐ │ │ │ │ │ Broker Cluster │ │ Producer/Consumer │ │ │ │ │ │ ┌───────────────┐ │ │ 应用程序 │ │ │ Master Broker │ │◄────┤ │ │ │ - CommitLog │ │ 读写 │ │ │ │ - ConsumeQ │ │ 消息 │ │ │ │ - IndexFile │ │ │ │ │ └───────┬───────┘ │ └─────────────────────┘ │ │ 主从同步 │ │ ┌───────▼───────┐ │ │ │ Slave Broker │ │ │ │ - CommitLog │ │ │ │ - ConsumeQ │ │ │ │ - IndexFile │ │ │ └───────────────┘ │ └───────────────────┘ 1.2 组件职责矩阵 组件职责对比: ┌──────────────┬────────────────────────────────────────┐ │ 组件 │ 核心职责 │ ├──────────────┼────────────────────────────────────────┤ │ NameServer │ - 路由信息管理(Topic → Broker 映射) │ │ │ - Broker 注册与心跳检测 │ │ │ - 提供路由信息查询 │ │ │ - 无状态、对等集群 │ ├──────────────┼────────────────────────────────────────┤ │ Broker │ - 消息存储(CommitLog) │ │ │ - 消息索引(ConsumeQueue、IndexFile) │ │ │ - 消息查询与消费 │ │ │ - 高可用(主从复制、Dledger) │ ├──────────────┼────────────────────────────────────────┤ │ Producer │ - 消息生产 │ │ │ - 消息发送(同步、异步、单向) │ │ │ - 负载均衡(选择Broker和Queue) │ │ │ - 故障规避 │ ├──────────────┼────────────────────────────────────────┤ │ Consumer │ - 消息消费 │ │ │ - 消息拉取(长轮询) │ │ │ - 负载均衡(Rebalance) │ │ │ - 消费进度管理 │ └──────────────┴────────────────────────────────────────┘ 二、NameServer 详解 2.1 为什么需要 NameServer public class WhyNameServer { // 问题1:Producer/Consumer 如何知道 Broker 在哪里? class ServiceDiscovery { // 没有 NameServer: // - 硬编码 Broker 地址(不灵活) // - 无法动态感知 Broker 变化 // 有了 NameServer: // - 动态获取 Broker 列表 // - 自动感知 Broker 上下线 } // 问题2:Topic 分布在哪些 Broker 上? class TopicRoute { // NameServer 维护 Topic 路由信息 class TopicRouteData { List<QueueData> queueDatas; // Queue 分布 List<BrokerData> brokerDatas; // Broker 信息 } } // 问题3:为什么不用 ZooKeeper? class WhyNotZooKeeper { /* ZooKeeper 的问题: 1. CP 系统(强一致性)- RocketMQ 只需 AP(可用性优先) 2. 重量级依赖 - NameServer 更轻量 3. 运维复杂 - NameServer 无状态更简单 4. 性能开销 - NameServer 内存操作更快 NameServer 的优势: 1. 完全无状态 2. 对等集群(任意节点提供服务) 3. 内存操作(无磁盘 IO) 4. 简单高效 */ } } 2.2 NameServer 核心功能 public class NameServerCore { // 1. 路由信息管理 class RouteInfoManager { // Topic → Broker 映射 private HashMap<String/* topic */, List<QueueData>> topicQueueTable; // Broker 名称 → Broker 数据 private HashMap<String/* brokerName */, BrokerData> brokerAddrTable; // Broker 地址 → 集群信息 private HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable; // 集群名称 → Broker 名称列表 private HashMap<String/* clusterName */, Set<String>> clusterAddrTable; // 过滤服务器 private HashMap<String/* brokerAddr */, List<String>> filterServerTable; } // 2. Broker 注册 public RegisterBrokerResult registerBroker( String clusterName, String brokerAddr, String brokerName, long brokerId, String haServerAddr, TopicConfigSerializeWrapper topicConfigWrapper) { // 创建或更新 Broker 数据 BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null == brokerData) { brokerData = new BrokerData(clusterName, brokerName, new HashMap<>()); this.brokerAddrTable.put(brokerName, brokerData); } brokerData.getBrokerAddrs().put(brokerId, brokerAddr); // 更新 Topic 路由信息 if (null != topicConfigWrapper) { for (Map.Entry<String, TopicConfig> entry : topicConfigWrapper.getTopicConfigTable().entrySet()) { createAndUpdateQueueData(brokerName, entry.getValue()); } } // 更新 Broker 存活信息 BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put( brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr ) ); return new RegisterBrokerResult(...); } // 3. 路由查询 public TopicRouteData getTopicRouteData(String topic) { TopicRouteData topicRouteData = new TopicRouteData(); // 查找 Topic 的 Queue 信息 List<QueueData> queueDataList = this.topicQueueTable.get(topic); // 查找对应的 Broker 信息 Set<String> brokerNameSet = new HashSet<>(); for (QueueData qd : queueDataList) { brokerNameSet.add(qd.getBrokerName()); } // 组装路由数据 for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (brokerData != null) { topicRouteData.getBrokerDatas().add(brokerData.clone()); } } topicRouteData.setQueueDatas(queueDataList); return topicRouteData; } // 4. Broker 心跳检测 public void scanNotActiveBroker() { long timeoutMillis = 120000; // 2 分钟超时 for (Map.Entry<String, BrokerLiveInfo> entry : this.brokerLiveTable.entrySet()) { long last = entry.getValue().getLastUpdateTimestamp(); if ((last + timeoutMillis) < System.currentTimeMillis()) { // Broker 超时,移除 RemotingUtil.closeChannel(entry.getValue().getChannel()); this.brokerLiveTable.remove(entry.getKey()); // 清理路由信息 this.onChannelDestroy(entry.getKey(), entry.getValue().getChannel()); log.warn("Broker timeout: {}", entry.getKey()); } } } } 2.3 NameServer 集群 public class NameServerCluster { // NameServer 特点 class Characteristics { /* 1. 完全无状态 - 不持久化任何数据 - 所有数据在内存中 - 重启后数据丢失(从 Broker 重新获取) 2. 对等部署 - 所有节点功能完全一致 - 无主从之分 - 无需选举 3. 不互相通信 - NameServer 之间无任何通信 - 数据可能短暂不一致 - 最终一致即可 4. Broker 全量注册 - Broker 向所有 NameServer 注册 - 保证数据冗余 */ } // 部署架构 class DeploymentArchitecture { /* NameServer 集群(推荐3个节点): ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ NameServer1 │ │ NameServer2 │ │ NameServer3 │ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │ │ │ └────────────────┼────────────────┘ │ ┌───────▼────────┐ │ │ │ Broker 集群 │ │ (全量注册) │ │ │ └────────────────┘ */ } } 三、Broker 详解 3.1 Broker 核心架构 Broker 内部架构: ┌─────────────────────────────────────────────────────┐ │ Broker │ │ ┌────────────────────────────────────────────────┐│ │ │ Remoting 通信层 ││ │ │ (接收 Producer/Consumer 请求) ││ │ └────────┬───────────────────────────────────────┘│ │ │ │ │ ┌────────▼────────────────────────────┐ │ │ │ BrokerController │ │ │ │ - 处理器注册 │ │ │ │ - 定时任务管理 │ │ │ │ - 消息处理 │ │ │ └────────┬────────────────────────────┘ │ │ │ │ │ ┌────────▼─────────────┬───────────────────┐ │ │ │ MessageStore │ HAService │ │ │ │ ┌────────────────┐ │ (主从复制) │ │ │ │ │ CommitLog │ │ │ │ │ │ │ (消息存储) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ ConsumeQueue │ │ │ │ │ │ │ (消费索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ │ ┌────────────────┐ │ │ │ │ │ │ IndexFile │ │ │ │ │ │ │ (消息索引) │ │ │ │ │ │ └────────────────┘ │ │ │ │ └──────────────────────┴───────────────────┘ │ │ │ │ ┌─────────────────────────────────────────────┐ │ │ │ 定时任务 │ │ │ │ - 持久化 ConsumeQueue │ │ │ │ - 持久化 CommitLog │ │ │ │ - 清理过期文件 │ │ │ │ - 统计信息 │ │ │ └─────────────────────────────────────────────┘ │ └─────────────────────────────────────────────────────┘ 3.2 Broker 核心职责 public class BrokerCore { // 1. 消息接收与存储 class MessageReceive { public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 写入 CommitLog PutMessageResult result = this.commitLog.putMessage(msg); // 触发刷盘 handleDiskFlush(result, msg); // 触发主从同步 handleHA(result, msg); return result; } } // 2. 消息查询 class MessageQuery { // 按消息 ID 查询 public MessageExt lookMessageByOffset(long commitLogOffset) { return this.commitLog.lookMessageByOffset(commitLogOffset); } // 按 Key 查询 public QueryMessageResult queryMessage(String topic, String key, int maxNum) { return this.indexService.queryMessage(topic, key, maxNum); } // 按时间查询 public QueryMessageResult queryMessageByTime(String topic, long begin, long end) { return this.messageStore.queryMessage(topic, begin, end, maxNum); } } // 3. 消息消费 class MessageConsume { public GetMessageResult getMessage( String group, String topic, int queueId, long offset, int maxMsgNums, SubscriptionData subscriptionData) { // 从 ConsumeQueue 查找消息位置 ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId); // 根据 offset 获取消息 SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset); // 从 CommitLog 读取消息内容 List<MessageExt> messageList = new ArrayList<>(); for (int i = 0; i < maxMsgNums; i++) { long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); MessageExt msg = lookMessageByOffset(offsetPy, sizePy); // 消息过滤 if (match(subscriptionData, msg)) { messageList.add(msg); } } return new GetMessageResult(messageList); } } // 4. Topic 创建 class TopicManagement { public void createTopic(String topic, int queueNums) { TopicConfig topicConfig = new TopicConfig(topic); topicConfig.setReadQueueNums(queueNums); topicConfig.setWriteQueueNums(queueNums); topicConfig.setPerm(6); // 读写权限 this.topicConfigTable.put(topic, topicConfig); // 创建 ConsumeQueue for (int i = 0; i < queueNums; i++) { ConsumeQueue logic = new ConsumeQueue(topic, i); this.consumeQueueTable.put(buildKey(topic, i), logic); } } } // 5. 消费进度管理 class ConsumerOffsetManagement { private ConcurrentHashMap<String, ConcurrentHashMap<Integer, Long>> offsetTable; public void commitOffset(String group, String topic, int queueId, long offset) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { map.put(queueId, offset); } } public long queryOffset(String group, String topic, int queueId) { String key = topic + "@" + group; ConcurrentHashMap<Integer, Long> map = offsetTable.get(key); if (map != null) { Long offset = map.get(queueId); return offset != null ? offset : -1; } return -1; } } } 3.3 Broker 高可用 public class BrokerHA { // 主从架构 class MasterSlave { /* 主从模式: ┌──────────────┐ 同步 ┌──────────────┐ │ Master Broker│ ──────────> │ Slave Broker │ │ - 读写 │ │ - 只读 │ └──────────────┘ └──────────────┘ 同步方式: 1. 同步复制(SYNC_MASTER): - Master 等待 Slave 同步完成才返回 - 可靠性高,性能稍差 2. 异步复制(ASYNC_MASTER): - Master 立即返回,后台异步同步 - 性能高,可能丢失少量数据 */ } // Dledger 模式 class DledgerMode { /* 基于 Raft 的自动选主: ┌──────────┐ ┌──────────┐ ┌──────────┐ │ Broker 1 │ │ Broker 2 │ │ Broker 3 │ │ (Leader) │◄─▶│(Follower)│◄─▶│(Follower)│ └──────────┘ └──────────┘ └──────────┘ 特点: 1. 自动故障转移 2. 无需人工干预 3. 保证数据一致性 4. 至少3个节点 */ } } 四、Producer 工作原理 4.1 Producer 架构 public class ProducerArchitecture { // Producer 核心组件 class ProducerComponents { private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信API private TopicPublishInfo publishInfo; // Topic路由信息 private SendMessageHook sendHook; // 发送钩子 } // 发送流程 public SendResult send(Message msg) { // 1. 查找 Topic 路由信息 TopicPublishInfo publishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 2. 选择消息队列 MessageQueue mq = selectOneMessageQueue(publishInfo); // 3. 发送消息 SendResult result = sendKernelImpl(msg, mq); // 4. 更新故障规避信息 updateFaultItem(mq.getBrokerName(), result); return result; } } 4.2 Producer 负载均衡 public class ProducerLoadBalance { // 队列选择策略 public MessageQueue selectQueue(TopicPublishInfo tpInfo) { // 1. 简单轮询 int index = sendWhichQueue.incrementAndGet() % queueSize; return tpInfo.getMessageQueueList().get(index); // 2. 故障规避 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { return mq; } // 3. 选择延迟最小的 return latencyFaultTolerance.pickOneAtLeast(); } } 五、Consumer 工作原理 5.1 Consumer 架构 public class ConsumerArchitecture { // Consumer 核心组件 class ConsumerComponents { private RebalanceService rebalanceService; // 负载均衡服务 private PullMessageService pullMessageService; // 拉取服务 private ConsumeMessageService consumeService; // 消费服务 private OffsetStore offsetStore; // 进度存储 } // 消费流程 public void start() { // 1. 启动负载均衡 rebalanceService.start(); // 2. 启动拉取服务 pullMessageService.start(); // 3. 启动消费服务 consumeService.start(); // 4. 注册到 Broker registerConsumer(); } } 5.2 Consumer 负载均衡(Rebalance) public class ConsumerRebalance { public void doRebalance() { // 1. 获取 Topic 的所有 Queue List<MessageQueue> mqAll = getTopicQueues(topic); // 2. 获取同组的所有 Consumer List<String> cidAll = getConsumerList(group); // 3. 排序(保证一致性视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 执行分配 List<MessageQueue> allocated = allocateStrategy.allocate( group, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueue(allocated); } } 六、完整消息流转 6.1 发送消息完整流程 消息发送完整流程: ┌───────────┐ │ Producer │ └─────┬─────┘ │1. 查询路由 ▼ ┌───────────┐ │NameServer │ 返回路由信息(Broker列表、Queue信息) └─────┬─────┘ │ ▼ ┌───────────┐ │ Producer │ 2. 选择 Broker 和 Queue └─────┬─────┘ │3. 发送消息 ▼ ┌───────────┐ │ Broker │ 4. 写入 CommitLog │ ┌───────┐ │ │ │CommitLog│ │ └───┬───┘ │ │ │5. 异步构建 ConsumeQueue │ ┌───▼──────┐ │ │ConsumeQ │ │ └──────────┘ │ │6. 返回结果 └─────┼─────┘ ▼ ┌───────────┐ │ Producer │ 7. 收到发送结果 └───────────┘ 6.2 消费消息完整流程 消息消费完整流程: ┌───────────┐ │ Consumer │ └─────┬─────┘ │1. 注册到 Broker ▼ ┌───────────┐ │ Broker │ 保存 Consumer 信息 └─────┬─────┘ │2. Rebalance 分配 Queue ▼ ┌───────────┐ │ Consumer │ 3. 发起长轮询拉取 └─────┬─────┘ │ ▼ ┌───────────┐ │ Broker │ 4. 从 ConsumeQueue 查找 │ ┌───────┐ │ 5. 从 CommitLog 读取消息 │ │ConsumeQ│ │ │ └───┬───┘ │ │ │ │ │ ┌───▼────┐│ │ │CommitLog││ │ └────────┘│ │ │6. 返回消息 └─────┼─────┘ ▼ ┌───────────┐ │ Consumer │ 7. 处理消息 └─────┬─────┘ 8. 更新消费进度 │ ▼ ┌───────────┐ │ Broker │ 9. 保存消费进度 └───────────┘ 七、设计亮点与思考 7.1 设计亮点 public class DesignHighlights { // 1. NameServer 的轻量化设计 class LightweightNameServer { /* - 无状态,内存操作 - 对等部署,无需选举 - 简单高效 - AP 而非 CP */ } // 2. CommitLog 的顺序写 class SequentialWrite { /* - 所有消息顺序写入同一个文件 - 充分利用磁盘顺序写性能 - 避免随机 IO */ } // 3. ConsumeQueue 的索引设计 class IndexDesign { /* - 轻量级索引(20字节) - 快速定位消息位置 - 支持消费进度管理 */ } // 4. 长轮询的伪 Push class LongPolling { /* - Consumer 主动拉取 - Broker Hold 请求 - 兼顾实时性和可控性 */ } } 7.2 架构权衡 public class ArchitectureTradeoffs { // 权衡1:性能 vs 可靠性 class PerformanceVsReliability { // 异步刷盘 → 高性能,可能丢消息 // 同步刷盘 → 零丢失,性能下降 // 异步复制 → 高性能,可能丢消息 // 同步复制 → 高可靠,性能下降 } // 权衡2:一致性 vs 可用性 class ConsistencyVsAvailability { // NameServer 选择 AP // - 可能短暂不一致 // - 但保证高可用 // Dledger 模式选择 CP // - 保证数据一致 // - 需要多数节点存活 } // 权衡3:复杂度 vs 功能 class ComplexityVsFeature { // RocketMQ 比 Kafka 复杂 // - 但功能更丰富(事务、延迟、顺序) // RocketMQ 比 RabbitMQ 简单 // - 但性能更高 } } 八、总结 通过本篇,我们从整体视角理解了 RocketMQ 的架构: ...

2025-11-13 · maneng

RocketMQ入门07:消息模型深入 - 点对点vs发布订阅

引言:一个模型,两种模式 传统消息系统通常要在两种模型中二选一: JMS:明确区分 Queue(点对点)和 Topic(发布订阅) AMQP:通过 Exchange 和 Binding 实现不同模式 Kafka:只有 Topic,通过 Consumer Group 实现不同语义 而 RocketMQ 的独特之处在于:用一套模型,同时支持两种模式。 让我们深入理解这是如何实现的。 一、两种基本消息模型 1.1 点对点模型(Point-to-Point) public class P2PModel { // 点对点模型特征 class Characteristics { // 1. 一条消息只能被一个消费者消费 // 2. 消息被消费后从队列中删除 // 3. 多个消费者竞争消息 // 4. 适合任务分发场景 } // 传统 JMS Queue 实现 class TraditionalQueue { Queue<Message> queue = new LinkedBlockingQueue<>(); // 生产者发送 public void send(Message msg) { queue.offer(msg); } // 消费者接收(竞争) public Message receive() { return queue.poll(); // 取出即删除 } } } 点对点模型示意图: ┌──────────┐ Producer ──> Queue ──> Consumer1 (获得消息) └──────────┘ ──> Consumer2 (没有获得) ──> Consumer3 (没有获得) 特点:消息被其中一个消费者消费后,其他消费者无法再消费 1.2 发布订阅模型(Publish-Subscribe) public class PubSubModel { // 发布订阅模型特征 class Characteristics { // 1. 一条消息可以被多个订阅者消费 // 2. 每个订阅者都能收到完整消息副本 // 3. 消息广播给所有订阅者 // 4. 适合事件通知场景 } // 传统 JMS Topic 实现 class TraditionalTopic { List<Subscriber> subscribers = new ArrayList<>(); // 发布消息 public void publish(Message msg) { // 广播给所有订阅者 for (Subscriber sub : subscribers) { sub.onMessage(msg.copy()); } } // 订阅主题 public void subscribe(Subscriber sub) { subscribers.add(sub); } } } 发布订阅模型示意图: ┌──> Consumer1 (收到消息) ┌────────┐ │ Producer ──> Topic ──┼──> Consumer2 (收到消息) └────────┘ │ └──> Consumer3 (收到消息) 特点:每个订阅者都能收到消息的完整副本 二、RocketMQ 的统一模型 2.1 核心设计:Consumer Group public class RocketMQModel { // RocketMQ 的秘密:通过 Consumer Group 实现两种模式 class ConsumerGroup { String groupName; List<Consumer> consumers; ConsumeMode mode; // 集群消费 or 广播消费 } // 关键规则: // 1. 同一 Consumer Group 内的消费者【分摊】消息(点对点) // 2. 不同 Consumer Group 都能收到【全量】消息(发布订阅) } 2.2 模式切换的魔法 public class ModelSwitching { // 场景1:实现点对点模式 public void p2pMode() { // 所有消费者使用【相同】的 Consumer Group String GROUP = "SAME_GROUP"; Consumer consumer1 = new DefaultMQPushConsumer(GROUP); Consumer consumer2 = new DefaultMQPushConsumer(GROUP); Consumer consumer3 = new DefaultMQPushConsumer(GROUP); // 结果:消息被分摊消费,每条消息只被其中一个消费 } // 场景2:实现发布订阅模式 public void pubSubMode() { // 每个消费者使用【不同】的 Consumer Group Consumer consumer1 = new DefaultMQPushConsumer("GROUP_A"); Consumer consumer2 = new DefaultMQPushConsumer("GROUP_B"); Consumer consumer3 = new DefaultMQPushConsumer("GROUP_C"); // 结果:每个 Group 都收到全量消息 } // 场景3:混合模式 public void mixedMode() { // 订单服务组(2个实例) Consumer order1 = new DefaultMQPushConsumer("ORDER_GROUP"); Consumer order2 = new DefaultMQPushConsumer("ORDER_GROUP"); // 库存服务组(3个实例) Consumer stock1 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock2 = new DefaultMQPushConsumer("STOCK_GROUP"); Consumer stock3 = new DefaultMQPushConsumer("STOCK_GROUP"); // 结果: // - ORDER_GROUP 内部:order1 和 order2 分摊消息 // - STOCK_GROUP 内部:stock1/2/3 分摊消息 // - 两个 GROUP 之间:都能收到全量消息 } } 三、集群消费 vs 广播消费 3.1 集群消费(默认) public class ClusterConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置集群消费模式(默认) consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TOPIC", "*"); } // 集群消费特点 class ClusteringCharacteristics { // 1. 同组内消费者分摊消息 // 2. 消费进度存储在 Broker // 3. 支持消费失败重试 // 4. 适合负载均衡场景 } // 消费进度管理 class ProgressManagement { // Broker 端统一管理消费进度 // 路径:{ROCKETMQ_HOME}/store/config/consumerOffset.json { "offsetTable": { "TOPIC@GROUP": { "0": 12345, // Queue0 消费到 12345 "1": 23456, // Queue1 消费到 23456 "2": 34567, // Queue2 消费到 34567 "3": 45678 // Queue3 消费到 45678 } } } } } 集群消费示意图: Queue0 ──> Consumer1 ─┐ Queue1 ──> Consumer1 ─┤ Topic ──> ├─> GROUP_A (共享进度) Queue2 ──> Consumer2 ─┤ Queue3 ──> Consumer2 ─┘ GROUP_A 的消费进度存储在 Broker,所有成员共享 3.2 广播消费 public class BroadcastConsumeMode { public void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GROUP"); // 设置广播消费模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TOPIC", "*"); } // 广播消费特点 class BroadcastingCharacteristics { // 1. 每个消费者都消费全量消息 // 2. 消费进度存储在本地 // 3. 不支持消费失败重试 // 4. 适合本地缓存更新场景 } // 消费进度管理 class LocalProgress { // 本地文件存储消费进度 // 路径:{user.home}/.rocketmq_offsets/{clientId}/{group}/offsets.json { "offsetTable": { "TOPIC": { "0": 99999, // 本实例 Queue0 进度 "1": 88888, // 本实例 Queue1 进度 "2": 77777, // 本实例 Queue2 进度 "3": 66666 // 本实例 Queue3 进度 } } } } } 广播消费示意图: ┌─> Consumer1 (消费全部消息,独立进度) │ Topic ──> ────┼─> Consumer2 (消费全部消息,独立进度) │ └─> Consumer3 (消费全部消息,独立进度) 每个 Consumer 独立维护自己的消费进度 四、消息分配策略 4.1 集群模式下的分配策略 public class AllocationStrategy { // 1. 平均分配(默认) class AllocateMessageQueueAveragely { // 假设:4个Queue,2个Consumer // Consumer1: Queue0, Queue1 // Consumer2: Queue2, Queue3 // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue1, Queue2 // Consumer2: Queue3, Queue4 } // 2. 环形分配 class AllocateMessageQueueAveragelyByCircle { // 假设:5个Queue,2个Consumer // Consumer1: Queue0, Queue2, Queue4 // Consumer2: Queue1, Queue3 } // 3. 机房就近分配 class AllocateMessageQueueByMachineRoom { // 根据 Consumer 和 Broker 的机房位置分配 // 优先分配同机房的 Queue } // 4. 一致性Hash分配 class AllocateMessageQueueConsistentHash { // 使用一致性Hash算法 // Consumer 上下线对其他 Consumer 影响最小 } // 5. 自定义分配 class CustomAllocateStrategy implements AllocateMessageQueueStrategy { @Override public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { // 自定义分配逻辑 // 比如:VIP Consumer 分配更多 Queue if (isVipConsumer(currentCID)) { return allocateMoreQueues(mqAll, currentCID); } return allocateNormalQueues(mqAll, currentCID); } } } 4.2 分配策略选择 public class StrategySelection { public void selectStrategy() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 设置分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueAveragely() ); // 根据场景选择 class ScenarioBasedSelection { // 场景1:消费者能力相同 → 平均分配 // 场景2:跨机房部署 → 机房就近 // 场景3:Consumer频繁上下线 → 一致性Hash // 场景4:Consumer能力不同 → 自定义策略 } } } 五、实战场景分析 5.1 订单处理系统 public class OrderProcessingSystem { // 需求:订单需要被多个系统处理,但每个系统只处理一次 // 订单服务(2个实例,负载均衡) class OrderService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("ORDER_SERVICE_GROUP"); Consumer instance2 = createConsumer("ORDER_SERVICE_GROUP"); // 结果:订单在两个实例间负载均衡 } } // 库存服务(3个实例,负载均衡) class StockService { void startConsumers() { // 使用相同 Group,实现负载均衡 Consumer instance1 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance2 = createConsumer("STOCK_SERVICE_GROUP"); Consumer instance3 = createConsumer("STOCK_SERVICE_GROUP"); // 结果:订单在三个实例间负载均衡 } } // 积分服务(1个实例) class PointService { void startConsumer() { Consumer instance = createConsumer("POINT_SERVICE_GROUP"); // 结果:所有订单都由这个实例处理 } } // 效果: // 1. 每个订单被 ORDER_SERVICE_GROUP 处理一次 // 2. 每个订单被 STOCK_SERVICE_GROUP 处理一次 // 3. 每个订单被 POINT_SERVICE_GROUP 处理一次 } 5.2 配置更新系统 public class ConfigUpdateSystem { // 需求:配置更新需要通知所有服务实例 class ConfigUpdateConsumer { void setup() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CONFIG_GROUP"); // 使用广播模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("CONFIG_UPDATE_TOPIC", "*"); consumer.registerMessageListener((List<MessageExt> msgs, context) -> { for (MessageExt msg : msgs) { // 更新本地配置 updateLocalConfig(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); } } // 效果: // 1. 所有服务实例都收到配置更新 // 2. 每个实例独立更新自己的配置 // 3. 某个实例重启不影响其他实例 } 5.3 日志收集系统 public class LogCollectionSystem { // 需求:大量日志需要收集,要求高吞吐 class LogCollector { void setup() { // 创建多个消费者,使用同一个 Group String GROUP = "LOG_COLLECTOR_GROUP"; // 部署10个消费者实例 for (int i = 0; i < 10; i++) { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP); consumer.setConsumeThreadMin(20); consumer.setConsumeThreadMax(30); consumer.setPullBatchSize(64); // 批量拉取 consumer.subscribe("LOG_TOPIC", "*"); consumer.start(); } // 效果: // 1. 10个实例并行消费,提高吞吐量 // 2. 自动负载均衡 // 3. 某个实例故障,其他实例自动接管 } } } 六、模型对比与选择 6.1 不同MQ产品的模型对比 消息模型对比: ┌─────────────┬──────────────┬──────────────┬──────────────┐ │ 产品 │ 模型 │ 实现方式 │ 特点 │ ├─────────────┼──────────────┼──────────────┼──────────────┤ │ RabbitMQ │ AMQP │ Exchange │ 灵活但复杂 │ │ ActiveMQ │ JMS │ Queue/Topic │ 标准但死板 │ │ Kafka │ 发布订阅 │ Partition │ 简单但单一 │ │ RocketMQ │ 统一模型 │ ConsumerGroup│ 灵活且简单 │ └─────────────┴──────────────┴──────────────┴──────────────┘ 6.2 选择建议 public class ModelSelectionGuide { // 使用集群消费(点对点)的场景 class UseClusterMode { // ✅ 任务分发 // ✅ 负载均衡 // ✅ 高可用(故障转移) // ✅ 需要消费确认和重试 void example() { // 订单处理、支付处理、数据同步 } } // 使用广播消费的场景 class UseBroadcastMode { // ✅ 本地缓存更新 // ✅ 配置刷新 // ✅ 全量数据同步 // ✅ 实时通知 void example() { // 配置更新、缓存刷新、实时推送 } } // 混合使用的场景 class UseMixedMode { // 不同业务使用不同 Group // 同一业务的多实例使用相同 Group void example() { // 电商系统:订单服务组、库存服务组、积分服务组 // 每个组内负载均衡,组间相互独立 } } } 七、高级特性 7.1 消费进度重置 public class ConsumeProgressReset { // 重置消费进度的场景 // 1. 需要重新消费历史消息 // 2. 跳过大量积压消息 // 3. 修复消费进度异常 public void resetProgress() { // 方式1:通过管理工具 // sh mqadmin resetOffset -n localhost:9876 -g GROUP -t TOPIC -o 0 // 方式2:通过代码 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); // CONSUME_FROM_FIRST_OFFSET: 从最早消息开始 // CONSUME_FROM_LAST_OFFSET: 从最新消息开始 // CONSUME_FROM_TIMESTAMP: 从指定时间开始 } } 7.2 消费并行度调优 public class ConsumerConcurrency { public void tuneConcurrency() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); // 并发消费 consumer.setConsumeThreadMin(20); // 最小线程数 consumer.setConsumeThreadMax(64); // 最大线程数 // 顺序消费(单线程) consumer.setConsumeThreadMax(1); // 批量消费 consumer.setConsumeMessageBatchMaxSize(10); // 批量大小 consumer.setPullBatchSize(32); // 拉取批量 // 限流 consumer.setPullThresholdForQueue(1000); // 队列级别流控 consumer.setPullThresholdForTopic(5000); // Topic级别流控 } } 八、常见问题 8.1 消息重复消费 // 问题:同一条消息被消费多次 // 原因: // 1. 消费超时导致重试 // 2. Rebalance 导致重新分配 // 3. 消费者异常退出 // 解决方案:幂等处理 class IdempotentConsumer { private Set<String> processedMsgIds = new ConcurrentHashSet<>(); public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs) { for (MessageExt msg : msgs) { String msgId = msg.getMsgId(); // 幂等检查 if (!processedMsgIds.add(msgId)) { // 已处理过,跳过 continue; } // 处理消息 processMessage(msg); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } } 8.2 消费不均衡 // 问题:某些 Consumer 消费很多,某些很少 // 原因: // 1. Queue 数量 < Consumer 数量 // 2. 消息分布不均 // 3. Consumer 处理能力不同 // 解决方案: class BalanceSolution { void solution() { // 1. 调整 Queue 数量 // Queue数量 >= Consumer数量 // 2. 使用合适的分配策略 consumer.setAllocateMessageQueueStrategy( new AllocateMessageQueueConsistentHash() ); // 3. 监控并动态调整 // 监控每个 Consumer 的消费TPS // 动态调整线程池大小 } } 九、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门05:核心概念详解(上)- Producer、Consumer、Broker

引言:理解核心,掌握全局 如果把 RocketMQ 比作一个物流系统,那么: Producer 是发货方,负责打包发送包裹 Broker 是物流中心,负责存储转运包裹 Consumer 是收货方,负责签收处理包裹 理解这三个核心组件,就掌握了 RocketMQ 的精髓。 一、Producer(生产者)详解 1.1 Producer 架构 // Producer 内部结构 public class ProducerArchitecture { // 核心组件 private MQClientInstance clientInstance; // 客户端实例 private MQClientAPIImpl mqClientAPI; // 通信层 private TopicPublishInfo topicPublishInfo; // 路由信息 private SendMessageHook sendHook; // 发送钩子 private DefaultMQProducerImpl impl; // 核心实现 } Producer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQProducer │ <- API 层 ├──────────────────────────────────┤ │ DefaultMQProducerImpl │ <- 核心实现 ├──────────────────────────────────┤ │ ┌──────────┬─────────────────┐ │ │ │路由管理 │ 消息发送器 │ │ │ ├──────────┼─────────────────┤ │ │ │故障规避 │ 异步发送处理 │ │ │ ├──────────┼─────────────────┤ │ │ │负载均衡 │ 事务消息处理 │ │ │ └──────────┴─────────────────┘ │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 1.2 Producer 生命周期 public class ProducerLifecycle { // 1. 创建阶段 public void create() { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup"); producer.setNamesrvAddr("localhost:9876"); producer.setRetryTimesWhenSendFailed(3); // 同步发送失败重试次数 producer.setRetryTimesWhenSendAsyncFailed(2); // 异步发送失败重试次数 producer.setSendMsgTimeout(3000); // 发送超时时间 } // 2. 启动阶段 public void start() throws MQClientException { producer.start(); // 内部流程: // a. 检查配置 // b. 获取 NameServer 地址 // c. 启动定时任务(路由更新、心跳) // d. 启动消息发送线程池 // e. 注册 Producer 到所有 Broker } // 3. 运行阶段 public void running() throws Exception { // 获取路由信息 TopicRouteData routeData = producer.getDefaultMQProducerImpl() .getmQClientFactory() .getMQClientAPIImpl() .getTopicRouteInfoFromNameServer("TopicTest", 3000); // 选择消息队列 MessageQueue mq = selectMessageQueue(routeData); // 发送消息 SendResult result = producer.send(message, mq); } // 4. 关闭阶段 public void shutdown() { producer.shutdown(); // 内部流程: // a. 注销 Producer // b. 关闭网络连接 // c. 清理资源 // d. 停止定时任务 } } 1.3 消息发送流程 // 消息发送核心流程 public class SendMessageFlow { public SendResult sendMessage(Message msg) { // 1. 验证消息 Validators.checkMessage(msg, producer); // 2. 获取路由信息 TopicPublishInfo topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic()); // 3. 选择消息队列(负载均衡) MessageQueue mq = selectOneMessageQueue(topicPublishInfo, lastBrokerName); // 4. 消息发送前处理 msg.setBody(compress(msg.getBody())); // 压缩 msg.setProperty("UNIQ_KEY", MessageClientIDSetter.createUniqID()); // 唯一ID // 5. 发送消息 SendResult sendResult = sendKernelImpl( msg, mq, communicationMode, // SYNC, ASYNC, ONEWAY sendCallback, timeout ); // 6. 发送后处理(钩子) if (hasSendMessageHook()) { sendMessageHook.sendMessageAfter(context); } return sendResult; } } 1.4 负载均衡策略 public class ProducerLoadBalance { // 默认策略:轮询 + 故障规避 public MessageQueue selectOneMessageQueue(TopicPublishInfo tpInfo, String lastBrokerName) { // 启用故障规避 if (this.sendLatencyFaultEnable) { // 1. 优先选择延迟小的 Broker for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) { int index = Math.abs(sendWhichQueue.incrementAndGet()) % queueSize; MessageQueue mq = tpInfo.getMessageQueueList().get(index); // 检查 Broker 是否可用 if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) { // 避免连续发送到同一个 Broker if (null == lastBrokerName || !mq.getBrokerName().equals(lastBrokerName)) { return mq; } } } // 2. 如果都不可用,选择延迟最小的 MessageQueue mq = latencyFaultTolerance.pickOneAtLeast(); return mq; } // 不启用故障规避:简单轮询 return selectOneMessageQueue(); } // 故障延迟统计 class LatencyStats { // 根据发送耗时计算 Broker 不可用时长 // 耗时:50ms, 100ms, 550ms, 1000ms, 2000ms, 3000ms, 15000ms // 规避:0ms, 0ms, 30s, 60s, 120s, 180s, 600s private final long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L}; private final long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L}; } } 二、Consumer(消费者)详解 2.1 Consumer 架构 Consumer 内部架构: ┌──────────────────────────────────┐ │ 应用程序 │ ├──────────────────────────────────┤ │ DefaultMQPushConsumer │ <- Push 模式 API │ DefaultMQPullConsumer │ <- Pull 模式 API ├──────────────────────────────────┤ │ ┌──────────────────────────┐ │ │ │ 消费者核心实现 │ │ │ ├──────────────────────────┤ │ │ │ Rebalance 负载均衡 │ │ │ │ ProcessQueue 处理队列 │ │ │ │ ConsumeMessageService │ │ │ │ 消费进度管理 │ │ │ └──────────────────────────┘ │ ├──────────────────────────────────┤ │ PullMessage 长轮询服务 │ ├──────────────────────────────────┤ │ Netty 通信层 │ └──────────────────────────────────┘ 2.2 Push vs Pull 模式 // Push 模式(推荐) public class PushConsumerDemo { public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup"); // Push 模式本质:长轮询拉取 // 1. Consumer 主动拉取消息 // 2. 如果没有消息,Broker hold 住请求 // 3. 有新消息时,立即返回 // 4. 看起来像是 Broker 推送 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> messages, ConsumeConcurrentlyContext context) { // 处理消息 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } } // Pull 模式(特殊场景) public class PullConsumerDemo { public void consume() throws Exception { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroup"); consumer.start(); // 手动拉取消息 Set<MessageQueue> mqs = consumer.fetchMessageQueuesInBalance("TopicTest"); for (MessageQueue mq : mqs) { PullResult pullResult = consumer.pullBlockIfNotFound( mq, // 消息队列 null, // 过滤表达式 getOffset(mq), // 消费位点 32 // 最大拉取数量 ); // 处理消息 for (MessageExt msg : pullResult.getMsgFoundList()) { processMessage(msg); } // 更新消费位点 updateOffset(mq, pullResult.getNextBeginOffset()); } } } 2.3 消费者 Rebalance 机制 public class ConsumerRebalance { // Rebalance 触发条件: // 1. Consumer 数量变化(上线/下线) // 2. Topic 的 Queue 数量变化 // 3. 消费者订阅关系变化 public void rebalanceProcess() { // 1. 获取 Topic 的所有 Queue Set<MessageQueue> mqSet = topicSubscribeInfoTable.get(topic); // 2. 获取消费组的所有 Consumer List<String> cidAll = findConsumerIdList(topic, consumerGroup); // 3. 排序(保证所有 Consumer 看到一致的视图) Collections.sort(mqAll); Collections.sort(cidAll); // 4. 分配策略 AllocateMessageQueueStrategy strategy = getAllocateMessageQueueStrategy(); List<MessageQueue> allocateResult = strategy.allocate( consumerGroup, currentCID, mqAll, cidAll ); // 5. 更新本地队列 updateProcessQueueTableInRebalance(topic, allocateResult); } // 分配策略示例:平均分配 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy { public List<MessageQueue> allocate( String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) { int index = cidAll.indexOf(currentCID); int mod = mqAll.size() % cidAll.size(); // 计算分配数量 int averageSize = (mod > 0 && index < mod) ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size(); // 计算起始位置 int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod; // 分配队列 return mqAll.subList(startIndex, Math.min(startIndex + averageSize, mqAll.size())); } } } 三、Broker 详解 3.1 Broker 架构 Broker 完整架构: ┌────────────────────────────────────────┐ │ 客户端请求 │ ├────────────────────────────────────────┤ │ Remoting 通信层 │ ├────────────────────────────────────────┤ │ ┌──────────────┬─────────────────┐ │ │ │ Processor │ 业务处理器 │ │ │ ├──────────────┼─────────────────┤ │ │ │SendMessage │ 接收消息 │ │ │ │PullMessage │ 拉取消息 │ │ │ │QueryMessage │ 查询消息 │ │ │ │AdminRequest │ 管理请求 │ │ │ └──────────────┴─────────────────┘ │ ├────────────────────────────────────────┤ │ Store 存储层 │ │ ┌──────────────────────────────┐ │ │ │ CommitLog(消息存储) │ │ │ ├──────────────────────────────┤ │ │ │ ConsumeQueue(消费索引) │ │ │ ├──────────────────────────────┤ │ │ │ IndexFile(消息索引) │ │ │ └──────────────────────────────┘ │ ├────────────────────────────────────────┤ │ HA 高可用模块 │ ├────────────────────────────────────────┤ │ Schedule 定时任务 │ └────────────────────────────────────────┘ 3.2 Broker 消息存储 public class BrokerStorage { // 1. CommitLog:消息主体存储 class CommitLog { // 所有消息顺序写入 // 单个文件默认 1GB // 文件名为起始偏移量 private final MappedFileQueue mappedFileQueue; public PutMessageResult putMessage(MessageExtBrokerInner msg) { // 获取当前写入文件 MappedFile mappedFile = mappedFileQueue.getLastMappedFile(); // 写入消息 AppendMessageResult result = mappedFile.appendMessage(msg, this.appendMessageCallback); // 刷盘策略 handleDiskFlush(result, msg); // 主从同步 handleHA(result, msg); return new PutMessageResult(PutMessageStatus.PUT_OK, result); } } // 2. ConsumeQueue:消费队列 class ConsumeQueue { // 存储格式:8字节 CommitLog 偏移量 + 4字节消息大小 + 8字节 Tag HashCode // 单个文件 30W 条记录,约 5.72MB private final MappedFileQueue mappedFileQueue; public void putMessagePositionInfo( long offset, // CommitLog 偏移量 int size, // 消息大小 long tagsCode, // Tag HashCode long cqOffset) { // ConsumeQueue 偏移量 // 写入索引 this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); // 更新最大偏移量 this.maxPhysicOffset = offset + size; } } // 3. IndexFile:消息索引 class IndexFile { // 支持按 Key 或时间区间查询 // 存储格式:Header(40B) + Slot(500W*4B) + Index(2000W*20B) // 单个文件约 400MB public void putKey(String key, long phyOffset, long storeTimestamp) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; // 存储索引 // ... } } } 3.3 Broker 刷盘机制 public class FlushDiskStrategy { // 同步刷盘 class SyncFlush { public void flush() { // 写入 PageCache 后立即刷盘 // 优点:数据可靠性高 // 缺点:性能差 if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) { GroupCommitService service = (GroupCommitService) this.flushCommitLogService; // 等待刷盘完成 boolean flushResult = service.waitForFlush(timeout); if (!flushResult) { log.error("Sync flush timeout"); throw new RuntimeException("Flush timeout"); } } } } // 异步刷盘(默认) class AsyncFlush { public void flush() { // 写入 PageCache 后立即返回 // 后台线程定时刷盘 // 优点:性能高 // 缺点:可能丢失数据(机器宕机) // 定时刷盘(默认 500ms) this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { // 刷盘条件: // 1. 距上次刷盘超过 10s // 2. 脏页超过 4 页 // 3. 收到关闭信号 if (System.currentTimeMillis() - lastFlushTime > 10000 || dirtyPages > 4) { mappedFile.flush(0); lastFlushTime = System.currentTimeMillis(); dirtyPages = 0; } } }, 500, 500, TimeUnit.MILLISECONDS); } } } 3.4 Broker 主从同步 public class BrokerHA { // Master Broker class HAService { // 接受 Slave 连接 private AcceptSocketService acceptSocketService; // 管理多个 Slave 连接 private List<HAConnection> connectionList; public void start() { this.acceptSocketService.start(); this.groupTransferService.start(); this.haClient.start(); } // 等待 Slave 同步 public boolean waitForSlaveSync(long masterPutWhere) { // 同步复制模式 if (messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SYNC_MASTER) { // 等待至少一个 Slave 同步完成 for (HAConnection conn : connectionList) { if (conn.getSlaveAckOffset() >= masterPutWhere) { return true; } } return false; } // 异步复制模式:直接返回 return true; } } // Slave Broker class HAClient { private SocketChannel socketChannel; public void run() { while (!this.isStopped()) { try { // 连接 Master connectMaster(); // 上报同步进度 reportSlaveMaxOffset(); // 接收数据 boolean ok = processReadEvent(); // 处理数据 dispatchReadRequest(); } catch (Exception e) { log.error("HAClient error", e); } } } } } 四、组件交互流程 4.1 完整的消息流转 消息完整流转过程: ┌─────────────┐ │ Producer │ └──────┬──────┘ │ 1. Send Message ↓ ┌─────────────┐ │ NameServer │ <- 2. Get Route Info └──────┬──────┘ │ 3. Route Info ↓ ┌─────────────┐ │ Broker │ │ ┌─────────┐ │ │ │CommitLog│ │ <- 4. Store Message │ └─────────┘ │ │ ┌─────────┐ │ │ │ConsumeQ │ │ <- 5. Build Index │ └─────────┘ │ └──────┬──────┘ │ 6. Pull Message ↓ ┌─────────────┐ │ Consumer │ └─────────────┘ 4.2 心跳机制 public class HeartbeatMechanism { // Producer/Consumer → Broker class ClientHeartbeat { // 每 30 秒发送一次心跳 private final long heartbeatInterval = 30000; public void sendHeartbeatToAllBroker() { HeartbeatData heartbeatData = prepareHeartbeatData(); for (Entry<String, HashMap<Long, String>> entry : brokerAddrTable.entrySet()) { String brokerName = entry.getKey(); for (Entry<Long, String> innerEntry : entry.getValue().entrySet()) { String addr = innerEntry.getValue(); // 发送心跳 this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); } } } } // Broker → NameServer class BrokerHeartbeat { // 每 30 秒注册一次 private final long registerInterval = 30000; public void registerBrokerAll() { RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); // 注册到所有 NameServer for (String namesrvAddr : namesrvAddrList) { registerBroker(namesrvAddr, requestBody); } } } } 五、配置优化建议 5.1 Producer 优化 // 性能优化配置 producer.setSendMsgTimeout(3000); // 发送超时 producer.setCompressMsgBodyOverHowmuch(4096); // 消息压缩阈值 producer.setMaxMessageSize(4 * 1024 * 1024); // 最大消息大小 producer.setRetryTimesWhenSendFailed(2); // 重试次数 producer.setSendLatencyFaultEnable(true); // 故障规避 5.2 Consumer 优化 // 消费优化配置 consumer.setConsumeThreadMin(20); // 最小消费线程数 consumer.setConsumeThreadMax(64); // 最大消费线程数 consumer.setConsumeMessageBatchMaxSize(1); // 批量消费数量 consumer.setPullBatchSize(32); // 批量拉取数量 consumer.setPullInterval(0); // 拉取间隔 consumer.setConsumeConcurrentlyMaxSpan(2000); // 消费进度延迟阈值 5.3 Broker 优化 # broker.conf 优化配置 # 刷盘策略 flushDiskType=ASYNC_FLUSH # 同步刷盘超时时间 syncFlushTimeout=5000 # 消息最大大小 maxMessageSize=65536 # 发送线程池大小 sendMessageThreadPoolNums=128 # 拉取线程池大小 pullMessageThreadPoolNums=128 # Broker 角色 brokerRole=ASYNC_MASTER 六、总结 通过本篇,我们深入理解了: ...

2025-11-13 · maneng

RocketMQ入门01:为什么需要消息队列

引言:从一个电商下单场景说起 想象这样一个场景:用户在你的电商平台下单购买了一件商品。看似简单的一次点击,背后却触发了一系列复杂的业务流程: 扣减库存 生成订单 扣减用户积分 发送短信通知 发送邮件通知 增加商家销量统计 记录用户行为日志 触发推荐系统更新 如果采用传统的同步调用方式,会是什么样子? 一、同步通信的痛点 1.1 性能瓶颈 // 传统同步调用方式 public OrderResult createOrder(OrderRequest request) { // 核心业务:100ms Order order = orderService.create(request); // 100ms inventoryService.deduct(request.getSkuId()); // 50ms // 非核心业务:累计 500ms+ pointService.deduct(request.getUserId()); // 100ms smsService.send(request.getPhone()); // 200ms emailService.send(request.getEmail()); // 150ms statisticsService.record(order); // 50ms logService.log(order); // 30ms recommendService.update(request.getUserId()); // 100ms return new OrderResult(order); } // 总耗时:730ms+ 问题分析: 用户需要等待 730ms+ 才能看到下单结果 其中只有前 150ms 是核心业务 580ms 都在等待非核心业务完成 1.2 高耦合问题 // 每增加一个下游系统,都要修改订单服务代码 public OrderResult createOrder(OrderRequest request) { // ... 原有代码 // 新需求:增加优惠券系统通知 couponService.notify(order); // 又要改订单服务! // 新需求:增加大数据分析 bigDataService.collect(order); // 又要改订单服务! } 问题分析: ...

2025-11-13 · maneng

MySQL第一性原理:为什么我们需要数据库?

引言 “如果你不能简单地解释一件事,说明你还没有真正理解它。” —— 理查德·费曼 作为一名后端开发者,你一定写过这样的代码: @Autowired private UserRepository userRepository; public User findUser(Long id) { return userRepository.findById(id); } 这行代码背后,数据库帮你做了什么? 持久化存储:数据写入磁盘,重启不丢失 快速检索:1亿条数据中找到目标行,只需10微秒 并发控制:1万个并发请求,数据不会错乱 事务保证:转账操作要么全成功,要么全失败 故障恢复:系统崩溃后,数据可以完整恢复 但你有没有想过: 为什么不用文件存储数据?(txt、csv、json) 为什么不用内存存储数据?(HashMap、Redis) 为什么一定要用MySQL这样的关系型数据库? 今天,我们从第一性原理出发,通过对比文件、内存、MySQL三种存储方案,深度剖析数据库解决的核心问题。 本文不会教你怎么写SQL,而是回答为什么需要数据库。 一、引子:用户注册功能的三种实现 让我们从一个最简单的需求开始:用户注册功能。 需求: 用户注册时,存储用户名和密码 检查用户名是否已存在 用户登录时,验证用户名和密码 按注册时间范围查询用户 性能要求: 支持100万用户 注册和登录响应时间 < 100ms 支持1000并发 我们将用三种方式实现,看看它们的差异。 1.1 场景A:文件存储(users.txt) 实现思路:将用户数据存储在文本文件中,每行一个用户,逗号分隔字段。 /** * 用户服务 - 文件存储实现 */ public class FileUserService { private static final String FILE_PATH = "users.txt"; /** * 注册用户(追加到文件末尾) */ public void register(String username, String password) { // 1. 检查用户名是否已存在 if (exists(username)) { throw new RuntimeException("用户名已存在"); } // 2. 追加到文件 try (FileWriter fw = new FileWriter(FILE_PATH, true)) { String line = username + "," + hashPassword(password) + "," + System.currentTimeMillis() + "\n"; fw.write(line); } catch (IOException e) { throw new RuntimeException("注册失败", e); } } /** * 检查用户名是否存在(全文件扫描) */ public boolean exists(String username) { try (BufferedReader br = new BufferedReader(new FileReader(FILE_PATH))) { String line; while ((line = br.readLine()) != null) { // O(n) 全表扫描 String[] parts = line.split(","); if (parts[0].equals(username)) { return true; } } } catch (IOException e) { e.printStackTrace(); } return false; } /** * 登录验证(全文件扫描) */ public boolean login(String username, String password) { String hashedPassword = hashPassword(password); try (BufferedReader br = new BufferedReader(new FileReader(FILE_PATH))) { String line; while ((line = br.readLine()) != null) { // O(n) 全表扫描 String[] parts = line.split(","); if (parts[0].equals(username) && parts[1].equals(hashedPassword)) { return true; } } } catch (IOException e) { e.printStackTrace(); } return false; } /** * 按注册时间范围查询(全文件扫描+过滤) */ public List<User> findByRegisteredDateRange(long startTime, long endTime) { List<User> result = new ArrayList<>(); try (BufferedReader br = new BufferedReader(new FileReader(FILE_PATH))) { String line; while ((line = br.readLine()) != null) { // O(n) 全表扫描 String[] parts = line.split(","); long registeredTime = Long.parseLong(parts[2]); if (registeredTime >= startTime && registeredTime <= endTime) { result.add(new User(parts[0], parts[1], registeredTime)); } } } catch (IOException e) { e.printStackTrace(); } return result; } private String hashPassword(String password) { // 简化版,实际应该用BCrypt return Integer.toString(password.hashCode()); } } 文件内容示例: ...

2025-11-03 · maneng

MySQL第一性原理:为什么我们需要数据库?

引言 “如果你不能简单地解释一件事,说明你还没有真正理解它。” —— 理查德·费曼 作为一名后端开发者,你一定写过这样的代码: @Autowired private UserRepository userRepository; public User findUser(Long id) { return userRepository.findById(id); } 这行代码背后,数据库帮你做了什么? 持久化存储:数据写入磁盘,重启不丢失 快速检索:1亿条数据中找到目标行,只需10微秒 并发控制:1万个并发请求,数据不会错乱 事务保证:转账操作要么全成功,要么全失败 故障恢复:系统崩溃后,数据可以完整恢复 但你有没有想过: 为什么不用文件存储数据?(txt、csv、json) 为什么不用内存存储数据?(HashMap、Redis) 为什么一定要用MySQL这样的关系型数据库? 今天,我们从第一性原理出发,通过对比文件、内存、MySQL三种存储方案,深度剖析数据库解决的核心问题。 本文不会教你怎么写SQL,而是回答为什么需要数据库。 一、引子:用户注册功能的三种实现 让我们从一个最简单的需求开始:用户注册功能。 需求: 用户注册时,存储用户名和密码 检查用户名是否已存在 用户登录时,验证用户名和密码 按注册时间范围查询用户 性能要求: 支持100万用户 注册和登录响应时间 < 100ms 支持1000并发 我们将用三种方式实现,看看它们的差异。 1.1 场景A:文件存储(users.txt) 实现思路:将用户数据存储在文本文件中,每行一个用户,逗号分隔字段。 /** * 用户服务 - 文件存储实现 */ public class FileUserService { private static final String FILE_PATH = "users.txt"; /** * 注册用户(追加到文件末尾) */ public void register(String username, String password) { // 1. 检查用户名是否已存在 if (exists(username)) { throw new RuntimeException("用户名已存在"); } // 2. 追加到文件 try (FileWriter fw = new FileWriter(FILE_PATH, true)) { String line = username + "," + hashPassword(password) + "," + System.currentTimeMillis() + "\n"; fw.write(line); } catch (IOException e) { throw new RuntimeException("注册失败", e); } } /** * 检查用户名是否存在(全文件扫描) */ public boolean exists(String username) { try (BufferedReader br = new BufferedReader(new FileReader(FILE_PATH))) { String line; while ((line = br.readLine()) != null) { // O(n) 全表扫描 String[] parts = line.split(","); if (parts[0].equals(username)) { return true; } } } catch (IOException e) { e.printStackTrace(); } return false; } /** * 登录验证(全文件扫描) */ public boolean login(String username, String password) { String hashedPassword = hashPassword(password); try (BufferedReader br = new BufferedReader(new FileReader(FILE_PATH))) { String line; while ((line = br.readLine()) != null) { // O(n) 全表扫描 String[] parts = line.split(","); if (parts[0].equals(username) && parts[1].equals(hashedPassword)) { return true; } } } catch (IOException e) { e.printStackTrace(); } return false; } /** * 按注册时间范围查询(全文件扫描+过滤) */ public List<User> findByRegisteredDateRange(long startTime, long endTime) { List<User> result = new ArrayList<>(); try (BufferedReader br = new BufferedReader(new FileReader(FILE_PATH))) { String line; while ((line = br.readLine()) != null) { // O(n) 全表扫描 String[] parts = line.split(","); long registeredTime = Long.parseLong(parts[2]); if (registeredTime >= startTime && registeredTime <= endTime) { result.add(new User(parts[0], parts[1], registeredTime)); } } } catch (IOException e) { e.printStackTrace(); } return result; } private String hashPassword(String password) { // 简化版,实际应该用BCrypt return Integer.toString(password.hashCode()); } } 文件内容示例: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...