非常适合新手学生的Java线程池优化升级版

网友投稿 291 2022-08-16


非常适合新手学生的Java线程池优化升级版

目录:升级版线程池的优化 / 线程池构造器 / 线程池拒绝策略 / execute方法 / 手写线程池源码(MyExecutorService、MyRejectedExecutionException、MyRejectedExecutionHandle、核心类MyThreadPoolExecutor、线程池测试类)

升级版线程池的优化

1:新增了4种拒绝策略。分别为:MyAbortPolicy、MyDiscardPolicy、MyDiscardOldestPolicy、MyCallerRunsPolicy

2:对线程池MyThreadPoolExecutor的构造方法进行优化,增加了参数校验,防止乱传参数现象。

3:这是最重要的一个优化。

移除线程池的线程预热功能。线程预热会极大地耗费内存:即使我们不使用线程池,预热出来的线程也会一直处于运行状态。现在改为在调用execute方法添加任务时,比较workers线程集合当前的大小与corePoolSize的值,只有当线程数小于corePoolSize时才通过new MyWorker()创建线程并添加到线程池。这样做的好处是:创建线程池后如果不使用,就不会对当前内存造成任何影响;只有真正使用时才会创建线程,并放入线程池中进行复用。

线程池构造器

/**
 * Creates a pool with 5 core threads, a bounded queue of 10 tasks, the default thread factory
 * and the default rejection policy.
 */
public MyThreadPoolExecutor() {
    this(5, new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(), defaultHandle);
}

/**
 * Creates a pool that uses the default rejection policy.
 *
 * @param corePoolSize  maximum number of worker threads; must not be negative
 * @param waitingQueue  queue holding tasks that wait for a worker; must not be null
 * @param threadFactory factory used to create worker threads; must not be null
 */
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue, ThreadFactory threadFactory) {
    this(corePoolSize, waitingQueue, threadFactory, defaultHandle);
}

/**
 * Creates a pool. No thread is started here; workers are created when tasks are submitted.
 *
 * @param corePoolSize  maximum number of worker threads; must not be negative
 * @param waitingQueue  queue holding tasks that wait for a worker; must not be null
 * @param threadFactory factory used to create worker threads; must not be null
 * @param handle        policy applied to tasks the queue refuses; must not be null
 * @throws IllegalArgumentException if {@code corePoolSize} is negative
 * @throws NullPointerException     if any reference argument is null
 */
public MyThreadPoolExecutor(int corePoolSize, BlockingQueue<Runnable> waitingQueue, ThreadFactory threadFactory, MyRejectedExecutionHandle handle) {
    // Validate before allocating: new HashSet<>(negative) would throw its own
    // IllegalArgumentException first and the intended message would never be seen.
    if (corePoolSize < 0) {
        throw new IllegalArgumentException("线程池参数不合法: corePoolSize=" + corePoolSize);
    }
    if (waitingQueue == null || threadFactory == null || handle == null) {
        throw new NullPointerException("线程池参数不合法");
    }
    this.workers = new HashSet<>(corePoolSize);
    this.corePoolSize = corePoolSize;
    this.waitingQueue = waitingQueue;
    this.threadFactory = threadFactory;
    this.handle = handle;
}

线程池拒绝策略

策略接口:MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * Custom rejection policy: decides what happens to a task the pool could not queue.
 *
 * @author 游政杰
 */
@FunctionalInterface
public interface MyRejectedExecutionHandle {

    /**
     * Handles a rejected task.
     *
     * @param runnable           the task that was rejected
     * @param threadPoolExecutor the pool that rejected it
     */
    void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor);
}

策略内部实现类

/**

* 实现自定义拒绝策略

*/

// Abort policy (the default): reject by throwing.
public static class MyAbortPolicy implements MyRejectedExecutionHandle {

    /**
     * Always throws a {@link MyRejectedExecutionException} naming the task and the pool.
     *
     * @param r the rejected task
     * @param t the pool that rejected it
     */
    @Override
    public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {
        String taskText = r.toString();
        String poolText = t.toString();
        throw new MyRejectedExecutionException("任务-> " + taskText + "被线程池-> " + poolText + " 拒绝");
    }
}

// Discard policy: drop the rejected task silently.
public static class MyDiscardPolicy implements MyRejectedExecutionHandle {

    /**
     * Does nothing, so the rejected task is simply dropped.
     *
     * @param runnable           the rejected task (ignored)
     * @param threadPoolExecutor the pool that rejected it (ignored)
     */
    @Override
    public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
        // Intentionally empty.
    }
}

// Discard-oldest policy: drop the task at the head of the queue and retry the new one.
public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle {

    /**
     * Removes the head of the waiting queue and submits the rejected task again.
     * Does nothing when the pool is shut down.
     *
     * @param runnable           the rejected task
     * @param threadPoolExecutor the pool that rejected it
     */
    @Override
    public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor.isShutdown()) {
            return;
        }
        // Dropping the oldest queued task makes room for the new one.
        threadPoolExecutor.getWaitingQueue().poll();
        threadPoolExecutor.execute(runnable);
    }
}

// Caller-runs policy: the submitting thread runs the rejected task itself.
public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle {

    /**
     * Runs the rejected task on the calling thread. Does nothing when the pool is shut down.
     *
     * @param runnable           the rejected task
     * @param threadPoolExecutor the pool that rejected it
     */
    @Override
    public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
        if (threadPoolExecutor.isShutdown()) {
            return;
        }
        runnable.run();
    }
}

封装拒绝方法

/**
 * Passes a task refused by this pool to the configured rejection policy.
 *
 * @param runnable the rejected task
 */
protected final void reject(Runnable runnable) {
    reject(runnable, this);
}

/**
 * Passes a rejected task to the configured rejection policy on behalf of the given pool.
 *
 * @param runnable           the rejected task
 * @param threadPoolExecutor the pool reported to the policy as the rejecting pool
 */
protected final void reject(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {
    handle.rejectedExecution(runnable, threadPoolExecutor);
}

execute方法

@Override

public boolean execute(Runnable runnable)

{

if (!this.waitingQueue.offer(runnable)) {

this.reject(runnable);

return false;

}

else {

if(this.workers!=null&&this.workers.size()

MyWorker worker = new MyWorker(); //通过构造方法添加线程

}

return true;

}

}

可以看出只有当往线程池放任务时才会创建线程对象。

手写线程池源码

MyExecutorService

package com.springframework.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * Operations offered by the custom thread pool.
 *
 * @author 游政杰
 */
public interface MyExecutorService {

    /**
     * Submits a task for execution.
     *
     * @param runnable the task to run
     * @return {@code true} if the task was accepted; {@code false} if it was handed to the
     *         rejection policy
     */
    boolean execute(Runnable runnable);

    /** Requests a graceful stop: queued tasks are still run before the workers exit. */
    void shutdown();

    /** Requests an immediate stop: workers exit without draining the queue. */
    void shutdownNow();

    /**
     * @return {@code true} once {@link #shutdown()} or {@link #shutdownNow()} has been called
     */
    boolean isShutdown();

    /**
     * @return the queue holding the tasks that wait for a worker
     */
    BlockingQueue<Runnable> getWaitingQueue();
}

MyRejectedExecutionException

package com.springframework.concurrent;

/**
 * Unchecked exception thrown when the pool refuses a task.
 */
public class MyRejectedExecutionException extends RuntimeException {

    /** Creates the exception with neither a message nor a cause. */
    public MyRejectedExecutionException() {
        super();
    }

    /**
     * Creates the exception with a cause only.
     *
     * @param cause what led to the rejection
     */
    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates the exception with a message only.
     *
     * @param message description of the rejection
     */
    public MyRejectedExecutionException(String message) {
        super(message);
    }

    /**
     * Creates the exception with both a message and a cause.
     *
     * @param message description of the rejection
     * @param cause   what led to the rejection
     */
    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }
}

MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * Custom rejection policy: decides what happens to a task the pool could not queue.
 *
 * @author 游政杰
 */
@FunctionalInterface
public interface MyRejectedExecutionHandle {

    /**
     * Handles a rejected task.
     *
     * @param runnable           the task that was rejected
     * @param threadPoolExecutor the pool that rejected it
     */
    void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor);
}

核心类MyThreadPoolExecutor

package com.springframework.concurrent;

import java.util.HashSet;

import java.util.Set;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.atomic.AtomicInteger;

/**

ugjeDHRFzX * 纯手撸线程池框架

* @author 游政杰

*/

public class MyThreadPoolExecutor implements MyExecutorService{

private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数

private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号

private static volatile int corePoolSize; //核心线程数

private final HashSet workers; //工作线程

private final BlockingQueue waitingQueue; //等待队列

private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称

private volatile boolean isRunning=true; //是否运行

private volatile boolean STOPNOW=false; //是否立刻停止

private volatile ThreadFactory threadFactory; //线程工厂

private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略

private volatile MyRejectedExecutionHandle handle; //拒绝紫略

public MyThreadPoolExecutor(){

this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);

}

public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory) {

this(corePoolSize,waitingQueue,threadFactory,defaultHandle);

}

public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {

this.workers=new HashSet<>(corePoolSize);

if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){

this.corePoolSize=corePoolSize;

this.waitingQueue=waitingQueue;

this.threadFactory=threadFactory;

this.handle=handle;

}else {

throw new NullPointerException("线程池参数不合法");

}

}

/**

* 实现自定义拒绝策略

*/

//抛异常策略(默认)

public static class MyAbortPolicy implements MyRejectedExecutionHandle{

public MyAbortPolicy(){

}

@Override

public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {

throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");

}

}

//默默丢弃策略

public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

public MyDiscardPolicy() {

}

@Override

public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

}

}

//丢弃掉最老的任务策略

public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{

public MyDiscardOldestPolicy() {

}

@Override

public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭

threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了

threadPoolExecutor.execute(runnable); //把新任务加入到队列中

}

}

}

//由调用者调用策略

public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{

public MyCallerRunsPolicy(){

}

@Override

public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭

runnable.run();

}

}

}

//call拒绝方法

protected final void reject(Runnable runnable){

this.handle.rejectedExecution(runnable, this);

}

protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){

this.handle.rejectedExecution(runnable, threadPoolExecutor);

}

/**

* MyWorker就是我们每一个线程对象

*/

private final class MyWorker implements Runnable{

final Thread thread; //为每个MyWorker

MyWorker(){

Thread td = threadFactory.newThread(this);

td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());

this.thread=td;

this.thread.start();

workers.add(this);

}

//执行任务

@Override

public void run() {

//循环接收任务

while (true)

{

//循环退出条件:

//1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。

//2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。

if((!isRunning&&waitingQueue.size()==0)||STOPNOW)

{

break;

}else {

//不断取任务,当任务!=null时则调用run方法处理任务

Runnable runnable = waitingQueue.poll();

if(runnable!=null){

runnable.run();

System.out.println("task==>"+taskcount.incrementAndGet());

}

}

}

}

}

//往线程池中放任务

@Override

public boolean execute(Runnable runnable)

{

if (!this.waitingQueue.offer(runnable)) {

this.reject(runnable);

return false;

}

else {

if(this.workers!=null&&this.workers.size()

MyWorker worker = new MyWorker(); //通过构造方法添加线程

}

return true;

}

}

//优雅的关闭

@Override

public void shutdown()

{

this.isRunning=false;

}

//暴力关闭

@Override

public void shutdownNow()

{

this.STOPNOW=true;

}

//判断线程池是否关闭

@Override

public boolean isShutdown() {

return !this.isRunning||STOPNOW;

}

//获取等待队列

@Override

public BlockingQueue getWaitingQueue() {

return this.waitingQueue;

}

}

线程池测试类

package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.Executors;

/**
 * Demo driver: submits 11 tasks to a pool of 5 core threads with a queue of 6,
 * then shuts the pool down gracefully.
 */
public class ThreadPoolTest {

    public static void main(String[] args) {
        // Policies to try in place of MyCallerRunsPolicy:
        // MyAbortPolicy, MyDiscardPolicy, MyDiscardOldestPolicy.
        MyThreadPoolExecutor pool = new MyThreadPoolExecutor(
                5,
                new ArrayBlockingQueue<>(6),
                Executors.defaultThreadFactory(),
                new MyThreadPoolExecutor.MyCallerRunsPolicy());

        int next = 0;
        while (next < 11) {
            final int taskId = next++;
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + ">>>>" + taskId));
        }

        // Graceful stop; shutdownNow() is the abrupt alternative.
        pool.shutdown();
    }
}

好了升级版线程池就优化到这了,后面可能还会出完善版,不断进行优化。

MyWorker worker = new MyWorker(); //通过构造方法添加线程

}

return true;

}

}

可以看出只有当往线程池放任务时才会创建线程对象。

手写线程池源码

MyExecutorService

package com.springframework.concurrent;

import java.util.concurrent.BlockingQueue;

/**
 * Operations offered by the custom thread pool.
 *
 * @author 游政杰
 */
public interface MyExecutorService {

    /**
     * Submits a task for execution.
     *
     * @param runnable the task to run
     * @return {@code true} if the task was accepted; {@code false} if it was handed to the
     *         rejection policy
     */
    boolean execute(Runnable runnable);

    /** Requests a graceful stop: queued tasks are still run before the workers exit. */
    void shutdown();

    /** Requests an immediate stop: workers exit without draining the queue. */
    void shutdownNow();

    /**
     * @return {@code true} once {@link #shutdown()} or {@link #shutdownNow()} has been called
     */
    boolean isShutdown();

    /**
     * @return the queue holding the tasks that wait for a worker
     */
    BlockingQueue<Runnable> getWaitingQueue();
}

MyRejectedExecutionException

package com.springframework.concurrent;

/**
 * Unchecked exception thrown when the pool refuses a task.
 */
public class MyRejectedExecutionException extends RuntimeException {

    /** Creates the exception with neither a message nor a cause. */
    public MyRejectedExecutionException() {
        super();
    }

    /**
     * Creates the exception with a cause only.
     *
     * @param cause what led to the rejection
     */
    public MyRejectedExecutionException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates the exception with a message only.
     *
     * @param message description of the rejection
     */
    public MyRejectedExecutionException(String message) {
        super(message);
    }

    /**
     * Creates the exception with both a message and a cause.
     *
     * @param message description of the rejection
     * @param cause   what led to the rejection
     */
    public MyRejectedExecutionException(String message, Throwable cause) {
        super(message, cause);
    }
}

MyRejectedExecutionHandle

package com.springframework.concurrent;

/**
 * Custom rejection policy: decides what happens to a task the pool could not queue.
 *
 * @author 游政杰
 */
@FunctionalInterface
public interface MyRejectedExecutionHandle {

    /**
     * Handles a rejected task.
     *
     * @param runnable           the task that was rejected
     * @param threadPoolExecutor the pool that rejected it
     */
    void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor);
}

核心类MyThreadPoolExecutor

package com.springframework.concurrent;

import java.util.HashSet;

import java.util.Set;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.Executors;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.atomic.AtomicInteger;

/**

ugjeDHRFzX * 纯手撸线程池框架

* @author 游政杰

*/

public class MyThreadPoolExecutor implements MyExecutorService{

private static final AtomicInteger taskcount=new AtomicInteger(0);//执行任务次数

private static final AtomicInteger threadNumber=new AtomicInteger(0); //线程编号

private static volatile int corePoolSize; //核心线程数

private final HashSet workers; //工作线程

private final BlockingQueue waitingQueue; //等待队列

private static final String THREADPOOL_NAME="MyThread-Pool-";//线程名称

private volatile boolean isRunning=true; //是否运行

private volatile boolean STOPNOW=false; //是否立刻停止

private volatile ThreadFactory threadFactory; //线程工厂

private static final MyRejectedExecutionHandle defaultHandle=new MyThreadPoolExecutor.MyAbortPolicy();//默认拒绝策略

private volatile MyRejectedExecutionHandle handle; //拒绝紫略

public MyThreadPoolExecutor(){

this(5,new ArrayBlockingQueue<>(10), Executors.defaultThreadFactory(),defaultHandle);

}

public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory) {

this(corePoolSize,waitingQueue,threadFactory,defaultHandle);

}

public MyThreadPoolExecutor(int corePoolSize, BlockingQueue waitingQueue,ThreadFactory threadFactory,MyRejectedExecutionHandle handle) {

this.workers=new HashSet<>(corePoolSize);

if(corePoolSize>=0&&waitingQueue!=null&&threadFactory!=null&&handle!=null){

this.corePoolSize=corePoolSize;

this.waitingQueue=waitingQueue;

this.threadFactory=threadFactory;

this.handle=handle;

}else {

throw new NullPointerException("线程池参数不合法");

}

}

/**

* 实现自定义拒绝策略

*/

//抛异常策略(默认)

public static class MyAbortPolicy implements MyRejectedExecutionHandle{

public MyAbortPolicy(){

}

@Override

public void rejectedExecution(Runnable r, MyThreadPoolExecutor t) {

throw new MyRejectedExecutionException("任务-> "+r.toString()+"被线程池-> "+t.toString()+" 拒绝");

}

}

//默默丢弃策略

public static class MyDiscardPolicy implements MyRejectedExecutionHandle{

public MyDiscardPolicy() {

}

@Override

public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

}

}

//丢弃掉最老的任务策略

public static class MyDiscardOldestPolicy implements MyRejectedExecutionHandle{

public MyDiscardOldestPolicy() {

}

@Override

public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

if(!threadPoolExecutor.isShutdown()){ //如果线程池没被关闭

threadPoolExecutor.getWaitingQueue().poll();//丢掉最老的任务,此时就有位置当新任务了

threadPoolExecutor.execute(runnable); //把新任务加入到队列中

}

}

}

//由调用者调用策略

public static class MyCallerRunsPolicy implements MyRejectedExecutionHandle{

public MyCallerRunsPolicy(){

}

@Override

public void rejectedExecution(Runnable runnable, MyThreadPoolExecutor threadPoolExecutor) {

if(!threadPoolExecutor.isShutdown()){//判断线程池是否被关闭

runnable.run();

}

}

}

//call拒绝方法

protected final void reject(Runnable runnable){

this.handle.rejectedExecution(runnable, this);

}

protected final void reject(Runnable runnable,MyThreadPoolExecutor threadPoolExecutor){

this.handle.rejectedExecution(runnable, threadPoolExecutor);

}

/**

* MyWorker就是我们每一个线程对象

*/

private final class MyWorker implements Runnable{

final Thread thread; //为每个MyWorker

MyWorker(){

Thread td = threadFactory.newThread(this);

td.setName(THREADPOOL_NAME+threadNumber.getAndIncrement());

this.thread=td;

this.thread.start();

workers.add(this);

}

//执行任务

@Override

public void run() {

//循环接收任务

while (true)

{

//循环退出条件:

//1:当isRunning为false并且waitingQueue的队列大小为0(也就是无任务了),会优雅的退出。

//2:当STOPNOW为true,则说明调用了shutdownNow方法进行暴力退出。

if((!isRunning&&waitingQueue.size()==0)||STOPNOW)

{

break;

}else {

//不断取任务,当任务!=null时则调用run方法处理任务

Runnable runnable = waitingQueue.poll();

if(runnable!=null){

runnable.run();

System.out.println("task==>"+taskcount.incrementAndGet());

}

}

}

}

}

//往线程池中放任务

@Override

public boolean execute(Runnable runnable)

{

if (!this.waitingQueue.offer(runnable)) {

this.reject(runnable);

return false;

}

else {

if(this.workers!=null&&this.workers.size()

MyWorker worker = new MyWorker(); //通过构造方法添加线程

}

return true;

}

}

//优雅的关闭

@Override

public void shutdown()

{

this.isRunning=false;

}

//暴力关闭

@Override

public void shutdownNow()

{

this.STOPNOW=true;

}

//判断线程池是否关闭

@Override

public boolean isShutdown() {

return !this.isRunning||STOPNOW;

}

//获取等待队列

@Override

public BlockingQueue getWaitingQueue() {

return this.waitingQueue;

}

}

线程池测试类

package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.Executors;

/**
 * Demo driver: submits 11 tasks to a pool of 5 core threads with a queue of 6,
 * then shuts the pool down gracefully.
 */
public class ThreadPoolTest {

    public static void main(String[] args) {
        // Policies to try in place of MyCallerRunsPolicy:
        // MyAbortPolicy, MyDiscardPolicy, MyDiscardOldestPolicy.
        MyThreadPoolExecutor pool = new MyThreadPoolExecutor(
                5,
                new ArrayBlockingQueue<>(6),
                Executors.defaultThreadFactory(),
                new MyThreadPoolExecutor.MyCallerRunsPolicy());

        int next = 0;
        while (next < 11) {
            final int taskId = next++;
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + ">>>>" + taskId));
        }

        // Graceful stop; shutdownNow() is the abrupt alternative.
        pool.shutdown();
    }
}

好了升级版线程池就优化到这了,后面可能还会出完善版,不断进行优化。

MyWorker worker = new MyWorker(); //通过构造方法添加线程

}

return true;

}

}

/** Graceful shutdown: clears the running flag. */
@Override
public void shutdown() {
    isRunning = false;
}

/** Abrupt shutdown: raises the stop-now flag. */
@Override
public void shutdownNow() {
    STOPNOW = true;
}

/**
 * Reports whether the pool has been shut down.
 *
 * @return {@code true} if the running flag has been cleared or the stop-now flag is set
 */
@Override
public boolean isShutdown() {
    if (!isRunning) {
        return true;
    }
    return STOPNOW;
}

/**
 * Returns the waiting queue.
 *
 * @return the queue held in the {@code waitingQueue} field
 */
@Override
public BlockingQueue<Runnable> getWaitingQueue() {
    return this.waitingQueue;
}

}

线程池测试类

package com.springframework.test;

import com.springframework.concurrent.MyThreadPoolExecutor;

import java.util.concurrent.ArrayBlockingQueue;

import java.util.concurrent.Executors;

/**
 * Demo driver: submits 11 tasks to a pool of 5 core threads with a queue of 6,
 * then shuts the pool down gracefully.
 */
public class ThreadPoolTest {

    public static void main(String[] args) {
        // Policies to try in place of MyCallerRunsPolicy:
        // MyAbortPolicy, MyDiscardPolicy, MyDiscardOldestPolicy.
        MyThreadPoolExecutor pool = new MyThreadPoolExecutor(
                5,
                new ArrayBlockingQueue<>(6),
                Executors.defaultThreadFactory(),
                new MyThreadPoolExecutor.MyCallerRunsPolicy());

        int next = 0;
        while (next < 11) {
            final int taskId = next++;
            pool.execute(() -> System.out.println(Thread.currentThread().getName() + ">>>>" + taskId));
        }

        // Graceful stop; shutdownNow() is the abrupt alternative.
        pool.shutdown();
    }
}

好了升级版线程池就优化到这了,后面可能还会出完善版,不断进行优化。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Apache Hudi异步Clustering部署操作的掌握
下一篇:Java中提供synchronized后为什么还要提供Lock
相关文章

 发表评论

暂时没有评论,来抢沙发吧~