关于python:python-sqlalchemy-pg

11次阅读

共计 2218 个字符,预计需要花费 6 分钟才能阅读完成。


from sqlalchemy import Column, String, create_engine, Integer, Text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
import time
 
# 创建对象的基类:
Base = declarative_base()
 
 
# 定义 User 对象:
class User(Base):
    # 表的名字:
    __tablename__ = 'users'
 
    # 表的构造:
    id = Column(Integer, autoincrement=True, primary_key=True, unique=True, nullable=False)
    name = Column(String(50), nullable=False)
    sex = Column(String(4), nullable=False)
    nation = Column(String(20), nullable=False)
    birth = Column(String(8), nullable=False)
    id_address = Column(Text, nullable=False)
    id_number = Column(String(18), nullable=False)
    creater = Column(String(32))
    create_time = Column(String(20), nullable=False)
    updater = Column(String(32))
    update_time = Column(String(20), nullable=False, default=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                         onupdate=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    comment = Column(String(200))
 
 
# 初始化数据库连贯:
engine = create_engine('postgres://postgres:123123@localhost:5432/postgres')  # 用户名: 明码 @localhost: 端口 / 数据库名
 
# 创立 DBSession 类型:
DBSession = sessionmaker(bind=engine)
 
def createTable():
    # 创立表
    Base.metadata.create_all(engine)
 
 
def insertData():
    # 插入操作
    # 创立会话
    session = DBSession()
    # 创立新 User 对象:
    local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    new_user = User(name='hsh4', sex='男', nation='汉', birth='19991012', id_address='广州大学', id_number='441242142142',
                    create_time=local_time)
    # 增加到 session:
    session.add(new_user)
    # 提交即保留到数据库:
    session.commit()
    # 敞开 session:
    session.close()
 
def selectData():
    # 查问操作
    # 创立 Session
    session = DBSession()
    # 创立 Query 查问,filter 是 where 条件,最初调用 one() 返回惟一行,如果调用 all() 则返回所有行:
    user = session.query(User).filter(User.id == '1' and User.name == 'hsh4').one()
    print('name:', user.name)
    print('id_address:', user.id_address)
    session.close()  # 敞开 Session
 
def updateData():
    # 更新操作
    session = DBSession()  # 创立会话
    users = session.query(User).filter_by(name="hsh4").first()  # 查问条件
    users.id_number = "abcd"  # 更新操作
    session.add(users)  # 增加到会话
    session.commit()  # 提交即保留到数据库
    session.close()  # 敞开会话
 
def deleteData():
    # 删除操作
    session = DBSession()  # 创立会话
    delete_users = session.query(User).filter(User.id == "1").first()
    if delete_users:
        session.delete(delete_users)
        session.commit()
    session.close()  # 敞开会话
 
def dropTable():
    # 删除表
    session = DBSession()  # 创立会话
    session.execute('drop table users')
    session.commit()
    session.close()
 
 
# createTable()
# insertData()
# selectData()
# updateData()
# deleteData()
正文完
 0