Elasticsearch学习之(一)线上迁移数据方案_elasticsearch 在线迁移

avatar
作者
猴君
阅读量:1

先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7

深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年最新大数据全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友。
img
img
img
img
img

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,涵盖了95%以上大数据知识点,真正体系化!

由于文件比较多,这里只是将部分目录截图出来,全套包含大厂面经、学习笔记、源码讲义、实战项目、大纲路线、讲解视频,并且后续会持续更新

如果你需要这些资料,可以添加V获取:vip204888 (备注大数据)
img

正文

    // 计算开始和结束的年份     int startYear = startCalendar.get(Calendar.YEAR);     int endYear = endCalendar.get(Calendar.YEAR);      // 遍历每个年份的月份,生成索引名称     for (int year = startYear; year <= endYear; year++) {         Calendar tempCalendar = Calendar.getInstance();         tempCalendar.set(Calendar.YEAR, year);         tempCalendar.set(Calendar.MONTH, Calendar.JANUARY); // 一月份          // 对于开始年份,从开始的月份开始遍历         int startMonth = year == startYear ? startCalendar.get(Calendar.MONTH) : Calendar.JANUARY;          // 对于结束年份,到结束的月份结束遍历         int endMonth = year == endYear ? endCalendar.get(Calendar.MONTH) : Calendar.DECEMBER;          for (int month = startMonth; month <= endMonth; month++) {             tempCalendar.set(Calendar.MONTH, month);             indexNames.add( indexPrefix + dateFormat.format(tempCalendar.getTime()));         }     }       return indexNames; } 
 ### 2、数据访问   #### 1、每个月的最后几天生成下个月的索引   注意点:    1、因为月份最后几天不确定,所以从28-31 都计算一下    2、创建索引的配置`indexConfiguration`自己写个json文件然后放到容器中读取即可    

@Scheduled(cron = “0 0 10 28-31 * ?”)
public void createIndex() throws IOException {
//判断是否是最后一天
if(!DateUtil.isLastDayOfMonth()){
log.warn(“索引初始化|判断不是本月最后一天|不进行处理”);
return;
}

    // 每月最后一天生成下个月的索引     Calendar instance = Calendar.getInstance();     instance.add(Calendar.MONTH,1);     instance.set(Calendar.DAY\_OF\_MONTH,1);      String indexName = ElasticsearchUtil.getIndexNameByTimeStamp(newIndexPrefix, instance.getTimeInMillis());     GetIndexRequest getIndexRequest = new GetIndexRequest(indexName);     boolean exists = restHighLevelClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);     if(exists){         log.warn("索引初始化|索引已存在|index:{}",indexName);         return;     }     if(StringUtils.isEmpty(indexConfiguration)){         log.error("索引初始化|获取索引初始化配置为空|setting:{}",indexConfiguration);         return;     }     CreateIndexRequest request = new CreateIndexRequest(indexName);     // 初始化索引     request.source(indexConfiguration,XContentType.JSON);     CreateIndexResponse response = restHighLevelClient.indices().create(request, RequestOptions.DEFAULT);     boolean acknowledged = response.isAcknowledged();     log.info("索引初始化|初始化完成|所有节点是否都已确认:{}",acknowledged);  } 
 **index初始化的配置**    从resource下读取配置文件    

@Bean(“indexConfiguration”)
public String initIndexConfiguration() throws IOException {
String indexConfiguration= initEsIndexSetting(“indexInitialization.json”);
log.info(“索引初始化|初始化索引配置文件:{}”,indexConfiguration);
return indexConfiguration;
}

private String initEsIndexSetting(String resource) throws IOException {     ClassPathResource classPathResource = new ClassPathResource(resource);     try (InputStream in = classPathResource.getInputStream()) {         return StreamUtils.copyToString(in, StandardCharsets.UTF\_8);     }catch (IOException e){         throw new IOException(e);     } } 
 #### 2、数据查询   很明显根据时间戳确定索引名称然后查询就行了    

public List queryLisNewt(int from, int pageSize, BoolQueryBuilder queryBuilder,FeedbackReport entity) {

    List<FeedbackReport>results=new ArrayList<>();      if( StringUtils.isEmpty(entity.getStartDate()) || StringUtils.isEmpty(entity.getEndDate()) ){         log.error("queryLisNewt|查询错误|开始或结束时间为空|startDate:{}|endDate:{}",entity.getStartDate(),entity.getEndDate());         return results;     }     long start=DateUtil.parseDateString(entity.getStartDate()).getTime();     long end=DateUtil.parseDateString(entity.getEndDate()).getTime();     if(start <=0l || end <=0l){         log.error("queryLisNewt|查询错误|解析日期错误:param:{}",entity);         return results;     }     List<String> indexNames = ElasticsearchUtil.getAllIndexNameByTimeStampRange(newIndexPrefix, start, end);     if(CollectionUtils.isEmpty(indexNames)){         log.error("queryLisNewt|查询错误|获取索引名称为空:{}",entity);         return results;     }     SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();     searchSourceBuilder             .query(queryBuilder)             .from(from)             .size(pageSize)             .sort(SortBuilders.fieldSort("createTime")             .order(SortOrder.DESC));      SearchRequest searchRequest = new SearchRequest(indexNames.toArray(new String[]{}),searchSourceBuilder);     RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();     builder.setHttpAsyncResponseConsumerFactory(new HttpAsyncResponseConsumerFactory 

.HeapBufferedResponseConsumerFactory(200*1024*1024));
try {
SearchResponse response = restHighLevelClient.search(searchRequest, builder.build());
SearchHits hits = response.getHits();
for (SearchHit hit : hits) {
Map map = hit.getSourceAsMap();
FeedbackReport obj = new FeedbackReport();
org.apache.commons.beanutils.BeanUtils.populate(obj, map);
results.add(obj);
}
} catch (Exception e) {
log.error(“queryLisNewt|查询错误|获取结果失败:{}”,e.getMessage(),e);
return results;
}

    return results; } 
 ### 3、数据写入   主要就是收到消息,然后还是根据时间戳写到索引里面就行了。    

public boolean insertRecordNew(FeedbackReport record) throws Exception {

    if(record.getCreateTime() <=0 ){         log.error("写入索引|写入失败|提交时间错误:{}|param:{}",record.getCreateTime(),JSON.toJSONString(record));         return false;     }     // 根据时间戳 获取索引名称     String indexName = ElasticsearchUtil.getIndexNameByTimeStamp(newIndexPrefix, record.getCreateTime());     log.info("写入索引|准备写入|获取索引名称|name:{}",indexName);     // 判断此内容是否存在     GetRequest getRequest = new GetRequest(indexName);     getRequest.id(record.getId());     boolean exists = restHighLevelClient.exists(getRequest, RequestOptions.DEFAULT);     if(exists){         log.error("写入索引|写入失败|内容重复|indexName:{}|time:{}|param:{}",indexName,record.getCreateTime(),JSON.toJSONString(record));         return false;     }     IndexRequest indexRequest = new IndexRequest(indexName, "\_doc", record.getId());     indexRequest.source(JSON.toJSONString(record),XContentType.JSON); 

网上学习资料一大堆,但如果学到的知识不成体系,遇到问题时只是浅尝辄止,不再深入研究,那么很难做到真正的技术提升。

需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
img

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

研究,那么很难做到真正的技术提升。**

需要这份系统化的资料的朋友,可以添加V获取:vip204888 (备注大数据)
[外链图片转存中…(img-hERLCiuB-1713281770331)]

一个人可以走的很快,但一群人才能走的更远!不论你是正从事IT行业的老鸟或是对IT行业感兴趣的新人,都欢迎加入我们的的圈子(技术交流、学习资源、职场吐槽、大厂内推、面试辅导),让我们一起学习成长!

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!