背景
在数据集成业务中集成会需要连接到Oralce读取和更新数据,定义一个用于连接 Oracle 数据库的工具类方便其他脚本中使用可以有以下好处:
- 提升可维护性:集中管理配置信息和异常处理逻辑,修改配置或处理异常时只需在工具类一处操作。
- 增强复用性:避免重复编写数据库连接和关闭代码,便于在不同模块使用,也利于添加新操作方法。
- 保障安全性:封装敏感信息,实现统一权限管理,降低信息泄露风险。
- 便于代码测试:可模拟数据库连接和操作,避免测试影响真实数据库。
直接上代码
#!/usr/bin/env python
# encoding: utf-8
# @author: Evan#chengwenit.com
# Pyhton访问Oracle工具类
import cx_Oracle
from cjc_cfg import oracleconn # 从配置文件中导入默认的 Oracle 连接信息
class OracleDBClient:
def __init__(self, user=None, password=None, host=None, port=None, sid=None):
"""
初始化数据库连接信息,默认从 cjc_cfg.oracleconn 获取
:param user: 数据库用户名
:param password: 数据库密码
:param host: 数据库主机地址
:param port: 数据库端口
:param sid: 数据库SID
"""
self.user = user or oracleconn["user"]
self.password = password or oracleconn["passwd"]
self.host = host or oracleconn["host"]
self.port = port or int(oracleconn["port"]) # 确保是整数
self.sid = sid or oracleconn["sid"]
self.connection = None
self.cursor = None
def connect(self):
"""建立数据库连接"""
try:
dsn = cx_Oracle.makedsn(self.host, self.port, sid=self.sid)
self.connection = cx_Oracle.connect(user=self.user, password=self.password, dsn=dsn)
self.cursor = self.connection.cursor()
except cx_Oracle.DatabaseError as e:
raise Exception(f"数据库连接失败: {e}")
def disconnect(self):
"""关闭数据库连接"""
if self.cursor:
self.cursor.close()
if self.connection:
self.connection.close()
def _dict_fetchall(self, cursor):
"""将查询结果转换为字典列表"""
columns = [desc[0] for desc in cursor.description]
return [dict(zip(columns, row)) for row in cursor.fetchall()]
def execute_query(self, query: str, parameters=None):
"""执行查询语句并返回字典格式结果"""
try:
if parameters:
self.cursor.execute(query, parameters)
else:
self.cursor.execute(query)
return self._dict_fetchall(self.cursor)
except cx_Oracle.DatabaseError as e:
raise Exception(f"查询执行失败: {e}")
def execute_non_query(self, query: str, parameters=None):
"""执行非查询语句(如 INSERT、UPDATE、DELETE)"""
try:
if parameters:
self.cursor.execute(query, parameters)
else:
self.cursor.execute(query)
self.connection.commit()
except cx_Oracle.DatabaseError as e:
self.connection.rollback()
raise Exception(f"非查询语句执行失败: {e}")
def execute_many(self, query: str, parameters_list):
"""批量执行 SQL 语句"""
try:
self.cursor.executemany(query, parameters_list)
self.connection.commit()
except cx_Oracle.DatabaseError as e:
self.connection.rollback()
raise Exception(f"批量执行失败: {e}")