一、为什么要监控线程池? 1.1 常见线程池问题 // 线程池配置不当,导致的问题: // 1. 线程数过小:任务堆积 ThreadPoolExecutor pool = new ThreadPoolExecutor( 2, 2, // ❌ 核心线程数太少 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000) ); // 结果:任务大量排队,响应慢 // 2. 队列无界:内存溢出 ThreadPoolExecutor pool = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>() // ❌ 无界队列 ); // 结果:任务无限堆积,OOM // 3. 拒绝策略不当:任务丢失 ThreadPoolExecutor pool = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.DiscardPolicy() // ❌ 静默丢弃 ); // 结果:任务丢失,业务异常 监控目的:
✅ 发现性能瓶颈 ✅ 预防资源耗尽 ✅ 优化参数配置 ✅ 及时告警 二、核心监控指标 2.1 线程池状态指标 ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000) ); // 1. 核心线程数 int corePoolSize = executor.getCorePoolSize(); // 2. 最大线程数 int maximumPoolSize = executor.getMaximumPoolSize(); // 3. 当前线程数 int poolSize = executor.getPoolSize(); // 4. 活跃线程数(正在执行任务的线程数) int activeCount = executor.getActiveCount(); // 5. 历史最大线程数 int largestPoolSize = executor.getLargestPoolSize(); // 6. 任务总数 long taskCount = executor.getTaskCount(); // 7. 已完成任务数 long completedTaskCount = executor.getCompletedTaskCount(); // 8. 队列中任务数 int queueSize = executor.getQueue().size(); // 9. 队列剩余容量 int remainingCapacity = executor.getQueue().remainingCapacity(); System.out.println("核心线程数:" + corePoolSize); System.out.println("最大线程数:" + maximumPoolSize); System.out.println("当前线程数:" + poolSize); System.out.println("活跃线程数:" + activeCount); System.out.println("队列中任务数:" + queueSize); System.out.println("已完成任务数:" + completedTaskCount); 2.2 关键指标说明 指标 说明 正常范围 异常信号 活跃线程数/当前线程数 线程利用率 60%-80% >90%:线程不足 队列中任务数 任务积压情况 <50% >80%:任务堆积 任务完成速率 处理能力 稳定 持续下降:性能问题 拒绝任务数 容量溢出 0 >0:需要扩容 三、线程池监控实战 3.1 自定义监控线程池 public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicLong totalExecutionTime = new AtomicLong(0); private final AtomicLong rejectedTaskCount = new AtomicLong(0); public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new MonitoredRejectedExecutionHandler()); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); // 任务执行前的逻辑 } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); // 任务执行后的逻辑 if (t != null) { System.err.println("任务执行异常:" + t.getMessage()); } // 统计执行时间(需要在Runnable中记录) } @Override protected void terminated() { super.terminated(); System.out.println("线程池已关闭"); printStatistics(); } // 自定义拒绝策略:记录拒绝次数 private class MonitoredRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { rejectedTaskCount.incrementAndGet(); System.err.println("任务被拒绝!拒绝次数:" + rejectedTaskCount.get()); // 告警逻辑 if (rejectedTaskCount.get() % 100 == 0) { System.err.println("⚠️ 告警:已拒绝 " + rejectedTaskCount.get() + " 个任务!"); } // 降级逻辑:调用者线程执行 r.run(); } } // 打印统计信息 public void printStatistics() { System.out.println("========== 线程池统计 =========="); System.out.println("核心线程数:" + getCorePoolSize()); System.out.println("最大线程数:" + getMaximumPoolSize()); System.out.println("当前线程数:" + getPoolSize()); System.out.println("活跃线程数:" + getActiveCount()); System.out.println("历史最大线程数:" + getLargestPoolSize()); System.out.println("任务总数:" + getTaskCount()); System.out.println("已完成任务数:" + getCompletedTaskCount()); System.out.println("队列中任务数:" + getQueue().size()); System.out.println("拒绝任务数:" + rejectedTaskCount.get()); System.out.println("================================"); } } 3.2 定时监控 public class ThreadPoolMonitor { public static void startMonitoring(ThreadPoolExecutor executor) { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { System.out.println("========== 线程池监控 =========="); System.out.println("当前时间:" + LocalDateTime.now()); System.out.println("核心线程数:" + executor.getCorePoolSize()); System.out.println("最大线程数:" + executor.getMaximumPoolSize()); System.out.println("当前线程数:" + executor.getPoolSize()); System.out.println("活跃线程数:" + executor.getActiveCount()); System.out.println("队列大小:" + executor.getQueue().size()); System.out.println("已完成任务:" + executor.getCompletedTaskCount()); // 计算线程利用率 double threadUtilization = (double) executor.getActiveCount() / executor.getPoolSize() * 100; System.out.printf("线程利用率:%.2f%%\n", threadUtilization); // 计算队列使用率 BlockingQueue<Runnable> queue = executor.getQueue(); int queueCapacity = queue.size() + queue.remainingCapacity(); double queueUtilization = (double) queue.size() / queueCapacity * 100; System.out.printf("队列使用率:%.2f%%\n", queueUtilization); // 告警逻辑 if (threadUtilization > 90) { System.err.println("⚠️ 告警:线程利用率过高!"); } if (queueUtilization > 80) { System.err.println("⚠️ 告警:队列积压严重!"); } System.out.println("================================\n"); }, 0, 5, TimeUnit.SECONDS); // 每5秒监控一次 } public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) ); // 启动监控 startMonitoring(executor); // 提交任务... } } 四、线程池参数调优 4.1 核心线程数调优 // CPU密集型任务 int cpuCount = Runtime.getRuntime().availableProcessors(); int corePoolSize = cpuCount + 1; // N + 1 // I/O密集型任务 int corePoolSize = cpuCount * 2; // 2N // 混合型任务(推荐公式) // corePoolSize = N * (1 + WT/ST) // N = CPU核心数 // WT = 等待时间(I/O时间) // ST = 计算时间(CPU时间) // 例如: // CPU核心数:8 // 等待时间:90ms(I/O) // 计算时间:10ms(CPU) // corePoolSize = 8 * (1 + 90/10) = 8 * 10 = 80 int corePoolSize = cpuCount * (1 + waitTime / computeTime); 4.2 队列选择与容量 // 1. LinkedBlockingQueue(无界队列) // 优点:无限容量 // 缺点:可能OOM // 适用:内存充足,任务数可控 BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // 2. LinkedBlockingQueue(有界队列) // 优点:防止OOM // 缺点:任务过多会拒绝 // 适用:需要控制内存 BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000); // 3. ArrayBlockingQueue(有界队列) // 优点:数组实现,性能好 // 缺点:容量固定 // 适用:高性能场景 BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000); // 4. SynchronousQueue(无缓冲队列) // 优点:直接交付,吞吐量高 // 缺点:无缓冲,容易拒绝 // 适用:任务执行快,maximumPoolSize大 BlockingQueue<Runnable> queue = new SynchronousQueue<>(); // 5. PriorityBlockingQueue(优先级队列) // 优点:支持优先级 // 缺点:性能开销大 // 适用:需要优先级调度 BlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(); 队列容量计算:
...