Flink SQL join
Flink sql 支持对动态表进行复杂且灵活的join操作。考虑到查询可能需要的各种语义,flink提供了多种不同类型的join。
默认情况下,join的顺序没有做过优化。表是按照他们在from子句中指定的顺序进行join的。你可以通过把更新频率最低的表放在最前面,把更新频率最高的表放在最后面,来调整连接查询的性能。确保指定表的顺序不会产生交叉连接(笛卡儿积),flink不支持这样的操作,会导致查询失败。
Regular joins常规连接
Regular join是最通用的join类型。在这中连接中,任何新的记录或者连接中任何一方的变化都是可见的,并且影响整个连接查询的结果。例如如果左边表有一条新的记录产生,当产品ID相等时,它将与右边的所有以前和未来的记录连接起来。
SELECT * FROM Orders
INNER JOIN Product
ON Orders.productId = Product.id
对于流式查询,regular join的语法是最灵活的,允许任何一种更新(增、删、改)输入表。然鹅,这种操作具有重要的操作影响:它需要将连接输入的两边永远保持在flink的状态中。因此,计算查询结果所需的状态会无限增长,这取决于所有输入表和中间连接结果的独特输入行数量。你可以提供一个具有适当的状态生存时间(TTL)的查询配置,以防止过度的状态大小。注意这可能会影响查询结果的正确性。
对于流式查询,计算查询结果所需的状态可能会无限增长,这取决于聚合的类型和不同分组键的数量。请提供一个具有有效保留间隔的查询配置,以防止过多的状态大小。详见查询配置。
Inner Equi-JOIN
返回一个由连接条件限制的简单笛卡尔乘积。目前,只支持等价连接,即至少有一个带有相同谓词的连接条件。不支持任意的交叉或θ连接。
SELECT * FROM Orders
INNER JOIN Product
ON Orders.product_id = Product.id
OUTER Equi-JOIN
返回合格的笛卡尔乘积中的所有行(即所有通过其连接条件的组合行),加上外表中连接条件与其他表的任何行不匹配的每条行的一个副本。Flink支持LEFT、RIGHT和FULL外部连接。目前,只支持等价连接,即至少有一个带有等价谓词的联合条件的连接。不支持任意的交叉连接或θ连接。
SELECT *
FROM Orders
LEFT JOIN Product
ON Orders.product_id = Product.id
SELECT *
FROM Orders
RIGHT JOIN Product
ON Orders.product_id = Product.id
SELECT *
FROM Orders
FULL OUTER JOIN Product
ON Orders.product_id = Product.id
Interval Joins区间连接
返回一个由连接条件和时间约束限制的简单笛卡尔积。一个区间连接需要至少一个等价连接谓词和一个限制两边时间的连接条件。两个适当的范围谓词可以定义这样的条件(<, <=, >=, >),一个BETWEEN谓词,或者一个比较两个输入表的相同类型的时间属性(即处理时间或事件时间)的单一等价谓词。
例如,如果订单在收到四小时后才发货,这个查询将把所有的订单与它们相应的发货连接起来。
SELECT *
FROM Orders o, Shipments s
WHERE o.id = s.order_id
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time
下面的谓词是有效的区间连接条件的例子。
- ltime = rtime
- ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
- ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND
对于流式查询,与Regular joins相比,区间连接只支持具有时间属性的append-only表。由于时间属性是准单调递增的,Flink可以从其状态中删除旧值而不影响结果的正确性。
Temporal Joins 时间性连接
时态表是一个随时间演变的表–在Flink中又被称为动态表。时态表中的行与一个或多个时态周期相关,所有的Flink表都是时态(动态)的。时态表包含一个或多个版本的表快照,它可以是一个变化的历史表,跟踪变化(例如,数据库变化日志,包含所有快照)或一个变化的维度表,将变化具体化(例如,数据库表,包含最新的快照)。
Event Time Temporal Join 事件时间
事件时间时间连接允许针对一个版本的表进行连接。这意味着一个表可以用不断变化的元数据来充实,并检索其在某个时间点的值。
时间连接采取一个任意的表(左侧输入/探测点),并将每一行与版本表(右侧输入/构建点)中相应行的相关版本相关联。Flink使用SQL:2011标准中的FOR SYSTEM_TIME AS OF的SQL语法来执行这一操作。时态连接的语法如下。
SELECT [column_list]
FROM table1 [AS <alias1>]
[LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
ON table1.column-name1 = table2.column-name1
有了事件时间属性(即行时间属性),就有可能检索到一个键的值,因为它是在过去的某个时间点上。这允许在一个共同的时间点上连接两个表。版本表将存储自最后一个水印以来的所有版本–通过时间识别。
例如,假设我们有一个订单表,每个订单都有不同货币的价格。为了正确地将此表规范化为单一货币,如美元,每个订单都需要用下单时的时间点的适当货币转换率来连接。
-- 创建一个订单表
-- 订单表是一个 append-only 动态表.
CREATE TABLE orders (
order_id STRING,
price DECIMAL(32,2),
currency STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time
) WITH (/* ... */);
-- Define a versioned table of currency rates.
-- This could be from a change-data-capture(CDC)
-- such as Debezium, a compacted Kafka topic, or any other
-- way of defining a versioned table.
CREATE TABLE currency_rates (
currency STRING,
conversion_rate DECIMAL(32, 2),
update_time TIMESTAMP(3) METADATA FROM `values.source.timestamp` VIRTUAL,
WATERMARK FOR update_time AS update_time,
PRIMARY KEY(currency) NOT ENFORCED
) WITH (
'connector' = 'kafka',
'value.format' = 'debezium-json',
/* ... */
);
SELECT
order_id,
price,
currency,
conversion_rate,
order_time
FROM orders
LEFT JOIN currency_rates FOR SYSTEM_TIME AS OF orders.order_time
ON orders.currency = currency_rates.currency;
order_id price currency conversion_rate order_time
======== ===== ======== =============== =========
o_001 11.11 EUR 1.14 12:00:00
o_002 12.51 EUR 1.10 12:06:00
注意:事件时间的时间连接是由左右两边的水印watermark触发的;请确保连接的两边都正确设置水印watermark。
注意:事件时间时间连接要求时间连接条件的等价条件中包含主键,例如,表currency_rates的主键currency_rates.currency在条件orders.currency = currency_rates.currency中被约束。
与常规连接相比,尽管构建方有变化,但之前的时态表结果不会受到影响。与区间连接相比,时间表连接并不定义记录被连接的时间窗口。来自探测方的记录总是在时间属性指定的时间内与构建方的版本连接。因此,构建方的记录可能是任意老的。随着时间的推移,不再需要的记录版本(对于给定的主键)将被从状态中删除。
Processing Time Temporal Join 处理时间
一个处理时间的临时表连接使用一个处理时间属性来将记录与外部版本表中的键的最新版本相关联。
根据定义,有了处理时间属性,该连接将总是返回一个给定键的最新值。我们可以把查找表看作是一个简单的HashMap<K, V>,它存储了来自构建方的所有记录。这种连接的力量在于,当在Flink内部将表具体化为动态表不可行时,它允许Flink直接针对外部系统工作。
下面的处理时间时态表连接例子显示了一个append-only 表orders,它应该与表LatestRates连接。LatestRates是一个维度表(例如HBase表),为最新的rate。在时间10:15, 10:30, 10:52,LatestRates的内容看起来如下。
10:15> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:30> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 114
Yen 1
10:52> SELECT * FROM LatestRates;
currency rate
======== ======
US Dollar 102
Euro 116 <==== changed from 114 to 116
Yen 1
LastestRates在10:15和10:30的内容是相同的。欧元汇率在10:52时从114变为116。
订单是一个append-only表,代表给定金额和给定货币的付款。例如,在10:15,有一个金额为2欧元的订单。
SELECT * FROM Orders;
amount currency
====== =========
2 Euro <== arrived at time 10:15
1 US Dollar <== arrived at time 10:30
2 Euro <== arrived at time 10:52
鉴于这些表格,我们想计算所有订单转换为一种通用货币
amount currency rate amount*rate
====== ========= ======= ============
2 Euro 114 228 <== arrived at time 10:15
1 US Dollar 102 102 <== arrived at time 10:30
2 Euro 116 232 <== arrived at time 10:52
目前,在最新版本的任何视图/表的时态连接中使用的FOR SYSTEM_TIME AS OF语法还不支持,你可以使用以下时态表函数语法Temporal Table Function Join。
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency
注意 不支持任何表/视图的最新版本在时态连接中使用的FOR SYSTEM_TIME AS OF语法的原因只是语义上的考虑,因为左流的连接处理并不等待时态表的完整快照,这可能会误导生产环境中的用户。通过时态表函数处理时态连接也存在同样的语义问题,但它已经存在很长时间了,因此我们从兼容性的角度支持它。
其结果在处理时间上是不确定的。处理时间的时间连接最常用于用外部表(即维度表)来充实流。
与常规连接相比,尽管构建方有变化,但之前的时态表结果不会受到影响。与区间连接相比,时态表连接不定义记录加入的时间窗口,即旧的记录不存储在状态中。
Temporal Table Function Join时态表函数连接
用时态表函数连接表 temporal table function的语法与用表函数连接Table Function的语法相同。
注意:目前只支持时态表的内连接和左外连接。
假设利率是一个时态表函数,连接可以用SQL表示如下。
SELECT
o_amount, r_rate
FROM
Orders,
LATERAL TABLE (Rates(o_proctime))
WHERE
r_currency = o_currency
上述时态表DDL和时态表函数的主要区别是。
- 时态表DDL可以在SQL中定义,但时态表函数不能。
- 时态表DDL和时态表函数都支持时态连接版本变化versioned 表,但只有时态表函数可以时态连接任何表/视图的最新版本。
Lookup Join查找连接
查询连接通常用于用从外部系统查询的数据来充实一个表。该连接要求一个表有一个处理时间属性,另一个表有一个查询源连接器的支持。
The lookup join uses the above Processing Time Temporal Join syntax with the right table to be backed by a lookup source connector.
查询连接使用上面的处理时间时间连接语法,右边的表要由查询源连接器来支持。
下面的例子显示了指定一个查找连接的语法。
-- Customers is backed by the JDBC connector and can be used for lookup joins
CREATE TEMPORARY TABLE Customers (
id INT,
name STRING,
country STRING,
zip STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://mysqlhost:3306/customerdb',
'table-name' = 'customers'
);
-- enrich each order with customer information
SELECT o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
In the example above, the Orders table is enriched with data from the Customers table which resides in a MySQL database. The FOR SYSTEM_TIME AS OF
clause with the subsequent processing time attribute ensures that each row of the Orders
table is joined with those Customers rows that match the join predicate at the point in time when the Orders
row is processed by the join operator. It also prevents that the join result is updated when a joined Customer
row is updated in the future. The lookup join also requires a mandatory equality join predicate, in the example above o.customer_id = c.id
.在上面的例子中,订单表被驻扎在MySQL数据库中的客户表的数据所充实。带有后续处理时间属性的FOR SYSTEM_TIME AS OF子句确保订单表的每条记录与那些在订单行被连接操作者处理的时间点上符合连接谓词的客户行被连接起来。它还可以防止在将来更新连接的客户行时更新连接的结果。查找连接也需要一个强制性的平等连接谓词,在上面的例子中,o.customer_id = c.id。
Array Expansion
为给定数组中的每个元素返回一个新行。目前还不支持用ORDINALITY进行嵌套。
SELECT order_id, tag
FROM Orders CROSS JOIN UNNEST(tags) AS t (tag)
Table Function 表函数
用一个表函数的结果来连接一个表。左侧(外部)表的每一行都与表函数的相应调用所产生的所有行连接。用户定义的表函数必须在使用前注册。
INNER JOIN
如果左表(外表)的表函数调用返回一个空的结果,那么左表(外表)的行将被放弃。
SELECT order_id, res
FROM Orders,
LATERAL TABLE(table_func(order_id)) t(res)
LEFT OUTER JOIN
如果一个表函数的调用返回一个空的结果,那么相应的外层行将被保留,并且结果将被填充为空值。目前,针对横向表的左外连接要求在ON子句中有一个TRUE字样。
SELECT order_id, res
FROM Orders
LEFT OUTER JOIN LATERAL TABLE(table_func(order_id)) t(res)
ON TRUE