Executors的陷阱

阿里巴巴Java开发手册明确规定:禁止使用Executors创建线程池。

三大陷阱

1. FixedThreadPool和SingleThreadExecutor

// 源码
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());  // 无界队列
}

问题:队列无界,任务堆积导致OOM

2. CachedThreadPool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,  // 无限线程
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
}

问题:线程数无限,可能创建大量线程导致OOM

3. ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
    // 内部使用DelayedWorkQueue,无界
}

问题:队列无界,任务堆积


生产级配置模板

模板1:Web服务

@Configuration
public class ThreadPoolConfig {

    @Bean("webExecutor")
    public ThreadPoolExecutor webExecutor() {
        int coreSize = Runtime.getRuntime().availableProcessors() * 2;

        return new ThreadPoolExecutor(
            coreSize,                   // 核心线程数
            coreSize * 4,               // 最大线程数
            60L, TimeUnit.SECONDS,      // 空闲存活时间
            new ArrayBlockingQueue<>(1000),  // 有界队列
            new NamedThreadFactory("web-pool"),
            new ThreadPoolExecutor.CallerRunsPolicy()  // 降级策略
        );
    }
}

// 自定义线程工厂
class NamedThreadFactory implements ThreadFactory {
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public NamedThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r, namePrefix + "-" + threadNumber.getAndIncrement());
        t.setDaemon(false);
        return t;
    }
}

模板2:异步任务

@Bean("asyncExecutor")
public ThreadPoolExecutor asyncExecutor() {
    return new ThreadPoolExecutor(
        10,                         // 核心线程数
        20,                         // 最大线程数
        300L, TimeUnit.SECONDS,     // 较长的空闲时间
        new LinkedBlockingQueue<>(500),  // 较大队列
        new NamedThreadFactory("async-pool"),
        (r, executor) -> {
            // 自定义拒绝策略:记录日志
            log.error("Task rejected: {}", r);
            // 可以保存到数据库或MQ
        }
    );
}

异常处理

问题:线程池中的异常不会抛出

executor.submit(() -> {
    throw new RuntimeException("Error");  // 异常被吞掉
});

解决方案1:使用Future

Future<?> future = executor.submit(() -> {
    // 可能抛异常的代码
});

try {
    future.get();  // 获取结果时会抛出异常
} catch (ExecutionException e) {
    log.error("Task failed", e.getCause());
}

解决方案2:自定义ThreadFactory

class CustomThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setUncaughtExceptionHandler((thread, throwable) -> {
            log.error("Thread {} threw exception", thread.getName(), throwable);
        });
        return t;
    }
}

解决方案3:包装任务

executor.submit(() -> {
    try {
        // 业务代码
    } catch (Exception e) {
        log.error("Task failed", e);
    }
});

监控与告警

1. 自定义ThreadPoolExecutor

public class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {

    public MonitoredThreadPoolExecutor(...) {
        super(...);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.debug("Task {} is about to execute", r);
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t != null) {
            log.error("Task {} failed", r, t);
        }
    }

    @Override
    protected void terminated() {
        super.terminated();
        log.info("ThreadPool terminated");
    }
}

2. 定时监控

@Scheduled(fixedRate = 5000)
public void monitorThreadPool() {
    int poolSize = executor.getPoolSize();
    int activeCount = executor.getActiveCount();
    int queueSize = executor.getQueue().size();
    long completedTaskCount = executor.getCompletedTaskCount();

    log.info("Pool[size={}, active={}, queue={}, completed={}]",
        poolSize, activeCount, queueSize, completedTaskCount);

    // 告警:队列积压
    if (queueSize > 800) {
        alert("ThreadPool queue size too high: " + queueSize);
    }

    // 告警:拒绝任务
    if (executor instanceof ThreadPoolExecutor) {
        long rejectedCount = getRejectedCount(executor);
        if (rejectedCount > 0) {
            alert("ThreadPool rejected tasks: " + rejectedCount);
        }
    }
}

3. Micrometer集成

@Bean
public ThreadPoolExecutor monitoredExecutor(MeterRegistry registry) {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(...);

    // 注册指标
    registry.gauge("threadpool.size", executor, ThreadPoolExecutor::getPoolSize);
    registry.gauge("threadpool.active", executor, ThreadPoolExecutor::getActiveCount);
    registry.gauge("threadpool.queue.size", executor, e -> e.getQueue().size());

    return executor;
}

优雅关闭

@PreDestroy
public void shutdown() {
    log.info("Shutting down thread pool...");

    // 1. 停止接受新任务
    executor.shutdown();

    try {
        // 2. 等待已有任务完成(最多等待60秒)
        if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            log.warn("Pool did not terminate gracefully, forcing shutdown");

            // 3. 强制关闭
            List<Runnable> droppedTasks = executor.shutdownNow();
            log.warn("Dropped {} tasks", droppedTasks.size());

            // 4. 再次等待(最多10秒)
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                log.error("Pool did not terminate");
            }
        }
    } catch (InterruptedException e) {
        log.error("Shutdown interrupted", e);
        executor.shutdownNow();
        Thread.currentThread().interrupt();
    }

    log.info("Thread pool shutdown complete");
}

动态配置

使用Apollo/Nacos动态调整

@Component
public class DynamicThreadPoolConfig {

    @Value("${threadpool.coreSize:10}")
    private int coreSize;

    @Value("${threadpool.maxSize:20}")
    private int maxSize;

    @Autowired
    private ThreadPoolExecutor executor;

    @ApolloConfigChangeListener
    public void onChange(ConfigChangeEvent changeEvent) {
        if (changeEvent.isChanged("threadpool.coreSize")) {
            int newCoreSize = Integer.parseInt(changeEvent.getChange("threadpool.coreSize").getNewValue());
            executor.setCorePoolSize(newCoreSize);
            log.info("CorePoolSize changed to {}", newCoreSize);
        }

        if (changeEvent.isChanged("threadpool.maxSize")) {
            int newMaxSize = Integer.parseInt(changeEvent.getChange("threadpool.maxSize").getNewValue());
            executor.setMaximumPoolSize(newMaxSize);
            log.info("MaximumPoolSize changed to {}", newMaxSize);
        }
    }
}

压测调优

性能测试脚本

public class ThreadPoolBenchmark {

    @Benchmark
    public void testThreadPool() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10, 20, 60L, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(100),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        CountDownLatch latch = new CountDownLatch(1000);
        for (int i = 0; i < 1000; i++) {
            executor.submit(() -> {
                // 模拟任务
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                latch.countDown();
            });
        }

        latch.await();
        executor.shutdown();
    }
}

总结

生产环境检查清单

  • 使用自定义ThreadPoolExecutor,不用Executors
  • 设置有界队列
  • 设置合理的拒绝策略
  • 给线程池命名
  • 实现异常处理
  • 添加监控告警
  • 实现优雅关闭
  • 支持动态调整
  • 进行压测调优

系列文章