python3连接数据库工具类之Oracle

发布于:2025-05-11 ⋅ 阅读:(16) ⋅ 点赞:(0)

背景

在数据集成业务中集成会需要连接到Oralce读取和更新数据,定义一个用于连接 Oracle 数据库的工具类方便其他脚本中使用可以有以下好处:

  • 提升可维护性:集中管理配置信息和异常处理逻辑,修改配置或处理异常时只需在工具类一处操作。
  • 增强复用性:避免重复编写数据库连接和关闭代码,便于在不同模块使用,也利于添加新操作方法。
  • 保障安全性:封装敏感信息,实现统一权限管理,降低信息泄露风险。
  • 便于代码测试:可模拟数据库连接和操作,避免测试影响真实数据库。

直接上代码

#!/usr/bin/env python
# encoding: utf-8
# @author: Evan#chengwenit.com
# Pyhton访问Oracle工具类

import cx_Oracle
from cjc_cfg import oracleconn  # 从配置文件中导入默认的 Oracle 连接信息

class OracleDBClient:
    def __init__(self, user=None, password=None, host=None, port=None, sid=None):
        """
        初始化数据库连接信息,默认从 cjc_cfg.oracleconn 获取
        :param user: 数据库用户名
        :param password: 数据库密码
        :param host: 数据库主机地址
        :param port: 数据库端口
        :param sid: 数据库SID
        """
        self.user = user or oracleconn["user"]
        self.password = password or oracleconn["passwd"]
        self.host = host or oracleconn["host"]
        self.port = port or int(oracleconn["port"])  # 确保是整数
        self.sid = sid or oracleconn["sid"]
        self.connection = None
        self.cursor = None

    def connect(self):
        """建立数据库连接"""
        try:
            dsn = cx_Oracle.makedsn(self.host, self.port, sid=self.sid)
            self.connection = cx_Oracle.connect(user=self.user, password=self.password, dsn=dsn)
            self.cursor = self.connection.cursor()
        except cx_Oracle.DatabaseError as e:
            raise Exception(f"数据库连接失败: {e}")

    def disconnect(self):
        """关闭数据库连接"""
        if self.cursor:
            self.cursor.close()
        if self.connection:
            self.connection.close()

    def _dict_fetchall(self, cursor):
        """将查询结果转换为字典列表"""
        columns = [desc[0] for desc in cursor.description]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]

    def execute_query(self, query: str, parameters=None):
        """执行查询语句并返回字典格式结果"""
        try:
            if parameters:
                self.cursor.execute(query, parameters)
            else:
                self.cursor.execute(query)
            return self._dict_fetchall(self.cursor)
        except cx_Oracle.DatabaseError as e:
            raise Exception(f"查询执行失败: {e}")

    def execute_non_query(self, query: str, parameters=None):
        """执行非查询语句(如 INSERT、UPDATE、DELETE)"""
        try:
            if parameters:
                self.cursor.execute(query, parameters)
            else:
                self.cursor.execute(query)
            self.connection.commit()
        except cx_Oracle.DatabaseError as e:
            self.connection.rollback()
            raise Exception(f"非查询语句执行失败: {e}")

    def execute_many(self, query: str, parameters_list):
        """批量执行 SQL 语句"""
        try:
            self.cursor.executemany(query, parameters_list)
            self.connection.commit()
        except cx_Oracle.DatabaseError as e:
            self.connection.rollback()
            raise Exception(f"批量执行失败: {e}")

网站公告

今日签到

点亮在社区的每一天
去签到