Flask接口签名sign原理与实例代码浅析
223
2023-05-11
Java7之forkjoin简介_动力节点Java学院整理
java7引入了Fork Join的概念,来更好的支持并行运算。顾名思义,Fork Join类似与流程语言的分支,合并的概念。也就是说Java7 SE原生支持了在一个主线程中开辟多个分支线程,并且根据分支线程的逻辑来等待(或者不等待)汇集,当然你也可以fork的某一个分支线程中再开辟Fork Join,这也就可以实现Fork Join的嵌套。
有两个核心类ForkJoinPool和ForkJoinTask。
ForkJoinPool实现了ExecutorService接口,起到线程池的作用。所以他的用法和Executor框架的使用时一样的,当然Fork Join本身就是Executor框架的扩展。ForkJoinPool有3个关键的方法,来启动线程,execute(…),invoke(…),submit(…)。具体描述如下:
ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。 首先,用户需要创建一个自己的ForkJoinTask。代码如下: public class MyForkJoinTask extends ForkJoinTask { /** * */ private static final long serialVersionUID = 1L; private V value; private boolean success = false; @Override public V getRawResult() { return value; } @Override protected void setRawResult(V value) { this.value = value; } @Override protected boolean exec() { System.out.println("exec"); return this.success; } public boolean isSuccess() { return success; } public void setSuccess(boolean isSuccess) { this.success = isSuccess; } } 测试ForkJoinPool.invoke(…): @Test public void testForkJoinInvoke() throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask task = new MyForkJoinTask(); task.setSuccess(true); task.setRawResult("test"); String invokeResult = forkJoinPool.invoke(task); assertEquals(invokeResult, "test"); } @Test public void testForkJoinInvoke2() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); new Thread(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } task.complete("test"); } }).start(); // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...) String result = forkJoinPool.invoke(task); System.out.println(result); } @Test public void testForkJoinSubmit() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞 ForkJoinTask result = forkJoinPool.submit(task); result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete } 测试ForkJoinPool.submit(…): @Test public void testForkJoinSubmit() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞 ForkJoinTask result = forkJoinPool.submit(task); result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete } @Test public void testForkJoinSubmit2() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); forkJoinPool.submit(task); Thread.sleep(1000); } @Test public void testForkJoinSubmit3() throws InterruptedException, ExecutionException { final ForkJoinPool forkJoinPool = new ForkJoinPool(); final MyForkJoinTask task = new MyForkJoinTask(); new Thread(new Runnable() { public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { } task.complete("test"); } }).start(); ForkJoinTask result = forkJoinPool.submit(task); // exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...) result.get(); Thread.sleep(1000); } 测试ForkJoinPool.execute(…): @Test public void testForkJoinExecute() throws InterruptedException, ExecutionException { ForkJoinPool forkJoinPool = new ForkJoinPool(); MyForkJoinTask task = new MyForkJoinTask(); forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。 } 在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。 分支/合并的完整过程如下: 下面列举一个分治算法的实例。 import java.util.Random; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class MaximumFinder extends RecursiveTask private static final int SEQUENTIAL_THRESHOLD = 5; private final int[]http:// data; private final int start; private final int end; public MaximumFinder(int[] data, int start, int end) { this.data = data; this.start = start; this.end = end; } public MaximumFinder(int[] data) { this(data, 0, data.length); } @Override protected Integer compute() { final int length = end - start; if (length < SEQUENTIAL_THRESHOLD) { return computeDirectly(); } final int split = length / 2; final MaximumFinder left = new MaximumFinder(data, start, start + split); left.fork(); final MaximumFinder right = new MaximumFinder(data, start + split, end); return Math.max(right.compute(), left.join()); } private Integer computeDirectly() { System.out.println(Thread.currentThread() + ' computing: ' + start + ' to ' + end); int max = Integer.MIN_VALUE; for (int i = start; i < end; i++) { if (data[i] > max) { max = data[i]; } } return max; } public static void main(String[] args) { // create a random data set final int[] data = new int[1000]; final Random random = new Random(); for (int i = 0; i < data.length; i++) { data[i] = random.nextInt(100); } // submit the task to the pool final ForkJoinPool pool = new ForkJoinPool(4); final MaximumFinder finder = new MaximumFinder(data); System.out.println(pool.invoke(finder)); } } 以上所示是给大家介绍的Java7之forkjoin简介_动力节点Java学院整理,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,会及时回复大家的,在此也非常感谢大家对我们网站的支持!
ForkJoinTask是分支合并的执行任何,分支合并的业务逻辑使用者可以再继承了这个抽先类之后,在抽象方法exec()中实现。其中exec()的返回结果和ForkJoinPool的执行调用方(execute(…),invoke(…),submit(…)),共同决定着线程是否阻塞,具体请看下面的测试用例。
首先,用户需要创建一个自己的ForkJoinTask。代码如下:
public class MyForkJoinTask extends ForkJoinTask {
/**
*
*/
private static final long serialVersionUID = 1L;
private V value;
private boolean success = false;
@Override
public V getRawResult() {
return value;
}
@Override
protected void setRawResult(V value) {
this.value = value;
}
@Override
protected boolean exec() {
System.out.println("exec");
return this.success;
}
public boolean isSuccess() {
return success;
}
public void setSuccess(boolean isSuccess) {
this.success = isSuccess;
}
}
测试ForkJoinPool.invoke(…):
@Test
public void testForkJoinInvoke() throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
MyForkJoinTask task = new MyForkJoinTask();
task.setSuccess(true);
task.setRawResult("test");
String invokeResult = forkJoinPool.invoke(task);
assertEquals(invokeResult, "test");
}
@Test
public void testForkJoinInvoke2() throws InterruptedException, ExecutionException {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
final MyForkJoinTask task = new MyForkJoinTask();
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
task.complete("test");
}
}).start();
// exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
String result = forkJoinPool.invoke(task);
System.out.println(result);
}
@Test
public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
final MyForkJoinTask task = new MyForkJoinTask();
task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
ForkJoinTask result = forkJoinPool.submit(task);
result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
}
测试ForkJoinPool.submit(…):
@Test
public void testForkJoinSubmit() throws InterruptedException, ExecutionException {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
final MyForkJoinTask task = new MyForkJoinTask();
task.setSuccess(true); // 是否在此任务运行完毕后结束阻塞
ForkJoinTask result = forkJoinPool.submit(task);
result.get(); // 如果exec()返回值是false,在此处会阻塞,直到调用complete
}
@Test
public void testForkJoinSubmit2() throws InterruptedException, ExecutionException {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
final MyForkJoinTask task = new MyForkJoinTask();
forkJoinPool.submit(task);
Thread.sleep(1000);
}
@Test
public void testForkJoinSubmit3() throws InterruptedException, ExecutionException {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
final MyForkJoinTask task = new MyForkJoinTask();
new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
task.complete("test");
}
}).start();
ForkJoinTask result = forkJoinPool.submit(task);
// exec()返回值是false,此处阻塞,直到另一个线程调用了task.complete(...)
result.get();
Thread.sleep(1000);
}
测试ForkJoinPool.execute(…):
@Test
public void testForkJoinExecute() throws InterruptedException, ExecutionException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
MyForkJoinTask task = new MyForkJoinTask();
forkJoinPool.execute(task); // 异步执行,无视task.exec()返回值。
}
在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。ForkJoinTask天然就是为了支持“分治”问题的。
分支/合并的完整过程如下:
下面列举一个分治算法的实例。
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MaximumFinder extends RecursiveTask
private static final int SEQUENTIAL_THRESHOLD = 5;
private final int[]http:// data;
private final int start;
private final int end;
public MaximumFinder(int[] data, int start, int end) {
this.data = data;
this.start = start;
this.end = end;
}
public MaximumFinder(int[] data) {
this(data, 0, data.length);
}
@Override
protected Integer compute() {
final int length = end - start;
if (length < SEQUENTIAL_THRESHOLD) {
return computeDirectly();
}
final int split = length / 2;
final MaximumFinder left = new MaximumFinder(data, start, start + split);
left.fork();
final MaximumFinder right = new MaximumFinder(data, start + split, end);
return Math.max(right.compute(), left.join());
}
private Integer computeDirectly() {
System.out.println(Thread.currentThread() + ' computing: ' + start
+ ' to ' + end);
int max = Integer.MIN_VALUE;
for (int i = start; i < end; i++) {
if (data[i] > max) {
max = data[i];
}
}
return max;
}
public static void main(String[] args) {
// create a random data set
final int[] data = new int[1000];
final Random random = new Random();
for (int i = 0; i < data.length; i++) {
data[i] = random.nextInt(100);
}
// submit the task to the pool
final ForkJoinPool pool = new ForkJoinPool(4);
final MaximumFinder finder = new MaximumFinder(data);
System.out.println(pool.invoke(finder));
}
}
以上所示是给大家介绍的Java7之forkjoin简介_动力节点Java学院整理,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,会及时回复大家的,在此也非常感谢大家对我们网站的支持!
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~