This design achieves high scalability through a serverless architecture, combines paginated queries with batch updates to process large address volumes efficiently, and relies on managed secrets and retry logic for security and reliability.
I. Technology Stack
| Component | Choice | Notes |
|---|---|---|
| Compute | AWS Lambda | Serverless execution; suits event-driven, on-demand processing at low cost |
| Data store | Amazon Redshift | Holds the raw address data and validation results |
| API | Gaode (Amap) Geocoding API | Provides address normalization and validation |
| Language | Python 3.9+ | `requests` for HTTP calls, `psycopg2` for Redshift connections |
| Secrets | AWS Secrets Manager | Securely stores the Gaode API key and Redshift credentials |
| Scheduling | Amazon EventBridge | Triggers the validation Lambda on a schedule |
| Logging & monitoring | Amazon CloudWatch | Captures run logs and error metrics |
II. Implementation Flow
1. Amazon EventBridge triggers the validation Lambda on a schedule.
2. The Lambda retrieves the Gaode API key and Redshift credentials from Secrets Manager.
3. Unvalidated addresses are read from Redshift in pages via a server-side cursor.
4. Each address is validated through the Gaode geocoding API, with retries on transient failures.
5. Validation results are written back to Redshift in batches.
6. Run logs and metrics flow to CloudWatch.
III. Key Code
1. Fetch secrets & connect to Redshift
```python
import json

import boto3
import psycopg2
from botocore.exceptions import ClientError


def get_secret(secret_name):
    """Fetch a secret from AWS Secrets Manager and parse it as JSON."""
    client = boto3.client('secretsmanager')
    try:
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except ClientError:
        # Surface the failure (missing secret, permissions, throttling)
        raise


def connect_redshift():
    """Open a psycopg2 connection using credentials from Secrets Manager."""
    secret = get_secret('prod/Redshift')
    return psycopg2.connect(
        host=secret['host'],
        port=secret['port'],
        dbname=secret['database'],
        user=secret['username'],
        password=secret['password'],
    )
```
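For reference, `get_secret` above assumes each secret is stored as a single JSON document. A `prod/Redshift` secret matching the keys the code reads would look like this (all values are placeholders):

```json
{
  "host": "my-cluster.abc123.us-east-1.redshift.amazonaws.com",
  "port": 5439,
  "database": "analytics",
  "username": "validator",
  "password": "********"
}
```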
2. Gaode API validation function (with retries)
```python
import time

import requests


def validate_gaode(address, api_key, max_retries=3):
    """Validate an address via the Gaode geocoding API.

    Retries transient failures with exponential backoff (1s, 2s, ...).
    Returns (True, "lng,lat") on success, (False, reason) otherwise.
    """
    url = "https://restapi.amap.com/v3/geocode/geo"
    params = {'address': address, 'key': api_key}
    for attempt in range(max_retries):
        try:
            resp = requests.get(url, params=params, timeout=5)
            resp.raise_for_status()
            data = resp.json()
            if data.get('status') == '1' and data.get('geocodes'):
                return True, data['geocodes'][0]['location']
            return False, data.get('info', 'Unknown error')
        except requests.RequestException:
            if attempt == max_retries - 1:
                return False, 'API request failed'
            time.sleep(2 ** attempt)  # exponential backoff
```
3. Batch-update Redshift
```python
def batch_update(conn, records):
    """Apply (is_valid, geo_location, address_id) tuples in one transaction."""
    sql = """
        UPDATE address_table
        SET is_valid = %s,
            geo_location = %s,
            last_checked = CURRENT_DATE
        WHERE address_id = %s
    """
    with conn.cursor() as cur:
        cur.executemany(sql, records)
    conn.commit()
```
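For very large runs, a single `batch_update` call means one large transaction. A small helper (hypothetical, not part of the original design) can split the records into bounded chunks so each commit stays small:

```python
def chunked(records, size=500):
    """Yield successive chunks of at most `size` records."""
    for i in range(0, len(records), size):
        yield records[i:i + size]

# Usage sketch: commit each chunk separately
# for chunk in chunked(update_records, 500):
#     batch_update(conn, chunk)
```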
4. Lambda handler
```python
def lambda_handler(event, context):
    # Initialization
    gaode_key = get_secret('prod/GaodeAPI')['key']
    conn = connect_redshift()

    # Page through unvalidated addresses with a server-side cursor.
    # Note: a commit would close the named cursor, so updates are
    # collected first and applied after iteration finishes.
    page_size = 500
    cursor = conn.cursor(name='server_side_cursor')
    cursor.execute("""
        SELECT address_id, raw_address
        FROM address_table
        WHERE last_checked IS NULL
        ORDER BY address_id
    """)

    update_records = []
    while True:
        batch = cursor.fetchmany(page_size)
        if not batch:
            break
        for addr_id, raw_addr in batch:
            is_valid, location = validate_gaode(raw_addr, gaode_key)
            update_records.append((is_valid, location, addr_id))
    cursor.close()

    # Apply all updates in one batch
    batch_update(conn, update_records)
    processed = len(update_records)

    conn.close()
    return {'statusCode': 200, 'processed': processed}
```
IV. Optimization Strategies
Concurrency control
- Use `concurrent.futures.ThreadPoolExecutor` to parallelize API calls (watch Gaode's QPS limit):

```python
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(validate_gaode, raw_addr, gaode_key)
               for _, raw_addr in batch]  # batch rows are (addr_id, raw_addr)
    results = [f.result() for f in futures]
```
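Parallel calls make it easy to exceed Gaode's per-key QPS quota. One way to guard against that is a fixed-window limiter; the `QpsLimiter` class below is an illustrative sketch, not part of the original design (the injectable clock/sleep parameters exist only to make it testable):

```python
import time


class QpsLimiter:
    """Fixed-window limiter: allow at most `qps` calls per one-second window."""

    def __init__(self, qps, clock=time.monotonic, sleep=time.sleep):
        self.qps = qps
        self.clock = clock  # injectable for testing
        self.sleep = sleep
        self.window_start = clock()
        self.count = 0

    def acquire(self):
        """Block until a call is allowed under the QPS budget."""
        now = self.clock()
        if now - self.window_start >= 1.0:
            # New window: reset the counter
            self.window_start = now
            self.count = 0
        if self.count >= self.qps:
            # Window full: wait until it ends, then start a new one
            self.sleep(self.window_start + 1.0 - now)
            self.window_start = self.clock()
            self.count = 0
        self.count += 1
```

Each worker would call `limiter.acquire()` on a shared instance before invoking `validate_gaode` (add a lock around `acquire` for strict thread safety).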
Incremental processing
- Use the `last_checked` field to avoid re-validating addresses
- Redshift does not support `CREATE INDEX`; a sort key can serve a similar purpose for the unchecked-rows scan, e.g.:
ALTER TABLE address_table ALTER SORTKEY (last_checked)
Fault tolerance
- Route failed invocations to a dead-letter queue (DLQ)
- Add an `error_reason` column in Redshift to record failure details
V. Deployment Configuration
Lambda configuration
- Memory: 1024 MB (adjust to the batch size)
- Timeout: 15 minutes
- Environment variables:
  SECRET_NAME_REDSHIFT = "prod/Redshift"
  SECRET_NAME_GAODE = "prod/GaodeAPI"
IAM permissions
- `secretsmanager:GetSecretValue`
- `redshift-data:ExecuteStatement` (only needed if switching to the Redshift Data API; the direct psycopg2 connection above instead requires VPC network access to the cluster)
- `logs:CreateLogGroup`, `logs:CreateLogStream`, `logs:PutLogEvents`
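The Lambda settings and the EventBridge schedule can be captured together in an AWS SAM template. This is an illustrative sketch; the resource name, handler path, and hourly rate are assumptions:

```yaml
AddressValidationFunction:
  Type: AWS::Serverless::Function
  Properties:
    Handler: app.lambda_handler
    Runtime: python3.9
    MemorySize: 1024        # per the deployment configuration above
    Timeout: 900            # 15 minutes
    Environment:
      Variables:
        SECRET_NAME_REDSHIFT: prod/Redshift
        SECRET_NAME_GAODE: prod/GaodeAPI
    Events:
      HourlySchedule:
        Type: Schedule
        Properties:
          Schedule: rate(1 hour)
```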
VI. Monitoring Metrics
CloudWatch dashboard
- `AddressValidation.SuccessCount` (custom metric)
- `API.Latency` (P95/P99)
- `Redshift.UpdateErrors`
Alarm configuration
- API failure rate > 5% sustained for 5 minutes
- Lambda errors > 10 per hour
- Backlog of unvalidated addresses > 10,000
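The `AddressValidation.SuccessCount` custom metric has to be published from the Lambda itself. A minimal sketch using CloudWatch `PutMetricData` follows; the payload-builder split exists only to keep the code testable, and the namespace and unit are assumptions matching the dashboard above:

```python
def build_success_metric(success_count, namespace="AddressValidation"):
    """Build a CloudWatch PutMetricData payload for the SuccessCount metric."""
    return {
        "Namespace": namespace,
        "MetricData": [{
            "MetricName": "SuccessCount",
            "Value": float(success_count),
            "Unit": "Count",
        }],
    }


def publish_success_metric(success_count):
    """Publish the metric; call this at the end of lambda_handler."""
    import boto3  # local import so the builder stays testable without AWS
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(**build_success_metric(success_count))
```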