sqlalchemy 分表实现方案

发布于:2024-05-10 ⋅ 阅读:(26) ⋅ 点赞:(0)
1.需求及场景概述

        现有系统中因历史数据量过大,产生了将历史数据进行按月存储的要求,系统和数据库交互使用的是sqlalchemy,假设系统的原来的历史记录表(record)如下:

为了将历史数据按月分表存储,我们需要以此表为基础按月创建对应的月表来进行分表存储,同时又要使用orm的功能。面对这样的需求我们很自然的会想到创建如下模型

class Recode_202405(Base): 
    __tablename__ = 'record_202405' 
    id = Column(INT(11), primary_key=True) 
    name = Column(String(100, 'utf8mb4_unicode_ci'))

这样当然可以,但是我们不可能每月手动去创建这个模型,然后重启自己的服务,这明显有问题, 那如何解决呢,下面就介绍一种在这种场景下基于sqlalchemy实现的分表存储方案。

首先,我们对我们应用场景及需求进行一下描述:

1.我们有一张基础表,这个表作为我们创建月表的模板。

2.当有新数据需要入库时,我们将数据按月存储到当前月份的月表中,如果当月表不存在系统自动创建。

3.支持使用OMR方便数据存储及查询。

2.实现方案

        既然要支持使用ORM ,我们势必要获取月表的model。sqlalchemy0.9.1版本推出了Automap,它可以自动映射数据库的表,通过数据表名映射model。大概的用法如下:

from sqlalchemy.ext.automap import automap_base

AutoBase = automap_base()

# reflect the tables
AutoBase.prepare(engine, reflect=True)
tablename = "record_202405"
RecordDao = getattr(AutoBase.classes, tablename)

        既然可以通过表名映射回model,那么现在的问题就是如何基于已有的基础表创建当月的月表,可以参考如下代码:

from sqlalchemy.ext.automap import automap_base
AutoBase = automap_base()
  
table_name = 'record'
date = '202405'
# 基于基础表创建指定月份的月表
base_table = autobase.metadata.tables[table_name]
mdata = MetaData()
new_table = base_table.tometadata(metadata=mdata, name=f'{table_name}_{date}')
try:
    # 创建表
    new_table.create(bind=engine)
except BaseException as e:
    print(e)

基于上述方案,我们可以封装一个方法,该方法接收两个参数,一个是表名,一个是分表的月份,其实可以进行任何程度的分表,返回该表的model,通过sqlalchemy 的ORM 进行增删改查操作,整体代码如下:

from sqlalchemy import create_engine, MetaData
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.ext.automap import automap_base
from urllib import parse
from contextlib import contextmanager
import time


mysql_user = 'root'
mysql_password = 'root'
mysql_host = '127.0.0.1'
mysql_port = 3306
mysql_db = 'db'


SQLALCHEMY_DATABASE_URL = f'mysql+pymysql://{mysql_user}:{parse.quote_plus(mysql_password)}@{mysql_host}:{mysql_port}/{mysql_db}?charset=utf8'

engine = create_engine(SQLALCHEMY_DATABASE_URL, pool_recycle=7200)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


@contextmanager
def session_maker():
    try:
        db: Session = SessionLocal()
        yield db
        db.commit()
    except Exception as e:
        print(e)
        db.rollback()
    finally:
        db.close()


def get_table_model(table_name, date=None):
    autobase = automap_base()
    autobase.prepare(engine, reflect=True)
    print(autobase)

    base_model = None
    try:
        base_model = getattr(autobase.classes, table_name)
    except Exception as e:
        print(e)
        return base_model
    if date == None: # 不分表的情况
        return base_model
    else:            # 分表的情况
        # 获取基础类
        base_table = autobase.metadata.tables[table_name]
        mdata = MetaData()
        new_table = base_table.tometadata(metadata=mdata, name=f'{table_name}_{date}')
        # new_table = base_model.tometadata(metadata=mdata, name=f'{table_name}_{date}')

        try:
            # 创建表
            new_table.create(bind=engine)
        except BaseException as e:
            # 忽略建表异常,存在多进程同时建表情况,忽略后刷新autobase再试
            print(e)
            
        '''Automap的映射虽然是自动的,但是只有在启动的时候生效,也就是说如果新建一个数据表,
        而没有告诉Automap,那这个表是找不到的。在实际使用中,可以捕获AttributeError异常,
        并再次调用AutoBase.prepare(engine, reflect=True) 刷新映射关系。'''
        autobase = automap_base()
        autobase.prepare(engine, reflect=True)
        base_model = getattr(autobase.classes, f'{table_name}_{date}')

        return base_model



def get_table_model(table_name, date=None):
    autobase = automap_base()
    autobase.prepare(engine, reflect=True)
    print(autobase)

    base_model = None
    try:
        base_model = getattr(autobase.classes, table_name)
    except Exception as e:
        print(e)
        return base_model
    if date == None: # 不分表的情况
        return base_model
    else:            # 分表的情况
        # 获取基础类
        base_table = autobase.metadata.tables[table_name]
        mdata = MetaData()
        new_table = base_table.tometadata(metadata=mdata, name=f'{table_name}_{date}')
        # new_table = base_model.tometadata(metadata=mdata, name=f'{table_name}_{date}')

        try:
            # 创建表
            new_table.create(bind=engine)
        except BaseException as e:
            # 忽略建表异常,存在多进程同时建表情况,忽略后刷新autobase再试
            print(e)
            
        '''Automap的映射虽然是自动的,但是只有在启动的时候生效,也就是说如果新建一个数据表,
        而没有告诉Automap,那这个表是找不到的。在实际使用中,可以捕获AttributeError异常,
        并再次调用AutoBase.prepare(engine, reflect=True) 刷新映射关系。'''
        autobase = automap_base()
        autobase.prepare(engine, reflect=True)
        base_model = getattr(autobase.classes, f'{table_name}_{date}')

        return base_model
        

# 使用
Record = database.get_table_model("record", '202406')

ll = Record(name="1111")

with database.session_maker() as db:
    db.add(ll)
    db.commit()
    ll = db.query(Record).all()
    print(ll)

参考:sqlalchemy分表操作 - 知乎