目录
聚合(aggregations
)可以让我们极其方便的实现对数据的统计、分析、运算。
DSL 实现数据聚合
Bucket聚合
场景:统计所有商品中共有哪些商品品牌,其实就是以品牌(brand)字段对数据分组。
# bucket聚合
GET /items/_search
{
"size": 0,
"aggs": {
"cate_aggs": {
"terms": {
"field": "brand",
"size": 10
}
}
}
}
实现效果如下:(对品牌进行分组)
带条件聚合
场景:查找按品牌(brand)分组,且价格(price)在(100,2000)的手机(name)
# 带条件聚合
GET /items/_search
{
"query": {
"bool": {
"filter": [
{
"term": {
"name": "手机"
}
},
{
"range": {
"price": {
"gte": 10000,
"lte": 2000000
}
}
}
]
}
},
"size": 0,
"aggs": {
"brand_agg": {
"terms": {
"field": "brand",
"size": 10
}
}
}
}
实现效果如下:
Metric聚合
场景:查找按照品牌分类的手机的价格的最小值、最大值、平均值。
# metric聚合
GET /items/_search
{
"query": {
"bool": {
"filter": [
{
"term": {
"name": "手机"
}
}
]
}
},
"size": 0,
"aggs": {
"brand_agg": {
"terms": {
"field": "category",
"size": 10
},
"aggs": {
"price_stats": {
"stats": {
"field": "price"
}
}
}
}
}
}
实现效果如下:
Java API 实现数据聚合
Bucket聚合
/**
* 聚合
* @throws IOException
*/
@Test
void testAggs() throws IOException {
// 1.创建Request
SearchRequest request = new SearchRequest("items");
// 2.组织请求参数
// 2.1.query条件
request.source().size(0);
// 2.2.聚合条件
String brandAggName = "brandAgg";
request.source().aggregation(AggregationBuilders.terms(brandAggName).field("brand").size(5));
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析响应
Aggregations aggregations = response.getAggregations();
// 4.1.获取聚合结果
Terms brandTerms = aggregations.get(brandAggName);
// 4.2.获取buckets
List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
// 4.3.遍历buckets
for (Terms.Bucket bucket : buckets) {
// 4.4.获取key,就是品牌名称
String brand = bucket.getKeyAsString();
// 4.5.获取doc_count,就是品牌对应的文档数量
long docCount = bucket.getDocCount();
System.out.println("品牌:" + brand + ",数量:" + docCount);
}
}
实现效果如下:
带条件聚合
/**
* 带条件聚合查询
* @throws IOException
*/
@Test
void testAggs() throws IOException {
// 1.创建Request
SearchRequest request = new SearchRequest("items");
// 2.组织请求参数
request.source().query(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("name", "手机"))
.filter(QueryBuilders.rangeQuery("price").lt(10000000))
);
// 2.1.聚合条件
String brandAggName = "brandAgg";
request.source().aggregation(AggregationBuilders.terms(brandAggName).field("brand").size(5));
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析响应
Aggregations aggregations = response.getAggregations();
// 4.1.获取聚合结果
Terms brandTerms = aggregations.get(brandAggName);
for (Terms.Bucket bucket : brandTerms.getBuckets()) {
// 4.2.获取key,就是品牌名称
String brand = bucket.getKeyAsString();
// 4.3.获取doc_count,就是品牌对应的文档数量
long docCount = bucket.getDocCount();
System.out.println("品牌:" + brand + ",数量:" + docCount);
}
}
实现效果如下:
metric聚合
/**
* metric聚合
* @throws IOException
*/
@Test
void testMetricAggs() throws IOException {
// 1.创建Request
SearchRequest request = new SearchRequest("items");
// 2.组织请求参数
request.source().query(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("name", "手机"))
);
// 2.1.聚合条件
String brandAggName = "brandAgg";
request.source().aggregation(AggregationBuilders.terms(brandAggName).field("brand").size(5)
.subAggregation(AggregationBuilders.avg("priceAvg").field("price"))
.subAggregation(AggregationBuilders.max("priceMax").field("price"))
.subAggregation(AggregationBuilders.min("priceMin").field("price"))
);
// 3.发送请求
SearchResponse response = client.search(request, RequestOptions.DEFAULT);
// 4.解析响应
Aggregations aggregations = response.getAggregations();
// 4.1.获取聚合结果
Terms brandTerms = aggregations.get(brandAggName);
for (Terms.Bucket bucket : brandTerms.getBuckets()) {
// 4.2.获取key,就是品牌名称
String brand = bucket.getKeyAsString();
// 4.3.获取doc_count,就是品牌对应的文档数量
long docCount = bucket.getDocCount();
// 4.4.获取子聚合结果
Avg priceAvg = bucket.getAggregations().get("priceAvg");
Max priceMax = bucket.getAggregations().get("priceMax");
Min priceMin = bucket.getAggregations().get("priceMin");
System.out.println("品牌:" + brand + ",数量:" + docCount + "," +
"平均价格:" + priceAvg.getValue() + ",最高价格:" + priceMax.getValue() + ",最低价格:" + priceMin.getValue());
}
}
实现效果如下: