elasticsearch 查询超10000的解决方案

avatar
作者
筋斗云
阅读量:0

前言

默认情况下,Elasticsearch集群中每个分片的搜索结果数量限制为10000。这是为了避免潜在的性能问题。

但是我们 在实际工作过程中时常会遇到 需要深度分页,以及查询批量数据更新的情况

问题:当请求form + size >10000 时,请求直接报错

在这里插入图片描述

1:修改max_result_window 参数(不推荐)

在此方案中,我们建议仅限于测试用,生产禁用,毕竟当数据量大的时候,过大的数据量可能导致es的内存溢出,直接崩掉,一年绩效白干。

PUT wkl_test/_settings {    "index":{         "max_result_window":2147483647     } } 

查看索引的 settings
在这里插入图片描述
重新查数据:

在这里插入图片描述

2:使用游标 scroll API

使用scroll API:scroll API可以帮助我们在不加载所有数据的情况下获取所有结果。它会在后台执行查询以获取滚动ID,并将其用于进行后续查询。这样就可以一次性获取所有结果,而不必担心限制

ES语句查询

在游标方案中,我们只需要在第一次拿到游标id,之后通过游标就能唯一确定查询,在这个查询中通过我们指定的 size 移动游标,具体操作看看下面实操。

  • 游标查询,设置游标有效时间,有效时间内,游标都可以使用,过期就不行了
GET wkl_test/_search?scroll=5m {   "query": {     "match_all": {}   },   "sort": [     {       "seq": {         "order": "asc"       }     }   ],   "size": 200 } 
  • 上面操作中通过游标的结果返回
    在这里插入图片描述
  • 之后将_scroll_id 复制到窗口,就可以不端通过这个_scroll_id 进行之前设置的页数不断翻页
    以此类推,后面每次滚屏都把前一个的scroll_id复制过来。注意到,后续请求时没有了index信息,size信息等,这些都在初始请求中,只需要使用scroll_id和scroll两个参数即可。
    在这里插入图片描述
    注意,此时游标移动了,所以我们可以通过游标的方式不断后移,直到移动到我们想要的 from+size 范围内。再次点击
    在这里插入图片描述

java实现

 @Test     public void testScroll(){         RestHighLevelClient restHighLevelClient ;         BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();         boolQueryBuilder.mustNot(QueryBuilders.existsQuery("seq"));          try {             //滚动查询的Scroll,设置请求滚动时间窗口时间             Scroll scroll = new Scroll(TimeValue.timeValueMillis(180000));              SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();             //加入query语句             sourceBuilder.query(boolQueryBuilder);             //每次滚动的长度             sourceBuilder.size(SIZE);             //加入排序字段             sourceBuilder.sort("id", SortOrder.DESC);             //构建searchRequest             //加入scroll和构造器             SearchRequest searchRequest = new SearchRequest()                     .indices("wkl_test")                     .source(sourceBuilder)                     .scroll(scroll);             //存储scroll的list             List<String> scrollIdList = new ArrayList<>();             //执行首次检索             SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);             //首次检索返回scrollId,用于下一次的滚动查询             String scrollId = searchResponse.getScrollId();             //拿到hits结果             SearchHit[] hits = searchResponse.getHits().getHits();             long value = searchResponse.getHits().getTotalHits().value;             //保存返回结果List大小             Long resultSize = 0L;             scrollIdList.add(scrollId);             try {                 //滚动查询将SearchHit封装到result中                 while (ArrayUtils.isNotEmpty(hits) && hits.length > 0) {                     BulkRequest bulkRequest = new BulkRequest();                     JSONArray esArray = new JSONArray();                     for (SearchHit hit : hits) {                         String sourceAsString = hit.getSourceAsString();                         String index = hit.getIndex();                         JSONObject jsonObject = JSONObject.parseObject(sourceAsString);                         String seq = jsonObject.getString("seq");                         if(StringUtils.isBlank(seq) ){                             esArray.add(jsonObject);                             String uuid = jsonObject.getString("id");                             jsonObject.put("is_del",1);                             bulkRequest.add(new UpdateRequest(index, uuid).doc(jsonObject));                         }                     }                     resultSize = resultSize+hits.length;                      //发送请求                     //实时更新                     bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);                     BulkResponse bulk = restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);                     System.out.println(bulk.getTook()+"-------"+bulk.getItems().length);                      //说明滚动完了,返回结果即可                     if (resultSize > 20000) {                         break;                     }                     //继续滚动,根据上一个游标,得到这次开始查询位置                     SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);                     searchScrollRequest.scroll(scroll);                     //得到结果                     SearchResponse searchScrollResponse = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);                     //定位游标                     scrollId = searchScrollResponse.getScrollId();                     hits = searchScrollResponse.getHits().getHits();                     scrollIdList.add(scrollId);                 }                 System.out.println("----彻底结束了-----");             } finally {                 //清理scroll,释放资源                 ClearScrollRequest clearScrollRequest = new ClearScrollRequest();                 clearScrollRequest.setScrollIds(scrollIdList);                 restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);             }         } catch (Exception e) {             throw new RuntimeException(e);         }     } 

scroll API 的优缺点和总结

优缺点:

  • scroll查询的相应数据是非实时的,如果遍历过程中插入新的数据,是查询不到的。并且保留上下文需要足够的堆内存空间。
  • 相比于 from/size 和 search_after 返回一页数据,Scroll API 可用于从单个搜索请求中检索大量结果。但是 scroll 滚动遍历查询是非实时的,数据量大的时候,响应时间可能会比较长

适用场景

  • 全量或数据量很大时遍历结果数据,而非分页查询。
  • scroll方案基于快照,不能用在高实时性的场景下,建议用在类似数据导出场景下使用

3: search_after + PIT 深度查询

  • Search_after是 ES 5 新引入的一种分页查询机制,其原理几乎就是和scroll一样,因此代码也几乎是一样的。
  • 官方文档说明不再建议使用scroll滚动分页和from size分页,建议使用search_after
  • search_after 分页的方式和 scroll 搜索有一些显著的区别,首先它是根据上一页的最后一条数据来确定下一页的位置,同时在分页请求的过程中,如果有索引数据的增删改查,这些变更也会实时的反映到游标上。

不带PIT

ES语句实现

检索第一页的查询如下所示:

GET wkl_test/_search {   "query": {     "match_all": {}   },   "sort": [     {       "seq": {         "order": "asc"       }     }   ],   "size": 200 }  

上述请求的结果包括每个文档的 sort 值数组。
在这里插入图片描述

这些 sort 值可以与 search_after 参数一起使用,以开始返回在这个结果列表之后的任何文档。例如,我们可以使用上一个文档的 sort 值并将其传递给 search_after 以检索下一页结果:

在这里插入图片描述

Java 实现

@Test     public void testSearchAfter() throws IOException {         RestHighLevelClient restHighLevelClient = es7UtilApi.getRestHighLevelClient();          MatchAllQueryBuilder matchAllQueryBuilder = QueryBuilders.matchAllQuery();          SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();         searchSourceBuilder.query(matchAllQueryBuilder);         searchSourceBuilder.from(0);         searchSourceBuilder.size(200);         searchSourceBuilder.sort("seq", SortOrder.ASC);         searchSourceBuilder.trackTotalHits(true);          SearchRequest searchRequest = new SearchRequest()                 .indices("wkl_test")                 .source(searchSourceBuilder);          SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);         SearchHits hits = searchResponse.getHits();         long value = hits.getTotalHits().value;         System.out.println("查询到记录数=" + value);          List<JSONObject> list = new ArrayList<>();         SearchHit[] searchHists = hits.getHits();         Object[] sortValues = searchHists[searchHists.length - 1].getSortValues();         if (searchHists.length > 0) {             for (SearchHit hit : searchHists) {                 String sourceAsString = hit.getSourceAsString();                 JSONObject jsonObject = JSON.parseObject(sourceAsString);                 jsonObject.put("_id", hit.getId());                 list.add(jsonObject);             }         }          //往后的每次请求都携带上一次的sort_id进行访问。         while (ArrayUtils.isNotEmpty(searchHists) && searchHists.length > 0){             searchSourceBuilder.searchAfter(sortValues);             searchRequest.source(searchSourceBuilder);             SearchResponse searchResponseAfter = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);             hits = searchResponseAfter.getHits();             searchHists = hits.getHits();             sortValues = searchHists[searchHists.length - 1].getSortValues();             if (searchHists.length > 0) {                 for (SearchHit hit : searchHists) {                     String sourceAsString = hit.getSourceAsString();                     JSONObject jsonObject = JSON.parseObject(sourceAsString);                     jsonObject.put("_id", hit.getId());                     list.add(jsonObject);                 }             }             if(list.size()>20000){                 break;             }             System.out.println("-----彻底结束了-------");         }      } 

问题

「优点:」

  • 无状态查询,可以防止在查询过程中,数据的变更无法及时反映到查询中。

  • 不需要维护scroll_id,不需要维护快照,因此可以避免消耗大量的资源。

「缺点:」

  • 由于无状态查询,因此在查询期间的变更可能会导致跨页面的不一值。

  • 排序顺序可能会在执行期间发生变化,具体取决于索引的更新和删除。

  • 至少需要制定一个唯一的不重复字段来排序。

  • 它不适用于大幅度跳页查询,或者全量导出,对第N页的跳转查询相当于对es不断重复的执行N次search after,而全量导出则是在短时间内执行大量的重复查询。

带PIT

关于PIT

  • 在7.*版本中,ES官方不再推荐使用Scroll方法来进行深分页,而是推荐使用带PIT的search_after来进行查询;

  • 从7.*版本开始,您可以使用SEARCH_AFTER参数通过上一页中的一组排序值检索下一页命中。

  • 使用SEARCH_AFTER需要多个具有相同查询和排序值的搜索请求。

  • 如果这些请求之间发生刷新,则结果的顺序可能会更改,从而导致页面之间的结果不一致。
    为防止出现这种情况,您可以创建一个时间点(PIT)来在搜索过程中保留当前索引状态。

ES语句实现

1:生成pit
#keep_alive必须要加上,它表示这个pit能存在多久,这里设置的是1分钟 POST wkl_test/_pit?keep_alive=1m 

在这里插入图片描述

2:在搜索请求中指定PIT:

在每个搜索请求中添加 keep_alive 参数来延长 PIT 的保留期,相当于是重置了一下时间

 GET _search {   "query": {     "match_all": {}   },   "pit":{     "id":"t_yxAwEId2tsX3Rlc3QWU0hzbEJkYWNTVEd0ZGRoN0xsQVVNdwAWUGQtaXJpT0xTa2VUN0RGLXZfTlBvZwAAAAAACHG1fxY1UWNKX1RHOFMybXBaV20zbWx3enp3ARZTSHNsQmRhY1NUR3RkZGg3TGxBVU13AAA=",     "keep_alive":"5m"   },   "sort": [     {       "seq": {         "order": "asc"       }     }   ],   "size": 200 } 

在这里插入图片描述

3:删除PIT
DELETE _pit {  "id":"t_yxAwEId2tsX3Rlc3QWU0hzbEJkYWNTVEd0ZGRoN0xsQVVNdwAWUGQtaXJpT0xTa2VUN0RGLXZfTlBvZwAAAAAACHG1fxY1UWNKX1RHOFMybXBaV20zbWx3enp3ARZTSHNsQmRhY1NUR3RkZGg3TGxBVU13AAA=" } 

在这里插入图片描述

总结

  • 如果数据量小(from+size在10000条内),或者只关注结果集的TopN数据,可以使用from/size 分页,简单粗暴

  • 数据量大,深度翻页,后台批处理任务(数据迁移)之类的任务,使用 scroll 方式

  • 数据量大,深度翻页,用户实时、高并发查询需求,使用 search after 方式

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!