Elasticsearch（ES）使用游标（scroll）读取全部数据

发布于:2024-06-05 ⋅ 阅读:(74) ⋅ 点赞:(0)
import com.cdel.utils.EsClientServiceFactoryByENV;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;

import java.util.Map;

/**
 * Reads every document of one Elasticsearch index/type with the scroll API, one batch at a time,
 * and hands each batch to {@link #processBatch(SearchHit[])}.
 *
 * <p>The class name is kept unchanged so existing references to it keep working, even though it
 * does not follow Java's UpperCamelCase convention.
 */
public class bulkGetFaqAccV4 {
    private static final Logger logger = LogManager.getLogger(bulkGetFaqAccV4.class);

    private static final TransportClient client = EsClientServiceFactoryByENV.getInstance().getClient();

    /** Keep-alive sent with every scroll request: how long the server keeps the scroll context. */
    private static final TimeValue SCROLL_KEEP_ALIVE = TimeValue.timeValueMinutes(20);

    /** Number of hits requested per scroll batch. */
    private static final int BATCH_SIZE = 1000;

    /**
     * Scrolls through the whole source index/type and processes every hit.
     *
     * <p>The scroll context is cleared and the client is closed even when a request fails, so a
     * failed run does not leave a search context open on the cluster for the keep-alive period
     * and does not leave the transport client open.
     *
     * @param args optional overrides: {@code args[0]} = source index, {@code args[1]} = source
     *             type; the {@code "xxx"} placeholders are used when they are omitted
     */
    public static void main(String[] args) {
        // Source index and type to read from.
        String srcIndex = args.length > 0 ? args[0] : "xxx";
        String srcType = args.length > 1 ? args[1] : "xxx";

        String scrollId = null;
        try {
            // The initial search opens the scroll context and returns the first batch.
            SearchResponse response = client.prepareSearch(srcIndex)
                    .setQuery(QueryBuilders.matchAllQuery())
                    .setTypes(srcType)
                    .setScroll(SCROLL_KEEP_ALIVE)
                    .setSize(BATCH_SIZE)
                    .execute().actionGet();
            scrollId = response.getScrollId();

            long totalHits = response.getHits().getTotalHits();
            logger.info("---------一共有{}个", totalHits);

            SearchHit[] batch = response.getHits().getHits();
            while (batch.length > 0) {
                processBatch(batch);

                // Fetch the next batch, always continuing from the most recently returned scroll id.
                response = client.prepareSearchScroll(scrollId)
                        .setScroll(SCROLL_KEEP_ALIVE)
                        .execute().actionGet();
                scrollId = response.getScrollId();
                batch = response.getHits().getHits();
            }
        } finally {
            // Nested try/finally: the client is closed even if clearing the scroll fails.
            try {
                clearScroll(scrollId);
            } finally {
                client.close();
            }
        }
    }

    /**
     * Handles one batch of scroll hits.
     *
     * <p>If the processing writes documents back, collect them in a single
     * {@code client.prepareBulk()} per batch. Execute it only when {@code numberOfActions() > 0},
     * because Elasticsearch rejects an empty bulk request.
     *
     * @param hits the hits of the current batch; never empty when called from {@link #main}
     */
    private static void processBatch(SearchHit[] hits) {
        for (SearchHit hit : hits) {
            Map<String, Object> sourceAsMap = hit.getSourceAsMap();
            // TODO: actual per-document processing logic goes here.
        }
    }

    /**
     * Releases the server-side scroll context on a best-effort basis.
     *
     * <p>Failures are logged instead of thrown, so that a cleanup problem never masks the
     * exception that ended the scroll loop.
     *
     * @param scrollId the last scroll id returned by the server, or {@code null} if the initial
     *                 search never completed (in which case there is nothing to clear)
     */
    private static void clearScroll(String scrollId) {
        if (scrollId == null) {
            return;
        }
        ClearScrollRequest request = new ClearScrollRequest();
        request.addScrollId(scrollId);
        try {
            ClearScrollResponse clearScrollResponse = client.clearScroll(request).actionGet();
            if (!clearScrollResponse.isSucceeded()) {
                logger.warn("Clearing scroll context {} did not succeed", scrollId);
            }
        } catch (RuntimeException e) {
            // Best effort: an uncleared context still expires on its own after the keep-alive.
            logger.warn("Failed to clear scroll context {}", scrollId, e);
        }
    }
}

网站公告

今日签到

点亮在社区的每一天
去签到