一、为什么需要ForkJoinPool?

1.1 传统线程池的局限

// 传统线程池处理递归任务
ExecutorService executor = Executors.newFixedThreadPool(4);

// 计算斐波那契数列(递归)
public int fib(int n) {
    if (n <= 1) return n;

    // 提交子任务
    Future<Integer> f1 = executor.submit(() -> fib(n - 1));
    Future<Integer> f2 = executor.submit(() -> fib(n - 2));

    // 等待子任务完成
    return f1.get() + f2.get();  // ❌ 可能导致线程池死锁!
}

问题

  • 线程池死锁:父任务等待子任务,但子任务在队列中等待线程,形成死锁
  • 负载不均:某些线程任务多,某些线程空闲,资源利用率低
  • 任务粒度:传统线程池适合粗粒度任务,不适合细粒度递归任务

1.2 ForkJoinPool的解决方案

// ForkJoinPool 专为递归任务设计
ForkJoinPool pool = new ForkJoinPool();

class FibTask extends RecursiveTask<Integer> {
    private int n;

    protected Integer compute() {
        if (n <= 1) return n;

        // 分解子任务
        FibTask f1 = new FibTask(n - 1);
        FibTask f2 = new FibTask(n - 2);

        // 异步执行左子任务
        f1.fork();

        // 同步执行右子任务,并等待左子任务
        return f2.compute() + f1.join();  // ✅ 不会死锁
    }
}

int result = pool.invoke(new FibTask(10));

核心优势

  • 工作窃取:空闲线程主动"窃取"其他线程的任务,提高负载均衡
  • 双端队列:每个线程有独立的任务队列,减少竞争
  • 递归优化:专为分治算法设计,避免死锁

二、工作窃取算法原理

2.1 核心思想

线程1队列: [Task1] [Task2] [Task3] [Task4]  ← 线程1从队首取任务
线程2队列: [Task5] [Task6]                  ← 线程2从队首取任务
线程3队列: []                                ← 线程3空闲

工作窃取发生:
线程3发现自己队列为空,于是:
1. 查找其他线程的队列
2. 从线程1的队尾"窃取" Task4
3. 线程3继续工作,避免空闲

线程1队列: [Task1] [Task2] [Task3]  ← Task4被窃取
线程2队列: [Task5] [Task6]
线程3队列: [Task4]                  ← 窃取到任务

关键机制

  • 每个线程维护一个双端队列(Deque)
  • 本线程从队首取任务(LIFO,栈特性)
  • 其他线程从队尾窃取任务(FIFO,队列特性)
  • 减少竞争:两端操作,冲突概率低

2.2 为什么这样设计?

// 递归任务的执行顺序
class Task extends RecursiveTask<Integer> {
    protected Integer compute() {
        Task left = new Task();
        Task right = new Task();

        left.fork();    // 左子任务放入队列头部
        right.fork();   // 右子任务放入队列头部

        return left.join() + right.join();
    }
}

队列状态

队列: [right] [left] [parent]
       ↑头部          ↑尾部

本线程从头部取:优先处理最新的子任务(深度优先,缓存友好)
其他线程从尾部窃取:窃取较早的任务(避免竞争)

优势

  • 缓存友好:深度优先,相关数据在缓存中
  • 减少窃取:子任务优先被创建它的线程执行
  • 负载均衡:空闲线程自动窃取任务

三、核心API使用

3.1 RecursiveTask vs RecursiveAction

// RecursiveTask:有返回值
class SumTask extends RecursiveTask<Long> {
    private long[] array;
    private int start, end;
    private static final int THRESHOLD = 100;  // 阈值

    @Override
    protected Long compute() {
        // 任务足够小,直接计算
        if (end - start <= THRESHOLD) {
            long sum = 0;
            for (int i = start; i < end; i++) {
                sum += array[i];
            }
            return sum;
        }

        // 任务太大,分解
        int mid = (start + end) / 2;
        SumTask leftTask = new SumTask(array, start, mid);
        SumTask rightTask = new SumTask(array, mid, end);

        // 异步执行左任务
        leftTask.fork();

        // 同步执行右任务,并等待左任务
        long rightResult = rightTask.compute();
        long leftResult = leftTask.join();

        return leftResult + rightResult;
    }
}

// 使用
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new SumTask(array, 0, array.length));

// RecursiveAction:无返回值
class PrintTask extends RecursiveAction {
    private int start, end;
    private static final int THRESHOLD = 100;

    @Override
    protected void compute() {
        if (end - start <= THRESHOLD) {
            for (int i = start; i < end; i++) {
                System.out.println(i);
            }
        } else {
            int mid = (start + end) / 2;
            invokeAll(
                new PrintTask(start, mid),
                new PrintTask(mid, end)
            );
        }
    }
}

3.2 fork() 和 join() 的最佳实践

// ❌ 错误用法
class BadTask extends RecursiveTask<Integer> {
    protected Integer compute() {
        Task left = new Task();
        Task right = new Task();

        left.fork();
        right.fork();
        return left.join() + right.join();  // 两次fork,浪费线程
    }
}

// ✅ 推荐用法
class GoodTask extends RecursiveTask<Integer> {
    protected Integer compute() {
        Task left = new Task();
        Task right = new Task();

        left.fork();  // 只fork一个
        int rightResult = right.compute();  // 另一个直接计算
        int leftResult = left.join();

        return leftResult + rightResult;
    }
}

// ✅ 或使用 invokeAll
class BetterTask extends RecursiveTask<Integer> {
    protected Integer compute() {
        Task left = new Task();
        Task right = new Task();

        invokeAll(left, right);  // 自动优化
        return left.join() + right.join();
    }
}

原因

  • fork() 会将任务放入队列,消耗线程资源
  • compute() 直接在当前线程执行,节省一次调度
  • invokeAll() 会自动优化,只fork n-1个任务

四、实战案例

4.1 并行归并排序

class MergeSortTask extends RecursiveAction {
    private int[] array;
    private int left, right;
    private int[] temp;
    private static final int THRESHOLD = 100;

    @Override
    protected void compute() {
        // 任务足够小,直接排序
        if (right - left <= THRESHOLD) {
            Arrays.sort(array, left, right);
            return;
        }

        // 分解任务
        int mid = (left + right) / 2;
        MergeSortTask leftTask = new MergeSortTask(array, left, mid, temp);
        MergeSortTask rightTask = new MergeSortTask(array, mid, right, temp);

        // 并行执行
        invokeAll(leftTask, rightTask);

        // 合并结果
        merge(array, left, mid, right, temp);
    }

    private void merge(int[] array, int left, int mid, int right, int[] temp) {
        // 标准归并逻辑
        System.arraycopy(array, left, temp, left, right - left);
        int i = left, j = mid, k = left;

        while (i < mid && j < right) {
            if (temp[i] <= temp[j]) {
                array[k++] = temp[i++];
            } else {
                array[k++] = temp[j++];
            }
        }

        while (i < mid) array[k++] = temp[i++];
        while (j < right) array[k++] = temp[j++];
    }
}

// 使用
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new MergeSortTask(array, 0, array.length, new int[array.length]));

4.2 并行遍历文件树

class FileSearchTask extends RecursiveTask<List<File>> {
    private File directory;
    private String keyword;

    @Override
    protected List<File> compute() {
        List<File> result = new ArrayList<>();
        File[] files = directory.listFiles();

        if (files == null) return result;

        List<FileSearchTask> subTasks = new ArrayList<>();

        for (File file : files) {
            if (file.isDirectory()) {
                // 子目录:创建子任务
                FileSearchTask subTask = new FileSearchTask(file, keyword);
                subTask.fork();
                subTasks.add(subTask);
            } else {
                // 文件:检查是否匹配
                if (file.getName().contains(keyword)) {
                    result.add(file);
                }
            }
        }

        // 收集子任务结果
        for (FileSearchTask subTask : subTasks) {
            result.addAll(subTask.join());
        }

        return result;
    }
}

// 使用
ForkJoinPool pool = new ForkJoinPool();
List<File> files = pool.invoke(new FileSearchTask(new File("/path"), "keyword"));

五、ForkJoinPool配置与监控

5.1 创建ForkJoinPool

// 1. 使用默认配置(推荐)
ForkJoinPool pool = new ForkJoinPool();

// 2. 指定并行度
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

// 3. 使用commonPool(全局共享)
ForkJoinPool.commonPool().invoke(task);

// 4. 完全自定义
ForkJoinPool pool = new ForkJoinPool(
    4,                                      // 并行度
    ForkJoinPool.defaultForkJoinWorkerThreadFactory,
    null,                                   // 异常处理器
    false                                   // 异步模式
);

5.2 监控指标

ForkJoinPool pool = new ForkJoinPool();

// 获取监控数据
int parallelism = pool.getParallelism();        // 并行度
int poolSize = pool.getPoolSize();              // 当前线程数
int activeThreadCount = pool.getActiveThreadCount();  // 活跃线程数
int runningThreadCount = pool.getRunningThreadCount(); // 运行中线程数
long queuedSubmissionCount = pool.getQueuedSubmissionCount();  // 队列任务数
long queuedTaskCount = pool.getQueuedTaskCount();  // 总任务数
long stealCount = pool.getStealCount();         // 窃取次数(关键指标)

System.out.println("并行度:" + parallelism);
System.out.println("活跃线程:" + activeThreadCount);
System.out.println("窃取次数:" + stealCount);  // 窃取次数越多,负载均衡越好

六、性能对比与最佳实践

6.1 性能对比

// 测试:计算1亿个数的和
long[] array = new long[100_000_000];
for (int i = 0; i < array.length; i++) {
    array[i] = i;
}

// 方案1:单线程
long start = System.currentTimeMillis();
long sum = 0;
for (long num : array) {
    sum += num;
}
System.out.println("单线程耗时:" + (System.currentTimeMillis() - start) + "ms");
// 输出:约 100ms

// 方案2:ForkJoinPool
start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new SumTask(array, 0, array.length));
System.out.println("ForkJoinPool耗时:" + (System.currentTimeMillis() - start) + "ms");
// 输出:约 30ms(4核CPU)

// 方案3:Stream并行流(底层使用commonPool)
start = System.currentTimeMillis();
long streamSum = Arrays.stream(array).parallel().sum();
System.out.println("Stream并行流耗时:" + (System.currentTimeMillis() - start) + "ms");
// 输出:约 35ms

性能提升

  • 4核CPU:约3-4倍
  • 8核CPU:约6-7倍
  • 适合CPU密集型任务

6.2 最佳实践

  1. 选择合适的阈值

    // 阈值太小:任务分解开销大,性能下降
    private static final int THRESHOLD = 10;  // ❌ 太小
    
    // 阈值太大:并行度不够,性能提升有限
    private static final int THRESHOLD = 10_000_000;  // ❌ 太大
    
    // 推荐:根据任务特性调整,通常在 100-10000 之间
    private static final int THRESHOLD = 1000;  // ✅ 合适
    
  2. 避免阻塞操作

    // ❌ 不要在compute中执行阻塞操作
    protected Integer compute() {
        Thread.sleep(1000);  // 阻塞,浪费线程
        return result;
    }
    
    // ✅ ForkJoinPool适合CPU密集型任务
    protected Integer compute() {
        return heavyComputation();  // 纯计算
    }
    
  3. 合理使用commonPool

    // ✅ 简单场景:使用commonPool
    ForkJoinPool.commonPool().invoke(task);
    
    // ✅ 复杂场景:自定义ForkJoinPool
    ForkJoinPool customPool = new ForkJoinPool(8);
    customPool.invoke(task);
    
  4. 避免任务过度分解

    // ❌ 每次只分解一个元素
    if (end - start == 1) return array[start];
    
    // ✅ 达到阈值才停止分解
    if (end - start <= THRESHOLD) {
        return directCompute();
    }
    

七、核心要点总结

7.1 工作窃取算法

  • 双端队列:每个线程独立队列,减少竞争
  • 两端操作:本线程从队首取,其他线程从队尾窃取
  • 负载均衡:空闲线程自动窃取任务,提高CPU利用率

7.2 适用场景

场景是否适合说明
CPU密集型递归任务✅ 非常适合分治算法、树遍历、数学计算
大数据并行计算✅ 非常适合数组求和、排序、搜索
I/O密集型任务❌ 不适合阻塞操作会浪费线程
粗粒度任务❌ 不适合传统线程池更合适

7.3 关键API

API作用使用场景
fork()异步执行任务将任务放入队列
join()等待任务完成获取任务结果
compute()同步执行任务直接计算,不放入队列
invokeAll()批量执行任务自动优化fork/compute

总结

ForkJoinPool通过工作窃取算法解决了传统线程池在处理递归任务时的性能问题:

核心优势

  • 工作窃取:空闲线程主动窃取任务,提高负载均衡
  • 双端队列:减少线程竞争,提升并发性能
  • 递归优化:专为分治算法设计,避免死锁
  • 高性能:CPU密集型任务可获得接近线性的性能提升

实战要点

  1. 选择合适的阈值,避免过度分解
  2. 适用于CPU密集型递归任务
  3. 使用 invokeAll() 替代手动 fork() + join()
  4. 监控窃取次数,评估负载均衡效果

下一篇预告:我们将深入无锁编程与LongAdder,理解CAS如何实现高性能的原子操作!