基于Spring Batch向Elasticsearch批量导入数据示例

网友投稿 472 2023-02-16


基于Spring Batch向Elasticsearch批量导入数据示例

1.介绍

当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,使用ItemWriter批量写入数据。由于Spring Batch没有提供Elasticsearch的ItemWriter和ItemReader,本示例自定义了一个ElasticsearchItemWriter(以及一个仅供参考的ElasticsearchItemReader),用于批量导入。

2.示例

2.1 pom.xml

本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hfcsbc.estl</groupId>
    <artifactId>es-etl</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>es-etl</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.M7</version>
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.vanroy</groupId>
            <artifactId>spring-boot-starter-data-jest</artifactId>
            <version>3.0.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <pluginRepositories>
        <pluginRepository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </pluginRepository>
        <pluginRepository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>

2.2 实体类及repository

package com.hfcsbc.esetl.domain;

import lombok.Data;

import org.springframework.data.elasticsearch.annotations.Document;

import org.springframework.data.elasticsearch.annotations.Field;

import org.springframework.data.elasticsearch.annotations.FieldType;

import javax.persistence.Entity;

import javax.persistence.Id;

import javax.persistence.OneToOne;

/**
 * A person record used on both sides of the import: it is a JPA entity
 * (the rows read from the database) and an Elasticsearch document (what gets indexed).
 *
 * <p>The document mapping targets index {@code person}, type {@code person}, with
 * {@code shards = 1}, {@code replicas = 0} and {@code refreshInterval = "-1"}.
 * Getters, setters, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
@Data
@Entity
@Document(indexName = "person", type = "person", refreshInterval = "-1", shards = 1, replicas = 0)
public class Person {

    /** JPA identifier of the person. */
    @Id
    private Long id;

    /** Name of the person. */
    private String name;

    /** Address of the person: a one-to-one JPA association, mapped as a nested field in Elasticsearch. */
    @Field(type = FieldType.Nested)
    @OneToOne
    private Address address;
}

package com.hfcsbc.esetl.domain;

import lombok.Data;

import javax.persistence.Entity;

import javax.persistence.Id;

/**
 * JPA entity holding an address; referenced from {@code Person} through its {@code address} field.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
@Data
@Entity
public class Address {

    /** JPA identifier of the address. */
    @Id
    private Long id;

    /** Name (label) of the address. */
    private String name;
}

package com.hfcsbc.esetl.repository.jpa;

import com.hfcsbc.esetl.domain.Person;

import org.springframework.data.jpa.repository.JpaRepository;

/**
 * Spring Data JPA repository for {@link Person} entities, keyed by their {@code Long} id.
 *
 * <p>The type arguments must be spelled out: Spring Data derives the managed domain type and
 * its id type from them, so a raw {@code JpaRepository} cannot be used as a repository definition.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
public interface PersonRepository extends JpaRepository<Person, Long> {
}

package com.hfcsbc.esetl.repository.es;

import com.hfcsbc.esetl.domain.Person;

import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

/**
 * Spring Data Elasticsearch repository for {@link Person} documents, keyed by their {@code Long} id.
 *
 * <p>The type arguments must be spelled out: Spring Data derives the managed document type and
 * its id type from them, so a raw {@code ElasticsearchRepository} cannot be used as a
 * repository definition.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
public interface EsPersonRepository extends ElasticsearchRepository<Person, Long> {
}

2.3 配置elasticsearchItemWriter

package com.hfcsbc.esetl.itemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;

import com.hfcsbc.esetl.domain.Person;

import org.springframework.batch.core.ExitStatus;

import org.springframework.batch.core.ItemWriteListener;

import org.springframework.batch.core.StepExecution;

import org.springframework.batch.core.StepExecutionListener;

import org.springframework.batch.item.ItemWriter;

import java.util.List;

/**

* Create by pengchao on 2018/2/23

*/

public class ElasticsearchItemWriter implements ItemWriter, ItemWriteListener, StepExecutionListener {

private EsPersonRepository personRepository;

public ElasticsearchItemWriter(EsPersonRepository personRepository) {

this.personRepository = personRepository;

}

@Override

public void beforeWrite(List extends Person> items) {

}

@Override

public void afterWrite(List extends Person> items) {

}

@Override

public void onWriteError(Exception exception, List extends Person> items) {

}

@Override

public void beforeStep(StepExecution stepExecution) {

}

@Override

public ExitStatus afterStep(StepExecution stepExecution) {

return null;

}

@Override

public void write(List extends Person> items) throws Exception {

//实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引

personRepository.saveAll(items);

}

}

2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)

package com.hfcsbc.esetl.itemReader;

import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;

import org.springframework.beans.factory.InitializingBean;

import org.springframework.data.elasticsearch.core.ElasticsearchOperations;

import org.springframework.data.elasticsearch.core.query.SearchQuery;

import java.util.Iterator;

/**

* Create by pengchao on 2018/2/24

*/

public class ElasticsearchItemReader extends AbstractPaginatedDataItemReader implements InitializingBean {

private final ElasticsearchOperations elasticsearchOperations;

private final SearchQuery query;

private final Class extends Person> targetType;

public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class extends Person> targetType) {

this.elasticsearchOperations = elasticsearchOperations;

this.query = query;

this.targetType = targetType;

}

@Override

protected Iterator doPageRead() {

return (Iterator)elasticsearchOperations.queryForList(query, targetType).iterator();

}

@Override

public void afterPropertiesSet() throws Exception {

}

}

2.5 Spring Batch相关配置

package com.hfcsbc.esetl.config;

import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;

import com.hfcsbc.esetl.repository.es.EsPersonRepository;

import com.hfcsbc.esetl.domain.Person;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.Step;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import org.springframework.batch.core.launch.support.RunIdIncrementer;

import org.springframework.batch.core.repository.JobRepository;

import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;

import org.springframework.batch.item.ItemReader;

import org.springframework.batch.item.ItemWriter;

import org.springframework.batch.item.database.JpaPagingItemReader;

import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.transaction.PlatformTransactionManager;

import javax.persistence.EntityManagerFactory;

import javax.sql.DataSource;

/**
 * Spring Batch configuration of the import job: read {@link Person} rows from the database
 * page by page and write them to Elasticsearch chunk by chunk.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfig {

    /** Number of rows read per page and number of items written per chunk. */
    private static final int CHUNK_SIZE = 10000;

    @Autowired
    private EsPersonRepository personRepository;

    /**
     * Builds the paging reader over the {@code person} table.
     *
     * <p>The native query is ordered by {@code id}: the reader fetches the result page by page,
     * and without a stable ordering the database is free to return rows in a different order
     * for each page, so rows could be skipped or read twice.
     *
     * @param entityManagerFactory JPA entity manager factory used by the reader
     * @return a fully initialised reader
     * @throws IllegalStateException if the reader or its query provider cannot be initialised
     */
    @Bean
    public ItemReader<Person> orderItemReader(EntityManagerFactory entityManagerFactory) {
        JpaNativeQueryProvider<Person> queryProvider = new JpaNativeQueryProvider<>();
        queryProvider.setSqlQuery("select * from person order by id");
        queryProvider.setEntityClass(Person.class);

        JpaPagingItemReader<Person> reader = new JpaPagingItemReader<>();
        reader.setEntityManagerFactory(entityManagerFactory);
        reader.setPageSize(CHUNK_SIZE);
        reader.setQueryProvider(queryProvider);
        reader.setSaveState(true);

        try {
            queryProvider.afterPropertiesSet();
            reader.afterPropertiesSet();
        } catch (Exception e) {
            // Fail fast instead of handing a half-configured reader to the step.
            throw new IllegalStateException("Failed to initialise the JPA paging reader for table 'person'", e);
        }
        return reader;
    }

    /**
     * @return the writer that indexes persons into Elasticsearch
     */
    @Bean
    public ElasticsearchItemWriter itemWriter() {
        return new ElasticsearchItemWriter(personRepository);
    }

    /**
     * Builds the single chunk-oriented step of the job.
     *
     * @param stepBuilderFactory factory provided by {@code @EnableBatchProcessing}
     * @param itemReader         source of {@link Person} items
     * @param itemWriter         sink for {@link Person} items
     * @return step {@code step1}, processing {@value #CHUNK_SIZE} items per chunk
     */
    @Bean
    public Step step(StepBuilderFactory stepBuilderFactory,
                     ItemReader<Person> itemReader,
                     ItemWriter<Person> itemWriter) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(CHUNK_SIZE)
                .reader(itemReader)
                .writer(itemWriter)
                .build();
    }

    /**
     * Builds the import job, consisting of the given step only.
     *
     * @param jobBuilderFactory factory provided by {@code @EnableBatchProcessing}
     * @param step              the step to run
     * @return job {@code importJob}; the {@link RunIdIncrementer} supplies a new run id
     *         parameter for each launch
     */
    @Bean
    public Job job(JobBuilderFactory jobBuilderFactory, Step step) {
        return jobBuilderFactory
                .get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(step)
                .end()
                .build();
    }

    /**
     * Spring Batch creates some tables of its own when it runs; this bean specifies where
     * they live: the given {@code dataSource}.
     *
     * @param dataSource data source holding the Spring Batch metadata tables
     * @param manager    transaction manager used by the repository
     * @return the job repository
     * @throws IllegalStateException if the repository cannot be created
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager) {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(manager);
        jobRepositoryFactoryBean.setDatabaseType("postgres");
        try {
            return jobRepositoryFactoryBean.getObject();
        } catch (Exception e) {
            // Never register a null JobRepository bean; surface the real cause at startup.
            throw new IllegalStateException("Failed to create the Spring Batch JobRepository", e);
        }
    }
}

2.6 配置数据库及ES的连接地址

spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2

spring.batch.initialize-schema: always

2.7 配置入口类

package com.hfcsbc.esetl;

import org.springframework.boot.SpringApplication;

import org.springframework.boot.autoconfigure.SpringBootApplication;

import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;

import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;

import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;

import org.springframework.data.jpa.repository.config.EnableJpaRepositories;

/**
 * Entry point of the import application.
 *
 * <p>The Elasticsearch auto-configurations are excluded, and each Spring Data module scans
 * only its own repository package. {@code Person} is annotated both as a JPA entity and as an
 * Elasticsearch document, so a scan that covered the parent package
 * {@code com.hfcsbc.esetl.repository} could also pick up the JPA repository as an
 * Elasticsearch repository candidate.
 */
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository.es")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:vue cli webpack中使用sass的方法
下一篇:解决Vue不能检测数组或对象变动的问题
相关文章

 发表评论

暂时没有评论,来抢沙发吧~