1. 深分页(from/size)与 search_after 原理
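| 对比项 | 深分页 (from/size) | search_after |
| --- | --- | --- |
| 数据定位 | 全局排序后跳过前 N 条 | 基于上一页最后一条的排序值定位 |
| 排序开销 | 每个分片都要收集并排序前 from + size 条,页越深开销越大 (每分片 O(from + size)) | 每个分片只需收集排序值位于游标之后的 size 条,开销与翻页深度基本无关 (每分片 O(size)) |
| 内存消耗 | 协调节点需在堆内存中合并 分片数 × (from + size) 条结果,深页时有 OOM 风险 | 每次只合并 分片数 × size 条结果,内存占用稳定 (安全) |
| 分页深度限制 | from + size ≤ 10000 (index.max_result_window 默认值) | 无深度限制 |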
2. 性能对比
| 分页深度 | 深分页响应时间 | search_after 响应时间 |
| --- | --- | --- |
| 1 | 100ms | 100ms |
| 100 | 300ms | 110ms |
| 1000 | 1500ms | 120ms |
| 10000 | 超时/报错 | 130ms |

注:以上数值仅示意量级差异,实际耗时取决于集群规格、分片数与 size。
3. 适用场景
| 对比项 | 深分页 (from/size) | search_after |
| --- | --- | --- |
| 典型场景 | 小数据量随机跳页 | 大数据量连续翻页(如日志流) |
| 排序要求 | 任意排序字段 | 排序字段组合必须能唯一确定文档顺序(如时间戳 + ID;配合 PIT 使用时可用 _shard_doc 作为兜底排序) |
| 跳页能力 | 支持任意页跳转 | 仅支持顺序翻页 |
4. Maven依赖配置
<!-- Elasticsearch Java API client: its classes live in package co.elastic.clients, and so do its
     Maven coordinates. Its JacksonJsonpMapper also needs jackson-databind on the classpath
     (typically already present in Spring Boot web applications). -->
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.12.0</version>
</dependency>
5.ES分页服务类
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Slf4j
@Service
public class EsSearchAfterService {

    /** Keep-alive requested when a PIT is opened and refreshed on every page request. */
    private static final String PIT_KEEP_ALIVE = "30m";

    private final ElasticsearchClient esClient;

    /**
     * Creates the service.
     *
     * @param esClient Elasticsearch client (constructor-injected)
     */
    public EsSearchAfterService(ElasticsearchClient esClient) {
        this.esClient = esClient;
    }

    /**
     * Fetches one page of hits using a point in time (PIT) plus {@code search_after}, so paging
     * depth is not limited by {@code from + size}.
     *
     * <p>Hits are sorted by {@code sortField} descending, then by {@code _shard_doc} ascending as a
     * tiebreaker, so every hit has a unique position inside the PIT.
     *
     * @param indexName   index to open the PIT on; only used when a new PIT is opened
     * @param query       query to run
     * @param sortField   primary sort field (sorted descending)
     * @param pageSize    number of hits per page
     * @param pitId       PIT id returned with the previous page, or {@code null}/blank for the first page
     * @param searchAfter cursor returned with the previous page, or {@code null}/empty for the first page
     * @return the page's {@code _source} documents, the cursor for the next page (empty when this
     *     page had no hits) and the PIT id to send with the next request
     * @throws IOException      if opening a new PIT fails
     * @throws RuntimeException if the search fails; a PIT opened by this call is closed first
     */
    public PageResult<Object> searchWithPagination(
            String indexName,
            Query query,
            String sortField,
            int pageSize,
            String pitId,
            List<JsonData> searchAfter) throws IOException {
        // 1. Reuse the caller's PIT or open a new one. These are final so the request lambdas
        //    below can capture them (a reassigned local cannot be captured).
        final boolean openedHere = pitId == null || pitId.isBlank();
        final String requestPitId = openedHere ? openPit(indexName) : pitId;

        try {
            // 2. Build the request. No index is set: with a PIT the target comes from the PIT.
            SearchRequest.Builder searchBuilder = new SearchRequest.Builder()
                    .size(pageSize)
                    .query(query)
                    .pit(p -> p.id(requestPitId).keepAlive(a -> a.time(PIT_KEEP_ALIVE)))
                    .sort(s -> s.field(f -> f.field(sortField).order(SortOrder.Desc)))
                    // _shard_doc is unique per document within a PIT. Sorting on _id would need
                    // _id fielddata, which Elasticsearch 8 disables by default.
                    .sort(s -> s.field(f -> f.field("_shard_doc").order(SortOrder.Asc)));
            if (searchAfter != null && !searchAfter.isEmpty()) {
                searchBuilder.searchAfter(toFieldValues(searchAfter));
            }

            // 3. Execute.
            SearchResponse<Object> response = esClient.search(searchBuilder.build(), Object.class);

            // 4. Collect sources; the last hit's sort values become the next cursor.
            List<Hit<Object>> hits = response.hits().hits();
            List<Object> data = new ArrayList<>(hits.size());
            for (Hit<Object> hit : hits) {
                data.add(hit.source());
            }
            List<JsonData> nextSearchAfter = hits.isEmpty()
                    ? Collections.emptyList()
                    : toJsonData(hits.get(hits.size() - 1).sort());

            // The PIT id may change between requests; callers must always send the latest one.
            String latestPitId = response.pitId() != null ? response.pitId() : requestPitId;
            return new PageResult<>(data, nextSearchAfter, latestPitId);
        } catch (Exception e) {
            RuntimeException failure = new RuntimeException("ES查询失败", e);
            if (openedHere) {
                // Only close a PIT this call opened; a caller-supplied PIT is the caller's to close.
                try {
                    esClient.closePointInTime(c -> c.id(requestPitId));
                } catch (Exception cleanupError) {
                    // Keep the search failure as the primary exception.
                    failure.addSuppressed(cleanupError);
                }
            }
            throw failure;
        }
    }

    /**
     * Closes a PIT context. Does nothing for a {@code null} or blank id.
     *
     * @param pitId PIT id to close
     * @throws IOException if the close request fails
     */
    public void closePit(String pitId) throws IOException {
        if (pitId == null || pitId.isBlank()) {
            return;
        }
        var response = esClient.closePointInTime(c -> c.id(pitId));
        log.info("Closed PIT {}: {}", pitId, response.succeeded());
    }

    /** Opens a new PIT on {@code indexName} and returns its id. */
    private String openPit(String indexName) throws IOException {
        var pitResponse = esClient.openPointInTime(o -> o
                .index(indexName)
                .keepAlive(a -> a.time(PIT_KEEP_ALIVE)));
        String id = pitResponse.id();
        log.info("Created new PIT: {}", id);
        return id;
    }

    /** Converts caller-supplied cursor values into the FieldValue form the search API expects. */
    private static List<FieldValue> toFieldValues(List<JsonData> cursor) {
        List<FieldValue> values = new ArrayList<>(cursor.size());
        for (JsonData item : cursor) {
            values.add(FieldValue.of(item));
        }
        return values;
    }

    /** Converts a hit's sort values into the JsonData cursor form exposed by this service. */
    private static List<JsonData> toJsonData(List<FieldValue> sortValues) {
        List<JsonData> cursor = new ArrayList<>(sortValues.size());
        for (FieldValue value : sortValues) {
            // Unwrap to the plain Java value so callers can serialize it.
            // NOTE(review): a Null-kind sort value (e.g. a missing keyword) becomes JsonData.of(null);
            // confirm the JSON mapper in use serializes that as JSON null.
            cursor.add(value.isAny() ? value.anyValue() : JsonData.of(value._get()));
        }
        return cursor;
    }

    /**
     * One page of results.
     *
     * @param data            {@code _source} of each hit, in sort order
     * @param nextSearchAfter cursor for the next page; empty when this page had no hits
     * @param pitId           PIT id to send with the next request
     */
    public record PageResult<T>(
            List<T> data,
            List<JsonData> nextSearchAfter,
            String pitId
    ) {}
}
6. 业务层使用示例
@Service
@RequiredArgsConstructor
public class OrderQueryService {
private final EsSearchAfterService esService;
public PaginatedOrders queryOrders(int pageSize, String lastPitId, List<JsonData> lastSearchAfter) {
try {
// 1. 构建查询条件
Query query = Query.of(q -> q
.bool(b -> b
.must(m -> m.term(t -> t.field("status").value("paid")))
));
// 2. 执行分页查询
PageResult<Object> result = esService.searchWithPagination(
"orders",
query,
"order_time",
pageSize,
lastPitId,
lastSearchAfter
);
// 3. 转换为业务DTO
List<OrderDTO> orders = convertToDTO(result.data());
return new PaginatedOrders(orders, result.nextSearchAfter(), result.pitId());
} catch (IOException e) {
throw new BusinessException("订单查询失败", e);
}
}
// DTO转换逻辑(示例)
private List<OrderDTO> convertToDTO(List<Object> esSources) {
// 实现实际的转换逻辑
return Collections.emptyList();
}
// 分页结果DTO
public record PaginatedOrders(
List<OrderDTO> orders,
List<JsonData> nextSearchAfter,
String pitId
) {}
}
7. 控制器层实现
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderController {

    /** Largest accepted page size; equals Elasticsearch's default index.max_result_window. */
    private static final int MAX_PAGE_SIZE = 10_000;

    private final OrderQueryService orderService;

    /**
     * Returns one page of orders.
     *
     * @param size        page size, between 1 and {@link #MAX_PAGE_SIZE}
     * @param pitId       PIT id from the previous response's {@code X-PIT-ID} header; omit (or leave
     *                    blank) for the first page
     * @param searchAfter cursor values from the previous response's {@code next_search_after}, sent as
     *                    repeated parameters ({@code ?searchAfter=v1&searchAfter=v2}); omit for the
     *                    first page. A single comma-joined value is split on commas by Spring.
     * @return 200 with {@code data}/{@code next_search_after} and the {@code X-PIT-ID} header,
     *     400 for an out-of-range size, 500 if the query fails
     */
    @GetMapping
    public ResponseEntity<?> getOrders(
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String pitId,
            @RequestParam(required = false) List<String> searchAfter) {
        // Untrusted input: bound the page size before it reaches Elasticsearch.
        if (size < 1 || size > MAX_PAGE_SIZE) {
            return ResponseEntity.badRequest()
                    .body(Map.of("error", "size must be between 1 and " + MAX_PAGE_SIZE));
        }
        // Treat "?pitId=" like an absent parameter so that a new PIT is opened.
        String effectivePitId = (pitId == null || pitId.isBlank()) ? null : pitId;
        try {
            // Wrap each raw cursor value for the service layer.
            List<JsonData> searchAfterParams = Optional.ofNullable(searchAfter)
                    .orElse(Collections.emptyList())
                    .stream()
                    .map(JsonData::of)
                    .toList();
            PaginatedOrders result = orderService.queryOrders(size, effectivePitId, searchAfterParams);
            return ResponseEntity.ok()
                    .header("X-PIT-ID", result.pitId())
                    .body(Map.of(
                            "data", result.orders(),
                            "next_search_after", result.nextSearchAfter()
                    ));
        } catch (BusinessException e) {
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                    .body(Map.of("error", e.getMessage()));
        }
    }

    /**
     * Closes a PIT context once the client has finished paging.
     *
     * @param pitId PIT id to close
     * @return 200 on success, 500 if closing fails
     */
    @DeleteMapping("/pit/{pitId}")
    public ResponseEntity<?> closePitContext(@PathVariable String pitId) {
        try {
            orderService.closePit(pitId);
            return ResponseEntity.ok().build();
        } catch (Exception e) {
            return ResponseEntity.internalServerError()
                    .body(Map.of("error", "PIT关闭失败"));
        }
    }
}
8. 生产环境关键配置
# Keys must be nested under "elasticsearch:" so that ${elasticsearch.hosts} etc. resolve.
elasticsearch:
  hosts: localhost:9200
  username: elastic
  # Do not commit real credentials: the ES_PASSWORD environment variable overrides the placeholder.
  password: ${ES_PASSWORD:your_password}
  connection-timeout: 30s
  socket-timeout: 60s
9.ES客户端配置类
@Configuration
public class EsConfig {
@Value("${elasticsearch.hosts}")
private String hosts;
@Bean
public RestClient restClient() {
return RestClient.builder(HttpHost.create(hosts))
.setRequestConfigCallback(builder ->
builder.setConnectTimeout(30000)
.setSocketTimeout(60000))
.build();
}
@Bean
public ElasticsearchClient elasticsearchClient(RestClient restClient) {
ElasticsearchTransport transport = new RestClientTransport(
restClient,
new JacksonJsonpMapper()
);
return new ElasticsearchClient(transport);
}
}
10. 前端交互示例
无限滚动实现(React)
import React, { useState, useEffect } from 'react';
const OrderList = () => {
const [orders, setOrders] = useState([]);
const [pitId, setPitId] = useState(null);
const [searchAfter, setSearchAfter] = useState(null);
const [loading, setLoading] = useState(false);
const loadMore = async () => {
setLoading(true);
try {
const params = new URLSearchParams({
size: 20,
...(pitId && { pitId }),
...(searchAfter && { searchAfter: JSON.stringify(searchAfter) })
});
const response = await fetch(`/api/orders?${params}`);
const { data, next_search_after } = await response.json();
setOrders(prev => [...prev, ...data]);
setSearchAfter(next_search_after);
setPitId(response.headers.get('X-PIT-ID'));
} finally {
setLoading(false);
}
};
// 组件卸载时清理PIT
useEffect(() => {
return () => {
if (pitId) {
fetch(`/api/orders/pit/${pitId}`, { method: 'DELETE' });
}
};
}, [pitId]);
return (
<div>
{/* 订单列表渲染 */}
<button onClick={loadMore} disabled={loading}>
{loading ? '加载中...' : '加载更多'}
</button>
</div>
);
};