python实现批量连接sqlserver数据库,批量上传存储过程,批量更新存储过程到多个目标数据库

发布于:2024-08-11 ⋅ 阅读:(134) ⋅ 点赞:(0)
import sys
import json
import pyodbc
import re
import os
import chardet
from PyQt5.QtWidgets import (QApplication, QMainWindow, QPushButton, QFileDialog, QLabel, QHBoxLayout, QVBoxLayout,
                             QWidget, QMessageBox, QCheckBox, QFormLayout, QDialog, QDialogButtonBox, QScrollArea,
                             QFrame, QGridLayout, QTextEdit)
from PyQt5.QtCore import Qt
from PyQt5.QtGui import QGuiApplication

class SQLUploader(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle('SQL Server 存储过程上传工具')
        self.setGeometry(100, 100, 600, 400)
        self.center()  # Center the window

        # 初始化用户界面
        self.init_ui()

        # 加载数据库配置
        self.load_config()

        # 状态变量
        self.source_connection = None
        self.target_connections = []
        self.uploaded_file_paths = []  # 支持多个文件路径
        self.procedures_list = []  # 存储获取的存储过程信息

    def init_ui(self):
        """初始化用户界面"""
        # 创建按钮
        self.connect_source_button = QPushButton('连接源数据库')
        self.connect_source_button.clicked.connect(self.connect_source_database)
        self.connect_source_button.setFixedSize(120, 40)

        self.select_target_button = QPushButton('选择目标数据库')
        self.select_target_button.clicked.connect(self.select_and_connect_target_databases)
        self.select_target_button.setFixedSize(120, 40)

        self.upload_button = QPushButton('上传存储过程')
        self.upload_button.clicked.connect(self.upload_procedures)
        self.upload_button.setEnabled(False)
        self.upload_button.setFixedSize(120, 40)

        self.update_button = QPushButton('更新存储过程')
        self.update_button.clicked.connect(self.update_procedures)
        self.update_button.setEnabled(False)
        self.update_button.setFixedSize(120, 40)

        self.get_procedures_button = QPushButton('获取源存储过程')
        self.get_procedures_button.clicked.connect(self.get_source_procedures)
        self.get_procedures_button.setEnabled(False)
        self.get_procedures_button.setFixedSize(120, 40)

        self.batch_update_button = QPushButton('批量更新存储过程')
        self.batch_update_button.clicked.connect(self.batch_update_procedures)
        self.batch_update_button.setEnabled(False)
        self.batch_update_button.setFixedSize(120, 40)

        # 创建状态标签
        self.source_db_status_label = QLabel('源数据库连接状态: <br>尚未连接')
        self.target_db_status_label = QLabel('目标数据库连接状态: <br>尚未连接')
        self.file_status_label = QLabel('文件上传状态: <br>尚未上传')
        self.update_status_label = QLabel('更新状态: 尚未更新')

        # 创建状态区域的滚动视图
        self.status_scroll_area = QScrollArea()
        self.status_scroll_area.setWidgetResizable(True)
        self.status_scroll_area.setMaximumHeight(150)  # 设置最大高度
        self.status_scroll_area.setFrameShape(QFrame.NoFrame)

        # 创建水平布局用于存放前三个状态标签
        status_top_layout = QHBoxLayout()
        status_top_layout.addWidget(self.target_db_status_label)
        status_top_layout.addWidget(self.source_db_status_label)
        status_top_layout.addWidget(self.file_status_label)

        # 创建垂直布局用于存放所有状态标签
        status_layout = QVBoxLayout()
        status_layout.addLayout(status_top_layout)
        status_layout.addWidget(self.update_status_label)

        status_content = QWidget()
        status_content.setLayout(status_layout)

        self.status_scroll_area.setWidget(status_content)

        # 布局设置
        button_layout = QGridLayout()
        button_layout.addWidget(self.select_target_button, 0, 0)
        button_layout.addWidget(self.upload_button, 0, 1)
        button_layout.addWidget(self.update_button, 0, 2)
        button_layout.addWidget(self.connect_source_button, 1, 0)
        button_layout.addWidget(self.get_procedures_button, 1, 1)
        button_layout.addWidget(self.batch_update_button, 1, 2)

        main_layout = QVBoxLayout()
        main_layout.addLayout(button_layout)
        main_layout.addWidget(self.status_scroll_area)

        container = QWidget()
        container.setLayout(main_layout)
        self.setCentralWidget(container)

    def update_status_labels(self, proc_names):
        """更新状态标签,纵向排列存储过程名称"""
        self.file_status_label.setText('文件上传状态: 尚未上传')
        if proc_names:
            proc_names_text = "<br>".join(proc_names)
            self.file_status_label.setText(f'文件上传状态: 已上传存储过程:<br>{proc_names_text}')

    def center(self):
        """将窗口居中显示"""
        screen = QGuiApplication.primaryScreen()
        rect = screen.availableGeometry()
        size = self.geometry()
        self.move((rect.width() - size.width()) // 2, (rect.height() - size.height()) // 2)

    def load_config(self):
        """从 config.json 文件中加载数据库配置"""
        try:
            if getattr(sys, 'frozen', False):
                # 如果是打包后的程序,使用 _MEIPASS 路径
                config_path = os.path.join(sys._MEIPASS, 'config.json')
            else:
                # 如果是开发环境
                config_path = 'config.json'

            with open(config_path, 'r', encoding='utf-8') as file:
                self.config = json.load(file)
            self.source_db_config = self.config.get('source_database', {})
            self.target_db_configs = self.config.get('target_databases', [])
        except Exception as e:
            QMessageBox.critical(self, '错误', f'加载配置失败: {e}')
            sys.exit(1)

    def build_connection_string(self, db_config):
        """根据数据库配置构建连接字符串"""
        return f"DRIVER={{SQL Server}};SERVER={db_config['host']};DATABASE={db_config['database']};UID={db_config['user']};PWD={db_config['password']}"

    def connect_source_database(self):
        """连接源数据库"""
        if self.source_connection:
            self.disconnect_source_database()

        self.source_db_status_label.setText('源数据库连接状态: 连接中...')

        db_config = self.source_db_config
        conn_str = self.build_connection_string(db_config)
        try:
            with pyodbc.connect(conn_str) as conn:
                self.source_connection = conn
                self.source_db_status_label.setText(
                    f'源数据库连接状态: <br>{self.source_db_config["name"]} 连接成功')
                self.get_procedures_button.setEnabled(True)  # 启用获取存储过程按钮
        except pyodbc.Error as e:
            self.source_db_status_label.setText(f'源数据库连接状态: <br>连接失败 - {str(e)}')
            QMessageBox.critical(self, '错误', f'连接源数据库失败: {e}')
        except Exception as e:
            self.source_db_status_label.setText(f'源数据库连接状态: <br>连接失败 - {str(e)}')
            QMessageBox.critical(self, '错误', f'连接源数据库失败: {e}')

    def select_and_connect_target_databases(self):
        """显示目标数据库选择对话框并连接所选的数据库"""
        if not self.target_db_configs:
            QMessageBox.warning(self, '警告', '没有目标数据库配置可供选择。')
            return

        dialog = QDialog(self)
        dialog.setWindowTitle('选择目标数据库')
        dialog.setGeometry(200, 200, 300, 300)  # 调整对话框的大小以适应新的布局
        self.center_dialog(dialog)  # Center the dialog

        grid_layout = QGridLayout()  # 使用网格布局来处理多列显示
        grid_layout.setHorizontalSpacing(5)  # 设置列间距
        grid_layout.setVerticalSpacing(5)  # 设置行间距

        # 添加全选和清空按钮
        select_all_button = QPushButton('全选')
        select_all_button.clicked.connect(self.select_all_checkboxes)
        select_all_button.setFixedSize(100, 40)
        clear_all_button = QPushButton('清空')
        clear_all_button.clicked.connect(self.clear_all_checkboxes)
        clear_all_button.setFixedSize(100, 40)

        # 将按钮放置在对话框的顶部
        button_layout = QHBoxLayout()
        button_layout.addWidget(select_all_button)
        button_layout.addWidget(clear_all_button)

        # 创建复选框并添加到网格布局中
        self.checkboxes = {}
        max_items_per_column = 10
        num_columns = (len(self.target_db_configs) + max_items_per_column - 1) // max_items_per_column

        for index, db in enumerate(self.target_db_configs):
            row = index % max_items_per_column
            col = index // max_items_per_column
            checkbox = QCheckBox(db['name'])
            grid_layout.addWidget(checkbox, row, col)
            self.checkboxes[db['name']] = (db, checkbox)

        button_box = QDialogButtonBox(QDialogButtonBox.Ok | QDialogButtonBox.Cancel)
        button_box.button(QDialogButtonBox.Ok).setText('连接')
        button_box.button(QDialogButtonBox.Cancel).setText('关闭')
        button_box.accepted.connect(self.check_and_connect_selected_target_databases)
        button_box.rejected.connect(dialog.reject)

        # 设置对话框的布局
        main_layout = QVBoxLayout()
        main_layout.addLayout(button_layout)  # 添加按钮布局
        main_layout.addLayout(grid_layout)  # 添加复选框布局
        main_layout.addWidget(button_box)
        dialog.setLayout(main_layout)
        dialog.exec_()

    def check_and_connect_selected_target_databases(self):
        """检查是否选择了目标数据库,并连接所选的数据库"""
        selected_databases = [name for name, (_, checkbox) in self.checkboxes.items() if checkbox.isChecked()]

        if not selected_databases:
            QMessageBox.warning(self, '警告', '请至少选择一个目标数据库。')
            return

        self.connect_selected_target_databases(self.sender().parent())  # 传递对话框对象

    def select_all_checkboxes(self):
        """全选所有复选框"""
        for checkbox in self.checkboxes.values():
            checkbox[1].setChecked(True)

    def clear_all_checkboxes(self):
        """清空所有复选框"""
        for checkbox in self.checkboxes.values():
            checkbox[1].setChecked(False)

    def center_dialog(self, dialog):
        """将对话框居中显示"""
        screen = QGuiApplication.primaryScreen()
        rect = screen.availableGeometry()
        size = dialog.geometry()
        dialog.move((rect.width() - size.width()) // 2, (rect.height() - size.height()) // 2)

        def connect_selected_target_databases(self, dialog):
        """连接用户选择的目标数据库"""
        if self.target_connections:
            self.disconnect_all_target_databases()

        # 设置目标数据库连接状态为连接中
        self.target_db_status_label.setText('目标数据库连接状态: 连接中...')
        self.target_connections = []

        connection_results = []

        # 连接所选的目标数据库
        for name, (db_config, checkbox) in self.checkboxes.items():
            if checkbox.isChecked():
                conn_str = self.build_connection_string(db_config)
                try:
                    with pyodbc.connect(conn_str) as conn:
                        self.target_connections.append((name, conn_str))
                        connection_results.append(f'{name} 连接成功')
                except pyodbc.Error as e:
                    connection_results.append(f'连接失败 {name} 数据库 - {str(e)}')
                except Exception as e:
                    connection_results.append(f'连接失败 {name} 数据库 - {str(e)}')

        # 关闭选择数据库对话框
        dialog.accept()

        # 汇总连接结果
        if self.target_connections:
            self.upload_button.setEnabled(True)  # 启用上传按钮
            # 设置状态标签
            self.target_db_status_label.setText('目标数据库连接状态:<br>' + '<br>'.join(connection_results))
        else:
            self.update_button.setEnabled(False)
            self.upload_button.setEnabled(False)
            self.batch_update_button.setEnabled(False)
            # 更新状态标签以显示没有连接成功的情况
            self.target_db_status_label.setText('目标数据库连接状态: 所有目标数据库连接失败。')

        # 显示汇总的连接结果
        QMessageBox.information(self, '连接结果', '\n'.join(connection_results))

    def disconnect_all_target_databases(self):
        """断开所有目标数据库连接"""
        self.target_connections = []
        self.target_db_status_label.setText('目标数据库连接状态: 所有目标数据库连接已断开。')

    def detect_file_encoding(self, file_path):
        """检测文件编码"""
        try:
            with open(file_path, 'rb') as file:
                raw_data = file.read(10000)  # 读取前 10000 字节
            result = chardet.detect(raw_data)
            return result['encoding']
        except Exception as e:
            QMessageBox.critical(self, '错误', f'检测文件编码失败: {e}')
            return None

    def read_stored_procedure(self, file_path):
        """从文件中读取 SQL 存储过程"""
        try:
            encoding = self.detect_file_encoding(file_path)
            if encoding is None:
                return ''
            with open(file_path, 'r', encoding=encoding) as file:
                return file.read()
        except Exception as e:
            QMessageBox.critical(self, '错误', f'读取存储过程文件失败: {e}')
            return ''

    def clean_sql_script(self, sql_script):
        """清理 SQL 脚本中的不必要的部分"""
        # 移除 USE 语句
        cleaned_script = re.sub(r'^USE \[.*?\]\s*', '', sql_script, flags=re.IGNORECASE | re.MULTILINE)

        # 移除所有的 GO 语句
        cleaned_script = re.sub(r'\s*GO\s*', '', cleaned_script, flags=re.IGNORECASE)

        # 处理 SET ANSI_NULLS 和 SET QUOTED_IDENTIFIER
        # 保证这些 SET 语句在存储过程定义之前
        cleaned_script = re.sub(r'^SET ANSI_NULLS ON\s*', '', cleaned_script, flags=re.IGNORECASE | re.MULTILINE)
        cleaned_script = re.sub(r'^SET QUOTED_IDENTIFIER ON\s*', '', cleaned_script, flags=re.IGNORECASE | re.MULTILINE)

        # 确保 CREATE/ALTER PROCEDURE 是查询批次中的第一个语句
        match = re.match(r'^(?:CREATE|ALTER)\s+PROCEDURE', cleaned_script, re.IGNORECASE)
        if match:
            cleaned_script = cleaned_script.strip()

        return cleaned_script

    def extract_procedure_name(self, proc_sql):
        """从 SQL 脚本中提取存储过程名称"""
        match = re.search(r'(?:CREATE|ALTER)\s+PROCEDURE\s+\[?dbo\]?\.\[?([^\]]+)\]?', proc_sql, re.IGNORECASE)
        return match.group(1) if match else '未知存储过程'

    def upload_procedures(self):
        """处理文件对话框并上传多个存储过程"""
        options = QFileDialog.Options()
        file_paths, _ = QFileDialog.getOpenFileNames(self, '打开 SQL 文件', '', 'SQL 文件 (*.sql);;所有文件 (*)',
                                                   options=options)

        if file_paths:
            # 清空上次上传的状态
            self.proc_names = []
            self.update_status_labels(self.proc_names)
            self.uploaded_file_paths = file_paths

            for file_path in file_paths:
                try:
                    proc_sql = self.read_stored_procedure(file_path)
                    if not proc_sql:
                        continue

                    # 处理和清理 SQL 脚本
                    set_statements = []
                    set_statements.extend(re.findall(r'^SET ANSI_NULLS ON\s*', proc_sql, re.IGNORECASE | re.MULTILINE))
                    set_statements.extend(
                        re.findall(r'^SET QUOTED_IDENTIFIER ON\s*', proc_sql, re.IGNORECASE | re.MULTILINE))

                    # 清理 SQL 脚本
                    cleaned_sql = self.clean_sql_script(proc_sql)
                    if not cleaned_sql:
                        QMessageBox.warning(self, '警告', f'存储过程脚本为空或无效: {file_path}')
                        continue

                    final_sql = '\n'.join(set_statements) + '\n' + cleaned_sql

                    proc_name = self.extract_procedure_name(cleaned_sql)
                    self.proc_names.append(proc_name)
                    self.update_status_labels(self.proc_names)

                except Exception as e:
                    QMessageBox.critical(self, '错误', f'上传存储过程失败: {e}')
                    print(f"上传存储过程失败: {e}")

            if self.proc_names:
                self.update_button.setEnabled(True)  # 上传文件成功后,启用更新按钮
                QMessageBox.information(self, '信息', f'{len(self.proc_names)} 个存储过程已准备好进行更新。')

    def upload_stored_procedure(self, connection_str, proc_name, proc_sql):
        """将存储过程上传到数据库"""
        try:
            with pyodbc.connect(connection_str) as conn:
                cursor = conn.cursor()

                # 检查存储过程是否存在
                check_proc_sql = f"""
                IF OBJECT_ID('{proc_name}', 'P') IS NOT NULL
                    SELECT 1
                ELSE
                    SELECT 0
                """
                cursor.execute(check_proc_sql)
                exists = cursor.fetchone()[0]

                if exists:
                    # 存储过程存在,先删除再创建
                    drop_proc_sql = f"IF OBJECT_ID('{proc_name}', 'P') IS NOT NULL DROP PROCEDURE {proc_name};"
                    cursor.execute(drop_proc_sql)
                    conn.commit()

                    # 替换 ALTER 为 CREATE
                    proc_sql = self.replace_alter_with_create(proc_sql)
                else:
                    # 存储过程不存在,直接使用 CREATE
                    proc_sql = self.replace_alter_with_create(proc_sql)

                # 创建新的存储过程
                cursor.execute(proc_sql)
                conn.commit()
                return True, ""
        except pyodbc.Error as e:
            print(f"数据库错误: {e}")
            return False, str(e)
        except Exception as e:
            print(f"其他错误: {e}")
            return False, str(e)

    def replace_alter_with_create(self, sql_script):
        """将 ALTER PROCEDURE 替换为 CREATE PROCEDURE"""
        altered_script = re.sub(
            r'\bALTER\s+PROCEDURE\b',
            'CREATE PROCEDURE',
            sql_script,
            flags=re.IGNORECASE
        )
        return altered_script

    def update_procedures(self):
        """更新所有连接的目标数据库中的存储过程"""
        if not self.target_connections or not self.uploaded_file_paths:
            QMessageBox.warning(self, '警告', '请确保已连接目标数据库并上传存储过程文件。')
            return

        success = True
        update_results = {}
        failed_procs = {}  # 记录每个数据库的失败存储过程及其错误信息

        for file_path in self.uploaded_file_paths:
            proc_sql = self.read_stored_procedure(file_path)
            if not proc_sql:
                QMessageBox.warning(self, '警告', f'读取存储过程 SQL 脚本失败: {file_path}')
                continue

            # 清理 SQL 脚本
            cleaned_sql = self.clean_sql_script(proc_sql)
            if not cleaned_sql:
                QMessageBox.warning(self, '警告', f'存储过程脚本为空或无效: {file_path}')
                continue

            proc_name = self.extract_procedure_name(cleaned_sql)

            for name, conn_str in self.target_connections:
                result, error_msg = self.upload_stored_procedure(conn_str, proc_name, cleaned_sql)
                if result:
                    if name not in update_results:
                        update_results[name] = {'success': 0, 'failure': 0}
                    update_results[name]['success'] += 1
                else:
                    success = False
                    if name not in failed_procs:
                        failed_procs[name] = []
                    failed_procs[name].append(f"存储过程 '{proc_name}': {error_msg}")
                    if name not in update_results:
                        update_results[name] = {'success': 0, 'failure': 0}
                    update_results[name]['failure'] += 1

        # 构建更新结果信息
        update_info = []
        for name, stats in update_results.items():
            update_info.append(f"{name} 数据库: 更新成功 {stats['success']} 条, 更新失败 {stats['failure']} 条")
        if failed_procs:
            for name, errors in failed_procs.items():
                update_info.append(f"{name} 数据库更新失败的存储过程:")
                update_info.extend(errors)

        # 显示更新结果
        QMessageBox.information(self, '更新结果', '\n'.join(update_info))

        # 更新状态标签
        self.update_status_label.setText('<br>'.join(update_info))

    def get_source_procedures(self):
        """获取源数据库中所有存储过程的内容,排除以 sp_ 开头的存储过程"""
        if not self.source_connection:
            QMessageBox.warning(self, '警告', '请先连接到源数据库。')
            return

        cursor = self.source_connection.cursor()
        try:
            # 使用 sys.procedures 和 sys.sql_modules 获取存储过程定义
            cursor.execute("""
                SELECT
                    p.name AS ProcedureName,
                    sm.definition AS ProcedureDefinition
                FROM sys.procedures p
                JOIN sys.sql_modules sm ON p.object_id = sm.object_id
                WHERE p.name NOT LIKE 'sp_%'
                ORDER BY p.name
            """)
            procedures = cursor.fetchall()
            self.procedures_list = [(row.ProcedureName, row.ProcedureDefinition) for row in procedures]
            proc_names = [row.ProcedureName for row in procedures]

            # 显示存储过程名称的弹框
            self.show_procedures_dialog(proc_names)

            # 启用批量更新按钮
            self.batch_update_button.setEnabled(True)

        except pyodbc.Error as e:
            QMessageBox.critical(self, '错误', f'获取存储过程失败: {e}')
        except Exception as e:
            QMessageBox.critical(self, '错误', f'获取存储过程失败: {e}')

    def show_procedures_dialog(self, proc_names):
        """显示存储过程名称的对话框"""
        dialog = QDialog(self)
        dialog.setWindowTitle('存储过程列表')

        layout = QVBoxLayout()

        # 创建一个文本编辑器显示存储过程列表
        text_edit = QTextEdit()
        text_edit.setReadOnly(True)
        text_edit.setText("\n".join(proc_names))
        text_edit.setMaximumHeight(200)  # 设置最大高度以显示滚动条
        text_edit.setVerticalScrollBarPolicy(Qt.ScrollBarAlwaysOn)

        layout.addWidget(text_edit)

        button_box = QDialogButtonBox(QDialogButtonBox.Ok)
        button_box.accepted.connect(dialog.accept)
        layout.addWidget(button_box)

        dialog.setLayout(layout)
        dialog.exec_()

    def batch_update_procedures(self):
        """批量更新存储过程到目标数据库"""
        if not self.source_connection:
            QMessageBox.warning(self, '警告', '请先连接到源数据库。')
            return

        if not self.target_connections:
            QMessageBox.warning(self, '警告', '请连接到目标数据库。')
            return

        if not self.procedures_list:
            QMessageBox.warning(self, '警告', '请先获取源数据库中的存储过程。')
            return

        successful_updates = 0
        failed_procs = []

        for proc_name, proc_definition in self.procedures_list:
            proc_successful = True
            for _, conn_str in self.target_connections:
                try:
                    with pyodbc.connect(conn_str) as conn:
                        cursor = conn.cursor()
                        # 删除现有的存储过程
                        cursor.execute(f"IF OBJECT_ID('{proc_name}', 'P') IS NOT NULL DROP PROCEDURE {proc_name}")
                        # 创建新的存储过程
                        cursor.execute(proc_definition)
                        # 提交事务
                        conn.commit()
                except pyodbc.Error as e:
                    proc_successful = False
                    break  # 如果在某个目标数据库失败,不再尝试其他数据库
                except Exception as e:
                    proc_successful = False
                    break  # 如果在某个目标数据库失败,不再尝试其他数据库

            if proc_successful:
                successful_updates += 1
            else:
                failed_procs.append(proc_name)

        # 生成消息
        if len(failed_procs) == 0:
            QMessageBox.information(self, '成功', '存储过程已成功批量更新到所有目标数据库。')
        else:
            message = f'批量更新完成: 成功更新 {successful_updates} 条, 失败 {len(failed_procs)} 条'
            failed_procs_str = "<br>".join(failed_procs)
            message += f'<br>失败的存储过程:<br>{failed_procs_str}'
            QMessageBox.warning(self, '部分成功', message)

        # 更新状态标签
        self.update_status_label.setText(f'成功批量更新 {successful_updates} 条, 失败 {len(failed_procs)} 条')

    def closeEvent(self, event):
        """关闭事件,确保程序退出时断开所有数据库连接"""
        self.disconnect_all_target_databases()
        event.accept()


if __name__ == '__main__':
    app = QApplication(sys.argv)
    window = SQLUploader()
    window.show()
    sys.exit(app.exec_())

config.json配置文件

{
    "source_database": {
        "name": "源数据库",
        "host": "",
        "database": "",
        "user": "",
        "password": ""
    },
    "target_databases": [
        {
            "name": "目标数据库",
            "host": "",
            "database": "",
            "user": "",
            "password": ""
        }
    ]
}


网站公告

今日签到

点亮在社区的每一天
去签到