随笔 - 100  文章 - 50  trackbacks - 0
<2016年12月>
27282930123
45678910
11121314151617
18192021222324
25262728293031
1234567

常用链接

留言簿(3)

随笔分类

随笔档案

文章分类

文章档案

收藏夹

我收藏的一些文章!

搜索

  •  

最新评论

阅读排行榜

评论排行榜

一、任务分解问题和ForkJoinPool简介

       在多线程并发编程中,有时候会遇到将大任务分解成小任务再并发执行的场景。Java 8新增的ForkJoinPool很好的支持了这个问题。

       ForkJoinPool是一种支持任务分解的线程池,当提交给他的任务“过大”,他就会按照预先定义的规则将大任务分解成小任务,多线程并发执行。

      一般要配合可分解任务接口ForkJoinTask来使用,ForkJoinTask有两个实现它的抽象类:RecursiveAction和RecursiveTask,其区别是前者没有返回值,后者有返回值。

          下面通过具体代码,来示范两个问题:(1)怎么定义可分解的任务类 (2)如何使用ForkJoinPool

package demo.thread.fork;


import java.util.Random;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.Future;

import java.util.concurrent.RecursiveTask;


public class ForkJoinPoolDemo {


public static void main(String[] args) throws InterruptedException, ExecutionException {

int arr[] = new int[100];

Random random = new Random();

int total = 0;

// 初始化100个数字元素

for (int i = 0; i < arr.length; i++) {

int temp = random.nextInt(100);

// 对数组元素赋值,并将数组元素的值添加到total总和中

total += (arr[i] = temp);

}

System.out.println("初始化时的总和=" + total);

// 创建包含Runtime.getRuntime().availableProcessors()返回值作为个数的并行线程的ForkJoinPool

ForkJoinPool forkJoinPool = new ForkJoinPool();

// 提交可分解的PrintTask任务

Future<Integer> future = forkJoinPool.submit(new SumTaskDemo(arr, 0,

arr.length));

System.out.println("计算出来的总和=" + future.get());

// 关闭线程池

forkJoinPool.shutdown();

}

}


// RecursiveTask为ForkJoinTask的抽象子类,有返回值的任务

class SumTaskDemo extends RecursiveTask<Integer> {


private static final long serialVersionUID = 4033241174438751063L;

// 每个"小任务"最多只打印20个数

private static final int MAX = 20;

private int arr[];

private int start;

private int end;


SumTaskDemo(int arr[], int start, int end) {

this.arr = arr;

this.start = start;

this.end = end;

}


@Override

protected Integer compute() {

int sum = 0;

// 当end-start的值小于MAX时候,开始打印

if ((end - start) <= MAX) {

for (int i = start; i < end; i++) {

sum += arr[i];

}

return sum;

} else {

System.err.println("=====任务分解======");

// 将大任务分解成两个小任务

int middle = (start + end) >>>1;

SumTaskDemo left = new SumTaskDemo(arr, start, middle);

SumTaskDemo right = new SumTaskDemo(arr, middle, end);

// 并行执行两个小任务

left.fork();

right.fork();

// 把两个小任务累加的结果合并起来

return left.join() + right.join();

}

}

}
例子二:

package demo.thread.fork;


import java.util.Random;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveAction;

import java.util.concurrent.TimeUnit;


/**

 * @author lin

 *

 */

public class ForkJoinPoolDemo2 {

public static void main(String[] args) throws Exception {

// 创建一个支持分解任务的线程池ForkJoinPool

ForkJoinPool pool = new ForkJoinPool(4);

myTask task = new myTask(60);


pool.submit(task);

pool.awaitTermination(10, TimeUnit.SECONDS);// 等待20s,观察结果

pool.shutdown();

}


}


/**

 * 定义一个可分解的的任务类,继承了RecursiveAction抽象类 必须实现它的compute方法

 */

class myTask extends RecursiveAction {


private static final long serialVersionUID = 1L;

// 定义一个分解任务的阈值——50,即一个任务最多承担50个工作量

int THRESHOLD = 20;

// 任务量

int task_Num = 0;


myTask(int Num) {

this.task_Num = Num;

}


@Override

protected void compute() {

if (task_Num <= THRESHOLD) {

System.out.println(Thread.currentThread().getName() + "承担了"

+ task_Num + "份工作");

/*try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}*/

} else {

// 随机解成两个任务

Random m = new Random();

int x = m.nextInt(50);

myTask left = new myTask(x);

myTask right = new myTask(task_Num - x);


left.fork();

right.fork();

}

}

}






posted on 2016-12-21 22:37 fly 阅读(246) 评论(0)  编辑  收藏 所属分类: java学习