Java7之forkjoin简介_动力节点Java学院整理

网友投稿 218 2023-05-11


Java7之forkjoin简介_动力节点Java学院整理

java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。

有两个核心类ForkJoinPool和ForkJoinTask。

ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:

ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。

首先,用户需要创建一个自己的ForkJoinTask。代码如下:

public class MyForkJoinTask extends ForkJoinTask {

/**

*

*/

private static final long serialVersionUID = 1L;

private V value;

private boolean success = false;

@Override

public V getRawResult() {

return value;

}

@Override

protected void setRawResult(V value) {

this.value = value;

}

@Override

protected boolean exec() {

System.out.println("exec");

return this.success;

}

public boolean isSuccess() {

return success;

}

public void setSuccess(boolean isSuccess) {

this.success = isSuccess;

}

}

测试ForkJoinPool.invoke(…):

@Test

public void testForkJoinInvoke() throws InterruptedException, ExecutionException {

ForkJoinPool forkJoinPool = new ForkJoinPool();

MyForkJoinTask task = new MyForkJoinTask();

task.setSuccess(true);

task.setRawResult("test");

String invokeResult = forkJoinPool.invoke(task);

assertEquals(invokeResult, "test");

}

@Test

public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

new Thread(new Runnable() {

public void run() {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

task.complete("test");

}

}).start();

// exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)

String result = forkJoinPool.invoke(task);

System.out.println(result);

}

@Test

public void testForkJoinSubmit() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞

ForkJoinTask result = forkJoinPool.submit(task);

result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete

}

测试ForkJoinPool.submit(…):

@Test

public void testForkJoinSubmit() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞

ForkJoinTask result = forkJoinPool.submit(task);

result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete

}

@Test

public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

forkJoinPool.submit(task);

Thread.sleep(1000);

}

@Test

public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

new Thread(new Runnable() {

public void run() {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

task.complete("test");

}

}).start();

ForkJoinTask result = forkJoinPool.submit(task);

// exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)

result.get();

Thread.sleep(1000);

}

测试ForkJoinPool.execute(…):

@Test

public void testForkJoinExecute() throws InterruptedException, ExecutionException {

ForkJoinPool forkJoinPool = new ForkJoinPool();

MyForkJoinTask task = new MyForkJoinTask();

forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。

}

在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。

分支/合并的完整过程如下:

下面列举一个分治算法的实例。

import java.util.Random;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveTask;

public class MaximumFinder extends RecursiveTask {

private static final int SEQUENTIAL_THRESHOLD = 5;

private final int[]http:// data;

private final int start;

private final int end;

public MaximumFinder(int[] data, int start, int end) {

this.data = data;

this.start = start;

this.end = end;

}

public MaximumFinder(int[] data) {

this(data, 0, data.length);

}

@Override

protected Integer compute() {

final int length = end - start;

if (length < SEQUENTIAL_THRESHOLD) {

return computeDirectly();

}

final int split = length / 2;

final MaximumFinder left = new MaximumFinder(data, start, start + split);

left.fork();

final MaximumFinder right = new MaximumFinder(data, start + split, end);

return Math.max(right.compute(), left.join());

}

private Integer computeDirectly() {

System.out.println(Thread.currentThread() + ' computing: ' + start

+ ' to ' + end);

int max = Integer.MIN_VALUE;

for (int i = start; i < end; i++) {

if (data[i] > max) {

max = data[i];

}

}

return max;

}

public static void main(String[] args) {

// create a random data set

final int[] data = new int[1000];

final Random random = new Random();

for (int i = 0; i < data.length; i++) {

data[i] = random.nextInt(100);

}

// submit the task to the pool

final ForkJoinPool pool = new ForkJoinPool(4);

final MaximumFinder finder = new MaximumFinder(data);

System.out.println(pool.invoke(finder));

}

}

以上所示是给大家介绍的Java7之forkjoin简介_动力节点Java学院整理,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,会及时回复大家的,在此也非常感谢大家对我们网站的支持!

ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。

首先,用户需要创建一个自己的ForkJoinTask。代码如下:

public class MyForkJoinTask extends ForkJoinTask {

/**

*

*/

private static final long serialVersionUID = 1L;

private V value;

private boolean success = false;

@Override

public V getRawResult() {

return value;

}

@Override

protected void setRawResult(V value) {

this.value = value;

}

@Override

protected boolean exec() {

System.out.println("exec");

return this.success;

}

public boolean isSuccess() {

return success;

}

public void setSuccess(boolean isSuccess) {

this.success = isSuccess;

}

}

测试ForkJoinPool.invoke(…):

@Test

public void testForkJoinInvoke() throws InterruptedException, ExecutionException {

ForkJoinPool forkJoinPool = new ForkJoinPool();

MyForkJoinTask task = new MyForkJoinTask();

task.setSuccess(true);

task.setRawResult("test");

String invokeResult = forkJoinPool.invoke(task);

assertEquals(invokeResult, "test");

}

@Test

public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

new Thread(new Runnable() {

public void run() {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

task.complete("test");

}

}).start();

// exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)

String result = forkJoinPool.invoke(task);

System.out.println(result);

}

@Test

public void testForkJoinSubmit() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞

ForkJoinTask result = forkJoinPool.submit(task);

result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete

}

测试ForkJoinPool.submit(…):

@Test

public void testForkJoinSubmit() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞

ForkJoinTask result = forkJoinPool.submit(task);

result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete

}

@Test

public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

forkJoinPool.submit(task);

Thread.sleep(1000);

}

@Test

public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {

final ForkJoinPool forkJoinPool = new ForkJoinPool();

final MyForkJoinTask task = new MyForkJoinTask();

new Thread(new Runnable() {

public void run() {

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

}

task.complete("test");

}

}).start();

ForkJoinTask result = forkJoinPool.submit(task);

// exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)

result.get();

Thread.sleep(1000);

}

测试ForkJoinPool.execute(…):

@Test

public void testForkJoinExecute() throws InterruptedException, ExecutionException {

ForkJoinPool forkJoinPool = new ForkJoinPool();

MyForkJoinTask task = new MyForkJoinTask();

forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。

}

在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。

分支/合并的完整过程如下:

下面列举一个分治算法的实例。

import java.util.Random;

import java.util.concurrent.ForkJoinPool;

import java.util.concurrent.RecursiveTask;

public class MaximumFinder extends RecursiveTask {

private static final int SEQUENTIAL_THRESHOLD = 5;

private final int[]http:// data;

private final int start;

private final int end;

public MaximumFinder(int[] data, int start, int end) {

this.data = data;

this.start = start;

this.end = end;

}

public MaximumFinder(int[] data) {

this(data, 0, data.length);

}

@Override

protected Integer compute() {

final int length = end - start;

if (length < SEQUENTIAL_THRESHOLD) {

return computeDirectly();

}

final int split = length / 2;

final MaximumFinder left = new MaximumFinder(data, start, start + split);

left.fork();

final MaximumFinder right = new MaximumFinder(data, start + split, end);

return Math.max(right.compute(), left.join());

}

private Integer computeDirectly() {

System.out.println(Thread.currentThread() + ' computing: ' + start

+ ' to ' + end);

int max = Integer.MIN_VALUE;

for (int i = start; i < end; i++) {

if (data[i] > max) {

max = data[i];

}

}

return max;

}

public static void main(String[] args) {

// create a random data set

final int[] data = new int[1000];

final Random random = new Random();

for (int i = 0; i < data.length; i++) {

data[i] = random.nextInt(100);

}

// submit the task to the pool

final ForkJoinPool pool = new ForkJoinPool(4);

final MaximumFinder finder = new MaximumFinder(data);

System.out.println(pool.invoke(finder));

}

}

以上所示是给大家介绍的Java7之forkjoin简介_动力节点Java学院整理,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,会及时回复大家的,在此也非常感谢大家对我们网站的支持!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:使用Spring的注解方式实现AOP实例
下一篇:详解angular中的作用域及继承
相关文章

 发表评论

暂时没有评论,来抢沙发吧~