java连接ElasticSearch集群操作

网友投稿 588 2022-11-21


java连接ElasticSearch集群操作

废话不多说,直接看代码:下面依次给出基于 TransportClient 的 ES 配置类、逐条写入的 DAO 实现,以及批量写入的方法。

/**
 * Elasticsearch configuration class.
 *
 * <p>Registers a {@link TransportClient} bean built from the cluster name, host and
 * transport port defined by the constants below.
 */
@Configuration
public class ElasticSearchDataSourceConfigurer {

    private static final Logger LOG = LogManager.getLogger(ElasticSearchDataSourceConfigurer.class);

    /** Value passed as the {@code cluster.name} client setting. */
    private static final String CLUSTER_NAME = "bigData-cluster";

    /**
     * Host name or IP of a cluster node. The original listing left this empty;
     * {@link InetAddress#getByName(String)} resolves an empty string to the loopback
     * address, so replace it with a real node address before deploying.
     */
    private static final String CLUSTER_HOST = "";

    /** Port the transport address is registered with. */
    private static final int TRANSPORT_PORT = 9300;

    /**
     * Builds the transport client bean.
     *
     * @return a client with one transport address registered
     * @throws IllegalStateException if {@link #CLUSTER_HOST} cannot be resolved; the
     *         original returned {@code null} here, which only moved the failure to the
     *         first use of the bean
     */
    @Bean
    public TransportClient getESClient() {
        Settings settings = Settings.builder()
                .put("cluster.name", CLUSTER_NAME)
                .put("client.transport.sniff", true)
                .build();

        // Resolve the address BEFORE constructing the client: previously the client was
        // created first, so a resolution failure leaked the half-built client.
        final InetAddress nodeAddress;
        try {
            nodeAddress = InetAddress.getByName(CLUSTER_HOST);
        } catch (UnknownHostException e) {
            LOG.error("ESClient连接建立失败", e);
            throw new IllegalStateException(
                    "Cannot resolve Elasticsearch host '" + CLUSTER_HOST + "'", e);
        }

        TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(nodeAddress, TRANSPORT_PORT));
        LOG.info("ESClient连接建立成功");
        return client;
    }
}

/**
 * DAO implementation that stores {@code User} objects as documents in the
 * {@code user} index (type {@code info}), one index request per user.
 */
@Repository
public class UserDaoImpl implements userDao {

    private static final String INDEXNAME = "user"; // lowercase

    private static final String TYPENAME = "info";

    @Resource
    TransportClient transportClient;

    /**
     * Serializes each user to JSON and indexes it under a random UUID as document id.
     *
     * <p>A user whose serialization throws {@code JsonProcessingException}, or yields
     * {@code null}, is skipped and not counted. Exceptions thrown by the index call
     * itself are not caught here and abort the loop.
     *
     * @param user users to index; {@code null} is treated as an empty array
     * @return the number of users for which the index call returned
     */
    @Override
    public int addUser(User[] user) {
        if (user == null) {
            return 0;
        }
        int successNum = 0;
        for (User current : user) {
            try {
                String jsonValue = JsonUtil.object2JsonString(current);
                if (jsonValue != null) {
                    transportClient.prepareIndex(INDEXNAME, TYPENAME, UUID.randomUUID().toString())
                            .setSource(jsonValue)
                            .execute()
                            .actionGet();
                    successNum++;
                }
            } catch (JsonProcessingException e) {
                // NOTE(review): no logger is visible for this class; switch to the
                // project logger once one is available.
                e.printStackTrace();
            }
        }
        return successNum;
    }
}

/**
 * Bulk insert: serializes every element of {@code users} to JSON and indexes it into
 * {@code user}/{@code info} under a random UUID, sending the requests in batches of
 * 10000.
 *
 * <p>Elements that fail to serialize (exception or {@code null} JSON) are skipped.
 * A progress line is printed after each batch has actually been sent.
 *
 * @param client client used to build and execute the bulk requests
 * @param users  objects to index; {@code null} or empty means nothing is sent
 */
public static void bathAddUser(TransportClient client, List<?> users) {
    if (users == null || users.isEmpty()) {
        return;
    }
    final int batchSize = 10000;
    // Use the client passed in; the original referenced a "transportClient" name that
    // does not exist in this static method.
    BulkRequestBuilder bulkRequest = client.prepareBulk();
    int submitted = 0;
    for (Object user : users) {
        String jsonValue;
        try {
            jsonValue = JsonUtil.object2JsonString(user);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
            continue; // do not queue a document without a source
        }
        if (jsonValue == null) {
            continue;
        }
        bulkRequest.add(client.prepareIndex("user", "info", UUID.randomUUID().toString())
                .setSource(jsonValue));
        // Flush on a full batch. The original tested i % 10000 == 0, which fired on the
        // very first element and never sent the trailing partial batch.
        if (bulkRequest.numberOfActions() >= batchSize) {
            submitted += flushBulk(bulkRequest);
            System.out.println("已经插入" + submitted + "条");
            // Start a fresh builder so already-sent actions are not sent again.
            bulkRequest = client.prepareBulk();
        }
    }
    // Send whatever is left over from the last, partial batch.
    if (bulkRequest.numberOfActions() > 0) {
        submitted += flushBulk(bulkRequest);
        System.out.println("已经插入" + submitted + "条");
    }
}

/** Executes one bulk request, reports item failures on stderr, returns its action count. */
private static int flushBulk(BulkRequestBuilder bulkRequest) {
    int actionCount = bulkRequest.numberOfActions();
    if (bulkRequest.execute().actionGet().hasFailures()) {
        System.err.println("批量插入存在失败的文档");
    }
    return actionCount;
}

补充知识:使用java创建ES(ElasticSearch)连接池

1.首先要有一个创建连接的工厂类

package com.aly.util;

import org.apache.commons.pool2.PooledObject;

import org.apache.commons.pool2.PooledObjectFactory;

import org.apache.commons.pool2.impl.DefaultPooledObject;

import org.apache.http.HttpHost;

import org.elasticsearch.client.RestClient;

import org.elasticsearch.client.RestHighLevelClient;

/**
 * Elasticsearch connection-pool factory: creates and destroys the
 * {@link RestHighLevelClient} instances managed by a commons-pool2 object pool.
 *
 * @author 00000
 */
public class EsClientPoolFactory implements PooledObjectFactory<RestHighLevelClient> {

    /** HTTP endpoints every created client is configured with. */
    private static final HttpHost[] CLUSTER_NODES = {
        new HttpHost("192.168.1.121", 9200, "http"),
        new HttpHost("192.168.1.122", 9200, "http"),
        new HttpHost("192.168.1.123", 9200, "http"),
        new HttpHost("192.168.1.125", 9200, "http"),
        new HttpHost("192.168.1.126", 9200, "http"),
        new HttpHost("192.168.1.127", 9200, "http")
    };

    /** Nothing to do on activation; the original only printed a debug line. */
    @Override
    public void activateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        // intentionally empty
    }

    /**
     * Destroys a pooled object by closing the wrapped client.
     *
     * @param pooledObject wrapper whose client is closed
     * @throws Exception if closing the client fails
     */
    @Override
    public void destroyObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        RestHighLevelClient highLevelClient = pooledObject.getObject();
        if (highLevelClient != null) {
            highLevelClient.close();
        }
    }

    /**
     * Creates a new client wrapped for the pool.
     *
     * <p>Construction failures propagate to the caller. The original caught them and
     * wrapped a {@code null} client, which would only fail later at the point of use.
     *
     * @return a pooled wrapper around a freshly built client
     * @throws Exception if the client cannot be constructed
     */
    @Override
    public PooledObject<RestHighLevelClient> makeObject() throws Exception {
        RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(CLUSTER_NODES));
        return new DefaultPooledObject<>(client);
    }

    /** Nothing to do on passivation; the original only printed a debug line. */
    @Override
    public void passivateObject(PooledObject<RestHighLevelClient> pooledObject) throws Exception {
        // intentionally empty
    }

    /**
     * Reports whether the pooled object still wraps a client.
     *
     * @param pooledObject wrapper to check
     * @return {@code true} when a non-null client is present; no network check is made
     */
    @Override
    public boolean validateObject(PooledObject<RestHighLevelClient> pooledObject) {
        return pooledObject != null && pooledObject.getObject() != null;
    }
}

2.然后再写我们的连接池工具类

package com.aly.util;

import org.apache.commons.pool2.impl.GenericObjectPool;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import org.elasticsearch.client.RestHighLevelClient;

/**
 * Elasticsearch connection-pool utility: static access to a shared pool of
 * {@link RestHighLevelClient} instances.
 *
 * <p>Typical use: {@code getClient()}, use the client, then {@code returnClient(client)}
 * in a {@code finally} block.
 *
 * @author 00000
 */
public class ElasticSearchPoolUtil {

    // Pool configuration; optional, defaults apply when omitted.
    // Declared raw so the class compiles whether or not the commons-pool2 version on
    // the classpath declares GenericObjectPoolConfig with a type parameter.
    @SuppressWarnings("rawtypes")
    private static final GenericObjectPoolConfig POOL_CONFIG = new GenericObjectPoolConfig();

    // Upper bound on the number of clients the pool hands out.
    static {
        POOL_CONFIG.setMaxTotal(8);
    }

    // Factory for the pooled objects.
    private static final EsClientPoolFactory CLIENT_FACTORY = new EsClientPoolFactory();

    // The pool itself, typed so borrowObject() returns RestHighLevelClient; with the
    // raw pool type the assignment in getClient() did not compile.
    @SuppressWarnings("unchecked")
    private static final GenericObjectPool<RestHighLevelClient> CLIENT_POOL =
            new GenericObjectPool<RestHighLevelClient>(CLIENT_FACTORY, POOL_CONFIG);

    /**
     * Borrows a client from the pool.
     *
     * @return a client taken from the pool
     * @throws Exception if the pool cannot provide a client
     */
    public static RestHighLevelClient getClient() throws Exception {
        return CLIENT_POOL.borrowObject();
    }

    /**
     * Returns a client to the pool once the caller is done with it.
     *
     * @param client the borrowed client; {@code null} is ignored so this can be called
     *               from a {@code finally} block even when borrowing failed
     */
    public static void returnClient(RestHighLevelClient client) {
        if (client != null) {
            CLIENT_POOL.returnObject(client);
        }
    }
}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:JAVA解析XML字符串简单方法代码案例
下一篇:java8时间 yyyyMMddHHmmss格式转为日期的代码
相关文章

 发表评论

暂时没有评论,来抢沙发吧~