Java并发21:线程池最佳实践 - Executors陷阱与生产配置

Executors的陷阱 阿里巴巴Java开发手册明确规定:禁止使用Executors创建线程池。 三大陷阱 1. FixedThreadPool和SingleThreadExecutor // 源码 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); // 无界队列 } 问题:队列无界,任务堆积导致OOM 2. CachedThreadPool public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 无限线程 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 问题:线程数无限,可能创建大量线程导致OOM 3. ScheduledThreadPool public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); // 内部使用DelayedWorkQueue,无界 } 问题:队列无界,任务堆积 生产级配置模板 模板1:Web服务 @Configuration public class ThreadPoolConfig { @Bean("webExecutor") public ThreadPoolExecutor webExecutor() { int coreSize = Runtime.getRuntime().availableProcessors() * 2; return new ThreadPoolExecutor( coreSize, // 核心线程数 coreSize * 4, // 最大线程数 60L, TimeUnit.SECONDS, // 空闲存活时间 new ArrayBlockingQueue<>(1000), // 有界队列 new NamedThreadFactory("web-pool"), new ThreadPoolExecutor.CallerRunsPolicy() // 降级策略 ); } } // 自定义线程工厂 class NamedThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public NamedThreadFactory(String namePrefix) { this.namePrefix = namePrefix; } @Override public Thread newThread(Runnable r) { Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement()); t.setDaemon(false); return t; } } 模板2:异步任务 @Bean("asyncExecutor") public ThreadPoolExecutor asyncExecutor() { return new ThreadPoolExecutor( 10, // 核心线程数 20, // 最大线程数 300L, TimeUnit.SECONDS, // 较长的空闲时间 new LinkedBlockingQueue<>(500), // 较大队列 new NamedThreadFactory("async-pool"), (r, executor) -> { // 自定义拒绝策略:记录日志 log.error("Task rejected: {}", r); // 可以保存到数据库或MQ } ); } 异常处理 问题:线程池中的异常不会抛出 ...

2025-11-20 · maneng

Java并发20:ThreadPoolExecutor详解 - 深入线程池源码

核心参数详解 public ThreadPoolExecutor( int corePoolSize, // 核心线程数 int maximumPoolSize, // 最大线程数 long keepAliveTime, // 空闲线程存活时间 TimeUnit unit, // 时间单位 BlockingQueue<Runnable> workQueue, // 任务队列 ThreadFactory threadFactory, // 线程工厂 RejectedExecutionHandler handler // 拒绝策略 ) { ... } 参数说明 corePoolSize(核心线程数): 即使空闲也不会回收 除非设置allowCoreThreadTimeOut(true) maximumPoolSize(最大线程数): 队列满后创建的临时线程 空闲超过keepAliveTime后回收 workQueue(任务队列): // 1. 有界队列(推荐) new ArrayBlockingQueue<>(100) // 2. 无界队列(慎用) new LinkedBlockingQueue<>() // 可能OOM // 3. 同步队列(不存储) new SynchronousQueue<>() // 直接交付给线程 // 4. 优先级队列 new PriorityBlockingQueue<>() // 按优先级执行 任务提交流程 execute() vs submit(): ...

2025-11-20 · maneng

Java并发19:线程池核心原理 - 为什么需要线程池

为什么需要线程池? 直接创建线程的问题: // 每个请求创建一个线程 new Thread(() -> handleRequest()).start(); 三大问题: 创建/销毁开销大:线程创建需要约1MB栈空间,耗时1ms 资源无限制:可能创建上千个线程,耗尽内存 管理困难:无法统一管理和监控 线程池的优势: ✅ 线程复用,降低开销 ✅ 控制并发数,避免资源耗尽 ✅ 统一管理,便于监控 核心组件 public class ThreadPoolExecutor { // 核心参数 int corePoolSize; // 核心线程数 int maximumPoolSize; // 最大线程数 long keepAliveTime; // 空闲线程存活时间 BlockingQueue<Runnable> workQueue; // 任务队列 RejectedExecutionHandler handler; // 拒绝策略 } 工作流程 任务提交 ↓ 线程数 < corePoolSize? ├─ 是 → 创建核心线程执行 └─ 否 → 队列未满? ├─ 是 → 加入队列等待 └─ 否 → 线程数 < maximumPoolSize? ├─ 是 → 创建临时线程执行 └─ 否 → 执行拒绝策略 示例: ...

2025-11-20 · maneng

Java并发04:线程的创建与启动 - 4种方式深度对比

引言:一道高频面试题 面试官:请说一下Java中创建线程有哪几种方式? 候选人A:继承Thread类、实现Runnable接口…还有吗? 面试官:还有吗?Callable算吗?线程池算吗?它们有什么区别? 大部分候选人能说出前两种,但很少有人能说清楚: 为什么推荐实现Runnable而不是继承Thread? Callable和Runnable的本质区别是什么? 线程池是怎么创建线程的? start()和run()到底有什么区别? 今天我们从第一性原理出发,彻底搞懂这些问题。 一、方式1:继承Thread类 1.1 基本用法 public class MyThread extends Thread { @Override public void run() { System.out.println("线程执行: " + Thread.currentThread().getName()); } public static void main(String[] args) { MyThread thread = new MyThread(); thread.start(); // 启动线程 } } 输出: 线程执行: Thread-0 1.2 原理分析 // Thread类的核心结构 public class Thread implements Runnable { private Runnable target; // 任务对象 public Thread() { // 空构造 } public Thread(Runnable target) { this.target = target; // 传入任务 } @Override public void run() { if (target != null) { target.run(); // 执行任务 } // 子类可以重写这个方法 } public synchronized void start() { // 1. 检查状态 if (threadStatus != 0) throw new IllegalThreadStateException(); // 2. 加入线程组 group.add(this); // 3. 调用native方法启动线程 boolean started = false; try { start0(); // ← native方法,创建操作系统线程 started = true; } finally { // ... } } private native void start0(); // JNI调用,创建OS线程 } 关键点: ...

2025-11-20 · maneng

线程池监控与调优:从指标监控到性能优化

一、为什么要监控线程池? 1.1 常见线程池问题 // 线程池配置不当,导致的问题: // 1. 线程数过小:任务堆积 ThreadPoolExecutor pool = new ThreadPoolExecutor( 2, 2, // ❌ 核心线程数太少 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000) ); // 结果:任务大量排队,响应慢 // 2. 队列无界:内存溢出 ThreadPoolExecutor pool = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>() // ❌ 无界队列 ); // 结果:任务无限堆积,OOM // 3. 拒绝策略不当:任务丢失 ThreadPoolExecutor pool = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.DiscardPolicy() // ❌ 静默丢弃 ); // 结果:任务丢失,业务异常 监控目的: ✅ 发现性能瓶颈 ✅ 预防资源耗尽 ✅ 优化参数配置 ✅ 及时告警 二、核心监控指标 2.1 线程池状态指标 ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1000) ); // 1. 核心线程数 int corePoolSize = executor.getCorePoolSize(); // 2. 最大线程数 int maximumPoolSize = executor.getMaximumPoolSize(); // 3. 当前线程数 int poolSize = executor.getPoolSize(); // 4. 活跃线程数(正在执行任务的线程数) int activeCount = executor.getActiveCount(); // 5. 历史最大线程数 int largestPoolSize = executor.getLargestPoolSize(); // 6. 任务总数 long taskCount = executor.getTaskCount(); // 7. 已完成任务数 long completedTaskCount = executor.getCompletedTaskCount(); // 8. 队列中任务数 int queueSize = executor.getQueue().size(); // 9. 队列剩余容量 int remainingCapacity = executor.getQueue().remainingCapacity(); System.out.println("核心线程数:" + corePoolSize); System.out.println("最大线程数:" + maximumPoolSize); System.out.println("当前线程数:" + poolSize); System.out.println("活跃线程数:" + activeCount); System.out.println("队列中任务数:" + queueSize); System.out.println("已完成任务数:" + completedTaskCount); 2.2 关键指标说明 指标 说明 正常范围 异常信号 活跃线程数/当前线程数 线程利用率 60%-80% >90%:线程不足 队列中任务数 任务积压情况 <50% >80%:任务堆积 任务完成速率 处理能力 稳定 持续下降:性能问题 拒绝任务数 容量溢出 0 >0:需要扩容 三、线程池监控实战 3.1 自定义监控线程池 public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor { private final AtomicLong totalExecutionTime = new AtomicLong(0); private final AtomicLong rejectedTaskCount = new AtomicLong(0); public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, new MonitoredRejectedExecutionHandler()); } @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); // 任务执行前的逻辑 } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); // 任务执行后的逻辑 if (t != null) { System.err.println("任务执行异常:" + t.getMessage()); } // 统计执行时间(需要在Runnable中记录) } @Override protected void terminated() { super.terminated(); System.out.println("线程池已关闭"); printStatistics(); } // 自定义拒绝策略:记录拒绝次数 private class MonitoredRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { rejectedTaskCount.incrementAndGet(); System.err.println("任务被拒绝!拒绝次数:" + rejectedTaskCount.get()); // 告警逻辑 if (rejectedTaskCount.get() % 100 == 0) { System.err.println("⚠️ 告警:已拒绝 " + rejectedTaskCount.get() + " 个任务!"); } // 降级逻辑:调用者线程执行 r.run(); } } // 打印统计信息 public void printStatistics() { System.out.println("========== 线程池统计 =========="); System.out.println("核心线程数:" + getCorePoolSize()); System.out.println("最大线程数:" + getMaximumPoolSize()); System.out.println("当前线程数:" + getPoolSize()); System.out.println("活跃线程数:" + getActiveCount()); System.out.println("历史最大线程数:" + getLargestPoolSize()); System.out.println("任务总数:" + getTaskCount()); System.out.println("已完成任务数:" + getCompletedTaskCount()); System.out.println("队列中任务数:" + getQueue().size()); System.out.println("拒绝任务数:" + rejectedTaskCount.get()); System.out.println("================================"); } } 3.2 定时监控 public class ThreadPoolMonitor { public static void startMonitoring(ThreadPoolExecutor executor) { ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(() -> { System.out.println("========== 线程池监控 =========="); System.out.println("当前时间:" + LocalDateTime.now()); System.out.println("核心线程数:" + executor.getCorePoolSize()); System.out.println("最大线程数:" + executor.getMaximumPoolSize()); System.out.println("当前线程数:" + executor.getPoolSize()); System.out.println("活跃线程数:" + executor.getActiveCount()); System.out.println("队列大小:" + executor.getQueue().size()); System.out.println("已完成任务:" + executor.getCompletedTaskCount()); // 计算线程利用率 double threadUtilization = (double) executor.getActiveCount() / executor.getPoolSize() * 100; System.out.printf("线程利用率:%.2f%%\n", threadUtilization); // 计算队列使用率 BlockingQueue<Runnable> queue = executor.getQueue(); int queueCapacity = queue.size() + queue.remainingCapacity(); double queueUtilization = (double) queue.size() / queueCapacity * 100; System.out.printf("队列使用率:%.2f%%\n", queueUtilization); // 告警逻辑 if (threadUtilization > 90) { System.err.println("⚠️ 告警:线程利用率过高!"); } if (queueUtilization > 80) { System.err.println("⚠️ 告警:队列积压严重!"); } System.out.println("================================\n"); }, 0, 5, TimeUnit.SECONDS); // 每5秒监控一次 } public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor( 10, 20, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100) ); // 启动监控 startMonitoring(executor); // 提交任务... } } 四、线程池参数调优 4.1 核心线程数调优 // CPU密集型任务 int cpuCount = Runtime.getRuntime().availableProcessors(); int corePoolSize = cpuCount + 1; // N + 1 // I/O密集型任务 int corePoolSize = cpuCount * 2; // 2N // 混合型任务(推荐公式) // corePoolSize = N * (1 + WT/ST) // N = CPU核心数 // WT = 等待时间(I/O时间) // ST = 计算时间(CPU时间) // 例如: // CPU核心数:8 // 等待时间:90ms(I/O) // 计算时间:10ms(CPU) // corePoolSize = 8 * (1 + 90/10) = 8 * 10 = 80 int corePoolSize = cpuCount * (1 + waitTime / computeTime); 4.2 队列选择与容量 // 1. LinkedBlockingQueue(无界队列) // 优点:无限容量 // 缺点:可能OOM // 适用:内存充足,任务数可控 BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(); // 2. LinkedBlockingQueue(有界队列) // 优点:防止OOM // 缺点:任务过多会拒绝 // 适用:需要控制内存 BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(1000); // 3. ArrayBlockingQueue(有界队列) // 优点:数组实现,性能好 // 缺点:容量固定 // 适用:高性能场景 BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(1000); // 4. SynchronousQueue(无缓冲队列) // 优点:直接交付,吞吐量高 // 缺点:无缓冲,容易拒绝 // 适用:任务执行快,maximumPoolSize大 BlockingQueue<Runnable> queue = new SynchronousQueue<>(); // 5. PriorityBlockingQueue(优先级队列) // 优点:支持优先级 // 缺点:性能开销大 // 适用:需要优先级调度 BlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(); 队列容量计算: ...

2025-11-20 · maneng

线程池与异步编程:从Thread到CompletableFuture的演进

引子:一个Web服务器的性能优化之路 假设你正在开发一个Web服务器,每个HTTP请求需要启动一个新线程来处理。看似简单的设计,却隐藏着严重的性能问题。 场景A:为每个请求创建新线程 /** * 方案1:为每个请求创建新线程(性能差) */ public class ThreadPerRequestServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8080); System.out.println("服务器启动,监听8080端口"); while (true) { Socket socket = serverSocket.accept(); // 为每个请求创建新线程 new Thread(() -> { try { handleRequest(socket); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } private static void handleRequest(Socket socket) throws IOException { // 模拟请求处理(读取请求、业务处理、返回响应) InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); // 简化处理 byte[] buffer = new byte[1024]; in.read(buffer); String response = "HTTP/1.1 200 OK\r\n\r\nHello World"; out.write(response.getBytes()); } } /* 性能测试(使用JMeter压测): 并发数:1000 请求总数:10000 结果: ├─ 吞吐量:500 req/s ├─ 平均响应时间:2000ms ├─ CPU使用率:60% └─ 内存使用:峰值2GB 问题分析: 1. 线程创建开销大 ├─ 每个线程需要1MB栈空间 ├─ 1000个线程 = 1GB内存 └─ 线程创建/销毁耗时(ms级别) 2. 上下文切换频繁 ├─ 1000个线程竞争CPU ├─ 大量时间花在线程切换 └─ CPU利用率低 3. 系统资源耗尽 ├─ 线程数无限制 ├─ 可能导致OOM └─ 系统崩溃 */ 场景B:使用线程池 /** * 方案2:使用线程池(性能好) */ public class ThreadPoolServer { // 创建固定大小的线程池 private static final ExecutorService executor = Executors.newFixedThreadPool(100); public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8080); System.out.println("服务器启动,监听8080端口"); while (true) { Socket socket = serverSocket.accept(); // 提交任务到线程池 executor.submit(() -> { try { handleRequest(socket); } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } private static void handleRequest(Socket socket) throws IOException { // 同上 InputStream in = socket.getInputStream(); OutputStream out = socket.getOutputStream(); byte[] buffer = new byte[1024]; in.read(buffer); String response = "HTTP/1.1 200 OK\r\n\r\nHello World"; out.write(response.getBytes()); } } /* 性能测试(同样的压测条件): 并发数:1000 请求总数:10000 结果: ├─ 吞吐量:5000 req/s ← 提升10倍! ├─ 平均响应时间:200ms ← 降低10倍! ├─ CPU使用率:85% ← 提升25% └─ 内存使用:峰值200MB ← 降低10倍! 优势分析: 1. 线程复用 ├─ 100个线程处理1000个请求 ├─ 无需频繁创建/销毁线程 └─ 节省大量时间和内存 2. 减少上下文切换 ├─ 线程数固定(100个) ├─ 上下文切换次数大幅减少 └─ CPU利用率提升 3. 资源可控 ├─ 线程数有上限 ├─ 内存使用可预测 └─ 系统稳定 */ 性能对比总结 对比维度 直接创建线程 线程池 提升 吞吐量 500 req/s 5000 req/s 10倍 响应时间 2000ms 200ms 10倍 CPU使用率 60% 85% +25% 内存峰值 2GB 200MB 10倍 线程数 1000+ 100 可控 核心洞察: ...

2025-11-03 · maneng

如约数科科技工作室

浙ICP备2025203501号

👀 本站总访问量 ...| 👤 访客数 ...| 📅 今日访问 ...