from django.db import connection
from django.db import transaction
from django.db.utils import (
IntegrityError,
OperationalError,
ProgrammingError,
DataError
)
from django.utils import timezone
class Db(object):
"""数据库操作工具类,封装了CRUD、批量操作、事务和软删除功能
数据表必须要有:
updated_at-更新时间
created_at-创建时间
is_deleted-软删除标志 0为删除,1已删除
deleted_at-删除时间
"""
# ------------------------------
# 核心查询方法
# ------------------------------
def execute_query(self, sql, params=None, fetchone=False, return_last_id=False):
"""
执行SQL查询
:param sql: SQL语句
:param params: 查询参数(元组)
:param fetchone: 是否只返回第一条记录(SELECT专用)
:param return_last_id: 是否返回最后插入的ID(INSERT/UPDATE/DELETE专用)
:return: SELECT返回字典列表/单条字典,其他语句返回受影响行数或最后插入ID
"""
try:
with connection.cursor() as cursor:
cursor.execute(sql, params or ())
# 打印sql
# print(f"{sql}====\n===={params}")
if sql.strip().lower().startswith(('select', 'show')):
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results[0] if fetchone and results else results
else:
if return_last_id:
try:
return cursor.lastrowid
except AttributeError:
# 如果数据库后端不支持lastrowid,则返回受影响行数
return cursor.rowcount
else:
return cursor.rowcount
except IntegrityError as e:
error_msg = f"数据完整性错误(可能原因:主键冲突、唯一约束重复):{str(e)}"
print(error_msg)
raise ValueError(error_msg) from e
except OperationalError as e:
error_msg = f"数据库连接/操作失败(可能原因:数据库未启动、网络中断):{str(e)}"
print(error_msg)
raise ConnectionError(error_msg) from e
except ProgrammingError as e:
error_msg = f"SQL语法/表结构错误(可能原因:表/字段不存在、SQL拼写错误):{str(e)}"
print(error_msg)
raise SyntaxError(error_msg) from e
except DataError as e:
error_msg = f"数据格式错误(可能原因:类型不匹配、长度超限):{str(e)}"
print(error_msg)
raise TypeError(error_msg) from e
except Exception as e:
error_msg = f"意外错误:{str(e)}"
print(error_msg)
raise
# ------------------------------
# 插入操作
# ------------------------------
def insert_data(self, table, data, auto_time=True, return_last_id=True):
"""
插入单条数据
:param table: 表名字符串
:param data: 数据字典,格式 {字段名: 值}
:param auto_time: 是否自动添加时间字段(默认True)
:param return_last_id: 是否返回最后插入的ID
:return: 最后插入的ID或受影响的行数
"""
fields = list(data.keys())
values = list(data.values())
if auto_time:
now = timezone.now()
fields.extend(['created_at', 'updated_at'])
values.extend([now, now])
placeholders = ', '.join(['%s'] * len(fields))
sql = f"INSERT INTO {table} ({', '.join(fields)}) VALUES ({placeholders})"
return self.execute_query(sql, values, return_last_id=return_last_id)
def batch_insert(self, table, data_list, auto_time=True, batch_size=1000):
"""
批量插入数据(自动分批次)
:param table: 表名字符串
:param data_list: 数据字典列表,格式 [{字段名: 值}, ...]
:param auto_time: 是否自动添加时间字段(默认True)
:param batch_size: 每批次处理的记录数(默认1000)
:return: 总共插入的行数
"""
if not data_list:
return 0
total_rows = 0
total_batches = (len(data_list) + batch_size - 1) // batch_size
for i in range(total_batches):
start = i * batch_size
end = start + batch_size
batch_data = data_list[start:end]
first_item = batch_data[0].copy()
fields = list(first_item.keys())
if auto_time:
now = timezone.now()
fields.extend(['created_at', 'updated_at'])
for item in batch_data:
item['created_at'] = now
item['updated_at'] = now
placeholders = ', '.join(['%s'] * len(fields))
value_groups = [tuple(item.values()) for item in batch_data]
flat_values = [val for group in value_groups for val in group]
sql = f"INSERT INTO {table} ({', '.join(fields)}) VALUES "
sql += ', '.join([f"({placeholders})"] * len(batch_data))
rows = self.execute_query(sql, flat_values)
total_rows += rows
return total_rows
# ------------------------------
# 更新操作
# ------------------------------
def update_data(self, table, data, where, params=None, auto_time=True, return_last_id=False):
"""
更新数据
:param table: 表名字符串
:param data: 要更新的数据字典,格式 {字段名: 值}
:param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
:param params: WHERE条件的参数列表
:param auto_time: 是否自动更新updated_at字段(默认True)
:param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回更新的行ID)
:return: 受影响的行数或最后插入ID
"""
set_items = []
values = list(data.values())
if auto_time:
data['updated_at'] = timezone.now()
set_items = [f"{field} = %s" for field in data.keys()]
values = list(data.values())
else:
set_items = [f"{field} = %s" for field in data.keys()]
set_clause = ', '.join(set_items)
sql = f"UPDATE {table} SET {set_clause} WHERE {where}"
if params:
values.extend(params)
return self.execute_query(sql, values, return_last_id=return_last_id)
def batch_update(self, table, data_list, where_field='id', auto_time=True, batch_size=1000):
"""
批量更新数据(使用CASE WHEN优化)
:param table: 表名字符串
:param data_list: 数据字典列表,每个字典必须包含where_field字段
:param where_field: 用于匹配记录的字段(默认id)
:param auto_time: 是否自动更新updated_at字段(默认True)
:param batch_size: 每批次处理的记录数(默认1000)
:return: 总共更新的行数
"""
if not data_list:
return 0
total_rows = 0
total_batches = (len(data_list) + batch_size - 1) // batch_size
for i in range(total_batches):
start = i * batch_size
end = start + batch_size
batch_data = data_list[start:end]
case_clauses = []
values = []
update_fields = {k for item in batch_data for k in item.keys() if k != where_field}
for field in update_fields:
case_sql = f"{field} = CASE {where_field} "
for item in batch_data:
case_sql += f"WHEN %s THEN %s "
values.extend([item[where_field], item[field]])
case_sql += "END"
case_clauses.append(case_sql)
where_ids = [item[where_field] for item in batch_data]
values.extend(where_ids)
if auto_time:
now = timezone.now()
case_clauses.append(f"updated_at = %s")
values.append(now)
set_clause = ', '.join(case_clauses)
where_placeholders = ', '.join(['%s'] * len(where_ids))
sql = f"UPDATE {table} SET {set_clause} WHERE {where_field} IN ({where_placeholders})"
rows = self.execute_query(sql, values)
total_rows += rows
return total_rows
# ------------------------------
# 删除/软删除操作
# ------------------------------
def soft_delete(self, table, where, params=None, return_last_id=False):
"""
软删除数据(标记is_deleted=1)
:param table: 表名字符串
:param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
:param params: WHERE条件的参数列表
:param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回删除的行ID)
:return: 受影响的行数或最后插入ID
"""
data = {'is_deleted': 1, 'deleted_at': timezone.now()}
return self.update_data(table, data, where, params, auto_time=False, return_last_id=return_last_id)
def delete_data(self, table, where, params=None, hard_delete=False, return_last_id=False):
"""
删除数据(默认软删除)
:param table: 表名字符串
:param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
:param params: WHERE条件的参数列表
:param hard_delete: 是否执行硬删除(物理删除)
:param return_last_id: 是否返回最后插入的ID(在支持的数据库中可能返回删除的行ID)
:return: 受影响的行数或最后插入ID
"""
if hard_delete:
sql = f"DELETE FROM {table} WHERE {where}"
return self.execute_query(sql, params, return_last_id=return_last_id)
else:
return self.soft_delete(table, where, params, return_last_id=return_last_id)
# ------------------------------
# 查询操作
# ------------------------------
def get_list(self, table, where=None, params=None, order_by=None, limit=None, offset=None,
with_deleted=False, only_deleted=False, fields=None):
"""
查询列表数据
:param table: 表名字符串
:param where: WHERE条件(不带WHERE关键字)
:param params: WHERE条件的参数列表
:param order_by: 排序字段,例如 "created_at DESC"
:param limit: 返回记录数限制
:param offset: 偏移量(用于分页)
:param with_deleted: 是否包含已删除数据
:param only_deleted: 是否只返回已删除数据
:param fields: 要返回的字段列表,默认返回所有字段
:return: 符合条件的记录列表
"""
if fields:
select_str = ', '.join(fields)
else:
select_str = '*'
conditions = []
query_params = []
if where:
conditions.append(where)
if params:
query_params.extend(params)
if not with_deleted:
if only_deleted:
conditions.append("is_deleted = 1")
else:
conditions.append("is_deleted = 0")
order_str = f"ORDER BY {order_by}" if order_by else ""
limit_str = f"LIMIT {limit}" if limit is not None else ""
offset_str = f"OFFSET {offset}" if offset is not None else ""
where_clause = " AND ".join(conditions) if conditions else ""
where_str = f"WHERE {where_clause}" if where_clause else ""
sql = f"SELECT {select_str} FROM {table} {where_str} {order_str} {limit_str} {offset_str}"
return self.execute_query(sql, query_params)
def get_one(self, table, where, params=None, with_deleted=False, only_deleted=False, fields=None):
"""
查询单条数据
:param table: 表名字符串
:param where: WHERE条件(不带WHERE关键字)
:param params: WHERE条件的参数列表
:param with_deleted: 是否包含已删除数据
:param only_deleted: 是否只返回已删除数据
:param fields: 要返回的字段列表,默认返回所有字段
:return: 符合条件的单条记录或None
"""
return self.get_list(
table, where, params,
with_deleted=with_deleted,
only_deleted=only_deleted,
fields=fields,
limit=1
)
def get_count(self, table, where=None, params=None, with_deleted=False, only_deleted=False):
"""
查询记录总数
:param table: 表名字符串
:param where: WHERE条件(不带WHERE关键字)
:param params: WHERE条件的参数列表
:param with_deleted: 是否包含已删除数据
:param only_deleted: 是否只返回已删除数据
:return: 记录总数
"""
conditions = []
query_params = []
if where:
conditions.append(where)
if params:
query_params.extend(params)
if not with_deleted:
if only_deleted:
conditions.append("is_deleted = 1")
else:
conditions.append("is_deleted = 0")
where_clause = " AND ".join(conditions) if conditions else ""
where_str = f"WHERE {where_clause}" if where_clause else ""
sql = f"SELECT COUNT(*) AS count FROM {table} {where_str}"
result = self.execute_query(sql, query_params, fetchone=True)
return result['count'] if result else 0
# ------------------------------
# 事务操作
# ------------------------------
def execute_transaction(self, operations):
"""
执行一组SQL操作作为事务
:param operations: SQL操作列表,格式 [(SQL语句, 参数元组), ...]
:return: 每个操作的返回结果列表
:raises: 任何操作失败时抛出异常,所有操作回滚
"""
with transaction.atomic():
results = []
for idx, (sql, params) in enumerate(operations):
try:
result = self.execute_query(sql, params)
results.append(result)
except Exception as e:
raise RuntimeError(f"事务中第{idx + 1}条SQL执行失败:{str(e)}") from e
return results
# ------------------------------
# 数据恢复
# ------------------------------
def restore_data(self, table, where, params=None):
"""
恢复软删除的数据(is_deleted=0)
:param table: 表名字符串
:param where: WHERE条件(不带WHERE关键字),例如 "id = %s"
:param params: WHERE条件的参数列表
:return: 受影响的行数
"""
data = {'is_deleted': 0, 'deleted_at': None}
return self.update_data(table, data, where, params, auto_time=True)
def get_count(self, table, where=None, params=None, with_deleted=False, only_deleted=False):
"""
查询记录总数
:param table: 表名字符串
:param where: WHERE条件(不带WHERE关键字)
:param params: WHERE条件的参数列表
:param with_deleted: 是否包含已删除数据
:param only_deleted: 是否只返回已删除数据
:return: 记录总数
"""
conditions = []
query_params = []
if where:
conditions.append(where)
if params:
query_params.extend(params)
# if not with_deleted:
# if only_deleted:
# conditions.append("is_deleted = 1")
# else:
# conditions.append("is_deleted = 0")
where_clause = " AND ".join(conditions) if conditions else ""
where_str = f"WHERE {where_clause}" if where_clause else ""
sql = f"SELECT COUNT(*) AS count FROM {table} {where_str}"
result = self.execute_query(sql, query_params, fetchone=True)
return result['count'] if result else 0