高性能实时分析数据库:Apache Druid 查询最新值 Query for latest values

发布于:2025-08-03 ⋅ 阅读:(14) ⋅ 点赞:(0)

文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。

Apache Druid
Druid 是一款高性能实时分析数据库,主要价值在于缩短洞察和行动的时间。Druid 专为需要快速查询和数据采集的工作流而设计。Druid 擅长于支持 UI、运行操作型(即席)查询或处理高并发性。您可以将 Druid 视为适用于各种用例的数据仓库的开源替代方案。

查询最新值

Query for latest values

本教程介绍 Apache Druid 中可替代其他数据库 UPSERT 的使用场景策略。你可以在查询时使用 LATEST_BY 聚合,或在插入时对数值维度使用“delta”方式。

更新数据 教程展示了如何基于时间戳使用批处理更新数据,包括 UPSERT 场景。然而,对于流数据,你也可以使用 LATEST_BY 或 delta 来满足原本需要更新的需求。

前置条件

在开始本教程前,请按 本地快速开始 下载并启动 Druid。无需向 Druid 集群加载任何数据。

你应已熟悉 Druid 的数据查询。如果尚未完成,请先阅读 查询数据 教程。

使用 LATEST_BY 获取更新后的值

有时,你希望读取某个维度或度量相对于另一个维度的最新值。在事务型数据库中,你可能会用 UPSERT 维护维度或度量;而在 Druid 中,你可以在摄入时追加所有更新或变更。LATEST_BY 函数允许你通过如下查询获取该维度的最新值:

SELECT dimension,
       LATEST_BY(changed_dimension, updated_timestamp)
FROM my_table
GROUP BY 1

示例中 update_timestamp 为用于判定“最新”值的参考时间戳,可以是 __time 或其他时间戳。

例如,考虑以下记录用户总积分的事件表:

__time user_id points
2024-01-01T01:00:00.000Z funny_bunny1 10
2024-01-01T01:05:00.000Z funny_bunny1 30
2024-01-01T02:00:00.000Z funny_bunny1 35
2024-01-01T02:00:00.000Z silly_monkey2 30
2024-01-01T02:05:00.000Z silly_monkey2 55
2024-01-01T03:00:00.000Z funny_bunny1 40

在 Druid Web 控制台中,进入 查询 视图并运行以下查询以插入示例数据:

REPLACE INTO "latest_by_tutorial1" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
     '{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":30}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":35}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":55}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":40}"}',
     '{"type":"json"}'
    )
  ) EXTEND ("timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
)
SELECT
  TIME_PARSE("timestamp") AS "__time",
  "user_id",
  "points"
FROM "ext"
PARTITIONED BY DAY

运行以下查询获取每个 user_id 的最新 points 值:

SELECT user_id,
     LATEST_BY("points", "__time") AS latest_points
FROM latest_by_tutorial1
GROUP BY 1

结果如下:

user_id total_points
silly_monkey2 55
funny_bunny1 40

示例中数值每次递增,但即使数值波动,此方法依然有效。

你可以将此查询结构作为子查询进一步处理。但若 user_id 值过多,查询开销可能很大。

若需在更大粒度时间范围内跟踪不同时间点的最新值,需要额外时间戳记录更新时间,以便 Druid 追踪最新版本。考虑以下数据,用户在 1 小时内积分更新,__time 为小时粒度,updated_timestamp 为分钟粒度:

__time updated_timestamp user_id points
2024-01-01T01:00:00.000Z 2024-01-01T01:00:00.000Z funny_bunny1 10
2024-01-01T01:00:00.000Z 2024-01-01T01:05:00.000Z funny_bunny1 30
2024-01-01T02:00:00.000Z 2024-01-01T02:00:00.000Z funny_bunny1 35
2024-01-01T02:00:00.000Z 2024-01-01T02:00:00.000Z silly_monkey2 30
2024-01-01T02:00:00.000Z 2024-01-01T02:05:00.000Z silly_monkey2 55
2024-01-01T03:00:00.000Z 2024-01-01T03:00:00.000Z funny_bunny1 40

查询 视图中打开新标签页并运行以下查询以插入示例数据:

REPLACE INTO "latest_by_tutorial2" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
     '{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"updated_timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"updated_timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":30}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":35}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":55}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"updated_timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":40}"}',
     '{"type":"json"}'
    )
  ) EXTEND ("timestamp" VARCHAR, "updated_timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
)
SELECT
  TIME_PARSE("timestamp") AS "__time",
  "updated_timestamp",
  "user_id",
  "points"
FROM "ext"
PARTITIONED BY DAY

运行以下查询按小时获取每个用户的最新积分:

SELECT FLOOR("__time" TO HOUR) AS "hour_time",
      "user_id",
       LATEST_BY("points", TIME_PARSE(updated_timestamp)) AS "latest_points_hour"
FROM latest_by_tutorial2
GROUP BY 1,2

结果如下:

hour_time user_id latest_points_hour
2024-01-01T01:00:00.000Z funny_bunny1 20
2024-01-01T02:00:00.000Z funny_bunny1 5
2024-01-01T02:00:00.000Z silly_monkey2 25
2024-01-01T03:00:00.000Z funny_bunny1 10

LATEST_BY 是聚合函数。当匹配某维度(如 user_id)的更新行不多时效率很高,但若有大量更新(如用户游戏百万次且更新无序),Druid 需扫描所有匹配行以找出最大时间戳的最新数据。

例如,若更新占数据量 1–5%,查询性能良好;若更新占比达 50% 以上,查询将变慢。

缓解方法之一是设置周期性批处理摄入任务,将修改数据重索引为新数据源,直接查询无需分组,通过预计算并存储最新值降低查询成本。注意,直到下次刷新前,最新数据视图不会实时更新。

另一种方法是在摄入时使用 LATEST_BY 进行聚合,并通过流式摄入将更新追加到已汇总数据源。追加到时间块会新增段,无法完美汇总,行可能为部分汇总而非完整,且可能存在多个部分汇总行。此时仍需用 GROUP BY 查询正确查询已汇总数据源。可通过调优自动压缩显著减少过时行数并提升性能。

使用 delta 值和聚合获取更新后的值

与其在事件中追加最新总值,不如记录每次事件的增量值,并使用常用聚合器。这可能避免查询中的一层聚合与分组。

多数场景下,可直接将事件数据发送至 Druid 而无需预处理。例如发送展示次数至 Druid 时,不要发送自昨日以来的总展示次数,仅发送最新展示次数即可,然后在查询时用 Druid 汇总总计。Druid 针对大量行累加优化,这对习惯批处理或预聚合数据的人来说可能反直觉。

例如,某数据源按维度 x 分组,对度量列 y 使用 SUM 聚合。若要将 y 从 3 更新为 2,则插入 y 值为 -1,这样查询按 x 分组的 SUM(y) 即正确。这可能带来显著性能优势,但代价是聚合必须始终为 SUM。

其他场景下,数据更新可能已是原数据的 delta,数据工程仅需简单追加更新。性能影响缓解同上例:摄入时结合持续自动压缩使用汇总。

例如,考虑以下事件表,记录某时段用户获得或失去的积分:

__time user_id delta
2024-01-01T01:00:00.000Z funny_bunny1 10
2024-01-01T01:05:00.000Z funny_bunny1 10
2024-01-01T02:00:00.000Z funny_bunny1 5
2024-01-01T02:00:00.000Z silly_monkey2 30
2024-01-01T02:05:00.000Z silly_monkey2 -5
2024-01-01T03:00:00.000Z funny_bunny1 10

查询 视图中打开新标签页并运行以下查询以插入示例数据:

REPLACE INTO "delta_tutorial" OVERWRITE ALL
WITH "ext" AS (
  SELECT *
  FROM TABLE(
    EXTERN(
     '{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":5}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":-5}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}"}',
     '{"type":"json"}'
    )
  ) EXTEND ("timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
)
SELECT
  TIME_PARSE("timestamp") AS "__time",
  "user_id",
  "points" AS "delta"
FROM "ext"
PARTITIONED BY DAY

以下查询返回与第二个 LATEST_BY 示例相同的每小时积分:

SELECT FLOOR("__time" TO HOUR) as "hour_time",
       "user_id",
       SUM("delta") AS "latest_points_hour"
FROM "delta_tutorial"
GROUP BY 1,2

了解更多

参见以下主题获取更多信息:

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。


网站公告

今日签到

点亮在社区的每一天
去签到