SQLMesh 的
MODEL
提供了丰富的属性,用于定义模型的行为、存储、调度、依赖关系等。通过合理配置这些属性,可以构建高效、可维护的数据管道。在 SQLMesh 中,MODEL
是定义数据模型的核心结构,初学SQLMesh,定义模型看到属性会很迷惑,本文主要解释sqlmesh的模型属性,为后续继续学习打基础。
模型属性
在 SQLMesh 中,MODEL
是定义数据模型的核心结构,除了 kind
属性外,还有许多其他属性可以配置。MODEL DDL语句具有各种属性,这些属性用于元数据和控制行为。在模型配置参考中了解有关这些属性及其默认值的更多信息。以下是 MODEL
的常见属性及其说明,以及一个完整示例。
- name
Name指定模型的名称。该名称表示模型输出的产品视图名称,因此它通常采用“schema”.“view_name”的形式。在SQLMesh项目中,模型的名称必须是唯一的。
当模型在非生产环境中使用时,SQLMesh会自动为模型名称添加前缀。例如,考虑一个名为“sushi”、“customers”的模型。在dev中,它的视图命名为“sushi__dev”.“customers”。
name属性是必需的,并且必须是唯一的,除非启用了名称自动推断。
- project
Project指定模型所属的项目的名称。用于多仓库SQLMesh部署。
- Kind
Kind指定模型的类型。模型的类型决定了如何计算和存储模型。SQL模型的默认类型是VIEW,这意味着创建一个视图,并且每次访问该视图时都要运行查询。另一方面,Python模型的默认类型是FULL,这意味着每次计算模型时都会创建一个表并执行Python代码。每个类型未来我们会分别详细说明,你也可以参考官网文档。
- audits(审计)
审计指定在评估模型后应该运行哪些审计。
Dialect定义模型的SQL方言。默认情况下,它使用配置文件model_defaults方言键中的方言。SQLGlot库支持的所有SQL方言都是允许的。
Owner指定模型的主要联系人是谁。对于拥有许多数据合作者的组织来说,这是一个重要的领域。
一个可选的任意字符串序列,用于在不更改定义的功能组件的情况下创建新的模型版本。
标签是用于组织模型的一个或多个标签。
- cron
Cron用于调度模型处理或刷新数据的时间。它接受一个cron表达式或@hour、@daily、@weekly或@monthly中的任意一个。所有时间都假定为UTC时区-不可能在不同的时区中指定它们。
- interval_unit
间隔单元确定用于计算模型的时间间隔的时间粒度。缺省情况下,间隔单位是自动从cron表达式派生出来的,不需要指定。
支持的取值为:year
, month
, day
, hour
, half_hour
, quarter_hour
, five_minute
.
与cron的关系
SQLMesh调度器需要来自模型的两个时间信息片段:模型应该运行的特定时间,以及处理或存储数据的最佳时间粒度。interval_unit指定了粒度。
如果模型的cron参数是一个频率,比如@daily,那么运行时间和interval_unit就很容易确定:模型准备在一天开始时运行,它的interval_unit是day。类似地,@hour的cron准备在每个小时开始时运行,其interval_unit为hour。
但是,如果用cron表达式指定cron, SQLMesh将使用更复杂的方法派生interval_unit。
cron表达式可以生成复杂的时间间隔,因此SQLMesh不会直接解析它。相反, 处理过程如下:
从cron表达式生成接下来的五个运行时(相对于计算时间)
计算这五个值之间的间隔的持续时间
确定模型的interval_unit为小于或等于的最大间隔单位值
例如,考虑一个对应于“每43分钟运行一次”的cron表达式。它的interval_unit是half_hour,因为这是小于43分钟的最大interval_unit值。如果cron表达式是“每67分钟运行一次”,那么在相同的逻辑下,它的interval_unit是小时。
然而,interval_unit不需要从cron中推断出来——您可以显式地指定它来定制回填的方式。
指定interval_unit
模型通常以有规律的节奏运行,每次运行之间经过的时间相同,每次运行中处理数据的时间长度相同。
例如,一个模型可能每天在午夜运行(每天运行1次),以处理前一天的数据(每次运行1天的数据)。两次运行之间的时间长度和每次运行中处理的数据的时间长度都是1天(如果您错过了一次运行,则都是2天)。
但是,运行节奏长度和处理过的数据长度不必相同。
考虑一个每天早上7:30运行并处理数据直到今天早上7点的模型。模型的cron是表示“每天早上7:30运行”的cron表达式,SQLMesh从中推断出每天的interval_unit。
当这个模型运行时会发生什么?首先,SQLMesh将识别最近完成的时间间隔。interval_unit被推断为day,所以最后一个完整的间隔是昨天。在运行中,SQLMesh将不包含今天上午12:00到7:00之间的任何数据。
要包含今天的数据,请手动指定一个以小时为单位的间隔。当模型在7:30am运行时,SQLMesh将把最近完成的小时间隔标识为6:00-7:00am,并在回填中包含该间隔内的数据。
MODEL (
name sqlmesh_example.up_until_7,
kind INCREMENTAL_BY_TIME_RANGE (
time_column date_column,
),
start '2024-11-01',
cron '30 7 * * *', -- cron expression for "every day at 7:30am"
interval_unit 'hour', -- backfill up until the most recently completed hour (rather than day)
);
下面我们再配置一个模型:
- 每小时运行一次
- 每次运行时处理最近两天的数据
- 处理迄今为止在每次运行中积累的数据
配置此模型需要通过设置模型配置allow_partials True让SQLMesh处理部分完成的间隔。部分间隔的数据只是临时的——SQLMesh将在整个间隔完成后重新处理它。
MODEL (
name sqlmesh_example.demo,
kind INCREMENTAL_BY_TIME_RANGE (
time_column date_column,
lookback 2, -- 2 days of late-arriving data to backfill
),
start '2024-11-01',
cron '@hourly', -- run model hourly, not tied to the interval_unit
allow_partials true, -- allow partial intervals so today's data is processed in each run
interval_unit 'day', -- finest granularity of data to be time bucketed
);
lookback 以天为单位计算,因为模型的interval_unit指定为天。
Start用于确定处理模型所需的最早时间。它可以是绝对日期/时间(2022-01-01),也可以是相对日期/时间(1年前)。
End用于确定处理模型所需的最晚时间。它可以是绝对日期/时间(2022-01-01),也可以是相对日期/时间(1年前)。
描述属性是可选的。自动注册为底层SQL引擎的表描述/注释(如果引擎支持)。
- column_descriptions
可选的键/值对字典。自动注册为底层SQL引擎的列描述/注释(如果引擎支持)。如果不存在,内联注释将自动注册。示例如下:
column_descriptions (
id = 'primary key',
letter = 'alphabet letter',
value = 'random value',
updated_date = 'updated date',
new_col = 'a new column'
)
模型的粒度是在模型查询返回的结果中唯一标识一行的列或列的组合。如果设置了粒度,像table_diff这样的SQLMesh工具会更容易运行,因为它们会自动使用模型粒度来指定参数,否则需要手动指定。
如果一个模型有多个唯一键或键的组合,那么它可以定义多个粒度。
引用是标识到另一个模型的连接关系的非唯一列或列的组合。
例如,一个模型可以定义一个引用account_id,这将表明它现在可以自动连接到任何具有account_id粒度的模型。它不能安全地连接到具有account_id引用的表,因为引用不是唯一的,这样做将构成多对多连接。
有时列的名称不同,在这种情况下,可以将列名称别名设为公共实体名称。例如,guest_id AS account_id将允许列为guest_id的模型连接到粒度为account_id的模型。
- depends_on
Depends显式地指定模型所依赖的模型,以及从模型代码中自动推断出来的模型。
- table_format
对于支持iceberg和hive等表格式的引擎来说,表格式是一个可选属性,其中物理文件格式是可配置的。目的是使用table_format定义表类型,然后使用storage_format定义表内文件的磁盘格式。
请注意,此属性仅适用于允许独立于storage_format配置table_format的引擎。
- storage_format
存储格式是Spark或Hive等支持parquet和orc等存储格式的引擎的属性。请注意,有些引擎不会区分table_format和storage_format,在这种情况下,使用storage_format而忽略table_format。
- partitioned_by
分区起两个作用。对于大多数模型类型,它是支持表分区(如Spark或BigQuery)的引擎的可选属性。
对于INCREMENTAL_BY_PARTITION模型类型,它定义了用于增量加载数据的分区键。
它可以指定多列分区键或修改用于分区的日期列。例如,在BigQuery中,你可以使用partitioned_by TIMESTAMP_TRUNC(event_ts, day)提取时间戳列event_ts的日期组件,从而按天进行分区。
- clustered_by
Clustered by是Bigquery等支持集群的引擎的可选属性。
- columns
默认情况下,SQLMesh从其SQL查询推断模型的列名和类型。通过在模型的columns属性中手动指定所有列名和数据类型来禁用该行为。
**警告:**如果columns属性包含查询未返回的列、省略查询返回的列或指定查询返回的数据类型以外的数据类型,SQLMesh可能会出现意外行为。
例如,下面示例包含columns键的seed模型定义。它指定文件中所有列的数据类型:holiday_name列的数据类型是VARCHAR, holiday_date列的数据类型是DATE。
MODEL (
name test_db.national_holidays,
kind SEED (
path 'national_holidays.csv'
),
columns (
holiday_name VARCHAR,
holiday_date DATE
)
);
注意:返回dataframe的Python模型需要指定列名和数据类型。
- physical_properties
以前命名为table_properties
physical_properties是应用于物理层中的模型表/视图的任意属性的键值映射。注意分区细节和creatable_type,它覆盖了所创建的模型/视图的类型。在本例中,它创建了一个TRANSIENT TABLE。虽然creatable_type是通用的,但其他属性是特定于适配器的,因此请查看引擎文档。例如:
MODEL (
...,
physical_properties (
partition_expiration_days = 7,
require_partition_filter = true,
creatable_type = TRANSIENT
)
);
- virtual_properties
virtual_properties是应用于虚拟层中的模型视图的任意属性的键值映射。注意分区细节和creatable_type,它覆盖了所创建的模型/视图的类型。在本例中,它创建了一个SECURE VIEW。虽然creatable_type是通用的,但其他属性是特定于适配器的,因此请查看引擎文档。例如:
MODEL (
...,
virtual_properties (
creatable_type = SECURE,
labels = [('test-label', 'label-value')]
)
);
- session_properties
会话属性是应用于引擎会话的特定于目标引擎的任意属性的键值映射。
- allow_partials
指示此模型可以对部分(不完整)数据间隔执行。
默认情况下,每个模型只处理完整的间隔,以防止部分数据引起的常见错误。区间的大小由模型的interval_unit决定。
将allow_partials设置为true将覆盖此行为,这表明模型可能会处理丢失某些数据点的输入数据段。
注意:将此属性设置为true将忽略cron属性。
是否启用模型。默认情况下,此属性为true。将其设置为false将导致SQLMesh在加载项目时忽略此模型。
- physical_version
将此模型的物理表的版本固定到给定的值。
注意:这只能为仅向前的模型设置。
指定要用于执行此模型的网关。如果不指定,则使用默认网关。
- optimize_query
模型的查询是否需要优化。默认情况下,所有SQL模型都是优化的。将其设置为false将导致SQLMesh禁用查询规范化和简化。只有当优化后的查询导致错误(如超出文本限制)时,才应该关闭该功能。
- validate_query
是否在编译时验证模型的查询。默认情况下,该属性为false。将其设置为true会导致SQLMesh引发错误而不是发出警告。这将在SQL语句中显示无效的列,以及包含SELECT *的模型,这些模型不能自动展开以列出所有列。这样可以确保在花费时间和金钱在数据仓库中运行SQL之前在本地验证SQL。
完整示例
以下是一个完整的 SQLMesh 模型定义示例,展示了多个属性的使用:
MODEL (
name sales.fact_sales, -- 模型名称
kind INCREMENTAL, -- 模型类型为增量模型
storage_format 'PARQUET', -- 存储格式为 Parquet
partitioned_by 'order_date', -- 按 order_date 分区
clustered_by ['customer_id'], -- 按 customer_id 聚类
tags ['sales', 'fact'], -- 添加标签
description 'Fact table for sales data', -- 模型描述
columns { -- 显式定义列
order_id INT,
customer_id INT,
product_id INT,
order_date DATE,
quantity INT,
amount DECIMAL(10, 2)
},
dependencies ['raw_sales'], -- 依赖 raw_sales 表
unique_key ['order_id'], -- 唯一键为 order_id
grained_by 'order_date', -- 数据粒度为 order_date
allow_incremental TRUE, -- 允许增量运行
cron '0 0 * * *', -- 每天凌晨运行
owner 'data_team', -- 负责人为 data_team
pre { -- 前置钩子
DELETE FROM sales.fact_sales WHERE order_date = @execution_date;
},
post { -- 后置钩子
ANALYZE TABLE sales.fact_sales;
}
);
-- 模型查询逻辑
SELECT
order_id,
customer_id,
product_id,
order_date,
quantity,
amount
FROM
raw_sales
WHERE
order_date > @prev_execution_date; -- 只处理新增数据
总结
SQLMesh 的 MODEL
提供了丰富的属性,用于定义模型的行为、存储、调度、依赖关系等。通过合理配置这些属性,可以构建高效、可维护的数据管道。以上示例展示了如何结合多个属性来定义一个完整的增量模型。