Elasticsearch:简化大数据分析,使用Elasticsearch进行高效数据聚合

发布于:2025-03-06 ⋅ 阅读:(17) ⋅ 点赞:(0)

目录

DSL 实现数据聚合

Bucket聚合

带条件聚合

Metric聚合

Java API 实现数据聚合

Bucket聚合

带条件聚合

metric聚合


聚合(aggregations)可以让我们极其方便的实现对数据的统计、分析、运算。

DSL 实现数据聚合

Bucket聚合

场景:统计所有商品中共有哪些商品品牌,其实就是以品牌(brand)字段对数据分组。

# bucket聚合
GET /items/_search
{
  "size": 0,
  "aggs": {
    "cate_aggs": {
      "terms": {
        "field": "brand",
        "size": 10
      }
    }
  }
}

实现效果如下:(对品牌进行分组)

带条件聚合

场景:查找按品牌(brand)分组,且价格(price)在(100,2000)的手机(name)

# 带条件聚合
GET /items/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "name": "手机"
          }
        },
        {
          "range": {
            "price": {
              "gte": 10000,
              "lte": 2000000
            }
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "brand_agg": {
      "terms": {
        "field": "brand",
        "size": 10
      }
    }
  }
}

实现效果如下:

Metric聚合

场景:查找按照品牌分类的手机的价格的最小值、最大值、平均值。

# metric聚合
GET /items/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "name": "手机"
          }
        }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "brand_agg": {
      "terms": {
        "field": "category",
        "size": 10
      },
      "aggs": {
        "price_stats": {
          "stats": {
            "field": "price"
          }
        }
      }
    }
  }
}

实现效果如下:

Java API 实现数据聚合

Bucket聚合

 /**
     * 聚合
     * @throws IOException
     */
    @Test
    void testAggs() throws IOException {
        // 1.创建Request
        SearchRequest request = new SearchRequest("items");
        // 2.组织请求参数
        // 2.1.query条件
        request.source().size(0);
        // 2.2.聚合条件
        String brandAggName = "brandAgg";
        request.source().aggregation(AggregationBuilders.terms(brandAggName).field("brand").size(5));
        // 3.发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4.解析响应
        Aggregations aggregations = response.getAggregations();
        // 4.1.获取聚合结果
        Terms brandTerms = aggregations.get(brandAggName);
        // 4.2.获取buckets
        List<? extends Terms.Bucket> buckets = brandTerms.getBuckets();
        // 4.3.遍历buckets
        for (Terms.Bucket bucket : buckets) {
            // 4.4.获取key,就是品牌名称
            String brand = bucket.getKeyAsString();
            // 4.5.获取doc_count,就是品牌对应的文档数量
            long docCount = bucket.getDocCount();
            System.out.println("品牌:" + brand + ",数量:" + docCount);
        }
    }

实现效果如下:

带条件聚合

 /**
     * 带条件聚合查询
     * @throws IOException
     */
    @Test
    void testAggs() throws IOException {
        // 1.创建Request
        SearchRequest request = new SearchRequest("items");
        // 2.组织请求参数
        request.source().query(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("name", "手机"))
                .filter(QueryBuilders.rangeQuery("price").lt(10000000))
        );
        // 2.1.聚合条件
        String brandAggName = "brandAgg";
        request.source().aggregation(AggregationBuilders.terms(brandAggName).field("brand").size(5));
        // 3.发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4.解析响应
        Aggregations aggregations = response.getAggregations();
        // 4.1.获取聚合结果
        Terms brandTerms = aggregations.get(brandAggName);
        for (Terms.Bucket bucket : brandTerms.getBuckets()) {
            // 4.2.获取key,就是品牌名称
            String brand = bucket.getKeyAsString();
            // 4.3.获取doc_count,就是品牌对应的文档数量
            long docCount = bucket.getDocCount();
            System.out.println("品牌:" + brand + ",数量:" + docCount);
        }

    }

实现效果如下:

metric聚合

  /**
     *  metric聚合
     * @throws IOException
     */
    @Test
    void testMetricAggs() throws IOException {
        // 1.创建Request
        SearchRequest request = new SearchRequest("items");
        // 2.组织请求参数
        request.source().query(QueryBuilders.boolQuery()
                .must(QueryBuilders.matchQuery("name", "手机"))
        );
        // 2.1.聚合条件
        String brandAggName = "brandAgg";
        request.source().aggregation(AggregationBuilders.terms(brandAggName).field("brand").size(5)
                .subAggregation(AggregationBuilders.avg("priceAvg").field("price"))
                .subAggregation(AggregationBuilders.max("priceMax").field("price"))
                .subAggregation(AggregationBuilders.min("priceMin").field("price"))
        );
        // 3.发送请求
        SearchResponse response = client.search(request, RequestOptions.DEFAULT);
        // 4.解析响应
        Aggregations aggregations = response.getAggregations();
        // 4.1.获取聚合结果
        Terms brandTerms = aggregations.get(brandAggName);
        for (Terms.Bucket bucket : brandTerms.getBuckets()) {
            // 4.2.获取key,就是品牌名称
            String brand = bucket.getKeyAsString();
            // 4.3.获取doc_count,就是品牌对应的文档数量
            long docCount = bucket.getDocCount();
            // 4.4.获取子聚合结果
            Avg priceAvg = bucket.getAggregations().get("priceAvg");
            Max priceMax = bucket.getAggregations().get("priceMax");
            Min priceMin = bucket.getAggregations().get("priceMin");
            System.out.println("品牌:" + brand + ",数量:" + docCount + "," +
            "平均价格:" + priceAvg.getValue() + ",最高价格:" + priceMax.getValue() + ",最低价格:" + priceMin.getValue());
        }

    }

实现效果如下:


网站公告

今日签到

点亮在社区的每一天
去签到