一篇文章带你深入了解Java线程池

网友投稿 783 2022-10-08


一篇文章带你深入了解Java线程池

目录线程池模型常用线程池ThreadPoolExecutor构造函数参数说明 线程池默认工作行为ForkJoinPoolFutureTask线程数量分析CPU密集型IO密集型总结

线程池模型

一般的池化模型会有两个方法,用于获取资源和释放资源,就像这样:

public interface XXPool{

XX acquire();

void release();

}

但是,工程中的线程池一般是生产者和消费者模型,线程池是消费者,任务的提交者是生产者,下面是一个简化的线程池模型:

//简化的线程池,仅用来说明工作原理

class MyThreadPool{

//利用阻塞队列实现生产者-消费者模式

BlockingQueue workQueue;

//保存内部工作线程

List threads

= new ArrayList<>();

// 构造方法

MyThreadPool(int poolSize,

BlockingQueue workQueue){

this.workQueue = workQueue;

// 创建工作线程

for(int idx=0; idx

WorkerThread work = new WorkerThread();

work.start();

threads.add(work);

}

}

// 提交任务

void execute(Runnable command){

workQueue.put(command);

}

// 工作线程负责消费任务,并执行任务

class WorkerThread extends Thread{

public void run() {

//循环取任务并执行

while(true){ ①

Runnable task = workQueue.take();

task.run();

}

}

}

}

/** 下面是使用示例 **/

// 创建有界阻塞队列

BlockingQueue workQueue =

new LinkedBlockingQueue<>(2);

// 创建线程池

MyThreadPool pool = new MyThreadPool(

10, workQueue);

// 提交任务

pool.execute(()->{

System.out.println("hello");

});

常用线程池

ThreadPoolExecutor

在工程中,我们会使用Executors来快速new一个线程池,例如:

ExecutorService executorService = Executors.newFixedThreadPool(threadPoolNum, r -> new Thread(r, threadName));

Executors底层使用的是 ThreadPoolExecutor,我们可以通过ThreadPoolExecutor构造函数来了解ThreadPoolExecutor的一些行为。

ThreadPoolExecutor(

  int corePoolSize,

  int maximumPoolSize,

  long keepAliveTime,

  TimeUnit unit,

  BlockingQueue workQueue,

  ThreadFactory threadFactory,

  RejectedExecutionHandler handler)

构造函数参数说明

corePoolSize:表示线程池保有的最小线程数。

maximumPoolSize:表示线程池创建的最大线程数。

keepAliveTime & unit:如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。

workQueue:工作队列,和上面示例代码的工作队列同义。

threadFactory:通过这http://个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。

handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。

ThreadPoolExecutor 已经提供了以下 4 种策略。

CallerRunsPolicy:提交任务的线程自己去执行该任务。

AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。

DiscardPolicy:直接丢弃任务,没有任何异常抛出。

DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

线程池默认工作行为

不会初始化 corePoolSize 个线程,有任务来了才创建工作线程;

当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列中;

当工作队列满了后扩容线程池,一直到线程个数达到 maximumPoolSize 为止;(如果线程池还没有扩容到最大线程数但是工作队列已经溢出,溢出的请求会被拒绝)

如果队列已满且达到了最大线程后还有任务进来,按照拒绝策略处理;

当线程数大于核心线程数时,线程等待 keepAliveTime 后还是没有任务需要处理的话,收缩线程到核心线程数。

ForkJoinPool

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。

Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。

ForkJoinPool 主要适用于计算密集型任务,java中的parallelStream底层使用的就是ForkJoinPool。

下面是使用ForkJoinPool的一个简单例子:

public static void main(String[] args) {

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

Fibonacci fibonacci = new Fibonacci(5);

Integer res = forkJoinPool.invoke(fibonacci);

System.out.println(res);

}

static class Fibonacci extends RecursiveTask{

final int n;

Fibonacci(int n){

this.n = n;

}

@Override

protected Integer compute() {

if(n<=1){

return n;

}

Fibonacci f1 = new Fibonacci(n-1);

f1.fork();

Fibonacci f2 = new Fibonacci(n-2);

return f2.compute() + f1.join();

}

}

FutureTask

我们可以通过FutureTask(Future接口的实现类)获取线程执行结果。FutureTask主要方法如下:

// 取消任务

boolean cancel(

boolean mayInterruptIfRunning);

// 判断任务是否已取消

boolean isCancelled();

// 判断任务是否已结束

boolean isDone();

// 获得任务执行结果

get();

// 获得任务执行结果,支持超时

get(long timeout, TimeUnit unit);

其中,两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

ExecutorService executorService = Executors.newFixedThreadPool(10);

Future future = executorService.submit(() -> {

return 1 + 1;

});

Integer res = future.get();

System.out.println(res);

Integer res2 = future.get(1000, TimeUnit.SECONDS);

System.out.println(res2);

FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行。

// 创建FutureTask

FutureTask futureTask

= new FutureTask<>(()-> 1+2);

// 创建线程池

ExecutorService es =

Executors.newCachedThreadPool();

// 提交FutureTask

es.submit(futureTask);

// 获取计算结果

Integer result = futureTask.get();

线程数量分析

多线程可以提高程序的响应速度和吞吐量,创建线程的数量会对实际效果产生非常大的影响,线程太少会浪费CPU的资源,线程太多则会导致线程的频繁切换,系统性能反而会下降。

根据程序类型的不同,我们可以将我们的程序分为IO密集型和CPU密集型两种,这两种程序计算最佳线程数的方法有所不同。

CPU密集型

对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的JUyiH利用率。

IO密集型

对于I/O 密集型计算场景,由于计算资源与IO资源是各自独立的资源,在CPU执行其他线程的任务时,IO仍能继续,因此对于IO密集型的程序,最佳线程数与程序中 CPU 计算和 I/O 操作的耗时比相关。

根据上诉分析,我们可以得出最佳线程数的计算公式:

最佳线程数 = 1 +(I/O 耗时 / CPU 耗时)

对于多核CPU,只需进行同比扩大就行:

最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]

对于最佳线程数是多少,以上只是理论分析,由于实际生产环境中,一台机器可能会跑多个服务,一个服务可能会有多个线程池,因此最佳线程数还是要根据实际生产情况进行调整,理论值仅供参考。

总结

本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注我们的更多内容!

WorkerThread work = new WorkerThread();

work.start();

threads.add(work);

}

}

// 提交任务

void execute(Runnable command){

workQueue.put(command);

}

// 工作线程负责消费任务,并执行任务

class WorkerThread extends Thread{

public void run() {

//循环取任务并执行

while(true){ ①

Runnable task = workQueue.take();

task.run();

}

}

}

}

/** 下面是使用示例 **/

// 创建有界阻塞队列

BlockingQueue workQueue =

new LinkedBlockingQueue<>(2);

// 创建线程池

MyThreadPool pool = new MyThreadPool(

10, workQueue);

// 提交任务

pool.execute(()->{

System.out.println("hello");

});

常用线程池

ThreadPoolExecutor

在工程中,我们会使用Executors来快速new一个线程池,例如:

ExecutorService executorService = Executors.newFixedThreadPool(threadPoolNum, r -> new Thread(r, threadName));

Executors底层使用的是 ThreadPoolExecutor,我们可以通过ThreadPoolExecutor构造函数来了解ThreadPoolExecutor的一些行为。

ThreadPoolExecutor(

  int corePoolSize,

  int maximumPoolSize,

  long keepAliveTime,

  TimeUnit unit,

  BlockingQueue workQueue,

  ThreadFactory threadFactory,

  RejectedExecutionHandler handler)

构造函数参数说明

corePoolSize:表示线程池保有的最小线程数。

maximumPoolSize:表示线程池创建的最大线程数。

keepAliveTime & unit:如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。

workQueue:工作队列,和上面示例代码的工作队列同义。

threadFactory:通过这http://个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。

handler:通过这个参数你可以自定义任务的拒绝策略。如果线程池中所有的线程都在忙碌,并且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。

ThreadPoolExecutor 已经提供了以下 4 种策略。

CallerRunsPolicy:提交任务的线程自己去执行该任务。

AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。

DiscardPolicy:直接丢弃任务,没有任何异常抛出。

DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。

线程池默认工作行为

不会初始化 corePoolSize 个线程,有任务来了才创建工作线程;

当核心线程满了之后不会立即扩容线程池,而是把任务堆积到工作队列中;

当工作队列满了后扩容线程池,一直到线程个数达到 maximumPoolSize 为止;(如果线程池还没有扩容到最大线程数但是工作队列已经溢出,溢出的请求会被拒绝)

如果队列已满且达到了最大线程后还有任务进来,按照拒绝策略处理;

当线程数大于核心线程数时,线程等待 keepAliveTime 后还是没有任务需要处理的话,收缩线程到核心线程数。

ForkJoinPool

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的 Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。

Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务 ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。

ForkJoinPool 主要适用于计算密集型任务,java中的parallelStream底层使用的就是ForkJoinPool。

下面是使用ForkJoinPool的一个简单例子:

public static void main(String[] args) {

ForkJoinPool forkJoinPool = new ForkJoinPool(4);

Fibonacci fibonacci = new Fibonacci(5);

Integer res = forkJoinPool.invoke(fibonacci);

System.out.println(res);

}

static class Fibonacci extends RecursiveTask{

final int n;

Fibonacci(int n){

this.n = n;

}

@Override

protected Integer compute() {

if(n<=1){

return n;

}

Fibonacci f1 = new Fibonacci(n-1);

f1.fork();

Fibonacci f2 = new Fibonacci(n-2);

return f2.compute() + f1.join();

}

}

FutureTask

我们可以通过FutureTask(Future接口的实现类)获取线程执行结果。FutureTask主要方法如下:

// 取消任务

boolean cancel(

boolean mayInterruptIfRunning);

// 判断任务是否已取消

boolean isCancelled();

// 判断任务是否已结束

boolean isDone();

// 获得任务执行结果

get();

// 获得任务执行结果,支持超时

get(long timeout, TimeUnit unit);

其中,两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

ExecutorService executorService = Executors.newFixedThreadPool(10);

Future future = executorService.submit(() -> {

return 1 + 1;

});

Integer res = future.get();

System.out.println(res);

Integer res2 = future.get(1000, TimeUnit.SECONDS);

System.out.println(res2);

FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行。

// 创建FutureTask

FutureTask futureTask

= new FutureTask<>(()-> 1+2);

// 创建线程池

ExecutorService es =

Executors.newCachedThreadPool();

// 提交FutureTask

es.submit(futureTask);

// 获取计算结果

Integer result = futureTask.get();

线程数量分析

多线程可以提高程序的响应速度和吞吐量,创建线程的数量会对实际效果产生非常大的影响,线程太少会浪费CPU的资源,线程太多则会导致线程的频繁切换,系统性能反而会下降。

根据程序类型的不同,我们可以将我们的程序分为IO密集型和CPU密集型两种,这两种程序计算最佳线程数的方法有所不同。

CPU密集型

对于 CPU 密集型计算,多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。所以,对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的JUyiH利用率。

IO密集型

对于I/O 密集型计算场景,由于计算资源与IO资源是各自独立的资源,在CPU执行其他线程的任务时,IO仍能继续,因此对于IO密集型的程序,最佳线程数与程序中 CPU 计算和 I/O 操作的耗时比相关。

根据上诉分析,我们可以得出最佳线程数的计算公式:

最佳线程数 = 1 +(I/O 耗时 / CPU 耗时)

对于多核CPU,只需进行同比扩大就行:

最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]

对于最佳线程数是多少,以上只是理论分析,由于实际生产环境中,一台机器可能会跑多个服务,一个服务可能会有多个线程池,因此最佳线程数还是要根据实际生产情况进行调整,理论值仅供参考。

总结

本篇文章就到这里了,希望能给你带来帮助,也希望您能够多多关注我们的更多内容!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Linux下IPtables命令详解(linux下iptable的命令)
下一篇:IPtables概念和功能(关于iptables的描述)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~