java 单机接口限流处理方案
257
2022-06-06
介绍
随着当今处理器中可用的核心数量的增加, 随着对实现更高吞吐量的需求的不断增长,多线程 API 变得非常流行。 Java 提供了自己的多线程框架,称为 Executor 框架.
1. Executor 框架是什么?
Executor 框架包含一组用于有效管理工作线程的组件。Executor API 通过 Executors 将任务的执行与要执行的实际任务解耦。 这是 生产者-消费者 模式的一种实现。
java.util.concurrent.Executors 提供了用于创建工作线程的线程池的工厂方法。
为了使用 Executor 框架,我们需要创建一个线程池并提交任务给它以供执行。Executor 框架的工作是调度和执行已提交的任务并从线程池中拿到返回的结果。
浮现于脑海中的一个基本的问题是,当我们创建 java.lang.Thread 的对象或调用实现了 Runnable/Callable 接口来达到的程序的并行性时,为什么需要线程池?
答案来源于两个基本面:
所有的这些因素都会导致系统的吞吐量下降。线程池通过保持线程一直存活并重用这些线程来克服这个问题。当提交到线程池中的任务多于正在执行的线程时,那些多余的任务将被放到队列中。 一旦执行任务的线程有空闲的了,它们会从队列中取下一个任务来执行。对于 JDK 提供的现成的 executors 此任务队列基本是无界的。
2. Executors 的类型
现在我们已经了解了 executors 是什么, 让我们来看看不同类型的 executors。
2.1 SingleThreadExecutor
此线程池 executor 只有一个线程。它用于以顺序方式的形式执行任务。如果此线程在执行任务时因异常而挂掉,则会创建一个新线程来替换此线程,后续任务将在新线程中执行。
ExecutorService executorService = Executors.newSingleThreadExecutor()
2.2 FixedThreadPool(n)
顾名思义,它是一个拥有固定数量线程的线程池。提交给 executor 的任务由固定的 n 个线程执行,如果有更多的任务,它们存储在 LinkedBlockingQueue 里。这个数字 n 通常跟底层处理器支持的线程总数有关。
ExecutorService executorService = Executors.newFixedThreadPool(4);
2.3 CachedThreadPool
该线程池主要用于执行大量短期并行任务的场景。与固定线程池不同,此线程池的线程数不受限制。如果所有的线程都在忙于执行任务并且又有新的任务到来了,这个线程池将创建一个新的线程并将其提交到 executor。只要其中一个线程变为空闲,它就会执行新的任务。 如果一个线程有 60 秒的时间都是空闲的,它们将被结束生命周期并从缓存中删除。
但是,如果管理得不合理,或者任务不是很短的,则线程池将包含大量的活动线程。这可能导致资源紊乱并因此导致性能下降。
ExecutorService executorService = Executors.newCachedThreadPool();
2.4 ScheduledExecutor
当我们有一个需要定期运行的任务或者我们希望延迟某个任务时,就会使用此类型的 executor。
ScheduledExecutorService scheduledExecService = Executors.newScheduledThreadPool(1);
可以使用 scheduleAtFixedRate 或 scheduleWithFixedDelay 在 ScheduledExecutor 中定期的执行任务。
scheduledExecService.scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
scheduledExecService.scheduleWithFixedDelay(Runnable command, long initialDelay, long period, TimeUnit unit)
这两种方法的主要区别在于它们对连续执行定期任务之间的延迟的应答。
3. 对于 Future 对象的理解
可以使用 executor 返回的 java.util.concurrent.Future 对象访问提交给 executor 的任务的结果。 Future 可以被认为是 executor 对调用者的响应。
Future<String> result = executorService.submit(callableTask);
如上所述,提交给 executor 的任务是异步的,即程序不会等待当前任务执行完成,而是直接进入下一步。相反,每当任务执行完成时,executor 在此 Future对象中设置它。
调用者可以继续执行主程序,当需要提交任务的结果时,他可以在这个 Future对象上调用.get() 方法来获取。如果任务完成,结果将立即返回给调用者,否则调用者将被阻塞,直到 executor 完成此操作的执行并计算出结果。
如果调用者不能无限期地等待任务执行的结果,那么这个等待时间也可以设置为定时地。可以通过 Future.get(long timeout,TimeUnit unit) 方法实现,如果在规定的时间范围内没有返回结果,则抛出 TimeoutException。调用者可以处理此异常并继续执行该程序。
如果在执行任务时出现异常,则对 get 方法的调用将抛出一个ExecutionException。
对于 Future.get()方法返回的结果,一个重要的事情是,只有提交的任务实现了java.util.concurrent.Callable接口时才返回 Future。如果任务实现了Runnable接口,那么一旦任务完成,对 .get() 方法的调用将返回 null。
另一个关注点是 Future.cancel(boolean mayInterruptIfRunning) 方法。此方法用于取消已提交任务的执行。如果任务已在执行,则 executor 将尝试在mayInterruptIfRunning 标志为 true 时中断任务执行。
4. Example: 创建和执行一个简单的 Executor
我们现在将创建一个任务并尝试在 fixed pool executor 中执行它:
public class Task implements Callable<String> { private String message; public Task(String message) { this.message = message; } @Override public String call() throws Exception { return "Hello " + message + "!"; } }
Task 类实现 Callable 接口并有一个 String 类型作为返回值的方法。 这个方法也可以抛出 Exception。这种向 executor 抛出异常的能力以及 executor 将此异常返回给调用者的能力非常重要,因为它有助于调用者知道任务执行的状态。
现在让我们来执行一下这个任务:
public class ExecutorExample { public static void main(String[] args) { Task task = new Task("World"); ExecutorService executorService = Executors.newFixedThreadPool(4); Future<String> result = executorService.submit(task); try { System.out.println(result.get()); } catch (InterruptedException | ExecutionException e) { System.out.println("Error occured while executing the submitted task"); e.printStackTrace(); } executorService.shutdown(); } }
我们创建了一个具有4个线程数的 FixedThreadPool executors,因为这个 demo是在四核处理器上开发的。如果正在执行的任务执行大量 I/O 操作或花费较长时间等待外部资源,则线程数可能超过处理器的核心数。
我们实例化了 Task 类,并将它提交给 executors 执行。 结果由 Future 对象返回,然后我们在屏幕上打印。
让我们运行 ExecutorExample 并查看其输出:
Hello World!
正如所料,任务追加了问候语 Hello 并通过 Future object 返回结果。
最后,我们调用 executorService 对象上的 shutdown 来终止所有线程并将资源返回给 OS。
.shutdown() 方法等待 executor 完成当前提交的任务。 但是,如果要求是立即关闭 executor 而不等待,那么我们可以使用 .shutdownNow() 方法。
任何待执行的任务都将结果返回到 java.util.List 对象中。
我们也可以通过实现 Runnable 接口来创建同样的任务:
public class Task implements Runnable{ private String message; public Task(String message) { this.message = message; } public void run() { System.out.println("Hello " + message + "!"); } }
当我们实现 Runnable 时,这里有一些重要的变化。
无法从 run() 方法得到任务执行的结果。 因此,我们直接在这里打印。
run() 方法不可抛出任何已受检的异常。
5. 总结
随着处理器时钟速度难以提高,多线程正变得越来越主流。 但是,由于涉及复杂性,处理每个线程的生命周期非常困难。
在本文中,我们展示了一个高效而简单的多线程框架,即 Executor Framework,并解释了它的不同组件。 我们还看了一下在 executor 中创建提交和执行任务的不同示例。
与往常一样,此示例的代码可以在GitHub上找到。
英文原文:https://stackabuse.com/concurrency-in-java-the-executor-framework/
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~