在 Sanic 应用中使用内存缓存管理 IP 黑名单

发布于:2024-12-23 ⋅ 阅读:(62) ⋅ 点赞:(0)

[外链图片转存中…(img-Pm0K9mzd-1734859380698)]

在现代 web 应用中,保护 API 接口免受恶意请求的攻击至关重要。IP 黑名单是一种常见的安全措施,可以有效阻止某些 IP 地址的访问。本文将介绍如何在 Python 的 Sanic 框架中实现 IP 黑名单功能,并结合内存缓存提升性能。

1. 环境准备

首先,确保你已经安装了 Sanic 和必要的数据库驱动程序。我们将使用 aiomysql 作为 MySQL 的异步驱动。可以使用以下命令进行安装:

pip install sanic aiomysql

2. 创建数据库表

我们需要一个数据库表来存储黑名单 IP。可以使用以下 SQL 语句创建表:

CREATE TABLE api_ip_blacklist (
    id INT AUTO_INCREMENT PRIMARY KEY COMMENT '唯一标识',
    ip_address VARCHAR(45) NOT NULL COMMENT '被拦截的 IP 地址,支持 IPv4 和 IPv6',
    status TINYINT DEFAULT 1 COMMENT '状态,1 表示有效(黑名单),0 表示无效(可用)',
    reason VARCHAR(255) DEFAULT NULL COMMENT '记录添加到黑名单的原因',
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT '记录创建时间',
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '记录最后更新时间',
    UNIQUE KEY (ip_address) COMMENT '确保 IP 地址唯一'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='API 接口黑名单 IP 表';

3. 实现内存缓存

我们可以创建一个 CacheObject 类来管理内存中的黑名单 IP 缓存。以下是该类的实现:

class CacheObject(object):
    def __init__(self):
        self.data = dict()

    def put(self, key, value):
        """将 IP 地址放入缓存"""
        self.data[key] = value

    def get(self, key):
        """从缓存中获取 IP 地址的状态"""
        return self.data.get(key)

    def load_blacklist(self, ip_list):
        """从数据库加载黑名单 IP 到缓存"""
        for ip in ip_list:
            self.put(ip, 1)  # 1 表示有效(黑名单)

    def clear(self):
        """清空缓存"""
        self.data.clear()

4. 集成 Sanic 应用

接下来,我们将 CacheObject 集成到 Sanic 应用中,实现动态加载和检查黑名单功能。

from sanic import Sanic, response
from sanic.exceptions import Forbidden
import aiomysql
import asyncio

app = Sanic("IPBlacklistApp")

# 创建 CacheObject 实例
blacklist_cache = CacheObject()

# 数据库配置
DB_CONFIG = {
    'host': 'localhost',
    'port': 3306,
    'user': 'your_username',
    'password': 'your_password',
    'db': 'your_database'
}

async def load_blacklist():
    async with aiomysql.connect(**DB_CONFIG) as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT ip_address FROM api_ip_blacklist WHERE status = 1")
            rows = await cursor.fetchall()
            blacklist_cache.load_blacklist([row[0] for row in rows])  # 加载黑名单到缓存

async def update_blacklist():
    while True:
        await load_blacklist()
        await asyncio.sleep(60)  # 每60秒更新一次黑名单

@app.listener('before_server_start')
async def setup_db(app, loop):
    await load_blacklist()  # 启动时加载黑名单到缓存
    app.add_task(update_blacklist())  # 定时更新黑名单

@app.middleware("request")
async def block_blacklisted_ips(request):
    client_ip = request.ip
    if blacklist_cache.get(client_ip) == 1:  # 检查 IP 是否在黑名单中
        raise Forbidden("Your IP address is blocked.")

@app.route("/")
async def index(request):
    return response.json({"message": "Welcome to the API!"})

@app.route("/data")
async def data(request):
    return response.json({"data": "Here is your data!"})

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8000)

5. 代码解析

  • 数据库连接:使用 aiomysql 连接到 MySQL 数据库并查询黑名单 IP。
  • 内存缓存:通过 CacheObject 类将黑名单 IP 缓存到内存中,提高查询效率。
  • 中间件:在请求处理中检查客户端 IP 是否在黑名单中,若在则抛出 Forbidden 异常。

6. 测试应用

启动应用后,您可以通过 Postman 或浏览器访问接口。确保数据库中有有效的黑名单 IP 数据。访问黑名单 IP 时,您将收到 403 Forbidden 响应,表示该 IP 被阻止。

7. 总结

通过结合 Sanic 框架和内存缓存,我们成功实现了一个高效的 IP 黑名单管理系统。这种方法有效地提高了查询速度,减少了对数据库的频繁访问,为 API 接口提供了更好的安全保障。


网站公告

今日签到

点亮在社区的每一天
去签到

热门文章