Airflow和PySPARK实现带多组参数和标签的Amazon Redshift数据仓库批量数据导出程序

发布于:2025-02-27 ⋅ 阅读:(17) ⋅ 点赞:(0)

设计一个基于多个带标签SQL模板作为配置文件和多组参数的PySPARK代码程序,实现根据不同的输入参数,用Airflow进行调度,自动批量地将Amazon Redshift数据仓库的数据导出为Parquet、CSV和Excel文件到S3上,标签和多个参数(以“_”分割)为组成导出数据文件名,文件已经存在则覆盖原始文件。PySpark程序需要异常处理,输出带时间戳和每个运行批次和每个导出文件作业运行状态的日志文件,每天单独一个带日期的和.log扩展名日志文件,放在logs子目录中,参数全部设置在json配置文件中。

PySpark解决方案,包含代码结构、配置文件和日志处理:

注意事项:

  1. 确保Spark集群有访问Redshift和S3的权限

  2. 根据实际环境调整Redshift JDBC驱动版本

  3. 测试不同文件格式的导出需求

  4. 监控S3临时目录的存储使用情况

  5. 定期清理日志文件和临时数据

  6. 根据数据量调整Spark资源配置

  7. 项目结构

redshift_exporter/
├── config/
│   └── config.json
├── sql_templates/
│   └── sales_report.sql
├── logs/
├── jobs/
│   └── redshift_exporter.py
└── airflow/
    └── dags/
        └── redshift_export_dag.py
  1. config.json 示例
{
  "redshift_conn": {
    "url": "jdbc:redshift://cluster:5439/db",
    "user": "user",
    "password": "password",
    "tempdir": "s3a://temp-bucket/redshift_temp"
  },
  "s3_output": "s3a://output-bucket/reports",
  "tasks": [
    {
      "label": "sales_report",
      "sql_template": "sql_templates/sales_report.sql",
      "parameters": ["${date}", "region1"],
      "formats": ["parquet", "csv", "xlsx"]
    }
  ]
}
  1. PySpark程序 (redshift_exporter.py)
import json
import logging
import os
import sys
from datetime import datetime
from pyspark.sql import SparkSession

class RedshiftExporter:
    def __init__(self, config_path):
        self.spark = SparkSession.builder \
            .appName("RedshiftExporter") \
            .config("spark.jars.packages", "com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1") \
            .getOrCreate()
        
        with open(config_path) as f:
            self.config = json.load(f)
        
        self.setup_logging()
        
    def setup_logging(self):
        log_dir = "logs"
        if not os.path.exists(log_dir):
            os.makedirs(log_dir)
        
        log_file = f"{log_dir}/{datetime.now().strftime('%Y-%m-%d')}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def read_sql_template(self, template_path):
        try:
            with open(template_path) as f:
                return f.read()
        except Exception as e:
            self.logger.error(f"Error reading SQL template: {str(e)}")
            raise

    def export_data(self, df, output_path, format_type):
        try:
            writer = df.write.mode("overwrite")
            if format_type == "parquet":
                writer.parquet(f"{output_path}.parquet")
            elif format_type == "csv":
                writer.option("header", "true").csv(f"{output_path}.csv")
            elif format_type == "xlsx":
                df.write.format("com.crealytics.spark.excel") \
                    .option("header", "true") \
                    .mode("overwrite") \
                    .save(f"{output_path}.xlsx")
            self.logger.info(f"Successfully exported {format_type} to {output_path}")
        except Exception as e:
            self.logger.error(f"Export failed for {format_type}: {str(e)}")
            raise

    def process_task(self, task, params):
        try:
            sql = self.read_sql_template(task["sql_template"])
            formatted_sql = sql.replace("${date}", params[0])
            
            df = self.spark.read \
                .format("com.databricks.spark.redshift") \
                .option("url", self.config["redshift_conn"]["url"]) \
                .option("query", formatted_sql) \
                .option("user", self.config["redshift_conn"]["user"]) \
                .option("password", self.config["redshift_conn"]["password"]) \
                .option("tempdir", self.config["redshift_conn"]["tempdir"]) \
                .load()
            
            filename = f"{task['label']}_{'_'.join(params)}"
            output_base = f"{self.config['s3_output']}/{filename}"
            
            for fmt in task["formats"]:
                self.export_data(df, output_base, fmt)
                
        except Exception as e:
            self.logger.error(f"Task {task['label']} failed: {str(e)}")
            raise

    def run(self, parameters):
        try:
            for task in self.config["tasks"]:
                self.logger.info(f"Processing task: {task['label']}")
                self.process_task(task, parameters)
        finally:
            self.spark.stop()

if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: redshift_exporter.py <config_path> <parameters>")
        sys.exit(1)
    
    config_path = sys.argv[1]
    parameters = sys.argv[2].split("_") if len(sys.argv) > 2 else []
    
    exporter = RedshiftExporter(config_path)
    exporter.run(parameters)
  1. Airflow DAG 示例 (redshift_export_dag.py)
from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'data_team',
    'start_date': datetime(2023, 1, 1),
    'retries': 1
}

with DAG('redshift_export',
         default_args=default_args,
         schedule_interval='@daily',
         catchup=False) as dag:

    export_task = BashOperator(
        task_id='run_redshift_export',
        bash_command='spark-submit --packages com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:3.3.1_0.18.5 /path/to/jobs/redshift_exporter.py /path/to/config/config.json "{{ ds_nodash }}_region1"'
    )
  1. 实现说明
  • 配置文件管理:使用JSON配置文件管理Redshift连接参数、输出位置和任务配置
  • 参数化处理:支持动态参数替换SQL模板中的变量
  • 多格式导出:支持Parquet、CSV和Excel格式导出
  • 日志记录:每天生成独立的日志文件,包含详细的时间戳和操作状态
  • 异常处理:完善的错误捕获和日志记录机制
  • Spark优化:使用Redshift的Spark连接器进行高效数据传输
  • 文件覆盖:使用Spark的overwrite模式处理已存在文件
  • 依赖管理:通过–packages参数管理Spark依赖

运行方式:

spark-submit --packages com.amazon.redshift:redshift-jdbc42:2.1.0.9,org.apache.hadoop:hadoop-aws:3.3.1,com.crealytics:spark-excel_2.12:3.3.1_0.18.5 jobs/redshift_exporter.py config/config.json "20231001_region1"