SpringBoot+SpringBatch+Quartz整合定时批量任务方式

网友投稿 410 2022-09-29


SpringBoot+SpringBatch+Quartz整合定时批量任务方式

目录一、引言二、代码具体实现1、pom文件2、application.yaml文件3、Service实现类4、SpringBatch配置类5、Processor,处理每条数据6、封装数据库返回数据的实体Bean7、启动类上要加上注解三、小结一下spring-batch与quartz集成过程中遇到的问题问题原因解决

一、引言

最近一周,被借调到其他部门,赶一个紧急需求,需求内容如下:

PC网页触发一条设备升级记录(下图),后台要定时批量设备更新。这里定时要用到Quartz,批量数据处理要用到SpringBatch,二者结合,可以完成该需求。

由于之前,没有用过SpringBatch,于是上网查了下资料,发现可参考的不是很多,于是只能去慢慢的翻看官方文档。

遇到不少问题,就记录一下吧。

二、代码具体实现

1、pom文件

org.springframework.boot

spring-boot-starter-web

org.postgresql

postgresql

org.springframework.boot

spring-boot-starter-jdbc

org.springframework.boot

spring-boot-starter-batch

org.projectlombok

lombok

org.springframework.boot

spring-boot-starter-batch

2、application.yaml文件

spring:

datasource:

username: thinklink

password: thinklink

url: jdbc:postgresql://172.16.205.54:5432/thinklink

driver-class-name: org.postgresql.Driver

batch:

job:

enabled: false

server:

port: 8073

#upgrade-dispatch-base-url: http://172.16.205.125:8080/api/rpc/dispatch/command/

upgrade-dispatch-base-url: http://172.16.205.211:8080/api/noauth/rpc/dispatch/command/

# 每次批量处理的数据量,默认为5000

batch-size: 5000

3、Service实现类

触发批处理任务的入口,执行一个job

@Service("batchService")

public class BatchServiceImpl implements BatchService {

// 框架自动注入

@Autowired

private JobLauncher jobLauncher;

@Autowired

private Job updateDeviceJob;

/**

* 根据 taskId 创建一个Job

* @param taskId

* @throws Exception

*/

@Override

public void createBatchJob(String taskId) throws Exception {

JobParameters jobParameters = new JobParametersBuilder()

.addString("taskId", taskId)

.addString("uuid", UUID.randomUUID().toString().replace("-",""))

.toJobParameters();

// 传入一个Job任务和任务需要的参数

jobLauncher.run(updateDeviceJob, jobParameters);

}

}

4、SpringBatch配置类

此部分最重要(☆☆☆☆☆)

@Configuration

public class BatchConfiguration {

private static final Logger log = LoggerFactory.getLogger(BatchConfiguration.class);

@Value("${batch-size:5000}")

private int batchSize;

// 框架自动注入

@Autowired

public JobBuilderFactory jobBuilderFactory;

// 框架自动注入

@Autowired

public StepBuilderFactory stepBuilderFactory;

// 数据过滤器,对从数据库读出来的数据,注意进行操作

@Autowired

public TaskItemProcessor taskItemProcessor;

// 接收job参数

public Map parameters;

public Object taskId;

@Autowired

private JdbcTemplate jdbcTemplate;

// 读取数据库操作

@Bean

@StepScope

public JdbcCursorItemReader itemReader(DataSource dataSource) {

String querySql = " SELECT " +

" e. ID AS taskId, " +

" e.user_id AS userId, " +

" e.timing_startup AS startTime, " +

" u.device_id AS deviceId, " +

" d.app_name AS appName, " +

" d.compose_file AS composeFile, " +

" e.failure_retry AS failureRetry, " +

" e.tetry_times AS retryTimes, " +

" e.device_managered AS deviceManagered " +

" FROM " +

" eiot_upgrade_task e " +

" LEFT JOIN eiot_upgrade_device u ON e. ID = u.upgrade_task_id " +

" LEFT JOIN eiot_app_detail d ON e.app_id = d. ID " +

" WHERE " +

" ( " +

" u.device_upgrade_status = 0 " +

" OR u.device_upgrade_status = 2" +

" )" +

" AND e.tetry_times > u.retry_times " +

" AND e. ID = ?";

return new JdbcCursorItemReaderBuilder()

.name("itemReader")

.sql(querySql)

.dataSource(dataSource)

.queryArguments(new Object[]{parameters.get("taskId").getValue()})

.rowMapper(new DispatchRequest.DispatchRequestRowMapper())

.build();

}

// 将结果写回数据库

@Bean

@StepScope

public ItemWriter itemWriter() {

return new ItemWriter() {

private int updateTaskStatus(DispatchRequest dispatchRequest, int status) {

log.info("update taskId: {}, deviceId: {} to status {}", dispatchRequest.getTaskId(), dispatchRequest.getDeviceId(), status);

Integer retryTimes = jdbcTemplate.queryForObject(

"select retry_times from eiot_upgrade_device where device_id = ? and upgrade_task_id = ?",

new Object[]{ dispatchRequest.getDeviceId(), dispatchRequest.getTaskId()}, Integer.class

);

retryTimes += 1;

int updateCount = jdbcTemplate.update("update eiot_upgrade_device set device_upgrade_status = ?, retry_times = ? " +

"where device_id = ? and upgrade_task_id = ?", status, retryTimes, dispatchRequest.getDeviceId(), dispatchRequest.getTaskId());

if (updateCount <= 0) {

log.warn("no task updatedhttp://");

} else {

log.info("count of {} task updated", updateCount);

}

// 最后一次重试

if (status == STATUS_DISPATCH_FAILED && retryTimes == dispatchRequest.getRetryTimes()) {

log.info("the last retry of {} failed, inc deviceManagered", dispatchRequest.getTaskId());

return 1;

} else {

return 0;

}

}

@Override

@Transactional

public void write(List extends ProcessResult> list) throws Exception {

Map taskMap = jdbcTemplate.queryForMap(

"select device_managered, device_count, task_status from eiot_upgrade_task where id = ?",

list.get(0).getDispatchRequest().getTaskId() // 我们认定一个批量里面,taskId都是一样的

);

int deviceManagered = (int)taskMap.get("device_managered");

Integer deviceCount = (Integer) taskMap.get("device_count");

if (deviceCount == null) {

log.warn("deviceCount of task {} is null", list.get(0).getDispatchRequest().getTaskId());

}

int taskStatus = (int)taskMap.get("task_status");

for (ProcessResult result: list) {

deviceManagered += updateTaskStatus(result.getDispatchRequest(), result.getStatus());

}

if (deviceCount != null && deviceManagered == deviceCount) {

taskStatus = 2; //任务状态 0:待升级,1:升级中,2:已完成

}

jdbcTemplate.update("update eiot_upgrade_task set device_managered = ?, task_status = ? " +

"where id = ?", deviceManagered, taskStatus, list.get(0).getDispatchRequest().getTaskId());

}

};

}

/**

* 定义一个下发更新的 job

* @return

*/

@Bean

public Job updateDeviceJob(Step updateDeviceStep) {

return jobBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))

.listener(new JobListener()) // 设置Job的监听器

.flow(updateDeviceStep)// 执行下发更新的Step

.end()

.build();

}

/**

* 定义一个下发更新的 step

* @return

*/

@Bean

public Step updateDeviceStep(JdbcCursorItemReader itemReader,ItemWriter itemWriter) {

return stepBuilderFactory.get(UUID.randomUUID().toString().replace("-", ""))

. chunk(batchSize)

.reader(itemReader) //根据taskId从数据库读取更新设备信息

.processor(taskItemProcessor) // 每条更新信息,执行下发更新接口

.writer(itemWriter)

.build();

}

// job 监听器

public class JobListener implements JobExecutionListener {

@Override

public void beforeJob(JobExecution jobExecution) {

log.info(jobExecution.getJobInstance().getJobName() + " before... ");

parameters = jobExecution.getJobParameters().getParameters();

taskId = parameters.get("taskId").getValue();

log.info("job param taskId : " + parameters.get("taskId"));

}

@Override

public void afterJob(JobExecution jobExecution) {

log.info(jobExecution.getJobInstance().getJobName() + " after... ");

// 当所有job执行完之后,查询设备更新状态,如果有失败,则要定时重新执行job

String sql = " SELECT " +

" count(*) " +

" FROM " +

" eiot_upgrade_device d " +

" LEFT JOIN eiot_upgrade_task u ON d.upgrade_task_id = u. ID " +

" WHERE " +

" u. ID = ? " +

" AND d.retry_times < u.tetry_times " +

" AND ( " +

" d.device_upgrade_status = 0 " +

" OR d.device_upgrade_status = 2 " +

" ) ";

// 获取更新失败的设备个数

Integer count = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);

log.info("update device failure count : " + count);

// 下面是使用Quartz触发定时任务

// 获取任务时间,单位秒

// String time = jdbcTemplate.queryForObject(sql, new Object[]{taskId}, Integer.class);

// 此处方便测试,应该从数据库中取taskId对应的重试间隔,单位秒

Integer millSecond = 10;

if(count != null && count > 0){

String jobName = "UpgradeTask_" + taskId;

String reTaskId = taskId.toString();

Map params = new HashMap<>();

params.put("jobName",jobName);

params.put("taskId",reTaskId);

if (QuartzManager.checkNameNotExist(jobName))

{

QuartzManager.scheduleRunOnceJob(jobName, RunOnceJobLogic.class,params,millSecond);

}

}

}

}

}

5、Processor,处理每条数据

可以在此对数据进行过滤操作

@Component("taskItemProcessor")

public class TaskItemProcessor implements ItemProcessor {

public static final int STATUS_DISPATCH_FAILED = 2;

public static final int STATUS_DISPATCH_SUCC = 1;

private static final Logger log = LoggerFactory.getLogger(TaskItemProcessor.class);

@Value("${upgrade-dispatch-base-url:http://localhost/api/v2/rpc/dispatch/command/}")

private String dispatchUrl;

@Autowired

JdbcTemplate jdbcTemplate;

/**

* 在这里,执行 下发更新指令 的操作

* @param dispatchRequest

* @return

* @throws Exception

*/

@Override

public ProcessResult process(final DispatchRequest dispatchRequest) {

// 调用接口,下发指令

String url = dispatchUrl + dispatchRequest.getDeviceId()+"/"+dispatchRequest.getUserId();

log.info("request url:" + url);

RestTemplate restTemplate = new RestTemplate();

HttpHeaders headers = new HttpHeaders();

headers.setContentType(MediaType.APPLICATION_jsON_UTF8);

MultiValueMap params = new LinkedMultiValueMap();

JSONObject jsonOuter = new JSONObject();

JSONObject jsonInner = new JSONObject();

try {

jsonInner.put("jobId",dispatchRequest.getTaskId());

jsonInner.put("name",dispatchRequest.getName());

jsonInner.put("composeFile", Base64Util.bytesToBase64Str(dispatchRequest.getComposeFile()));

jsonInner.put("policy",new JSONObject().put("startTime",dispatchRequest.getPolicy()));

jsonInner.put("timestamp",dispatchRequest.getTimestamp());

jsonOuter.put("method","updateApp");

jsonOuter.put("params",jsonInner);

} catch (JSONException e) {

log.info("JSON convert Exception :" + e);

}catch (IOException e) {

log.info("Base64Util bytesToBase64Str :" + e);

}

log.info("request body json :" + jsonOuter);

HttpEntity requestEntity = new HttpEntity(jsonOuter.toString(),headers);

int status;

try {

ResponseEntity response = restTemplate.postForEntity(url,requestEntity,String.class);

log.info("response :" + response);

if (response.getStatusCode() == HttpStatus.OK) {

status = STATUS_DISPATCH_SUCC;

} else {

status = STATUS_DISPATCH_FAILED;

}

}catch (Exception e){

status = STATUS_DISPATCH_FAILED;

}

return new ProcessResult(dispatchRequest, status);

}

}

6、封装数据库返回数据的实体Bean

注意静态内部类

public class DispatchRequest {

private String taskId;

private String deviceId;

private String userId;

private String name;

private byte[] composeFile;

private String policy;

private String timestamp;

private String md5;

private int failureRetry;

private int retryTimes;

private int deviceManagered;

// 省略构造函数,setter/getter/tostring方法

//......

public static class DispatchRequestRowMapper implements RowMapper {

@Override

public DispatchRequest mapRow(ResultSet resultSet, int i) throws SQLException {

DispatchRequest dispatchRequest = new DispatchRequest();

dispatchRequest.setTaskId(resultSet.getString("taskId"));

dispatchRequest.setUserId(resultSet.getString("userId"));

dispatchRequest.setPolicy(resultSet.getString("startTime"));

dispatchRequest.setDeviceId(resultSet.getString("deviceId"));

dispatchRequest.setName(resultSet.getString("appName"));

dispatchRequest.setComposeFile(resultSet.getBytes("composeFile"));

dispatchRequest.setTimestamp(DateUtil.DateToString(new Date()));

dispatchRequest.setRetryTimes(resultSet.getInt("retryTimes"));

dispatchRequest.setFailureRetry(resultSet.getInt("failureRetry"));

dispatchRequest.setDeviceManagered(resultSet.getInt("deviceManagered"));

return dispatchRequest;

}

}

}

7、启动类上要加上注解

@SpringBootApplication

@EnableBatchProcessing

public class Application {

public static void main(String[] args) {

SpringApplication.run(Application.class, args);

}

}

三、小结一下

其实SpringBatch并没有想象中那么好用,当从数据库中每次取5000条数据后,进入processor中是逐条处理的,这个时候不能不行操作,等5000条数据处理完之后,再一次性执行ItemWriter方法。

在使用的过程中,最坑的地方是ItemReader和ItemWriter这两个地方,如何执行自定义的Sql,参考文中代码就行。至于Quartz定时功能,很简单,只要定时创建SpringBatch里面的Job,让这个job启动就好了,此处就不在给出了,贴的代码太多了。由于公司一些原因,代码不能放到github上。

spring-batch与quartz集成过程中遇到的问题

问题

启动时报Exception

Driver's Blob representation is of an unsupported type: weblogic.jdbc.wrapper.Blob_oracle_sql_BLOB

原因

quartz的driverDelegateClass配置的是OracleDelegate,应用运行在weblogic上

解决

driverDelegateClass对应配置改为

org.quartz.impl.jdbcjobstore.oracle.weblogic.WebLogicOracleDelegate


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:越权漏洞测试之网站渗透测试服务(web越权测试)
下一篇:防止黑客攻击的数据库安全加固方案(黑客防范技术论文)
相关文章

 发表评论

评论列表