Python 助力 DBA:高效批量管理数据库服务器的多线程解决方案-多库查询汇总工具实现

发布于:2024-12-18 ⋅ 阅读:(18) ⋅ 点赞:(0)

批量数据库服务器连接测试与数据汇总:Python实现方案

作为数据库服务器运维人员,我们经常需要面对大量服务器的连接测试和数据汇总工作。本文将介绍一个使用Python实现的高效解决方案,可以帮助我们快速完成这些任务。

需求概述

  1. 从配置文件中读取要测试的数据库服务器IP地址列表。
  2. 批量测试数据库服务器的连接情况。
  3. 在所有可连接的服务器上执行相同的SQL查询。
  4. 将查询结果汇总到一个单独的数据库中,并包含对应服务器的IP地址。
  5. 自动创建结果表,表名按日期随机生成。
  6. 提供详细的日志输出,包括实时的处理进度。

实现方案

我们使用Python来实现这个方案,主要利用了以下库和技术:

  • pyodbc: 用于数据库连接和操作
  • configparser: 读取配置文件
  • concurrent.futures: 实现并发处理
  • logging: 日志记录
  • 多线程技术:提高处理效率

代码实现

以下是完整的Python代码实现:

import pyodbc
import logging
import configparser
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
import random
import string

# 配置日志
def setup_logger():
    """
    设置日志记录器,同时输出到文件和控制台
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    # 文件处理器
    file_handler = logging.FileHandler('db_query_aggregation.log')
    file_handler.setLevel(logging.INFO)
    file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    file_handler.setFormatter(file_formatter)
    
    # 控制台处理器
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.INFO)
    console_formatter = logging.Formatter('%(message)s')
    console_handler.setFormatter(console_formatter)
    
    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    
    return logger

logger = setup_logger()

def read_config(config_file):
    """
    读取配置文件
    
    :param config_file: 配置文件路径
    :return: 包含配置信息的字典
    """
    try:
        config = configparser.ConfigParser()
        config.read(config_file)
        return {
            'ip_list_file': config['Files']['ip_list_file'],
            'source_db_username': config['SourceDB']['username'],
            'source_db_password': config['SourceDB']['password'],
            'target_db_info': dict(config['TargetDB']),
            'max_workers': int(config['Settings']['max_workers']),
            'query': config['Query']['sql']
        }
    except Exception as e:
        logger.error(f"读取配置文件时出错: {e}")
        raise

def read_ip_list(file_path):
    """
    从文件中读取IP地址列表
    
    :param file_path: IP地址文件路径
    :return: IP地址列表
    """
    try:
        with open(file_path, 'r') as file:
            return [line.strip() for line in file if line.strip()]
    except IOError as e:
        logger.error(f"无法读取IP地址文件: {e}")
        return []

def create_connection(server, database, username, password):
    """
    创建数据库连接
    
    :param server: 服务器地址
    :param database: 数据库名称
    :param username: 用户名
    :param password: 密码
    :return: 数据库连接对象,如果连接失败则返回None
    """
    try:
        conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'
        return pyodbc.connect(conn_str, timeout=5)
    except pyodbc.Error as e:
        logger.error(f"连接到服务器 {server} 失败: {e}")
        return None

def execute_query(connection, query):
    """
    执行SQL查询
    
    :param connection: 数据库连接对象
    :param query: SQL查询语句
    :return: 查询结果列表
    """
    try:
        cursor = connection.cursor()
        cursor.execute(query)
        return cursor.fetchall()
    except pyodbc.Error as e:
        logger.error(f"执行查询时出错: {e}")
        return []

def process_server(ip, username, password, query):
    """
    处理单个服务器的查询
    
    :param ip: 服务器IP地址
    :param username: 数据库用户名
    :param password: 数据库密码
    :param query: SQL查询语句
    :return: 元组 (IP地址, 查询结果)
    """
    start_time = time.time()
    logger.info(f"开始处理服务器 {ip}")
    try:
        conn = create_connection(ip, 'master', username, password)
        if conn:
            results = execute_query(conn, query)
            conn.close()
            end_time = time.time()
            processing_time = end_time - start_time
            logger.info(f"服务器 {ip} 处理完成. 获取 {len(results)} 行数据. 耗时 {processing_time:.2f} 秒")
            return ip, results
    except Exception as e:
        logger.error(f"处理服务器 {ip} 时发生错误: {e}")
    logger.info(f"服务器 {ip} 处理失败")
    return ip, []

def create_target_table(connection, table_name, columns):
    """
    在目标数据库中创建表
    
    :param connection: 目标数据库连接对象
    :param table_name: 要创建的表名
    :param columns: 列定义列表
    """
    try:
        cursor = connection.cursor()
        create_table_query = f"CREATE TABLE {table_name} (ServerIP VARCHAR(15), {', '.join(columns)})"
        cursor.execute(create_table_query)
        connection.commit()
        logger.info(f"成功创建表 {table_name}")
    except pyodbc.Error as e:
        logger.error(f"创建目标表时出错: {e}")
        raise

def insert_data(connection, table_name, data):
    """
    将数据插入目标数据库
    
    :param connection: 目标数据库连接对象
    :param table_name: 目标表名
    :param data: 要插入的数据列表
    :return: 插入的行数
    """
    try:
        cursor = connection.cursor()
        placeholders = ', '.join(['?' for _ in range(len(data[0]))])
        insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
        cursor.fast_executemany = True
        cursor.executemany(insert_query, data)
        connection.commit()
        return cursor.rowcount
    except pyodbc.Error as e:
        logger.error(f"插入数据时出错: {e}")
        connection.rollback()
        return 0

def generate_table_name():
    """
    生成随机表名
    
    :return: 生成的表名
    """
    date_str = datetime.now().strftime("%Y%m%d")
    random_str = ''.join(random.choices(string.ascii_lowercase, k=5))
    return f"QueryResults_{date_str}_{random_str}"

def main():
    """
    主函数,协调整个数据查询和汇总过程
    """
    try:
        # 读取配置
        config = read_config('config.ini')
        ip_list = read_ip_list(config['ip_list_file'])
        
        if not ip_list:
            logger.error("IP地址列表为空,程序终止")
            return

        logger.info(f"开始处理 {len(ip_list)} 个服务器")

        # 并发查询所有服务器
        results = []
        with ThreadPoolExecutor(max_workers=config['max_workers']) as executor:
            future_to_ip = {executor.submit(process_server, ip, config['source_db_username'], 
                                            config['source_db_password'], config['query']): ip for ip in ip_list}
            for future in as_completed(future_to_ip):
                ip, result = future.result()
                if result:
                    results.extend([(ip,) + tuple(row) for row in result])

        if not results:
            logger.info("没有查询到数据,程序终止")
            return

        # 连接目标数据库
        target_conn = create_connection(**config['target_db_info'])
        if not target_conn:
            logger.error("无法连接到目标数据库,程序终止")
            return

        # 创建目标表并插入数据
        table_name = generate_table_name()
        columns = [f"Column{i} VARCHAR(100)" for i in range(len(results[0]) - 1)]
        create_target_table(target_conn, table_name, columns)

        rows_inserted = insert_data(target_conn, table_name, results)
        target_conn.close()

        logger.info(f"数据汇总完成。插入 {rows_inserted} 行到表 {table_name}")
        print(f"查询结果已插入表: {table_name}")

    except Exception as e:
        logger.critical(f"程序执行过程中发生严重错误: {e}")
        print(f"程序执行过程中发生错误,请查看日志文件获取详细信息。")

if __name__ == "__main__":
    main()

代码说明

  1. 配置文件读取:使用 configparser 模块读取配置文件,包括数据库连接信息、查询语句等。

  2. 多线程处理:使用 ThreadPoolExecutor 并发执行查询,提高效率。

  3. 异常处理:每个关键操作都包含了异常处理,确保程序的稳定性。

  4. 模块化设计:将不同功能分解为独立的函数,提高代码的可读性和可维护性。

  5. 日志记录:使用 logging 模块记录详细的操作日志,同时输出到文件和控制台。

  6. 动态表创建:在目标数据库中动态创建表,表名包含日期和随机字符串。

  7. 数据汇总:将所有服务器的查询结果汇总到一个列表中,包括服务器IP地址。

  8. 批量数据插入:使用 executemany 批量插入数据到目标表。

使用说明

  1. 创建 config.ini 配置文件,包含以下内容:
[Files]
ip_list_file = server_ip_list.txt

[SourceDB]
username = your_source_username
password = your_source_password

[TargetDB]
server = your_target_server
database = your_target_database
username = your_target_username
password = your_target_password

[Settings]
max_workers = 50

[Query]
sql = SELECT column1, column2 FROM your_table
  1. 准备一个包含要查询的服务器IP地址的文本文件(如 server_ip_list.txt)。

  2. 运行脚本,它将并发查询所有服务器,汇总结果(包括服务器IP),并插入到目标数据库的新表中。

  3. 脚本执行完成后,会输出生成的表名。

结论

这个Python脚本提供了一个高效、灵活的解决方案,可以批量测试数据库服务器连接、执行查询并汇总结果。它具有以下优点:

  • 并发处理,大幅提高效率
  • 详细的日志记录,便于监控和调试
  • 灵活的配置,易于适应不同环境
  • 异常处理完善,提高程序稳定性
  • 结果包含服务器IP,便于追踪数据来源

对于需要管理大量数据库服务器的运维人员来说,这个脚本可以显著提高工作效率。您可以根据实际需求进一步调整和优化这个脚本,例如添加更多的错误处理、优化查询性能,或者扩展功能以支持更复杂的操作。

通过使用这个脚本,您可以轻松地对多个数据库服务器进行批量操作,并将结果汇总到一个中心位置,大大简化了数据库管理和监控的工作流程。