Search After+PIT 解决ES深度分页问题

发布于:2025-05-08 ⋅ 阅读:(17) ⋅ 点赞:(0)

1. 深分页与 search_after 原理

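| 对比项 | 深分页 (from/size) | search_after |
| --- | --- | --- |
| 数据定位 | 全局排序后跳过前 N 条 | 基于上一页最后一条的排序值定位 |
| 排序开销 | 每次请求都要在各分片上取出并排序前 from + size 条结果,再丢弃前 from 条,开销随分页深度增长 | 每次请求只需在各分片上取游标之后的 size 条结果,开销与分页深度基本无关 |
| 内存消耗 | 协调节点需在堆内存中汇总各分片的 from + size 条结果 (深分页时有 OOM 风险) | 每次只汇总各分片的 size 条结果,无堆内存累积 (安全) |
| 分页深度限制 | from + size ≤ 10000 (默认限制,由 index.max_result_window 控制) | 无深度限制 |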

2. 性能对比 

| 分页深度 | 深分页响应时间 | search_after 响应时间 |
| --- | --- | --- |
| 1 | 100ms | 100ms |
| 100 | 300ms | 110ms |
| 1000 | 1500ms | 120ms |
| 10000 | 超时/报错 | 130ms |

3. 适用场景
| 对比项 | 深分页 | search_after |
| --- | --- | --- |
| 典型场景 | 小数据量随机跳页 | 大数据量连续翻页(如日志流) |
| 排序要求 | 任意排序字段 | 必须指定能唯一确定顺序的排序字段组合(如时间戳+ID) |
| 跳页能力 | 支持任意页跳转 | 仅支持顺序翻页 |

4. Maven依赖配置

<!-- Elasticsearch Java API Client. The artifact is published under the
     co.elastic.clients group; org.elasticsearch.client is the group of the
     legacy REST clients and does not contain elasticsearch-java. -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.12.0</version>
</dependency>

5.ES分页服务类

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Cursor-based pagination over Elasticsearch that combines {@code search_after}
 * with a point-in-time (PIT) context.
 *
 * <p>The first call opens a PIT; every call returns the sort values of the last
 * hit plus the PIT id to send with the next call. Callers are responsible for
 * releasing the PIT through {@link #closePit(String)} when they stop paging.
 */
@Slf4j
@Service
public class EsSearchAfterService {

    /** Keep-alive used when a PIT is opened and re-applied on every page request. */
    private static final String PIT_KEEP_ALIVE = "30m";

    /**
     * Tiebreaker appended after the caller's sort field. {@code _shard_doc} is the
     * tiebreaker Elasticsearch documents for PIT searches; sorting on {@code _id}
     * needs fielddata on {@code _id}, which Elasticsearch discourages and which may
     * be disabled on the cluster.
     */
    private static final String TIEBREAKER_FIELD = "_shard_doc";

    private final ElasticsearchClient esClient;

    /**
     * Creates the service.
     *
     * @param esClient Elasticsearch client, supplied through constructor injection
     */
    public EsSearchAfterService(ElasticsearchClient esClient) {
        this.esClient = esClient;
    }

    /**
     * Fetches one page of results, ordered by {@code sortField} descending.
     *
     * @param indexName   index to open the PIT on; only used when no PIT id is supplied
     * @param query       query to execute
     * @param sortField   primary sort field
     * @param pageSize    number of hits per page
     * @param pitId       PIT id from the previous page; {@code null} or empty on the first call
     * @param searchAfter sort values of the previous page's last hit; {@code null} or empty
     *                    on the first call
     * @return the page data, the cursor for the next page (empty when the page has no hits)
     *         and the PIT id to use for the next request
     * @throws IOException      if opening a new PIT fails
     * @throws RuntimeException if building or executing the search fails; the original
     *                          failure is attached as the cause
     */
    public PageResult<Object> searchWithPagination(
            String indexName,
            Query query,
            String sortField,
            int pageSize,
            String pitId,
            List<JsonData> searchAfter) throws IOException {

        // 1. Create or continue the PIT context. Both locals are assigned exactly once
        //    so that the lambdas below are allowed to capture them.
        final boolean pitOpenedHere = pitId == null || pitId.isEmpty();
        final String activePitId = pitOpenedHere ? openPit(indexName) : pitId;

        try {
            // 2. Build the search request. No index is set: the PIT already pins it.
            SearchRequest.Builder searchBuilder = new SearchRequest.Builder()
                    .size(pageSize)
                    .query(query)
                    .pit(p -> p.id(activePitId).keepAlive(a -> a.time(PIT_KEEP_ALIVE)))
                    .sort(s -> s.field(f -> f.field(sortField).order(SortOrder.Desc)))
                    .sort(s -> s.field(f -> f.field(TIEBREAKER_FIELD).order(SortOrder.Asc)));

            if (searchAfter != null && !searchAfter.isEmpty()) {
                searchBuilder.searchAfter(searchAfter);
            }

            // 3. Execute the query.
            SearchResponse<Object> response = esClient.search(searchBuilder.build(), Object.class);

            // 4. Collect the documents and the sort values of the last hit.
            List<Hit<Object>> hits = response.hits().hits();
            List<Object> data = new ArrayList<>();
            List<JsonData> nextSearchAfter = Collections.emptyList();

            if (hits != null && !hits.isEmpty()) {
                for (Hit<Object> hit : hits) {
                    data.add(hit.source());
                }
                nextSearchAfter = hits.get(hits.size() - 1).sort();
            }

            // Elasticsearch may hand back an updated PIT id; the most recent id must be
            // used for the next request, so prefer the one carried by the response.
            String nextPitId = response.pitId() != null ? response.pitId() : activePitId;

            return new PageResult<>(data, nextSearchAfter, nextPitId);

        } catch (Exception e) {
            // Only release a PIT that this call opened; a caller-supplied PIT stays open.
            if (pitOpenedHere) {
                releasePitQuietly(activePitId, e);
            }
            throw new RuntimeException("ES查询失败", e);
        }
    }

    /**
     * Closes a PIT context. Does nothing when {@code pitId} is {@code null} or empty.
     *
     * @param pitId id of the PIT to close
     * @throws IOException if the delete request fails
     */
    public void closePit(String pitId) throws IOException {
        if (pitId != null && !pitId.isEmpty()) {
            DeletePitResponse response = esClient.deletePit(d -> d.id(pitId));
            log.info("Closed PIT {}: {}", pitId, response.succeeded());
        }
    }

    /** Opens a new PIT on {@code indexName} and returns its id. */
    private String openPit(String indexName) throws IOException {
        CreatePitResponse pitResponse = esClient.createPit(c -> c
                .index(indexName)
                .keepAlive(a -> a.time(PIT_KEEP_ALIVE)));
        String newPitId = pitResponse.id();
        log.info("Created new PIT: {}", newPitId);
        return newPitId;
    }

    /**
     * Deletes a PIT during error handling. A failure of the delete itself is attached
     * to {@code cause} as a suppressed exception so it cannot mask the original error.
     */
    private void releasePitQuietly(String pitId, Exception cause) {
        try {
            esClient.deletePit(d -> d.id(pitId));
        } catch (Exception cleanupFailure) {
            cause.addSuppressed(cleanupFailure);
        }
    }

    /**
     * One page of results.
     *
     * @param data            sources of the hits on this page
     * @param nextSearchAfter cursor to pass as {@code searchAfter} for the next page
     * @param pitId           PIT id to pass for the next page
     */
    public record PageResult<T>(
            List<T> data,
            List<JsonData> nextSearchAfter,
            String pitId
    ) {}
}

6. 业务层使用示例

@Service
@RequiredArgsConstructor
public class OrderQueryService {

    private final EsSearchAfterService esService;

    public PaginatedOrders queryOrders(int pageSize, String lastPitId, List<JsonData> lastSearchAfter) {
        try {
            // 1. 构建查询条件
            Query query = Query.of(q -> q
                    .bool(b -> b
                            .must(m -> m.term(t -> t.field("status").value("paid")))
                    ));

            // 2. 执行分页查询
            PageResult<Object> result = esService.searchWithPagination(
                    "orders",
                    query,
                    "order_time",
                    pageSize,
                    lastPitId,
                    lastSearchAfter
            );

            // 3. 转换为业务DTO
            List<OrderDTO> orders = convertToDTO(result.data());

            return new PaginatedOrders(orders, result.nextSearchAfter(), result.pitId());

        } catch (IOException e) {
            throw new BusinessException("订单查询失败", e);
        }
    }

    // DTO转换逻辑(示例)
    private List<OrderDTO> convertToDTO(List<Object> esSources) {
        // 实现实际的转换逻辑
        return Collections.emptyList();
    }

    // 分页结果DTO
    public record PaginatedOrders(
            List<OrderDTO> orders,
            List<JsonData> nextSearchAfter,
            String pitId
    ) {}
}

7. 控制器层实现

/**
 * REST endpoints for cursor-paginated order listing and PIT cleanup.
 */
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderController {

    private final OrderQueryService orderService;

    /**
     * Returns one page of orders.
     *
     * <p>The PIT id for the next request is sent back in the {@code X-PIT-ID}
     * header; the body carries the orders under {@code data} and the cursor under
     * {@code next_search_after}.
     *
     * @param size        page size, 20 when omitted
     * @param pitId       PIT id from the previous response, optional
     * @param searchAfter cursor values from the previous response, optional
     */
    @GetMapping
    public ResponseEntity<?> getOrders(
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String pitId,
            @RequestParam(required = false) List<String> searchAfter) {

        try {
            // Wrap each cursor value received from the client; a missing parameter
            // is treated as an empty cursor.
            List<String> rawCursor = (searchAfter == null) ? Collections.emptyList() : searchAfter;
            List<JsonData> cursor = rawCursor.stream()
                    .map(JsonData::of)
                    .toList();

            PaginatedOrders page = orderService.queryOrders(size, pitId, cursor);

            Map<String, Object> payload = Map.of(
                    "data", page.orders(),
                    "next_search_after", page.nextSearchAfter()
            );
            return ResponseEntity.ok()
                    .header("X-PIT-ID", page.pitId())
                    .body(payload);

        } catch (BusinessException e) {
            Map<String, Object> failure = Map.of("error", e.getMessage());
            return ResponseEntity.internalServerError().body(failure);
        }
    }

    /**
     * Closes the PIT context identified by the path variable.
     *
     * @param pitId id of the PIT to close
     */
    @DeleteMapping("/pit/{pitId}")
    public ResponseEntity<?> closePitContext(@PathVariable String pitId) {
        try {
            orderService.closePit(pitId);
        } catch (Exception e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(Map.of("error", "PIT关闭失败"));
        }
        return ResponseEntity.ok().build();
    }
}

8. 生产环境关键配置

# Connection settings for the Elasticsearch client.
elasticsearch:
  # Node address in host:port form.
  hosts: localhost:9200
  # Account used to authenticate against the cluster.
  username: elastic
  # NOTE(review): placeholder value - supply the real password from an environment
  # variable or secret store instead of committing it.
  password: your_password
  # Presumably the connect and socket (read) timeouts of the REST client;
  # TODO confirm the client configuration class actually binds these two keys.
  connection-timeout: 30s
  socket-timeout: 60s

9.ES客户端配置类

@Configuration
public class EsConfig {

    @Value("${elasticsearch.hosts}")
    private String hosts;

    @Bean
    public RestClient restClient() {
        return RestClient.builder(HttpHost.create(hosts))
                .setRequestConfigCallback(builder ->
                        builder.setConnectTimeout(30000)
                                .setSocketTimeout(60000))
                .build();
    }

    @Bean
    public ElasticsearchClient elasticsearchClient(RestClient restClient) {
        ElasticsearchTransport transport = new RestClientTransport(
                restClient,
                new JacksonJsonpMapper()
        );
        return new ElasticsearchClient(transport);
    }
}

10. 前端交互示例

无限滚动实现(React)

import React, { useState, useEffect } from 'react';

const OrderList = () => {
    const [orders, setOrders] = useState([]);
    const [pitId, setPitId] = useState(null);
    const [searchAfter, setSearchAfter] = useState(null);
    const [loading, setLoading] = useState(false);

    const loadMore = async () => {
        setLoading(true);
        try {
            const params = new URLSearchParams({
                size: 20,
                ...(pitId && { pitId }),
                ...(searchAfter && { searchAfter: JSON.stringify(searchAfter) })
            });

            const response = await fetch(`/api/orders?${params}`);
            const { data, next_search_after } = await response.json();
            
            setOrders(prev => [...prev, ...data]);
            setSearchAfter(next_search_after);
            setPitId(response.headers.get('X-PIT-ID'));

        } finally {
            setLoading(false);
        }
    };

    // 组件卸载时清理PIT
    useEffect(() => {
        return () => {
            if (pitId) {
                fetch(`/api/orders/pit/${pitId}`, { method: 'DELETE' });
            }
        };
    }, [pitId]);

    return (
        <div>
            {/* 订单列表渲染 */}
            <button onClick={loadMore} disabled={loading}>
                {loading ? '加载中...' : '加载更多'}
            </button>
        </div>
    );
};


网站公告

今日签到

点亮在社区的每一天
去签到