SQLMesh系列教程-2:SQLMesh入门项目实战

发布于:2025-02-11 ⋅ 阅读:(8) ⋅ 点赞:(0)

假设你已经了解SQLMesh是什么,以及其他应用场景。如果没有,我建议你先阅读《SQLMesh系列教程-1:数据工程师的高效利器-SQLMesh》

在本文中,我们将完成一个小项目或教程,以帮助你开始使用SQLMesh。你可以选择一步一步地进行操作,也可以通读一遍以了解整个过程。

在这里插入图片描述

搭建开发环境

你可以使用自己喜欢的IDE,但在本教程中我将使用VSCode。我们将使用duckdb作为执行引擎和数据源。如果你对duckdb在CLI和Python开始使用感兴趣,请搜索我的其他duckdb系列文章。

首先,让我们通过安装Python和必要的依赖项来设置工作环境。创建一个Python虚拟环境并安装依赖项:

python -m venv .venv
source ./venv/bin/activate
pip install 'sqlmesh[duckdb]'

我们将使用一个简单的数据集来重点理解SQLMesh。让我们在duckdb CLI中创建一个源数据库(也可以在Python中执行相同的操作)。我将使用名称“db.db”,因为它是初始化duckdb项目时SQLMesh配置的默认名称:

duckdb db.db

创建schema及源表:

CREATE SCHEMA example;
CREATE OR REPLACE TABLE example.letters (id INTEGER, letter CHAR(1), value INTEGER, updated_date DATE); 

-- 插入数据
INSERT INTO example.letters VALUES (1, 'A', 10, '2025-01-07'), (2, 'B', 20, '2025-01-07'), (3, 'C', 30, '2025-01-07');

检查表数据:

SELECT * FROM example.letters;

┌───────┬─────────┬───────┬──────────────┐
│  id   │ letter  │ value │ updated_date │
│ int32 │ varchar │ int32 │     date     │
├───────┼─────────┼───────┼──────────────┤
│     1 │ A       │    102025-01-07   │
│     2 │ B       │    202025-01-07   │
│     3 │ C       │    302025-01-07   │
└───────┴─────────┴───────┴──────────────┘

初始化SQLMesh项目

在命令行中,执行以下命令初始化sqlmesh项目:

sqlmesh init duckdb

一旦你运行这个命令,你会看到生成了几个文件夹,就像你初始化dbt项目时一样:

在这里插入图片描述

解释sqlmesh项目结构

在上面的截图中,您可以看到多个文件夹,例如:

  • “audits” - 用于定义自定义数据审计的SQL文件
  • “logs” - (运行项目后你会看到)-日志文件
  • “macros” - Python文件用于Python宏,SQL文件用于Jinja宏
  • “models” - SQL或Python文件/模型将被存储
  • “seeds” - 静态CSV文件
  • “tests” - 定义单元测试的yaml文件

你还可以看到“config.yaml”。配置SQLMesh项目设置的文件。让我们看下配置文件内容:

gateways:
  local:
    connection:
      type: duckdb
      database: db.db

default_gateway: local

model_defaults:
  dialect: duckdb
  start: 2025-02-09

这是配置数据库连接的地方。SQLMesh用duckdb作为后端初始化项目的上述内容。

基本上就是说,SQLMesh在默认情况下使用名为“local”的网关,在使用duckdb SQL方言的名为“db.db”的数据库中使用duckdb作为其执行引擎。正如你可能已经猜到的,你可以在这个配置文件中分离测试连接和状态数据库连接。

现在,让我们稍微修改一下这个配置文件,添加一个单独的test/state连接:

gateways:
  local:
    connection:
      type: duckdb
      database: db.db
    test_connection:
      type: duckdb
      database: test.db
    state_connection:
      type: duckdb
      database: state.db

default_gateway: local

model_defaults:
  dialect: duckdb
  start: 2025-02-09

我指定使用“test.db”运行测试,使用“state.db”管理状态信息。现在我们有了一个整体的结构,让我们来为一个演示目构建模型、宏、审计和测试。

从头设计数据模型

我们将构建模型、Python宏、审计和测试。在初始化的SQLMesh项目中,有3个模型:

  • seed_model.sql
  • incremental_model.sql
  • full_model.sql

种子模型是静态的,为了使我们的项目更有意义,我们将使用我们在前一步中生成的一些数据完全替换种子模型。我们还将用新产品替换其他两种型号。

之后,我们将执行模型并创建一个简单的Python宏、一个自定义审计和一个单元测试。我们将看到所有这些如何很好地协同工作。

让我们删除“seed_model”。Sql”文件。您可以手动执行或运行命令:

rm models/*.sql

让我们创建一个新模型,它将作为其他两个模型的基础。我们已经在“db.db”中创建了一个源表,它被设置为SQLMesh项目的连接数据库。

建立基础模型

我将这个模型命名为“base_model.sql”:

MODEL (
    name example.base_model,
    owner tom,
    kind VIEW,
    cron '@daily',
    grain id,
    column_descriptions (
        id = 'primary key',
        letter = 'alphabet letter',
        value = 'random value'
    )
  );

SELECT
    id::INT,
    letter::TEXT,
    value::INT,
    updated_date
FROM 
db.example.letters  -- full table path
  

需要注意的一些事情:

  • 模型元数据直接进入模型文件,而不是像在dbt中那样在单独的yaml文件中定义所有内容。
  • 你可能不熟悉元数据,如“kind”,“owner”,“cron”和“grain”,但它们是相当不言自明的。虽然‘ cron ’将在运行‘ sqlmesh run ’时使用,我们将在后面介绍。你可以在SQLMesh的文档页面上查看可用的模型属性。
  • 类型转换是使用Postgres的‘ x::int ’语法完成的。您还可以在‘ MODEL ’块中指定模式(列名=数据类型)。
  • 你可以通过添加类似“id::INT -主键”这样的注释来添加列描述,但我喜欢将列描述放在“MODEL”块中。
  • 如果您引用的是SQLMesh项目之外的表,它将被视为外部表,你需要指定表路径,而不是模型名称。

外部模型

可选地,你可以创建/生成一个external_models。用于存储外部表的元数据。定义外部模型的元数据的好处是,SQLMesh可以使特性更有用。例如,如果你没有创建yaml文件,那么你将无法获得到外部模型的列级沿袭。

创建external_models。您可以手动定义Yaml,也可以运行以下命令:

sqlmesh create_external_models 

SQLMesh将创建一个yaml文件,如下所示:

- name: '"db"."example"."letters"'
  columns:
    id: INT
    letter: TEXT
    value: INT
    updated_date: DATE
  gateway: local

建立下游模型

既然基本模型已经准备好了,其外部模型模式信息也已经到位,那么让我们创建更多的模型。

  • “example.intermediate_model.sql” - 这个模型类似于“base_model.sql”,除了它包括一个新的列,改变了它只需要指定上游表的模型名,并使用‘ FULL ’物化策略:
MODEL (
    name example.intermediate_model,
    owner tommy,
    kind FULL,
    cron '@daily',
    grain id,
    column_descriptions (
        id = 'primary key',
        letter = 'alphabet letter',
        value = 'random value',
        updated_date = 'updated date',
        new_col = 'a new column'
    )
  );

  SELECT
    id,
    letter,
    value,
    @multiply_by_10(value) AS big_value,
    updated_date,
    'new_col' AS new_col
  FROM
    example.base_model

“example.incremental_model。- 该模型使用‘ INCREMENTAL_BY_TIME_RANGE ’物化策略,这是SQLMesh中3个增量加载选项之一。这需要添加where子句,以确保只处理必要的数据。

MODEL (
    name example.incremental_model,
    owner tommy,
    kind INCREMENTAL_BY_TIME_RANGE (
        time_column (updated_date, '%Y-%m-%d'),
        lookback 5,  -- to handle late arriving date
    ),
    start '2025-01-01',
    cron '@daily',
    grain id,
    column_descriptions (
        id = 'primary key',
        letter = 'alphabet letter',
        updated_date = 'updated date',
    )
  );

  SELECT
    id,
    letter,
    updated_date
  FROM
    example.base_model
  WHERE 
    updated_date BETWEEN @start_date AND @end_date

我不会太深入SQLMesh的增量加载选项,但简单地说,你可以使用增量加载数据:

  • by time range
  • by partition
  • by unique key (a merge operation)

使用“sqlmesh plan”应用更改

现在已经构建了我们的模型,我们将使用‘ sqlmesh plan ’命令查看更改并加载数据。

注意:`test_full_model。我们初始化项目时附带的Yaml '文件应该删除,因为它不再适用于我们的项目。如果尝试在不删除yaml文件的情况下运行下面的命令,可能会遇到错误。因此,请确保在继续之前将其从项目中删除。

首先运行下面命令:

sqlmesh plan dev

系统将询问您希望回填多少数据的日期范围,但通过按回车键将其保留为空白。对于最后一个问题,输入“y”以应用更改:

在这里插入图片描述

执行过程截图:

在这里插入图片描述

为了确保你的模型被物化,你可以进入你的数据库或使用‘ sqlmesh fetchdf ’命令:

$ sqlmesh fetchdf "select * from example__dev.base_model;"

   id letter  value updated_date
0   1      A     10   2025-01-07
1   2      B     20   2025-01-07
2   3      C     30   2025-01-07

注意,我必须将模式指定为example__dev。模式的格式是YOURSCHEMA__{yourenenvironment}。在这种情况下,我运行sqlmesh plan dev ,并添加了__dev后缀。

如果一切正常,那么你可以通过运行以下命令将更改部署到生产环境:

sqlmesh plan

在这里插入图片描述
检查生产环境中的base_model表(schema没有后缀):

$ sqlmesh fetchdf "select * from example.base_model;"

   id letter  value updated_date
0   1      A     10   2025-01-07
1   2      B     20   2025-01-07
2   3      C     30   2025-01-07

你可能已经注意到,当我们运行‘ sqlmesh plan ’时,我们不需要回填任何数据。SQLMesh只是通过使用视图将指针切换到更新后的表。

现在,在实际项目中,顺序运行‘ sqlmesh plan dev ’和‘ sqlmesh plan ’可能不是你部署到生产环境的方式。好消息是SQLMesh有一个开源的GitHub Actions CI/CD Bot。它的功能如下:

自动在pr上运行单元测试

  • 自动创建PR环境,表示PR中的代码更改
  • 自动分类和回填数据的模型已经改变
  • 自动将更改部署到生产中,自动防止数据差距并合并PR

这些要点是从SQLMesh网站上复制粘贴过来的,在未来文章中我们继续讨论。

sqlmesh run

当我第一次开始使用SQLMesh时,我有一个问题:sqlmesh plansqlmesh run 之间的区别是什么?

  • ‘ sqlmesh plan ’汇总本地所有变更,并允许你在目标环境中检查和执行模型。
  • ‘ sqlmesh run ’根据每个cron时间表执行模型。对我来说,除了SQLMesh利用模型属性中定义的“cron”之外,更容易认为它是“dbt run”。例如,假设你有一个具有每日cron计划的模型和另一个具有小时cron计划的模型。你计划通过GitHub Actions每小时运行“sqlmesh run”命令。使用每日cron计划的模型每天只运行一次,而如果您每小时运行“dbt run”,则所有模型每小时运行一次,从而导致浪费计算资源。

现在我们已经介绍了两个关键的SQLMesh命令,让我们继续学习我们的教程/项目。

总结

本文我们首先搭建duckdb及sqlmesh基础开发环境,然后新建入门项目,解释项目结构,构建数据模型,执行模型并查看结果。为了避免文章篇幅太长,关于宏、审计、测试,数据血缘关系及DAG, python模型。


网站公告

今日签到

点亮在社区的每一天
去签到