ES Client性能测试初探
最近在工作中协助研发进行了ES优化,效果还是非常明显的,几乎翻倍。除了通过各种业务接口测试ES性能以外,还可以直接请求ES接口,绕过服务,这样应该数据回更加准确。所以,ES Client学起来。
准备工作
首先,先准备了一个ES服务,这里就不多赘述了,大家自己在尝试的时候一定主意好ES Server和ES Client的版本要一致。其次,新建项目,添加依赖。
学习资料
搜一下,能搜到很多的ES学习资料,建议先去看看大厂出品的基础知识了解一下ES功能。然后就可以直接看ES的API了。下面是ES官方的文档地址:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/6.7/java-rest-high-search.html
如果能能查看自己公司项目源码的小伙伴可以多研究研发的代码,能够更好结合业务理解ES API的使用。
ES Client
HTTP请求
这里说一下,很多ES查询功能都是通过HTTP请求完成的,GET请求,body传参,一开始还是比较懵逼的。查了一些资料需要自己实现是个body携带数据的HTTPGET请求,下面是我的实现代码:
package com.funtester.httpclient import org.apache.http.client.methods.HttpEntityEnclosingRequestBase import javax.annotation.concurrent.NotThreadSafe /**
* HttpGet请求携带body参数
*/ @NotThreadSafe class HttpGetByBody extends HttpEntityEnclosingRequestBase { static final String METHOD_NAME = "GET"; /**
* 获取方法(必须重载)
*
* @return */ @Override String getMethod() { return METHOD_NAME;
} /**
* PS:不能照抄{@link org.apache.http.client.methods.HttpPost}
* @param uri
*/ HttpGetByBody(final String uri) { this(new URI(uri))
}
HttpGetByBody(final URI uri) { super();
setURI(uri);
}
HttpGetByBody() { super();
}
}
ES Client
如果使用HTTP接口进行ES操作,需要组合多层级的参数,这个写起来会比较麻烦、可读性也比较差,而且更加容易出错。所以,还是使用ES Client作为操作ES的基础框架。
如果翻看ES Client源码,最终也是通过HttpClient发起HTTP请求的,这中间进行了很多的封装。这里分享一下ES Client的HTTP Client创建代码部分:
private CloseableHttpAsyncClient createHttpClient() { //default timeouts are all infinite RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectTimeout(DEFAULT_CONNECT_TIMEOUT_MILLIS)
.setSocketTimeout(DEFAULT_SOCKET_TIMEOUT_MILLIS); if (requestConfigCallback != null) {
requestConfigBuilder = requestConfigCallback.customizeRequestConfig(requestConfigBuilder);
} try {
HttpAsyncClientBuilder httpClientBuilder = HttpAsyncClientBuilder.create().setDefaultRequestConfig(requestConfigBuilder.build()) //default settings for connection pooling may be too constraining .setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE).setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
.setSSLContext(SSLContext.getDefault())
.setTargetAuthenticationStrategy(new PersistentCredentialsAuthenticationStrategy()); if (httpClientConfigCallback != null) {
httpClientBuilder = httpClientConfigCallback.customizeHttpClient(httpClientBuilder);
} final HttpAsyncClientBuilder finalBuilder = httpClientBuilder; return AccessController.doPrivileged(new PrivilegedAction() { @Override public CloseableHttpAsyncClient run() { return finalBuilder.build();
}
});
} catch (NoSuchAlgorithmException e) { throw new IllegalStateException("could not create the default ssl context", e);
}
}
可以看出ES Client用到了HttpClient的异步Client,我猜是用future实现同步返回响应结果,这个没仔细看,有错请指出。这里也回答我的自己的一个疑惑,ES Client是支持并发的。
ES Client 封装
就我自己的观察,ES Client的封装程度非常高,完全可以拿来就用。我担心自己过几天之后就不知道改怎么用这些ES Client 的API了,所以又进行了一次封装,权当是一个学习笔记类。
封装代码有点多,放到了文末。
测试用例
添加数据
这个可以用来跑一部分数据到ES里。
package com.funtest.groovytest import com.alibaba.fastjson.JSONObject import com.funtester.es.ESClient import com.funtester.frame.SourceCode import com.funtester.frame.execute.FunQpsConcurrent import java.util.concurrent.atomic.AtomicInteger class ESC extends SourceCode { static void main(String[] args) {
def client = new ESClient("127.0.0.1", 9200, "http")
def data = new JSONObject()
data.name = "FunTester" data.age = getRandomInt(100)
def index = new AtomicInteger(0)
def test = {
data.put("time", index.getAndIncrement())
client.index("fun", "tt", data)
} new FunQpsConcurrent(test, "ES添加数据").start()
}
}
如果想测试添加、删除功能,只需要把test
闭包内容修改即可。
def test = {
data.put("time", index.getAndIncrement())
client.delete("fun", "tt", client.index("fun", "tt", data))
}
下面是搜索功能的性能测试用例:
package com.funtest.groovytest import com.alibaba.fastjson.JSONObject import com.funtester.es.ESClient import com.funtester.frame.SourceCode import com.funtester.frame.execute.FunQpsConcurrent import org.elasticsearch.index.query.QueryBuilders import java.util.concurrent.atomic.AtomicInteger class ESC extends SourceCode { static void main(String[] args) {
def client = new ESClient("127.0.0.1", 9200, "http")
def data = new JSONObject()
data.name = "FunTester" data.age = getRandomInt(100)
def index = new AtomicInteger(0)
def test = {
client.search("fun", QueryBuilders.matchQuery("time", getRandomInt(10)))
} new FunQpsConcurrent(test, "ES搜索").start()
}
}
ES Client API封装类
package com.funtester.es import com.funtester.frame.SourceCode import groovy.util.logging.Log4j2 import org.apache.http.HttpHost import org.elasticsearch.action.delete.DeleteRequest import org.elasticsearch.action.get.GetRequest import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.action.index.IndexResponse import org.elasticsearch.action.search.SearchRequest import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.action.search.SearchScrollRequest import org.elasticsearch.client.RequestOptions import org.elasticsearch.client.RestClient import org.elasticsearch.client.RestHighLevelClient import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.index.query.QueryBuilder import org.elasticsearch.search.SearchHits import org.elasticsearch.search.builder.SearchSourceBuilder import org.elasticsearch.search.fetch.subphase.FetchSourceContext import java.util.concurrent.TimeUnit /**
* ES客户端API练习类
*/ @Log4j2 class ESClient extends SourceCode { String host int port
String scheme
RestHighLevelClient client ESClient(String host, int port = 9200, String scheme = "http") { this.host = host this.port = port this.scheme = scheme // 设置验证信息,填写账号及密码 // CredentialsProvider credentialsProvider = new BasicCredentialsProvider() // credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("user", "passwd")) def builder = RestClient.builder(new HttpHost(host, port, scheme)) // 设置认证信息 // builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { // // @Override // public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { // return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider) // } // }) builder.setMaxRetryTimeoutMillis(1000)
client = new RestHighLevelClient(builder)
} /**
* 添加数据
* @param index
* @param type
* @param data
* @return */ def index(String index, type, Map data) {
IndexRequest indexRequest = new IndexRequest(index, type).source(data)
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT)
indexResponse.getId()
} /**
* 获取数据
* @param index
* @param type
* @param id
* @return */ def get(String index, type, id) { // 查询文档 GetRequest getRequest = new GetRequest(index, type, id)
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT) if (getResponse.isExists()) {
getResponse.getSourceAsString()
}
} /**
* 数据是否存在
* @param index
* @param type
* @param id
* @return */ def exists(String index, type, id) {
GetRequest getRequest = new GetRequest(index, type, id)
getRequest.fetchSourceContext(new FetchSourceContext(false))
getRequest.storedFields("_none_")
client.exists(getRequest, RequestOptions.DEFAULT)
} /**
* 删除数据
* @param index
* @param type
* @param id
* @return */ def delete(String index, type, id) {
DeleteRequest deleteRequest = new DeleteRequest(index, type, id)
client.delete(deleteRequest, RequestOptions.DEFAULT)
} /**
* 搜索数据
* @param index
* @param query
* @param size
* @return */ def search(String index, QueryBuilder query, int size = 10) {
SearchRequest searchRequest = new SearchRequest(index)
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
sourceBuilder.query(query)
sourceBuilder.from(0)
sourceBuilder.size(size)
sourceBuilder.timeout(new TimeValue(1, TimeUnit.SECONDS))
searchRequest.source(sourceBuilder)
client.search(searchRequest, RequestOptions.DEFAULT)
} /**
* 滚动搜索
* @param index
* @param query
* @param size
*/ def searchScroll(String index, QueryBuilder query, int size = 10) {
SearchRequest searchRequest = new SearchRequest(index)
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
searchSourceBuilder.query(query)
searchSourceBuilder.size(size)
searchRequest.source(searchSourceBuilder)
searchRequest.scroll(TimeValue.timeValueMinutes(1L))
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT)
String scrollId = searchResponse.getScrollId()
SearchHits hits = searchResponse.getHits()
def searchHits = hits.getHits() while (searchHits != null && searchHits.length > 0) {
SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId)
scrollRequest.scroll(TimeValue.timeValueMinutes(1L))
searchResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT)
scrollId = searchResponse.getScrollId()
searchHits = searchResponse.getHits().getHits()
}
} def close() {
client.close()
}
}