链接:https://github.com/lnynhi02/coffee-sales-data-pipeline
https://github.com/wszel/coffee-shop-chain-aws-data-academy
这个项目是一个咖啡销售数据管道
用于从数据源收集数据,进行数据处理和分析,并将结果存储在数据库中。
该项目使用了 Apache Airflow
作为工作流引擎,Kafka
作为消息队列,MySQL
作为数据库。
主要功能点
- 从数据源收集咖啡销售数据
- 使用 Apache Airflow 管理数据处理工作流
- 使用 Kafka 作为消息队列进行实时数据处理
- 将处理后的数据存储在 MySQL 数据库中
技术栈
- Apache Airflow
- Apache Kafka
- MySQL
- Python
为什么选择使用 Kafka 作为消息队列?
1. 高吞吐量与低延迟
- Kafka 每秒可处理百万级消息,适合咖啡销售场景(如促销活动时订单激增)
- 毫秒级延迟保证实时数据处理能力,满足销售数据分析的时效性需求
2. 持久化与可靠性
- 消息持久化存储(默认保留7天),避免数据丢失风险
- 副本机制确保节点故障时数据不中断,保障交易完整性
3. 水平扩展能力
- 通过增加 Broker 节点实现无缝扩容
- 分区机制允许并行消费,适应业务规模增长
4. 生态集成优势
- 与 Airflow 工作流无缝协作(通过 KafkaOperator)
- 支持多种数据格式(JSON/Avro),兼容 MySQL 数据存储
提供 Connect API 简化数据源对接
5. 实时流处理支持
- Kafka Streams
可直接处理实时数据流
- 与 Flink/Spark Streaming 集成,为后续销售预测等场景预留扩展空间
在该项目中,Kafka 作为数据缓冲层,有效解耦了数据采集(销售终端)与数据处理(Airflow ETL),同时为实时看板、库存预警等场景提供实时数据支撑。其分布式架构设计特别适合需要处理突发流量(如节日促销)的零售业务场景。
咖啡销售数据管道
一个咖啡销售数据的综合性数据管道。
- 包含由
Airflow
编排的批处理管道,可从MySQL提取数据, - 使用
Spark
进行转换处理,将数据存储于数据湖 MinIO
的不同数据层(Bronze/Silver/Gold), - 通过自动化数据质量检查确保数据质量。
- 同时配备基于
Kafka
和Redis
的实时流处理系统,可即时响应新订单,
支持实时商品推荐等功能。 - 整个系统通过
Docker Compose
进行定义和管理。
架构
章节目录
- MySQL数据库(源系统)
- Spark作业(数据处理)
- MinIO存储(数据湖)
- 数据分层(Bronze/Silver/Gold)
- Airflow DAG(批处理编排)
- 数据质量检查
- Kafka消息(实时流)
- Redis缓存/存储
- Docker Compose环境
第一章:MySQL数据库(源系统)
欢迎进入咖啡销售数据管道之旅的第一章!
每个数据管道都需要一个起点——原始数据最初存储的位置。在我们的项目中,这个主要起点之一就是**MySQL数据库**。
想象我们的咖啡店场景。当顾客购买一杯美味的拿铁或一袋咖啡豆时,该交易需要被记录在某个地方。这个"地方"通常是连接
销售点(POS)系统的数据库
。我们的MySQL数据库就如同咖啡店的数字记录员。可以将其想象成一个大型有序的文件柜,每个销售记录、产品详情、门店位置和支付方式都被整齐地存储在
不同的文件夹
(我们称之为表)中。
为何称之为"源系统"?
- 在数据管道领域,"源系统"即数据产生的源头。
- 它是我们抽取数据进行处理和分析的来源。
- 我们的
MySQL数据库
是所有交易数据(如订单)和重要查询信息(如产品列表或门店地址)的主要来源。
存储哪些类型的数据?
我们的MySQL数据库保存着咖啡店运营的基础信息。根据项目代码,它包含以下表结构:
orders
: 每笔交易的详细信息(购买者、时间、地点、支付方式)order_details
: 每笔订单包含的具体商品、数量、价格等stores
: 各实体咖啡门店的位置信息products
: 咖啡及其他销售商品的详情product_category
: 商品分类如’咖啡’、‘烘焙食品’、‘周边商品’payment_method
: 支付方式(银行卡、现金等)diamond_customers
: 高价值客户信息(查询表)
这些表由项目中的脚本定义创建(scripts/database/create_table.py
)。以下代码片段展示了表的创建逻辑:
# 来源:scripts/database/create_table.py
TABLES = {}
TABLES['stores'] = (
"CREATE TABLE `stores` (" \
" `id` int," \
" `name` varchar(20) NOT NULL,"
# ... 其他字段 ...
" `updated_at` DATETIME," \
" PRIMARY KEY (`id`)" \
") ENGINE=InnoDB"
)
TABLES['orders'] = (
"CREATE TABLE `orders` (" \
" `id` varchar(250) NOT NULL," \
" `timestamp` datetime NOT NULL," \
# ... 其他字段 ...
" PRIMARY KEY (`id`)," \
# ... 外键约束 ...
") ENGINE=InnoDB"
)
# ... 其他表定义 ...
def create_table(cursor):
"""在MySQL数据库中创建表"""
for table_name in TABLES:
table_description = TABLES[table_name]
try:
print(f"创建表 {table_name}: ", end='')
cursor.execute(table_description)
except:
# ... 错误处理 ...
pass
else:
print("成功")
这个Python脚本包含构建我们"文件柜"的指令(SQL CREATE TABLE
语句),定义了每个文件夹(表)的结构和每个信息单元(字段)的命名规范。
管道如何与源系统交互?
数据管道通过以下方式与MySQL数据库交互:
- 初始数据加载:通过脚本(
scripts/database/load_static_file.py
)从CSV文件加载静态信息(如门店、商品、支付方式)到MySQL,填充查询表 - 实时写入模拟:脚本(
scripts/database/generate_data.py
)模拟POS系统持续向orders
和order_details
表写入新订单数据,保持源数据更新
- 批量读取:主批处理作业(使用Spark)连接MySQL读取表数据,这是数据流出源系统进入管道处理的主要方式
以下代码片段展示了批处理脚本如何连接读取MySQL表:
# 来源:scripts/batch/bronze_dimension_fact_load.py
def read_mysql_table(spark: SparkSession, table: str):
host = os.getenv("MYSQL_HOST")
user = os.getenv("MYSQL_USER")
password = os.getenv("MYSQL_PASSWORD")
database = os.getenv("MYSQL_DATABASE")
return spark.read \
.format("jdbc") \
.option("url", f"jdbc:mysql://{host}:3306/{database}?user={user}&password={password}") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", table) \
.load()
# ... 后续脚本内容 ...
def incremental_load_orders(spark: SparkSession) -> None:
# ... 日志记录 ...
orders_df = read_mysql_table(spark, "orders") # <<< 读取orders表
details_df = read_mysql_table(spark, "order_details") # <<< 读取order_details表
# ... 后续处理流程 ...
这段代码展示了数据处理引擎(Spark)如何使用read_mysql_table
函数:
- 通过指定数据库地址(
host
)、凭证信息和表名 - 使用
.load()
命令将表数据载入Spark可处理的DataFrame结构。
数据流出流程(高层抽象)
以下是数据从MySQL源系统进入批处理管道的简化流程:
总结
MySQL数据库是我们批处理数据管道的核心起点,它以结构化方式(通过表)存储所有咖啡店交易数据和静态信息,作为数据管道提取原始数据的"源系统"。
下一章我们将探索数据处理环节。
第二章:Spark Jobs(数据处理)
在第一章:MySQL数据库(源系统)中,我们了解到旅程始于MySQL数据库——这个
如同咖啡店文件柜般存储原始销售数据的系统
。但原始数据无法直接生成精美报表或洞察结论,需要经过清洗、关联和结构化处理。这正是本章核心概念Spark Jobs的用武之地。
管道中的Spark作业是什么?
若将数据管道比作工厂,MySQL数据库是原材料(数据)仓库,那么Spark作业就是车间里的工人和加工设备。
它们接收原始数据,通过多道工序生产精炼材料或成品
。
在我们的咖啡销售项目中,"Spark作业"本质上是使用Apache Spark库的Python脚本,主要承担以下数据处理重任:
- 从源系统(如MySQL)或存储层(MinIO存储)
读取数据
清洗脏数据
(如去除多余字符)关联不同表
数据(如订单与商品详情匹配)转换数据
(如计算总额或创建新字段)- 将处理结果
写入数据湖
(MinIO存储)的下一阶段
管道中的每个批处理步骤(后续由Airflow DAG编排)通常对应独立的Spark作业脚本,专精于特定数据处理层级或任务。
为何选择Spark?
为何不使用普通Python脚本或数据库查询?
- 大数据处理:Spark专为跨机器集群(“集群”)处理海量数据设计。尽管当前示例数据量较小,真实场景常需处理单机无法承载的巨型数据集
- 高效性:Spark
优先内存计算,避免反复磁盘I/O带来的性能损耗
- Python友好:通过
pyspark
库,数据从业者可用熟悉的Python语言编写强大处理逻辑
类比理解:小型数据集用电子表格,中型用数据库查询,海量数据则需Spark这类工业级工具。
Spark引擎初始化
Spark作业执行前需建立与Spark处理引擎的连接并配置数据源访问,通过创建SparkSession
实现。
以下代码片段来自批处理脚本(scripts/batch/bronze_dimension_fact_load.py
):
# 来源:scripts/batch/bronze_dimension_fact_load.py
from pyspark.sql import SparkSession
def create_SparkSession() -> SparkSession:
return SparkSession.builder \
.appName("数据摄取 - MySQL到Minio") \
.config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
.config("spark.hadoop.fs.s3a.access.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.secret.key", "minioadmin") \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.hadoop.fs.s3a.S3AFileSystem") \
# ... 其他配置 ...
.getOrCreate()
# 脚本主流程调用:
# spark = create_SparkSession()
create_SparkSession
函数如同设置Spark工厂的控制面板:
appName(...)
:命名作业(便于监控).config(...)
:配置关键参数(如连接MinIO存储的S3兼容设置).getOrCreate()
:复用现有会话或创建新会话
获取spark
对象后,即可指挥Spark处理数据。
Spark Jobs核心任务
通过项目脚本片段解析Spark作业的五大核心任务:
1. 数据读取
从源系统或前序处理阶段读取数据
从MySQL读取:
# 来源:scripts/batch/bronze_dimension_fact_load.py
def read_mysql_table(spark: SparkSession, table: str):
host = os.getenv("MYSQL_HOST") # 从环境变量获取连接信息
user = os.getenv("MYSQL_USER")
password = os.getenv("MYSQL_PASSWORD")
database = os.getenv("MYSQL_DATABASE")
return spark.read \
.format("jdbc") \
.option("url", f"jdbc:mysql://{host}:3306/{database}?user={user}&password={password}") \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", table) \
.load()
# 调用示例:
# orders_df = read_mysql_table(spark, "orders")
通过·JDBC连接器读取MySQL指定表数据
生成Spark DataFrame(内存表结构)。
从MinIO读取:
# 来源:scripts/batch/silver_dimensions.py
def read_bronze_layer(spark, table):
# 示例路径:s3a://bronze-layer/brz.stores
return spark.read.parquet(f"s3a://bronze-layer/{table}")
# 调用示例:
# source_df = read_bronze_layer(spark, table="brz.stores")
从MinIO的"bronze-layer"存储桶读取Parquet格式文件
,s3a://
前缀触发S3兼容连接器。
2. 数据清洗
处理原始数据中的异常或错误
# 来源:scripts/batch/silver_dimensions.py (cleaned_stores函数)
# source_df读取自brz.stores
cleaned_df = source_df.withColumn("city_cleaned", expr("regexp_replace(city, '\\\\r$', '')"))
# ... 后续筛选输出列 ...
withColumn
创建新列city_cleaned
,使用正则表达式移除city
字段末尾的\r
字符。
3. 数据关联
整合多源数据
# 来源:scripts/batch/silver_dimensions.py (cleand_products函数)
# product读取自brz.products
# product_category读取自brz.product_category
join_df = product.join(
product_category,
product["category_id"] == product_category["id"],
how="left"
)
# ... 筛选关联结果特定列 ...
基于category_id
和id
字段左连接商品表与分类表,保留所有商品并补充分类信息。
4. 数据转换
计算新值或改变数据结构
# 来源:scripts/batch/bronze_dimension_fact_load.py (incremental_load_orders函数)
# new_orders是包含新订单的DataFrame
enriched_orders = new_orders.withColumn("year", year("timestamp")) \
.withColumn("month", month("timestamp")) \
.withColumn("day", dayofmonth("timestamp"))
从timestamp
字段提取年、月、日生成新列,便于后续分区查询。
5. 数据写入
将处理结果写入下一阶段
# 来源:scripts/batch/silver_dimensions.py (cleand_stores函数)
# output_df是清洗后的DataFrame
output_df.write.mode("overwrite").parquet(f"{silver_path}/{table}")
.mode("overwrite")
:覆盖写入模式.parquet(...)
:指定Parquet格式及MinIO存储路径(如s3a://silver-layer/slv.stores
)
分区写入示例:
# 来源:scripts/batch/bronze_dimension_fact_load.py (incremental_load_orders函数)
# enriched_orders包含年、月、日字段
enriched_orders.write.partitionBy("year", "month", "day").mode("append").parquet(orders_path)
partitionBy
按年月日分区存储,提升日期范围查询效率。
Spark Jobs执行流程(高层抽象)
Python脚本作为驱动器指挥SparkSession
执行步骤
SparkSession
将任务分发至集群资源(本项目中通过Docker Compose
模拟)。
实际数据处理在Spark引擎内完成,读写操作面向MinIO存储
或MySQL等数据源。
代码结构范式
Spark Jobs脚本遵循统一范式:
- 导入依赖库(
pyspark.sql
、os
、logging
等)- 配置日志系统
- 定义
create_SparkSession()
函数- 定义数据读取辅助函数(
read_bronze_layer
、read_mysql_table
等)- 定义核心转换/加载函数(如
cleand_stores
、incremental_load_orders
)main()
函数编排流程:
- 调用
create_SparkSession()
- 按序执行转换/加载函数
- 最终调用
spark.stop()
if __name__ == "__main__": main()
入口
该结构可见于scripts/batch/silver_dimensions.py
或scripts/batch/gold_fact_orders.py
等文件。
总结
Spark Jobs是数据管道的核心处理器,通过Python脚本驱动Apache Spark高效执行数据读取、清洗、转换、关联和写入任务。
它们将数据从源头(MySQL数据库)或中间存储(MinIO数据湖)转化为可供分析的形态,逐层推进数据精炼过程。
下一章我们将探索处理结果的存储之地——数据湖
!