On the timeout behavior of Future#get in Java

Reader submission, 2022-07-23


Contents
1. Background
2. Experiments
2.1 The common approach
2.2 Trying to cancel
2.2.1 cancel(false)
2.2.2 cancel(true)
3. Back to the source
4. Summary

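1. Background

Many Java engineers memorize stock answers when preparing for interviews. For threads and thread pools, that usually means thread states, the ways to create a thread, the factory methods in Executors and why they are discouraged, and the constructor parameters and execution flow of ThreadPoolExecutor.

At work, many people call a thread pool's submit method to obtain a Future, then use java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) to "wait at most this long".

For many of them, though, the understanding stops at the surface, and a slightly deeper question leaves them stuck.

For example: after java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) times out, what happens to the calling thread? And how does the pool thread that is running the task behave?

If you are not confident about the answer, your grasp of the topic is not yet solid.

The most common belief is: "after the timeout, the calling thread carries on and the corresponding pool thread is interrupted." Is that really so?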

2. Experiments

2.1 The common approach

Here is a simple example:

    package basic.thread;

    import java.util.concurrent.*;

    public class FutureDemo {

        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);

            Future<?> future = executorService.submit(() -> {
                try {
                    demo();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });

            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " getting result -- start");
            Object result = future.get(100, TimeUnit.MILLISECONDS); // wait at most 100 ms
            System.out.println(threadName + " getting result -- end: " + result);
        }

        private static String demo() throws InterruptedException {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + ", running demo -- start");
            TimeUnit.SECONDS.sleep(1); // simulate a task that takes 1 s
            System.out.println(threadName + ", running demo -- end");
            return "test";
        }
    }

Output:

    main getting result -- start
    pool-1-thread-1, running demo -- start
    Exception in thread "main" java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at basic.thread.FutureDemo.main(FutureDemo.java:20)
    pool-1-thread-1, running demo -- end

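We can see that the calling thread (main) terminates because of the uncaught TimeoutException, which is not the same as being interrupted, while the pool thread running the task carries on and finishes.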
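The same observation can be checked in code instead of by reading console output. The following is a minimal sketch, not part of the original demo: the class name TimeoutKeepsRunning and the 50 ms / 500 ms timings are illustrative assumptions. It catches the TimeoutException, records whether the Future is done at that moment, and then waits again without a limit.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutKeepsRunning {

    /** Returns a short trace of what the waiting thread observed. */
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(500); // simulated slow work
                return "test";
            });
            StringBuilder trace = new StringBuilder();
            try {
                future.get(50, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // Only the waiting thread is affected; the task itself is untouched.
                trace.append("timeout,done=").append(future.isDone());
            }
            // Waiting again, this time without a limit, still yields the result.
            trace.append(",result=").append(future.get());
            return trace.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // let the JVM exit once the task has finished
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With these timings the trace should read timeout,done=false,result=test: the timeout only ends the wait, and a later get() still returns the value computed by the pool thread.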

2.2 Trying to cancel

Let's try to cancel the unfinished task. Future#cancel takes a boolean parameter:

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when {@code cancel} is called,
     * this task should never run.  If the task has already started,
     * then the {@code mayInterruptIfRunning} parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return {@code true}.  Subsequent calls to {@link #isCancelled}
     * will always return {@code true} if this method returned {@code true}.
     *
     * @param mayInterruptIfRunning {@code true} if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return {@code false} if the task could not be cancelled,
     * typically because it has already completed normally;
     * {@code true} otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);

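From the Javadoc we learn:

When the argument is true, the thread running the task is interrupted in an attempt to stop it;

When the argument is false, a task that is already running is allowed to complete.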
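The Javadoc also covers a third case that the demos below do not exercise: a task that has not started when cancel is called should never run. Here is a minimal sketch of that case. The class name CancelBeforeStart and the latch-based setup are assumptions made for illustration: a single-thread pool is kept busy so that the second task is still queued when it is cancelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelBeforeStart {

    /** Returns {cancel() result, isCancelled(), whether the task body ever ran}. */
    static boolean[] run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);
        try {
            // Occupy the only worker so that the next task has to wait in the queue.
            pool.submit(() -> {
                release.await();
                return null;
            });
            Future<?> queued = pool.submit(() -> ran.set(true));

            boolean cancelled = queued.cancel(false); // the task has not started yet
            release.countDown();                      // let the worker move on
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return new boolean[] {cancelled, queued.isCancelled(), ran.get()};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

The result should be [true, true, false]: the cancellation succeeds and the task body is skipped when the worker later reaches it, whichever value mayInterruptIfRunning has.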

2.2.1 cancel(false)

This time, so that the timeout exception does not terminate the main thread, we wrap the call in try-catch.

    package basic.thread;

    import org.junit.platform.commons.util.ExceptionUtils;

    import java.util.concurrent.*;

    public class FutureDemo {

        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);

            Future<?> future = executorService.submit(() -> {
                try {
                    demo();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });

            String threadName = Thread.currentThread().getName();
            System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- start");
            try {
                Object result = future.get(100, TimeUnit.MILLISECONDS);
                System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- end: " + result);
            } catch (Exception e) {
                System.out.println(System.currentTimeMillis() + "," + threadName + " failed to get result: " + ExceptionUtils.readStackTrace(e));
            }
            future.cancel(false); // cancel without interrupting the running task
            System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- cancel");
        }

        private static String demo() throws InterruptedException {
            String threadName = Thread.currentThread().getName();
            System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- start");
            TimeUnit.SECONDS.sleep(1);
            System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- end");
            return "test";
        }
    }

Result:

    1653751759233,main getting result -- start
    1653751759233,pool-1-thread-1, running demo -- start
    1653751759343,main failed to get result: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at basic.thread.FutureDemo.main(FutureDemo.java:23)

    1653751759351,main getting result -- cancel
    1653751760263,pool-1-thread-1, running demo -- end

We find that with cancel(false), a task that is already running on a pool thread continues to completion.
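What the console output does not show is how the Future itself looks after cancel(false). The sketch below is an illustration under assumptions: the class name CancelWithoutInterrupt and the two latches are hypothetical, used only to make the ordering deterministic. It cancels a task that is known to be running and then inspects the Future.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CancelWithoutInterrupt {

    /** Returns "cancel result,isDone,outcome of get(),whether the body finished". */
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch proceed = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);
        try {
            Future<String> future = pool.submit(() -> {
                started.countDown();
                proceed.await();      // would throw if the thread were interrupted
                finished.set(true);
                return "test";
            });
            started.await();          // the task is definitely running now
            boolean cancelled = future.cancel(false);

            String outcome;
            try {
                outcome = future.get();
            } catch (CancellationException e) {
                outcome = "CancellationException";
            }
            proceed.countDown();      // let the task body finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return cancelled + "," + future.isDone() + "," + outcome + "," + finished.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The expected trace is true,true,CancellationException,true. From the caller's point of view the Future is cancelled and its result is gone, even though the task body ran to the end on the pool thread.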

2.2.2 cancel(true)

    package basic.thread;

    import org.junit.platform.commons.util.ExceptionUtils;

    import java.util.concurrent.*;

    public class FutureDemo {

        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);

            Future<?> future = executorService.submit(() -> {
                try {
                    demo();
                } catch (InterruptedException e) {
                    System.out.println(System.currentTimeMillis() + "," + Thread.currentThread().getName() + ", Interrupted:" + ExceptionUtils.readStackTrace(e));
                    throw new RuntimeException(e);
                }
            });

            String threadName = Thread.currentThread().getName();
            System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- start");
            try {
                Object result = future.get(100, TimeUnit.MILLISECONDS);
                System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- end: " + result);
            } catch (Exception e) {
                System.out.println(System.currentTimeMillis() + "," + threadName + " failed to get result: " + ExceptionUtils.readStackTrace(e));
            }
            future.cancel(true); // cancel and interrupt the thread running the task
            System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- cancel");
        }

        private static String demo() throws InterruptedException {
            String threadName = Thread.currentThread().getName();
            System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- start");
            TimeUnit.SECONDS.sleep(1);
            System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- end");
            return "test";
        }
    }

Result:

    1653752011246,main getting result -- start
    1653752011246,pool-1-thread-1, running demo -- start
    1653752011347,main failed to get result: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at basic.thread.FutureDemo.main(FutureDemo.java:24)

    1653752011363,pool-1-thread-1, Interrupted:java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method)
        at java.lang.Thread.sleep(Thread.java:340)
        at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:386)
        at basic.thread.FutureDemo.demo(FutureDemo.java:36)
        at basic.thread.FutureDemo.lambda$main$0(FutureDemo.java:14)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    1653752011366,main getting result -- cancel

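As you can see, if the task has not finished at this point, its thread is interrupted and the blocking sleep call throws InterruptedException.

Of course, if you do not want the interruption to abort the task, you can catch InterruptedException inside the task and then run fallback logic.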

    package basic.thread;

    import org.junit.platform.commons.util.ExceptionUtils;

    import java.util.concurrent.*;

    public class FutureDemo {

        public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
            ExecutorService executorService = Executors.newFixedThreadPool(2);

            Future<?> future = executorService.submit(() -> {
                demo();
            });

            String threadName = Thread.currentThread().getName();
            System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- start");
            try {
                Object result = future.get(100, TimeUnit.MILLISECONDS);
                System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- end: " + result);
            } catch (Exception e) {
                System.out.println(System.currentTimeMillis() + "," + threadName + " failed to get result: " + ExceptionUtils.readStackTrace(e));
            }
            future.cancel(true);
            System.out.println(System.currentTimeMillis() + "," + threadName + " getting result -- cancel");
        }

        private static String demo() {
            String threadName = Thread.currentThread().getName();
            System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- start");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                // Swallow the interruption and continue with fallback logic.
                System.out.println(System.currentTimeMillis() + "," + threadName + ", demo interrupted, falling back");
            }
            System.out.println(System.currentTimeMillis() + "," + threadName + ", running demo -- end");
            return "test";
        }
    }

Result:

    1653752219803,main getting result -- start
    1653752219803,pool-1-thread-1, running demo -- start
    1653752219908,main failed to get result: java.util.concurrent.TimeoutException
        at java.util.concurrent.FutureTask.get(FutureTask.java:205)
        at basic.thread.FutureDemo.main(FutureDemo.java:19)

    1653752219913,main getting result -- cancel
    1653752219914,pool-1-thread-1, demo interrupted, falling back
    1653752219914,pool-1-thread-1, running demo -- end
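Catching InterruptedException clears the thread's interrupt status, so code that runs afterwards can no longer tell that a cancellation was requested. A widely used idiom, shown here as a sketch rather than as something the original demo does, is to restore the status with Thread.currentThread().interrupt() inside the catch block. The class name InterruptAwareTask and the latch are illustrative assumptions.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class InterruptAwareTask {

    /** Returns what the task observed after cancel(true) was called on it. */
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> observed = new AtomicReference<>("not set");
        try {
            Future<?> future = pool.submit(() -> {
                started.countDown();
                try {
                    TimeUnit.SECONDS.sleep(5);
                    observed.set("completed normally");
                } catch (InterruptedException e) {
                    // sleep() cleared the flag when it threw; put it back for later code.
                    Thread.currentThread().interrupt();
                    observed.set("interrupted, flag=" + Thread.currentThread().isInterrupted());
                }
            });
            started.await();      // make sure the task is running
            future.cancel(true);  // interrupts the thread running the task
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            return observed.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The task should report interrupted, flag=true. Whether to restore the flag or to swallow it, as the fallback demo does, depends on whether later code in the same task needs to notice the cancellation.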

3. Back to the source

Reading the Javadoc of java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) makes the behavior in each case clear:

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
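The four documented exceptions map naturally onto four catch blocks. The following is one possible way to combine the timed get with cancel, offered as a sketch only: BoundedGet and getOrFallback are hypothetical names, and returning a fallback value is an assumption about what the caller wants.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BoundedGet {

    /** Hypothetical helper: waits up to the timeout, otherwise cancels and returns the fallback. */
    static <T> T getOrFallback(Future<T> future, long timeout, TimeUnit unit, T fallback) {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            future.cancel(true);                 // a timed-out get() does not stop the task by itself
            return fallback;
        } catch (CancellationException e) {
            return fallback;                     // the task was cancelled elsewhere
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // keep the caller's interrupt status
            future.cancel(true);
            return fallback;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = CompletableFuture.completedFuture("test");
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed by anyone
        System.out.println(getOrFallback(done, 100, TimeUnit.MILLISECONDS, "fallback"));
        System.out.println(getOrFallback(never, 100, TimeUnit.MILLISECONDS, "fallback"));
        System.out.println(never.isCancelled());
    }
}
```

The main method uses CompletableFuture instances only because they are easy to construct in a completed or never-completing state; the helper accepts any Future, including one returned by ExecutorService#submit.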

We can also pick a couple of common implementations and look at the basic idea behind each:

java.util.concurrent.FutureTask#get(long, java.util.concurrent.TimeUnit)

    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

java.util.concurrent.CompletableFuture#get(long, java.util.concurrent.TimeUnit)

    /**
     * Waits if necessary for at most the given time for this future
     * to complete, and then returns its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the result value
     * @throws CancellationException if this future was cancelled
     * @throws ExecutionException if this future completed exceptionally
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    public T get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        Object r;
        long nanos = unit.toNanos(timeout);
        return reportGet((r = result) == null ? timedGet(nanos) : r);
    }

    /**
     * Returns raw result after waiting, or null if interrupted, or
     * throws TimeoutException on timeout.
     */
    private Object timedGet(long nanos) throws TimeoutException {
        if (Thread.interrupted())
            return null;
        if (nanos <= 0L)
            throw new TimeoutException();
        long d = System.nanoTime() + nanos;
        Signaller q = new Signaller(true, nanos, d == 0L ? 1L : d); // avoid 0
        boolean queued = false;
        Object r;
        // We intentionally don't spin here (as waitingGet does) because
        // the call to nanoTime() above acts much like a spin.
        while ((r = result) == null) {
            if (!queued)
                queued = tryPushStack(q);
            else if (q.interruptControl < 0 || q.nanos <= 0L) {
                q.thread = null;
                cleanStack();
                if (q.interruptControl < 0)
                    return null;
                throw new TimeoutException();
            }
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q.interruptControl < 0)
            r = null;
        q.thread = null;
        postComplete();
        return r;
    }
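One consequence of both implementations above is that a zero timeout turns get into a non-blocking probe: a completed future returns its value right away, and a pending one throws TimeoutException without waiting. A small sketch of that follows; ZeroTimeoutProbe and probe are hypothetical names, and the "pending" marker string is an assumption made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ZeroTimeoutProbe {

    /** Hypothetical helper: returns the result if it is already available, else "pending". */
    static String probe(Future<String> future) {
        try {
            return future.get(0, TimeUnit.NANOSECONDS);
        } catch (TimeoutException e) {
            return "pending";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException e) {
            return "failed: " + e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> cf = new CompletableFuture<>();
        System.out.println(probe(cf));   // not completed yet
        cf.complete("test");
        System.out.println(probe(cf));   // completed

        FutureTask<String> task = new FutureTask<>(() -> "test");
        System.out.println(probe(task)); // has not been run so far
        task.run();                      // run synchronously in this thread
        System.out.println(probe(task));
    }
}
```

For each future the probe should report pending before completion and test afterwards. In practice isDone() or getNow(...) on CompletableFuture is usually the clearer way to poll; the sketch is only meant to make the timeout branch of the source concrete.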

The same goes for java.util.concurrent.Future#cancel, whose Javadoc is quoted in section 2.2.


java.util.concurrent.FutureTask#cancel

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

As you can see, when mayInterruptIfRunning is true, Thread#interrupt is called on the thread running the task (if there is one).
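The first if statement in FutureTask#cancel also explains the return value: cancel succeeds only while the state is still NEW. The sketch below, with the hypothetical class name CancelReturnValue, exercises that check on a single thread by running a FutureTask directly instead of submitting it to a pool.

```java
import java.util.concurrent.FutureTask;

class CancelReturnValue {

    /** Returns {cancel after completion, first cancel of a fresh task, second cancel of it}. */
    static boolean[] run() {
        FutureTask<String> completed = new FutureTask<>(() -> "test");
        completed.run();                                  // runs synchronously in the calling thread
        boolean afterCompletion = completed.cancel(true); // state is no longer NEW

        FutureTask<String> fresh = new FutureTask<>(() -> "test");
        boolean first = fresh.cancel(true);               // state was NEW, so the CAS succeeds
        boolean second = fresh.cancel(true);              // already cancelled
        fresh.run();                                      // no-op: a cancelled task never runs
        return new boolean[] {afterCompletion, first, second};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```

This should print [false, true, false], matching the Javadoc: the attempt fails if the task has already completed or has already been cancelled.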

java.util.concurrent.CompletableFuture#cancel

    /**
     * If not already completed, completes this CompletableFuture with
     * a {@link CancellationException}. Dependent CompletableFutures
     * that have not already completed will also complete
     * exceptionally, with a {@link CompletionException} caused by
     * this {@code CancellationException}.
     *
     * @param mayInterruptIfRunning this value has no effect in this
     * implementation because interrupts are not used to control
     * processing.
     *
     * @return {@code true} if this task is now cancelled
     */
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = (result == null) &&
            internalComplete(new AltResult(new CancellationException()));
        postComplete();
        return cancelled || isCancelled();
    }

The Javadoc also shows that the effect of this parameter differs between implementations.
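To make the difference concrete, here is a sketch that calls cancel(true) on a CompletableFuture whose supplier is still running. The class name CompletableFutureCancel, the latches, and the 200 ms sleep are illustrative assumptions. It records whether the supplier ever saw an InterruptedException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class CompletableFutureCancel {

    /** Returns "cancel result,isCancelled,whether the supplier was interrupted". */
    static String run() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean interrupted = new AtomicBoolean(false);

        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            started.countDown();
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                interrupted.set(true);   // would be set if cancel(true) interrupted this thread
            }
            finished.countDown();
            return "test";
        });

        try {
            started.await();                      // the supplier is running
            boolean cancelled = cf.cancel(true);  // completes cf with a CancellationException
            finished.await();                     // the supplier still runs to the end
            return cancelled + "," + cf.isCancelled() + "," + interrupted.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The expected trace is true,true,false: the future is marked cancelled, but the thread running the supplier is never interrupted. This is the opposite of the FutureTask behavior observed in section 2.2.2.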

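4. Summary

When learning, don't take things for granted and don't settle for armchair theory. When something is unclear, read the Javadoc, read the source, and write demos to reproduce or debug the behavior.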

