一、为什么需要ForkJoinPool?
1.1 传统线程池的局限
// 传统线程池处理递归任务
ExecutorService executor = Executors.newFixedThreadPool(4);
// 计算斐波那契数列(递归)
public int fib(int n) {
if (n <= 1) return n;
// 提交子任务
Future<Integer> f1 = executor.submit(() -> fib(n - 1));
Future<Integer> f2 = executor.submit(() -> fib(n - 2));
// 等待子任务完成
return f1.get() + f2.get(); // ❌ 可能导致线程池死锁!
}
问题:
- 线程池死锁:父任务等待子任务,但子任务在队列中等待线程,形成死锁
- 负载不均:某些线程任务多,某些线程空闲,资源利用率低
- 任务粒度:传统线程池适合粗粒度任务,不适合细粒度递归任务
1.2 ForkJoinPool的解决方案
// ForkJoinPool 专为递归任务设计
ForkJoinPool pool = new ForkJoinPool();
class FibTask extends RecursiveTask<Integer> {
private int n;
protected Integer compute() {
if (n <= 1) return n;
// 分解子任务
FibTask f1 = new FibTask(n - 1);
FibTask f2 = new FibTask(n - 2);
// 异步执行左子任务
f1.fork();
// 同步执行右子任务,并等待左子任务
return f2.compute() + f1.join(); // ✅ 不会死锁
}
}
int result = pool.invoke(new FibTask(10));
核心优势:
- 工作窃取:空闲线程主动"窃取"其他线程的任务,提高负载均衡
- 双端队列:每个线程有独立的任务队列,减少竞争
- 递归优化:专为分治算法设计,避免死锁
二、工作窃取算法原理
2.1 核心思想
线程1队列: [Task1] [Task2] [Task3] [Task4] ← 线程1从队首取任务
线程2队列: [Task5] [Task6] ← 线程2从队首取任务
线程3队列: [] ← 线程3空闲
工作窃取发生:
线程3发现自己队列为空,于是:
1. 查找其他线程的队列
2. 从线程1的队尾"窃取" Task4
3. 线程3继续工作,避免空闲
线程1队列: [Task1] [Task2] [Task3] ← Task4被窃取
线程2队列: [Task5] [Task6]
线程3队列: [Task4] ← 窃取到任务
关键机制:
- 每个线程维护一个双端队列(Deque)
- 本线程从队首取任务(LIFO,栈特性)
- 其他线程从队尾窃取任务(FIFO,队列特性)
- 减少竞争:两端操作,冲突概率低
2.2 为什么这样设计?
// 递归任务的执行顺序
class Task extends RecursiveTask<Integer> {
protected Integer compute() {
Task left = new Task();
Task right = new Task();
left.fork(); // 左子任务放入队列头部
right.fork(); // 右子任务放入队列头部
return left.join() + right.join();
}
}
队列状态:
队列: [right] [left] [parent]
↑头部 ↑尾部
本线程从头部取:优先处理最新的子任务(深度优先,缓存友好)
其他线程从尾部窃取:窃取较早的任务(避免竞争)
优势:
- 缓存友好:深度优先,相关数据在缓存中
- 减少窃取:子任务优先被创建它的线程执行
- 负载均衡:空闲线程自动窃取任务
三、核心API使用
3.1 RecursiveTask vs RecursiveAction
// RecursiveTask:有返回值
class SumTask extends RecursiveTask<Long> {
private long[] array;
private int start, end;
private static final int THRESHOLD = 100; // 阈值
@Override
protected Long compute() {
// 任务足够小,直接计算
if (end - start <= THRESHOLD) {
long sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
}
// 任务太大,分解
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// 异步执行左任务
leftTask.fork();
// 同步执行右任务,并等待左任务
long rightResult = rightTask.compute();
long leftResult = leftTask.join();
return leftResult + rightResult;
}
}
// 使用
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new SumTask(array, 0, array.length));
// RecursiveAction:无返回值
class PrintTask extends RecursiveAction {
private int start, end;
private static final int THRESHOLD = 100;
@Override
protected void compute() {
if (end - start <= THRESHOLD) {
for (int i = start; i < end; i++) {
System.out.println(i);
}
} else {
int mid = (start + end) / 2;
invokeAll(
new PrintTask(start, mid),
new PrintTask(mid, end)
);
}
}
}
3.2 fork() 和 join() 的最佳实践
// ❌ 错误用法
class BadTask extends RecursiveTask<Integer> {
protected Integer compute() {
Task left = new Task();
Task right = new Task();
left.fork();
right.fork();
return left.join() + right.join(); // 两次fork,浪费线程
}
}
// ✅ 推荐用法
class GoodTask extends RecursiveTask<Integer> {
protected Integer compute() {
Task left = new Task();
Task right = new Task();
left.fork(); // 只fork一个
int rightResult = right.compute(); // 另一个直接计算
int leftResult = left.join();
return leftResult + rightResult;
}
}
// ✅ 或使用 invokeAll
class BetterTask extends RecursiveTask<Integer> {
protected Integer compute() {
Task left = new Task();
Task right = new Task();
invokeAll(left, right); // 自动优化
return left.join() + right.join();
}
}
原因:
fork()会将任务放入队列,消耗线程资源compute()直接在当前线程执行,节省一次调度invokeAll()会自动优化,只fork n-1个任务
四、实战案例
4.1 并行归并排序
class MergeSortTask extends RecursiveAction {
private int[] array;
private int left, right;
private int[] temp;
private static final int THRESHOLD = 100;
@Override
protected void compute() {
// 任务足够小,直接排序
if (right - left <= THRESHOLD) {
Arrays.sort(array, left, right);
return;
}
// 分解任务
int mid = (left + right) / 2;
MergeSortTask leftTask = new MergeSortTask(array, left, mid, temp);
MergeSortTask rightTask = new MergeSortTask(array, mid, right, temp);
// 并行执行
invokeAll(leftTask, rightTask);
// 合并结果
merge(array, left, mid, right, temp);
}
private void merge(int[] array, int left, int mid, int right, int[] temp) {
// 标准归并逻辑
System.arraycopy(array, left, temp, left, right - left);
int i = left, j = mid, k = left;
while (i < mid && j < right) {
if (temp[i] <= temp[j]) {
array[k++] = temp[i++];
} else {
array[k++] = temp[j++];
}
}
while (i < mid) array[k++] = temp[i++];
while (j < right) array[k++] = temp[j++];
}
}
// 使用
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(new MergeSortTask(array, 0, array.length, new int[array.length]));
4.2 并行遍历文件树
class FileSearchTask extends RecursiveTask<List<File>> {
private File directory;
private String keyword;
@Override
protected List<File> compute() {
List<File> result = new ArrayList<>();
File[] files = directory.listFiles();
if (files == null) return result;
List<FileSearchTask> subTasks = new ArrayList<>();
for (File file : files) {
if (file.isDirectory()) {
// 子目录:创建子任务
FileSearchTask subTask = new FileSearchTask(file, keyword);
subTask.fork();
subTasks.add(subTask);
} else {
// 文件:检查是否匹配
if (file.getName().contains(keyword)) {
result.add(file);
}
}
}
// 收集子任务结果
for (FileSearchTask subTask : subTasks) {
result.addAll(subTask.join());
}
return result;
}
}
// 使用
ForkJoinPool pool = new ForkJoinPool();
List<File> files = pool.invoke(new FileSearchTask(new File("/path"), "keyword"));
五、ForkJoinPool配置与监控
5.1 创建ForkJoinPool
// 1. 使用默认配置(推荐)
ForkJoinPool pool = new ForkJoinPool();
// 2. 指定并行度
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
// 3. 使用commonPool(全局共享)
ForkJoinPool.commonPool().invoke(task);
// 4. 完全自定义
ForkJoinPool pool = new ForkJoinPool(
4, // 并行度
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, // 异常处理器
false // 异步模式
);
5.2 监控指标
ForkJoinPool pool = new ForkJoinPool();
// 获取监控数据
int parallelism = pool.getParallelism(); // 并行度
int poolSize = pool.getPoolSize(); // 当前线程数
int activeThreadCount = pool.getActiveThreadCount(); // 活跃线程数
int runningThreadCount = pool.getRunningThreadCount(); // 运行中线程数
long queuedSubmissionCount = pool.getQueuedSubmissionCount(); // 队列任务数
long queuedTaskCount = pool.getQueuedTaskCount(); // 总任务数
long stealCount = pool.getStealCount(); // 窃取次数(关键指标)
System.out.println("并行度:" + parallelism);
System.out.println("活跃线程:" + activeThreadCount);
System.out.println("窃取次数:" + stealCount); // 窃取次数越多,负载均衡越好
六、性能对比与最佳实践
6.1 性能对比
// 测试:计算1亿个数的和
long[] array = new long[100_000_000];
for (int i = 0; i < array.length; i++) {
array[i] = i;
}
// 方案1:单线程
long start = System.currentTimeMillis();
long sum = 0;
for (long num : array) {
sum += num;
}
System.out.println("单线程耗时:" + (System.currentTimeMillis() - start) + "ms");
// 输出:约 100ms
// 方案2:ForkJoinPool
start = System.currentTimeMillis();
ForkJoinPool pool = new ForkJoinPool();
long result = pool.invoke(new SumTask(array, 0, array.length));
System.out.println("ForkJoinPool耗时:" + (System.currentTimeMillis() - start) + "ms");
// 输出:约 30ms(4核CPU)
// 方案3:Stream并行流(底层使用commonPool)
start = System.currentTimeMillis();
long streamSum = Arrays.stream(array).parallel().sum();
System.out.println("Stream并行流耗时:" + (System.currentTimeMillis() - start) + "ms");
// 输出:约 35ms
性能提升:
- 4核CPU:约3-4倍
- 8核CPU:约6-7倍
- 适合CPU密集型任务
6.2 最佳实践
选择合适的阈值
// 阈值太小:任务分解开销大,性能下降 private static final int THRESHOLD = 10; // ❌ 太小 // 阈值太大:并行度不够,性能提升有限 private static final int THRESHOLD = 10_000_000; // ❌ 太大 // 推荐:根据任务特性调整,通常在 100-10000 之间 private static final int THRESHOLD = 1000; // ✅ 合适避免阻塞操作
// ❌ 不要在compute中执行阻塞操作 protected Integer compute() { Thread.sleep(1000); // 阻塞,浪费线程 return result; } // ✅ ForkJoinPool适合CPU密集型任务 protected Integer compute() { return heavyComputation(); // 纯计算 }合理使用commonPool
// ✅ 简单场景:使用commonPool ForkJoinPool.commonPool().invoke(task); // ✅ 复杂场景:自定义ForkJoinPool ForkJoinPool customPool = new ForkJoinPool(8); customPool.invoke(task);避免任务过度分解
// ❌ 每次只分解一个元素 if (end - start == 1) return array[start]; // ✅ 达到阈值才停止分解 if (end - start <= THRESHOLD) { return directCompute(); }
七、核心要点总结
7.1 工作窃取算法
- 双端队列:每个线程独立队列,减少竞争
- 两端操作:本线程从队首取,其他线程从队尾窃取
- 负载均衡:空闲线程自动窃取任务,提高CPU利用率
7.2 适用场景
| 场景 | 是否适合 | 说明 |
|---|---|---|
| CPU密集型递归任务 | ✅ 非常适合 | 分治算法、树遍历、数学计算 |
| 大数据并行计算 | ✅ 非常适合 | 数组求和、排序、搜索 |
| I/O密集型任务 | ❌ 不适合 | 阻塞操作会浪费线程 |
| 粗粒度任务 | ❌ 不适合 | 传统线程池更合适 |
7.3 关键API
| API | 作用 | 使用场景 |
|---|---|---|
fork() | 异步执行任务 | 将任务放入队列 |
join() | 等待任务完成 | 获取任务结果 |
compute() | 同步执行任务 | 直接计算,不放入队列 |
invokeAll() | 批量执行任务 | 自动优化fork/compute |
总结
ForkJoinPool通过工作窃取算法解决了传统线程池在处理递归任务时的性能问题:
核心优势:
- ✅ 工作窃取:空闲线程主动窃取任务,提高负载均衡
- ✅ 双端队列:减少线程竞争,提升并发性能
- ✅ 递归优化:专为分治算法设计,避免死锁
- ✅ 高性能:CPU密集型任务可获得接近线性的性能提升
实战要点:
- 选择合适的阈值,避免过度分解
- 适用于CPU密集型递归任务
- 使用
invokeAll()替代手动fork() + join() - 监控窃取次数,评估负载均衡效果
下一篇预告:我们将深入无锁编程与LongAdder,理解CAS如何实现高性能的原子操作!