一、为什么要监控线程池?

1.1 常见线程池问题

// 线程池配置不当,导致的问题:

// 1. 线程数过小:任务堆积
ThreadPoolExecutor pool = new ThreadPoolExecutor(
    2, 2,  // ❌ 核心线程数太少
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000)
);
// 结果:任务大量排队,响应慢

// 2. 队列无界:内存溢出
ThreadPoolExecutor pool = new ThreadPoolExecutor(
    10, 20,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>()  // ❌ 无界队列
);
// 结果:任务无限堆积,OOM

// 3. 拒绝策略不当:任务丢失
ThreadPoolExecutor pool = new ThreadPoolExecutor(
    10, 20,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100),
    new ThreadPoolExecutor.DiscardPolicy()  // ❌ 静默丢弃
);
// 结果:任务丢失,业务异常

监控目的

  • ✅ 发现性能瓶颈
  • ✅ 预防资源耗尽
  • ✅ 优化参数配置
  • ✅ 及时告警

二、核心监控指标

2.1 线程池状态指标

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10, 20, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(1000)
);

// 1. 核心线程数
int corePoolSize = executor.getCorePoolSize();

// 2. 最大线程数
int maximumPoolSize = executor.getMaximumPoolSize();

// 3. 当前线程数
int poolSize = executor.getPoolSize();

// 4. 活跃线程数(正在执行任务的线程数)
int activeCount = executor.getActiveCount();

// 5. 历史最大线程数
int largestPoolSize = executor.getLargestPoolSize();

// 6. 任务总数
long taskCount = executor.getTaskCount();

// 7. 已完成任务数
long completedTaskCount = executor.getCompletedTaskCount();

// 8. 队列中任务数
int queueSize = executor.getQueue().size();

// 9. 队列剩余容量
int remainingCapacity = executor.getQueue().remainingCapacity();

System.out.println("核心线程数:" + corePoolSize);
System.out.println("最大线程数:" + maximumPoolSize);
System.out.println("当前线程数:" + poolSize);
System.out.println("活跃线程数:" + activeCount);
System.out.println("队列中任务数:" + queueSize);
System.out.println("已完成任务数:" + completedTaskCount);

2.2 关键指标说明

指标说明正常范围异常信号
活跃线程数/当前线程数线程利用率60%-80%>90%:线程不足
队列中任务数任务积压情况<50%>80%:任务堆积
任务完成速率处理能力稳定持续下降:性能问题
拒绝任务数容量溢出0>0:需要扩容

三、线程池监控实战

3.1 自定义监控线程池

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
    private final AtomicLong totalExecutionTime = new AtomicLong(0);
    private final AtomicLong rejectedTaskCount = new AtomicLong(0);

    public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
                                       long keepAliveTime, TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
              new MonitoredRejectedExecutionHandler());
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // 任务执行前的逻辑
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        // 任务执行后的逻辑
        if (t != null) {
            System.err.println("任务执行异常:" + t.getMessage());
        }

        // 统计执行时间(需要在Runnable中记录)
    }

    @Override
    protected void terminated() {
        super.terminated();
        System.out.println("线程池已关闭");
        printStatistics();
    }

    // 自定义拒绝策略:记录拒绝次数
    private class MonitoredRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            rejectedTaskCount.incrementAndGet();
            System.err.println("任务被拒绝!拒绝次数:" + rejectedTaskCount.get());

            // 告警逻辑
            if (rejectedTaskCount.get() % 100 == 0) {
                System.err.println("⚠️ 告警:已拒绝 " + rejectedTaskCount.get() + " 个任务!");
            }

            // 降级逻辑:调用者线程执行
            r.run();
        }
    }

    // 打印统计信息
    public void printStatistics() {
        System.out.println("========== 线程池统计 ==========");
        System.out.println("核心线程数:" + getCorePoolSize());
        System.out.println("最大线程数:" + getMaximumPoolSize());
        System.out.println("当前线程数:" + getPoolSize());
        System.out.println("活跃线程数:" + getActiveCount());
        System.out.println("历史最大线程数:" + getLargestPoolSize());
        System.out.println("任务总数:" + getTaskCount());
        System.out.println("已完成任务数:" + getCompletedTaskCount());
        System.out.println("队列中任务数:" + getQueue().size());
        System.out.println("拒绝任务数:" + rejectedTaskCount.get());
        System.out.println("================================");
    }
}

3.2 定时监控

public class ThreadPoolMonitor {

    public static void startMonitoring(ThreadPoolExecutor executor) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("========== 线程池监控 ==========");
            System.out.println("当前时间:" + LocalDateTime.now());
            System.out.println("核心线程数:" + executor.getCorePoolSize());
            System.out.println("最大线程数:" + executor.getMaximumPoolSize());
            System.out.println("当前线程数:" + executor.getPoolSize());
            System.out.println("活跃线程数:" + executor.getActiveCount());
            System.out.println("队列大小:" + executor.getQueue().size());
            System.out.println("已完成任务:" + executor.getCompletedTaskCount());

            // 计算线程利用率
            double threadUtilization = (double) executor.getActiveCount() / executor.getPoolSize() * 100;
            System.out.printf("线程利用率:%.2f%%\n", threadUtilization);

            // 计算队列使用率
            BlockingQueue<Runnable> queue = executor.getQueue();
            int queueCapacity = queue.size() + queue.remainingCapacity();
            double queueUtilization = (double) queue.size() / queueCapacity * 100;
            System.out.printf("队列使用率:%.2f%%\n", queueUtilization);

            // 告警逻辑
            if (threadUtilization > 90) {
                System.err.println("⚠️ 告警:线程利用率过高!");
            }

            if (queueUtilization > 80) {
                System.err.println("⚠️ 告警:队列积压严重!");
            }

            System.out.println("================================\n");
        }, 0, 5, TimeUnit.SECONDS);  // 每5秒监控一次
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10, 20, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100)
        );

        // 启动监控
        startMonitoring(executor);

        // 提交任务...
    }
}

四、线程池参数调优

4.1 核心线程数调优

// CPU密集型任务
int cpuCount = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCount + 1;  // N + 1

// I/O密集型任务
int corePoolSize = cpuCount * 2;  // 2N

// 混合型任务(推荐公式)
// corePoolSize = N * (1 + WT/ST)
// N = CPU核心数
// WT = 等待时间(I/O时间)
// ST = 计算时间(CPU时间)

// 例如:
// CPU核心数:8
// 等待时间:90ms(I/O)
// 计算时间:10ms(CPU)
// corePoolSize = 8 * (1 + 90/10) = 8 * 10 = 80

int corePoolSize = cpuCount * (1 + waitTime / computeTime);

4.2 队列选择与容量

// 1. LinkedBlockingQueue(无界队列)
// 优点:无限容量
// 缺点:可能OOM
// 适用:内存充足,任务数可控
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();

// 2. LinkedBlockingQueue(有界队列)
// 优点:防止OOM
// 缺点:任务过多会拒绝
// 适用:需要控制内存
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000);

// 3. ArrayBlockingQueue(有界队列)
// 优点:数组实现,性能好
// 缺点:容量固定
// 适用:高性能场景
BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000);

// 4. SynchronousQueue(无缓冲队列)
// 优点:直接交付,吞吐量高
// 缺点:无缓冲,容易拒绝
// 适用:任务执行快,maximumPoolSize大
BlockingQueue<Runnable> queue = new SynchronousQueue<>();

// 5. PriorityBlockingQueue(优先级队列)
// 优点:支持优先级
// 缺点:性能开销大
// 适用:需要优先级调度
BlockingQueue<Runnable> queue = new PriorityBlockingQueue<>();

队列容量计算

// 队列容量 = 峰值QPS * 平均任务执行时间

// 例如:
// 峰值QPS:1000
// 平均执行时间:100ms = 0.1s
// 队列容量 = 1000 * 0.1 = 100

int queueCapacity = peakQPS * avgExecutionTime;

4.3 拒绝策略选择

// 1. AbortPolicy(默认):抛出异常
// 适用:需要感知任务丢失
new ThreadPoolExecutor.AbortPolicy();

// 2. CallerRunsPolicy:调用者线程执行
// 适用:任务不能丢失,可以降低提交速度
new ThreadPoolExecutor.CallerRunsPolicy();

// 3. DiscardPolicy:静默丢弃
// 适用:任务可丢失,不需要感知
new ThreadPoolExecutor.DiscardPolicy();

// 4. DiscardOldestPolicy:丢弃最老的任务
// 适用:新任务优先级高
new ThreadPoolExecutor.DiscardOldestPolicy();

// 5. 自定义拒绝策略
RejectedExecutionHandler handler = (r, executor) -> {
    // 记录日志
    System.err.println("任务被拒绝:" + r);

    // 告警
    sendAlert("线程池任务拒绝");

    // 降级逻辑:异步重试
    retryAsync(r);
};

五、实战调优案例

5.1 案例1:高并发API接口

场景

  • QPS:10000
  • 平均响应时间:50ms
  • I/O密集型(数据库查询)

调优前

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10, 10,  // ❌ 线程数太少
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100)  // ❌ 队列太小
);

// 问题:
// - 线程数不足:大量任务排队
// - 队列太小:频繁拒绝任务
// - 响应时间:平均 500ms(慢10倍)

调优后

int cpuCount = Runtime.getRuntime().availableProcessors();  // 假设8核
int corePoolSize = cpuCount * 2;  // 16(I/O密集型)
int maximumPoolSize = cpuCount * 4;  // 32
int queueCapacity = 10000 * 50 / 1000;  // 500

ThreadPoolExecutor executor = new ThreadPoolExecutor(
    corePoolSize,
    maximumPoolSize,
    60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(queueCapacity),
    new ThreadPoolExecutor.CallerRunsPolicy()  // 任务不能丢失
);

// 结果:
// - 线程数充足:响应时间降至 60ms
// - 队列容量合理:很少拒绝任务
// - 性能提升:8倍

5.2 案例2:定时任务

场景

  • 每秒执行100个定时任务
  • 每个任务耗时:2秒
  • CPU密集型

调优前

ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(10);

// 问题:
// - 线程数固定10个
// - 每秒需要处理100个任务
// - 实际并发:10个,远不够
// - 任务延迟严重

调优后

int cpuCount = Runtime.getRuntime().availableProcessors();
int corePoolSize = cpuCount + 1;  // CPU密集型

// 方案1:增加线程数(如果任务数固定)
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(100);

// 方案2:任务分桶,减少并发
// 将100个任务分成10组,每组10个,串行执行
int bucketCount = 10;
List<ScheduledExecutorService> schedulers = new ArrayList<>();
for (int i = 0; i < bucketCount; i++) {
    schedulers.add(Executors.newScheduledThreadPool(corePoolSize));
}

// 将任务分配到不同的调度器
for (int i = 0; i < 100; i++) {
    int bucketIndex = i % bucketCount;
    schedulers.get(bucketIndex).scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);
}

// 结果:
// - 并发控制合理
// - 任务延迟降低
// - CPU利用率提升

六、监控工具与平台

6.1 Micrometer + Prometheus

// 使用Micrometer监控线程池
MeterRegistry registry = new SimpleMeterRegistry();

// 注册线程池指标
ExecutorServiceMetrics.monitor(registry, executor, "my-thread-pool", "app", "my-app");

// 指标会自动导出到Prometheus
// - executor_pool_size:当前线程数
// - executor_active:活跃线程数
// - executor_queued:队列中任务数
// - executor_completed:已完成任务数

6.2 Spring Boot Actuator

// application.yml
management:
  endpoints:
    web:
      exposure:
        include: metrics,health
  metrics:
    tags:
      application: my-app

// 访问端点
// GET /actuator/metrics/executor.pool.size
// GET /actuator/metrics/executor.active
// GET /actuator/metrics/executor.queued

6.3 Arthas

# 查看线程池信息
thread

# 查看线程详情
thread <thread-id>

# 监控线程池
monitor -c 5 java.util.concurrent.ThreadPoolExecutor execute

七、核心要点总结

7.1 监控核心指标

指标计算公式告警阈值
线程利用率活跃线程数 / 当前线程数>90%
队列使用率队列大小 / 队列容量>80%
任务拒绝率拒绝任务数 / 总任务数>1%
任务完成速率完成任务数 / 时间持续下降

7.2 调优参数公式

// 1. CPU密集型
corePoolSize = N + 1;  // N = CPU核心数

// 2. I/O密集型
corePoolSize = N * 2;

// 3. 混合型(推荐)
corePoolSize = N * (1 + WT/ST);  // WT=等待时间,ST=计算时间

// 4. 队列容量
queueCapacity = 峰值QPS * 平均执行时间;

// 5. 最大线程数
maximumPoolSize = corePoolSize * 2;  // 一般为核心线程数的2倍

7.3 调优步骤

  1. 监控:收集线程池指标
  2. 分析:找出性能瓶颈
  3. 调整:修改参数配置
  4. 验证:观察效果
  5. 优化:持续迭代

总结

线程池监控与调优是保障系统稳定运行的关键:

核心监控指标

  • ✅ 线程利用率:评估线程数是否充足
  • ✅ 队列使用率:评估队列容量是否合理
  • ✅ 拒绝任务数:评估系统是否过载

调优关键参数

  1. 核心线程数:根据任务类型计算
  2. 队列容量:根据QPS和执行时间计算
  3. 拒绝策略:根据业务特性选择

实战建议

  1. 定时监控:每5-10秒采集一次指标
  2. 告警机制:关键指标超阈值立即告警
  3. 压测验证:调优后务必压测验证
  4. 持续优化:根据监控数据持续调整

下一篇预告:我们将深入JVM线程相关参数,学习如何从JVM层面优化并发性能!