14、ForkJoin
ForkJoin
框架是Java中用于并行执行任务的框架,特别适合处理可以分解为多个子任务的复杂计算。它基于“分而治之”的思想,将一个大任务分解为多个小任务,这些小任务可以并行执行,最后将结果合并。
ForkJoin框架的核心组件
-
ForkJoinPool
:线程池,用于管理线程和任务的执行。 -
RecursiveTask
:有返回值的任务。 -
RecursiveAction
:无返回值的任务。
工作原理
-
任务分解:将大任务分解为多个小任务。
-
并行执行:小任务在
ForkJoinPool
中并行执行。 -
任务合并:将小任务的结果合并为最终结果。
-
工作窃取:线程池中的线程如果完成自己的任务,可以“窃取”其他线程的任务来执行,提高资源利用率。
代码示例:计算斐波那契数列
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;public class FibonacciExample {// 定义一个阈值,当任务大小小于这个值时,直接计算而不是进一步分解private static final int THRESHOLD = 10;public static void main(String[] args) {// 创建ForkJoinPoolForkJoinPool forkJoinPool = new ForkJoinPool();// 提交任务FibonacciTask task = new FibonacciTask(30);int result = forkJoinPool.invoke(task);// 输出结果System.out.println("Fibonacci(30) = " + result);}// 定义RecursiveTaskstatic class FibonacciTask extends RecursiveTask<Integer> {private final int n;public FibonacciTask(int n) {this.n = n;}@Overrideprotected Integer compute() {// 如果任务大小小于阈值,直接计算if (n <= THRESHOLD) {return computeFibonacci(n);} else {// 分解任务FibonacciTask leftTask = new FibonacciTask(n - 1);FibonacciTask rightTask = new FibonacciTask(n - 2);// 异步执行一个子任务leftTask.fork();// 执行另一个子任务int rightResult = rightTask.compute();// 等待第一个子任务完成并获取结果int leftResult = leftTask.join();// 合并结果return leftResult + rightResult;}}// 计算斐波那契数列的值private int computeFibonacci(int n) {if (n <= 1) {return n;}return computeFibonacci(n - 1) + computeFibonacci(n - 2);}}
}
代码解析
-
ForkJoinPool
:-
ForkJoinPool
是ForkJoin
框架的核心,用于管理线程和任务的执行。在上面的代码中,我们创建了一个ForkJoinPool
实例。 -
使用
invoke
方法提交任务并获取结果。
-
-
RecursiveTask
:-
RecursiveTask
是一个有返回值的任务类。在上面的代码中,我们定义了一个FibonacciTask
类,继承自RecursiveTask<Integer>
。 -
在
compute
方法中,我们实现了任务的分解和结果的合并。
-
-
任务分解:
-
如果任务大小小于阈值(
THRESHOLD
),直接计算结果。 -
如果任务大小大于阈值,将任务分解为两个子任务:
F(n - 1)
和F(n - 2)
。 -
使用
fork
方法异步执行一个子任务,使用compute
方法同步执行另一个子任务。 -
使用
join
方法等待异步执行的子任务完成并获取结果。
-
-
结果合并:
-
将两个子任务的结果相加,得到最终结果。
-
下面详细阐述ForkJoin
. ForkJoinPool
类
ForkJoinPool
是ForkJoin
框架的核心,用于管理线程和任务的执行。
构造方法
-
ForkJoinPool()
:创建一个默认的ForkJoinPool
,其线程数量等于可用的处理器数量。
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinPool(int parallelism)
:创建一个具有指定并行级别的ForkJoinPool
。
ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 指定并行级别为4
常用方法
invoke(ForkJoinTask<?> task)
:执行给定的任务,等待其完成,并返回结果。
int result = forkJoinPool.invoke(new FibonacciTask(30));
execute(ForkJoinTask<?> task)
:异步执行给定的任务,不等待其完成。
forkJoinPool.execute(new FibonacciTask(30));
submit(ForkJoinTask<?> task)
:异步执行给定的任务,并返回一个ForkJoinTask
对象,可以用来获取结果。
ForkJoinTask<Integer> task = forkJoinPool.submit(new FibonacciTask(30));
int result = task.get(); // 获取结果
shutdown()
:关闭线程池,不再接受新任务,但会等待已提交的任务完成。
forkJoinPool.shutdown();
shutdownNow()
:立即关闭线程池,尝试取消所有未完成的任务。
forkJoinPool.shutdownNow();
RecursiveTask
类
构造方法
RecursiveTask()
:无参构造方法,通常在子类中使用。
public class FibonacciTask extends RecursiveTask<Integer> {private final int n;public FibonacciTask(int n) {this.n = n;}
}
常用方法
compute()
:任务的主体,用于执行任务的逻辑。可以在这个方法中分解任务并调用fork
和join
方法。
@Override
protected Integer compute() {if (n <= THRESHOLD) {return computeFibonacci(n);} else {FibonacciTask leftTask = new FibonacciTask(n - 1);FibonacciTask rightTask = new FibonacciTask(n - 2);leftTask.fork();int rightResult = rightTask.compute();int leftResult = leftTask.join();return leftResult + rightResult;}
}
fork()
:异步执行当前任务,返回一个ForkJoinTask
对象。
leftTask.fork();
join()
:等待任务完成并获取结果。如果任务已经完成,直接返回结果。
RecursiveAction
类
RecursiveAction
是一个无返回值的任务类,用于表示可以分解为多个子任务的任务。
构造方法
RecursiveAction()
:无参构造方法,通常在子类中使用。
public class SortTask extends RecursiveAction {private final int[] array;private final int start;private final int end;public SortTask(int[] array, int start, int end) {this.array = array;this.start = start;this.end = end;}
}
常用方法
compute()
:任务的主体,用于执行任务的逻辑。可以在这个方法中分解任务并调用fork
和join
方法。
@Override
protected void compute() {if (end - start < THRESHOLD) {Arrays.sort(array, start, end);} else {int middle = (start + end) / 2;SortTask leftTask = new SortTask(array, start, middle);SortTask rightTask = new SortTask(array, middle, end);leftTask.fork();rightTask.compute();leftTask.join();}
}
fork()
:异步执行当前任务。
leftTask.fork();
join()
:等待任务完成。由于RecursiveAction
没有返回值,join
方法不返回任何结果。
leftTask.join();
总结
ForkJoin
框架提供了多种方法来支持任务的分解、执行和结果合并。通过合理使用这些方法,可以高效地处理复杂的并行计算任务。以下是一些关键点:
-
ForkJoinPool
:用于管理线程和任务的执行。 -
RecursiveTask
:有返回值的任务类,适用于需要返回结果的任务。 -
RecursiveAction
:无返回值的任务类,适用于不需要返回结果的任务。 -
compute()
:任务的主体,用于实现任务的逻辑。 -
fork()
:异步执行任务。 -
join()
:等待任务完成并获取结果(对于RecursiveTask
)或等待任务完成(对于RecursiveAction
)。