A detailed plan for pulling HR data from Workday with AWS Glue Python Shell and storing it in Amazon S3:
The steps below build an automated data pipeline from Workday to S3. In an actual deployment, adjust the data-processing logic to match the specific response structure of the Workday API.
Architecture
Workday API -> AWS Glue Python Shell -> (Parquet) -> Amazon S3
Detailed Steps
1. Preparation
Obtain Workday API access
- Confirm the Workday REST/SOAP API endpoints
- Obtain API credentials (OAuth2 token / Basic Auth / certificate)
- Identify the specific API paths for the HR data you need (e.g. /human_resources/employees)
Create the S3 bucket
- Create the target bucket and prefix (e.g. s3://my-hr-data-bucket/raw/); a boto3 sketch follows below
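If you prefer to script this step, here is a minimal boto3 sketch; the bucket name matches the example above and the region is an assumption:

# create_bucket.py - one-time setup (region is an assumption; adjust as needed)
import boto3

s3 = boto3.client("s3", region_name="us-east-1")
s3.create_bucket(Bucket="my-hr-data-bucket")
# Outside us-east-1, a location constraint is required instead:
# s3.create_bucket(Bucket="my-hr-data-bucket",
#                  CreateBucketConfiguration={"LocationConstraint": "eu-west-1"})

# The raw/ prefix needs no pre-creation; S3 prefixes appear
# automatically when the first object is written under them.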
Create the IAM role
- Create a Glue service role (e.g. GlueHRDataRole); a boto3 sketch of the same setup follows the policy
- Attach a policy:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": ["s3:PutObject","s3:GetObject"], "Resource": "arn:aws:s3:::my-hr-data-bucket/*" }, { "Effect": "Allow", "Action": ["logs:CreateLogGroup","logs:CreateLogStream","logs:PutLogEvents"], "Resource": "*" } ] }
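The role can also be created programmatically; a sketch assuming the policy above is attached inline (the inline policy name GlueHRDataAccess is an assumption):

# create_glue_role.py - one-time setup for the Glue service role
import json
import boto3

iam = boto3.client("iam")

# Glue jobs assume the role via the glue.amazonaws.com service principal
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

iam.create_role(
    RoleName="GlueHRDataRole",
    AssumeRolePolicyDocument=json.dumps(trust_policy)
)

# Attach the S3 / CloudWatch Logs policy shown above as an inline policy
inline_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow",
         "Action": ["s3:PutObject", "s3:GetObject"],
         "Resource": "arn:aws:s3:::my-hr-data-bucket/*"},
        {"Effect": "Allow",
         "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
         "Resource": "*"}
    ]
}

iam.put_role_policy(
    RoleName="GlueHRDataRole",
    PolicyName="GlueHRDataAccess",
    PolicyDocument=json.dumps(inline_policy)
)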
2. Python Script Development
# glue_workday_to_s3.py
import logging
from datetime import datetime

import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import requests

# Initialize logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Workday configuration
WORKDAY_API_URL = "https://api.workday.com/v1/human_resources"
API_KEY = "your_workday_api_key"  # placeholder; prefer Secrets Manager (see Notes)
HEADERS = {
    "Authorization": f"Bearer {API_KEY}",
    "Accept": "application/json"
}

# S3 configuration
S3_BUCKET = "my-hr-data-bucket"
S3_PREFIX = "raw/hr_data/"
FILE_NAME = f"hr_data_{datetime.today().strftime('%Y%m%d')}.parquet"


def fetch_paginated_data(url):
    """Walk the API's pagination and collect every record."""
    all_data = []
    page = 1
    while True:
        response = requests.get(f"{url}?page={page}", headers=HEADERS, timeout=30)
        if response.status_code != 200:
            logger.error(f"API request failed: {response.text}")
            break
        data = response.json()
        # 'items' and 'has_more' assume a generic paginated payload;
        # adjust to the actual Workday response structure
        all_data.extend(data['items'])
        if data['has_more']:
            page += 1
        else:
            break
    return all_data


def main():
    try:
        # Fetch the data
        hr_data = fetch_paginated_data(f"{WORKDAY_API_URL}/employees")
        logger.info(f"Fetched {len(hr_data)} records")

        # Convert to a DataFrame
        df = pd.DataFrame(hr_data)

        # Write out as Parquet
        table = pa.Table.from_pandas(df)
        output_path = f"/tmp/{FILE_NAME}"
        pq.write_table(table, output_path)

        # Upload to S3
        s3_client = boto3.client('s3')
        s3_client.upload_file(
            output_path,
            S3_BUCKET,
            f"{S3_PREFIX}{FILE_NAME}"
        )
        logger.info("Data successfully written to S3")
    except Exception as e:
        logger.error(f"Processing failed: {str(e)}")
        raise


if __name__ == "__main__":
    main()
3. Glue Job Configuration
Create the Python Shell job
- Job name: workday-hr-data-ingestion
- IAM role: select GlueHRDataRole
- Type: Python Shell
- Python version: Python 3.9
- Data processing units: 1/16 DPU
Job parameters
- Script path: upload glue_workday_to_s3.py to S3 and point the job at that location
- Python library dependencies: add --additional-python-modules pandas==1.5.3,pyarrow==12.0.1,requests==2.28.2
- Job parameter (optional): --WORKDAY_API_KEY=your_actual_key (Glue passes these to the script as command-line arguments)
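The console steps above can also be expressed with boto3; a sketch assuming the script was uploaded to s3://my-hr-data-bucket/scripts/ (that location is an assumption):

# create_glue_job.py - mirrors the console configuration above
import boto3

glue = boto3.client("glue")

glue.create_job(
    Name="workday-hr-data-ingestion",
    Role="GlueHRDataRole",
    Command={
        "Name": "pythonshell",            # Python Shell job type
        "PythonVersion": "3.9",
        # Assumed upload location for glue_workday_to_s3.py
        "ScriptLocation": "s3://my-hr-data-bucket/scripts/glue_workday_to_s3.py",
    },
    MaxCapacity=0.0625,                   # 1/16 DPU
    DefaultArguments={
        "--additional-python-modules": "pandas==1.5.3,pyarrow==12.0.1,requests==2.28.2",
    },
)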
Set up a trigger
- Run on demand, or schedule it with EventBridge (e.g. daily at 1:00 AM); a Glue-native scheduled trigger works too, as sketched below
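For the Glue-native alternative, a sketch of a scheduled trigger (Glue cron schedules run in UTC, so shift the hour for your local time zone):

# create_schedule.py - Glue scheduled trigger as an alternative to EventBridge
import boto3

glue = boto3.client("glue")

glue.create_trigger(
    Name="workday-hr-daily",
    Type="SCHEDULED",
    Schedule="cron(0 1 * * ? *)",   # every day at 01:00 UTC
    Actions=[{"JobName": "workday-hr-data-ingestion"}],
    StartOnCreation=True,           # activate the trigger immediately
)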
4. Validate the Data
- Check for the Parquet file under the target S3 path:
aws s3 ls s3://my-hr-data-bucket/raw/hr_data/
- Validate the data with Athena:
CREATE EXTERNAL TABLE hr_data (
  employee_id string,
  name string,
  department string,
  ...
)
STORED AS PARQUET
LOCATION 's3://my-hr-data-bucket/raw/hr_data/';
Notes
- Secrets management
  - Store the API key in AWS Secrets Manager and fetch it with boto3 at runtime (see the first sketch below)
- Error handling
  - Add retry logic to cope with API throttling (see the retry sketch below)
  - Use Glue Job Bookmarks to track processing progress (optional; note that bookmarks are a feature of Glue ETL (Spark) jobs)
- Performance
  - For very large volumes (>1 GB), switch to a Glue Spark job
- Cost control
  - Python Shell jobs cost less than Spark jobs and are a good fit for moderate data volumes
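A minimal sketch of the Secrets Manager lookup mentioned above, assuming the key is stored in a secret named workday/api_key (the secret name is an assumption):

# Fetch the Workday API key at runtime instead of hardcoding it
import boto3

def get_workday_api_key(secret_id="workday/api_key"):
    """Read the API key from AWS Secrets Manager; the secret name is assumed."""
    client = boto3.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_id)
    return response["SecretString"]

# In glue_workday_to_s3.py, replace the hardcoded constant with:
# API_KEY = get_workday_api_key()

Note that GlueHRDataRole would then also need secretsmanager:GetSecretValue permission on that secret.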
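And one way to implement the retry advice, a sketch that leans on requests' bundled urllib3 Retry support (the status codes and backoff factor are generic defaults, not Workday-specific values):

# retrying_session.py - exponential backoff for throttled/failed requests
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def make_session():
    """Build a requests.Session that retries transient failures."""
    retry = Retry(
        total=5,                                     # up to 5 retries per request
        backoff_factor=2,                            # sleep 2s, 4s, 8s, ... between tries
        status_forcelist=[429, 500, 502, 503, 504],  # throttling and server errors
    )
    session = requests.Session()
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session

# In fetch_paginated_data, a retrying session would replace the bare call:
# session = make_session()
# response = session.get(f"{url}?page={page}", headers=HEADERS, timeout=30)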