java线程池之ForkJoinPool
作用
计算机中一个任务一般是由一个线程来处理的,如果此时出现了一个非常耗时的大任务,比如对一个大的ArrayList每个元素进行+1操作,如果是普通的ThreadPoolExecutor就会出现线程池中只有一个线程正在处理这个大任务而其他线程却空闲着,这会导致CPU负载不均衡,空闲的处理器无法帮助工作。ForkJoinPool就是用来解决这种问题的,将一个大任务拆分成多个小任务后,使用fork可以将小任务分发给其他线程同时处理,使用join可以将多个线程处理的结果进行汇总;这实际上就是分治思想的并行版本
Java8中的parallelStream API就是基于ForkJoinPool实现的
ForkJoinPool的线程使用的是 Thread子类ForkJoinWorkerThread
实例
我们在提交任务时,一般不会直接继承ForkJoinTask,只要继承它的子类即可,框架提供了两种子类:
- RecursiveAction:用于没有返回结果的任务(类似Runnable)
- RecursiveTask:用于有返回结果的任务(类似Callable)
public class ForJoinPollExecutor {
public static void main(String[] args) throws Exception {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
CountTask sumTask1 = new CountTask(0, 4);
ForkJoinTask task1 = forkJoinPool.submit(sumTask1);
System.out.println(task1.get());
}
}
class CountTask extends RecursiveTask<Integer> {
private static final int THREADHOLD = 2;
private int start;
private int end;
/**
* @Description: 最小的一个任务单元,以及它要处理的范围
* @param:
* @return:
*/
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THREADHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
//继续将任务细分
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
//执行子任务
leftTask.fork();
int rightValue = rightTask.compute();
//合并结果
sum = leftTask.join() + rightValue;
}
return sum;
}
}
可以看出,提交的单个大任务拆分到多小是由自身的程序来控制的。它和一般的线程池类似,外部提交的任务被封装成ForkJoinTask类型;工作线程被封装成ForkJoinWorkerThread类型;存放任务的队列称为WorkQueue[],这个和一般的线程池不同,一般的线程池只有一个存放任务的阻塞队列,后面会说道这个数组的作用。
执行流程
- 执行ForkJoinPool的submit方法,入参为ForkJoinTask实现类
- 然后执行externalPush方法
- 执行externalSubmit方法
- 略 待后续补充
parallelStream
list.parallelStream().forEach
首个会以当前线程执行,其它的新开线程,parallel默认线程数是CPU核心数
可以通过System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","20");
修改默认线程数