多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据 🚀
📚 目录
- 多模联邦查询网关:ABP + Trino/Presto 聚合跨源数据 🚀
0. TL;DR 🧾
- 引擎:Trino 做联邦查询(多 Catalog 单点聚合);网关:ABP 做多租户身份穿透、列掩码/行过滤策略注入、配额与审计、路由(Trino 或源端直连只读)。
- 客户端协议:只用
POST /v1/statement
提交;解析响应体nextUri
循环 GET 拉取;需要取消则 DELETE nextUri;请求头使用标准X-Trino-*
。 - 性能主轴:动态过滤(默认开启) + CBO(Join 重排/分发自动选择,广播上限可控)。
- 治理主轴:Resource Groups 分队列限流;FTE+Exchange 抗节点故障;Worker 优雅下线 保障维护期稳定。
1. 场景与边界 🎯
- 目标:统一查询入口,跨 PostgreSQL(维/主数据)、对象存储(ORC/Parquet on S3/MinIO,经 Hive/Iceberg 编目)、Kafka(事件流)与若干 OLAP 明细/汇总。
- 非目标:不讨论“单库一体化”(如纯 ClickHouse/DuckDB);本文聚焦联邦与网关治理。
2. 总体架构 🧩
ABP 网关(
Abp.FederatedQueryGateway
)- 从
ICurrentTenant/ICurrentUser
获取上下文 → 映射到X-Trino-User
、X-Trino-Source
、X-Trino-Client-Tags
、X-Trino-Session
(如join_reordering_strategy
/join_distribution_type
);必要时加X-Trino-Routing-Group
对接 Trino Gateway。 - 安全:把租户/角色映射为 File-based/OPA 的列掩码与行过滤策略(引擎强制);应用层再做白名单/禁用昂贵模式。
- 审计/配额:事件监听(HTTP/MySQL/Kafka/OpenLineage)做“查询层”审计,ABP 聚合看板与熔断。
- 从
Trino 联邦层
- Coordinator + Workers,多 Catalog(Hive/Iceberg、Postgres、Kafka、JDBC…);Resource Groups 以
source
/clientTags
限流与排队。
- Coordinator + Workers,多 Catalog(Hive/Iceberg、Postgres、Kafka、JDBC…);Resource Groups 以
3. 权限与最小可见(引擎层强制)🔐
3.1 File-based Access Control
etc/access-control.properties
:access-control.name=file
+ JSON 规则(系统信息/对象级/列级规则),支持列掩码与行过滤;系统信息规则里可放行优雅下线所需写权限。
3.2 OPA Access Control
access-control.name=opa
+opa.policy.*
端点;OPA 返回列掩码表达式与行过滤谓词并由 Trino 强制应用。适合把 ABP 角色/租户映射为 Rego 决策。
选择建议:小团队优先 File-based(维护简单);需要统一策略/审计/合规时再上 OPA/Ranger。
4. 路由与优化策略(DF + CBO)⚡
动态过滤 DF(默认开启)
全局关闭:
enable-dynamic-filtering=false
;会话关闭:enable_dynamic_filtering=false
。等待收集(按连接器配置):
- JDBC:
dynamic-filtering.wait-timeout
- Hive:
hive.dynamic-filtering.wait-timeout
- Iceberg:
iceberg.dynamic-filtering.wait-timeout
- JDBC:
作用:在“维表(小)↔ 明细(大)”Join 中,将维表键值作为运行时谓词,下推到扫描/分区裁剪,显著减少 IO 与网络。
CBO & Join 分发/重排
配置(
config.properties
):optimizer.join-reordering-strategy=AUTOMATIC
(会话:join_reordering_strategy
)join-distribution-type=AUTOMATIC
(会话:join_distribution_type
)- 广播上限:
join-max-broadcast-table-size
(会话:join_max_broadcast_table_size
,默认 100MB)
5. ABP 网关实现(.NET,流式 & 可取消) 🧑💻
5.1 客户端协议与重试要点
- 只使用
POST /v1/statement
提交 SQL;解析响应 JSON 的nextUri
循环 GET;需要取消就 DELETEnextUri
。 - 对
429/503/504
做指数退避并尊重Retry-After
;HttpCompletionOption.ResponseHeadersRead
降低大结果集内存占用;CancellationToken
全链路透传。
5.2 流式返回(IAsyncEnumerable<object[]>
)示例
相比“把所有行装进 List 再返回”,边拉取边
yield
不仅省内存,还能更快给到首批结果。取消时会尝试 DELETEnextUri
释放服务端资源。
public class QueryAppService : ApplicationService
{
private readonly IHttpClientFactory _http;
private readonly ICurrentTenant _tenant;
public QueryAppService(IHttpClientFactory http, ICurrentTenant tenant)
{ _http = http; _tenant = tenant; }
[Authorize]
public async IAsyncEnumerable<object[]> ExecuteStreamAsync(
ExecuteQueryInput input,
[System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default)
{
var cli = _http.CreateClient("trino");
using var req = new HttpRequestMessage(HttpMethod.Post, "/v1/statement")
{ Content = new StringContent(input.Sql, Encoding.UTF8, "text/plain") };
// 身份穿透 + 策略标签(Resource Groups / 路由组)
req.Headers.TryAddWithoutValidation("X-Trino-User", CurrentUser.UserName ?? "abp");
req.Headers.TryAddWithoutValidation("X-Trino-Source", "abp-fq-gateway");
req.Headers.TryAddWithoutValidation("X-Trino-Client-Tags", $"tenant:{_tenant?.Id},app:abp");
if (!string.IsNullOrEmpty(input.RoutingGroup))
req.Headers.TryAddWithoutValidation("X-Trino-Routing-Group", input.RoutingGroup);
// 会话属性(配置名与会话名区分:连字符 vs 下划线)
req.Headers.TryAddWithoutValidation("X-Trino-Session",
"join_reordering_strategy=AUTOMATIC, join_distribution_type=AUTOMATIC");
string? next = null;
try
{
// 1st POST
using (var resp = await cli.SendAsync(req, HttpCompletionOption.ResponseHeadersRead, ct))
{
resp.EnsureSuccessStatusCode();
var json = await resp.Content.ReadAsStringAsync(ct);
var qr = JsonSerializer.Deserialize<QueryResults>(json, new() { PropertyNameCaseInsensitive = true })!;
next = qr.nextUri;
if (qr.data != null)
foreach (var r in qr.data) yield return r.ToArray();
}
// Follow nextUri
int attempt = 0;
while (!ct.IsCancellationRequested && !string.IsNullOrEmpty(next))
{
using var resp = await cli.GetAsync(next, HttpCompletionOption.ResponseHeadersRead, ct);
if ((int)resp.StatusCode is 429 or 503 or 504)
{
var retry = resp.Headers.RetryAfter?.Delta
?? TimeSpan.FromSeconds(Math.Min(60, Math.Pow(2, attempt++) + Random.Shared.NextDouble()));
await Task.Delay(retry, ct);
continue;
}
resp.EnsureSuccessStatusCode();
var body = await resp.Content.ReadAsStringAsync(ct);
var qr = JsonSerializer.Deserialize<QueryResults>(body, new() { PropertyNameCaseInsensitive = true })!;
if (qr.data != null)
foreach (var r in qr.data) yield return r.ToArray();
next = qr.nextUri;
attempt = 0; // reset on success
}
}
finally
{
// 取消时尽量通知服务端释放资源
if (ct.IsCancellationRequested && !string.IsNullOrEmpty(next))
{
try { await cli.DeleteAsync(next, ct); } catch { /* 忽略 */ }
}
}
}
private sealed record QueryResults(string? id, TrinoColumn[]? columns, object[][]? data, string? nextUri, TrinoError? error);
private sealed record TrinoColumn(string name, string type);
private sealed record TrinoError(string message);
}
可选:再提供一个
StreamToNdjsonAsync(HttpResponse response, ...)
,边拉取边写入 NDJSON/CSV,适合超大结果导出。
5.3 会话默认值托管(可选)
- 用 Session Property Manager 基于条件(
source=abp-fq-gateway
、clientTags=tenant:*
)下发默认会话,减少前端可变性与误配。
6. Catalog 配置样例(S3/MinIO、JDBC、Kafka) 🗂️
对象存储:启用原生 S3(
fs.native-s3.enabled=true
)与s3.*
前缀;Trino 设计支持 S3 兼容系统,但只对 AWS S3 与 MinIO 做过兼容测试,其他厂商需自测。
6.1 etc/catalog/hive.properties
connector.name=hive
hive.metastore.uri=thrift://hms:9083
fs.native-s3.enabled=true
s3.endpoint=http://minio:9000
s3.region=us-east-1
s3.aws-access-key=minio
s3.aws-secret-key=minio123
s3.path-style-access=true
6.2 etc/catalog/postgres.properties
connector.name=postgresql
connection-url=jdbc:postgresql://postgres:5432/appdb
connection-user=app
connection-password=app123
6.3 etc/catalog/kafka.properties
connector.name=kafka
kafka.nodes=kafka:9092
# 文件式表描述(推荐):目录中的 *.json 会被自动发现并映射为表
kafka.table-description-supplier=FILE
kafka.table-description-dir=/etc/kafka/descriptions
# 如需白名单方式可显式指定表名(可选)
# kafka.table-names=tpch.customer,tpch.orders
7. 高可用与容错:FTE + Exchange(必读) 🧯
- 开启 FTE:
retry-policy=QUERY | TASK
。若设为TASK
,必须配置 Exchange Manager(S3/HDFS/ABFS/GCS/兼容端)。 - S3 Filesystem Exchange(示例):
exchange-manager.name=filesystem
exchange.base-directories=s3://trino-exchange
exchange.s3.endpoint=http://minio:9000
exchange.s3.region=us-east-1
exchange.s3.aws-access-key=minio
exchange.s3.aws-secret-key=minio123
8. 资源隔离:Resource Groups(文件或 DB 后端) 🧱
启用(config.properties):
resource-groups.configuration-manager=file resource-groups.config-file=etc/resource-groups.json
最小 JSON(按
source
/clientTags
做队列与权重):
{
"rootGroups": [
{ "name": "global", "softMemoryLimit": "80%", "hardConcurrencyLimit": 100, "schedulingPolicy": "weighted_fair",
"subGroups": [
{ "name": "adhoc", "softMemoryLimit": "30%", "hardConcurrencyLimit": 20, "schedulingWeight": 1 },
{ "name": "etl", "softMemoryLimit": "50%", "hardConcurrencyLimit": 50, "schedulingWeight": 3 },
{ "name": "rt", "softMemoryLimit": "20%", "hardConcurrencyLimit": 30, "schedulingWeight": 2 }
]}
],
"selectors": [
{ "source": "abp-fq-gateway", "clientTags": ["tenant:.*"], "group": "global.adhoc" },
{ "source": "airflow", "clientTags": ["batch"], "group": "global.etl" },
{ "clientTags": ["realtime"], "group": "global.rt" }
]
}
多集群分流可用 Trino Gateway,在请求头携带
X-Trino-Routing-Group
指定路由组(未指定按默认组,如adhoc
)。
9. 部署与“最小可跑”(Compose 片段) 🐳
services:
trino:
image: trinodb/trino:latest
ports: ["8080:8080"]
volumes:
- ./trino/etc:/etc/trino
- ./kafka/descriptions:/etc/kafka/descriptions
depends_on: [hms, minio, postgres]
hms:
image: apache/hive:4.0.0
command: ["bash","-c","/opt/hive/bin/hive --service metastore"]
ports: ["9083:9083"]
minio:
image: minio/minio
command: server /data --console-address ":9001"
environment:
- MINIO_ROOT_USER=minio
- MINIO_ROOT_PASSWORD=minio123
ports: ["9000:9000","9001:9001"]
postgres:
image: postgres:15
environment:
- POSTGRES_PASSWORD=app123
- POSTGRES_USER=app
- POSTGRES_DB=appdb
ports: ["5432:5432"]
10. 基准与用例 🧪
- Join(维↔明细):Postgres × Hive(Iceberg) TopN/计数/去重窗口;观察 DF 命中与 Join 分发。
- REST 压测(
nextUri
协议):首包后循环 GET,带X-Trino-Client-Tags
与默认会话;统计statementStats
。
11. 成本与时延对比 📈
- A:Trino 联邦(DF 默认开 + CBO 自动分发/重排,必要时
enable_large_dynamic_filters
)。 - B:源侧拆分查询 + ABP 聚合(网络/扫描对照)。
- C:明细沉入专用 OLAP(对照)。
- 输出 雷达图/表格:选择度 × 并发 × 维表大小 →
p50/p95/p99
、扫描字节/行、remote reads。
12. 运维与关停 🛠️
OpenMetrics:Prometheus 抓
/metrics
(如trino_execution_*
、trino_memory_*
)。OpenTelemetry:
tracing.enabled=true
+ 导出端点,串起查询链路。优雅下线(仅 Worker):
curl -v -X PUT -d '"SHUTTING_DOWN"' -H "Content-type: application/json" http://worker:8081/v1/info/state
提示:若启用 File-based System 信息规则,需放行该写操作。
13. 常见坑位 Checklist ✅
- 不要轮询
/v1/query
;严格用/v1/statement
+nextUri
;对 429/503/504 退避并尊重Retry-After
;取消时 DELETEnextUri
。 - DF 默认开启;按连接器设置等待超时键位(JDBC/Hive/Iceberg)。
- CBO 属性名:配置(
optimizer.join-reordering-strategy
、join-distribution-type
、join-max-broadcast-table-size
) vs 会话(join_reordering_strategy
、join_distribution_type
、join_max_broadcast_table_size
)。 - FTE:
retry-policy=TASK
⇒ 必须配置 Exchange;S3/MinIO 走 filesystem exchange。 - Resource Groups:启用键为
resource-groups.configuration-manager
与resource-groups.config-file
,selectors 用source
/clientTags
。 - S3 原生:
fs.native-s3.enabled=true
+s3.*
;仅 AWS S3 / MinIO 做过兼容测试(其他需自测)。 - Kafka 文件式描述:启用
kafka.table-description-supplier=FILE
后,table-description-dir
下的*.json
会自动发现为表(可不填kafka.table-names
)。 - Gateway 分流:多集群可带
X-Trino-Routing-Group
;未显式指定按 Gateway 默认组(常为adhoc
)。