文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。
Druid 是一款高性能实时分析数据库,主要价值在于缩短洞察和行动的时间。Druid 专为需要快速查询和数据采集的工作流而设计。Druid 擅长于支持 UI、运行操作型(即席)查询或处理高并发性。您可以将 Druid 视为适用于各种用例的数据仓库的开源替代方案。
查询最新值
Query for latest values
本教程介绍 Apache Druid 中可替代其他数据库 UPSERT 的使用场景策略。你可以在查询时使用 LATEST_BY
聚合,或在插入时对数值维度使用“delta”方式。
更新数据 教程展示了如何基于时间戳使用批处理更新数据,包括 UPSERT 场景。然而,对于流数据,你也可以使用 LATEST_BY
或 delta 来满足原本需要更新的需求。
前置条件
在开始本教程前,请按 本地快速开始 下载并启动 Druid。无需向 Druid 集群加载任何数据。
你应已熟悉 Druid 的数据查询。如果尚未完成,请先阅读 查询数据 教程。
使用 LATEST_BY 获取更新后的值
有时,你希望读取某个维度或度量相对于另一个维度的最新值。在事务型数据库中,你可能会用 UPSERT 维护维度或度量;而在 Druid 中,你可以在摄入时追加所有更新或变更。LATEST_BY
函数允许你通过如下查询获取该维度的最新值:
SELECT dimension,
LATEST_BY(changed_dimension, updated_timestamp)
FROM my_table
GROUP BY 1
示例中 update_timestamp
为用于判定“最新”值的参考时间戳,可以是 __time
或其他时间戳。
例如,考虑以下记录用户总积分的事件表:
__time |
user_id |
points |
---|---|---|
2024-01-01T01:00:00.000Z |
funny_bunny1 |
10 |
2024-01-01T01:05:00.000Z |
funny_bunny1 |
30 |
2024-01-01T02:00:00.000Z |
funny_bunny1 |
35 |
2024-01-01T02:00:00.000Z |
silly_monkey2 |
30 |
2024-01-01T02:05:00.000Z |
silly_monkey2 |
55 |
2024-01-01T03:00:00.000Z |
funny_bunny1 |
40 |
在 Druid Web 控制台中,进入 查询 视图并运行以下查询以插入示例数据:
REPLACE INTO "latest_by_tutorial1" OVERWRITE ALL
WITH "ext" AS (
SELECT *
FROM TABLE(
EXTERN(
'{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":30}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":35}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":55}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":40}"}',
'{"type":"json"}'
)
) EXTEND ("timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
)
SELECT
TIME_PARSE("timestamp") AS "__time",
"user_id",
"points"
FROM "ext"
PARTITIONED BY DAY
运行以下查询获取每个 user_id
的最新 points
值:
SELECT user_id,
LATEST_BY("points", "__time") AS latest_points
FROM latest_by_tutorial1
GROUP BY 1
结果如下:
user_id |
total_points |
---|---|
silly_monkey2 |
55 |
funny_bunny1 |
40 |
示例中数值每次递增,但即使数值波动,此方法依然有效。
你可以将此查询结构作为子查询进一步处理。但若 user_id
值过多,查询开销可能很大。
若需在更大粒度时间范围内跟踪不同时间点的最新值,需要额外时间戳记录更新时间,以便 Druid 追踪最新版本。考虑以下数据,用户在 1 小时内积分更新,__time
为小时粒度,updated_timestamp
为分钟粒度:
__time |
updated_timestamp |
user_id |
points |
---|---|---|---|
2024-01-01T01:00:00.000Z |
2024-01-01T01:00:00.000Z |
funny_bunny1 |
10 |
2024-01-01T01:00:00.000Z |
2024-01-01T01:05:00.000Z |
funny_bunny1 |
30 |
2024-01-01T02:00:00.000Z |
2024-01-01T02:00:00.000Z |
funny_bunny1 |
35 |
2024-01-01T02:00:00.000Z |
2024-01-01T02:00:00.000Z |
silly_monkey2 |
30 |
2024-01-01T02:00:00.000Z |
2024-01-01T02:05:00.000Z |
silly_monkey2 |
55 |
2024-01-01T03:00:00.000Z |
2024-01-01T03:00:00.000Z |
funny_bunny1 |
40 |
在 查询 视图中打开新标签页并运行以下查询以插入示例数据:
REPLACE INTO "latest_by_tutorial2" OVERWRITE ALL
WITH "ext" AS (
SELECT *
FROM TABLE(
EXTERN(
'{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"updated_timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"updated_timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":30}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":35}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"updated_timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":55}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"updated_timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":40}"}',
'{"type":"json"}'
)
) EXTEND ("timestamp" VARCHAR, "updated_timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
)
SELECT
TIME_PARSE("timestamp") AS "__time",
"updated_timestamp",
"user_id",
"points"
FROM "ext"
PARTITIONED BY DAY
运行以下查询按小时获取每个用户的最新积分:
SELECT FLOOR("__time" TO HOUR) AS "hour_time",
"user_id",
LATEST_BY("points", TIME_PARSE(updated_timestamp)) AS "latest_points_hour"
FROM latest_by_tutorial2
GROUP BY 1,2
结果如下:
hour_time |
user_id |
latest_points_hour |
---|---|---|
2024-01-01T01:00:00.000Z |
funny_bunny1 |
20 |
2024-01-01T02:00:00.000Z |
funny_bunny1 |
5 |
2024-01-01T02:00:00.000Z |
silly_monkey2 |
25 |
2024-01-01T03:00:00.000Z |
funny_bunny1 |
10 |
LATEST_BY
是聚合函数。当匹配某维度(如 user_id
)的更新行不多时效率很高,但若有大量更新(如用户游戏百万次且更新无序),Druid 需扫描所有匹配行以找出最大时间戳的最新数据。
例如,若更新占数据量 1–5%,查询性能良好;若更新占比达 50% 以上,查询将变慢。
缓解方法之一是设置周期性批处理摄入任务,将修改数据重索引为新数据源,直接查询无需分组,通过预计算并存储最新值降低查询成本。注意,直到下次刷新前,最新数据视图不会实时更新。
另一种方法是在摄入时使用 LATEST_BY
进行聚合,并通过流式摄入将更新追加到已汇总数据源。追加到时间块会新增段,无法完美汇总,行可能为部分汇总而非完整,且可能存在多个部分汇总行。此时仍需用 GROUP BY 查询正确查询已汇总数据源。可通过调优自动压缩显著减少过时行数并提升性能。
使用 delta 值和聚合获取更新后的值
与其在事件中追加最新总值,不如记录每次事件的增量值,并使用常用聚合器。这可能避免查询中的一层聚合与分组。
多数场景下,可直接将事件数据发送至 Druid 而无需预处理。例如发送展示次数至 Druid 时,不要发送自昨日以来的总展示次数,仅发送最新展示次数即可,然后在查询时用 Druid 汇总总计。Druid 针对大量行累加优化,这对习惯批处理或预聚合数据的人来说可能反直觉。
例如,某数据源按维度 x
分组,对度量列 y
使用 SUM 聚合。若要将 y
从 3 更新为 2,则插入 y
值为 -1,这样查询按 x
分组的 SUM(y)
即正确。这可能带来显著性能优势,但代价是聚合必须始终为 SUM。
其他场景下,数据更新可能已是原数据的 delta,数据工程仅需简单追加更新。性能影响缓解同上例:摄入时结合持续自动压缩使用汇总。
例如,考虑以下事件表,记录某时段用户获得或失去的积分:
__time |
user_id |
delta |
---|---|---|
2024-01-01T01:00:00.000Z |
funny_bunny1 |
10 |
2024-01-01T01:05:00.000Z |
funny_bunny1 |
10 |
2024-01-01T02:00:00.000Z |
funny_bunny1 |
5 |
2024-01-01T02:00:00.000Z |
silly_monkey2 |
30 |
2024-01-01T02:05:00.000Z |
silly_monkey2 |
-5 |
2024-01-01T03:00:00.000Z |
funny_bunny1 |
10 |
在 查询 视图中打开新标签页并运行以下查询以插入示例数据:
REPLACE INTO "delta_tutorial" OVERWRITE ALL
WITH "ext" AS (
SELECT *
FROM TABLE(
EXTERN(
'{"type":"inline","data":"{\"timestamp\":\"2024-01-01T01:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\":\"2024-01-01T01:05:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}\n{\"timestamp\": \"2024-01-01T02:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":5}\n{\"timestamp\":\"2024-01-01T02:00:00Z\",\"user_id\":\"silly_monkey2\", \"points\":30}\n{\"timestamp\":\"2024-01-01T02:05:00Z\",\"user_id\":\"silly_monkey2\", \"points\":-5}\n{\"timestamp\":\"2024-01-01T03:00:00Z\",\"user_id\":\"funny_bunny1\", \"points\":10}"}',
'{"type":"json"}'
)
) EXTEND ("timestamp" VARCHAR, "user_id" VARCHAR, "points" BIGINT)
)
SELECT
TIME_PARSE("timestamp") AS "__time",
"user_id",
"points" AS "delta"
FROM "ext"
PARTITIONED BY DAY
以下查询返回与第二个 LATEST_BY 示例相同的每小时积分:
SELECT FLOOR("__time" TO HOUR) as "hour_time",
"user_id",
SUM("delta") AS "latest_points_hour"
FROM "delta_tutorial"
GROUP BY 1,2
了解更多
参见以下主题获取更多信息:
风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。