SqlServer整库迁移至Oracle

发布于:2025-04-03 ⋅ 阅读:(14) ⋅ 点赞:(0)

import pandas as pd
from sqlalchemy import create_engine, text
import cx_Oracle
from sqlalchemy.exc import DatabaseError
import traceback

# SQL Server connection settings.
# The URL below contains placeholders (user, password, host, port, database)
# that must be replaced with real values. The `driver` query parameter names
# the ODBC driver to use, so it has to match a driver installed on this machine.
sql_server_conn_str = 'mssql+pyodbc://用户名:密码@数据库地址:端口/库名?driver=ODBC+Driver+11+for+SQL+Server'
sql_server_engine = create_engine(sql_server_conn_str)

# Oracle connection settings (same placeholder convention as the URL above).
oracle_conn_str = 'oracle+cx_oracle://用户名:密码@数据库地址:端口/库名'
# connect_args carries driver-level options; both the regular and the national
# character-set encodings are set to UTF-8 here.
oracle_engine = create_engine(oracle_conn_str,
                              connect_args={"encoding": "UTF-8", "nencoding": "UTF-8"})

def get_sqlserver_columns(table_name):
    """Return the column definitions of a SQL Server table.

    Args:
        table_name: Name of the source table, matched against
            INFORMATION_SCHEMA.COLUMNS.TABLE_NAME.

    Returns:
        A list of result rows exposing COLUMN_NAME, DATA_TYPE and
        CHARACTER_MAXIMUM_LENGTH, ordered by the column's ordinal position.
        The list is empty when no column matches ``table_name``.
    """
    # The table name is sent as a bind parameter rather than interpolated into
    # the SQL text, so names containing quotes cannot break or inject into the
    # statement.
    query = text("""
        SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = :table_name
        ORDER BY ORDINAL_POSITION
    """)
    with sql_server_engine.connect() as conn:
        return conn.execute(query, {'table_name': table_name}).fetchall()

def get_numeric_columns(table_name):
    """Return the names of the integer-typed columns of a SQL Server table.

    Args:
        table_name: Name of the source table, matched against
            INFORMATION_SCHEMA.COLUMNS.TABLE_NAME.

    Returns:
        A list with the COLUMN_NAME of every column whose DATA_TYPE is
        ``int``, ``smallint``, ``tinyint`` or ``bigint``.
    """
    # Bind the table name instead of formatting it into the SQL string so that
    # unusual names cannot alter the statement.
    query = text("""
        SELECT COLUMN_NAME
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_NAME = :table_name
        AND DATA_TYPE IN ('int', 'smallint', 'tinyint', 'bigint')
    """)
    with sql_server_engine.connect() as conn:
        result = conn.execute(query, {'table_name': table_name})
        # Consume the result while the connection is still open.
        return [row.COLUMN_NAME for row in result]

def map_oracle_type(sqlserver_type, max_length):
    """Map a SQL Server column type to an Oracle column type.

    Args:
        sqlserver_type: SQL Server type name (case-insensitive), e.g. the
            DATA_TYPE value from INFORMATION_SCHEMA.COLUMNS.
        max_length: Declared character length, or ``None``. Values below 1
            (SQL Server reports -1 for ``(max)`` columns) are treated as
            "unbounded".

    Returns:
        The Oracle type as a string ready to be placed in a CREATE TABLE
        statement. Unknown types fall back to ``'CLOB'``.

    Character types whose declared length exceeds what the Oracle type can
    hold are promoted to a wider type (and finally to a LOB) instead of being
    clamped, because a clamped column would reject longer source values.
    """
    sqlserver_type = sqlserver_type.lower()
    if max_length is not None and max_length < 1:
        max_length = None

    if sqlserver_type in ('varchar', 'nvarchar', 'char', 'nchar'):
        national = sqlserver_type.startswith('n')
        lob_type = 'NCLOB' if national else 'CLOB'
        # NOTE(review): limits assume Oracle's standard string sizes
        # (VARCHAR2 4000 / CHAR 2000 bytes) and a 2-byte national character
        # set (NVARCHAR2 2000 / NCHAR 1000 characters) -- confirm for the
        # target database.
        if national:
            var_type, var_limit = 'NVARCHAR2', 2000
            fixed_type, fixed_limit = 'NCHAR', 1000
        else:
            var_type, var_limit = 'VARCHAR2', 4000
            fixed_type, fixed_limit = 'CHAR', 2000

        if sqlserver_type.endswith('varchar'):
            if max_length is None or max_length > var_limit:
                return lob_type
            return f'{var_type}({max_length})'

        # Fixed-width char / nchar.
        if max_length is None:
            return f'{fixed_type}(1)'
        if max_length <= fixed_limit:
            return f'{fixed_type}({max_length})'
        if max_length <= var_limit:
            return f'{var_type}({max_length})'
        return lob_type

    fixed_map = {
        'text': 'CLOB',
        'ntext': 'NCLOB',
        'int': 'NUMBER(10)',
        'bigint': 'NUMBER(19)',
        'smallint': 'NUMBER(5)',
        'tinyint': 'NUMBER(3)',
        'decimal': 'NUMBER',
        'numeric': 'NUMBER',
        'float': 'BINARY_DOUBLE',
        'real': 'BINARY_FLOAT',
        'datetime': 'TIMESTAMP(6)',
        'datetime2': 'TIMESTAMP(6)',
        'date': 'DATE',
        'time': 'TIMESTAMP(6)',
        'bit': 'NUMBER(1)',
        # Binary data cannot be stored in a character LOB.
        'binary': 'BLOB',
        'varbinary': 'BLOB',
        'image': 'BLOB',
    }
    return fixed_map.get(sqlserver_type, 'CLOB')

def _chunk_to_records(chunk_df, integer_cols, bind_names):
    """Convert one DataFrame chunk into bind-parameter dicts for the INSERT.

    Args:
        chunk_df: The chunk to convert; its datetime columns are made
            timezone-naive in place.
        integer_cols: Collection of column names whose non-null values must be
            sent as Python ``int``.
        bind_names: One bind-parameter name per column, in column order.

    Returns:
        A list with one dict per row, keyed by bind name. Missing values
        (NaN / NaT / None) are represented as ``None`` so they are inserted
        as NULL.
    """
    # Strip timezone info while the columns still have a datetime dtype.
    for col in chunk_df.columns:
        if pd.api.types.is_datetime64_any_dtype(chunk_df[col]):
            chunk_df[col] = chunk_df[col].dt.tz_localize(None)

    # Cast to object before masking: on a float64 column `where(..., None)`
    # would keep NaN, and NaN would be sent to Oracle instead of NULL.
    as_objects = chunk_df.astype(object).where(chunk_df.notna(), None)

    column_names = list(chunk_df.columns)
    records = []
    for row in as_objects.itertuples(index=False, name=None):
        record = {}
        for col, bind, value in zip(column_names, bind_names, row):
            # Integer columns containing NULLs are read as floats by pandas;
            # convert the real values back to int and keep NULL as NULL
            # (rather than replacing it with 0).
            if value is not None and col in integer_cols:
                value = int(value)
            record[bind] = value
        records.append(record)
    return records


def migrate_table(table_name):
    """Recreate one SQL Server table in Oracle and copy its rows in batches.

    The Oracle table is named after the upper-cased ``table_name``. If a table
    with that name already exists in the current Oracle schema it is dropped
    (with PURGE) before being recreated. Rows are read from SQL Server in
    chunks of 10000 and inserted chunk by chunk, committing after each chunk.

    Args:
        table_name: Name of the source table in SQL Server.

    Returns:
        None. Progress and errors are printed. If inserting a chunk fails, the
        chunk is rolled back and the migration of this table stops; any other
        exception is printed with its traceback and not re-raised, so that a
        caller iterating over many tables can continue with the next one.
    """
    try:
        oracle_table_name = table_name.upper()
        columns_info = get_sqlserver_columns(table_name)
        integer_cols = set(get_numeric_columns(table_name))  # columns sent as int

        # Build the Oracle column definitions.
        columns_with_types = [
            f'"{col.COLUMN_NAME}" {map_oracle_type(col.DATA_TYPE, col.CHARACTER_MAXIMUM_LENGTH)}'
            for col in columns_info
        ]

        with oracle_engine.connect() as conn:
            table_exists = conn.execute(
                text("SELECT 1 FROM user_tables WHERE table_name = :name"),
                {'name': oracle_table_name},
            ).scalar()
            if table_exists:
                print(f"删除旧表 {oracle_table_name}...")
                conn.execute(text(f'DROP TABLE "{oracle_table_name}" PURGE'))
                conn.commit()

            create_sql = f'CREATE TABLE "{oracle_table_name}" ({", ".join(columns_with_types)})'
            print(f"\n[DEBUG] 建表SQL:\n{create_sql}")
            conn.execute(text(create_sql))
            conn.commit()

            chunks = pd.read_sql_table(
                table_name,
                sql_server_engine,
                chunksize=10000
            )

            for chunk_idx, chunk_df in enumerate(chunks):
                # Nothing to insert for an empty chunk.
                if chunk_df.empty:
                    print(f"跳过空数据批次:{chunk_idx + 1}")
                    continue

                # Use generated bind names (p0, p1, ...) instead of the raw
                # column names, which may contain spaces, non-ASCII characters
                # or reserved words that are not valid bind identifiers.
                bind_names = [f'p{i}' for i in range(len(chunk_df.columns))]
                columns = ', '.join(f'"{col}"' for col in chunk_df.columns)
                placeholders = ', '.join(f':{name}' for name in bind_names)
                insert_sql = text(f"""
                    INSERT INTO "{oracle_table_name}" ({columns})
                    VALUES ({placeholders})
                """)

                records = _chunk_to_records(chunk_df, integer_cols, bind_names)

                try:
                    conn.execute(insert_sql, records)
                    conn.commit()
                    print(f"批次 {chunk_idx + 1}: 成功插入 {len(chunk_df)} 行")
                except DatabaseError as e:
                    conn.rollback()
                    print(f"插入失败: {str(e)}")
                    print("问题数据样例:", chunk_df.iloc[0].to_dict())
                    return

    except Exception as e:
        # Per-table boundary: report the failure and let the caller move on.
        print(f"严重错误: {str(e)}")
        traceback.print_exc()

def migrate_database(exclude_tables=None):
    """Migrate every base table of the SQL Server database to Oracle.

    Args:
        exclude_tables: Optional iterable of table names to skip; names are
            compared case-insensitively. When omitted, the placeholder list
            ``['表名']`` is used, which should be edited to name the tables
            to skip.

    Returns:
        None. Each table is handed to ``migrate_table``; progress is printed.
    """
    if exclude_tables is None:
        exclude_tables = ['表名']  # tables to skip
    # Upper-case the exclusion list once instead of on every loop iteration.
    excluded = {name.upper() for name in exclude_tables}

    with sql_server_engine.connect() as conn:
        tables = conn.execute(text("""
            SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
            WHERE TABLE_TYPE='BASE TABLE'
        """)).fetchall()

    for table in tables:
        table_name = table[0]

        # Skip excluded tables (case-insensitive match).
        if table_name.upper() in excluded:
            print(f"\n{'=' * 30} 跳过表 {table_name} {'=' * 30}")
            continue

        print(f"\n{'=' * 30} 迁移表 {table_name} {'=' * 30}")
        migrate_table(table_name)

# Script entry point: runs the full-database migration when executed directly.
if __name__ == '__main__':
    # migrate_table('表名')  # single-table test run
    migrate_database()  # migrate the whole database
    print("\n迁移完成")


上面的代码主要解决整库迁移过程中的三个问题:目标表的创建(包括 SQL Server 与 Oracle 之间数据类型的映射转换)、读取源数据后的类型转换,以及分批插入的优化。

注意事项:

Oracle

1.如果在 PyCharm 的包管理界面中搜索不到 cx_Oracle 库,可以通过 cmd 进入 Python 安装目录下的 Scripts 目录,然后执行 pip install cx_Oracle 进行安装。

2.配置 Oracle Instant Client

1).根据Oracle的版本和操作系统进行相应下载  

官方地址:Oracle Instant Client Downloads

2).配置环境变量

  • Windows:将解压路径添加到系统 Path 变量。

SqlServer

1.根据 SqlServer 的版本下载并安装对应的 ODBC 驱动;安装的驱动名称需要与连接字符串中 driver 参数的值(上面代码中为 ODBC Driver 11 for SQL Server)保持一致。

官方地址:Download ODBC Driver for SQL Server - ODBC Driver for SQL Server | Microsoft Learn