为MySQL社区版实现审计功能:从插件配置到日志监控全解析

发布于:2025-06-11 ⋅ 阅读:(42) ⋅ 点赞:(0)

背景

公司要把重要的数据库的操作记录留存,方便查询操作可视化和告警监控等待。更能和其余相同的数据库保持一致。

本文将详细介绍如何通过MariaDB的server_audit插件为MySQL社区版实现完整的审计功能,包括版本兼容性处理、插件配置、日志解析和存储方案。

为什么MySQL社区版需要审计功能?

在公司数据库管理中,审计日志是安全合规和故障排查的重要工具。然而,许多使用MySQL社区版的企业面临一个尴尬的现实:原生的审计功能仅在企业版中提供。根据MySQL官方文档,社区版从5.7.34版本后不再支持第三方审计插件,这给需要合规审计的企业带来了挑战。

技术选型与版本兼容性分析

方案 优点 缺点 适用场景
MySQL企业版审计插件 官方支持,功能完善 需要付费许可 预算充足的企业
MariaDB server_audit插件 免费开源,功能完整 版本兼容性复杂 MySQL 5.7.34及以下版本
Percona审计插件 完全兼容MySQL 需要迁移到Percona Server 新建项目或可迁移环境
触发器+通用日志 无需额外组件 性能影响大,信息不完整 简单审计需求

在企业数据管理中,数据库操作审计是安全合规的核心需求。但MySQL社区版存在明显短板:

  1. 缺乏原生审计功能(仅企业版支持audit_log插件)
  2. 版本兼容陷阱
    • MySQL 5.7.34:最后一个可原生使用MariaDB插件的版本
    • MySQL ≥5.7.34 无法使用MariaDB的server_audit.so插件
    • MySQL 8.0 完全移除插件兼容性
  3. 替代方案选择
    • 方案1:MySQL 5.7低版本 + MariaDB插件(需版本匹配)
    • 方案2:迁移至Percona Server(免费开源,兼容MySQL 5.6/5.7/8.0且支持审计)
    • 方案3:全面迁移到MariaDB

实践建议:在测试环境中验证插件兼容性,生产环境部署前进行完整的回归测试。

环境

环境如下:

MySQL

[root@test_db01 ~]# mysql -V
mysql  Ver 14.14 Distrib 5.7.27, for Linux (x86_64) using  EditLine wrapper

Cnetos7

[root@test_db01 ~]# cat /etc/redhat-release 
CentOS Linux release 7.3.1611 (Core)

Linux test_db01 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux

安装插件

查看插件安装目录


mysql>  show global variables like 'plugin_dir';
+---------------+--------------------------+
| Variable_name | Value                    |
+---------------+--------------------------+
| plugin_dir    | /usr/lib64/mysql/plugin/ |
+---------------+--------------------------+
1 row in set (0.00 sec)

提取mariadb审计插件并放置插件目录

此处需要注意版本,直接适配核对测试

wget  https://downloads.mariadb.com/MariaDB/mariadb-10.5.16/bintar-linux-x86_64/mariadb-10.5.16-linux-x86_64.tar.gz
tar -zxvf   mariadb-10.5.3-linux-x86_64.tar.gz
cp ./mariadb-10.5.3-linux-x86_64/lib/plugin/server_audit.so /usr/lib64/mysql/plugin/
chmod +x /usr/lib64/mysql/plugin/

mysql安装server_audit.so插件

mysql>  install plugin server_audit soname 'server_audit.so';
Query OK, 0 rows affected (0.02 sec)           
##也可以在my.cnf 加载插件方式安装
在my.cnf 设置 plugin_load = server_audit=server_audit.so  

查看当前MySQL插件情况

mysql>  show plugins;
| SERVER_AUDIT                             | ACTIVE   | AUDIT              | server_audit.so       | GPL     |

增加审计目录授权

mkdir /opt/mysqldata/auditlogs
chown -R mysql:mysql /opt/mysqldata/auditlogs

开启审计,写入配置文件

#防止server_audit 插件被卸载 进行配置文件配置
server_audit=FORCE_PLUS_PERMANENT
#指定哪些操作被记录到日志文件中
server_audit_events='CONNECT,QUERY,TABLE,QUERY_DDL'
#开启审计功能
server_audit_logging=on
#默认存放路径,可以不写,默认到data文件下
server_audit_file_path=/opt/mysqldata/auditlogs
#设置文件大小 默认1000000,1073741824=1GB
server_audit_file_rotate_size=1073741824
#指定日志文件的数量,如果为0日志将从不轮转
server_audit_file_rotations=0

下面是我的配置文件
版本太低,不支持JSON,高版本支持JSON格式,方便Python处理,后续只能re来处理
```bash
#######server audit#####
server_audit_logging=ON
server_audit_file_path=/www/server/data/server_audit.log
server_audit=FORCE_PLUS_PERMANENT
server_audit_file_rotate_size=500M
server_audit_file_rotations=15
max_allowed_packet=32M
#audit_log_format=JSON
#audit_log_file=server_audit.json
#server_audit_logging=ON
#server_audit_file_path=/data/mysql/server_audit.json
#server_audit=FORCE_PLUS_PERMANENT
#server_audit_file_rotate_size=1G
#server_audit_file_rotations=10
#max_allowed_packet=32M
  • FORCE_PLUS_PERMANENT:防止插件被意外卸载
  • 日志轮转设置:避免日志文件无限增长
  • max_allowed_packet:确保大SQL语句能被完整记录

重启mysql生效!

/etc/init.d/mysql restart
Shutting down MySQL............... SUCCESS! 
Starting MySQL.. SUCCESS! 

检查日志状态

-- 检查插件状态 
SHOW PLUGINS WHERE NAME = 'server_audit';
 
-- 查看审计设置 
SHOW GLOBAL VARIABLES LIKE 'server_audit%';

日志格式

20250610 10:42:49,test_db01,root,192.168.102.207,7972452,438352983,QUERY,xxxx,'SELECT  id,ext_id,ext_table_name,target_type,value1,value2,spread_config  FROM t_target \n \n WHERE (ext_id = \'12606411819336451111124\' AND ext_table_name = \'t_discount_coupon\')',0
20250610 10:42:50,test_db01,root,192.168.83.18,7972073,438352984,QUERY,gazelle_model_assemble,'SELECT  id,code,name,handler,status,retry_times,create_time,update_time  FROM batch      WHERE  (status = \'running\')',0
20250610 10:42:50,test_db01,root,192.168.102.207,7969972,438352986,QUERY,xxxx,'select * from t_event_send where sent = 0 order by created_time asc',0
20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
20250610 10:42:50,test_db01,root,192.168.83.27,7971636,438352987,QUERY,hixxxx_core,'SELECT id, user_id, trade_no, out_trade_no, order_sn, lease_id, period_no, amount, trade_status, alipay_close_status, trade_type, remark, trade_way, business_type, entry_param, coupon_id, coupon_amount, deleted, gmt_create, gmt_modified FROM hire_trade_flows WHERE deleted = 0 AND (trade_status = 3 AND trade_type = 1 AND trade_way IN (1, 3, 4) AND business_type IN (3, 5, 2, 1, 14, 4, 6, 10, 13))',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352988,QUERY,historical_customer_data,'SELECT 1',0
20250610 10:42:50,test_db01,root,192.168.83.25,7972144,438352989,QUERY,historical_customer_data,'SELECT 1',0

解释这条日志格式

20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0
字段编号 字段值 含义
1 20250610 10:42:50 时间戳,表示审计事件发生的时间
2 test_db01 服务器主机名(或 MariaDB 实例名称)
3 root 数据库用户名,执行该语句的用户
4 192.168.83.36 客户端 IP 地址
5 7853786 线程 ID(MariaDB 内部线程号)
6 438352985 查询 ID,用于区分不同的 SQL 查询
7 QUERY 事件类型QUERY 表示 SQL 查询)
8 hxxxdb 当前数据库名USE 的数据库)
9 'SELECT\n MAX(IF(... SQL 语句内容,使用单引号包裹,\n 为换行符
10 0 返回值 / 错误码0 表示成功(无错误)

审计日志处理系统实现

系统架构设计

[MySQL Server](生成审计日志)
[server_audit.log](Python监控进程)
[日志解析模块][MySQL审计数据库]

Python处理程序核心逻辑

20250610 10:42:50,test_db01,root,192.168.83.36,7853786,438352985,QUERY,hxxxdb,'SELECT\n            MAX(IF(status = 1, create_time, null)) nearBankTime\n        FROM user_bank_cards\n        WHERE create_time >= NOW() - INTERVAL 12 HOUR',0

我们有上面的日志格式,可以进行如下操作处理

日志监控关键特性:

  1. 断点续传:通过state文件记录读取位置
  2. 文件轮转检测:通过inode和dev识别日志切换
  3. 异常处理:完善的错误捕获和日志记录

在这里插入图片描述

def process_log_entry(new_content):
    """处理单个审计日志条目"""
    # 定义正则表达式模式,用于解析审计日志格式
    # 模式匹配字段:时间,数据库名(1),用户名,源IP,连接ID,查询ID,事件类型,数据库名(2),SQL语句,执行结果
    pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"
    
    # 尝试匹配日志行
    match = re.match(pattern, new_content)
    if match:
        # 提取匹配的各个字段
        sql_time = match.group(1)        # 时间 (格式: 年月日 时分秒)
        database_name = match.group(8)   # 数据库名称 (从第8组获取)
        sql_content = match.group(9)      # SQL语句内容 (包含原始转义字符)
        s_sql_content = match.group(9)    # 保留原始SQL语句用于日志记录
        sql_result = match.group(10)      # SQL执行结果 (0表示成功)
        
        # 清洗SQL语句内容:
        # 1. 去除SQL注释(--、#和/* */类型的注释)
        COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)
        sql_content = COMMENT_PATTERN.sub("", sql_content)
        
        # 2. 替换特殊转义字符
        #    - 将\n换行符替换为空格
        #    - 将\'转义单引号替换为普通单引号
        #    - 将\t制表符替换为空格
        #    - 将\r回车符替换为空格
        sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')
        
        # 3. 去除SQL语句前后的多余空格
        sql_content = sql_content.strip()
        
        # 检查是否为需要记录的DDL语句:
        # 1. SQL执行成功 (sql_result == "0")
        # 2. 是DDL操作 (CREATE/ALTER/DROP/RENAME开头)
        DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)
        if sql_result == "0" and DDL_PATTERN.match(sql_content):
            # 记录处理日志
            logging.info("-" * 50)
            logging.info(f"原始SQL内容: {s_sql_content}")
            logging.info(f"执行时间: {sql_time}")
            logging.info(f"数据库名称: {database_name}")
            logging.info(f"清洗后SQL: {sql_content}")
            logging.info("-" * 50)
            
            # 将审计记录保存到MySQL数据库
            save_to_mysql(sql_time, database_name, sql_content)
原始: 'SELECT\n            MAX(...) nearBankTime\n        FROM ...'
清洗后: 'SELECT MAX(...) nearBankTime FROM ...'

完整代码

# -*- coding: utf-8 -*-
import os, sys
import re
import time
import json, logging
from datetime import datetime
import pymysql

# 日志配置
logging.basicConfig(
    filename='mysql_monitor.log',
    level=logging.INFO,
    format='[%(asctime)s] %(levelname)s: %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# 读取缓存文件,定位日志读取位置
def load_state(state_file):
    """Load saved state."""
    try:
        with open(state_file, 'r', encoding='utf-8') as f:
            state = json.load(f)
            return state.get('position', 0), state.get('inode', None), state.get('dev', None)
    except (IOError, ValueError):
        return 0, None, None
    except Exception as e:
        logging.error(f"Failed to load state: {str(e)}")
        return 0, None, None

# 保存位置数据到文件中
def save_state(state_file, position, inode, dev):
    """Save current state."""
    try:
        with open(state_file, 'w', encoding='utf-8') as f:
            json.dump({
                'position': position,
                'inode': inode,
                'dev': dev
            }, f)
    except Exception as e:
        logging.error(f"Failed to save state: {str(e)}")

# 保存sql数据到数据中,方便可视化 或者  告警
def save_to_mysql(sql_time, database_name, sql_content):
    """将日志信息写入到 MySQL 数据库的 audit_log 表中"""
    connection = pymysql.connect(
        host='192.168.102.201',
        user='root',
        password='xxxx.88',
        database='audit_db',
        charset='utf8mb4'
    )
    try:
        with connection.cursor() as cursor:
            sql = """
                  INSERT INTO audit_log (sql_time, database_name, sql_content)
                  VALUES (%s, %s, %s)
                  """
            sql_time = datetime.strptime(sql_time, "%Y%m%d %H:%M:%S").strftime("%Y-%m-%d %H:%M:%S")
            cursor.execute(sql, (sql_time, database_name, sql_content))
        connection.commit()
    except Exception as e:
        logging.error(f"Failed to insert data into MySQL: {str(e)}")
    finally:
        connection.close()

# 读取文件 ,数据 匹配 和 清洗
def process_log_entry(new_content):
    """Process a single log entry."""
    pattern = r"^(\d{8} \d{2}:\d{2}:\d{2}),([\w]+),([\w]+),(\d+\.\d+\.\d+\.\d+),(\d+),(\d+),([\w]+),([\w]+),'(.*)',(\d+)"
    match = re.match(pattern, new_content)
    if match:
        sql_time = match.group(1)
        database_name = match.group(8)
        sql_content = match.group(9)
        s_sql_content = match.group(9)
        sql_result = match.group(10)
        # 先匹配存在DDL语句的sql内容
        # 去除-- /* # 等注释符
        #logging.info(f"1 {sql_content}")
        COMMENT_PATTERN = re.compile(r"(--.*?\\r\\n|#.*?\\r\\n|/\*.*?\*/)", re.DOTALL | re.MULTILINE)
        sql_content = COMMENT_PATTERN.sub("", sql_content)
        # 去除多余的换行符/'等
        #logging.info(f"2 {sql_content}")
        sql_content = sql_content.replace('\\n', ' ').replace('\\\'', '\'').replace('\\t', ' ').replace('\\r', ' ')
        #logging.info(f"3 {sql_content}")
        # 去除前后多余的空格
        sql_content = sql_content.strip()
        #logging.info(f"4 {sql_content}")
        # 检查是否为 DDL 语句
        DDL_PATTERN = re.compile(r"^(CREATE|ALTER|DROP|RENAME)", re.IGNORECASE)
        if sql_result == "0" and DDL_PATTERN.match(sql_content):
            logging.info("-" * 50)
            logging.info(f"SQL Content: {s_sql_content}")
            logging.info(f"Time: {sql_time}")
            logging.info(f"Database Name: {database_name}")
            logging.info(f"SQL Content: {sql_content}")
            logging.info("-" * 50)
            save_to_mysql(sql_time, database_name, sql_content)

# 启动程序,传入 缓存 文件  和 审计日志记录 文件 位置
def monitor_log(state_file, log_file):
    """Monitor MySQL audit logs."""
    last_position, last_inode, last_dev = load_state(state_file)
    while True:
        try:
            current_st = os.stat(log_file)
        except OSError:
            logging.warning("Log file not found, retrying...")
            time.sleep(1)
            continue
        if (current_st.st_ino != last_inode) or (current_st.st_dev != last_dev):
            logging.info("New log file detected")
            last_position = 0
            last_inode = current_st.st_ino
            last_dev = current_st.st_dev
            save_state(state_file, last_position, last_inode, last_dev)
        if current_st.st_size < last_position:
            logging.info("File truncated")
            last_position = 0
            save_state(state_file, last_position, last_inode, last_dev)
        if current_st.st_size > last_position:
            try:
                with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                    f.seek(last_position)
                    for line in f:
                        process_log_entry(line.strip())
                    last_position = f.tell()
                    save_state(state_file, last_position, last_inode, last_dev)
            except Exception as e:
                logging.error(f"Error reading log file: {str(e)}")
        time.sleep(0.5)


if __name__ == "__main__":
    log_file = '/data/mysql/server_audit.log'
    state_file = "/opt/mysql_audit_monitor/mysqlMonitor.state"
    logging.info(f"Starting MySQL audit log monitoring: {log_file}")
    try:
        monitor_log(state_file, log_file)
    except KeyboardInterrupt:
        logging.info("Monitoring stopped")
        sys.exit(0)

在这里插入图片描述

在这里插入图片描述

后记

生产环境建议
场景 推荐方案
MySQL 5.7.34以下版本 MariaDB插件方案
MySQL 8.0+ Percona Server或迁移至MariaDB
审计日志存储 独立MySQL实例,与业务库物理隔离
高频操作环境 写入Elasticsearch替代MySQL

通过server_audit插件+Python监控的组合,可在MySQL社区版低成本实现企业级审计需求。但需注意:

📌 版本兼容性是成功前提,MySQL 8.0用户务必选择Percona/MariaDB
📌 日志清洗环节直接影响分析准确性(如转义符处理)
📌 审计日志存储分离是安全最佳实践

最终效果

  • 所有数据库操作可视化展示
  • 实时捕获高危DDL语句
  • 满足等保2.0/ISO 27001审计要求

网站公告

今日签到

点亮在社区的每一天
去签到