Apache Flink 的 Table API 是 Flink 提供的一种高级抽象,用于以声明式方式处理批处理和流处理数据。它是基于关系模型的 API,用户可以像编写 SQL 一样,以简洁、类型安全的方式编写数据处理逻辑。
一、基本概念
1. 什么是 Table API?
Table API 是 Flink 中用于处理结构化数据的 关系型编程接口,它支持:
批处理(Batch)
流处理(Streaming)
Table API 提供了类似 SQL 的语法风格,但以函数式 API 方式表达,具备更好的类型安全和 IDE 友好性。
二、核心组件
1. Table
Flink 中的
Table
是对结构化数据的一种抽象。相当于数据库中的表,可以进行过滤、聚合、连接等操作。
2. TableEnvironment
Table API 的执行上下文。
创建表、注册 UDF、执行 SQL/Table API 操作等都依赖它。
3. Schema(模式)
定义表结构,包括字段名、数据类型、主键、水位线等。
三、编程模型
// 1. 创建 TableEnvironment
EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 2. 注册表(从外部数据源)
tableEnv.executeSql("""
CREATE TABLE source_table (
id STRING,
ts TIMESTAMP(3),
val INT,
WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'test',
...
)
""");
// 3. 使用 Table API 处理数据
Table result = tableEnv.from("source_table")
.filter($("val").isGreater(10))
.groupBy($("id"))
.select($("id"), $("val").avg().as("avg_val"));
// 4. 输出结果到目标表
tableEnv.executeSql("""
CREATE TABLE sink_table (
id STRING,
avg_val DOUBLE
) WITH (
'connector' = 'print'
)
""");
result.executeInsert("sink_table");
四、常用操作
操作类型 | 示例 |
---|---|
过滤 | table.filter($("age").isGreater(18)) |
投影 | table.select($("name"), $("age")) |
聚合 | table.groupBy($("city")).select($("city"), $("salary").avg()) |
连接 | table1.join(table2).where(...).select(...) |
去重 | table.distinct() |
排序 | table.orderBy($("time").desc()) |
窗口 | table.window(...) (见下文) |
五、时间和窗口支持
Table API 支持两种时间语义:
事件时间(Event Time)
处理时间(Processing Time)
常见的窗口类型:
滚动窗口(Tumble)
滑动窗口(Slide)
会话窗口(Session)
示例:
table.window(Tumble.over(lit(10).minutes()).on($("ts")).as("w"))
.groupBy($("id"), $("w"))
.select($("id"), $("w").start(), $("val").sum());
六、与 SQL 的关系
Table API 与 SQL 是等价的抽象:
SQL 更加声明式,适合数据分析人员;
Table API 更加灵活、支持编程逻辑,适合开发者。
两者可以混合使用,例如:
Table result = tableEnv.sqlQuery("SELECT id, COUNT(*) FROM source GROUP BY id");
七、数据源和连接器支持
Table API 支持多种数据源和 sink,通过 Flink Connector 实现:
常见的:
Kafka
HDFS
MySQL / JDBC
Elasticsearch
Hive
Iceberg / Delta / Hudi
Redis 等
通过 SQL 创建表:
CREATE TABLE example (
...
) WITH (
'connector' = 'kafka',
...
);
八、UDF 扩展
可以定义自定义函数:
ScalarFunction(标量函数)
TableFunction(表函数)
AggregateFunction(聚合函数)
TableAggregateFunction(表聚合函数)
示例:
public class HashCode extends ScalarFunction {
public int eval(String s) {
return s.hashCode();
}
}
tableEnv.createTemporarySystemFunction("HashCode", HashCode.class);
table.select(call("HashCode", $("name")));
九、批与流统一
Flink 的 Table API 实现了 批流统一语义,相同的 API 可用于处理批或流数据,只需切换 EnvironmentSettings
即可。
十、优点总结
统一的 API:批流统一,开发逻辑一致
类型安全:Java/Scala 函数式风格,IDE 友好
与 SQL 无缝切换
可插拔的连接器与格式支持
强大的时间与窗口语义支持
与 Flink Runtime 深度整合,性能高效