FlinkSQL通解

发布于:2025-07-14 ⋅ 阅读:(15) ⋅ 点赞:(0)

参考文档

https://blog.csdn.net/be_racle/article/details/135921061?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522604e8b91e59f598cb3c69ae05c0628f7%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=604e8b91e59f598cb3c69ae05c0628f7&biz_id=0&utm_medium=distribute.pc_search_result.none-task-blog-2allfirst_rank_ecpm_v1~rank_v31_ecpm-20-135921061-null-null.142v101pc_search_result_base6&utm_term=FlinkSQL&spm=1018.2226.3001.4187

1. 基本原理

2. 编码套路

使用Flink SQL时,我们通常会遵循如下编码套路,这些套路和使用Flink API的套路是一样的:

  • 环境准备:初始化一个TableEnvironment对象,它是执行Flink SQL语句的核心。这个环境可以是流数据环境,也可以是批数据环境。

  • 数据源定义:通过CREATE TABLE语句定义输入数据源(source),可以是Kafka、CSV文件等。

  • 数据处理:编写SQL语句对数据进行处理,如查询、过滤、聚合等。

  • 数据输出:通过CREATE TABLE定义输出数据源(sink),并将处理结果输出。

3、相关语法

Create关键字

Create语句用于向当前或指定的Catalog中注册库、表、视图或者函数,注册后的库、表、视图可以在后期的SQL查询中进行使用。

目前FlinkSQL中支持以下Create语句:

  • Create Table

  • Create DataBase

  • Create View

  • Create Function

官方定义

根据指定的表名创建一个表,如果同名表已经在CataLog中存在了,则无法创建。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] ]
   
<physical_column_definition>: -- 物理列定义
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]
  
<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]
列定义
(1) 常规列

常规列即物理列,定义了物理介质中存储的数据中字段的名称、类型与顺序。其他类型的列也可以在物理列之间进行声明,但是不会影响最终物理列的读取。

形式举例

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING
) WITH (
  ...
);
(2) 元数据列
元数据列是SQL标准的拓展,允许访问数据源本身具有的一些元数据,元数据列由METADATA关键字标识。

例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  -- 读取 kafka 本身自带的时间戳
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka'
  ...
);

-- 后期数据处理
INSERT INTO MyTable 
SELECT 
    user_id
    , name
    , record_time + INTERVAL '1' SECOND 
FROM MyTable;

 -- 如果自定义的列名称和 Connector 中定义 metadata 字段的名称一样的话,FROM xxx 子句是可以被省略的。
 CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  -- 读取 kafka 本身自带的时间戳
  `timestamp` TIMESTAMP_LTZ(3) METADATA
) WITH (
  'connector' = 'kafka'
  ...
);

需要注意的是,每种Connecor提供的METADATA字段不一样,需要参考https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/

默认情况下,FlinkSQL认为meta列是可以读取也可以写入的,但是有些外部存储系统的元数据只能用于读取不能写入。

可以选择使用VIRTUAL关键字来标识某个元数据列不写入外部存储中。

CREATE TABLE MyTable (
  -- sink 时会写入
  `timestamp` BIGINT METADATA,
  -- sink 时不写入
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

这里在写入时需要注意,不要在 SQL 的 INSERT INTO 语句中写入 offset 列,否则 Flink SQL 任务会直接报错。

(3) 计算列

计算列在写建表的DDL时,可以拿已有的一些列经过一些自定义的运算生成的新列。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  -- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity
  `cost` AS price * quanitity,
) WITH (
  'connector' = 'kafka'
  ...
);

需要注意的是:如果只是简单的四则运算的话可以直接写在DML中就可以,但是计算列一些用于定义时间属性。把输入数据的时间格式标准化,处理时间、事件时间举例如下:

  • 处理时间:使用PROCTIME()函数来定义处理时间列。

  • 事件时间:事件时间的时间戳可以在声明WaterMark之前进行预处理

需要注意的是:和虚拟列类型,计算列只能读取不能写入。

WaterMark定义

WaterMark是在Create Table中进行定义的,具体SQL语法标准是:

WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
  • rowtime_column_name:表的事件时间属性字段。该列必须是 TIMESTAMP(3)、TIMESTAMP_LTZ(3) 类,这个时间可以是一个计算列。

  • watermark_strategy_expression:定义 Watermark 的生成策略。Watermark 的一般都是由 rowtime_column_name 列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。

注意:

如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。

Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由 pipeline.auto-watermark-interval 进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。

FlinkSQL定义的WATERMARK生产策略

  • 有界无序:设置方式:WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。此类策略用于设置最大的乱序时间,

  • 严格升序: 设置方式为:。一般基本不用这种方式,如果你能保证你的数据源的时间戳是严格升序的,认为时间戳只会越来越大,也不存在相等的情况,只有相等或者小于之前的,认为迟到的数据。

  • 递增:设置方式为:。一般基本不用这种方式,如果设置此类,则允许tyou

With定义

在建表时,描述数据源、数据汇的具体外部存储的元数据信息。

一般的With的配置项由FlinkSQL的Connector来定义,每种Connector提供的With配置项都不同。内置连接器参考官方文档:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
  'connector' = 'kafka', -- 外部存储的方式
  'topic' = 'user_behavior',-- 主题信息
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup', -- 使用哪个组消费
  'scan.startup.mode' = 'earliest-offset',-- 消费方式
  'format' = 'csv'-- 在读入与写出时的序列化方式
)
Like定义

like字句是Create Table字句的一个延申。

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
    -- 1. 添加了 WATERMARK 定义
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数
    'scan.startup.mode' = 'latest-offset'
)
-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表
LIKE Orders;

4. 代码示例

FlinkSQL第一个例子

第一步:引用依赖
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.17.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.17.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java</artifactId>
    <version>1.17.0</version> <!-- 根据版本进行修改 -->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.17.0</version> <!-- 根据版本进行修改 -->
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-files</artifactId>
    <version>1.17.0</version> <!-- Flink 1.17 版本 -->
</dependency>
第二步:编写代码
public static void main(String[] args) throws Exception {
    // 加载环境依赖
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    EnvironmentSettings build = EnvironmentSettings.newInstance()
            .inBatchMode() // 批处理模式
            .build(); 
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, build);
    // 定义数据源
    String createSourceTableDdl = "CREATE TABLE csv_source (" +
            " user_id INT," +
            " product STRING," +
            " order_amount DOUBLE" +
            ") WITH (" +
            " 'connector' = 'filesystem'," +
            " 'path' = 'file:///E:/input.csv'," +
            " 'format' = 'csv'" +
            ")";
    tableEnvironment.executeSql(createSourceTableDdl);
    /*String query = "SELECT user_id, SUM(order_amount) AS total_amount FROM csv_source GROUP BY user_id";
    tableEnvironment.executeSql(query).print();
    env.execute("flink SQL Demo");*/
    
    // 定义输出源
    String createSinkTableDdl = "CREATE TABLE csv_sink (" +
            " user_id INT," +
            " total_amount DOUBLE" +
            ") WITH (" +
            " 'connector' = 'filesystem'," +
            " 'path' = 'file:///E:/output.csv'," +
            " 'format' = 'csv'" +
            ")";
    tableEnvironment.executeSql(createSinkTableDdl);
    // 将执行结果输入到sink中
    String query = "INSERT INTO csv_sink " +
            "SELECT user_id, SUM(order_amount) as total_amount " +
            "FROM csv_source " +
            "GROUP BY user_id";

    tableEnvironment.executeSql(query).print();

    //env.execute();
}

附录

ProcTime函数

是指Flink算子执行具体操作时的机器系统时间,通常以毫秒为单位。

Watermark机制

概念

Flink SQL中的Watermark用于处理流数据中的时间相关问题,特别是在无界流中,时间的延迟、乱序以及处理的顺序会对结果产生影响。Watermark的主要作用是用来标记事件的最大时间戳,并且帮助Flink确定何时可以触发事件时间上的窗口计算。

具体用时间窗口操作:Watermark可以帮助触发窗口操作的计算。当一个事件的时间戳超过了当前Watermark时,窗口计算会被触发。

  • 乱序事件处理:当事件发生顺序与事件流中的实际顺序不一致时(即乱序事件),Watermark可以帮助Flink判断何时可以安全地处理和计算事件。

  • 延迟处理:Watermark允许Flink推迟处理窗口,直到某些延迟的事件到达,并且不丢失它们。

具体例子

假设我们有一个流数据系统,记录用户的购买事件,并且我们想按时间窗口统计每个用户在某个时间段内的购买次数。每个事件有一个时间戳,表示事件发生的时间,但由于网络延迟或事件乱序,事件的到达时间可能不同于实际发生的时间。

CREATE TABLE purchases (
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time            time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_purchases',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

-- 每10秒的滚动窗口,统计用户购买次数
SELECT user_id, COUNT(*) AS purchase_count
FROM purchases
GROUP BY user_id, TUMBLE(event_time, INTERVAL '10' SECOND);

-- WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND:定义了一个Watermark策略,表示Watermark会比事件的时间戳晚5秒,以处理可能出现的延迟事件。
-- TUMBLE(event_time, INTERVAL '10' SECOND):对event_time进行每10秒的滚动窗口计算。
-- 当Flink接收到某个事件时,它会根据事件的时间戳来更新Watermark。Watermark值为事件时间戳减去5秒,这样可以处理一定时间内乱序到达的事件,确保计算窗口时不会遗漏迟到的事件。

网站公告

今日签到

点亮在社区的每一天
去签到