import pandas as pd
from sqlalchemy import create_engine, text
import cx_Oracle
from sqlalchemy.exc import DatabaseError
import traceback
# SQL Server 配置
sql_server_conn_str = 'mssql+pyodbc://用户名:密码@数据库地址:端口/库名?driver=ODBC+Driver+11+for+SQL+Server'
sql_server_engine = create_engine(sql_server_conn_str)
# Oracle 配置
oracle_conn_str = 'oracle+cx_oracle://用户名:密码@数据库地址:端口/库名'
oracle_engine = create_engine(oracle_conn_str,
connect_args={"encoding": "UTF-8", "nencoding": "UTF-8"})
def get_sqlserver_columns(table_name):
"""获取SQL Server表的列定义"""
with sql_server_engine.connect() as conn:
columns = conn.execute(text(f"""
SELECT COLUMN_NAME, DATA_TYPE, CHARACTER_MAXIMUM_LENGTH
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_name}'
""")).fetchall()
return columns
def get_numeric_columns(table_name):
"""获取需要转换为整数的列(NUMBER类型)"""
with sql_server_engine.connect() as conn:
result = conn.execute(text(f"""
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_NAME = '{table_name}'
AND DATA_TYPE IN ('int', 'smallint', 'tinyint', 'bigint')
"""))
return [row.COLUMN_NAME for row in result]
def map_oracle_type(sqlserver_type, max_length):
"""SQL Server到Oracle类型映射"""
sqlserver_type = sqlserver_type.lower()
if max_length and max_length < 1:
max_length = None
type_map = {
'varchar': lambda: 'CLOB' if max_length is None else f'VARCHAR2({min(max_length, 4000)})',
'nvarchar': lambda: 'NCLOB' if max_length is None else f'NVARCHAR2({min(max_length, 2000)})',
'text': 'CLOB',
'char': lambda: f'CHAR({max_length})' if max_length else 'CHAR(1)',
'nchar': lambda: f'NCHAR({max_length})' if max_length else 'NCHAR(1)',
'int': 'NUMBER(10)',
'bigint': 'NUMBER(19)',
'smallint': 'NUMBER(5)',
'tinyint': 'NUMBER(3)',
'decimal': 'NUMBER',
'numeric': 'NUMBER',
'float': 'BINARY_DOUBLE',
'real': 'BINARY_FLOAT',
'datetime': 'TIMESTAMP(6)',
'datetime2': 'TIMESTAMP(6)',
'date': 'DATE',
'time': 'TIMESTAMP(6)',
'bit': 'NUMBER(1)'
}
if sqlserver_type in type_map:
return type_map[sqlserver_type]() if callable(type_map[sqlserver_type]) else type_map[sqlserver_type]
return 'CLOB'
def migrate_table(table_name):
try:
oracle_table_name = table_name.upper()
columns_info = get_sqlserver_columns(table_name)
numeric_cols = get_numeric_columns(table_name) # 获取需要转换的列
# 构建Oracle表结构
columns_with_types = [
f'"{col.COLUMN_NAME}" {map_oracle_type(col.DATA_TYPE, col.CHARACTER_MAXIMUM_LENGTH)}'
for col in columns_info
]
with oracle_engine.connect() as conn:
if conn.execute(text("SELECT 1 FROM user_tables WHERE table_name = :name"),
{'name': oracle_table_name}).scalar():
print(f"删除旧表 {oracle_table_name}...")
conn.execute(text(f'DROP TABLE "{oracle_table_name}" PURGE'))
conn.commit()
create_sql = f'CREATE TABLE "{oracle_table_name}" ({", ".join(columns_with_types)})'
print(f"\n[DEBUG] 建表SQL:\n{create_sql}")
conn.execute(text(create_sql))
conn.commit()
chunks = pd.read_sql_table(
table_name,
sql_server_engine,
chunksize=10000
)
for chunk_idx, chunk_df in enumerate(chunks):
# 空数据直接跳过
if len(chunk_df) == 0:
print(f"跳过空数据批次:{chunk_idx + 1}")
continue
# 空值处理
chunk_df = chunk_df.where(pd.notnull(chunk_df), None)
# 动态转换数值列
for col in numeric_cols:
if col in chunk_df.columns:
chunk_df[col] = chunk_df[col].fillna(0).astype('int64')
# 日期类型转换
datetime_cols = [col for col in chunk_df.columns
if pd.api.types.is_datetime64_any_dtype(chunk_df[col])]
for col in datetime_cols:
chunk_df[col] = chunk_df[col].dt.tz_localize(None)
# 构建插入SQL
columns = ', '.join([f'"{col}"' for col in chunk_df.columns])
placeholders = ', '.join([f':{col}' for col in chunk_df.columns])
insert_sql = text(f"""
INSERT INTO "{oracle_table_name}" ({columns})
VALUES ({placeholders})
""")
try:
conn.execute(insert_sql, chunk_df.to_dict(orient='records'))
conn.commit()
print(f"批次 {chunk_idx + 1}: 成功插入 {len(chunk_df)} 行")
except DatabaseError as e:
conn.rollback()
print(f"插入失败: {str(e)}")
print("问题数据样例:", chunk_df.iloc[0].to_dict())
return
except Exception as e:
print(f"严重错误: {str(e)}")
traceback.print_exc()
def migrate_database():
with sql_server_engine.connect() as conn:
tables = conn.execute(text("""
SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE='BASE TABLE'
""")).fetchall()
exclude_tables = ['表名'] # 要过滤的表
for table in tables:
table_name = table[0]
# 跳过特定表(不区分大小写)
if table_name.upper() in [t.upper() for t in exclude_tables]:
print(f"\n{'=' * 30} 跳过表 {table_name} {'=' * 30}")
continue
print(f"\n{'=' * 30} 迁移表 {table_name} {'=' * 30}")
migrate_table(table_name)
if __name__ == '__main__':
# migrate_table('表名') # 单表测试
migrate_database() # 全库迁移
print("\n迁移完成")
上面代码主要是解决整库迁移过程中相关表的创建(备注:不同数据库之间数据类型的映射转换),还有读取原始数据的类型转换 和 分批插入优化。
注意事项:
Oracle
1.在pycharm搜索不到cx_Oracle的库,通过cmd的方式进入python安装环境的目录Scripts下,然后pip install cx_Oracle 进行导入。
2.配置 Oracle Instant Client
1).根据Oracle的版本和操作系统进行相应下载
官方地址:Oracle Instant Client Downloads
2).配置环境变量
- Windows:将解压路径添加到系统
Path
变量。
SqlServer
1.根据SqlServer的版本进行驱动下载安装
官方地址:Download ODBC Driver for SQL Server - ODBC Driver for SQL Server | Microsoft Learn