谈谈Java 线程池

网友投稿 302 2022-11-25


谈谈Java 线程池

一、引言

池的概念大家并不陌生,数据库连接池、线程池等...大体来说,有三个优点:

降低资源消耗。

提高响应速度。

便于统一管理。

以上是 “池化” 技术的相同特点,至于他们之间的不同点这里不讲,两者都是为了提高性能和效率,抛开实际做连连看找不同,没有意义。

同样,类比于线程池来说:

降低资源消耗:

重复利用线程池中已经创建的线程,相比之下省去了线程创建和销毁的性能消耗。

提高响应速度:

当有任务创建时,不必等待线程创建,可以立即执行。

便于统一管理:

使用线程池,可以对线程统一管理,对线程的执行状态做统一监控。

二、线程池的使用

public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,

long keepAliveTime,

TimeUnit unit,

BlockingQueue workQueue,

ThreadFactory threadFactory,

RejectedExecutionHandler handler);

1、关键参数

corePoolSize 核心线程数

当向线程池中提交一个任务时,如果线程池中的线程数量小于核心线程数,即使存在空闲线程,也会新建一个线程来执行当前任务,直到线程数量大于或等于核心线程数。

maximunPoolSize 最大线程数

当任务队列满了,线程池中的线程数量小于最大线程数时,创建新线程执行任务。对于无界队列,忽略该参数。

keepAliveTime 线程存活时间

大于核心线程数的那一部分线程的存活时间,如果这部分线程空闲超过这段时间,则进行销毁。

workqueue 任务队列

线程池中的线程数大于核心线程数时,将任务放入此队列等待执行。

threadFactory 线程工厂

用于创建线程,工厂使用 new Threa() 的方式创建线程,并为每个线程做统一规则的命名:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。

handler 饱和策略

当线程池和队列都满了,则根据此策略处理任务。

2、任务队列类型

名称

描述

ArrayBlockingQueue

基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。

LinkedBlockingQueue

基于链表结构的阻塞队列,此队列按 FIFO (先进先出) 排序元素,吞吐量通常要高于 ArrayBlockingQueue。Executors.newFixedThreadPool( ) 使用了这个队列。

SynchronousQueue

不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 LinkedBlockingQueue,静态工厂方法 Executors.newCachedThreadPool( ) 使用了这个队列。

PriorityBlockingQueue

具有优先级的无限阻塞队列。

3、饱和策略类型

策略名称

特性

AbortPolicy

默认的饱和策略,直接抛出 RejectedExecutionException 异常

DiscardPolicy

不处理,直接丢弃任务

CallerRunsPolicy

使用调用者的线程执行任务

DiscardOldestPolicy

丢弃队列里最近的一个任务,执行当前任务

同时,还可以自行实现 RejectedExecutionHandler 接口来自定义饱和策略,比如记录日志、持久化等等。

void execute(Runnable command)

ThreadFactory namedThreadFactory =

new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();

ExecutorService executor =

new ThreadPoolExecutor(

10,

1000,

60L,

TimeUnit.SECONDS,

new LinkedBlockingQueue<>(10),

namedThreadFactory,

new ThreadPoolExecutor.AbortPolicy());

executor.execute(

() -> {

System.out.println(1111);

});

注意使用 execute 方法提交任务时,没有返回值。

Future> submit(Runnable task)

Future future = executor.submit(() -> {

return 1 + 1;

});

Integer result = future.get();

还可以使用 submit 方法提交任务,该方法返回一个 Future 对象,通过 Future#get( ) 方法可以获得任务的返回值,该方法会一直阻塞知道任务执行完毕。还可以使用 Future#get(long timeout, TimeUnit unit) 方法,该方法会阻塞一段时间后立即返回,而这时任务可能没有执行完毕。

5、关闭线程池

ThreadPoolExecutor 提供了 shutdown( ) 和 shutdownNow( ) 两个方法关闭线程池。原理是首先遍历线程池的工作线程,依次调用 interrupt( ) 方法中断线程,这样看来如果无法响应中断的任务就不能终止。

两者区别是:

shutdownNow( )

shutdown( )

如果调用了其中一种方法,isShutdown 方法就会返回 true。当所有的任务都已关闭后, 才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。实际应用中可以根据任务是否 一定要执行完毕 的特性,决定使用哪种方法关闭线程池。

6、合理的配置线程池

通常我们可以 根据 CPU 核心数量来设计线程池数量 。

可以通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的物理核心数量。值得注意的是,如果应用运行在一些 docker 或虚拟机容器上时,该方法取得的是当前物理机的 CPU 核心数。

IO 密集型 2nCPU

计算密集型 nCPU+1

其中 n 为 CPU 核心数量。

为什么加 1:即使当计算密集型的线程偶尔由于缺失故障或者其他原因而http://暂停时,这个额外的线程也能确保 CPU 的时钟周期不会被浪费。

三、线程池的运行过程

当提交一个新任务时,线程池的处理步骤:

判断当前线程池内的线程数量是否小于核心线程数,如果小于则新建线程执行任务。否则,进入下个阶段。

判断队列是否已满,如果没满,则将任务加入等待队列。否则,进入下个阶段。

在上面基础上判断是否大于最大线程数,如果是根据响应的策略处理。否则,新建线程执行当前任务。

线程池的源码比较简单易懂,感兴趣的小伙伴可以自行查看 java.util.concurrent.ThreadPoolExecutor ,在线程池中每个任务都被包装为一个一个的 Worker ,下面简单看下 Worker 的 run( ) 方法:

try {

while (task != null || (task = getTask()) != null) {

w.lock();

// If pool is stopping, ensure thread is interrupted;

// if not, ensure thread is not interrupted. This

// requires a recheck in second case to deal with

// shutdownNow race while clearing interrupt

if ((runStateAtLeast(ctl.get(), STOP) ||

(Thread.interrupted() &&

runStateAtLeast(ctl.get(), STOP))) &&

!wt.isInterrupted())

wt.interrupt();

try {

beforeExecute(wt, task);

Throwable thrown = null;

try {

task.run();

} catch (RuntimeException x) {

thrown = x; throw x;

} catch (Error x) {

thrown = x; throw x;

} catch (Throwable x) {

thrown = x; throw new Error(x);

} finally {

afterExecute(task, thrown);

}

} finally {

task = null;

w.completedTasks++;

w.unlock();

}

}

completedAbruptly = false;

} finally {

processWorkerExit(w, completedAbruptly);

}

可以看到不断的循环取出 Task 并执行,而在任务的执行前后,有 beforeExecute 和 afterExecute 方法,我们可以实现两个方法实现一些监控逻辑。除此之外还可以集合线程池的一些属性或者重写 terminated() 方法在线程池关闭时进行监控。

四、常见的几种线程池实现

在 Executors 中提供了集中常见的线程池,分别应用在不同的场景。

FixThreadPool 固定数量的线程池,适用于对线程管理,高负载的系统

SingleThreadPool 只有一个线程的线程池,适用于保证任务顺序执行

CacheThreadPool 创建一个不限制线程数量的线程池,适用于执行短期异步任务的小程序,低负载系统

ScheduledThreadPool 定时任务使用的线程池,适用于定时任务

上面几种线程池的特性主要依赖于 ThreadPoolExecutor 的几个参数来实现,不同的核心线程数量,以及不同类型的阻塞队列,同时我们还可以自行实现自己的线程池满足业务需求。

值得注意的是,并不推荐使用 Executors 创建线程池,详见下:

Executors.newFixedThreadPool(int nThread)

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue());

}

继续来看 LinkedBlockingQueue :

public LinkedBlockingQueue() {

this(Integer.MAX_VALUE);

}

public LinkedBlockingQueue(int capacity) {

if (capacity <= 0) throw new IllegalArgumentException();

this.capacity = capacity;

last = head = new Node(null);

}

可以看到使用 LinkedBlockingQueue 创建的是 Integer.MAX_VALUE 大小的队列,会堆积大量的请求,从而造成 OOM

Executors.newSingleThreadExexutor( )

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue()));

}

同样,使用的 LinkedBlockingQueue ,一样的情况

Executors.newCachedThreadPool( )

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue());

}

代码课件线程池使用的最大线程数是 Integer.MAX_VALUE ,可能会创建大量线程,导致 OOM

Executors.newScheduleThreadPool()

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {

return new ScheduledThreadPoolExecutor(corePoolSize);

}

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

new DelayedWorkQueue());

}

和上面是一样的问题,最大线程数是 Integer.MAX_VALUE

所以原则上来说禁止使用 Executors 创建线程池, 而使用 ThreadPoolExecutor 的构造函数来创建线程池。

五、结语

线程池在开发中还是比较常见的,结合不同的业务场景,结合最佳实践配置正确的参数,可以帮助我们的应用性能得到提升。

以上就是谈谈Java 线程池的详细内容,更多关于Java 线程池的资料请关注我们其它相关文章!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:IDEA巧用Postfix Completion让码速起飞(小技巧)
下一篇:Delegate IDE build/run actions to maven 配置会影响程序运行吗?
相关文章

 发表评论

暂时没有评论,来抢沙发吧~