vue项目接口域名动态的获取方法
472
2023-02-16
基于Spring Batch向Elasticsearch批量导入数据示例
1.介绍
当系统有大量数据需要从数据库导入Elasticsearch时,使用Spring Batch可以提高导入的效率。Spring Batch使用ItemReader分页读取数据,使用ItemWriter批量写数据。由于Spring Batch没有提供Elasticsearch的ItemWriter和ItemReader,本示例自定义了一个ElasticsearchItemWriter(以及一个仅供参考的ElasticsearchItemReader),用于批量导入。
2.示例
2.1 pom.xml
本文使用spring data jest连接ES(也可以使用spring data elasticsearch连接ES),ES版本为5.5.3
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2.2 实体类及repository
package com.hfcsbc.esetl.domain;
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.OneToOne;
/**
 * Person record that is both a JPA entity and an Elasticsearch document.
 *
 * <p>The document mapping targets index {@code person}, type {@code person},
 * with one shard, zero replicas and a refresh interval of {@code -1}.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
@Data
@Entity
@Document(
        indexName = "person",
        type = "person",
        shards = 1,
        replicas = 0,
        refreshInterval = "-1")
public class Person {

    /** Identifier of the person (JPA id). */
    @Id
    private Long id;

    /** Name of the person. */
    private String name;

    /** Address of the person: a one-to-one JPA relation, mapped as a nested field in the index. */
    @Field(type = FieldType.Nested)
    @OneToOne
    private Address address;
}
package com.hfcsbc.esetl.domain;
import lombok.Data;
import javax.persistence.Entity;
import javax.persistence.Id;
/**
 * Address record stored as a JPA entity; referenced from {@code Person}.
 *
 * <p>Created by pengchao on 2018/2/23.
 */
@Data
@Entity
public class Address {

    /** Identifier of the address (JPA id). */
    @Id
    private Long id;

    /** Name of the address. */
    private String name;
}
package com.hfcsbc.esetl.repository.jpa;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.jpa.repository.JpaRepository;
/**
* Create by pengchao on 2018/2/23
*/
public interface PersonRepository extends JpaRepository
}
package com.hfcsbc.esetl.repository.es;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
/**
* Create by pengchao on 2018/2/23
*/
public interface EsPersonRepository extends ElasticsearchRepository
}
2.3 配置elasticsearchItemWriter
package com.hfcsbc.esetl.itemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.item.ItemWriter;
import java.util.List;
/**
* Create by pengchao on 2018/2/23
*/
public class ElasticsearchItemWriter implements ItemWriter
private EsPersonRepository personRepository;
public ElasticsearchItemWriter(EsPersonRepository personRepository) {
this.personRepository = personRepository;
}
@Override
public void beforeWrite(List extends Person> items) {
}
@Override
public void afterWrite(List extends Person> items) {
}
@Override
public void onWriteError(Exception exception, List extends Person> items) {
}
@Override
public void beforeStep(StepExecution stepExecution) {
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
@Override
public void write(List extends Person> items) throws Exception {
//实现类AbstractElasticsearchRepository的saveAll方法调用的是elasticsearchOperations.bulkIndex(queries),为批量索引
personRepository.saveAll(items);
}
}
2.4 配置ElasticsearchItemReader(本示例未使用,仅供参考)
package com.hfcsbc.esetl.itemReader;
import org.springframework.batch.item.data.AbstractPaginatedDataItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.elasticsearch.core.ElasticsearchOperations;
import org.springframework.data.elasticsearch.core.query.SearchQuery;
import java.util.Iterator;
/**
* Create by pengchao on 2018/2/24
*/
public class ElasticsearchItemReader
private final ElasticsearchOperations elasticsearchOperations;
private final SearchQuery query;
private final Class extends Person> targetType;
public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class extends Person> targetType) {
this.elasticsearchOperations = elasticsearchOperations;
this.query = query;
this.targetType = targetType;
}
@Override
protected Iterator
return (Iterator
}
@Override
public void afterPropertiesSet() throws Exception {
}
}
2.5 配置spring batch需要的配置
package com.hfcsbc.esetl.config;
import com.hfcsbc.esetl.itemWriter.ElasticsearchItemWriter;
import com.hfcsbc.esetl.repository.es.EsPersonRepository;
import com.hfcsbc.esetl.domain.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.orm.JpaNativeQueryProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;
import javax.persistence.EntityManagerFactory;
import javax.sql.DataSource;
/**
* Create by pengchao on 2018/2/23
*/
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private EsPersonRepository personRepository;
@Bean
public ItemReader
JpaPagingItemReader
String sqlQuery = "select * from person";
try {
JpaNativeQueryProvider
queryProvider.setSqlQuery(sqlQuery);
queryProvider.setEntityClass(Person.class);
queryProvider.afterPropertiesSet();
reader.setEntityManagerFactory(entityManagerFactory);
reader.setPageSize(10000);
reader.setQueryProvider(queryProvider);
reader.afterPropertiesSet();
reader.setSaveState(true);
} catch (Exception e) {
e.printStackTrace();
}
return reader;
}
@Bean
public ElasticsearchItemWriter itemWriter(){
return new ElasticsearchItemWriter(personRepository);
}
@Bean
public Step step(StepBuilderFactory stepBuilderFactory,
ItemReader itemReader,
ItemWriter itemWriter){
return stepBuilderFactory
.get("step1")
.chunk(10000)
.reader(itemReader)
.writer(itemWriter)
.build();
}
@Bean
public Job job(JobBuilderFactory jobBuilderFactory, Step step){
return jobBuilderFactory
.get("importJob")
.incrementer(new RunIdIncrementer())
.flow(step)
.end()
.build();
}
/**
* spring batch执行时会创建一些自身需要的表,这里指定表创建的位置:dataSource
* @param dataSource
* @param manager
* @return
*/
@Bean
public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager manager){
JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
jobRepositoryFactoryBean.setDataSource(dataSource);
jobRepositoryFactoryBean.setTransactionManager(manager);
jobRepositoryFactoryBean.setDatabaseType("postgres");
try {
return jobRepositoryFactoryBean.getObject();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
2.6配置数据库及es的连接地址
spring:
  redis:
    host: 192.168.1.222
  data:
    jest:
      uri: http://192.168.1.222:9200
      username: elastic
      password: changeme
  jpa:
    database: POSTGRESQL
    show-sql: true
    hibernate:
      ddl-auto: update
  datasource:
    platform: postgres
    url: jdbc:postgresql://192.168.1.222:5433/person
    username: hfcb
    password: hfcb
    driver-class-name: org.postgresql.Driver
    max-active: 2
spring.batch.initialize-schema: always
2.7 配置入口类
package com.hfcsbc.esetl;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchAutoConfiguration;
import org.springframework.boot.autoconfigure.data.elasticsearch.ElasticsearchDataAutoConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;
import org.springframework.data.jpa.repository.config.EnableJpaRepositories;
/**
 * Spring Boot entry point of the import application.
 *
 * <p>Spring Boot's own Elasticsearch auto-configurations are excluded. Repository scanning is
 * scoped per store: Elasticsearch repositories are looked up under {@code repository.es} and
 * JPA repositories under {@code repository.jpa}. {@code Person} carries both {@code @Entity}
 * and {@code @Document}, so scanning the whole {@code repository} package for Elasticsearch
 * could also pick up the JPA {@code PersonRepository}.
 */
@SpringBootApplication(exclude = {ElasticsearchAutoConfiguration.class, ElasticsearchDataAutoConfiguration.class})
@EnableElasticsearchRepositories(basePackages = "com.hfcsbc.esetl.repository.es")
@EnableJpaRepositories(basePackages = "com.hfcsbc.esetl.repository.jpa")
public class EsEtlApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(EsEtlApplication.class, args);
    }
}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~