【ETL】从理论到Python实践的数据处理

发布于:2025-02-21 ⋅ 阅读:(14) ⋅ 点赞:(0)

引言

ETL(Extract, Transform, Load)是一种数据处理过程,用于将数据从一个或多个源提取出来,进行清洗、转换和整合,然后加载到目标数据仓库或数据库中。ETL 是数据仓库和数据分析领域中不可或缺的一部分,广泛应用于企业数据集成、数据迁移和数据治理等场景。

一、ETL的基础与工作原理

ETL 的工作原理可以分为三个核心阶段:提取(Extract)、转换(Transform)和加载(Load)。

1. 提取(Extract)
  • 定义:从数据源中读取数据。

  • 功能:提取阶段的主要任务是从各种数据源中获取数据,并将其传输到中间存储区域(如暂存区或内存)

2. 转换(Transform)
  • 定义:对提取的数据进行清洗、转换和整合。

  • 功能

    • 数据清洗:去除重复数据、填补缺失值、纠正错误数据等。

    • 数据转换:将数据格式从一种格式转换为另一种格式,例如日期格式转换、数值单位转换等。

    • 数据整合:将来自多个数据源的数据进行合并,消除数据冗余,实现数据的一致性。

    • 数据聚合:对数据进行汇总和统计,例如计算总和、平均值等。

    • 数据映射:将数据字段映射到目标数据仓库的表结构中。

3. 加载(Load)
  • 定义:将转换后的数据加载到目标数据仓库中。

  • 功能

    • 数据插入:将数据插入目标数据仓库的表中。

    • 数据更新:如果目标数据仓库中已存在数据,则需要根据业务逻辑进行更新。

    • 数据删除:在某些情况下,可能需要删除目标数据仓库中不再需要的数据。

二、ETL过程中的工具选择与实际操作

提取数据的ETL流程可能会选择不同的工具来执行每一个步骤,下面来介绍几种常见的ETL工具,并具体解释每个步骤

1. 提取(Extract)

提取数据通常是从外部数据源(如数据库、API或文件)获取数据。以下是常见的工具和技术

SQL查询:对于关系型数据库(如MySQL、PostgreSQL)、通常使用SQL查询语句来提取数据

APIS:对于第三方服务的数据,可能需要调用API来提取数据(例如:Python的requests库)。

日志文件:使用文件读取工具(Python的pandas或csv库)来读取存储在日志中的数据

代码示例:从MySQL提取数据

import pymysql
import pandas as pd
from sqlalchemy import create_engine 

# 创建 SQLAlchemy 引擎
engine = create_engine('mysql+pymysql://root:root@localhost/homedo')
#创建sql
sql_query = """select account_id,order_id,order_date,sum(received_amount) as amount 
from dwd_trd_order_order 
where order_date >= '2024-05-01'
group by account_id,order_id,order_date;"""
# 使用 SQLAlchemy 引擎执行查询并将结果加载到 DataFrame 中
df = pd.read_sql(sql_query, engine)

# 关闭引擎连接(可选,因为 SQLAlchemy 会自动管理连接)
engine.dispose()

# 打印结果
print(df.head())
2. 转换(Transform)
  1. 转换是ETL流程中的核心步骤,涉及对数据的清洗、格式化和转换
  2. 数据清洗:去除重复项,处理缺失值、数据格式化
  3. 数据标准 :统一日期格式等
  4. 数据聚合计算:计算总销售、平均价格等

代码示例:

dfs = pd.DataFrame(df)
#假设df从数据库提取的数据
dfs['order_date'] = pd.to_datetime(df['order_date'], errors='coerce')
# 替换空值(NaT)为默认日期
default_date = pd.to_datetime('2025-01-01')
dfs['order_date'] = dfs['order_date'].fillna(default_date) 
print(df)
3. 加载(Load)

此处是加载到MySQL数据库中

df = pd.DataFrame(dfs)
#创建MySQL数据库连接
engine = create_engine('mysql+pymysql://root:root@localhost/homedo')
#将DataFrame加载到数据库中的指定表格
df.to_sql('dwd_index',engine,if_exists='replace',index=False)
print('数据加载成功!')

三、ETL操作流程:

1. 需求分析
  • 确定数据源和目标数据仓库。

  • 明确数据处理需求(如数据清洗、转换、实时性等)。

2. 工具选择
  • 根据需求选择合适的ETL工具。例如:

    • 如果需要强大的数据转换功能,可以选择Kettle。

    • 如果需要实时数据处理,可以选择Apache NiFi。

    • 如果需要简单易用的工具,可以选择ETLCloud。

3. 数据提取(Extract)
  • 配置数据源连接,从源系统中提取数据。

  • 使用ETL工具的连接器或适配器支持多种数据源。

4. 数据转换(Transform)
  • 清洗数据,去除重复、填补缺失值。

  • 转换数据格式,如日期格式、数值单位等。

  • 整合数据,消除冗余,实现一致性。

5. 数据加载(Load)
  • 将转换后的数据加载到目标数据仓库。

  • 根据需求选择批量加载或实时加载。

6. 监控与维护
  • 监控ETL流程的运行状态,确保数据的完整性和一致性。

  • 定期维护ETL任务,优化性能。