java实现Elasticsearch的数据导出与导入

最近组内有个需求要把本地的es测试数据导入到内网中,看了网上很多说用elasticdump插件,不过这个还需要安装个人觉得有点麻烦,在网上找了一下代码自己做了下优化还挺方便,适合刚开始学习的人,下面给大家分享一下。




1、pom.xml引入依赖如下:

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.4.3</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.4.3</version>
</dependency>

2、在java中用scroll查询es,数据明细输出为json格式文件

 //传入待查的es索引、类型、输出文件路径
public static void scrollQueryTest(String index, String type, File file) throws IOException {
        //  1. 创建查询对象
        SearchRequest searchRequest = new SearchRequest(index);//指定索引
        searchRequest.types(type);//指定类型
        searchRequest.scroll(TimeValue.timeValueMinutes(1l));//指定存在内存的时长为1分钟
	      //  2. 封装查询条件
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.size(2);
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        searchRequest.source(searchSourceBuilder);
        //  3.执行查询
        // client执行
        HttpHost httpHost = new HttpHost("源es的IP", 9200);
        RestClientBuilder restClientBuilder = RestClient.builder(httpHost);
        RestHighLevelClient restHighLevelClient = new RestHighLevelClient(restClientBuilder);
        SearchResponse searchResponse = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        //获取scorllId
        String scrollId = searchResponse.getScrollId();
        System.out.println(scrollId);
        //创建文件输出流
        BufferedWriter out = new BufferedWriter(new FileWriter(file, true));
       // 4.获取数据
        SearchHit[] hits = searchResponse.getHits().getHits();
        //第一页的数据
        for (int i = 0; i < hits.length; i++) {
            String json = hits[i].getSourceAsString();
            out.write(json);//导出的文件以json格式写入到文件中
            out.write("\n");
        }
        System.out.println("首页查询输出结束,文件存储路径=====" + file);
        //获取全部数据
        while (true) {
            //创建SearchScrollRequest对象
            SearchScrollRequest searchScrollRequest = new SearchScrollRequest(scrollId);
            searchScrollRequest.scroll(TimeValue.timeValueMinutes(1l));//设置1分钟
            SearchResponse scroll = restHighLevelClient.scroll(searchScrollRequest, RequestOptions.DEFAULT);
            SearchHit[] hits1 = scroll.getHits().getHits();
            if (hits1 != null && hits1.length > 0) {
                System.out.println("hit1======="+hits1.length+"   "+hits1);
                for (int i = 0; i < hits1.length; i++) {
                    String json = hits1[i].getSourceAsString();
                    out.write(json);//导出的文件以json格式写入到文件中
                    out.write("\n");
                }
                System.out.println("本次查询输出结束,文件存储路径=====" + file);
                System.out.println("------------下一页--------------");

            } else {
                System.out.println("------------结束--------------");
             // 关闭文件流
                out.close();
                break;
            }
        }
        //      删除ScrollId
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        ClearScrollResponse clearScrollResponse = restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        System.out.println("删除scroll" + clearScrollResponse);
        //    关闭restHighLevelClient链接
        restHighLevelClient.close();
    }

3、java读取本地文件,文件数据明细存入es

//传入文件路径、索引、es中的type ,RestHighLevelClient连接统一使用esContenct()方法
public static void inputEsData(File file, String index, String type) throws IOException {
    BufferedReader br = new BufferedReader(new FileReader(file));//读取刚才导出的ES数据
    String json;
    int count = 1;
    BulkRequest bulkRequest = new BulkRequest();
    BulkResponse indexResponse;
    while ((json = br.readLine()) != null) {
        bulkRequest.add(new IndexRequest(index).source(json, XContentType.JSON).type(type)); //先把数据加入到队列中
        //每1000条提交一次
        if (count % 1000 == 0) {
            indexResponse = esContenct().bulk(bulkRequest, RequestOptions.DEFAULT);
            if (indexResponse != null) {
                System.out.println("=================indexResponse==============" + indexResponse);
                System.out.println("本次es提交的记录数=============" + count);
            }
            count++;
        }
    }
    System.out.println("插入完毕");
    br.close();
    esContenct().close();
}

3、第三步inputEsData方法用到的esContenct连接方法和查询条件统一抽出来放在下面的方法中。

 public static RestHighLevelClient esContenct() {
        // 创建Client连接对象
        System.out.println("===================连接RestHighLevelClient======================");
        String[] ips = {"源es的IP:9200"};
        HttpHost[] httpHosts = new HttpHost[ips.length];
        for (int i = 0; i < ips.length; i++) {
            httpHosts[i] = HttpHost.create(ips[i]);
        }
        RestClientBuilder builder = RestClient.builder(httpHosts);
        return new RestHighLevelClient(builder);
    }

    //构建查询条件
    public static SearchRequest searchRequest(String index, String type) {
        //设置要查询的索引 和 type
        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.types(type);
        //查询构建工具
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        //查询构造条件
        searchSourceBuilder.query(QueryBuilders.matchAllQuery());
        searchSourceBuilder.timeout(new TimeValue(5000));
        searchSourceBuilder.from(0);//分页查询,设置起始下标,从0开始
        searchSourceBuilder.size(1000);//每页显示个数
        searchRequest.source(searchSourceBuilder); //将请求体加入到请求中
        return searchRequest;
    }

4、创建main方法分别调用

 public class EsDataExchange {
    public static void main(String[] args) throws IOException {
        File file = new File("/Users/XXXX/Desktop/outes.json");
        String index = "源es索引";
        String type = "源es类型";
			// es数据导入
        inputEsData(file, index, type);
      //es数据导出
        scrollQueryTest(index, type, file);
    }

分享是人们与社会产生联系的一种方式,本质上是一种互利行为,在互利中,选择、巩固和发展彼此的社会关系。完全没有分享,社会将不能成立。分享是构成社会的要素之一。

相关文章

Java之文件删除 java 删除文件夹下的文件

package com.biubiu.utils; import java.io.File; public class Utils { /** * 删除文件 *...

delete()方法删除文件及目录 delete all files

在Java中,可以使用delete()方法来删除文件或目录。该方法是File类中的一个方法,接受一个File对象作为参数,并尝试删除该文件或目录。如果该文件或目录成功删除,该方法返回true,否则返回...

教你如何彻底干净的卸载Java 如何完全卸载java

一、删除环境变量右键此电脑,属性高级系统设置点击环境变量找到Java相关的系统变量,将其删除在系统变量中找到path,将其中与Java相关的删除二、卸载Java打开控制面板“程序和功能”,卸载与Jav...

已满的C盘如何清理无用的文件 c盘已满怎么清理

一、假设电脑(C盘总共80G)为例:1;Program Files 占用了 1.53G2;Program Files(X86) 占用了 4.68G3;ProgramData 占用了 2.8G4; Wi...

快速释放C盘空间,从清理Windows文件夹开始

新安装的系统还没用多久,突然提示系统备份空间不足?光是 C 盘中 Windows 文件夹就已经占用了54.3G空间,再仔细看了一下,其中 WinSxS 文件夹占用了22.1G空间。C盘 Windows...

五年开发,你还记得Java基础代码中的文件I/O操作么?

文件I/O操作作为Java开发中的一个重要组成部分,主要负责Java代码中对于文件的读写操作,可能对于一些长时间不使用Java文件API的开发者来说早都忘记了相关的操作,下面我们就来详细介绍一下Jav...