批量数据库服务器连接测试与数据汇总:Python实现方案
作为数据库服务器运维人员,我们经常需要面对大量服务器的连接测试和数据汇总工作。本文将介绍一个使用Python实现的高效解决方案,可以帮助我们快速完成这些任务。
需求概述
- 从配置文件中读取要测试的数据库服务器IP地址列表。
- 批量测试数据库服务器的连接情况。
- 在所有可连接的服务器上执行相同的SQL查询。
- 将查询结果汇总到一个单独的数据库中,并包含对应服务器的IP地址。
- 自动创建结果表,表名按日期随机生成。
- 提供详细的日志输出,包括实时的处理进度。
实现方案
我们使用Python来实现这个方案,主要利用了以下库和技术:
pyodbc
: 用于数据库连接和操作configparser
: 读取配置文件concurrent.futures
: 实现并发处理logging
: 日志记录- 多线程技术:提高处理效率
代码实现
以下是完整的Python代码实现:
import pyodbc
import logging
import configparser
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
from datetime import datetime
import random
import string
# 配置日志
def setup_logger():
"""
设置日志记录器,同时输出到文件和控制台
"""
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# 文件处理器
file_handler = logging.FileHandler('db_query_aggregation.log')
file_handler.setLevel(logging.INFO)
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_formatter = logging.Formatter('%(message)s')
console_handler.setFormatter(console_formatter)
logger.addHandler(file_handler)
logger.addHandler(console_handler)
return logger
logger = setup_logger()
def read_config(config_file):
"""
读取配置文件
:param config_file: 配置文件路径
:return: 包含配置信息的字典
"""
try:
config = configparser.ConfigParser()
config.read(config_file)
return {
'ip_list_file': config['Files']['ip_list_file'],
'source_db_username': config['SourceDB']['username'],
'source_db_password': config['SourceDB']['password'],
'target_db_info': dict(config['TargetDB']),
'max_workers': int(config['Settings']['max_workers']),
'query': config['Query']['sql']
}
except Exception as e:
logger.error(f"读取配置文件时出错: {e}")
raise
def read_ip_list(file_path):
"""
从文件中读取IP地址列表
:param file_path: IP地址文件路径
:return: IP地址列表
"""
try:
with open(file_path, 'r') as file:
return [line.strip() for line in file if line.strip()]
except IOError as e:
logger.error(f"无法读取IP地址文件: {e}")
return []
def create_connection(server, database, username, password):
"""
创建数据库连接
:param server: 服务器地址
:param database: 数据库名称
:param username: 用户名
:param password: 密码
:return: 数据库连接对象,如果连接失败则返回None
"""
try:
conn_str = f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};UID={username};PWD={password}'
return pyodbc.connect(conn_str, timeout=5)
except pyodbc.Error as e:
logger.error(f"连接到服务器 {server} 失败: {e}")
return None
def execute_query(connection, query):
"""
执行SQL查询
:param connection: 数据库连接对象
:param query: SQL查询语句
:return: 查询结果列表
"""
try:
cursor = connection.cursor()
cursor.execute(query)
return cursor.fetchall()
except pyodbc.Error as e:
logger.error(f"执行查询时出错: {e}")
return []
def process_server(ip, username, password, query):
"""
处理单个服务器的查询
:param ip: 服务器IP地址
:param username: 数据库用户名
:param password: 数据库密码
:param query: SQL查询语句
:return: 元组 (IP地址, 查询结果)
"""
start_time = time.time()
logger.info(f"开始处理服务器 {ip}")
try:
conn = create_connection(ip, 'master', username, password)
if conn:
results = execute_query(conn, query)
conn.close()
end_time = time.time()
processing_time = end_time - start_time
logger.info(f"服务器 {ip} 处理完成. 获取 {len(results)} 行数据. 耗时 {processing_time:.2f} 秒")
return ip, results
except Exception as e:
logger.error(f"处理服务器 {ip} 时发生错误: {e}")
logger.info(f"服务器 {ip} 处理失败")
return ip, []
def create_target_table(connection, table_name, columns):
"""
在目标数据库中创建表
:param connection: 目标数据库连接对象
:param table_name: 要创建的表名
:param columns: 列定义列表
"""
try:
cursor = connection.cursor()
create_table_query = f"CREATE TABLE {table_name} (ServerIP VARCHAR(15), {', '.join(columns)})"
cursor.execute(create_table_query)
connection.commit()
logger.info(f"成功创建表 {table_name}")
except pyodbc.Error as e:
logger.error(f"创建目标表时出错: {e}")
raise
def insert_data(connection, table_name, data):
"""
将数据插入目标数据库
:param connection: 目标数据库连接对象
:param table_name: 目标表名
:param data: 要插入的数据列表
:return: 插入的行数
"""
try:
cursor = connection.cursor()
placeholders = ', '.join(['?' for _ in range(len(data[0]))])
insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"
cursor.fast_executemany = True
cursor.executemany(insert_query, data)
connection.commit()
return cursor.rowcount
except pyodbc.Error as e:
logger.error(f"插入数据时出错: {e}")
connection.rollback()
return 0
def generate_table_name():
"""
生成随机表名
:return: 生成的表名
"""
date_str = datetime.now().strftime("%Y%m%d")
random_str = ''.join(random.choices(string.ascii_lowercase, k=5))
return f"QueryResults_{date_str}_{random_str}"
def main():
"""
主函数,协调整个数据查询和汇总过程
"""
try:
# 读取配置
config = read_config('config.ini')
ip_list = read_ip_list(config['ip_list_file'])
if not ip_list:
logger.error("IP地址列表为空,程序终止")
return
logger.info(f"开始处理 {len(ip_list)} 个服务器")
# 并发查询所有服务器
results = []
with ThreadPoolExecutor(max_workers=config['max_workers']) as executor:
future_to_ip = {executor.submit(process_server, ip, config['source_db_username'],
config['source_db_password'], config['query']): ip for ip in ip_list}
for future in as_completed(future_to_ip):
ip, result = future.result()
if result:
results.extend([(ip,) + tuple(row) for row in result])
if not results:
logger.info("没有查询到数据,程序终止")
return
# 连接目标数据库
target_conn = create_connection(**config['target_db_info'])
if not target_conn:
logger.error("无法连接到目标数据库,程序终止")
return
# 创建目标表并插入数据
table_name = generate_table_name()
columns = [f"Column{i} VARCHAR(100)" for i in range(len(results[0]) - 1)]
create_target_table(target_conn, table_name, columns)
rows_inserted = insert_data(target_conn, table_name, results)
target_conn.close()
logger.info(f"数据汇总完成。插入 {rows_inserted} 行到表 {table_name}")
print(f"查询结果已插入表: {table_name}")
except Exception as e:
logger.critical(f"程序执行过程中发生严重错误: {e}")
print(f"程序执行过程中发生错误,请查看日志文件获取详细信息。")
if __name__ == "__main__":
main()
代码说明
配置文件读取:使用
configparser
模块读取配置文件,包括数据库连接信息、查询语句等。多线程处理:使用
ThreadPoolExecutor
并发执行查询,提高效率。异常处理:每个关键操作都包含了异常处理,确保程序的稳定性。
模块化设计:将不同功能分解为独立的函数,提高代码的可读性和可维护性。
日志记录:使用
logging
模块记录详细的操作日志,同时输出到文件和控制台。动态表创建:在目标数据库中动态创建表,表名包含日期和随机字符串。
数据汇总:将所有服务器的查询结果汇总到一个列表中,包括服务器IP地址。
批量数据插入:使用
executemany
批量插入数据到目标表。
使用说明
- 创建
config.ini
配置文件,包含以下内容:
[Files]
ip_list_file = server_ip_list.txt
[SourceDB]
username = your_source_username
password = your_source_password
[TargetDB]
server = your_target_server
database = your_target_database
username = your_target_username
password = your_target_password
[Settings]
max_workers = 50
[Query]
sql = SELECT column1, column2 FROM your_table
准备一个包含要查询的服务器IP地址的文本文件(如
server_ip_list.txt
)。运行脚本,它将并发查询所有服务器,汇总结果(包括服务器IP),并插入到目标数据库的新表中。
脚本执行完成后,会输出生成的表名。
结论
这个Python脚本提供了一个高效、灵活的解决方案,可以批量测试数据库服务器连接、执行查询并汇总结果。它具有以下优点:
- 并发处理,大幅提高效率
- 详细的日志记录,便于监控和调试
- 灵活的配置,易于适应不同环境
- 异常处理完善,提高程序稳定性
- 结果包含服务器IP,便于追踪数据来源
对于需要管理大量数据库服务器的运维人员来说,这个脚本可以显著提高工作效率。您可以根据实际需求进一步调整和优化这个脚本,例如添加更多的错误处理、优化查询性能,或者扩展功能以支持更复杂的操作。
通过使用这个脚本,您可以轻松地对多个数据库服务器进行批量操作,并将结果汇总到一个中心位置,大大简化了数据库管理和监控的工作流程。