使用java操作elasticsearch的具体方法

网友投稿 427 2023-03-02


使用java操作elasticsearch的具体方法

系统环境: vm12 下的centos 7.2

当前安装版本: elasticsearch-2.4.0.tar.gz

java操作es集群的步骤:1. 配置集群对象信息;2. 创建客户端;3. 查看集群信息。

1:集群名称

默认集群名为elasticsearch,如果集群名称和指定的不一致则在使用节点资源时会报错。

2:嗅探功能

通过client.transport.sniff启动嗅探功能,这样只需要指定集群中的某一个节点(不一定是主节点),然后会加载集群中的其他节点,这样只要程序不停即使此节点宕机仍然可以连接到其他节点。

3:查询类型SearchType.QUERY_THEN_FETCH

es 查询共有4种查询类型

QUERY_AND_FETCH:

接收请求的节点(协调节点,不一定是主节点)将查询请求分发到所有相关分片,各个分片按照自己的打分规则(即本分片内的词频和文档频率)进行打分排序,然后将结果(包括完整文档)返回给协调节点,协调节点对所有数据进行汇总排序后再返回给客户端。此种方式只需要和es交互一次。

这种查询方式存在两个问题:一是数据量问题,协调节点会汇总所有分片返回的完整数据,数据量会比较大;二是排序问题,各个分片上的打分规则(词频、文档频率)可能不一致。

QUERY_THEN_FETCH:

协调节点将请求分发给所有分片,各个分片打分排序后只将文档的id和分值返回给协调节点;协调节点收到后进行汇总排序,再根据排序后的id到对应的分片读取对应的数据并返回给客户端。此种方式需要和es交互两次。

这种方式解决了数据量问题,但排序问题依然存在。它是es的默认查询方式。

DFS_QUERY_AND_FETCH 和 DFS_QUERY_THEN_FETCH:

查询前先增加一个DFS阶段,收集所有分片的词频和文档频率,然后用统一的全局统计信息进行打分,从而解决了排序问题。但是DFS_QUERY_AND_FETCH仍然存在数据量问题;DFS_QUERY_THEN_FETCH两种问题都解决了,但是效率是最差的。

1, 获取client, 两种方式获取

@Before
public void before() throws Exception {
    // Approach 1: build the client settings from a map.
    // cluster.name must match the target cluster's name, otherwise using the
    // node fails (unless client.transport.ignore_cluster_name is set to true).
    Map<String, String> map = new HashMap<String, String>();
    map.put("cluster.name", "elasticsearch_wenbronk");
    Settings.Builder settings = Settings.builder().put(map);
    client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("wenbronk.com"), 9300));
}

@Before
public void before11() throws Exception {
    // Approach 2: build Settings explicitly.
    // With the default cluster name ("elasticsearch") no settings are needed at all:
    // client = TransportClient.builder().build()
    //         .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("wenbronk.com"), 9300));
    Settings settings = Settings.settingsBuilder()
            .put("cluster.name", "elasticsearch_wenbronk") // name of the cluster to join
            // Sniffing: discover the other cluster nodes from the one configured below.
            // The original author could never connect with it enabled.
            // NOTE(review): likely the nodes publish an address the client cannot reach
            // (check network.publish_host on the cluster) - confirm.
            // .put("client.transport.sniff", true)
            // network.host is a node (server-side) setting; it has no effect on a client.
            // .put("network.host", "192.168.50.37")
            .put("client.transport.ignore_cluster_name", true) // skip cluster-name validation; a wrong name still connects
            // Time settings require a unit: a bare number such as 5 fails to parse, so use "5s".
            // .put("client.transport.nodes_sampler_interval", "5s") // how often nodes are sampled/pinged, default 5s
            // .put("client.transport.ping_timeout", "5s")           // time to wait for a node's ping response, default 5s
            .build();
    client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
    System.out.println("success connect");
}

PS: 官网给的2种方式单独使用都连接不上,需要合起来才能用,为此浪费了一下午...

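其他参数的意义(注意:时间类参数必须带单位,例如 "5s",只写数字 5 会报错):

- client.transport.sniff:设为 true 时开启嗅探,客户端会自动发现集群中的其他节点并加入连接列表。
- client.transport.ignore_cluster_name:设为 true 时忽略对所连接节点的集群名校验。
- client.transport.ping_timeout:等待节点 ping 响应的时间,默认 5s。
- client.transport.nodes_sampler_interval:对配置的/已连接的节点进行采样(ping)的间隔,默认 5s。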

代码:

package com.wenbronk.javaes;

import java.net.InetAddress;

import java.net.InetSocketAddress;

import java.util.Date;

import java.util.HashMap;

import java.util.List;

import java.util.Map;

import java.util.concurrent.TimeUnit;

import org.elasticsearch.action.bulk.BackoffPolicy;

import org.elasticsearch.action.bulk.BulkProcessor;

import org.elasticsearch.action.bulk.BulkProcessor.Listener;

import org.elasticsearch.action.bulk.BulkRequest;

import org.elasticsearch.action.bulk.BulkRequestBuilder;

import org.elasticsearch.action.bulk.BulkResponse;

import org.elasticsearch.action.delete.DeleteRequest;

import org.elasticsearch.action.delete.DeleteResponse;

import org.elasticsearch.action.get.GetResponse;

import org.elasticsearch.action.get.MultiGetItemResponse;

import org.elasticsearch.action.get.MultiGetResponse;

import org.elasticsearch.action.index.IndexRequest;

import org.elasticsearch.action.index.IndexResponse;

import org.elasticsearch.action.update.UpdateRequest;

import org.elasticsearch.action.update.UpdateResponse;

import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.cluster.node.DiscoveryNode;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.transport.InetSocketTransportAddress;

import org.elasticsearch.common.unit.ByteSizeUnit;

import org.elasticsearch.common.unit.ByteSizeValue;

import org.elasticsearch.common.unit.TimeValue;

import org.elasticsearch.common.xcontent.XContentBuilder;

import org.elasticsearch.common.xcontent.XContentFactory;

import org.elasticsearch.script.Script;

import org.junit.Before;

import org.junit.Test;

import com.alibaba.fastjson.JSONObject;

/**

* 使用java API操作elasticSearch

*

* @author 231

*

*/

public class JavaESTest {

private TransportClient client;

private IndexRequest source;

/**

* 获取连接, 第一种方式

* @throws Exception

*/

// @Before

public void before() throws Exception {

Map map = new HashMap();

map.put("cluster.name", "elasticsearch_wenbronk");

Settings.Builder settings = Settings.builder().put(map);

client = TransportClient.builder().settings(settings).build()

.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("wenbronk.com"), Integer.parseInt("9300")));

}

/**

* 查看集群信息

*/

@Test

public void testInfo() {

List nodes = client.connectedNodes();

for (DiscoveryNode node : nodes) {

System.out.println(node.getHostAddress());

}

}

/**

* 组织json串, 方式1,直接拼接

*/

public String createJson1() {

String json = "{" +

"\"user\":\"kimchy\"," +

"\"postDate\":\"2013-01-30\"," +

"\"message\":\"trying out Elasticsearch\"" +

"}";

return json;

}

/**

* 使用map创建json

*/

public Map createJson2() {

Map json = new HashMap<String, Object>();

json.put("user", "kimchy");

json.put("postDate", new Date());

json.put("message", "trying out elasticsearch");

return json;

}

/**

* 使用fastjson创建

*/

public JSONObject createJson3() {

JSONObject json = new JSONObject();

json.put("user", "kimchy");

json.put("postDate", new Date());

json.put("message", "trying out elasticsearch");

return json;

}

/**

* 使用es的帮助类

*/

public XContentBuilder createJson4() throws Exception {

// 创建json对象, 其中一个创建json的方式

XContentBuilder source = XContentFactory.jsonBuilder()

.startObject()

.field("user", "kimchy")

.field("postDate", new Date())

.field("message", "trying to out ElasticSearch")

.endObject();

return source;

}

/**

* 存入索引中

* @throws Exception

*/

@Test

public void test1() throws Exception {

XContentBuilder source = createJson4();

// 存json入索引中

IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get();

// // 结果获取

String index = response.getIndex();

String type =xhBxJyKYTN response.getType();

String id = response.getId();

long version = response.getVersion();

boolean created = response.isCreated();

System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);

}

/**

* get API 获取指定文档信息

*/

@Test

public void testGet() {

// GetResponse response = client.prepareGet("twitter", "tweet", "1")

// .get();

GetResponse response = client.prepareGet("twitter", "tweet", "1")

.setOperationThreaded(false) // 线程安全

.get();

System.out.println(response.getSourceAsString());

}

/**

* 测试 delete api

*/

@Test

public void testDelete() {

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")

.get();

String index = response.getIndex();

String type = response.getType();

String id = response.getId();

long version = response.getVersion();

System.out.println(index + " : " + type + ": " + id + ": " + version);

}

/**

* 测试更新 update API

* 使用 updateRequest 对象

* @throws Exception

*/

@Test

public void testUpdate() throws Exception {

UpdateRequest updateRequest = new UpdateRequest();

updateRequest.index("twitter");

updateRequest.type("tweet");

updateRequest.id("1");

updateRequest.doc(XContentFactory.jsonBuilder()

.startObject()

// 对没有的字段添加, 对已有的字段替换

.field("gender", "male")

.field("message", "hello")

.endObject());

UpdateResponse response = client.update(updateRequest).get();

// 打印

String index = response.getIndex();

String type = response.getType();

String id = response.getId();

long version = response.getVersion();

System.out.println(index + " : " + type + ": " + id + ": " + version);

}

/**

* 测试update api, 使用client

* @throws Exception

*/

@Test

public void testUpdate2() throws Exception {

// 使用Script对象进行更新

// UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")

// .setScript(new Script("hits._source.gender = \"male\""))

// .get();

// 使用XContFactory.jsonBuilder() 进行更新

// UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")

// .setDoc(XContentFactory.jsonBuilder()

// .startObject()

// .field("gender", "malelelele")

// .endObject()).get();

// 使用updateRequest对象及script

// UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")

// .script(new Script("ctx._source.gender=\"male\""));

// UpdateResponse response = client.update(updateRequest).get();

// 使用updateRequest对象及documents进行更新

UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1")

.doc(XContentFactory.jsonBuilder()

.startObject()

.field("gender", "male")

.endObject()

)).get();

System.out.println(response.getIndex());

}

/**

* 测试update

* 使用updateRequest

* @throws Exception

* @throws InterruptedException

*/

@Test

public void testUpdate3() throws InterruptedException, Exception {

UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")

.script(new Script("ctx._source.gender=\"male\""));

UpdateResponse response = client.update(updateRequest).get();

}

/**

* 测试upsert方法

* @throws Exception

*

*/

@Test

public void testUpsert() throws Exception {

// 设置查询条件, 查找不到则添加生效

IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2")

.source(XContentFactory.jsonBuilder()

.startObject()

.field("name", "214")

.field("gender", "gfrerq")

.endObject());

// 设置更新, 查找到更新下面的设置

UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2")

.doc(XContentFactory.jsonBuilder()

.startObject()

.field("user", "wenbronk")

.endObject())

.upsert(indexRequest);

client.update(upsert).get();

}

/**

* 测试multi get api

* 从不同的index, type, 和id中获取

*/

@Test

public void testMultiGet() {

MultiGetResponse multiGetResponse = client.prepareMultiGet()

.add("twitter", "tweet", "1")

.add("twitter", "tweet", "2", "3", "4")

.add("anothoer", "type", "foo")

.get();

for (MultiGetItemResponse itemResponse : multiGetResponse) {

GetResponse response = itemResponse.getResponse();

if (response.isExists()) {

String sourceAsString = response.getSourceAsString();

System.out.println(sourceAsString);

}

}

}

/**

* bulk 批量执行

* 一次查询可以update 或 delete多个document

*/

@Test

public void testBulk() throws Exception {

BulkRequestBuilder bulkRequest = client.prepareBulk();

bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")

.setSource(XContentFactory.jsonBuilder()

.startObject()

.field("user", "kimchy")

.field("postDate", new Date())

.field("message", "trying out Elasticsearch")

.endObject()));

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")

.setSource(XContentFactory.jsonBuilder()

.startObject()

.field("user", "kimchy")

.field("postDate", new Date())

.field("message", "another post")

.endObject()));

BulkResponse response = bulkRequest.get();

System.out.println(response.getHeaders());

}

/**

* 使用bulk processor

* @throws Exception

*/

@Test

public void testBulkProcessor() throws Exception {

// 创建BulkPorcessor对象

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {

public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {

// TODO Auto-generated method stub

}

// 执行出错时执行

public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {

// TODO Auto-generated method stub

}

public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {

// TODO Auto-generated method stub

}

})

// 1w次请求执行一次bulk

.setBulkActions(10000)

// 1gb的数据刷新一次bulk

.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))

// 固定5s必须刷新一次

.setFlushInterval(TimeValue.timeValueSeconds(5))

// 并发请求数量, 0不并发, 1并发允许执行

.setConcurrentRequests(1)

// 设置退避, 100ms后执行, 最大请求3次

.setBackoffPolicy(

BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))

.build();

// 添加单次请求

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));

bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

// 关闭

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

// 或者

bulkProcessor.close();

}

}

JavaESTest2 代码:

package com.wenbronk.javaes;

import java.net.InetSocketAddress;

import org.apache.lucene.queryparser.xml.FilterBuilderFactory;

import org.elasticsearch.action.search.MultiSearchResponse;

import org.elasticsearch.action.search.SearchRequestBuilder;

import org.elasticsearch.action.search.SearchResponse;

import org.elasticsearch.action.search.SearchType;

import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.settings.Settings.Builder;

import org.elasticsearch.common.transport.InetSocketTransportAddress;

import org.elasticsearch.common.unit.TimeValue;

import org.elasticsearch.index.query.QueryBuilder;

import org.elasticsearch.index.query.QueryBuilders;

import org.elasticsearch.search.SearchHit;

import org.elasticsearch.search.aggregations.Aggregation;

import org.elasticsearch.search.aggregations.AggregationBuilders;

import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;

import org.elasticsearch.search.sort.SortOrder;

import org.elasticsearch.search.sort.SortParseElement;

import org.junit.Before;

import org.junit.Test;

/**

* 使用java API操作elasticSexhBxJyKYTNarch

* search API

* @author 231

*

*/

public class JavaESTest2 {

private TransportClient client;

/**

* 获取client对象

*/

@Before

public void testBefore() {

Builder builder = Settings.settingsBuilder();

builder.put("cluster.name", "wenbronk_escluster");

// .put("client.transport.ignore_cluster_name", true);

Settings settings = builder.build();

org.elasticsearch.client.transport.TransportClient.Builder transportBuild = TransportClient.builder();

TransportClient client1 = transportBuild.settings(settings).build();

client = client1.addTransportAddress((new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))));

System.out.println("success connect to escluster");

}

/**

* 测试查询

*/

@Test

public void testSearch() {

// SearchRequestBuilder searchRequestBuilder = client.prepareSearch("twitter", "tweet", "1");

// SearchResponse response = searchRequestBuilder.setTypes("type1", "type2")

// .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)

// .setQuery(QueryBuilders.termQuery("user", "test"))

// .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1))

// .setFrom(0).setSize(2).setExplain(true)

// .execute().actionGet();

SearchResponse response = client.prepareSearch()

.execute().actionGet();

// SearchHits hits = response.getHits();

// for (SearchHit searchHit : hits) {

// for(Iterator iterator = searchHit.iterator(); iterator.hasNext(); ) {

// SearchHitField next = iterator.next();

// System.out.println(next.getValues());

// }

// }

System.out.println(response);

}

/**

* 测试scroll api

* 对大量数据的处理更有效

*/

@Test

public void testScrolls() {

QueryBuilder queryBuilder = QueryBuilders.termQuery("twitter", "tweet");

SearchResponse response = client.prepareSearch("twitter")

.addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)

.setScroll(new TimeValue(60000))

.setQuery(queryBuilder)

.setSize(100).execute().actionGet();

while(true) {

for (SearchHit hit : response.getHits().getHits()) {

System.out.println("i am coming");

}

SearchResponse response2 = client.prepareSearchScroll(response.getScrollId())

.setScroll(new TimeValue(60000)).execute().actionGet();

if (response2.getHits().getHits().length == 0) {

System.out.println("oh no=====");

break;

}

}

}

/**

* 测试multiSearch

*/

@Test

public void testMultiSearch() {

QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");

SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);

QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");

SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);

MultiSearchResponse multiResponse = client.prepareMultiSearch().add(requestBuilder1).add(requestBuilder2)

.execute().actionGet();

long nbHits = 0;

for (MultiSearchResponse.Item item : multiResponse.getResponses()) {

SearchResponse response = item.getResponse();

nbHits = response.getHits().getTotalHits();

SearchHit[] hits = response.getHits().getHits();

System.out.println(nbHits);

}

}

/**

* 测试聚合查询

*/

@Test

public vhttp://oid testAggregation() {

SearchResponse response = client.prepareSearch()

.setQuery(QueryBuilders.matchAllQuery()) // 先使用query过滤掉一部分

.addAggregation(AggregationBuilders.terms("term").field("user"))

.addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth")

.interval(DateHistogramInterval.YEAR))

.execute().actionGet();

Aggregation aggregation2 = response.getAggregations().get("term");

Aggregation aggregation = response.getAggregations().get("agg2");

// SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet();

}

/**

* 测试terminate

*/

@Test

public void testTerminateAfter() {

SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();

if (response.isTerminatedEarly()) {

System.out.println("ternimate");

}

}

/**

* 过滤查询: 大于gt, 小于lt, 小于等于lte, 大于等于gte

*/

@Test

public void testFilter() {

SearchResponse response = client.prepareSearch("twitter")

.setTypes("")

.setQuery(QueryBuilders.matchAllQuery()) //查询所有

.setSearchType(SearchType.QUERY_THEN_FETCH)

// .setPostFilter(FilterBuilders.rangeFilter("age").from(0).to(19)

// .includeLower(true).includeUpper(true))

// .setPostFilter(FilterBuilderFactory .rangeFilter("age").gte(18).lte(22))

.setExplain(true) //explain为true表示根据数据相关度排序,和关键字匹配最高的排在前面

.get();

}

/**

* 分组查询

*/

@Test

public void testGroupBy() {

client.prepareSearch("twitter").setTypes("tweet")

.setQuery(QueryBuilders.matchAllQuery())

.setSearchType(SearchType.QUERY_THEN_FETCH)

.addAggregation(AggregationBuilders.terms("user")

.field("user").size(0) // 根据user进行分组

// size(0) 也是10

).get();

}

}


版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:java使用es查询的示例代码
下一篇:JVM虚拟机查找类文件的顺序方法
相关文章

 发表评论

暂时没有评论,来抢沙发吧~