用 Java 实现 Elasticsearch 数据的导出与导入
最近组内有个需求，要把本地的 ES 测试数据导入到内网环境中。网上很多文章推荐使用 elasticdump 工具，不过它需要额外安装，个人觉得有点麻烦。于是我在网上找了一段代码，自己做了些优化，用起来还挺方便，适合刚开始学习的人，下面分享给大家。
1、pom.xml引入依赖如下:
<!-- Elasticsearch high-level REST client: provides RestHighLevelClient, search/scroll and bulk APIs used below. -->
<!-- NOTE(review): the client version presumably should match the Elasticsearch server version in use - confirm. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>6.4.3</version>
</dependency>
<!-- Core Elasticsearch artifact, pinned to the same version (6.4.3) as the client above. -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>6.4.3</version>
</dependency>
2、在 Java 中用 scroll 查询 ES，将每条文档数据以 JSON 格式（每行一条）输出到文件
/**
 * Exports every document of {@code index}/{@code type} to {@code file} using the scroll API,
 * writing each hit's {@code _source} as one JSON document per line (UTF-8).
 *
 * <p>The file is opened in append mode, so running the export again against an existing file adds
 * the documents a second time; delete the file first if a fresh export is wanted. Only
 * {@code _source} is written, so document IDs are not part of the export.
 *
 * @param index source index to read from
 * @param type  source mapping type
 * @param file  output file; created if missing, appended to otherwise
 * @throws IOException if a request to Elasticsearch or a write to the file fails
 */
public static void scrollQueryTest(String index, String type, File file) throws IOException {
    // How long Elasticsearch keeps the scroll context alive between two scroll calls.
    final TimeValue keepAlive = TimeValue.timeValueMinutes(1L);

    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
    // Documents fetched per scroll round trip.
    searchSourceBuilder.size(1000);
    searchSourceBuilder.query(QueryBuilders.matchAllQuery());

    SearchRequest searchRequest = new SearchRequest(index);
    searchRequest.types(type);
    searchRequest.scroll(keepAlive);
    searchRequest.source(searchSourceBuilder);

    HttpHost httpHost = new HttpHost("源es的IP", 9200);
    // try-with-resources closes the writer and the client even when a request or a write fails.
    try (RestHighLevelClient client = new RestHighLevelClient(RestClient.builder(httpHost));
         BufferedWriter out = new BufferedWriter(new java.io.OutputStreamWriter(
                 new java.io.FileOutputStream(file, true), java.nio.charset.StandardCharsets.UTF_8))) {
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = response.getScrollId();
        System.out.println(scrollId);
        try {
            // First page comes from the initial search request.
            writeHitsAsJsonLines(response.getHits().getHits(), out);
            System.out.println("首页查询输出结束,文件存储路径=====" + file);
            while (true) {
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(keepAlive);
                response = client.scroll(scrollRequest, RequestOptions.DEFAULT);
                // Continue with the scroll id from the latest response rather than the first one.
                scrollId = response.getScrollId();
                SearchHit[] hits = response.getHits().getHits();
                if (hits == null || hits.length == 0) {
                    System.out.println("------------结束--------------");
                    break;
                }
                System.out.println("hit1=======" + hits.length);
                writeHitsAsJsonLines(hits, out);
                System.out.println("本次查询输出结束,文件存储路径=====" + file);
                System.out.println("------------下一页--------------");
            }
        } finally {
            // Release the server-side scroll context even if the export stopped midway.
            if (scrollId != null) {
                ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
                clearScrollRequest.addScrollId(scrollId);
                ClearScrollResponse clearScrollResponse =
                        client.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
                System.out.println("删除scroll" + clearScrollResponse);
            }
        }
    }
}

/** Writes each hit's {@code _source} JSON to {@code out}, one document per line. */
private static void writeHitsAsJsonLines(SearchHit[] hits, BufferedWriter out) throws IOException {
    if (hits == null) {
        return;
    }
    for (SearchHit hit : hits) {
        String json = hit.getSourceAsString();
        if (json == null) {
            // No _source available for this hit (e.g. _source disabled); nothing to export.
            System.err.println("skip hit without _source: " + hit.getId());
            continue;
        }
        // Valid JSON cannot contain raw line breaks inside strings (they must be escaped), so any
        // raw CR/LF is insignificant whitespace; flattening it keeps one document per line for
        // the line-by-line import.
        out.write(json.replace('\r', ' ').replace('\n', ' '));
        out.write("\n");
    }
}
3、在 Java 中读取本地导出文件，将文件中的每条数据批量写入 ES
/**
 * Imports a file in the one-JSON-document-per-line format written by scrollQueryTest: every
 * non-blank line is indexed into {@code index}/{@code type} through bulk requests of up to 1000
 * documents each. The connection comes from esContenct() and is closed when the import ends.
 *
 * <p>Each line becomes a new document with an auto-generated id. Per-document bulk failures are
 * reported on stderr; they do not abort the import.
 *
 * @param file  UTF-8 file to read, one JSON document per line
 * @param index target index
 * @param type  target mapping type
 * @throws IOException if the file cannot be read or a bulk request fails at transport level
 */
public static void inputEsData(File file, String index, String type) throws IOException {
    final int batchSize = 1000;
    // One reader and one client for the whole import, both closed even on failure.
    try (BufferedReader br = new BufferedReader(new java.io.InputStreamReader(
                 new java.io.FileInputStream(file), java.nio.charset.StandardCharsets.UTF_8));
         RestHighLevelClient client = esContenct()) {
        BulkRequest bulkRequest = new BulkRequest();
        int total = 0;
        String json;
        while ((json = br.readLine()) != null) {
            if (json.trim().isEmpty()) {
                continue; // tolerate blank lines, e.g. a trailing newline at the end of the file
            }
            bulkRequest.add(new IndexRequest(index).source(json, XContentType.JSON).type(type));
            total++;
            if (bulkRequest.numberOfActions() >= batchSize) {
                submitBulk(client, bulkRequest, total);
                // Start a fresh request so already-submitted documents are not sent again.
                bulkRequest = new BulkRequest();
            }
        }
        // Submit the last, partially filled batch.
        if (bulkRequest.numberOfActions() > 0) {
            submitBulk(client, bulkRequest, total);
        }
        System.out.println("插入完毕");
    }
}

/** Sends one bulk request, reports item failures on stderr and prints the running document count. */
private static void submitBulk(RestHighLevelClient client, BulkRequest bulkRequest, int total)
        throws IOException {
    BulkResponse indexResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    if (indexResponse.hasFailures()) {
        System.err.println(indexResponse.buildFailureMessage());
    }
    System.out.println("本次es提交的记录数=============" + total);
}
附：第三步 inputEsData 方法用到的 esContenct 连接方法，以及查询条件的构建，统一抽取到下面的方法中。
/**
 * Creates a new RestHighLevelClient for the default cluster address.
 * The caller owns the returned client and must close it.
 *
 * @return a new, unshared client
 */
public static RestHighLevelClient esContenct() {
    return esContenct("源es的IP:9200");
}

/**
 * Creates a new RestHighLevelClient for the given addresses, each in a form accepted by
 * HttpHost.create (for example "host:9200"). This lets the import target a different cluster
 * than the export source. The caller owns the returned client and must close it.
 *
 * @param ips one or more cluster addresses
 * @return a new, unshared client
 */
public static RestHighLevelClient esContenct(String... ips) {
    // Create the client connection object
    System.out.println("===================连接RestHighLevelClient======================");
    HttpHost[] httpHosts = new HttpHost[ips.length];
    for (int i = 0; i < ips.length; i++) {
        httpHosts[i] = HttpHost.create(ips[i]);
    }
    RestClientBuilder builder = RestClient.builder(httpHosts);
    return new RestHighLevelClient(builder);
}
/**
 * Builds a match-all search over {@code index}/{@code type} that returns the first page
 * (offset 0) of up to 1000 hits, with a 5000 ms search timeout.
 *
 * @param index index to search
 * @param type  mapping type to search
 * @return the ready-to-execute search request
 */
public static SearchRequest searchRequest(String index, String type) {
    SearchSourceBuilder body = new SearchSourceBuilder()
            .query(QueryBuilders.matchAllQuery())
            .timeout(new TimeValue(5000))
            .from(0)     // paging offset, starting at 0
            .size(1000); // hits per page
    return new SearchRequest(index)
            .types(type)
            .source(body);
}
4、创建 main 方法，分别调用导出和导入方法
public class EsDataExchange {
/**
 * Demo entry point: exports the index to a JSON-lines file, then imports that file.
 *
 * <p>The export must run first, because the import reads the file the export writes. In the
 * migration scenario described above, the export runs against the source cluster and the import
 * runs against the target cluster, so in practice the two steps are usually run separately (one
 * per environment), each with its own connection settings.
 */
public static void main(String[] args) throws IOException {
    File file = new File("/Users/XXXX/Desktop/outes.json");
    String index = "源es索引";
    String type = "源es类型";
    // Export ES data to the file.
    scrollQueryTest(index, type, file);
    // Import the exported file into ES.
    inputEsData(file, index, type);
}
分享是人们与社会产生联系的一种方式,本质上是一种互利行为,在互利中,选择、巩固和发展彼此的社会关系。完全没有分享,社会将不能成立。分享是构成社会的要素之一。