第4天在Flask应用中使用数据库FlaskSQLAlchemy

原文: http://www.catonlinepy.tech/声明:原创不易,未经许可,不得转载 1. 你将学会什么接第3天表单的使用课程,今天的课程主要涉及到与数据库相关的两个插件的使用,一个是Flask_SQLAlchemy,另外一个是Flask_Migrate。通过今天的学习,你将学会如何对数据库进行基本的操作,以及如何完成数据库的迁移。教程中的代码都会托管到github上,猫姐不厌其烦地强调,在学习本课内容时一定要自己尝试手敲代码,遇到问题再到github上查看代码,如果实在不知道如何解决,可以在日志下方留言。 2.使用Flask_SQLAlchemy管理数据库2.1 Flask_SQLAlchemy的安装对于web后台开发工作,必须要掌握的一项技能便是对数据库的CRUD(create, read, update, delete)操作,如果开发过程中直接使用原生的sql语句对数据库进行操作,将是非常痛苦的事件(毕竟sql语句有很多反人类的设计)。Flask_SQLAlchemy插件将开发人员从这个泥潭中解救出来了,我们只需要使用Python的类就能轻松的完成对表的增删改查操作,并且该插件还支持多种数据库类型,如MySQL、PostgreSQL、和SQLite等。 在进入正式学习之前,我们照旧要建立今天的项目目录,如下: # 进入到虚拟环境目录,激活虚拟环境maojie@Thinkpad:~/flask-plan/$ source miao_venv/bin/activate# 到flask-course-primary目录下创建第四天的课程day4目录(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary$ mkdir day4# 进入day4目录(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary$ cd day4# 新建database_demo目录(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary/day4$ mkdir database_demo# 进入到database_demo目录(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary/day4$ cd database_demo/# 在database_demo目录中新建__init__.py文件(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary/day4/database_demo$ touch __init__.py# 在database_demo包中新建routes.py路由文件(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary/day4/database_demo$ touch routes.py# 在day4目录下新建run.py文件(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary/day4/$ touch run.py安装Flask_SQLAlchemy插件还是使用pip命令,如下:# 注意一定要在虚拟环境中(miao_venv) maojie@Thinkpad:~/flask-plan/flask-course-primary/day4$ pip install Flask_SQLAlchemy2.2 配置Flask_SQLAlchemy我们的教程中使用的数据库是SQLite(Linux),主要原因是SQLite足够简单,不需要进行任何配置便可使用,十分适用于入门。下面在__init__.py文件中配置数据库,如下所示: # 在__init__.py文件中的内容from flask import Flask# 从flask_sqlalchemy导入SQLAlchemy类from flask_sqlalchemy import SQLAlchemyimport os# 通过Flask创建一个app实例app = Flask(__name__)basedir = os.path.abspath(os.path.dirname(__file__))# Flask_SQLAlchemy插件从SQLALCHEMY_DATABASE_URI配置的变量中获取应用的数据库位置app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(basedir, 'database.db')# 通过SQLAlchemy创建db实例,表示程序使用的数据库,并且db能够使用Flask_SQLAlchemy的所有功能db = SQLAlchemy(app)2.3 构建数据库模型数据模型通常用来定义数据库中的表及表中的字段。下面代码中的User类和Post类就代表了数据库中的两张表(官方叫法是数据模型)。后面我们会看到,通过这里定义的两个Python类,我们就可以非常容易的完成表的增删改查,以下是routes.py文件中的内容。 ...

June 11, 2019 · 4 min · jiezi

sqlalchemy使用count时遇到的坑

在用flask-sqlalchemy对一个千万级别表进行count操作时,出现了耗时严重、内存飙升的情况。要统计出一天内车辆访问次数,原代码如下: car_visit_counts = CarVisit.query.filter( CarVisit.park == car_visit.park, CarVisit.plate_number == car_visit.plate_number, CarVisit.visited_at >= today_start_time(),).count()发现代码运行特别慢,所以把生成的sql打印出来看一下: SELECT COUNT(*) AS count_1FROM ( SELECT car_visits.id AS car_visits_id , car_visits.park_id AS car_visits_park_id , car_visits.store_id AS car_visits_store_id , car_visits.car_id AS car_visits_car_id , car_visits.brand_id AS car_visits_brand_id , ... FROM car_visits WHERE %(param_1)s = car_visits.park_id AND car_visits.plate_number = %(plate_number_1)s AND car_visits.visited_at >= %(visited_at_1)s ) AS anon_1可以发现进行了一次子查询,这样的话会生成临时表,效率低下,将原语句改变一下: car_visit_counts = db.session.query(func.count(CarVisit.id)).filter( CarVisit.park == car_visit.park, CarVisit.plate_number == car_visit.plate_number, CarVisit.visited_at >= today_start_time(),).scalar()此时在看一下打印的sql语句: ...

June 7, 2019 · 1 min · jiezi

SQLAlchemy的类继承、抽象类

Python中的类继承是相当容易的,但是在SQLAlchemy中却不能直接用Python类继承完成,还要多加一些设置。网上关于这个东西,东说西说的非常多,甚至官网都没有把最简单的解决方案po出来,取而代之的是非常复杂的Inheritance Configuration。首先说最简单的方案,来自Stackoverflow,亲测完美有效,最符合Python类继承。参考:Sqlalchemy: avoiding multiple inheritance and having abstract base class正解在这里,我们称这个方法为__abstract__方法:Base = declarative_base()class CommonRoutines(Base): abstract = True id = Column(Integer, primary_key=True) def init(self): # …class Foo(CommonRoutines): tablename = ‘foo’ name = Column(…) def init(self, name): super().init() self.name = name # …也就是说,抽象类中只要用__abstract__ = True代替__tablename__即可完成一切工作,其它一切都和Python内置的类继承一摸一样了。继承中的类方法和静态方法SQLAlchemy的ORM继承,在classmethod和staticmethod继承是和Python OOP面向对象的继承方案一致的。也就是说:被冠之@staticmethod的静态方法,会被继承,但是在子类调用的时候,却是调用的父类同名方法。被冠之@classmethod的类方法,会被继承,子类调用的时候就是调用子类的这个方法。继承中的外键奇怪的是,SQLAlchemy定义的ORM,在继承父级ORM时候,Foreign Key外键是不能继承的,它强制要求在子类中重新定义。参考官方文档:Mapping Class Inheritance Hierarchies 建议直接用Ctrl-f搜索"foreign`关键字,就能看到官方在继承时,也都要重新定义一遍外键。再参考:SQLAlchemy Inheritanceclass Parent(Base): abstract = True id = Column(‘id’, Integer, primary_key=True) name = Column(’name’, String) age = Column(‘age’, String) fk = Column(‘fk’, Integer, ForeignKey(‘anotherTable.id’), primary_key=True)class Son(Parent): tablename = ‘son’ fk = Column(‘fk’, Integer, ForeignKey(‘anotherTable.id’), primary_key=True)其它继承方案如果参考别人的方案、官网的方案,会让你晕头转向。为了避免重复参考别人的东西,这里贴上一些不是解决方案的解决方案。declarative_base(cls=XX)方法:class CommonBase(object): @classmethod def somecommonaction(cls): # body hereBase = declarative_base(cls=CommonBase)class Table1(Base): # tablename & Table1 specific fields hereclass Table2(Base): # tablename & Table2 specific fields here这样的缺点是,很难看清继承关系。官方的__mapper_args__方法:class Person(Base): tablename = ‘people’ id = Column(Integer, primary_key=True) discriminator = Column(’type’, String(50)) mapper_args = {‘polymorphic_on’: discriminator}class Engineer(Person): tablename = ’engineers’ mapper_args = {‘polymorphic_identity’: ’engineer’} id = Column(Integer, ForeignKey(‘people.id’), primary_key=True) primary_language = Column(String(50))可以看出,这个必须在父子类都中分别定义难懂的__mapper_args__属性。这还不算完,官网中还说各种映射需要不同的复杂设置。有兴趣可参考官网:https://docs.sqlalchemy.org/e… ...

January 24, 2019 · 1 min · jiezi

flask-sqlalchemy操作(基础)

以下内容介绍了Sqlalchemy的基础查询语句,下篇文章将介绍其高级查询(聚合、自关联、连接、子查询等)模型类# 用户表class User(db.Model): tablename = ‘user’ uid = db.Column(db.String(32), primary_key=True, nullable=False) username = db.Column(db.String(20), nullable=True) password = db.Column(db.String(128), nullable=True) email = db.Column(db.String(30), nullable=True) addresses = db.relationship(‘Address’, backref=‘user’)# 地址信息class Address(db.Model): tablename = ‘address’ aid = db.Column(db.String(32), primary_key=True, nullable=False) name = db.Column(db.String(32), nullable=True) site = db.Column(db.String(100), nullable=True) phone = db.Column(db.Integer, nullable=True) uid = db.Column(db.String(32), db.ForeignKey(‘user.uid’))# 关联表OrderItem = db.Table( ‘orderitem’, db.Column(‘gid’, db.String(32), nullable=True), db.Column(‘product_id’, db.String(32), db.ForeignKey(‘product.pid’)), db.Column(‘order_id’, db.String(32), db.ForeignKey(‘order.oid’)))# 商品信息class Product(db.Model): tablename = ‘product’ pid = db.Column(db.String(32), nullable=False, primary_key=True) pname = db.Column(db.String(50), nullable=True) market_price = db.Column(db.Float, nullable=True) shop_price = db.Column(db.Float, nullable=True) pimage = db.Column(db.String(200), nullable=True) pdate = db.Column(db.Date, nullable=True) is_hot = db.Column(db.Integer, nullable=True) pdesc = db.Column(db.String(255), nullable=True) pflag = db.Column(db.Integer, nullable=True) order = db.relationship(‘Order’, secondary=OrderItem)# 订单表class Order(db.Model): tablename = ‘order’ oid = db.Column(db.String(32), nullable=False, primary_key=True) count = db.Column(db.Integer, nullable=True) subtotal = db.Column(db.Float, nullable=True) ordertime = db.Column(db.DateTime, nullable=True) flag = db.Column(db.String(10), nullable=True)增一对一order = models.Orders(oid=orderid, ordertime=datetime.now(), total=pcount, uid=pid) # 构建对象models.db.session.add(order) # 添加对象models.db.session.commit() # 提交事务一对多p = models.User(uid=‘122’, username=‘hello’, password=‘123456’, email='1@qq.com’) # 主表c1 = models.Address(aid=‘1111111111’,site=‘xxxxxxxxxx’) # 子表c2 = models.Address(aid=‘2222222222’, site=‘yyyyyyyyyy’) # 子表p.addresses = [c1, c2] # 赋值对象models.db.session.add(p) # 添加models.db.session.commit() # 提交多对多p = models.Product(pid=‘1’,pname=‘hello’) # 生成或获取商品对象o = models.Order(oid=‘1’, ) # 生成订单对象p.order = [o] # 订单表与商品表关联models.db.session.add(p) # 添加models.db.session.commit() # 提交查一对一models.Product.query.get(pid) # 根据主键返回一个对象,必须查询主键models.Product.query.all() # 返回全部对象models.Product.query.filter_by(pid=pid).first() # 返回第一个对象,检索任何值均可models.Product.query.filter_by(cid=‘1’).limit(4).all() # 限制返回对象一对多# 根据用户获取地址p = models.user.query.get(uid) # 根据主键返回一个对象,必须查询主键p.addresses # 一对多获取对象# 根据地址获取用户u = models.Address.query.get(1)print(u.user)多对多p = models.Product.query.get(1) # 正向查询print(p.order)o = models.Order.query.get(1) # 逆向查询print(o.product)改一对一good = models.Product.query.filter_by(pid=pid).first() # 获取good.pflag = 6 # 修改models.db.session.commit() # 提交一对多u = models.User.query.get(1) for i in u.addresses: i.name = ‘Gage’models.db.session.commit()多对多o = models.Order.query.get(1) for i in o.product: i.pname = ‘Gage’models.db.session.commit()删一对一add = models.Address.query.filter_by(aid=aid).first() # 获取models.db.session.delete(add) # 添加models.db.session.commit() # 提交一对多# 根据用户删除地址# cascade=‘all’ 添加此属性会级联删除# 用户一个用户对应多个地址,因此需要循环遍历,实际开发需使用filter_by 或 filter进行过滤特定删除u = models.User.query.get(1) for i in u.addresses: print(i.aid)models.db.session.delete(i)models.db.session.commit()# 根据地址删除用户a = models.Address.query.get(1)models.db.session.delete(a.user)models.db.session.commit()多对多# 实际开发也是如此,即只删除依赖# 根据商品删除所依赖的订单p = models.Product.query.get(1)o = models.Order.query.get(1)p.order.remove(o)models.db.session.commit()# 根据订单删除所关联的商品p = models.Product.query.get(1)o = models.Order.query.get(1)o.product.remove(p)models.db.session.commit() ...

January 15, 2019 · 2 min · jiezi

Flask分页iter_pages之None分析

昨天看一了下问答区的问题,很多人都咨询了flask在分页的时候总是会遍历出None,这是怎么回事呢?先来一段小程序:数据库信息:代码程序:if name == “main”: user=User.query.paginate(1,2) for i in user.iter_pages(): print(i,end=" “) 输出信息:1 2 3 4 5 None 8 9 今天我自己看了一下源码,来分析一下def iter_pages(self, left_edge=2, left_current=2, right_current=5, right_edge=2): last = 0 for num in xrange(1, self.pages + 1): if num <= left_edge or \ (num > self.page - left_current - 1 and \ num < self.page + right_current) or \ num > self.pages - right_edge: if last + 1 != num: yield None yield num last = num可以很明了的看到当last+1 != num的时候就会返回None,那么接下来的问题就是如果可以使last+1 != num,那就是当num <= left_edge or (num > self.page - left_current - 1 and num < self.page + right_current) or num > self.pages - right_edge不成立的时候,下次循环就会使last+1 != num。那么,我们如何结果很多人想要完整遍历的目的呢?那就是在默认参数总改变值,使上述的三个条件中任意一个永远成立,如使num <= left_edge永远成立for i in user.iter_pages(left_edge=user.pages): print(i,end=” “) 输出结果:1 2 3 4 5 7 8 9这样就完美解决了。当然以上促使以上三个任意条件永远成立都可以进行完整遍历。 ...

January 3, 2019 · 1 min · jiezi

Flask_sqlalchemy之数据分页

在做项目中很多时候我们都需要分页,可能很多人都知道使用paginate进行分页,可是然后就不知道了,今天像大家介绍一下个paginate分页。paginate(self, page=None, per_page=None, error_out=True, max_per_page=None) page:需要查询第几页的数据,默认值:1 源代码""" if page is None: try: page = int(request.args.get(‘page’, 1)) except (TypeError, ValueError): if error_out: abort(404) page = 1""" per_page:每页数据量,默认值:20 源代码""" if per_page is None: try: per_page = int(request.args.get(‘per_page’, 20)) except (TypeError, ValueError): if error_out: abort(404) per_page = 20""" error_out:当参数为True时,会有404响应,在page<1或per_page<0都会响应404 源代码""" if page < 1: if error_out: abort(404) else: page = 1 if per_page < 0: if error_out: abort(404) else: per_page = 20""" max_per_page:每页最大数据量,如果指定,则默认取其与per_page的最小值 源代码""" if max_per_page is not None: per_page = min(per_page, max_per_page)“““其实今天我想写的并不是这几个参数,而是它的返回值,Pagination对象,既然他返回了一个Pagination对象,那么这个东西是什么,有什么用呢?Pagination(query, page, per_page, total, items):一个帮助分页的类has_next:判断是否有下一页has_prev:判断是否有上一页next_num:返回下一页页码prev_num:返回上一页页码page :当前页码pages:总页数per_page:每一页的数据量prev():返回Pagination上一页的对象next():返回Pagination下一页的对象items:返回当前页面项目,可遍历iter_pages(left_edge = 2,left_current = 2,right_current = 5,right_edge = 2):迭代分页中的页码好了,今天的内容我介绍完了,大家尝试一下吧。 ...

January 2, 2019 · 1 min · jiezi

Flask之扩展flask-migrate

flask-migrate一个用来做数据迁移的falsk扩展,一般都是结合flask-sqlalchemy使用,在上一篇文章中我也介绍了这个扩展,需要的小伙伴可以看一下,后续我会将flask-sqlalchemy更深层的写出来。【config.py】SQLALCHEMY_DATABASE_URI=‘mysql://root:mysql@127.0.0.1:3306/test’ //数据库连接SQLALCHEMY_TRACK_MODIFICATIONS=False【data_migrate.py】from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyfrom flask_script import Manager #这是一个做脚本调式的库,有时间我也会总结from flask_migrate import Migrate,MigrateCommandapp = Flask(name)app.config.from_envvar(‘config.py’)db = SQLAlchemy(app) migrate = Migrate(app, db)manager = Manager(app)manager.add_command(‘db’, MigrateCommand)class User(db.Model): #创建一个模型类,用于做数据迁移 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(32)) if name == ‘main’: manager.run()【控制台测试】>>>python data_migrate.py db init //创建迁移存储库>>>python data_migrate.py db migrate -m ‘版本名后缀’ //生成初始迁移>>>python data_migrate.py db upgrade //将迁移应用于数据库//若有修改,可重复执行2/3这两条命令>>>python 文件 db history //显示整个历史版本记录【其他命令】python data_migrate.py db –help //帮助,查找所有命令python data_migrate.py db current //显示当前版本python data_migrate.py db upgrade 版本号 //升级版本,不指定版本为最新版本python data_migrate.py db downgrade 版本号 //降级数据库,不指定版本则是最老版本 ...

December 28, 2018 · 1 min · jiezi

Flask扩展之flask-sqlalchemy(上)

flask-sqlalchemy是flask的一个ORM扩展框架,这个扩展在sqlalchemy的进行的扩展,更方便的结合Flask.什么是ORM?其是Object Relational Mapping的缩写,中文:对象关系映射,说白了就是程序中的实体类通过ORM可以映射成为数据库中的表,方便我们通过程序的方式操作数据表,这里就包括数据表的生成、删除、关系创建及表记录的增删改查。【config.py】SQLALCHEMY_DATABASE_URI=‘mysql://root:mysql@127.0.0.1:3306/test’ //数据库连接SQLALCHEMY_TRACK_MODIFICATIONS=False上面两项是必配置的属性,否则程序将不能正常运行:所有配置键见最后【create_sur.py】from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyapp = Flask(name)db = SQLAlchemy(app)app.config.from_object(‘config.py’)class User(db.Model): tablename = ‘user’ #指定表名,默认模型类小写 id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(32)) def repr(self): return ‘Role:%s’ % self.nameif name == “main”: db.create_all() app.run()这样子在test数据库下就生成了一张user表,是不是很简单。在进行数据的增删改查之前,我们先来看一下常用的操作语句:常见操作语句db.session.add(obj) 添加对象db.session.add_all([obj1,obj2,..]) 添加多个对象db.session.delete(obj) 删除对象db.session.commit() 提交会话db.session.rollback() 回滚db.session.remove() 移除会话增加数据>>>from create_sur import db,User>>>user1=User(name=‘jim’)>>>db.session.add(user1)>>>db.session.commit() //添加一条数据>>>user2=User(name=‘sam’)>>>user3=User(name=‘alice’)>>>db.session.add_all([user2,user3]) //批量添加数据>>>db.session.commit()查询数据>>>from create_sur import db,User>>>user_all=User.query.all() //查询所有数据>>>user=User.query.filter_by(name=‘jim’).all() //查询name为jim的数据>>>user=User.query.filter(User.name=‘sam’).first() //查询name为sam的数据常用过滤函数:常用查询函数:删除数据//在查询数据的基础上>>>db.session.delete(obj) //obj为查询后的数据对象>>>db.session.commit()更新数据//在查询数据的基础上通过修改对象的属性然后再添加达到更新的作用>>>user=User.query.filter(User.name=‘sam’).first() //查询数据>>>user.name=‘sam_two’>>>db.session.add(user)>>>db.session.commit()上面是一些简单的小例子,接下来我们写一个有外键关系的两个表的例子:【create_sur2.py】from flask import Flaskfrom flask_sqlalchemy import SQLAlchemyapp = Flask(name)db = SQLAlchemy(app)app.config.from_object(‘config.py’)class Role(db.Model): # 定义表名 tablename = ‘roles’ # 定义列对象 id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True) #设置关系属性,方便查询使用 us = db.relationship(‘User’, backref=‘role’) #重写__repr__方法,方便查看对象输出内容 def repr(self): return ‘Role:%s’% self.name class User(db.Model): tablename = ‘users’ id = db.Column(db.Integer, primary_key=True) name = db.Column(db.String(64), unique=True, index=True) password = db.Column(db.String(64)) role_id = db.Column(db.Integer, db.ForeignKey(‘roles.id’)) #定义外键 def repr(self): return ‘User:%s’%self.name上述有两个重点:使用ForeignKey义外键属性使用relationship方法定义两表的关系接下来我们测试一下这两个方法的作用:添加一些数据:>>>from create_sur2 import db,User,Role>>>ro1 = Role(name=‘admin’)>>>ro2 = Role(name=‘user’)>>>db.session.add_all([ro1,ro2])>>>db.session.commit()>>>us1 = User(name=‘wang’, password=‘123456’, role_id=ro1.id)>>>us2 = User(name=‘zhang’, password=‘201512’, role_id=ro2.id)>>>us3 = User(name=‘chen’, password=‘987654’, role_id=ro2.id)>>>us4 = User(name=‘zhou’, password=‘456789’, role_id=ro1.id)>>>db.session.add_all([us1,us2,us3,us4])>>>db.session.commit()测试一下查询:>>>from create_sur2 import User,Role>>>role=Role.query.get(1)>>>role.User.all()[User:wang,User:zhou] 这里之所以可以找到就是因为relationship通过外键作用实现的>>>user=User.query.get(3)>>>user.role[Role:user] 这里之所以可以找到就是因为relationship的backref参数值实现的所有配置清单SQLALCHEMY_DATABASE_URI 用于连接的数据库 URI SQLALCHEMY_BINDS 一个映射 binds 到连接 URI 的字典SQLALCHEMY_ECHO 如果设置为Ture, SQLAlchemy 会记录所有 发给 stderr 的语句,这对调试有用。(打印sql语句)SQLALCHEMY_RECORD_QUERIES 可以用于显式地禁用或启用查询记录。查询记录 在调试或测试模式自动启用。更多信息见get_debug_queries()。SQLALCHEMY_NATIVE_UNICODE 可以用于显式禁用原生 unicode 支持。当使用 不合适的指定无编码的数据库默认值时,这对于 一些数据库适配器是必须的(比如 Ubuntu 上 某些版本的 PostgreSQL )。SQLALCHEMY_POOL_SIZE 数据库连接池的大小。默认是引擎默认值(通常 是 5 )SQLALCHEMY_POOL_TIMEOUT 设定连接池的连接超时时间。默认是 10 。SQLALCHEMY_POOL_RECYCLE 多少秒后自动回收连接。这对 MySQL 是必要的, 它默认移除闲置多于 8 小时的连接。注意如果 使用了 MySQL , Flask-SQLALchemy 自动设定 这个值为 2 小时。所有数据类型 ...

December 27, 2018 · 1 min · jiezi