生产级最佳实践:Java并发编程完整指南
一、核心原则 1.1 安全第一 // 1. 线程安全的优先级 // 正确性 > 性能 > 可读性 // ❌ 错误:追求性能,忽略安全 public class UnsafeCounter { private int count = 0; public void increment() { count++; // 非原子操作,线程不安全 } } // ✅ 正确:优先保证线程安全 public class SafeCounter { private final AtomicInteger count = new AtomicInteger(0); public void increment() { count.incrementAndGet(); // 原子操作,线程安全 } } 1.2 最小化同步范围 // ❌ 错误:同步范围过大 public synchronized void process() { prepareData(); // 无需同步 accessSharedData(); // 需要同步 cleanup(); // 无需同步 } // ✅ 正确:最小化同步范围 public void process() { prepareData(); synchronized (lock) { accessSharedData(); // 只同步必要代码 } cleanup(); } 1.3 不变性优于锁 // ❌ 复杂:使用锁保护可变对象 public class MutablePoint { private int x, y; public synchronized void setX(int x) { this.x = x; } public synchronized int getX() { return x; } } // ✅ 简单:使用不变对象 public final class ImmutablePoint { private final int x, y; public ImmutablePoint(int x, int y) { this.x = x; this.y = y; } public int getX() { return x; // 无需同步 } } 二、线程与线程池最佳实践 2.1 务必使用线程池 // ❌ 错误:直接创建线程 for (int i = 0; i < 10000; i++) { new Thread(() -> task()).start(); // 线程数爆炸 } // ✅ 正确:使用线程池 ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, // 核心线程数、最大线程数 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder() .setNameFormat("business-pool-%d") .build(), new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i < 10000; i++) { executor.submit(() -> task()); } 2.2 线程池参数计算 // CPU密集型任务 int cpuCount = Runtime.getRuntime().availableProcessors(); int corePoolSize = cpuCount + 1; // I/O密集型任务 int corePoolSize = cpuCount * 2; // 混合型任务(推荐公式) // corePoolSize = N * (1 + WT/ST) // N = CPU核心数 // WT = 等待时间 // ST = 计算时间 int corePoolSize = cpuCount * (1 + waitTime / computeTime); // 队列容量 int queueCapacity = peakQPS * avgExecutionTime; // 最大线程数 int maximumPoolSize = corePoolSize * 2; 2.3 线程命名 // ✅ 使用ThreadFactory自定义线程名 ThreadFactory threadFactory = new ThreadFactoryBuilder() .setNameFormat("business-pool-%d") .setDaemon(false) .setPriority(Thread.NORM_PRIORITY) .setUncaughtExceptionHandler((t, e) -> { log.error("线程异常:" + t.getName(), e); }) .build(); ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000), threadFactory, new ThreadPoolExecutor.CallerRunsPolicy() ); 三、锁的最佳实践 3.1 锁的选择 // 1. 读多写少:StampedLock StampedLock sl = new StampedLock(); long stamp = sl.tryOptimisticRead(); // 读取数据 if (!sl.validate(stamp)) { stamp = sl.readLock(); try { // 重新读取 } finally { sl.unlockRead(stamp); } } // 2. 公平性要求:ReentrantLock(true) Lock lock = new ReentrantLock(true); // 3. 简单场景:synchronized synchronized (lock) { // 业务逻辑 } // 4. 高并发计数:LongAdder LongAdder counter = new LongAdder(); counter.increment(); 3.2 锁的粒度 // ❌ 粗粒度锁:性能差 public synchronized void process(String key, String value) { map.put(key, value); // 所有key共用一个锁 } // ✅ 细粒度锁:性能好 private final ConcurrentHashMap<String, Lock> lockMap = new ConcurrentHashMap<>(); public void process(String key, String value) { Lock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock()); lock.lock(); try { map.put(key, value); // 每个key独立锁 } finally { lock.unlock(); } } 3.3 避免死锁 // ✅ 固定加锁顺序 public void transfer(Account from, Account to, int amount) { Account first, second; if (from.getId() < to.getId()) { first = from; second = to; } else { first = to; second = from; } synchronized (first) { synchronized (second) { // 转账逻辑 } } } // ✅ 使用tryLock超时 if (lock1.tryLock(1, TimeUnit.SECONDS)) { try { if (lock2.tryLock(1, TimeUnit.SECONDS)) { try { // 业务逻辑 } finally { lock2.unlock(); } } } finally { lock1.unlock(); } } 四、并发集合最佳实践 4.1 集合选择 场景 推荐集合 理由 读多写少 CopyOnWriteArrayList 读无锁,性能高 高并发Map ConcurrentHashMap 分段锁,性能好 生产者-消费者 BlockingQueue 自带阻塞,简化代码 优先级队列 PriorityBlockingQueue 支持优先级 延迟队列 DelayQueue 支持延迟 4.2 ConcurrentHashMap正确用法 ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>(); // ❌ 错误:组合操作不原子 if (!map.containsKey(key)) { map.put(key, value); // 线程不安全 } // ✅ 正确:使用原子方法 map.putIfAbsent(key, value); // ✅ 原子更新 map.compute(key, (k, v) -> (v == null ? 0 : v) + 1); // ✅ 原子替换 map.replace(key, oldValue, newValue); 五、异步编程最佳实践 5.1 CompletableFuture // ✅ 务必指定线程池 ExecutorService executor = Executors.newFixedThreadPool(10); CompletableFuture.supplyAsync(() -> { return queryData(); }, executor) // 指定线程池 .thenApplyAsync(data -> { return processData(data); }, executor) .exceptionally(ex -> { log.error("异步任务失败", ex); return defaultValue; }) .thenAccept(result -> { log.info("结果:{}", result); }); // ✅ 并行查询 CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> queryUser(), executor); CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> queryOrders(), executor); CompletableFuture.allOf(userFuture, orderFuture) .thenApply(v -> { UserInfo user = userFuture.join(); List<Order> orders = orderFuture.join(); return new UserDetailDTO(user, orders); }); 5.2 超时控制 // ✅ 设置超时 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return slowQuery(); }, executor); try { String result = future.get(1, TimeUnit.SECONDS); } catch (TimeoutException e) { log.error("查询超时"); return defaultValue; } // ✅ Java 9+:orTimeout future.orTimeout(1, TimeUnit.SECONDS) .exceptionally(ex -> defaultValue); 六、生产环境配置 6.1 JVM参数 # 通用配置 -Xms4g -Xmx4g # 堆内存 -Xss1m # 线程栈 -XX:+UseG1GC # 使用G1垃圾回收器 -XX:MaxGCPauseMillis=200 # GC暂停时间目标 -XX:ParallelGCThreads=8 # 并行GC线程数 -XX:ConcGCThreads=2 # 并发GC线程数 # 偏向锁(低竞争场景) -XX:+UseBiasedLocking -XX:BiasedLockingStartupDelay=0 # 容器环境 -XX:ActiveProcessorCount=2 # 显式指定CPU核心数 -Djava.util.concurrent.ForkJoinPool.common.parallelism=2 # 监控与调试 -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/var/log/heapdump.hprof -Xloggc:/var/log/gc.log -XX:+PrintGCDetails -XX:+PrintGCDateStamps 6.2 Spring Boot配置 # application.yml server: tomcat: threads: max: 200 # 最大线程数 min-spare: 10 # 最小空闲线程数 accept-count: 100 # 等待队列长度 max-connections: 10000 # 最大连接数 spring: task: execution: pool: core-size: 10 max-size: 20 queue-capacity: 1000 thread-name-prefix: async-pool- 七、监控与告警 7.1 核心监控指标 // 1. 线程池监控 public class ThreadPoolMonitor { @Scheduled(fixedRate = 5000) public void monitor() { // 活跃线程数 int activeCount = executor.getActiveCount(); // 队列大小 int queueSize = executor.getQueue().size(); // 线程利用率 double threadUtilization = (double) activeCount / executor.getPoolSize() * 100; // 队列使用率 double queueUtilization = (double) queueSize / queueCapacity * 100; // 告警 if (threadUtilization > 90) { alert("线程池利用率过高:" + threadUtilization + "%"); } if (queueUtilization > 80) { alert("队列积压严重:" + queueUtilization + "%"); } } } 7.2 Prometheus + Grafana // 使用Micrometer导出指标 MeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); // 监控线程池 ExecutorServiceMetrics.monitor(registry, executor, "business-pool", "app", "my-app"); // 访问指标 // http://localhost:8080/actuator/prometheus 八、问题排查清单 8.1 CPU 100% 1. top -Hp <pid> # 找到CPU高的线程ID 2. printf "%x\n" <tid> # 转换为16进制 3. jstack <pid> | grep "0x<hex-id>" # 查看堆栈 4. 定位代码 → 修复问题 8.2 死锁 1. jstack <pid> | grep "Found one Java-level deadlock" 2. 分析等待链 3. 修复锁顺序 8.3 线程泄漏 1. jstack <pid> | wc -l # 查看线程数 2. jmap -dump:file=heap.hprof <pid> 3. MAT分析Thread对象 4. 定位泄漏点 九、代码审查检查项 9.1 必查项 是否使用线程池? 线程池是否指定线程名? 线程池是否配置拒绝策略? 是否有死锁风险? 是否正确处理中断? 是否有资源泄漏? 异常是否被正确捕获? 是否使用了正确的锁? 锁的粒度是否合理? 是否使用了线程安全的集合? 9.2 性能优化项 读多写少场景是否使用读写锁? 高并发计数是否使用LongAdder? 是否使用了不变对象? 是否最小化同步范围? 是否避免了锁竞争? 十、核心知识图谱 10.1 并发基础 线程基础 ├── 线程创建(Thread、Runnable、Callable) ├── 线程状态(NEW、RUNNABLE、BLOCKED、WAITING、TERMINATED) ├── 线程中断(interrupt、isInterrupted、interrupted) └── 线程通信(wait、notify、notifyAll) 10.2 内存模型 JMM(Java Memory Model) ├── 可见性(volatile、synchronized) ├── 有序性(happens-before) ├── 原子性(AtomicInteger、synchronized) └── CPU缓存(缓存一致性、伪共享) 10.3 同步工具 锁 ├── synchronized(偏向锁、轻量级锁、重量级锁) ├── ReentrantLock(公平锁、非公平锁、可中断) ├── ReadWriteLock(读锁、写锁) └── StampedLock(乐观读、悲观读、写锁) 原子类 ├── AtomicInteger(CAS) ├── LongAdder(分段累加) └── AtomicReference(对象原子操作) 并发集合 ├── ConcurrentHashMap(分段锁、CAS) ├── CopyOnWriteArrayList(读写分离) └── BlockingQueue(阻塞队列) 同步器 ├── CountDownLatch(倒计时) ├── CyclicBarrier(循环栅栏) ├── Semaphore(信号量) └── Phaser(多阶段同步) 10.4 线程池 ThreadPoolExecutor ├── 核心参数(corePoolSize、maximumPoolSize、keepAliveTime) ├── 工作队列(ArrayBlockingQueue、LinkedBlockingQueue) ├── 拒绝策略(AbortPolicy、CallerRunsPolicy) └── 线程工厂(ThreadFactory) 特殊线程池 ├── ScheduledThreadPoolExecutor(定时任务) ├── ForkJoinPool(工作窃取) └── CompletableFuture(异步编程) 十一、学习路径总结 11.1 基础篇(1-10篇) 为什么需要并发 进程与线程 线程生命周期 线程创建方式 线程中断机制 线程通信 线程安全问题 可见性、有序性、原子性 CPU缓存与多核 JMM与happens-before 11.2 原理篇(11-18篇) happens-before规则 volatile原理 synchronized原理 synchronized优化 原子类AtomicInteger CAS与ABA Lock与ReentrantLock ReadWriteLock读写锁 11.3 工具篇(19-28篇) 线程池原理 ThreadPoolExecutor详解 线程池最佳实践 BlockingQueue阻塞队列 ConcurrentHashMap CountDownLatch与CyclicBarrier Semaphore与Exchanger Phaser多阶段同步 CopyOnWriteArrayList CompletableFuture异步编程 11.4 实战篇(29-33篇) CompletableFuture实战 ForkJoinPool工作窃取 无锁编程与LongAdder StampedLock性能优化 并发设计模式 11.5 排查篇(34-39篇) 死锁的产生与排查 线程池监控与调优 JVM线程相关参数 并发问题排查工具 JMH性能测试 生产级最佳实践(本篇) 总结 Java并发编程是一个复杂但重要的技术领域,掌握并发编程需要: ...