Peewee+Postgresql+PooledPostgresqlDatabase重连机制

发布于:2024-09-18 ⋅ 阅读:(132) ⋅ 点赞:(0)

需求:

Postgresql数据库服务重启后,需要业务代码正常读写数据库

方案:

  1. 通过继承playhouse.shortcuts.ReconnectMixin和playhouse.pool.PooledPostgresqlDatabase来创建一个新的ReconnectPooledPostgresqlDatabase类
  2. 修改reconnect_errors属性来适配Postgresql的错误类型
  3. 使用ReconnectPooledPostgresqlDatabase来获取数据库连接

测试

  1. 启动程序 -->重起数据库服务–>读写数据库操作正常
  2. 启动应用程序–>关闭数据库服务–>读写数据库失败–>启动数据库服务–>读写数据库操作正常

示例:

from playhouse.pool import PooledPostgresqlDatabase
from peewee import OperationalError, InterfaceError
from playhouse.shortcuts import ReconnectMixin


class ReconnectPooledPostgresqlDatabase(ReconnectMixin, PooledPostgresqlDatabase):
    """
    支持重连机制的数据库连接池类,可以通过扩展reconnect_errors来支持重连场景
    """
    reconnect_errors = (
        # Postgres error examples:
        (OperationalError, 'terminat'),
        (InterfaceError, 'connection already closed')
    )


class PostgresqlManager(object):
    """
    PG数据库管理类
    """
    __instance_lock = threading.Lock()
    __database = None

    @classmethod
    def get_database(cls, **kwargs):
        if cls.__database is None:
            with cls.__instance_lock:
                cls.__database = PooledPostgresqlDatabase(
                    database=kwargs.get("database"),
                    host=kwargs.get("host", "127.0.0.1"),
                    port=int(kwargs.get("port", 5432)),
                    user=kwargs.get("user", "postgres"),
                    password=kwargs.get("password", "postgres"),
                    max_connections=int(kwargs.get("max_connections", 50)),
                    stale_timeout=int(kwargs.get("stale_timeout", 600))
                )
        return cls.__database

网站公告

今日签到

点亮在社区的每一天
去签到