阅读量:0
1 RestHighLevelClient介绍
默认情况下,ElasticSearch使用两个端口来监听外部TCP流量。
- 9200端口:用于所有通过HTTP协议进行的API调用。包括搜索、聚合、监控、以及其他任何使用HTTP协议的请求。所有基于HTTP(REST)的客户端库都会使用该端口与ElasticSearch进行交互。
- 9300端口:使用自定义的二进制传输协议,用于集群中各节点之间的通信,例如集群变更、主节点选举、节点加入/离开、分片分配等事项。
RestHighLevelClient是ES的Java客户端,它通过HTTP协议(即上面的9200端口)与ES集群进行通信。
2 引入ES依赖
<!--引入es-high-level-client相关依赖 start--> <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>7.10.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-client</artifactId> <version>7.10.0</version> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <artifactId>elasticsearch-rest-high-level-client</artifactId> <version>7.10.0</version> </dependency> <!--引入es-high-level-client相关依赖 end-->
3 使用
3.1 es的配置
# Elasticsearch client settings (all keys share the "elasticsearch" prefix)
# user name for authentication
elasticsearch.userName=elastic
# password for authentication
elasticsearch.password=elastic
# node addresses as host:port; separate multiple cluster nodes with ","
elasticsearch.hosts=127.0.0.1:9200
# scheme used to reach the nodes
elasticsearch.scheme=http
# timeout for establishing a connection (ms)
elasticsearch.connectTimeOut=1000
# socket timeout, i.e. maximum wait for data on an established connection (ms)
elasticsearch.socketTimeOut=30000
# timeout for obtaining a connection from the connection pool (ms)
elasticsearch.connectionRequestTimeOut=500
# maximum total number of connections
elasticsearch.maxConnectNum=100
# maximum number of connections per route
elasticsearch.maxConnectNumPerRoute=100
3.2 es客户端配置类
/** * restHighLevelClient 客户端配置类 * */ @Slf4j @Data @Configuration @ConfigurationProperties(prefix = "elasticsearch") public class ElasticsearchConfig { /** * es host ip 地址(集群) */ private String hosts; /** * es用户名 */ private String userName; /** * es密码 */ private String password; /** * es 请求方式 */ private String scheme; /** * es 连接超时时间 */ private int connectTimeOut; /** * es socket 连接超时时间 */ private int socketTimeOut; /** * es 请求超时时间 */ private int connectionRequestTimeOut; /** * es 最大连接数 */ private int maxConnectNum; /** * es 每个路由的最大连接数 */ private int maxConnectNumPerRoute; /** * 如果@Bean没有指定bean的名称,那么方法名就是bean的名称 */ @Bean(name = "restHighLevelClient") public RestHighLevelClient restHighLevelClient() { // 构建连接对象 RestClientBuilder builder = RestClient.builder(getEsHost()); // 连接延时配置 builder.setRequestConfigCallback(requestConfigBuilder -> { requestConfigBuilder.setConnectTimeout(connectTimeOut); requestConfigBuilder.setSocketTimeout(socketTimeOut); requestConfigBuilder.setConnectionRequestTimeout(connectionRequestTimeOut); return requestConfigBuilder; }); // 连接数配置 builder.setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.setMaxConnTotal(maxConnectNum); httpClientBuilder.setMaxConnPerRoute(maxConnectNumPerRoute); httpClientBuilder.setDefaultCredentialsProvider(getCredentialsProvider()); return httpClientBuilder; }); return new RestHighLevelClient(builder); } private HttpHost[] getEsHost() { // 拆分地址(es为多节点时,不同host以逗号间隔) List<HttpHost> hostLists = new ArrayList<>(); String[] hostList = hosts.split(","); for (String addr : hostList) { String host = addr.split(":")[0]; String port = addr.split(":")[1]; hostLists.add(new HttpHost(host, Integer.parseInt(port), scheme)); } // 转换成 HttpHost 数组 return hostLists.toArray(new HttpHost[]{}); } private CredentialsProvider getCredentialsProvider() { // 设置用户名、密码 CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new 
UsernamePasswordCredentials(userName, password)); return credentialsProvider; } }
3.3 es的使用
3.3.1 创建es索引
3.3.1.1 创建es索引的工具类
创建es索引的工具类如下所示。
/** * 操作ES索引 * */ @Slf4j @Service public class EsIndexOperation { @Resource private RestHighLevelClient restHighLevelClient; private final RequestOptions options = RequestOptions.DEFAULT; /** * 判断索引是否存在 */ public boolean checkIndex(String index) { try { return restHighLevelClient.indices().exists(new GetIndexRequest(index), options); } catch (Exception e) { log.error("EsIndexOperation checkIndex error.", e); } return Boolean.FALSE; } /** * 创建索引 * * @param indexName es索引名 * @param esSettingFilePath es索引的alias、settings和mapping的配置文件 */ public boolean createIndex(String indexName, String esSettingFilePath) { String aliases = null; String mappings = null; String settings = null; if (StringUtils.isNotBlank(esSettingFilePath)) { try { String fileContent = FileUtils.readFileContent(esSettingFilePath); if (StringUtils.isNotBlank(fileContent)) { JSONObject jsonObject = JSON.parseObject(fileContent); aliases = jsonObject.getString("aliases"); mappings = jsonObject.getString("mappings"); settings = jsonObject.getString("settings"); } } catch (Exception e) { log.error("createIndex error.", e); return false; } } if (checkIndex(indexName)) { log.error("createIndex indexName:[{}]已存在", indexName); return false; } CreateIndexRequest request = new CreateIndexRequest(indexName); if ((StringUtils.isNotBlank(aliases))) { request.aliases(aliases, XContentType.JSON); } if (StringUtils.isNotBlank(mappings)) { request.mapping(mappings, XContentType.JSON); } if (StringUtils.isNotBlank(settings)) { request.settings(settings, XContentType.JSON); } try { this.restHighLevelClient.indices().create(request, options); return true; } catch (IOException e) { log.error("EsIndexOperation createIndex error.", e); return false; } } /** * 删除索引 */ public boolean deleteIndex(String indexName) { try { if (checkIndex(indexName)) { DeleteIndexRequest request = new DeleteIndexRequest(indexName); AcknowledgedResponse response = restHighLevelClient.indices().delete(request, options); return 
response.isAcknowledged(); } } catch (Exception e) { log.error("EsIndexOperation deleteIndex error.", e); } return Boolean.FALSE; } }
3.3.1.2 读取文件的工具类
/** * 文件操作类 */ @Slf4j public class FileUtils { /** * 读取项目resources文件夹下的文件 * * @param filePath 文件路径 * @return 文件内容 */ public static String readFileContent(String filePath) { try { BufferedReader reader = new BufferedReader(new FileReader(filePath)); String line; StringBuilder stringBuilder = new StringBuilder(); while ((line = reader.readLine()) != null) { stringBuilder.append(line); } reader.close(); return stringBuilder.toString(); } catch (IOException e) { log.error("readFileContent error.", e); } return null; } public static void main(String[] args) { String filePath = "src/main/resources/es/mappings_test20231216.txt"; String fileContent = readFileContent(filePath); } }
3.3.1.3 测试创建es索引
(1)在“resources”文件夹下创建es索引的配置文件(本例为 src/main/resources/es/mappings_test20231216.txt)
配置文件内容如下所示。
{ "aliases": { "test": {} }, "mappings": { "properties": { "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "address": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } } } }, "settings": { "index": { "number_of_shards": "1", "number_of_replicas": "1" } } }
(2)读取es索引的配置文件,创建es索引
/**
 * Creates the index "test_1216" from the JSON definition file and expects the
 * operation to report success.
 */
@Test
public void createIndex() {
    final String settingsFile = "src/main/resources/es/mappings_test20231216.txt";
    final String index = "test_1216";

    boolean created = esIndexOperation.createIndex(index, settingsFile);

    Assert.assertTrue(created);
}
(3)查看创建结果
通过命令 GET /test 查看es索引创建结果(test 是索引 test_1216 的别名,因此返回的是 test_1216 的信息),结果如下所示。
{ "test_1216" : { "aliases" : { "test" : { } }, "mappings" : { "properties" : { "address" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } }, "settings" : { "index" : { "routing" : { "allocation" : { "include" : { "_tier_preference" : "data_content" } } }, "number_of_shards" : "1", "provided_name" : "test_1216", "creation_date" : "1702723364945", "number_of_replicas" : "1", "uuid" : "RCAhqjPZSG-n4fse3cot4A", "version" : { "created" : "7100099" } } } } }
3.3.2 查询操作
3.3.2.1 常用查询
/** * 查询操作 * */ @Slf4j @Service public class EsQueryOperation { @Resource private RestHighLevelClient client; private final RequestOptions options = RequestOptions.DEFAULT; /** * 查询总数 */ public Long count(String indexName) { CountRequest countRequest = new CountRequest(indexName); try { CountResponse countResponse = client.count(countRequest, options); return countResponse.getCount(); } catch (Exception e) { log.error("EsQueryOperation count error.", e); } return 0L; } /** * 查询数据集 */ public List<Map<String, Object>> list(String indexName, SearchSourceBuilder sourceBuilder) { SearchRequest searchRequest = new SearchRequest(indexName); searchRequest.source(sourceBuilder); try { SearchResponse searchResp = client.search(searchRequest, options); List<Map<String, Object>> data = new ArrayList<>(); SearchHit[] searchHitArr = searchResp.getHits().getHits(); for (SearchHit searchHit : searchHitArr) { Map<String, Object> temp = searchHit.getSourceAsMap(); temp.put("id", searchHit.getId()); data.add(temp); } return data; } catch (Exception e) { log.error("EsQueryOperation list error.", e); } return null; } }
3.3.2.2 测试
@Test public void list() { String indexName = "test"; // 查询条件 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery(); queryBuilder.must(QueryBuilders.termQuery("address", "hunan")); queryBuilder.mustNot(QueryBuilders.matchQuery("name", "Jack")); sourceBuilder.query(queryBuilder); // 分页查询 sourceBuilder.from(0); sourceBuilder.size(1); List<Map<String, Object>> list = esQueryOperation.list(indexName, sourceBuilder); Assert.assertTrue(true); }
3.3.3 增删改操作
3.3.3.1 常用增删改操作
/** * 增删改数据 * */ @Slf4j @Service public class EsDataOperation { @Resource private RestHighLevelClient client; private final RequestOptions options = RequestOptions.DEFAULT; /** * 写入数据 */ public boolean insert(String indexName, Map<String, Object> dataMap) { try { BulkRequest request = new BulkRequest(); request.add(new IndexRequest(indexName).opType("create") .id(dataMap.get("id").toString()) .source(dataMap, XContentType.JSON)); this.client.bulk(request, options); return Boolean.TRUE; } catch (Exception e) { log.error("EsDataOperation insert error.", e); } return Boolean.FALSE; } /** * 批量写入数据 */ public boolean batchInsert(String indexName, List<Map<String, Object>> userIndexList) { try { BulkRequest request = new BulkRequest(); for (Map<String, Object> dataMap : userIndexList) { request.add(new IndexRequest(indexName).opType("create") .id(dataMap.get("id").toString()) .source(dataMap, XContentType.JSON)); } this.client.bulk(request, options); return Boolean.TRUE; } catch (Exception e) { log.error("EsDataOperation batchInsert error.", e); } return Boolean.FALSE; } /** * 根据id更新数据,可以直接修改索引结构 * * @param refreshPolicy 数据刷新策略 */ public boolean update(String indexName, Map<String, Object> dataMap, WriteRequest.RefreshPolicy refreshPolicy) { try { UpdateRequest updateRequest = new UpdateRequest(indexName, dataMap.get("id").toString()); updateRequest.setRefreshPolicy(refreshPolicy); updateRequest.doc(dataMap); this.client.update(updateRequest, options); return Boolean.TRUE; } catch (Exception e) { log.error("EsDataOperation update error.", e); } return Boolean.FALSE; } /** * 删除数据 */ public boolean delete(String indexName, String id) { try { DeleteRequest deleteRequest = new DeleteRequest(indexName, id); this.client.delete(deleteRequest, options); return Boolean.TRUE; } catch (Exception e) { log.error("EsDataOperation delete error.", e); } return Boolean.FALSE; } }
3.3.3.2 测试
/**
 * Inserts a document with id 4 into "test" and expects the operation to report success.
 */
@Test
public void insert() {
    String indexName = "test";
    HashMap<String, Object> hashMap = new HashMap<>();
    hashMap.put("id", 4);
    hashMap.put("name", "tom");
    hashMap.put("address", "Jiangsu");

    boolean flag = esDataOperation.insert(indexName, hashMap);

    // Assert on the returned flag so the test can actually fail.
    Assert.assertTrue(flag);
}

/**
 * Updates the name of the document with id 5 in "test" and expects the operation to
 * report success.
 */
@Test
public void update() {
    String indexName = "test";
    HashMap<String, Object> hashMap = new HashMap<>();
    hashMap.put("id", 5);
    hashMap.put("name", "jack7");

    boolean update = esDataOperation.update(indexName, hashMap, WriteRequest.RefreshPolicy.WAIT_UNTIL);

    // Assert on the returned flag so the test can actually fail.
    Assert.assertTrue(update);
}
4 参考文献
(1)elasticsearch学习(七):es客户端RestHighLevelClient_炎升的博客