ES常识8:ES8.X如何实现热词统计

发布于:2025-05-16 ⋅ 阅读:(24) ⋅ 点赞:(0)

基于 Elasticsearch 8.x 实现热门搜索词推荐,需结合 数据采集、索引设计、聚合统计、缓存优化 等环节。以下是分步骤的详细实现方案:

一、数据采集与存储设计

1. 确定需记录的字段

为了统计热门搜索词,需采集用户搜索行为的基础信息。推荐记录以下字段:

字段名 类型 说明
search_keyword keyword 用户输入的搜索词(必须使用 keyword 类型,避免分词影响统计)。
timestamp date 搜索时间(用于时间窗口统计,如“近 7 天热门词”)。
user_id keyword 用户唯一标识(可选,用于去重统计独立用户搜索量)。
is_clicked boolean 是否点击搜索结果(可选,作为热门度的加权指标)。
search_duration integer 搜索耗时(毫秒,可选,用于评估搜索质量)。
2. 设计搜索日志索引

在 ES 中创建索引 search_logs,存储用户搜索行为数据。索引配置示例:

PUT /search_logs
{
  "settings": {
    "index": {
      "number_of_shards": 3,       // 根据数据量调整分片数(建议单分片 10-50GB)
      "number_of_replicas": 1,     // 1 副本保证高可用
      "refresh_interval": "1s"    // 近实时搜索(可调整为 5s 平衡写入性能)
    }
  },
  "mappings": {
    "properties": {
      "search_keyword": { "type": "keyword", "doc_values": true },  // 启用 doc_values 加速聚合
      "timestamp": { "type": "date", "format": "epoch_millis" },
      "user_id": { "type": "keyword", "index": false },             // 不参与搜索,仅用于聚合
      "is_clicked": { "type": "boolean" },
      "search_duration": { "type": "integer" }
    }
  }
}

二、数据写入与采集

用户每次搜索时,将搜索行为数据写入 search_logs 索引。示例(使用 ES 客户端):

from elasticsearch import Elasticsearch
from datetime import datetime

es = Elasticsearch(["http://es-node-1:9200"])

def log_search(keyword: str, user_id: str, is_clicked: bool, duration: int):
    doc = {
        "search_keyword": keyword,
        "timestamp": int(datetime.now().timestamp() * 1000),  # 毫秒级时间戳
        "user_id": user_id,
        "is_clicked": is_clicked,
        "search_duration": duration
    }
    es.index(index="search_logs", document=doc)

# 模拟用户搜索行为(调用示例)
log_search("机器学习教程", "user_123", is_clicked=True, duration=850)

三、热门搜索词统计(核心逻辑)

使用 ES 的 terms 聚合统计搜索词的频率,并结合时间窗口、加权指标优化结果。

1. 基础版:近 7 天热门搜索词(按出现次数排序)

统计近 7 天搜索次数最多的前 10 个关键词:

GET /search_logs/_search
{
  "size": 0,  // 不返回原始文档
  "query": {
    "range": {
      "timestamp": {
        "gte": "now-7d",  // 近 7 天
        "lte": "now"
      }
    }
  },
  "aggs": {
    "top_keywords": {
      "terms": {
        "field": "search_keyword",
        "size": 10,          // 返回前 10 个词
        "order": { "_count": "desc" },  // 按次数降序
        "min_doc_count": 3   // 过滤出现次数少于 3 次的词(避免长尾词)
      }
    }
  }
}
2. 进阶版:加权热门词(结合点击量与搜索时长)

若需考虑搜索词的“质量”(如用户点击了结果或搜索耗时较短),可通过 sum 聚合加权:

GET /search_logs/_search
{
  "size": 0,
  "query": { "range": { "timestamp": { "gte": "now-7d" } } },
  "aggs": {
    "top_weighted_keywords": {
      "terms": {
        "field": "search_keyword",
        "size": 10,
        "order": { "score": "desc" }  // 按自定义分数排序
      },
      "aggs": {
        "click_count": { "sum": { "field": "is_clicked" } },  // 点击次数(true=1,false=0)
        "avg_duration": { "avg": { "field": "search_duration" } },  // 平均耗时
        "score": {
          "script": {
            "source": "params.click_count * 2 + (1000 - params.avg_duration) * 0.1",  // 自定义分数公式(点击权重更高,耗时越短得分越高)
            "params": {
              "click_count": "sum(is_clicked)",
              "avg_duration": "avg(search_duration)"
            }
          }
        }
      }
    }
  }
}
3. 高基数优化:避免内存溢出

当搜索词数量极大(如每天百万级不同词),terms 聚合可能消耗大量内存。此时可通过 shard_size 参数优化:

{
  "aggs": {
    "top_keywords": {
      "terms": {
        "field": "search_keyword",
        "size": 10,          // 最终返回 10 个词
        "shard_size": 100,   // 每个分片预取 100 个词(提升准确性)
        "min_doc_count": 5   // 过滤低频词
      }
    }
  }
}

四、缓存与实时性优化

直接调用 ES 聚合接口可能因计算耗时影响推荐接口性能(尤其在高并发时)。推荐结合 缓存机制 优化:

1. 定时预计算(推荐方案)
  • 逻辑:定期(如每分钟)执行聚合查询,将结果缓存到 Redis 或 ES 的 hot_keywords 索引中。
  • 优势:减少实时聚合的计算压力,保证推荐接口的低延迟。

示例(Python 定时任务)

import redis
from elasticsearch import Elasticsearch

es = Elasticsearch(["http://es-node-1:9200"])
redis_client = redis.Redis(host="redis-host", port=6379)

def update_hot_keywords():
    # 执行 ES 聚合查询
    response = es.search(
        index="search_logs",
        body={
            "size": 0,
            "query": {"range": {"timestamp": {"gte": "now-7d"}}},
            "aggs": {
                "top_keywords": {
                    "terms": {"field": "search_keyword", "size": 10, "order": {"_count": "desc"}}
                }
            }
        }
    )
    # 提取结果并缓存到 Redis
    hot_keywords = [bucket["key"] for bucket in response["aggregations"]["top_keywords"]["buckets"]]
    redis_client.set("hot_keywords", str(hot_keywords), ex=60)  # 缓存 60 秒

# 每 60 秒执行一次预计算
update_hot_keywords()
2. 实时查询(轻量场景)

若数据量较小或实时性要求极高(如秒级更新),可直接调用 ES 聚合接口,但需限制 sizeshard_size 以降低计算量。

五、推荐接口实现

提供一个 HTTP 接口,从缓存或 ES 读取热门词并返回。示例(使用 Flask):

from flask import Flask, jsonify
import redis

app = Flask(__name__)
redis_client = redis.Redis(host="redis-host", port=6379)

@app.route("/hot_keywords")
def get_hot_keywords():
    # 从 Redis 读取缓存(若存在)
    hot_keywords = redis_client.get("hot_keywords")
    if hot_keywords:
        return jsonify(eval(hot_keywords.decode()))
    
    # 若缓存失效,直接查询 ES(备用逻辑)
    # (此处省略 ES 查询代码,实际应避免高频调用)
    return jsonify(["机器学习", "Python教程", "大数据分析"])  # 默认值

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=5000)

六、监控与调优

1. 监控聚合性能

通过 ES 的 _nodes/stats/indices/aggregations 监控聚合耗时,确保 top_keywords 聚合的执行时间在合理范围(如 < 500ms)。

2. 调整索引配置
  • refresh_interval:若写入量极大,可调整为 30s 减少段合并开销,但会降低实时性。
  • index.codec:使用 best_compression 压缩存储,减少磁盘占用(适合日志类数据)。
3. 处理异常词

通过 exclude 参数过滤无意义词(如“测试”“null”):

{
  "terms": {
    "field": "search_keyword",
    "exclude": ["测试", "null", "undefined"]
  }
}

总结

基于 ES8 实现热门搜索词推荐的核心步骤为:

  1. 数据采集:记录搜索词及关联信息(时间、用户、点击等)。
  2. 索引设计:使用 keyword 类型存储搜索词,优化聚合性能。
  3. 聚合统计:通过 terms 聚合计算词频,结合时间窗口和加权指标。
  4. 缓存优化:定时预计算结果并缓存,降低实时查询压力。
  5. 接口实现:提供 HTTP 接口返回缓存或实时聚合结果。

通过以上步骤,可高效实现一个兼顾实时性与性能的热门搜索词推荐系统。


网站公告

今日签到

点亮在社区的每一天
去签到