Spring Batch in Practice: A Walkthrough of the Lightweight Batch Processing Framework

User submission, 2023-01-03



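What we will build

Read 100,000 records from a single MariaDB table, process them, and write the results to MongoDB.

Implementation

1. Create a new Spring Boot application with the following dependencies: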

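<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <exclusions>
        <!-- Excluded so that Undertow and Log4j2 (declared below) replace the defaults -->
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-logging</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-undertow</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-log4j2</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
    <groupId>org.mariadb.jdbc</groupId>
    <artifactId>mariadb-java-client</artifactId>
    <version>2.0.2</version>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.16.14</version>
</dependency>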

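2. Create a table and populate it with 100,000 rows

-- MariaDB syntax: IF EXISTS precedes the table name, and
-- AUTO_INCREMENT replaces the HSQLDB-style IDENTITY keyword.
DROP TABLE IF EXISTS people;

CREATE TABLE people (
    id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    first_name VARCHAR(20),
    last_name VARCHAR(20)
);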
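
The article does not show how the 100,000 rows were generated. One possible approach, sketched here in plain Java, is to build multi-row INSERT statements and run them through any MariaDB client. The class name PeopleSeedSql, the batch size of 1,000, and the synthetic name values are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not from the original article): builds multi-row
// INSERT statements that fill the people table with synthetic data.
final class PeopleSeedSql {

    // Returns one INSERT statement per batch of up to batchSize rows.
    static List<String> build(int totalRows, int batchSize) {
        List<String> statements = new ArrayList<>();
        for (int start = 1; start <= totalRows; start += batchSize) {
            int end = Math.min(start + batchSize - 1, totalRows);
            StringBuilder sql = new StringBuilder(
                    "INSERT INTO people (first_name, last_name) VALUES ");
            for (int i = start; i <= end; i++) {
                if (i > start) {
                    sql.append(", ");
                }
                // Synthetic values; both stay within VARCHAR(20).
                sql.append("('first").append(i)
                   .append("', 'last").append(i).append("')");
            }
            statements.add(sql.append(';').toString());
        }
        return statements;
    }

    public static void main(String[] args) {
        // 100,000 rows in batches of 1,000 rows each.
        List<String> statements = build(100_000, 1_000);
        System.out.println(statements.size() + " statements generated"); // prints "100 statements generated"
    }
}
```

Writing the returned statements to a file and feeding that file to the MariaDB command-line client is one way to load the data; the id column is left to AUTO_INCREMENT.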

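3. Create the Person class

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class Person {
    private Long id;
    private String lastName;
    private String firstName;

    // Used by PersonItemProcessor to build the transformed copy.
    public Person(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }
}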

4. Create the intermediate processor PersonItemProcessor

import lombok.extern.log4j.Log4j2;
import org.springframework.batch.item.ItemProcessor;

@Log4j2
public class PersonItemProcessor implements ItemProcessor<Person, Person> {

    // Upper-cases both name fields and returns a new Person.
    @Override
    public Person process(final Person person) throws Exception {
        final String firstName = person.getFirstName().toUpperCase();
        final String lastName = person.getLastName().toUpperCase();

        final Person transformedPerson = new Person(firstName, lastName);
        // Carry over the source id so the MongoDB document keeps the MariaDB primary key.
        transformedPerson.setId(person.getId());

        log.info("Converting ({}) into ({})", person, transformedPerson);
        return transformedPerson;
    }
}
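
Because first_name and last_name are nullable columns, calling toUpperCase() directly would throw a NullPointerException on a row with a missing name. The following stand-alone sketch (no Spring dependency) shows one way the transformation could be made null-safe. The class UpperCaseSketch and its nested Name type are hypothetical stand-ins for the processor and the Person class, and skipping incomplete rows is an assumption about the desired behavior.

```java
import java.util.Locale;

// Stand-alone sketch (hypothetical class, no Spring dependency) of a
// null-safe version of the upper-casing step.
final class UpperCaseSketch {

    // Minimal stand-in for the article's Person class.
    static final class Name {
        final String first;
        final String last;

        Name(String first, String last) {
            this.first = first;
            this.last = last;
        }
    }

    // Returns the upper-cased copy, or null when either part is missing.
    // In a Spring Batch ItemProcessor, returning null filters the item out
    // so that it never reaches the writer.
    static Name process(Name in) {
        if (in.first == null || in.last == null) {
            return null;
        }
        // Locale.ROOT keeps the result independent of the JVM default locale.
        return new Name(in.first.toUpperCase(Locale.ROOT),
                        in.last.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Name out = process(new Name("jane", "doe"));
        System.out.println(out.first + " " + out.last);             // prints "JANE DOE"
        System.out.println(process(new Name(null, "doe")) == null); // prints "true"
    }
}
```

Whether incomplete rows should be filtered, defaulted, or treated as errors depends on the data; filtering is only one option.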

5. Create PersonMapper, which maps database rows to Person objects

import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;

public class PersonMapper implements RowMapper<Person> {

    private static final String ID_COLUMN = "id";
    private static final String FIRST_NAME_COLUMN = "first_name";
    private static final String LAST_NAME_COLUMN = "last_name";

    // Copies one row of the people table into a new Person.
    @Override
    public Person mapRow(ResultSet resultSet, int rowNum) throws SQLException {
        Person person = new Person();
        person.setId(resultSet.getLong(ID_COLUMN));
        person.setFirstName(resultSet.getString(FIRST_NAME_COLUMN));
        person.setLastName(resultSet.getString(LAST_NAME_COLUMN));
        return person;
    }
}

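6. Create the job-completion listener JobCompletionNotificationListener

import lombok.extern.log4j.Log4j2;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.listener.JobExecutionListenerSupport;
import org.springframework.stereotype.Component;

@Log4j2
@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    // Called once after the job ends; logs only when it completed successfully.
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            log.info("!!! JOB FINISHED! Time to verify the results");
        }
    }
}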

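7. Build the batch job in BatchConfiguration

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.data.MongoItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.core.MongoTemplate;

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public DataSource dataSource;

    @Autowired
    public MongoTemplate mongoTemplate;

    // Reads the people table row by row through a JDBC cursor.
    @Bean
    public JdbcCursorItemReader<Person> reader() {
        JdbcCursorItemReader<Person> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        // Column names must match the people table and PersonMapper.
        itemReader.setSql("select id, first_name, last_name from people");
        itemReader.setRowMapper(new PersonMapper());
        return itemReader;
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    // Writes each chunk of processed items to the "branch" collection in MongoDB.
    @Bean
    public MongoItemWriter<Person> writer() {
        MongoItemWriter<Person> itemWriter = new MongoItemWriter<>();
        itemWriter.setTemplate(mongoTemplate);
        itemWriter.setCollection("branch");
        return itemWriter;
    }

    // Chunk-oriented step: items are read, processed, and written 10 at a time.
    @Bean
    public Step step() {
        return stepBuilderFactory.get("step")
                .<Person, Person>chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .build();
    }

    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener) {
        return jobBuilderFactory.get("importUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(step())
                .end()
                .build();
    }
}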
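
To make the effect of chunk(10) more concrete, here is a simplified, framework-free model of a chunk-oriented step: read up to a chunk's worth of items, process them, then write the whole chunk at once. The class ChunkSketch is hypothetical, and the model deliberately leaves out transactions, retries, and restart handling that Spring Batch adds around each chunk.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;

// Simplified, framework-free model of a chunk-oriented step
// (hypothetical class; not Spring Batch's actual implementation).
final class ChunkSketch {

    // Runs the read/process/write cycle and returns the size of every
    // chunk that was handed to the "writer".
    static List<Integer> run(List<String> source, int chunkSize) {
        List<Integer> writtenChunkSizes = new ArrayList<>();
        Iterator<String> reader = source.iterator();
        while (reader.hasNext()) {
            // Read: collect up to chunkSize items.
            List<String> items = new ArrayList<>();
            while (items.size() < chunkSize && reader.hasNext()) {
                items.add(reader.next());
            }
            // Process: transform each item of the chunk.
            List<String> processed = new ArrayList<>();
            for (String item : items) {
                processed.add(item.toUpperCase(Locale.ROOT));
            }
            // Write: the whole chunk goes out in one call (one commit in a real step).
            writtenChunkSizes.add(processed.size());
        }
        return writtenChunkSizes;
    }

    public static void main(String[] args) {
        List<String> source = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            source.add("person" + i);
        }
        System.out.println(run(source, 10)); // prints "[10, 10, 5]"
    }
}
```

By the same logic, 100,000 rows with a chunk size of 10 mean roughly 10,000 write calls. A larger chunk size may reduce per-chunk overhead, though the best value depends on the environment and is worth measuring.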

Job results

Zero errors; the run took about 2 minutes on a Mac test machine.

