一、核心原则
1.1 安全第一
// 1. 线程安全的优先级
// 正确性 > 性能 > 可读性
// ❌ 错误:追求性能,忽略安全
public class UnsafeCounter {
private int count = 0;
public void increment() {
count++; // 非原子操作,线程不安全
}
}
// ✅ 正确:优先保证线程安全
public class SafeCounter {
private final AtomicInteger count = new AtomicInteger(0);
public void increment() {
count.incrementAndGet(); // 原子操作,线程安全
}
}
1.2 最小化同步范围
// ❌ 错误:同步范围过大
public synchronized void process() {
prepareData(); // 无需同步
accessSharedData(); // 需要同步
cleanup(); // 无需同步
}
// ✅ 正确:最小化同步范围
public void process() {
prepareData();
synchronized (lock) {
accessSharedData(); // 只同步必要代码
}
cleanup();
}
1.3 不变性优于锁
// ❌ 复杂:使用锁保护可变对象
public class MutablePoint {
private int x, y;
public synchronized void setX(int x) {
this.x = x;
}
public synchronized int getX() {
return x;
}
}
// ✅ 简单:使用不变对象
public final class ImmutablePoint {
private final int x, y;
public ImmutablePoint(int x, int y) {
this.x = x;
this.y = y;
}
public int getX() {
return x; // 无需同步
}
}
二、线程与线程池最佳实践
2.1 务必使用线程池
// ❌ 错误:直接创建线程
for (int i = 0; i < 10000; i++) {
new Thread(() -> task()).start(); // 线程数爆炸
}
// ✅ 正确:使用线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 20, // 核心线程数、最大线程数
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder()
.setNameFormat("business-pool-%d")
.build(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 0; i < 10000; i++) {
executor.submit(() -> task());
}
2.2 线程池参数计算
// CPU密集型任务
int cpuCount = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCount + 1;
// I/O密集型任务
int corePoolSize = cpuCount * 2;
// 混合型任务(推荐公式)
// corePoolSize = N * (1 + WT/ST)
// N = CPU核心数
// WT = 等待时间
// ST = 计算时间
int corePoolSize = cpuCount * (1 + waitTime / computeTime);
// 队列容量
int queueCapacity = peakQPS * avgExecutionTime;
// 最大线程数
int maximumPoolSize = corePoolSize * 2;
2.3 线程命名
// ✅ 使用ThreadFactory自定义线程名
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("business-pool-%d")
.setDaemon(false)
.setPriority(Thread.NORM_PRIORITY)
.setUncaughtExceptionHandler((t, e) -> {
log.error("线程异常:" + t.getName(), e);
})
.build();
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
threadFactory,
new ThreadPoolExecutor.CallerRunsPolicy()
);
三、锁的最佳实践
3.1 锁的选择
// 1. 读多写少:StampedLock
StampedLock sl = new StampedLock();
long stamp = sl.tryOptimisticRead();
// 读取数据
if (!sl.validate(stamp)) {
stamp = sl.readLock();
try {
// 重新读取
} finally {
sl.unlockRead(stamp);
}
}
// 2. 公平性要求:ReentrantLock(true)
Lock lock = new ReentrantLock(true);
// 3. 简单场景:synchronized
synchronized (lock) {
// 业务逻辑
}
// 4. 高并发计数:LongAdder
LongAdder counter = new LongAdder();
counter.increment();
3.2 锁的粒度
// ❌ 粗粒度锁:性能差
public synchronized void process(String key, String value) {
map.put(key, value); // 所有key共用一个锁
}
// ✅ 细粒度锁:性能好
private final ConcurrentHashMap<String, Lock> lockMap = new ConcurrentHashMap<>();
public void process(String key, String value) {
Lock lock = lockMap.computeIfAbsent(key, k -> new ReentrantLock());
lock.lock();
try {
map.put(key, value); // 每个key独立锁
} finally {
lock.unlock();
}
}
3.3 避免死锁
// ✅ 固定加锁顺序
public void transfer(Account from, Account to, int amount) {
Account first, second;
if (from.getId() < to.getId()) {
first = from;
second = to;
} else {
first = to;
second = from;
}
synchronized (first) {
synchronized (second) {
// 转账逻辑
}
}
}
// ✅ 使用tryLock超时
if (lock1.tryLock(1, TimeUnit.SECONDS)) {
try {
if (lock2.tryLock(1, TimeUnit.SECONDS)) {
try {
// 业务逻辑
} finally {
lock2.unlock();
}
}
} finally {
lock1.unlock();
}
}
四、并发集合最佳实践
4.1 集合选择
| 场景 | 推荐集合 | 理由 |
|---|---|---|
| 读多写少 | CopyOnWriteArrayList | 读无锁,性能高 |
| 高并发Map | ConcurrentHashMap | 分段锁,性能好 |
| 生产者-消费者 | BlockingQueue | 自带阻塞,简化代码 |
| 优先级队列 | PriorityBlockingQueue | 支持优先级 |
| 延迟队列 | DelayQueue | 支持延迟 |
4.2 ConcurrentHashMap正确用法
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// ❌ 错误:组合操作不原子
if (!map.containsKey(key)) {
map.put(key, value); // 线程不安全
}
// ✅ 正确:使用原子方法
map.putIfAbsent(key, value);
// ✅ 原子更新
map.compute(key, (k, v) -> (v == null ? 0 : v) + 1);
// ✅ 原子替换
map.replace(key, oldValue, newValue);
五、异步编程最佳实践
5.1 CompletableFuture
// ✅ 务必指定线程池
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> {
return queryData();
}, executor) // 指定线程池
.thenApplyAsync(data -> {
return processData(data);
}, executor)
.exceptionally(ex -> {
log.error("异步任务失败", ex);
return defaultValue;
})
.thenAccept(result -> {
log.info("结果:{}", result);
});
// ✅ 并行查询
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> queryUser(), executor);
CompletableFuture<List<Order>> orderFuture = CompletableFuture.supplyAsync(() -> queryOrders(), executor);
CompletableFuture.allOf(userFuture, orderFuture)
.thenApply(v -> {
UserInfo user = userFuture.join();
List<Order> orders = orderFuture.join();
return new UserDetailDTO(user, orders);
});
5.2 超时控制
// ✅ 设置超时
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
return slowQuery();
}, executor);
try {
String result = future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.error("查询超时");
return defaultValue;
}
// ✅ Java 9+:orTimeout
future.orTimeout(1, TimeUnit.SECONDS)
.exceptionally(ex -> defaultValue);
六、生产环境配置
6.1 JVM参数
# 通用配置
-Xms4g -Xmx4g # 堆内存
-Xss1m # 线程栈
-XX:+UseG1GC # 使用G1垃圾回收器
-XX:MaxGCPauseMillis=200 # GC暂停时间目标
-XX:ParallelGCThreads=8 # 并行GC线程数
-XX:ConcGCThreads=2 # 并发GC线程数
# 偏向锁(低竞争场景)
-XX:+UseBiasedLocking
-XX:BiasedLockingStartupDelay=0
# 容器环境
-XX:ActiveProcessorCount=2 # 显式指定CPU核心数
-Djava.util.concurrent.ForkJoinPool.common.parallelism=2
# 监控与调试
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/log/heapdump.hprof
-Xloggc:/var/log/gc.log
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
6.2 Spring Boot配置
# application.yml
server:
tomcat:
threads:
max: 200 # 最大线程数
min-spare: 10 # 最小空闲线程数
accept-count: 100 # 等待队列长度
max-connections: 10000 # 最大连接数
spring:
task:
execution:
pool:
core-size: 10
max-size: 20
queue-capacity: 1000
thread-name-prefix: async-pool-
七、监控与告警
7.1 核心监控指标
// 1. 线程池监控
public class ThreadPoolMonitor {
@Scheduled(fixedRate = 5000)
public void monitor() {
// 活跃线程数
int activeCount = executor.getActiveCount();
// 队列大小
int queueSize = executor.getQueue().size();
// 线程利用率
double threadUtilization = (double) activeCount / executor.getPoolSize() * 100;
// 队列使用率
double queueUtilization = (double) queueSize / queueCapacity * 100;
// 告警
if (threadUtilization > 90) {
alert("线程池利用率过高:" + threadUtilization + "%");
}
if (queueUtilization > 80) {
alert("队列积压严重:" + queueUtilization + "%");
}
}
}
7.2 Prometheus + Grafana
// 使用Micrometer导出指标
MeterRegistry registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
// 监控线程池
ExecutorServiceMetrics.monitor(registry, executor, "business-pool", "app", "my-app");
// 访问指标
// http://localhost:8080/actuator/prometheus
八、问题排查清单
8.1 CPU 100%
1. top -Hp <pid> # 找到CPU高的线程ID
2. printf "%x\n" <tid> # 转换为16进制
3. jstack <pid> | grep "0x<hex-id>" # 查看堆栈
4. 定位代码 → 修复问题
8.2 死锁
1. jstack <pid> | grep "Found one Java-level deadlock"
2. 分析等待链
3. 修复锁顺序
8.3 线程泄漏
1. jstack <pid> | wc -l # 查看线程数
2. jmap -dump:file=heap.hprof <pid>
3. MAT分析Thread对象
4. 定位泄漏点
九、代码审查检查项
9.1 必查项
- 是否使用线程池?
- 线程池是否指定线程名?
- 线程池是否配置拒绝策略?
- 是否有死锁风险?
- 是否正确处理中断?
- 是否有资源泄漏?
- 异常是否被正确捕获?
- 是否使用了正确的锁?
- 锁的粒度是否合理?
- 是否使用了线程安全的集合?
9.2 性能优化项
- 读多写少场景是否使用读写锁?
- 高并发计数是否使用LongAdder?
- 是否使用了不变对象?
- 是否最小化同步范围?
- 是否避免了锁竞争?
十、核心知识图谱
10.1 并发基础
线程基础
├── 线程创建(Thread、Runnable、Callable)
├── 线程状态(NEW、RUNNABLE、BLOCKED、WAITING、TERMINATED)
├── 线程中断(interrupt、isInterrupted、interrupted)
└── 线程通信(wait、notify、notifyAll)
10.2 内存模型
JMM(Java Memory Model)
├── 可见性(volatile、synchronized)
├── 有序性(happens-before)
├── 原子性(AtomicInteger、synchronized)
└── CPU缓存(缓存一致性、伪共享)
10.3 同步工具
锁
├── synchronized(偏向锁、轻量级锁、重量级锁)
├── ReentrantLock(公平锁、非公平锁、可中断)
├── ReadWriteLock(读锁、写锁)
└── StampedLock(乐观读、悲观读、写锁)
原子类
├── AtomicInteger(CAS)
├── LongAdder(分段累加)
└── AtomicReference(对象原子操作)
并发集合
├── ConcurrentHashMap(分段锁、CAS)
├── CopyOnWriteArrayList(读写分离)
└── BlockingQueue(阻塞队列)
同步器
├── CountDownLatch(倒计时)
├── CyclicBarrier(循环栅栏)
├── Semaphore(信号量)
└── Phaser(多阶段同步)
10.4 线程池
ThreadPoolExecutor
├── 核心参数(corePoolSize、maximumPoolSize、keepAliveTime)
├── 工作队列(ArrayBlockingQueue、LinkedBlockingQueue)
├── 拒绝策略(AbortPolicy、CallerRunsPolicy)
└── 线程工厂(ThreadFactory)
特殊线程池
├── ScheduledThreadPoolExecutor(定时任务)
├── ForkJoinPool(工作窃取)
└── CompletableFuture(异步编程)
十一、学习路径总结
11.1 基础篇(1-10篇)
- 为什么需要并发
- 进程与线程
- 线程生命周期
- 线程创建方式
- 线程中断机制
- 线程通信
- 线程安全问题
- 可见性、有序性、原子性
- CPU缓存与多核
- JMM与happens-before
11.2 原理篇(11-18篇)
- happens-before规则
- volatile原理
- synchronized原理
- synchronized优化
- 原子类AtomicInteger
- CAS与ABA
- Lock与ReentrantLock
- ReadWriteLock读写锁
11.3 工具篇(19-28篇)
- 线程池原理
- ThreadPoolExecutor详解
- 线程池最佳实践
- BlockingQueue阻塞队列
- ConcurrentHashMap
- CountDownLatch与CyclicBarrier
- Semaphore与Exchanger
- Phaser多阶段同步
- CopyOnWriteArrayList
- CompletableFuture异步编程
11.4 实战篇(29-33篇)
- CompletableFuture实战
- ForkJoinPool工作窃取
- 无锁编程与LongAdder
- StampedLock性能优化
- 并发设计模式
11.5 排查篇(34-39篇)
- 死锁的产生与排查
- 线程池监控与调优
- JVM线程相关参数
- 并发问题排查工具
- JMH性能测试
- 生产级最佳实践(本篇)
总结
Java并发编程是一个复杂但重要的技术领域,掌握并发编程需要:
核心知识:
- ✅ 线程基础:创建、状态、中断、通信
- ✅ 内存模型:可见性、有序性、原子性
- ✅ 同步机制:synchronized、Lock、原子类
- ✅ 并发工具:线程池、并发集合、同步器
实战能力:
- ✅ 线程池:正确配置参数,监控指标
- ✅ 锁选择:根据场景选择合适的锁
- ✅ 异步编程:CompletableFuture、ForkJoinPool
- ✅ 问题排查:死锁、CPU 100%、线程泄漏
生产经验:
- ✅ 安全第一:正确性优于性能
- ✅ 最小化同步:减少锁竞争
- ✅ 不变性:优先使用不变对象
- ✅ 监控告警:持续监控,及时告警
最后建议:
- 📚 持续学习:并发编程是一个不断发展的领域
- 🧪 多实践:通过实战项目积累经验
- 🔍 深入源码:理解JDK并发工具的实现原理
- 📊 性能测试:使用JMH验证优化效果
**恭喜你!**完成了Java并发编程从入门到精通的完整学习之旅!🎉