AWS Glue用Python Shell从Workday系统将人力资源原始数据以Parquet格式存入S3

发布于:2025-03-03 ⋅ 阅读:(20) ⋅ 点赞:(0)

使用AWS Glue Python Shell从Workday获取人力资源数据并存储到S3的详细方案:

通过以下步骤,即可实现从Workday到S3的自动化数据管道。实际部署时需根据Workday API的具体响应结构调整数据处理逻辑。

架构流程

Workday API -> AWS Glue Python Shell -> (Parquet) -> Amazon S3

详细步骤

一、前期准备
  1. 获取Workday API访问权限

    • 确认Workday REST/SOAP API端点
    • 获取API认证凭证(OAuth2令牌/Basic Auth/证书)
    • 确认所需人力资源数据的具体API路径(如/human_resources/employees
  2. 创建S3存储桶

    • 创建目标桶(如s3://my-hr-data-bucket/raw/
  3. 创建IAM角色

    • 创建Glue服务角色(如GlueHRDataRole
    • 附加策略:
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Effect": "Allow",
            "Action": ["s3:PutObject","s3:GetObject"],
            "Resource": "arn:aws:s3:::my-hr-data-bucket/*"
          },
          {
            "Effect": "Allow",
            "Action": ["logs:CreateLogGroup","logs:CreateLogStream","logs:PutLogEvents"],
            "Resource": "*"
          }
        ]
      }
      

二、Python脚本开发
# glue_workday_to_s3.py
import requests
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import os
from datetime import datetime
import logging

# 初始化日志
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Workday配置
WORKDAY_API_URL = "https://api.workday.com/v1/human_resources"
API_KEY = "your_workday_api_key"
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Accept": "application/json"
}

# S3配置
S3_BUCKET = "my-hr-data-bucket"
S3_PREFIX = "raw/hr_data/"
FILE_NAME = f"hr_data_{datetime.today().strftime('%Y%m%d')}.parquet"

def fetch_paginated_data(url):
    """处理API分页"""
    all_data = []
    page = 1
    while True:
        response = requests.get(
            f"{url}?page={page}",
            headers=HEADERS
        )
        if response.status_code != 200:
            logger.error(f"API请求失败: {response.text}")
            break
        
        data = response.json()
        all_data.extend(data['items'])
        
        if data['has_more']:
            page +=1
        else:
            break
    return all_data

def main():
    try:
        # 获取数据
        hr_data = fetch_paginated_data(f"{WORKDAY_API_URL}/employees")
        logger.info(f"获取到 {len(hr_data)} 条记录")
        
        # 转换为DataFrame
        df = pd.DataFrame(hr_data)
        
        # 转换为Parquet
        table = pa.Table.from_pandas(df)
        output_path = f"/tmp/{FILE_NAME}"
        pq.write_table(table, output_path)
        
        # 上传到S3
        s3_client = boto3.client('s3')
        s3_client.upload_file(
            output_path,
            S3_BUCKET,
            f"{S3_PREFIX}{FILE_NAME}"
        )
        logger.info("数据成功写入S3")
        
    except Exception as e:
        logger.error(f"处理失败: {str(e)}")
        raise

if __name__ == "__main__":
    main()

三、Glue作业配置
  1. 创建Python Shell作业

    • 作业名称:workday-hr-data-ingestion
    • IAM角色:选择GlueHRDataRole
    • 类型:Python Shell
    • Python版本:Python 3.9
    • 数据处理单元:1/16 DPU
  2. 作业参数

    • 脚本路径:上传glue_workday_to_s3.py到S3并指定路径
    • 添加Python库依赖:
      --additional-python-modules pandas==1.5.3,pyarrow==12.0.1,requests==2.28.2
      
    • 环境变量(可选):
      --WORKDAY_API_KEY=your_actual_key
      
  3. 设置触发器

    • 按需运行 或 使用EventBridge定时触发(如每天凌晨1点)

四、验证数据
  1. 在S3目标路径检查Parquet文件:
    aws s3 ls s3://my-hr-data-bucket/raw/hr_data/
    
  2. 使用Athena验证数据:
    CREATE EXTERNAL TABLE hr_data (
      employee_id string,
      name string,
      department string,
      ...
    )
    STORED AS PARQUET
    LOCATION 's3://my-hr-data-bucket/raw/hr_data/';
    

注意事项

  1. 敏感信息管理
    • 建议将API密钥存储在AWS Secrets Manager,通过boto3动态获取
  2. 错误处理
    • 添加重试逻辑应对API限流
    • 使用Glue Job Bookmarks跟踪处理进度(可选)
  3. 性能优化
    • 若数据量极大(>1GB),建议改用Glue Spark作业
  4. 成本控制
    • Python Shell作业成本低于Spark作业,适合中等数据量