在本文中,我们将探讨如何利用dbt项目的代码库来实现一个简单的SQLMesh项目。本文的基础是基于Udemy讲师为dbt课程创建的示例项目,可以在这个GitHub repo中获得。这个dbt项目是相对完整的示例,我们将使用它作为模板来演示SQLMesh(下一代数据转换工具)的功能。
dbt示例项目在Snowflake中使用Airbnb数据集分析端到端的数据工程工作流。该项目包括将原始数据加载到Snowflake中,创建各种模型,并执行转换以从数据中获得有意义的见解。我们打算使用duckdb作为分析数据库,方便读者直接在本机上测试运行。
简要说明Airbnb数据集
本项目中使用的Airbnb数据集由三个主表组成:raw_listings、raw_reviews和raw_hosts。这些表格包含有关Airbnb房源的信息、对房源的评论以及房东的详细信息。理解这些表之间的关系和意义对于处理和分析数据至关重要。
raw_listings(原始房源信息):
• 描述:此表包含有关爱彼迎房源的详细信息,包括房源 ID、网址、名称、房型、最短入住天数、房东 ID、价格以及创建和更新的时间戳。
• 重要性:房源数据提供了爱彼迎上可用房源的全面视图。它是与评论和房东数据相连接的核心表。原始评论:
• 描述:此表包含爱彼迎房源的评论,包含房源 ID、评论日期、评论者姓名、评论内容和情感倾向等信息。
• 重要性:评论数据能提供有关客户体验和满意度的见解。它通过房源 ID 与房源数据相关联。raw_hosts:
• 描述:此表包含有关房源的信息,包括房源 ID、名称、超级房源状态以及创建和更新的时间戳。
• 重要性:房源数据提供了有关提供房源的个人或实体的详细信息。它通过 host_id 与房源数据相关联。
项目最终目标
本项目的首要目标是分析满月日期对爱彼迎(Airbnb)评分和评论的影响。通过利用房源、评论和房东之间的关系,我们旨在得出有关月相周期对客户反馈和满意度影响的有意义的见解。
实现SQLMesh项目的步骤
- 初始化SQLMesh项目
创建一个名为airbnb_sqlmesh的文件夹,并进入该文件夹。执行如下命令初始化SQLMesh项目:
python -m venv .venv
source .venv/bin/activate
pip install "sqlmesh"
sqlmesh init duckdb
项目结构
在初始化完成后,将会创建以下目录和文件,从而为我们的 SQLMesh 项目提供组织结构:
• config.yaml:用于您项目的数据库配置文件。
• models:存放 SQL 和 Python 模型的目录。
• audits:存放共享审计的目录。
• tests:存放单元测试的目录。
• macros:存放宏的目录。
删除models、seeds以及tests目录下的示例文件,后续我们会添加实际业务需要的模型。
项目配置
在配置文件中定义SQLMesh项目配置。yaml文件。该文件包含数据库连接细节和模型默认值:
gateways:
duckdb:
connection:
type: duckdb
database: dw.db
default_gateway: duckdb
model_defaults:
dialect: duckdb
start: 2025-03-17
加载原始数据
使用以下SQL脚本将原始数据加载到duckdb表中:
INSTALL httpfs;
LOAD httpfs;
-- Create and load raw_listings table
CREATE OR REPLACE TABLE raw_listings (
id integer,
listing_url string,
name string,
room_type string,
minimum_nights integer,
host_id integer,
price string,
created_at datetime,
updated_at datetime
);
COPY raw_listings FROM 's3://dbtlearn/listings.csv' (ignore_errors true);
-- Create and load raw_reviews table
CREATE OR REPLACE TABLE raw_reviews (
listing_id integer,
date datetime,
reviewer_name string,
comments string,
sentiment string
);
COPY raw_reviews FROM 's3://dbtlearn/reviews.csv' (ignore_errors true);
-- Create and load raw_hosts table
CREATE OR REPLACE TABLE raw_hosts (
id integer,
name string,
is_superhost string,
created_at datetime,
updated_at datetime
);
COPY raw_hosts FROM 's3://dbtlearn/hosts.csv' (ignore_errors true);
创建模型
源(Source )模型
我们在 models/source 文件夹内的原始数据表基础上创建了三个模型 src_hosts.sql、src_listings.sql 和 src_reviews.sql:
本项目中的源模型旨在对来自 Airbnb 数据集的原始数据进行标准化和准备,以便进一步处理和分析。它们充当中间层,通过重命名列、选择相关字段以及标准化数据等方式将原始数据转换为更易于使用的格式。所有源模型都被实现为视图,确保数据干净、结构化,并为后续在维度和事实模型中的转换和分析做好准备。
src_listings.sql 模型:
MODEL (
name src.SRC_LISTINGS,
kind view
);
WITH mr_listings AS (
SELECT * FROM main.RAW_LISTINGS
)
SELECT
id AS listing_id,
name AS listing_name,
listing_url,
room_type,
minimum_nights,
host_id,
price AS price_str,
created_at,
updated_at
FROM mr_listings;
src_reviews.sql模型:
MODEL (
name src.SRC_REVIEWS,
kind view
);
WITH mr_reviews AS (
SELECT * FROM main.raw_reviews
)
SELECT
listing_id,
date AS review_date,
reviewer_name,
comments AS review_text,
sentiment AS review_sentiment
FROM mr_reviews;
src_hosts.sql模型:
MODEL (
name src.SRC_HOSTS,
kind view
);
WITH mr_hosts AS (
SELECT * FROM main.RAW_HOSTS
)
SELECT
id AS host_id,
name AS host_name,
is_superhost,
created_at,
updated_at
FROM mr_hosts;
运行命令,生成源模型:
sqlmesh plan dev
生成src__dev schema以及三个视图:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='src__dev';
┌───────────────┬──────────────┬──────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼──────────────┼────────────┤
│ dw │ src__dev │ src_hosts │ VIEW │
│ dw │ src__dev │ src_listings │ VIEW │
│ dw │ src__dev │ src_reviews │ VIEW │
└───────────────┴──────────────┴──────────────┴────────────┘
维度模型
在models/dim文件夹中创建维度模型作为视图:
维度模型实现对源数据的清理、丰富和组合,以便进行详细分析。它们处理空值、格式化字段和合并相关数据集。模型dim.dim_hosts_cleaned和dim. dim_listings_cleaned被具体化为视图,而dim.dim_listings_w_hosts被具体化为一个完整的表,每次都完全重新加载。
dim.dim_hosts_cleaned模型:
MODEL (
name dim.dim_hosts_cleansed,
kind view
);
WITH src_hosts AS (
SELECT * FROM SOURCE.SRC_HOSTS
)
SELECT
host_id,
NVL(host_name, 'Anonymous') AS host_name,
is_superhost,
created_at,
updated_at
FROM src_hosts;
dim. dim_listings_cleaned模型:
MODEL (
name dim.dim_listings_cleansed,
kind view
);
WITH src_listings AS (
SELECT * FROM SOURCE.SRC_LISTINGS
)
SELECT
listing_id,
listing_name,
room_type,
CASE WHEN minimum_nights = 0 THEN 1 ELSE minimum_nights END AS minimum_nights,
host_id,
REPLACE(price_str, '$', '')::NUMERIC(10, 2) AS price,
created_at,
updated_at
FROM src_listings;
dim.dim_listings_w_hosts模型
MODEL (
name dim.dim_listings_w_hosts,
kind full
);
WITH l AS (
SELECT * FROM dim.dim_listings_cleansed
),
h AS (
SELECT * FROM dim.dim_hosts_cleansed
)
SELECT
l.listing_id,
l.listing_name,
l.room_type,
l.minimum_nights,
l.price,
l.host_id,
h.host_name,
h.is_superhost AS host_is_superhost,
l.created_at,
GREATEST(l.updated_at, h.updated_at) AS updated_at
FROM l
LEFT JOIN h ON (h.host_id = l.host_id);
运行命令,生成维度表:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='dim__dev';
┌───────────────┬──────────────┬───────────────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼───────────────────────┼────────────┤
│ dw │ dim__dev │ dim_hosts_cleansed │ VIEW │
│ dw │ dim__dev │ dim_listings_cleansed │ VIEW │
│ dw │ dim__dev │ dim_listings_w_hosts │ VIEW │
└───────────────┴──────────────┴───────────────────────┴────────────┘
事实模型
Fact 模型会在 models/fct 文件夹内创建增量型 Fact 模型:
Fact 模型 fct.reviews 会处理并汇总来自源模型的评论数据。这是一个增量型模型,意味着它只会加载新数据,这极大地减少了每次模型运行所需的计算资源。在模型中,@start 和 @end_date 是 SQLMesh 宏,在 sqlmesh 计划或运行期间会根据运行的适当开始和结束日期进行渲染。此外,@GENERATE_SURROGATE_KEY 宏用于根据给定的输入列生成 MD5 哈希值,为数据生成一个替代键。
MODEL (
name fct.reviews,
kind INCREMENTAL_BY_TIME_RANGE (
time_column review_date
)
);
WITH src_reviews AS (
SELECT * FROM SOURCE.SRC_REVIEWS
)
SELECT
@GENERATE_SURROGATE_KEY(listing_id, review_date, reviewer_name, review_text) AS review_id,
*
FROM src_reviews
WHERE review_text IS NOT NULL
AND review_date BETWEEN @start_date AND @end_date;
运行命令,生成事实表:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='fct__dev';
┌───────────────┬──────────────┬────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼────────────┼────────────┤
│ dw │ fct__dev │ reviews │ VIEW │
└───────────────┴──────────────┴────────────┴────────────┘
业务层模型
转到业务层,我们需要分析满月日期如何影响评审意见。为此,我们需要一个包含满月日期的表。这可以使用SQLMesh中的种子文件来实现。从提供的链接下载种子文件,并将其放在SQLMesh项目的种子文件夹中。然后,创建一个名为full_moon_dates_seed.sql 的种子模型。
MODEL (
name seed.full_moon_dates,
kind SEED (
path '../seeds/seed_full_moon_dates.csv'
)
);
运行命令,生成seed日期模型:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='seed__dev';
┌───────────────┬──────────────┬─────────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼─────────────────┼────────────┤
│ dw │ seed__dev │ full_moon_dates │ VIEW │
└───────────────┴──────────────┴─────────────────┴────────────┘
对于最终的模型,创建名为models/mart的文件夹,并在其中添加以下模型:
MODEL (
name mart.mart_fullmoon_reviews,
kind full
);
WITH fct_reviews AS (
SELECT * FROM fct.reviews
),
full_moon_dates AS (
SELECT * FROM seed.full_moon_dates
)
SELECT
r.*,
CASE
WHEN fm.full_moon_date IS NULL THEN 'not full moon'
ELSE 'full moon'
END AS is_full_moon
FROM
fct_reviews as r
LEFT JOIN full_moon_dates as fm
ON ( r.review_date = DATE_ADD(strptime(fm.full_moon_date, '%Y-%m-%d'), INTERVAL 1 DAY) )
运行命令,生成模型:
D select table_catalog, table_schema,table_name, table_type from information_schema.tables where table_schema='mart__dev';
┌───────────────┬──────────────┬───────────────────────┬────────────┐
│ table_catalog │ table_schema │ table_name │ table_type │
│ varchar │ varchar │ varchar │ varchar │
├───────────────┼──────────────┼───────────────────────┼────────────┤
│ dw │ mart__dev │ mart_fullmoon_reviews │ VIEW │
└───────────────┴──────────────┴───────────────────────┴────────────┘
一旦所有的模型都创建好了,项目结构应该是这样的:
在开发模式下测试成功后,我们在生产模式下运行sqlmesh计划:
sqlmesh plan prod
输出结果:
sqlmesh plan prod
`prod` environment will be initialized
Models:
└── Added:
├── dim.dim_hosts_cleansed
├── dim.dim_listings_cleansed
├── dim.dim_listings_w_hosts
├── fct.reviews
├── mart.mart_fullmoon_reviews
├── seed.full_moon_dates
├── src.src_hosts
├── src.src_listings
└── src.src_reviews
Apply - Virtual Update [y/n]:
选择y,sqlmesh会自动创建生产环境所需的模型,SQLMesh不会像在开发环境中那样重新处理所有数据。相反,它重用在开发环境中创建的模型。有关SQLMesh虚拟环境的更多信息,请参阅此链接。
最后通过命令sqlmesh ui
启动web开发页面,图示如下: