一、为什么要监控线程池?
1.1 常见线程池问题
// 线程池配置不当,导致的问题:
// 1. 线程数过小:任务堆积
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2, 2, // ❌ 核心线程数太少
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000)
);
// 结果:任务大量排队,响应慢
// 2. 队列无界:内存溢出
ThreadPoolExecutor pool = new ThreadPoolExecutor(
10, 20,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>() // ❌ 无界队列
);
// 结果:任务无限堆积,OOM
// 3. 拒绝策略不当:任务丢失
ThreadPoolExecutor pool = new ThreadPoolExecutor(
10, 20,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.DiscardPolicy() // ❌ 静默丢弃
);
// 结果:任务丢失,业务异常
监控目的:
- ✅ 发现性能瓶颈
- ✅ 预防资源耗尽
- ✅ 优化参数配置
- ✅ 及时告警
二、核心监控指标
2.1 线程池状态指标
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000)
);
// 1. 核心线程数
int corePoolSize = executor.getCorePoolSize();
// 2. 最大线程数
int maximumPoolSize = executor.getMaximumPoolSize();
// 3. 当前线程数
int poolSize = executor.getPoolSize();
// 4. 活跃线程数(正在执行任务的线程数)
int activeCount = executor.getActiveCount();
// 5. 历史最大线程数
int largestPoolSize = executor.getLargestPoolSize();
// 6. 任务总数
long taskCount = executor.getTaskCount();
// 7. 已完成任务数
long completedTaskCount = executor.getCompletedTaskCount();
// 8. 队列中任务数
int queueSize = executor.getQueue().size();
// 9. 队列剩余容量
int remainingCapacity = executor.getQueue().remainingCapacity();
System.out.println("核心线程数:" + corePoolSize);
System.out.println("最大线程数:" + maximumPoolSize);
System.out.println("当前线程数:" + poolSize);
System.out.println("活跃线程数:" + activeCount);
System.out.println("队列中任务数:" + queueSize);
System.out.println("已完成任务数:" + completedTaskCount);
2.2 关键指标说明
| 指标 | 说明 | 正常范围 | 异常信号 |
|---|---|---|---|
| 活跃线程数/当前线程数 | 线程利用率 | 60%-80% | >90%:线程不足 |
| 队列中任务数 | 任务积压情况 | <50% | >80%:任务堆积 |
| 任务完成速率 | 处理能力 | 稳定 | 持续下降:性能问题 |
| 拒绝任务数 | 容量溢出 | 0 | >0:需要扩容 |
三、线程池监控实战
3.1 自定义监控线程池
public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicLong totalExecutionTime = new AtomicLong(0);
private final AtomicLong rejectedTaskCount = new AtomicLong(0);
public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
new MonitoredRejectedExecutionHandler());
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
// 任务执行前的逻辑
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
// 任务执行后的逻辑
if (t != null) {
System.err.println("任务执行异常:" + t.getMessage());
}
// 统计执行时间(需要在Runnable中记录)
}
@Override
protected void terminated() {
super.terminated();
System.out.println("线程池已关闭");
printStatistics();
}
// 自定义拒绝策略:记录拒绝次数
private class MonitoredRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
rejectedTaskCount.incrementAndGet();
System.err.println("任务被拒绝!拒绝次数:" + rejectedTaskCount.get());
// 告警逻辑
if (rejectedTaskCount.get() % 100 == 0) {
System.err.println("⚠️ 告警:已拒绝 " + rejectedTaskCount.get() + " 个任务!");
}
// 降级逻辑:调用者线程执行
r.run();
}
}
// 打印统计信息
public void printStatistics() {
System.out.println("========== 线程池统计 ==========");
System.out.println("核心线程数:" + getCorePoolSize());
System.out.println("最大线程数:" + getMaximumPoolSize());
System.out.println("当前线程数:" + getPoolSize());
System.out.println("活跃线程数:" + getActiveCount());
System.out.println("历史最大线程数:" + getLargestPoolSize());
System.out.println("任务总数:" + getTaskCount());
System.out.println("已完成任务数:" + getCompletedTaskCount());
System.out.println("队列中任务数:" + getQueue().size());
System.out.println("拒绝任务数:" + rejectedTaskCount.get());
System.out.println("================================");
}
}
3.2 定时监控
public class ThreadPoolMonitor {
public static void startMonitoring(ThreadPoolExecutor executor) {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
System.out.println("========== 线程池监控 ==========");
System.out.println("当前时间:" + LocalDateTime.now());
System.out.println("核心线程数:" + executor.getCorePoolSize());
System.out.println("最大线程数:" + executor.getMaximumPoolSize());
System.out.println("当前线程数:" + executor.getPoolSize());
System.out.println("活跃线程数:" + executor.getActiveCount());
System.out.println("队列大小:" + executor.getQueue().size());
System.out.println("已完成任务:" + executor.getCompletedTaskCount());
// 计算线程利用率
double threadUtilization = (double) executor.getActiveCount() / executor.getPoolSize() * 100;
System.out.printf("线程利用率:%.2f%%\n", threadUtilization);
// 计算队列使用率
BlockingQueue<Runnable> queue = executor.getQueue();
int queueCapacity = queue.size() + queue.remainingCapacity();
double queueUtilization = (double) queue.size() / queueCapacity * 100;
System.out.printf("队列使用率:%.2f%%\n", queueUtilization);
// 告警逻辑
if (threadUtilization > 90) {
System.err.println("⚠️ 告警:线程利用率过高!");
}
if (queueUtilization > 80) {
System.err.println("⚠️ 告警:队列积压严重!");
}
System.out.println("================================\n");
}, 0, 5, TimeUnit.SECONDS); // 每5秒监控一次
}
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 20, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100)
);
// 启动监控
startMonitoring(executor);
// 提交任务...
}
}
四、线程池参数调优
4.1 核心线程数调优
// CPU密集型任务
int cpuCount = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCount + 1; // N + 1
// I/O密集型任务
int corePoolSize = cpuCount * 2; // 2N
// 混合型任务(推荐公式)
// corePoolSize = N * (1 + WT/ST)
// N = CPU核心数
// WT = 等待时间(I/O时间)
// ST = 计算时间(CPU时间)
// 例如:
// CPU核心数:8
// 等待时间:90ms(I/O)
// 计算时间:10ms(CPU)
// corePoolSize = 8 * (1 + 90/10) = 8 * 10 = 80
int corePoolSize = cpuCount * (1 + waitTime / computeTime);
4.2 队列选择与容量
// 1. LinkedBlockingQueue(无界队列)
// 优点:无限容量
// 缺点:可能OOM
// 适用:内存充足,任务数可控
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
// 2. LinkedBlockingQueue(有界队列)
// 优点:防止OOM
// 缺点:任务过多会拒绝
// 适用:需要控制内存
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);
// 3. ArrayBlockingQueue(有界队列)
// 优点:数组实现,性能好
// 缺点:容量固定
// 适用:高性能场景
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);
// 4. SynchronousQueue(无缓冲队列)
// 优点:直接交付,吞吐量高
// 缺点:无缓冲,容易拒绝
// 适用:任务执行快,maximumPoolSize大
BlockingQueue<Runnable> queue = new SynchronousQueue<>();
// 5. PriorityBlockingQueue(优先级队列)
// 优点:支持优先级
// 缺点:性能开销大
// 适用:需要优先级调度
BlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();
队列容量计算:
// 队列容量 = 峰值QPS * 平均任务执行时间
// 例如:
// 峰值QPS:1000
// 平均执行时间:100ms = 0.1s
// 队列容量 = 1000 * 0.1 = 100
int queueCapacity = peakQPS * avgExecutionTime;
4.3 拒绝策略选择
// 1. AbortPolicy(默认):抛出异常
// 适用:需要感知任务丢失
new ThreadPoolExecutor.AbortPolicy();
// 2. CallerRunsPolicy:调用者线程执行
// 适用:任务不能丢失,可以降低提交速度
new ThreadPoolExecutor.CallerRunsPolicy();
// 3. DiscardPolicy:静默丢弃
// 适用:任务可丢失,不需要感知
new ThreadPoolExecutor.DiscardPolicy();
// 4. DiscardOldestPolicy:丢弃最老的任务
// 适用:新任务优先级高
new ThreadPoolExecutor.DiscardOldestPolicy();
// 5. 自定义拒绝策略
RejectedExecutionHandler handler = (r, executor) -> {
// 记录日志
System.err.println("任务被拒绝:" + r);
// 告警
sendAlert("线程池任务拒绝");
// 降级逻辑:异步重试
retryAsync(r);
};
五、实战调优案例
5.1 案例1:高并发API接口
场景:
- QPS:10000
- 平均响应时间:50ms
- I/O密集型(数据库查询)
调优前:
ThreadPoolExecutor executor = new ThreadPoolExecutor(
10, 10, // ❌ 线程数太少
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100) // ❌ 队列太小
);
// 问题:
// - 线程数不足:大量任务排队
// - 队列太小:频繁拒绝任务
// - 响应时间:平均 500ms(慢10倍)
调优后:
int cpuCount = Runtime.getRuntime().availableProcessors(); // 假设8核
int corePoolSize = cpuCount * 2; // 16(I/O密集型)
int maximumPoolSize = cpuCount * 4; // 32
int queueCapacity = 10000 * 50 / 1000; // 500
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity),
new ThreadPoolExecutor.CallerRunsPolicy() // 任务不能丢失
);
// 结果:
// - 线程数充足:响应时间降至 60ms
// - 队列容量合理:很少拒绝任务
// - 性能提升:8倍
5.2 案例2:定时任务
场景:
- 每秒执行100个定时任务
- 每个任务耗时:2秒
- CPU密集型
调优前:
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);
// 问题:
// - 线程数固定10个
// - 每秒需要处理100个任务
// - 实际并发:10个,远不够
// - 任务延迟严重
调优后:
int cpuCount = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCount + 1; // CPU密集型
// 方案1:增加线程数(如果任务数固定)
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(100);
// 方案2:任务分桶,减少并发
// 将100个任务分成10组,每组10个,串行执行
int bucketCount = 10;
List<ScheduledExecutorService> schedulers = new ArrayList<>();
for (int i = 0; i < bucketCount; i++) {
schedulers.add(Executors.newScheduledThreadPool(corePoolSize));
}
// 将任务分配到不同的调度器
for (int i = 0; i < 100; i++) {
int bucketIndex = i % bucketCount;
schedulers.get(bucketIndex).scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
}
// 结果:
// - 并发控制合理
// - 任务延迟降低
// - CPU利用率提升
六、监控工具与平台
6.1 Micrometer + Prometheus
// 使用Micrometer监控线程池
MeterRegistry registry = new SimpleMeterRegistry();
// 注册线程池指标
ExecutorServiceMetrics.monitor(registry, executor, "my-thread-pool", "app", "my-app");
// 指标会自动导出到Prometheus
// - executor_pool_size:当前线程数
// - executor_active:活跃线程数
// - executor_queued:队列中任务数
// - executor_completed:已完成任务数
6.2 Spring Boot Actuator
// application.yml
management:
endpoints:
web:
exposure:
include: metrics,health
metrics:
tags:
application: my-app
// 访问端点
// GET /actuator/metrics/executor.pool.size
// GET /actuator/metrics/executor.active
// GET /actuator/metrics/executor.queued
6.3 Arthas
# 查看线程池信息
thread
# 查看线程详情
thread <thread-id>
# 监控线程池
monitor -c 5 java.util.concurrent.ThreadPoolExecutor execute
七、核心要点总结
7.1 监控核心指标
| 指标 | 计算公式 | 告警阈值 |
|---|---|---|
| 线程利用率 | 活跃线程数 / 当前线程数 | >90% |
| 队列使用率 | 队列大小 / 队列容量 | >80% |
| 任务拒绝率 | 拒绝任务数 / 总任务数 | >1% |
| 任务完成速率 | 完成任务数 / 时间 | 持续下降 |
7.2 调优参数公式
// 1. CPU密集型
corePoolSize = N + 1; // N = CPU核心数
// 2. I/O密集型
corePoolSize = N * 2;
// 3. 混合型(推荐)
corePoolSize = N * (1 + WT/ST); // WT=等待时间,ST=计算时间
// 4. 队列容量
queueCapacity = 峰值QPS * 平均执行时间;
// 5. 最大线程数
maximumPoolSize = corePoolSize * 2; // 一般为核心线程数的2倍
7.3 调优步骤
- 监控:收集线程池指标
- 分析:找出性能瓶颈
- 调整:修改参数配置
- 验证:观察效果
- 优化:持续迭代
总结
线程池监控与调优是保障系统稳定运行的关键:
核心监控指标:
- ✅ 线程利用率:评估线程数是否充足
- ✅ 队列使用率:评估队列容量是否合理
- ✅ 拒绝任务数:评估系统是否过载
调优关键参数:
- 核心线程数:根据任务类型计算
- 队列容量:根据QPS和执行时间计算
- 拒绝策略:根据业务特性选择
实战建议:
- 定时监控:每5-10秒采集一次指标
- 告警机制:关键指标超阈值立即告警
- 压测验证:调优后务必压测验证
- 持续优化:根据监控数据持续调整
下一篇预告:我们将深入JVM线程相关参数,学习如何从JVM层面优化并发性能!