Python Flask中启用AWS Secrets Manager+AWS Parameter Store配置中心

发布于:2025-06-05 ⋅ 阅读:(19) ⋅ 点赞:(0)

问题

最近需要改造一个Python的Flask项目。需要在这个项目中添加AWS Secrets Manager作为配置中心,主要是数据库相关配置。

前提

得预先在Amazon RDS里面新建好数据库用户和数据库,以AWS Aurora为例子,建库和建用户语句类似如下:

create database hf_gc default character set utf8mb4 collate utf8mb4_unicode_ci;
create user 'hf_gc'@'%' identified by '3pbP0EMfZcOusvaawfaOv4U';
grant all privileges on hf_gc.* to 'hf_gc'@'%';
flush privileges;

Secrets Manager

新建轮转密钥

开始新建Aurora密钥,如下图:
Aurora
设置密钥名称为/dev/flask-rds,如下图:
设置密钥名称
启用自动轮转密码,并设置相关策略,如下图:
轮换策略
最后审计一下,没问题就直接创建,如下图:
创建审计

Parameter Store

设置数据库主机参数,如下图:
设置数据库主机参数
设置Flask Web应用程序使用的轮转密钥参数,如下图:
设置localhost环境使用dev的数据库

Flask

FLASK_PROFILE自定义环境变量

设置自定义环境变量:FLASK_PROFILE。内容如下:

export FLASK_PROFILE="dev"

这里需要以FLASK_开头的,进行环境变量命名的。
PyCharm设置如下图:
IDE中设置环境变量
在Flask中读取环境变量,类似如下方式:

app.config.from_prefixed_env()
app.config["FLASK_PROFILE"]  # Is "dev"

安装boto3 SDK

pip install boto3
pip install botocore

secrets_manager.py

aws secrets manager客户端工具类,这里主要就是使用boto3来访问AWS 的secrets manager服务,然后取secrets manager服务中配置/dev/flask-rds参数。还使用lru_cache缓存管理注解,在使用的时候,需要进行缓存处理。内容如下:

import json
import sys
sys.path.append('./')
from typing import Dict, Any

import boto3
from botocore.exceptions import ClientError
from functools import lru_cache
from flask import current_app


class SecretsManagerClient:
    def __init__(self, region_name: str = 'cn-north-1'):
        """
        初始化客户端
        :param region_name: AWS区域,默认为中国区北京
        """
        self.session = boto3.session.Session()
        self.client = self.session.client(
            service_name='secretsmanager',
            region_name=region_name
        )

    @lru_cache(maxsize=32)  # 缓存最多32个不同的secret
    def get_secret(self, secret_name: str) -> Dict[str, Any]:
        """
        获取Secret的值(带缓存)
        :param secret_name: Secret的完整路径或名称
        :return: 解析后的Secret字典
        :raises: ClientError 当获取失败时抛出
        """
        try:
            current_app.logger.debug(f"Fetching secret: {secret_name}")
            response = self.client.get_secret_value(SecretId=secret_name)

            if 'SecretString' in response:
                secret = response['SecretString']
                return json.loads(secret)
            else:
                raise ValueError("Binary secrets are not supported")

        except ClientError as e:
            current_app.logger.error(f"Failed to get secret {secret_name}: {str(e)}")
            raise
        except json.JSONDecodeError as e:
            current_app.logger.error(f"Failed to parse secret JSON: {str(e)}")
            raise ValueError("Invalid secret JSON format")

    def clear_cache(self, secret_name: str = None):
        """
        清除缓存
        :param secret_name: 指定清除某个secret的缓存,None表示清除全部
        """
        if secret_name:
            self.get_secret.cache_clear(secret_name)
        else:
            self.get_secret.cache_clear()
        current_app.logger.debug(f"Cleared cache for secret: {secret_name or 'all'}")

# 创建全局客户端实例(单例模式)
client = SecretsManagerClient()

使用方式,类似如下代码:

from configs.secrets_manager import client
client.clear_cache("/dev/flask-rds")  # 清除缓存
secret = client.get_secret("/dev/flask-rds")
user = secret['username']  # 读取用户名
passwd = secret['password']

parameter_store.py

parameter store工具类内容如下:

import json
import sys
sys.path.append('./')
from typing import Dict, Any

import boto3
from botocore.exceptions import ClientError
from functools import lru_cache
from flask import current_app


class SecretsManagerClient:
    def __init__(self, region_name: str = 'cn-north-1'):
        """
        初始化客户端
        :param region_name: AWS区域,默认为中国区北京
        """
        self.session = boto3.session.Session()
        self.client = self.session.client(
            service_name='secretsmanager',
            region_name=region_name
        )

    @lru_cache(maxsize=32)  # 缓存最多32个不同的secret
    def get_secret(self, secret_name: str) -> Dict[str, Any]:
        """
        获取Secret的值(带缓存)
        :param secret_name: Secret的完整路径或名称
        :return: 解析后的Secret字典
        :raises: ClientError 当获取失败时抛出
        """
        try:
            current_app.logger.debug(f"Fetching secret: {secret_name}")
            response = self.client.get_secret_value(SecretId=secret_name)

            if 'SecretString' in response:
                secret = response['SecretString']
                return json.loads(secret)
            else:
                raise ValueError("Binary secrets are not supported")

        except ClientError as e:
            current_app.logger.error(f"Failed to get secret {secret_name}: {str(e)}")
            raise
        except json.JSONDecodeError as e:
            current_app.logger.error(f"Failed to parse secret JSON: {str(e)}")
            raise ValueError("Invalid secret JSON format")

    def clear_cache(self, secret_name: str = None):
        """
        清除缓存
        :param secret_name: 指定清除某个secret的缓存,None表示清除全部
        """
        if secret_name:
            self.get_secret.cache_clear(secret_name)
        else:
            self.get_secret.cache_clear()
        current_app.logger.debug(f"Cleared cache for secret: {secret_name or 'all'}")

# 创建全局客户端实例(单例模式)
client = SecretsManagerClient()

使用方式,类似如下代码:

from parameter_store import ssmClient
ssmClient.clear_cache("/local_flask/db/secret_name")  # 清除缓存
secret_name = ssmClient.get_parameter("/local_flask/db/secret_name")

数据库场景测试

在Flask Web应用程序中,获取数据库连接类,类似内容如下:

# -*- coding: utf-8 -*-

import sys
sys.path.append('./')
import pymysql
from secrets_manager import client
from parameter_store import ssmClient

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from flask import current_app

# 数据库连接
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=(retry_if_exception_type(pymysql.OperationalError) |
           retry_if_exception_type(pymysql.InternalError)))
def get_conn_sqldb():
    conn = None
    secret_name_title = "/{}_flask/db/secret_name".format(current_app.config['PROFILE'])
    secret_name = ssmClient.get_parameter(secret_name_title)
    db_host_title = "/{}_flask/db/host".format(current_app.config['PROFILE'])
    try:
        secret = client.get_secret(secret_name)
        host = ssmClient.get_parameter(db_host_title)
        port = secret['port']  # 端口号
        user = secret['username']  # 用户名
        passwd = secret['password']
        db = 'xxx_database'  # 库名
        current_app.logger.debug(f"Connecting to {host}:{port}")
        conn = pymysql.connect(host=host, port=port, user=user,passwd=passwd, db=db, charset='utf8mb4', client_flag=pymysql.constants.CLIENT.MULTI_STATEMENTS)
        return conn
    except pymysql.OperationalError as e:
        if conn:
            conn.close()
        if e.args[0] == 1045:  # Access denied
            current_app.logger.warning("Authentication failed, refreshing secret...")
            client.clear_cache(secret_name)  # 清除缓存
            ssmClient.clear_cache(secret_name_title)  # 清除缓存
            ssmClient.clear_cache(db_host_title)  # 清除缓存
            raise
        raise
    except Exception as e:
        if conn:
            conn.close()
        current_app.logger.error(f"Unexpected error: {str(e)}")
        raise

这里使用了tenacity库的@retry注解,进行数据库连接失败时重试策略;还使用了current_app.config['PROFILE']方式来获取Flask环境变量配置;还针对AWS RDS响应1045编码做了清除缓存处理。这里主要是先从parameters store中获取secret名称后,再拿个这个值,去secrets manager换RDS轮换密钥数据库密码,数据库端口,数据库用户名。

总结

现在这个点(2025年),Flask在Python里面的地位。貌似没有Spring生态在java中Web地位那么强势和全面。AWS这两个服务可以统一都通过boto3 SDK访问,比以前强多了,以前parameters store和secrets manager这个两个服务,客户端得安装两个SDK才能读取数据,有点费劲,这一点比以前强多了。这里直接使用Python自带的@lru_cache注解感觉挺香。重试库Tenacity也挺香的。

参考


网站公告

今日签到

点亮在社区的每一天
去签到