Springboot线程池并发处理数据优化方式

网友投稿 412 2022-09-06


Springboot线程池并发处理数据优化方式

目录第一步:首先配置线程基本参数第二步:让Spring Boot加载第三步:创建一个service接口第四步:编写现实类第五步:测试结果如下

第一步:首先配置线程基本参数

可以放在application.propertes文件种也可以放在自己新建的config/文件目录下,注意:但是需要使用@PropertySource把配置文件进行加载。

# 异步线程配置

# 配置核心线程数

async.executor.thread.core_pool_size = 8

# 配置最大线程数

async.executor.thread.max_pool_size = 20

# 配置队列大小

async.executor.thread.queue_capacity = 99999

# 配置线程池中的线程的名称前缀

async.executor.thread.name.prefix = async-service-

第二步:让Spring Boot加载

用来定义如何创建一个ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync这两个注解,表示这是个配置类,并且是线程池的配置类

@Slf4j

@EnableAsync

@Configuration

public class RCExecutorConfig {

@Value("${async.executor.thread.core_pool_size}")

private int corePoolSize;

@Value("${async.executor.thread.max_pool_size}")

private int maxPoolSize;

@Value("${async.executor.thread.queue_capacity}")

private int queueCapacity;

@Value("${async.executor.thread.name.prefix}")

private String namePrefix;

@Bean(name = "asyncServiceExecutor")

public Executor asyncServiceExecutor() {

log.info("start asyncServiceExecutor");

ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

//ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();

//配置核心线程数

executor.setCorePoolSize(corePoolSize);

//配置最大线程数

executor.setMaxPoolSize(maxPoolSize);

//配置队列大小

executor.setQueueCapacity(queueCapacity);

//配置线程池中的线程的名称前缀

executor.setThreadNamePrefix(namePrefix);

// rejection-policy:当pool已经达到max size的时候,如何处理新任务

// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//执行初始化

executor.initialize();

return executor;

}

}

第三步:创建一个service接口

是异步线程的接口,便于测试

public interface AsyncService {

/**

* 执行异步任务

* 可以根据需求,自己加参数拟定

*/

void executeAsync();

}

第四步:编写现实类

将Service层的服务异步化,在executeAsync()方法上增加注解@Async("asyncServiceExecutor"),asyncServiceExecutor方法是前面RCExecutorConfig.java中的方法名,表明executeAsync方法进入的线程池是asyncServiceExecutor方法创建的。

测试方面这里我加入了一个定时任务,使用的是corn表达式。(不懂得同学可以网上了解一下)

@Slf4j

@Service

public class AsyncServiceImpl implements AsyncService {

@Override

@Scheduled(cron = " */1 * * * * ? ")

@Async("asyncServiceExecutor")

public void executeAsyncOmGMpH() {

log.info("start executeAsync");

System.out.println("异步线程执行批量插入等耗时任务");

log.info("end executeAsync");

}

}

第五步:测试结果如下

10:32:15.004 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:15.004 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:16.003 [async-service-2] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:16.004 [async-service-2] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:17.001 [async-service-3] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:17.001 [async-service-3] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:18.002 [async-service-4] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:18.003 [async-service-4] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:19.002 [async-service-5] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:19.003 [async-service-5] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:20.001 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:20.002 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:21.002 [async-service-7] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:21.002 [async-service-7] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:22.004 [async-service-8] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:22.005 [async-service-8] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:23.001 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:23.003 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:24.003 [async-service-2] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:24.003 [async-service-2] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:25.001 [async-service-3] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:25.001 [async-service-3] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:26.002 [async-service-4] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:26.002 [async-service-4] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:27.002 [async-service-5] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:27.003 [async-service-5] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:28.001 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:28.001 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:29.001 [async-service-7] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:29.002 [async-service-7] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:30.001 [async-service-8] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:30.001 [async-service-8] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:32:31.001 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:32:31.001 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

还没完:

通过以上日志可以发现,[async-service-]是有多个线程的,显然已经在我们配置的线程池中执行了,表明每次请求都快速响应了,而耗时的操作都留给线程池中的线程去异步执行;

还有另一个问提就是,虽然已经用上了线程池,但是依然不清楚线程池当时的情况,有多少线程在执行,多少在队列中等待呢?于是这里我创建了一个ThreadPoolTaskExecutor的子类,可以把每次提交线程的时候都会将当前线程池的运行状况打印出来

import lombok.extern.slf4j.Slf4j;

@Slf4j

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {

/**

*

*/

private static final long serialVersionUID = -3518460523928455463L;

private void showThreadPoolInfo(String prefix) {

ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

if (null == threadPoolExecutor) {

return;

}

log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",

this.getThreadNamePrefix(),

prefix,

threadPoolExecutor.getTaskCount(),

threadPoolExecutor.getCompletedTaskCount(),

threadPoolExecutor.getActiveCount(),

threadPoolExecutor.getQueue().size());

}

@Override

public void execute(Runnable task) {

showThreadPoolInfo("1. do execute");

super.execute(task);

}

@Override

public void execute(Runnable task, long startTimeout) {

showThreadPoolInfo("2. do execute");

super.execute(task, startTimeout);

}

@Override

public Future> submit(Runnable task) {

showThreadPoolInfo("1. do submit");

return super.submit(task);

}

@Override

public Future submit(Callable task) {

showThreadPoolInfo("2. do submit");

return super.submit(task);

}

@Override

public ListenableFuture> submitListenable(Runnable task) {

showThreadPoolInfo("1. do submitListenable");

return super.submitListenable(task);

}

@Override

public ListenableFuture submitListenable(Callable task) {

showThreadPoolInfo("2. do submitListenable");

return super.submitListenable(task);

}

}

其次:修改RCExecutorConfig.java的asyncServiceExecutor方法,

将ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改为ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor()

@Bean(name = "asyncServiceExecutor")

public Executor asyncServiceExecutor() {

log.info("start asyncServiceExecutor");

//ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();

ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor();

//配置核心线程数

executor.setCorePoolSize(corePoolSize);

//配置最大线程数

executor.setMaxPoolSize(maxPoolSize);

//配置队列大小

executor.setQueueCapacity(queueCapacity);

//配置线程池中的线程的名称前缀

executor.setThreadNamePrefix(namePrefix);

// rejection-policy:当pool已经达到max size的时候,如何处理新任务

// CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行

executor.setRejecOmGMpHtedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

//执行初始化

executor.initialize();

return executor;

}

测试结果如下:

异步线程执行批量插入等耗时任务

10:41:35.003 [async-service-5] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:36.001 [sheduled-pool-1-thread-1] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [5], completedTaskCount [5], activeCount [0], queueSize [0]

10:41:36.001 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:36.002 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:37.001 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [6], completedTaskCount [6], activeCount [0], queueSize [0]

10:41:37.001 [async-service-7] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:37.002 [async-service-7] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:38.002 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [7], completedTaskCount [7], activeCount [0], queueSize [0]

10:41:38.002 [async-service-8] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:38.002 [async-service-8] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:39.001 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [8], completedTaskCount [8], activeCount [0], queueSize [0]

10:41:39.001 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:39.002 [async-service-1] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:40.003 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [9], completedTaskCount [9], activeCount [0], queueSize [0]

10:41:40.003 [async-service-2] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:40.003 [async-service-2] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:41.001 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [10], completedTaskCount [10], activeCount [0], queueSize [0]

10:41:41.001 [async-service-3] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:41.001 [async-service-3] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:42.000 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [11], completedTaskCount [11], activeCount [0], queueSize [0]

10:41:42.000 [async-service-4] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:42.000 [async-service-4] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:43.001 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [12], completedTaskCount [12], activeCount [0], queueSize [0]

10:41:43.002 [async-service-5] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:43.003 [async-service-5] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:44.001 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [13], completedTaskCount [13], activeCount [0], queueSize [0]

10:41:44.001 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

异步线程执行批量插入等耗时任务

10:41:44.001 [async-service-6] INFO c.a.a.service.impl.AsyncServiceImpl - end executeAsync

10:41:45.000 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [14], completedTaskCount [14], activeCount [0], queueSize [0]

10:41:45.001 [async-service-7] INFO c.a.a.service.impl.AsyncServiceImpl - start executeAsync

解释:这里意思提交了14个任务,处理了14个任务,对列中还剩0个任务

10:41:45.000 [sheduled-pool-1-thread-7] INFO c.a.a.e.t.VisiableThreadPoolTaskExecutor - async-service-, 2. do submit,taskCount [14], completedTaskCount [14], activeCount [0], queueSize [0]

到此为止就OK了!


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:用 Python 提取 PDF 文本的简单方法(用我的手指搅乱吧樱花未增删翻译无马)
下一篇:#yyds干货盘点#元组截取值 - python基础学习系列(77)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~