假设你已经了解SQLMesh是什么,以及其他应用场景。如果没有,我建议你先阅读《SQLMesh系列教程-1:数据工程师的高效利器-SQLMesh》。
在本文中,我们将完成一个小项目或教程,以帮助你开始使用SQLMesh。你可以选择一步一步地进行操作,也可以通读一遍以了解整个过程。
搭建开发环境
你可以使用自己喜欢的IDE,但在本教程中我将使用VSCode。我们将使用duckdb作为执行引擎和数据源。如果你对duckdb在CLI和Python开始使用感兴趣,请搜索我的其他duckdb系列文章。
首先,让我们通过安装Python和必要的依赖项来设置工作环境。创建一个Python虚拟环境并安装依赖项:
python -m venv .venv
source ./venv/bin/activate
pip install 'sqlmesh[duckdb]'
我们将使用一个简单的数据集来重点理解SQLMesh。让我们在duckdb CLI中创建一个源数据库(也可以在Python中执行相同的操作)。我将使用名称“db.db”,因为它是初始化duckdb项目时SQLMesh配置的默认名称:
duckdb db.db
创建schema及源表:
CREATE SCHEMA example;
CREATE OR REPLACE TABLE example.letters (id INTEGER, letter CHAR(1), value INTEGER, updated_date DATE);
-- 插入数据
INSERT INTO example.letters VALUES (1, 'A', 10, '2025-01-07'), (2, 'B', 20, '2025-01-07'), (3, 'C', 30, '2025-01-07');
检查表数据:
SELECT * FROM example.letters;
┌───────┬─────────┬───────┬──────────────┐
│ id │ letter │ value │ updated_date │
│ int32 │ varchar │ int32 │ date │
├───────┼─────────┼───────┼──────────────┤
│ 1 │ A │ 10 │ 2025-01-07 │
│ 2 │ B │ 20 │ 2025-01-07 │
│ 3 │ C │ 30 │ 2025-01-07 │
└───────┴─────────┴───────┴──────────────┘
初始化SQLMesh项目
在命令行中,执行以下命令初始化sqlmesh项目:
sqlmesh init duckdb
一旦你运行这个命令,你会看到生成了几个文件夹,就像你初始化dbt项目时一样:
解释sqlmesh项目结构
在上面的截图中,您可以看到多个文件夹,例如:
- “audits” - 用于定义自定义数据审计的SQL文件
- “logs” - (运行项目后你会看到)-日志文件
- “macros” - Python文件用于Python宏,SQL文件用于Jinja宏
- “models” - SQL或Python文件/模型将被存储
- “seeds” - 静态CSV文件
- “tests” - 定义单元测试的yaml文件
你还可以看到“config.yaml”。配置SQLMesh项目设置的文件。让我们看下配置文件内容:
gateways:
local:
connection:
type: duckdb
database: db.db
default_gateway: local
model_defaults:
dialect: duckdb
start: 2025-02-09
这是配置数据库连接的地方。SQLMesh用duckdb作为后端初始化项目的上述内容。
基本上就是说,SQLMesh在默认情况下使用名为“local”的网关,在使用duckdb SQL方言的名为“db.db”的数据库中使用duckdb作为其执行引擎。正如你可能已经猜到的,你可以在这个配置文件中分离测试连接和状态数据库连接。
现在,让我们稍微修改一下这个配置文件,添加一个单独的test/state连接:
gateways:
local:
connection:
type: duckdb
database: db.db
test_connection:
type: duckdb
database: test.db
state_connection:
type: duckdb
database: state.db
default_gateway: local
model_defaults:
dialect: duckdb
start: 2025-02-09
我指定使用“test.db”运行测试,使用“state.db”管理状态信息。现在我们有了一个整体的结构,让我们来为一个演示目构建模型、宏、审计和测试。
从头设计数据模型
我们将构建模型、Python宏、审计和测试。在初始化的SQLMesh项目中,有3个模型:
- seed_model.sql
- incremental_model.sql
- full_model.sql
种子模型是静态的,为了使我们的项目更有意义,我们将使用我们在前一步中生成的一些数据完全替换种子模型。我们还将用新产品替换其他两种型号。
之后,我们将执行模型并创建一个简单的Python宏、一个自定义审计和一个单元测试。我们将看到所有这些如何很好地协同工作。
让我们删除“seed_model”。Sql”文件。您可以手动执行或运行命令:
rm models/*.sql
让我们创建一个新模型,它将作为其他两个模型的基础。我们已经在“db.db”中创建了一个源表,它被设置为SQLMesh项目的连接数据库。
建立基础模型
我将这个模型命名为“base_model.sql”:
MODEL (
name example.base_model,
owner tom,
kind VIEW,
cron '@daily',
grain id,
column_descriptions (
id = 'primary key',
letter = 'alphabet letter',
value = 'random value'
)
);
SELECT
id::INT,
letter::TEXT,
value::INT,
updated_date
FROM
db.example.letters -- full table path
需要注意的一些事情:
- 模型元数据直接进入模型文件,而不是像在dbt中那样在单独的yaml文件中定义所有内容。
- 你可能不熟悉元数据,如“kind”,“owner”,“cron”和“grain”,但它们是相当不言自明的。虽然‘ cron ’将在运行‘ sqlmesh run ’时使用,我们将在后面介绍。你可以在SQLMesh的文档页面上查看可用的模型属性。
- 类型转换是使用Postgres的‘ x::int ’语法完成的。您还可以在‘ MODEL ’块中指定模式(列名=数据类型)。
- 你可以通过添加类似“id::INT -主键”这样的注释来添加列描述,但我喜欢将列描述放在“MODEL”块中。
- 如果您引用的是SQLMesh项目之外的表,它将被视为外部表,你需要指定表路径,而不是模型名称。
外部模型
可选地,你可以创建/生成一个external_models。用于存储外部表的元数据。定义外部模型的元数据的好处是,SQLMesh可以使特性更有用。例如,如果你没有创建yaml文件,那么你将无法获得到外部模型的列级沿袭。
创建external_models。您可以手动定义Yaml,也可以运行以下命令:
sqlmesh create_external_models
SQLMesh将创建一个yaml文件,如下所示:
- name: '"db"."example"."letters"'
columns:
id: INT
letter: TEXT
value: INT
updated_date: DATE
gateway: local
建立下游模型
既然基本模型已经准备好了,其外部模型模式信息也已经到位,那么让我们创建更多的模型。
- “example.intermediate_model.sql” - 这个模型类似于“base_model.sql”,除了它包括一个新的列,改变了它只需要指定上游表的模型名,并使用‘ FULL ’物化策略:
MODEL (
name example.intermediate_model,
owner tommy,
kind FULL,
cron '@daily',
grain id,
column_descriptions (
id = 'primary key',
letter = 'alphabet letter',
value = 'random value',
updated_date = 'updated date',
new_col = 'a new column'
)
);
SELECT
id,
letter,
value,
@multiply_by_10(value) AS big_value,
updated_date,
'new_col' AS new_col
FROM
example.base_model
“example.incremental_model。- 该模型使用‘ INCREMENTAL_BY_TIME_RANGE ’物化策略,这是SQLMesh中3个增量加载选项之一。这需要添加where子句,以确保只处理必要的数据。
MODEL (
name example.incremental_model,
owner tommy,
kind INCREMENTAL_BY_TIME_RANGE (
time_column (updated_date, '%Y-%m-%d'),
lookback 5, -- to handle late arriving date
),
start '2025-01-01',
cron '@daily',
grain id,
column_descriptions (
id = 'primary key',
letter = 'alphabet letter',
updated_date = 'updated date',
)
);
SELECT
id,
letter,
updated_date
FROM
example.base_model
WHERE
updated_date BETWEEN @start_date AND @end_date
我不会太深入SQLMesh的增量加载选项,但简单地说,你可以使用增量加载数据:
- by time range
- by partition
- by unique key (a merge operation)
使用“sqlmesh plan”应用更改
现在已经构建了我们的模型,我们将使用‘ sqlmesh plan ’命令查看更改并加载数据。
注意:`test_full_model。我们初始化项目时附带的Yaml '文件应该删除,因为它不再适用于我们的项目。如果尝试在不删除yaml文件的情况下运行下面的命令,可能会遇到错误。因此,请确保在继续之前将其从项目中删除。
首先运行下面命令:
sqlmesh plan dev
系统将询问您希望回填多少数据的日期范围,但通过按回车键将其保留为空白。对于最后一个问题,输入“y”以应用更改:
执行过程截图:
为了确保你的模型被物化,你可以进入你的数据库或使用‘ sqlmesh fetchdf ’命令:
$ sqlmesh fetchdf "select * from example__dev.base_model;"
id letter value updated_date
0 1 A 10 2025-01-07
1 2 B 20 2025-01-07
2 3 C 30 2025-01-07
注意,我必须将模式指定为example__dev
。模式的格式是YOURSCHEMA__{yourenenvironment}
。在这种情况下,我运行sqlmesh plan dev
,并添加了__dev
后缀。
如果一切正常,那么你可以通过运行以下命令将更改部署到生产环境:
sqlmesh plan
检查生产环境中的base_model表(schema没有后缀):
$ sqlmesh fetchdf "select * from example.base_model;"
id letter value updated_date
0 1 A 10 2025-01-07
1 2 B 20 2025-01-07
2 3 C 30 2025-01-07
你可能已经注意到,当我们运行‘ sqlmesh plan ’时,我们不需要回填任何数据。SQLMesh只是通过使用视图将指针切换到更新后的表。
现在,在实际项目中,顺序运行‘ sqlmesh plan dev ’和‘ sqlmesh plan ’可能不是你部署到生产环境的方式。好消息是SQLMesh有一个开源的GitHub Actions CI/CD Bot。它的功能如下:
自动在pr上运行单元测试
- 自动创建PR环境,表示PR中的代码更改
- 自动分类和回填数据的模型已经改变
- 自动将更改部署到生产中,自动防止数据差距并合并PR
这些要点是从SQLMesh网站上复制粘贴过来的,在未来文章中我们继续讨论。
sqlmesh run
当我第一次开始使用SQLMesh时,我有一个问题:sqlmesh plan
和 sqlmesh run
之间的区别是什么?
- ‘ sqlmesh plan ’汇总本地所有变更,并允许你在目标环境中检查和执行模型。
- ‘ sqlmesh run ’根据每个cron时间表执行模型。对我来说,除了SQLMesh利用模型属性中定义的“cron”之外,更容易认为它是“dbt run”。例如,假设你有一个具有每日cron计划的模型和另一个具有小时cron计划的模型。你计划通过GitHub Actions每小时运行“sqlmesh run”命令。使用每日cron计划的模型每天只运行一次,而如果您每小时运行“dbt run”,则所有模型每小时运行一次,从而导致浪费计算资源。
现在我们已经介绍了两个关键的SQLMesh命令,让我们继续学习我们的教程/项目。
总结
本文我们首先搭建duckdb及sqlmesh基础开发环境,然后新建入门项目,解释项目结构,构建数据模型,执行模型并查看结果。为了避免文章篇幅太长,关于宏、审计、测试,数据血缘关系及DAG, python模型。