查询
使用 SQL 语句查询:
from urllib import parse
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, text

# Run a raw SQL SELECT against `testtable` and print every row.
#
# NOTE(review): the credentials are hard-coded in the URL; acceptable for a
# local demo only. `parse` is imported but unused here -- presumably meant for
# URL-quoting passwords that contain special characters; confirm before removing.
con = create_engine('mysql+pymysql://root:root@localhost:3306/testdb?charset=utf8')
try:
    # Alternative: pd.read_sql(sql, con) loads the same query into a DataFrame.
    # Engine.execute() with a bare string no longer exists in SQLAlchemy 2.0:
    # take an explicit connection and wrap the SQL in text().
    with con.connect() as conn:
        result = conn.execute(text("SELECT * FROM testtable"))
        # Consume the rows while the connection is still open.
        for row in result:
            print(row)
finally:
    # Release the pool even when the query fails.
    con.dispose()
使用 SQLAlchemy Core 表达式(Table 对象,非 ORM)查询:
from urllib import parse
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData

# Select every row of `testtable` through a Table object and print each row.

# Client-side description of the table; nothing is created in the database.
metadata = MetaData()
employees = Table(
    'testtable', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('email', String),
)

engine = create_engine('mysql+pymysql://root:root@localhost:3306/testdb?charset=utf8')
s = employees.select()
try:
    # The context manager closes the connection even if execution or
    # iteration raises, which the manual conn.close() did not guarantee.
    with engine.connect() as conn:
        result = conn.execute(s)
        # Consume the rows while the connection is still open.
        for row in result:
            print(row)
finally:
    engine.dispose()
插入:
from urllib import parse
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData

# Insert one fixed row into `testtable`.

# Client-side description of the table; nothing is created in the database.
metadata = MetaData()
employees = Table(
    'testtable', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('email', String),
)

engine = create_engine('mysql+pymysql://root:root@localhost:3306/testdb?charset=utf8')
ins = employees.insert().values(id=11, name="nameereee", email="kkkkkkkkkk")
try:
    # engine.begin() commits when the block succeeds and rolls back when it
    # raises. A plain connect()/execute()/close() sequence never commits under
    # SQLAlchemy 2.0, so the inserted row would be silently discarded.
    with engine.begin() as conn:
        conn.execute(ins)
finally:
    engine.dispose()
删除:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String

# Delete the rows of `testtable` whose name is 'nameereee', then print the
# remaining rows.

engine = create_engine('mysql+pymysql://root:root@localhost:3306/testdb?charset=utf8')

# Client-side description of the table; nothing is created in the database.
metadata = MetaData()
employees = Table(
    'testtable', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('email', String),
)

stmt = employees.delete().where(employees.c.name == 'nameereee')
s = employees.select()
try:
    # engine.begin() commits the DELETE on success and rolls back on error;
    # without it the change is never committed under SQLAlchemy 2.0.
    with engine.begin() as conn:
        conn.execute(stmt)
        # Read back inside the same transaction to see the effect of the delete.
        rows = conn.execute(s).fetchall()
finally:
    # The original never closed the connection or disposed of the engine.
    engine.dispose()

# Outside a REPL a bare expression is discarded, so print the result explicitly.
print(rows)
更新:
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String

# Set name to 'Kapoor' on the row of `testtable` whose id is 3, then print
# all rows.

engine = create_engine('mysql+pymysql://root:root@localhost:3306/testdb?charset=utf8')

# Client-side description of the table; nothing is created in the database.
metadata = MetaData()
employees = Table(
    'testtable', metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('email', String),
)

# `id` is declared as an Integer column, so compare it with an int rather
# than the string '3'.
stmt = employees.update().where(employees.c.id == 3).values(name='Kapoor')
s = employees.select()
try:
    # engine.begin() commits the UPDATE on success and rolls back on error;
    # without it the change is never committed under SQLAlchemy 2.0.
    with engine.begin() as conn:
        conn.execute(stmt)
        # Read back inside the same transaction to see the effect of the update.
        rows = conn.execute(s).fetchall()
finally:
    # The original never closed the connection or disposed of the engine.
    engine.dispose()

# Outside a REPL a bare expression is discarded, so print the result explicitly.
print(rows)