twisted实现MMORPG 游戏数据库操作封装设计与实现

发布于:2025-02-15 ⋅ 阅读:(37) ⋅ 点赞:(0)

在设计 MMORPG(大规模多人在线角色扮演游戏)时,数据库系统是游戏架构中至关重要的一部分。数据库不仅承担了游戏中各种数据(如玩家数据、物品数据、游戏世界状态等)的存储和管理任务,还必须高效地支持并发访问、事务处理和复杂的查询。为了确保系统的可扩展性和维护性,我们需要对数据库操作进行封装和模块化设计。

为了实现上述目标,本文设计了一个基于 Twisted 的数据库封装系统。Twisted 是一个异步框架,适用于处理大量并发任务。结合 Twisted 和数据库连接池(adbapi.ConnectionPool),我们可以高效地执行异步数据库操作。
在这里插入图片描述

在这里插入图片描述

1. DatabaseError
作用:自定义异常类,用于在数据库操作发生错误时抛出详细的错误信息,包含错误消息和错误码(默认为500)。

2. DatabaseOperation
作用:抽象基类,定义了数据库操作的统一接口。所有具体的数据库操作类(如 SelectOperation, InsertOperation 等)都继承自此类,必须实现 excute 方法来执行数据库事务。
关键方法:
executeQuery:执行实际的 SQL 查询。
handleSuccess:操作成功时的回调函数。
handleFailure:操作失败时的回调函数。

3. SelectOperation
作用:继承自 DatabaseOperation,封装了 SELECT 查询操作。提供了 executeQuery 方法来执行 SQL 查询,并返回查询结果。
关键方法:
executeQuery:执行 SELECT 查询,并根据提供的表名、列名、查询条件等生成 SQL 语句。
excute:实现 DatabaseOperation 中的抽象方法,执行查询操作。

4. InsertOperation
作用:继承自 DatabaseOperation,封装了 INSERT 插入操作。通过 executeQuery 方法生成插入的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 INSERT SQL 语句,将数据插入指定的表。
excute:实现 DatabaseOperation 中的抽象方法,执行插入操作。


5. UpdateOperation
作用:继承自 DatabaseOperation,封装了 UPDATE 更新操作。通过 executeQuery 方法生成更新的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 UPDATE SQL 语句,用新值更新指定的行。
excute:实现 DatabaseOperation 中的抽象方法,执行更新操作。


6. DeleteOperation
作用:继承自 DatabaseOperation,封装了 DELETE 删除操作。通过 executeQuery 方法生成删除的 SQL 语句并执行。
关键方法:
executeQuery:构建并执行 DELETE SQL 语句,根据指定条件删除记录。
excute:实现 DatabaseOperation 中的抽象方法,执行删除操作。
7. DatabaseManager
作用:负责数据库连接池的管理和数据库操作的执行。它使用 adbapi.ConnectionPool 创建数据库连接池,执行操作并处理事务。
关键方法:
getConnection:返回数据库连接池的实例。
executeOperation:接受一个数据库操作对象,调用 runInteraction 方法来执行异步数据库事务,并处理操作成功或失败的回调。
8. GameQueryPlayerId
作用:继承自 SelectOperation,封装了查询玩家信息的操作。它指定查询条件为玩家的名称,并通过 excute 方法执行查询。
关键方法:
excute:执行 SelectOperation 中的 executeQuery 方法,查询玩家 ID。
set_query_name:设置查询的玩家名称。

代码

from twisted.enterprise import adbapi
from twisted.internet.defer import Deferred
import pymysql
import traceback
from twisted.internet import reactor
from functools import partial
from abc import ABC, abstractmethod
# 异常类定义
class DatabaseError(Exception):
    def __init__(self, message, code=500):
        self.message = message
        self.code = code


# 抽象的数据库操作类
class IDatabaseOperation(ABC):
    @abstractmethod
    def executeQuery(self, txn, table: str, columns: list, values: dict = {}, condition: dict = None) -> any:
        pass

    def handleSuccess(self, result: any):
        print("Operation succeeded with result:", result)

    def handleFailure(self, error: Exception):
        # 这里做一些额外的错误处理,比如记录日志或者返回友好的错误信息
        print("Operation failed:", error)

    @abstractmethod
    def excute(self, txn):
        #这里封装代码
        pass

class ABC_SelectOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            column_str = ", ".join(columns) if columns else "*"
            query = f"SELECT {column_str} FROM {table}"

            if condition:
                condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
                query += f" WHERE {condition_str}"

            print(f"Executing query: {query}, with values: {tuple(condition.values()) if condition else ()}")
            txn.execute(query, tuple(condition.values()) if condition else ())  # use condition values if any
            result = txn.fetchall()
            return result
        except Exception as e:
            print(f"Error executing query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass


class ABC_InsertOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            column_str = ", ".join(columns)
            placeholders = ", ".join(["%s"] * len(columns))  # Create placeholders based on column length

            # Extract values from the dictionary for each column
            value_tuple = tuple(values[col] for col in columns)

            query = f"INSERT INTO {table} ({column_str}) VALUES ({placeholders})"

            print(f"Executing insert query: {query}, with values: {value_tuple}")
            txn.execute(query, value_tuple)  # Use parameterized query
            return txn.lastrowid  # Return the inserted record ID
        except Exception as e:
            print(f"Error executing insert query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass

class ABC_UpdateOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            if condition and not isinstance(condition, dict):
                raise TypeError("Condition must be a dictionary")

            set_str = ", ".join([f"{col} = %s" for col in columns])
            query = f"UPDATE {table} SET {set_str}"

            # Ensure that values is passed as a tuple for update
            value_tuple = tuple(values[col] for col in columns)

            if condition:
                condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
                query += f" WHERE {condition_str}"
                value_tuple += tuple(condition.values())  # Append condition values

            print(f"Executing update query: {query}, with values: {value_tuple}")
            txn.execute(query, value_tuple)  # Use parameterized query
            txn.connection.commit()
            return txn.rowcount  # Return the number of updated rows
        except Exception as e:
            print(f"Error executing update query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self, txn):
        pass


class ABC_DeleteOperation(IDatabaseOperation):
    def executeQuery(self, txn, table: str, columns: list = None, values: dict = None, condition: dict = None) -> any:
        try:
            if not condition:
                raise ValueError("Condition for deletion cannot be empty.")
            condition_str = " AND ".join([f"{key} = %s" for key in condition.keys()])
            query = f"DELETE FROM {table} WHERE {condition_str}"

            print(f"Executing delete query: {query}, with values: {tuple(condition.values())}")
            txn.execute(query, tuple(condition.values()))  # Use condition values for parameterized query
            return txn.rowcount
        except Exception as e:
            print(f"Error executing delete query: {e}")
            traceback.print_exc()
            # 返回一个失败的结果以便继续后续操作
            return {"error": str(e)}

    @abstractmethod
    def excute(self):
        pass



# 数据库管理类,负责数据库连接池和事务
class DatabaseManager:
    def __init__(self, db_config):
        # 初始化数据库连接池
        self.dbConnectionPool = adbapi.ConnectionPool("pymysql", **db_config)

    def getConnection(self):
        return self.dbConnectionPool

    def executeOperation(self, operation: DatabaseOperation) -> Deferred:
        try:
            # 使用 partial 创建一个指定了参数的函数

            deferred = self.dbConnectionPool.runInteraction(operation.excute)
            deferred.addCallback(operation.handleSuccess)
            deferred.addErrback(operation.handleFailure)
            return deferred
        except Exception as e:
            error = DatabaseError(str(e), 500)
            operation.handleFailure(error)
            return None


# 示例数据库配置
db_config = {
    'host': 'localhost',
    'user': 'root',
    'password': 'root',
    'database': 'test',
}


class GameQueryPlayerId(ABC_SelectOperation):
    def __init__(self):
        self.select_columns = ["name", "id"]
        self.elect_condition = {"name": "new_name"}  # 在此给出查询条件
    def excute(self, txn):

        # 执行查询操作
        return self.executeQuery(txn,"test", columns=self.select_columns, condition=self.elect_condition )

    def set_query_name(self, name):
        self.elect_condition["name"] = name

# 示例操作
def main():

    # 创建DatabaseManager实例
    db_manager = DatabaseManager(db_config)
    ccGameQueryPlayerId = GameQueryPlayerId()
    ccGameQueryPlayerId.set_query_name("new_name")
    deferred = db_manager.executeOperation(ccGameQueryPlayerId)



    reactor.run()


if __name__ == "__main__":
    main()


网站公告

今日签到

点亮在社区的每一天
去签到