Spring Batch轻量级批处理框架实战

网友投稿 372 2022-09-21


Spring Batch轻量级批处理框架实战

目录1 实战前的理论基础1.1 Spring Batch是什么1.2 Spring Batch能做什么1.3 基础架构1.4 核心概念和抽象2 各个组件介绍2.1 Job2.2 Step2.3 ExecutionContext2.4 JobRepository2.5 JobLauncher2.6 Item Reader2.7 Item Writer2.8 Item Processor3 Spring Batch实战3.1 依赖和项目结构以及配置文件3.2 代码和数据表3.3 测试4 实战后的总结4.1 JobBuilderFactory4.2 StepBuilderFactory参考文档:

1 实战前的理论基础

1.1 Spring Batch是什么

Spring Batch 是一个轻量级、全面的批处理框架,旨在支持开发对企业系统日常运营至关重要的强大的批处理应用程序。同时使开发人员在必要时可以轻松访问和利用更先进的企业服务。Spring Batch 不是调度框架,它旨在与调度程序一起工作,而不是取代调度程序。

1.2 Spring Batch能做什么

自动化、复杂的大量信息处理,无需用户交互即可最有效地处理。这些操作通常包括基于时间的事件(例如月末计算、通知或通信)。

定期应用在非常大的数据集上重复处理的复杂业务规则(例如,保险福利确定或费率调整)。

将从内部和外部系统接收的信息集成到记录系统中,这些信息通常需要以事务方式进行格式化、验证和处理。批处理用于每天为企业处理数十亿笔交易。

业务场景:

定期提交批处理

并发批处理:作业的并行处理

分阶段的、企业消息驱动的处理

大规模并行批处理

失败后手动或计划重启

依赖步骤的顺序处理(扩展到工作流驱动的批处理)

部分处理:跳过记录(例如,在回滚时)

整批事务,适用于小批量或现有存储过程/脚本的情况

总之Spring batch可以做的:

从数据库、文件或队列中读取大量记录。

以某种方式处理数据。

以修改后的形式写回数据。

1.3 基础架构

1.4 核心概念和抽象

核心概念:一个 Job 有一对多的Step,每个步骤都正好有一个 ItemReader、一个ItemProcessor和 一个ItemWriter。需要启动作业(使用 JobLauncher),并且需要存储有关当前运行进程的元数据(在 中 JobRepository)。

2 各个组件介绍

2.1 Job

Job是封装了整个批处理过程的实体。与其他 Spring 项目一样,一个Job与 XML 配置文件或基于 java 的配置连接在一起。这种配置可以被称为“作业配置”。

可配置项:

作业的简单名称。

Step实例的定义和排序。

作业是否可重新启动。

2.2 Step

一个Step是一个域对象,它封装了批处理作业的一个独立的、连续的阶段。因此,每个 Job 完全由一个或多个步骤组成。一个Step包含定义和控制实际批处理所需的所有信息。

一个StepExecution代表一次尝试执行一个Step。StepExecution 每次Step运行时都会创建一个新的,类似于JobExecution。

2.3 ExecutionContext

一个ExecutionContext表示由框架持久化和控制的键/值对的集合,以允许开发人员有一个地方来存储范围为StepExecution对象或JobExecution对象的持久状态。

2.4 JobRepository

JobRepository是上述所有 Stereotypes 的持久性机制。它提供了CRUD操作JobLauncher,Job以及Step实现。当 Job第一次启动,一个JobExecution被从库中获得,并且,执行的过程中,StepExecution和JobExecution实施方式是通过将它们传递到存储库持续。

使用 Java 配置时,@EnableBatchProcessing注解提供了一个 JobRepository作为开箱即用自动配置的组件之一。

2.5 JobLauncher

JobLauncher表示一个简单的接口,用于Job使用给定的 集合 启动JobParameters,如以下示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)

throws JobExecutionAlreadyRunningException, JobRestartException,

JobInstanceAlreadyCompleteException, JobParametersInvalidException;

}

期望实现JobExecution从 中 获得有效JobRepository并执行Job。

2.6 Item Reader

ItemReader是一种抽象,表示一次检索Step一个项目的输入。当ItemReader用完它可以提供的项目时,它通过返回来表明这一点null。

2.7 Item Writer

ItemWriter是一种抽象,表示一次一个Step、一批或一大块项目的输出。通常, anItemWriter不知道它接下来应该接收的输入,并且只知道在其当前调用中传递的项目。

2.8 Item Processor

ItemProcessor是表示项目的业务处理的抽象。当ItemReader读取一个项目并ItemWriter写入它们时,它 ItemProcessor提供了一个访问点来转换或应用其他业务处理。如果在处理该项目时确定该项目无效,则返回 null表示不应写出该项目。

3 Spring Batch实战

下面就利用我们所学的理论实现一个最简单的Spring Batch批处理项目

3.1 依赖和项目结构以及配置文件

依赖

org.springframework.boot

spring-boot-starter-batch

org.springframework.boot

spring-boot-starter-web

org.projectlombok

lombok

1.18.20

mysql

mysql-connector-java

5.1.47

JbavlXB

com.baomidou

mybatis-plus-boot-starter

3.2.0

项目结构

配置文件

server.port=9000

spring.datasource.url=jdbc:mysql://localhost:3306/test

spring.datasource.username=root

spring.datasource.password=12345

spring.datasource.driver-class-name=com.mysql.jdbc.Driver

3.2 代码和数据表

数据表

CREATE TABLE `student` (

`id` int(100) NOT NULL AUTO_INCREMENT,

`name` varchar(45) DEFAULT NULL,

`age` int(2) DEFAULT NULL,

`address` varchar(45) DEFAULT NULL,

PRIMARY KEY (`id`),

UNIQUE KEY `id_UNIQUE` (`id`)

) ENGINE=InnoDB AUTO_INCREMENT=203579 DEFAULT CHARSET=utf8 ROW_FORMAT=REDUNDANT

Student实体类

/**

* @desc: Student实体类

* @author: YanMingXin

* @create: 2021/10/15-12:17

**/

@Data

@Accessors(chain = true)

@NoArgsConstructor

@AllArgsConstructor

@ToString

@TableName("student")

public class Student {

@TableId(value = "id", type = IdType.AUTO)

private Long sId;

@TableField("name")

private String sName;

@TableField("age")

private Integer sAge;

@TableField("address")

private String sAddress;

}

Mapper层

/**

* @desc: Mapper层

* @author: YanMingXin

* @create: 2021/10/15-12:17

**/

@Mapper

@Repository

public interface StudentDao extends BaseMapper {

}

模拟数据库(文件)中读取类

/**

* @desc: 模拟数据库中读取

* @author: YanMingXin

* @create: 2021/10/16-10:13

**/

public class StudentVirtualDao {

/**

* 模拟从数据库中读取

*

* @return

*/

public List getStudents() {

ArrayList students = new ArrayList<>();

students.add(new Student(1L, "zs", 23, "Beijing"));

students.add(new Student(2L, "ls", 23, "Beijing"));

students.add(new Student(3L, "ww", 23, "Beijing"));

students.add(new Student(4L, "zl", 23, "Beijing"));

students.add(new Student(5L, "mq", 23, "Beijing"));

students.add(new Student(6L, "gb", 23, "Beijing"));

students.add(new Student(7L, "lj", 23, "Beijing"));

students.add(new Student(8L, "ss", 23, "Beijing"));

students.add(new Student(9L, "zsdd", 23, "Beijing"));

students.add(new Student(10L, "zss", 23, "Beijing"));

return students;

}

}

Service层接口

/**

* @desc:

* @author: YanMingXin

* @create: 2021/10/15-12:16

**/

public interface StudentService {

List selectStudentsFromDB();

void insertStudent(Student student);

}

Service层实现类

/**

* @desc: Service层实现类

* @author: YanMingXin

* @create: 2021/10/15-12:16

**/

@Service

public class StudentServiceImpl implements StudentService {

@Autowired

private StudentDao studentDao;

@Override

public List selectStudentsFromDB() {

return studentDao.selectList(null);

}

@Override

public void insertStudent(Student student) {

studentDao.insert(student);

}

}

最核心的配置类BatchConfiguration

/**

* @desc: BatchConfiguration

* @author: YanMingXin

* @create: 2021/10/15-12:25

**/

@Configuration

@EnableBatchProcessing

@SuppressWarnings("all")

public class BatchConfiguration {

/**

* 注入JobBuilderFactory

*/

@Autowired

public JobBuilderFactory jobBuilderFactory;

/**

* 注入StepBuilderFactory

*/

@Autowired

public StepBuilderFactory stepBuilderFactory;

/**

* 注入JobRepository

*/

@Autowired

public JobRepository jobRepository;

/**

* 注入JobLauncher

*/

@Autowired

private JobLauncher jobLauncher;

/**

* 注入自定义StudentService

*/

@Autowired

private StudentService studentService;

/**

* 注入自定义job

*/

@Autowired

private Job studentJob;

/**

* 封装writer bean

*

* @return

*/

@Bean

public ItemWriter writer() {

ItemWriter writer = new ItemWriter() {

@Override

public void write(List list) throws Exception {

//debug发现是嵌套的List reader的线程List嵌套真正的List

list.forEach((stu) -> {

for (Student student : (ArrayList) stu) {

studentService.insertStudent(student);

}

});

}

};

return writer;

}

/**

* 封装reader bean

*

* @return

*/

@Bean

public ItemReader reader() {

ItemReader reader = new ItemReader() {

@Override

public Object read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {

//模拟数据获取

StudentVirtualDao virtualDao = new StudentVirtualDao();

return virtualDao.getStudents();

}

};

return reader;

}

/**

* 封装processor bean

*

* @return

*/

@Bean

public ItemProcessor processor() {

ItemProcessor processor = new ItemProcessor() {

@Override

public Object process(Object o) throws Exception {

//debug发现o就是reader单次单线程读取的数据

return o;

}

};

return processor;

}

/**

* 封装自定义step

*

* @return

*/

@Bean

public Step studentStepOne() {

return stepBuilderFactory.get("studentStepOne")

.chunk(1)

.reader(reader()) //加入reader

.processor(processor()) //加入processor

.writer(writer())//加入writer

.build();

}

/**

* 封装自定义job

*

* @return

*/

@Bean

public Job studentJob() {

return jobBuilderFactory.get("studentJob")

.flow(studentStepOne())//加入step

.end()

.build();

}

/**

* 使用spring 定时任务执行

*/

@Scheduled(fixedRate = 5000)

public void printMessage() {

try {

JobParameters jobParameters = new JobParametersBuilder()

.addLong("time", System.currentTimeMillis())

.toJobParameters();

jobLauncher.run(studentJob, jobParameters);

} catch (Exception e) {

e.printStackTrace();

}

}

}

3.3 测试

项目启动1s之后

看数据库,除了我们实体类定义的表以外多出来这么多表,这些表都是spring batch自带的记录日志和错误的表,具体的字段含义的有待研究

4 实战后的总结

Spring Batch有非常快的写入和读取速度,但是带来的影响就是非常耗费内存和数据库连接池的资源如果使用不好的话还会发生异常,因此我们要进行正确的配置,接下来我们进行简单的源码探究:

4.1 JobBuilderFactory

job的获取使用了简单工厂模式和建造者模式JobBuilderFactory获取JobBuilder在经过配置返回一个job对象的实例,该实例就是Spring Batch中最顶级的组件,包含了n和step

public class JobBuilderFactory {

private JobRepository jobRepository;

public JobBuilderFactory(JobRepository jobRepository) {

this.jobRepository = jobRepository;

}

//返回JobBuilder

public JobBuilder get(String name) {

JobBuilder builder = new JobBuilder(name).repository(jobRepository);

return builder;

}

}

jobBuilder类

public class JobBuilder extends JobBuilderHelper {

/**

* 为指定名称的作业创建一个新的构建器

*/

public JobBuilder(String name) {

super(name);

}

/**

* 创建将执行步骤或步骤序列的新作业构建器。

*/

public SimpleJobBuilder start(Step step) {

return new SimpleJobBuilder(this).start(step);

}

/**

* 创建将执行流的新作业构建器。

*/

public JobFlowBuilder start(Flow flow) {

return new FlowJobBuilder(this).start(flow);

}

/**

* 创建将执行步骤或步骤序列的新作业构建器

*/

public JobFlowBuilder flow(Step step) {

return new FlowJobBuilder(this).start(step);

}

}

4.2 StepBuilderFactory

直接看StepBuilder类

public class StepBuilder extends StepBuilderHelper {

public StepBuilder(String name) {

super(name);

}

/**

* 用自定义微线程构建步骤,不一定是项处理。

*/

public TaskletStepBuilder tasklet(Tasklet tasklet) {

return new TaskletStepBuilder(this).tasklet(tasklet);

}

/**

* 构建一个步骤,按照提供的大小以块的形式处理项。为了将这一步扩展到容错,

* 在构建器上调用SimpleStepBuilder的 faultolerant()方法。

* @param 输入类型

* @param 输出类型

*/

public SimpleStepBuilder chunk(int chunkSize) {

return new SimpleStepBuilder(this).chunk(chunkSize);

}

public SimpleStepBuilder chunk(CompletionPolicy completionPolicy) {

return new SimpleStepBuilder(this).chunk(completionPolicy);

}

public PartitionStepBuilder partitioner(String stepName, Partitioner partitioner) {

return new PartitionStepBuilder(this).partitioner(stepName, partitioner);

}

public PartitionStepBuilder partitioner(Step step) {

return new PartitionStepBuilder(this).step(step);

}

public JobStepBuilder job(Job job) {

return new JobStepBuilder(this).job(job);

}

/**

* 创建将执行流的新步骤构建器。

*/

public FlowStepBuilder flow(Flow flow) {

return new FlowStepBuilder(this).flow(flow);

}

}

参考文档:

https://docs.spring.io/spring-batch/docs/4.3.x/reference/html/index.html

https://jdon.com/springbatch.html


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于华为路由器,静态路由,递归路由,NQA(华为路由器引入静态路由)
下一篇:VLAN的端口类型
相关文章

 发表评论

暂时没有评论,来抢沙发吧~