SpringBoot实现动态多线程并发定时任务

网友投稿 404 2022-10-24


SpringBoot实现动态多线程并发定时任务

本文实例为大家分享了SpringBoot实现动态多线程并发定时任务的具体代码,供大家参考,具体内容如下

实现定时任务有多种方式,使用spring自带的,继承SchedulingConfigurer的方式。

一、实现

1、启动类

在启动类添加注解@EnableScheduling开启,不然不起用做。

2、新建任务类

添加注解@Component注册到spring的容器中。

package com.example.demo.task;

import com.example.demo.entity.MyTask;

import lombok.extern.slf4j.Slf4j;

import org.springframework.scheduling.annotation.SchedulingConfigurer;

import org.springframework.scheduling.config.CronTask;

import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import org.springframework.stereotype.Component;

import org.springframework.util.StringUtils;

import javax.annotation.PreDestroy;

import java.util.Arrays;

import java.util.List;

import java.util.Objects;

import java.util.concurrent.ConcurrentHashMap;

import java.util.concurrent.Executors;

import java.util.concurrent.ScheduledFuture;

/**

* @path:com.example.demo.task.ScheduledTask.java

* @className:ScheduledTask.java

* @description:定时任务

* @author:tanyp

* @dateTime:2020/7/23 21:37

* @editNote:

*/

@Slf4j

@Component

public class ScheduledTask implements SchedulingConfigurer {

private volatile ScheduledTaskRegistrar registrar;

private final ConcurrentHashMap> scheduledFutures = new ConcurrentHashMap<>();

private final ConcurrentHashMap cronTasks = new ConcurrentHashMap<>();

/**

* 默认启动10个线程

*/

private static final Integer DEFAULT_THREAD_POOL = 10;

@Override

public void configureTasks(ScheduledTaskRegistrar registrar) {

registrar.setScheduler(Executors.newScheduledThreadPool(DEFAULT_THREAD_POOL));

this.registrar = registrar;

}

@PreDestroy

public void destroy() {

this.registrar.destroy();

}

/**

* @methodName:refreshTask

* @description:初始化任务

* 1、从数据库获取执行任务的集合【TxTask】

* 2、通过调用 【refresh】 方法刷新任务列表

* 3、每次数据库中的任务发生变化后重新执行【1、2】

* @author:tanyp

* @dateTime:2020/7/23 21:37

* @Params: [tasks]

* @Return: void

* @editNote:

*/

public void refreshTask(List tasks) {

// 删除已经取消任务

scheduledFutures.keySet().forEach(key -> {

if (Objects.isNull(tasks) || tasks.size() == 0) {

scheduledFutures.get(key).cancel(false);

scheduledFutures.remove(key);

cronTasks.remove(key);

return;

}

tasks.forEach(task -> {

if (!Objects.equals(key, task.getTaskId())) {

scheduledFutures.get(key).cancel(false);

scheduledFutures.remove(key);

cronTasks.remove(key);

http:// return;

}

});

});

// 添加新任务、更改执行规则任务

tasks.forEach(txTask -> {

String expression = txTask.getExpression();

// 任务表达式为空则跳过

if (StringUtils.isEmpty(expression)) {

return;

}

// 任务已存在并且表达式未发生变化则跳过

if (scheduledFutures.containsKey(txTask.getTaskId()) && cronTasks.get(txTask.getTaskId()).getExpression().equals(expression)) {

return;

}

// 任务执行时间发生了变化,则删除该任务

if (scheduledFutures.containsKey(txTask.getTaskId())) {

scheduledFutures.get(txTask.getTaskId()).cancel(false);

scheduledFutures.remove(txTask.getTaskId());

cronTasks.remove(txTask.getTaskId());

}

CronTask task = new CronTask(new Runnable() {

@Override

public void run() {

// 执行业务逻辑

try {

log.info("执行单个任务,任务ID【{}】执行规则【{}】", txTask.getTaskId(), txTask.getExpression());

System.out.println("==========================执行任务=============================");

} catch (Exception e) {

log.error("执行发送消息任务异常,异常信息:{}", e);

}

}

}, expression);

ScheduledFuture> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());

cronTasks.put(txTask.getTaskId(), task);

scheduledFutures.put(txTask.getTaskId(), future);

});

}

}

3、创建自启动任务类

package com.example.demo.task;

import com.example.demo.task.ScheduledTask;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.ApplicationArguments;

import org.springframework.boot.ApplicationRunner;

import org.springframework.stereotype.Component;

import java.util.Arrays;

import java.util.List;

/**

* @path:com.example.demo.task.MyApplicationRunner.java

* @className:ScheduledTask.java

* @description:自启动

* @author:tanyp

* @dateTime:2020/7/23 21:37

* @editNote:

*/

@Slf4j

@Component

public class MyApplicationRunner implements ApplicationRunner {

@Autowired

private ScheduledTask scheduledTask;

@Override

public void run(ApplicationArguments args) throws Exception {

log.info("================项目启动初始化定时任务====开始===========");

/**

* 初始化三个任务:

* 1、10秒执行一次

* 2、15秒执行一次

* 3、20秒执行一次

*/

List tasks = Arrays.asList(

MyTask.builder().taskId("10001").expression("*/10 * * * * ?").build(),

MyTask.builder().taskId("10002").expression("*/15 * * * * ?").build(),

MyTask.builder().taskId("10003").expression("*/20 * * * * ?").build()

);

scheduledTask.refreshTask(tasks);

log.info("================项目启动初始化定时任务====完成==========");

}

}

4、实体

package com.example.demo.entity;

import lombok.AllArgsConstructor;

import lombok.Builder;

import lombok.Data;

import lombok.NoArgsConstructor;

/**

* @path:com.example.demo.entity.MyTask.java

* @className:MyTask.java

* @description:任务实体

* @author:tanyp

* @dateTime:2020/7/23 21:41

* @editNote:

*/

@Data

@Builder

@AllArgsConstructor

@NoArgsConstructor

public class MyTask {

/**

* 任务id

*/

private String taskId;

/**

* 任务执行规则时间

*/

private String expression;

}

二、测试

初始化三个任务,分别为:

10秒执行一次(*/10 * * * * ?)

15秒执行一次(*/15 * * * * ?)

20秒执行一次(*/20 * * * * ?)

测试效果:

可以看到初始化的三个任务都在执行,并且是不用的线程在执行。

三、动态使用方式

1、启动方式有两种:

启动项目后,手动调用ScheduledTask.refreshTask(List tasks),并初始化任务列表;

使用我测试中的方式,配置项目启动完成后自动调用初始任务的方法,并初始化任务列表。

2、数据初始化

只需要给 List集合赋值并调用refreshTask()方法即可:

根据业务需求修改MyTask实体类;

这里的初始化数据可以从数据库读取数据赋值给集合;

例如:从mysql读取任务配置表的数据,调用refreshTask()方法。

3、如何动态?

修改:修改某一项正在执行的任务规则;

添加:添加一项新的任务;

删除:停止某一项正在执行的任务。

例如:我们有一张任务配置表,此时进行分别新增一条或多条数据、删除一条或多条数据、改一条数据,只需要完成以上任何一项操作后,重新调用一下refreshTask()方法即可。

怎么重新调用 refreshTask()方法:可以另外启一个任务实时监控任务表的数据变化。


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:数据传输中用到的模拟量和开关量指什么?
下一篇:物联网网关下连锁店联网解决方案
相关文章

 发表评论

暂时没有评论,来抢沙发吧~