Apache Flink 提供了强大的 Table API 和 SQL 接口,用于统一处理批数据和流数据。它们为开发者提供了类 SQL 的编程方式,简化了复杂的数据处理逻辑,并支持与外部系统集成。
🧩 一、Flink Table & SQL 核心概念
概念 |
描述 |
Table API |
基于 Java/Scala 的 DSL,提供类型安全的操作接口 |
Flink SQL |
支持标准 ANSI SQL 语法的查询语言 |
DataStream / DataSet ↔ Table |
可以在 DataStream 或 Table 之间互相转换 |
Catalog |
元数据管理器,如 Hive Catalog、Memory Catalog |
TableEnvironment |
管理表、SQL 执行环境的核心类 |
Connectors |
支持 Kafka、Hive、MySQL、文件等数据源接入 |
Time Attributes |
定义事件时间(Event Time)、处理时间(Processing Time) |
Windowing |
支持滚动窗口、滑动窗口、会话窗口等 |
💻 二、Flink Table API 和 SQL 的优势
特性 |
描述 |
统一接口 |
同一套代码可运行在 Batch 和 Streaming 场景下 |
高性能 |
底层使用 Apache Calcite 进行优化,自动进行查询优化 |
易用性强 |
对熟悉 SQL 的用户非常友好 |
生态集成好 |
支持 Kafka、Hive、JDBC、Elasticsearch 等多种数据源 |
状态管理 |
在流式场景中自动管理状态和窗口逻辑 |
📦 三、核心组件说明
1. TableEnvironment
- 是操作 Table 和 SQL 的入口
- 负责注册表、执行查询、管理元数据等
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
2. DataStream ↔ Table
转换
示例:DataStream 转 Table
DataStream<Tuple2<String, Integer>> dataStream = env.fromElements(
Tuple2.of("a", 1), Tuple2.of("b", 2));
Table table = tEnv.fromDataStream(dataStream);
tEnv.createTemporaryView("myTable", dataStream);
示例:Table 转 DataStream
Table resultTable = tEnv.sqlQuery("SELECT * FROM myTable WHERE f1 > 1");
DataStream<Row> resultStream = tEnv.toDataStream(resultTable);
3. Flink SQL 查询
示例:使用 SQL 查询统计结果
tEnv.executeSql(
"CREATE TABLE MyKafkaSource (" +
" user STRING," +
" url STRING," +
" ts BIGINT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'format' = 'json'" +
")"
);
Table result = tEnv.sqlQuery("SELECT user, COUNT(*) AS cnt FROM MyKafkaSource GROUP BY user");
tEnv.toDataStream(result).print();
env.execute();
🧪 四、Java 示例:完整的 Table API + SQL 使用案例
✅ 功能:
从 Kafka 读取日志数据,按用户分组统计访问次数
📁 依赖建议(pom.xml)
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.17.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.17.1</version>
</dependency>
</dependencies>
🧱 五、完整 Java 示例代码
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkTableAndSQLEntry {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
"CREATE TABLE KafkaLog (" +
" user STRING," +
" url STRING," +
" ts BIGINT" +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'user_log'," +
" 'properties.bootstrap.servers' = 'localhost:9092'," +
" 'properties.group.id' = 'flink-sql-group'," +
" 'format' = 'json'" +
")"
);
tEnv.executeSql(
"CREATE TABLE ConsoleSink (" +
" user STRING," +
" cnt BIGINT" +
") WITH (" +
" 'connector' = 'print'" +
")"
);
tEnv.executeSql(
"INSERT INTO ConsoleSink " +
"SELECT user, COUNT(*) AS cnt " +
"FROM KafkaLog " +
"GROUP BY user"
);
}
}
📊 六、SQL 查询示例汇总
SQL 示例 |
描述 |
SELECT * FROM table |
查询所有字段 |
SELECT user, COUNT(*) FROM table GROUP BY user |
分组聚合 |
SELECT * FROM table WHERE ts > 1000 |
条件过滤 |
SELECT TUMBLE_END(ts, INTERVAL '5' SECOND), COUNT(*) ... |
时间窗口聚合 |
SELECT * FROM LATERAL TABLE(udtf(col)) |
使用 UDTF |
CREATE VIEW view_name AS SELECT ... |
创建视图 |
INSERT INTO sink_table SELECT ... |
写入到目标表 |
⏱️ 七、时间属性与窗口聚合
示例:定义事件时间并使用滚动窗口
CREATE TABLE EventTable (
user STRING,
url STRING,
ts BIGINT,
WATERMARK FOR ts AS ts - 1000
) WITH (...);
SELECT
TUMBLE_END(ts, INTERVAL '5' SECOND) AS window_end,
user,
COUNT(*) AS cnt
FROM EventTable
GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), user;
📁 八、连接器(Connector)配置示例
1. Kafka Source
CREATE TABLE KafkaSource (
user STRING,
url STRING,
ts BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'input-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-sql-group',
'format' = 'json'
);
2. MySQL Sink
CREATE TABLE MysqlSink (
user STRING,
cnt BIGINT
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/mydb',
'table-name' = 'user_access_log'
);
📈 九、Flink SQL + Table API 的典型应用场景
场景 |
示例 |
实时 ETL |
从 Kafka 读取数据 → 清洗 → 写入 HDFS |
流式分析 |
统计每分钟点击量、异常检测 |
数据质量监控 |
判断字段是否为空、格式是否合法 |
风控规则引擎 |
使用 CEP 检测异常行为 |
数仓建模 |
构建 DWD、DWS 层表结构 |
🧠 十、Table API vs SQL
特性 |
Table API |
SQL |
语法风格 |
函数式链式调用 |
类 SQL 语法 |
易用性 |
对 Java 开发者更友好 |
对 SQL 用户更友好 |
动态解析 |
不适合动态 SQL |
支持字符串拼接、模板引擎 |
性能 |
一致(底层都是 Calcite) |
一致 |
支持功能 |
大部分 SQL 功能都有对应 API |
支持完整 SQL 语法 |
调试难度 |
相对较难调试 |
更直观、便于调试 |
✅ 十一、总结
技术点 |
描述 |
Table API |
基于 Java/Scala 的函数式 API |
Flink SQL |
支持 ANSI SQL,易于上手 |
TableEnvironment |
管理表和 SQL 的核心类 |
Connectors |
支持 Kafka、Hive、JDBC、File、Print 等 |
Time Attributes |
支持事件时间、处理时间 |
Windowing |
支持滚动、滑动、会话窗口 |
State Backend |
支持 RocksDB、FS、Memory 状态后端 |
🧩 十二、扩展学习方向
如果你希望我为你演示以下内容,请继续提问:
- 自定义函数(UDF、UDAF、UDTF)
- Kafka + MySQL 实时同步方案
- 基于 Hive 的批处理 SQL 作业
- 使用 PyFlink 实现 SQL 作业
- 使用
WITH
子句定义临时表
- 使用
LATERAL TABLE
调用 UDTF
- 使用
MATCH_RECOGNIZE
实现 CEP 模式匹配
📌 一句话总结:
Flink Table API 和 SQL 提供了一种统一的批流一体编程模型,适合数据仓库、实时分析、ETL、风控等多种大数据处理场景。