Flask入门四(SQLAlchemy快速使用、原生sql操作、操作表、表模型、基于Scoped_Session实现线程安全、基本增删查改、高级查询、Flask-SQLAlchemy的使用)

avatar
作者
猴君
阅读量:1

文章目录

一、SQLAlchemy介绍和快速使用

1.介绍

SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。

SQLAlchemy是一个企业级的orm框架,Django的orm框架,只能在Django框架中使用,不能在别的地方使用,而SQLAlchemy可以单独使用,也可以用在其他框架中

	-flask : sqlalchemy     -fastapi:sqlalchemy      	python界的orm框架 		-django orm 		-sqlalchemy 		-peewee 

组成部分:

  • Engine,框架的引擎
  • Connection Pooling ,数据库连接池
  • Dialect,选择连接数据库的DB API种类
  • Schema/Types,架构和类型
  • SQL Exprression Language,SQL表达式语言

安装

	pip install  sqlalchemy 

SQLAlchemy本身无法操作数据库,其必须依赖pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:

	MySQL-Python     	mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>      	pymysql 	    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] 	     	MySQL-Connector 	    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> 	     	cx_Oracle 	    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 	     	更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html  	# django中如何反向生成models 	python manage.py inspectdb > app/models.py 

2.sqlalchemy原生操作

	'使用sqlalchemy操作原生sql' 	import pymysql 	 	1.导入 	from sqlalchemy import create_engine 	from sqlalchemy.engine.base import Engine 	 	2.创建引擎 	engine = create_engine( 	    "mysql+pymysql://root:1234@127.0.0.1:3306/cnblogs", 	    max_overflow=0,  # 超过连接池大小外最多创建的连接 	    pool_size=5,     # 连接池大小 	    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错 	    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) 	) 	 	'================================' 	 	3.使用引擎拿到链接 	conn = engine.raw_connection() 	4.剩下的和之前操作sql一样,使用pymysql 	cursor=conn.cursor(pymysql.cursors.DictCursor) 	cursor.execute('select * from article limit 10') 	res = cursor.fetchall() 	print(res) 	 	 	'使用多线程测试:' 	from threading import Thread 	def task(arg): 	    conn = engine.raw_connection() 	    cursor = conn.cursor() 	    cursor.execute('select * from article') 	    res = cursor.fetchall() 	    print(res) 	    cursor.close() 	    conn.close() 	 	 	for i in range(20): 	    t = Thread(target=task,args=(i,)) 	    t.start() 

二、sqlalchemy操作表

1.创建、删除表

1.导入 import datetime from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base()  # Base当成models.Model  2.创建表模型 class User(Base):     __tablename__ = 'users'  # 创建的表名     # 写字段     id = Column(Integer,primary_key=True,autoincrement=True)  # 主键、自增     name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空  普通索引     email = Column(String(32), unique=True)  # 唯一索引     # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间     ctime = Column(DateTime, default=datetime.datetime.now)     extra = Column(Text)  # 大文本  3.没有命令,后期可以使用第三方模块,可以有命令操作 # 目前需要手动做 # sqlalchemy 不能创建数据库,能创建表,不能删除增加字段(第三方模块) 3.1.创建引擎 engine = create_engine(  # 需要自己手动创建库     "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy01",     max_overflow=0,  # 超过连接池大小外最多创建的连接     pool_size=5,     # 连接池大小     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) ) 3.2.把表模型同步到数据库中(能创建删除,不能修改) Base.metadata.create_all(engine)  3.3.删除表 Base.metadata.drop_all(engine) 

2.简单操作(orm)

from models import User  1.创建引擎 from sqlalchemy import create_engine engine = create_engine(  # 需要自己手动创建库     "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy01",     max_overflow=0,  # 超过连接池大小外最多创建的连接     pool_size=5,  # 连接池大小     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) )  2.orm操作,借助于engine 得到session(conn)对象 from sqlalchemy.orm import sessionmaker Connection = sessionmaker(bind=engine) conn = Connection()  # 不是flask中的session,而是会话,conn的  3. 拿到(conn)会话对象后,进行orm操作 3.1 增加数据 user = User(name='jack',email='1@qq.com') # 插入到数据库 conn.add(user)  # 放个对象 # 提交 conn.commit() # 关闭连接 conn.close()  3.2 查询数据 # 查询User表中id为1的所有数据-->是一个列表 res = conn.query(User).filter_by(id=1).all() print(res)   3.3 删除数据 res = conn.query(User).filter_by(name='jack').delete() print(res) # 返回删除的条数 conn.commit() # 提交  3.4 修改 res = conn.query(User).filter_by(name='jack').update({'name':'oscar','extra':'xxxx'}) conn.commit() 

三、SQL表模型:一对多关系

1.表模型

import datetime from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base, relationship from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index  Base = declarative_base()  # Base 当成 models.Model  # 一对多 :一个兴趣被多个人喜欢  一个人只喜欢一个兴趣 class Hobby(Base):     __tablename__ = 'hobby'     id = Column(Integer, primary_key=True)     caption = Column(String(50), default='篮球')      def __str__(self):         return self.caption      def __repr__(self):         return self.caption    class Person(Base):     __tablename__ = 'person'     id = Column(Integer, primary_key=True)     name = Column(String(32), index=True, nullable=True)     # hobby指的是tablename而不是类名,uselist=False     # 外键关联--》强外键--》物理外键     hobby_id = Column(Integer, ForeignKey("hobby.id")) # 加一个unique=True就是一对一的外键关系      # 跟数据库无关,不会新增字段,只用于快速连表操作     # 基于对象的跨表查询:就要加这个字段,取对象  person.hobby     pserson.hobby_id     # 类名,backref用于反向查询     hobby = relationship('Hobby', backref='pers')      def __str__(self):         return self.name      def __repr__(self):         return self.name   if __name__ == '__main__':     # 创建引擎     engine = create_engine(         "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02",         max_overflow=0,  # 超过连接池大小外最多创建的连接         pool_size=5,  # 连接池大小         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)     )      # 把表模型同步到数据库中     Base.metadata.create_all(engine) 

2.新增和基于对象的跨表查询

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import Hobby, Person  engine = create_engine(     "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02",     max_overflow=0,  # 超过连接池大小外最多创建的连接     pool_size=5,  # 连接池大小     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) )  Session = sessionmaker(bind=engine) session=Session()  '''一对多关系新增''' # 增加 Hobby hobby = Hobby(caption='足球') hobby1 = Hobby() session.add_all([hobby,hobby1]) session.commit()  '''三种新增方案''' # 方案一 person = Person(name='jack',hobby_id=1) person1 = Person(name='tom',hobby_id=2) session.add_all([person,person1]) session.commit()  # 方案二 # 增加person表数据时直接增加关联的hobby表数据 hobby = Hobby(caption='乒乓球') person = Person(name='oscar',hobby=hobby) # 这里的直接使用对象的方式,前提是表模型中必须有relationship session.add(person) person1 = Person(name='kami',hobby=Hobby(caption='棒球')) session.add(person1) session.commit()   # 方案三 hobby = session.query(Hobby).filter_by(id=1).first() print(hobby) person = Person(name='ankn',hobby=hobby) person1 = Person(name='tank',hobby_id=hobby.id) session.add_all([person,person1]) session.commit()   '''基于对象的跨表查询--正向''' person = session.query(Person).filter_by(id=2).first() # person = session.query(Person).filter_by(name='tom').first() print(person)  # tom # 基于对象的跨表查询,正向查询 print(person.hobby.caption)  # 篮球 print(person.hobby_id)  # 2   '''基于对象的跨表查询--反向''' hobby = session.query(Hobby).filter_by(id=1).first() # hobby = session.query(Hobby).filter_by(caption='足球').first() print(hobby)  # 足球 # 基于对象的跨表查询,反向查询 print(hobby.pers)  # [jack, ankn, tank] 是一个列表套对象 print(hobby.pers[1].name) # for i in hobby.pers:  # 循环拿出每一个对象 #     print(i.name) 

四、SQL表模型:多对多关系

1.表模型

import datetime from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base, relationship from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index  Base = declarative_base()  # Base 当成 models.Model  '''多对多''' class Boy2Girl(Base):     __tablename__ = 'boy2girl'     id = Column(Integer, primary_key=True, autoincrement=True)     girl_id = Column(Integer, ForeignKey('girl.id'))     boy_id = Column(Integer, ForeignKey('boy.id'))     # boy = relationship('Boy', backref='boy')   class Girl(Base):     __tablename__ = 'girl'     id = Column(Integer, primary_key=True)     name = Column(String(64), unique=True, nullable=False)      def __str__(self):         return self.name      def __repr__(self):         return self.name   class Boy(Base):     __tablename__ = 'boy'     id = Column(Integer, primary_key=True, autoincrement=True)     name = Column(String(64), unique=True, nullable=False)      def __str__(self):         return self.name      def __repr__(self):         return self.name      # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以--等同于manytomany     # 方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询     girls = relationship('Girl', secondary='boy2girl', backref='boys')    if __name__ == '__main__':     # 创建引擎     engine = create_engine(         "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02",         max_overflow=0,  # 超过连接池大小外最多创建的连接         pool_size=5,  # 连接池大小         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)     )      # 把表模型同步到数据库中     Base.metadata.create_all(engine) 

2.新增和基于对象的跨表查询

	from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import Girl, Boy, Boy2Girl  engine = create_engine(     "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02",     max_overflow=0,  # 超过连接池大小外最多创建的连接     pool_size=5,  # 连接池大小     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) )  Session = sessionmaker(bind=engine) session=Session()   '''多对多关系新增''' 1.手动操作第三张表,建立关系 # 增加Boy boy = Boy(name='彭于晏') boy1 = Boy(name='吴彦祖') boy2 = Boy(name='金城武') session.add(boy) session.add_all([boy1, boy2]) # 增加Girl girl = Girl(name='金玟庭') girl1 = Girl(name='田振姬') girl2 = Girl(name='赵美延') session.add(girl) session.add_all([girl1, girl2]) session.commit() # 建立外键关系:手动操作第三张表 obj = Boy2Girl(boy_id=3, girl_id=1) obj1 = Boy2Girl(boy_id=3, girl_id=2) obj2 = Boy2Girl(boy_id=3, girl_id=3) session.add_all([obj, obj1, obj2]) session.commit()  2.通过关联关系 girl = Girl(name='沈小婷') girl1 = Girl(name='柳智敏') boy = Boy(name='古天乐',girls=[girl,girl1])  # 可以多条 session.add(boy) session.commit()   '''基于对象的跨表查询--正向''' boy = session.query(Boy).filter_by(name='古天乐').first() print(boy) # 基于对象的跨表查询,正向查询 print(boy.girls)  # 列表套对象 print(boy.girls[0]) for i in boy.girls:  循环取出对象     print(i.name)   '''基于对象的跨表查询--反向''' girl = session.query(Girl).filter_by(name='赵美延').first() print(girl) # 基于对象的跨表查询,反向查询 print(girl.boys)  # 列表套对象 

五、基于Scoped_Session实现线程安全

session对象,集成到flask中去,要把session对象做成全局(大家公用),还是每个视图函数独有一个(每次都要实例化得到这个session对象)不是session_key,而是链接会话session

每个视图函数独有一个,每次都要实例化,sqlalchemy提供了一种方式,让咱们使用全局的一个session,但是每个视图函数中使用的都是不同的request,session都是这种实现机制

sqlalchemy提供了一种,在不同线程中,虽然使用全局session,实际上每个线程自己独有一个session

1.线程安全

from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine)  """ session是链接对象,如果集成到flask中,我们是把session定义成全局还是每个视图函数一个session呢? 正常来讲要每个视图函数定义一个session,有些麻烦。 sqlalchemy 帮咱提供了一个只要定义一次的session,能够做到在不同线程中,使用的是自己的session,底层基于local。 """ 1.原来获得 session对象 # session=Session()  # 放在flask中,这样如果做成全局,线程就不安全,想要安全就得在视图函数中每次都实例化得到 # print(type(session)) from sqlalchemy.orm import scoped_session from threading import local from models import User 2.使用scoped_session得到session对象,保证线程安全,本意就是每个线程使用自己独立的session session=scoped_session(Session) print(type(session)) ############# 执行ORM操作 ############# obj = User(name='jack',email='1@qq.com') session.add(obj) # 提交事务 session.commit() # # 关闭session session.close() 

2.研究

''' 研究1:session对象,现在不是原来的session对象了 session=Session()  <class 'sqlalchemy.orm.session.Session'> session=scoped_session(Session)  <class 'sqlalchemy.orm.scoping.scoped_session'> '''  ''' 研究2:类上加装饰器,能够给对象,增加属性或方法 ''' # 加个showName方法和name属性 def showName():     print('xxxxx')   def add(func):     def inner(*args,**kwargs):         res = func(*args,**kwargs) # Person()         res.name = 'jack'         res.showName=showName         return res     return inner  @add  # Person=add(Person) class Person:     pass  p = Person() print(p.name)  # jack p.showName()  ''' 研究3:如何保证线程安全     补充from threading import local,local是一个类,类加括号实例对象      l=local() # 而local对象线程是安全的,不需要加锁      这个l,可以被多个线程并发操作, 而且不会数据错乱      如何实现的?内部通过线程id号作区分,取值赋值用的都是自己的那部分,例如          线程1:l.name='jack'          线程2:l.name='tom'         # 线程1上取name,永远取出来的是线程1当时放入的值,线程二也是一样                       session:scoped_session独享     session.add------》return self._proxied.add()     self._proxied----》return self.registry()     self.registry = ThreadLocalRegistry(session_factory)     ThreadLocalRegistry(session_factory)--》这个对象加括号,会触发它的__call__         def __call__(self) -> _T:             try:                 # 这个register就是self.registry = threading.local(),所以它是线程安全的                 # 所以不同线程都是使用自己的value,也就是session对象                 return self.registry.value  # 不同线程有自己的一个session对象             except AttributeError:                 val = self.registry.value = self.createfunc()  # 所以这个就是Session()                 return val ''' 

六、基本增删查改和高级查询

from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session from models import Boy, Girl, Boy2Girl, Hobby, User, Person from sqlalchemy.sql import text  engine = create_engine("mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = scoped_session(Session)  # 线程安全  '''1.基本增加:add、add_all''' # add_all hobby = Hobby(caption='羽毛球') hobby1 = Hobby(caption='橄榄球') session.add_all([hobby,hobby1]) # add person = Person(name='彭于晏',hobby=hobby) session.add(person) session.commit()  '''2.删除''' # 方式一 session.query(User).filter_by(id=1).delete() # 方式二: session.delete(对象) user = session.query(User).filter_by(id=1).first() session.delete(user)   '''3.修改,更新''' # 方式一 session.query(User).filter_by(id=1).update({'name':'oscar'}) session.commit() # 方式二:类名.属性名,作为要修改的key session.query(Boy).filter_by(id=4).update({Boy.name:'xxxx'}) session.commit() # 方式三: # 对象.name='xxx' session.add(对象) boy=session.query(Boy).filter_by(id=1).first() boy.name='xxl' session.add(boy) # 有id就是修改,没有就是新增 session.commit()   # 类似于django的F查询 # 当字符串拼接 session.query(User).filter_by(id=1).update({'name':User.name+'_nb'},synchronize_session=False) # 当字符串相加 session.query(User).filter_by(id=1).update({'id':User.id+6}, synchronize_session="evaluate") session.commit()   '''4.查询--->基本查询''' # 1.filter_by 写条件 res=session.query(User).filter_by(name='jack',id=1).first() res=session.query(User).filter_by(name='jack').all()  # 放在列表中 不是queryset对象 print(res)  # 2.filter 写表达式 res=session.query(User).filter(User.name=='jack').first() res=session.query(User).filter(User.id>=3).all() res=session.query(User).filter(User.name!='xxxx').all() print(res)  # 3.只查表中某几个字段,并重命名 # select name as xx,age from user; res=session.query(User.name.label('xx'), User.email)  # select name,email from user; res=session.query(User.name, User.email).all() res=session.query(User)   # 4.条件可以使用text自己拼凑  占位符 # select * from user where id< 224 and name=jack order by id res = session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='jack').order_by(User.id).all() print(res)   # 5.直接写原生sql # SELECT * FROM users where name=jack res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='jack').all() res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='张三') print(res)  print('=======================================')  '高级查询' '''表达式,and条件连接''' res = session.query(User).filter(User.id > 1, User.name == 'jack').all()  # and条件  '''between''' # select * from User where user.id between 1 and 3 and name=jack res = session.query(User).filter(User.id.between(1, 3), User.name == 'jack').all()  '''in 条件''' res = session.query(User).filter(User.id.in_([1, 3, 4])).all() res = session.query(User).filter(User.email.in_(['1@qq.com', '3@qq.com'])).all()   '''~非,除。。外''' res = session.query(User).filter(~User.id.in_([1, 3, 4])).all()   '''二次筛选''' # select * from User where id not in (select id from User where name=jack) res = session.query(User).filter(~User.id.in_(session.query(User.id).filter_by(name='jack'))).all()   '''and or条件''' from sqlalchemy import and_, or_ # or_包裹的都是or条件,and_包裹的都是and条件 res = session.query(User).filter(and_(User.id >= 3, User.name == 'jack')).all()  # and条件 # 默认就是and条件,结果和上面一样 res = session.query(User).filter(User.id >= 3,User.name=='jack').all()  # select * from User where id < 2 or name = jack; res = session.query(User).filter(or_(User.id < 2, User.name == 'jack')).all()  # or条件 res = session.query(User).filter(     or_(         User.id < 2,         and_(User.name == 'jack', User.id > 3),         User.extra != ""     )).all()   '''通配符,模糊查询''' # select * from User where email like %@%; res = session.query(User).filter(User.email.like('%@%')).all() res = session.query(User.id,User.name).filter(~User.name.like('j%')).all()  # 不以什么开头   '''限制,用于分页,区间''' res = session.query(User)[2 * 5:2 * 5 + 2] # 切片可以做运算 res = session.query(User)[2:4] # 从零开始,顾头不顾尾   '''排序,根据email降序排列(从大到小)''' res = session.query(User).order_by(User.email.desc()).all() res = session.query(User).order_by(User.email.asc()).all() # 第一个条件重复后,再按第二个条件升序排 res = session.query(User).order_by(User.name.desc(), User.id.asc()).all()   '''分组查询''' from sqlalchemy.sql import func # select name from User group by name; 一旦分组后,就只能查询 分组后的字段和聚合函数的字段 res = session.query(User.name).group_by(User.name).all() # 分组之后取最大id,id之和,最小id res = session.query(     func.max(User.id),  # 最大     func.sum(User.id),  # 总和     func.min(User.id),  # 最小     func.count(User.id),  # 数量     User.name).group_by(User.name).all() for item in res:  # 循环拿出列表里面的每一个元组     print(item)   '''having筛选''' # select max(id),sum(id),min(id) from User group by name having max(id)>2; res = session.query(     func.max(User.id),     func.sum(User.id),     func.min(User.id)).group_by(User.name).having(func.max(User.id) > 2).all()   '''链表操作''' # select * from person,hobby where person.hobby_id=hobby.id;   # 原生sql res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id).all() # join表,默认是inner join,自动按外键关联  内连接 # select * from Person inner Hobby on Person.hobby_id=Hobby.id;         # 原生sql res = session.query(Person).join(Hobby).all() # isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可 # select * from Person left Hobby on Person.hobby_id=Hobby.id;          # 原生sql res = session.query(Person).join(Hobby, isouter=True).all() res = session.query(Hobby).join(Person, isouter=True).all()  # 反过来顺序就是右链接 # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上 res = session.query(Person).join(Hobby, Person.id == Hobby.id, isouter=True)  # sql本身有问题,只是给你讲, 自己指定链接字段   '''union和union all''' # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集 q1 = session.query(Boy.id,Boy.name).filter(Boy.id > 1) q2 = session.query(Girl.id,Girl.name).filter(Girl.id < 5) # union 去重 res = q1.union(q2).all() print(res) print(len(res))  q1 = session.query(Boy.name).filter(Boy.id > 1) q2 = session.query(Girl.name).filter(Girl.id < 5) res = q1.union_all(q2).all() print(res) print(len(res))   '''一对多,基于链表跨表查(__链表)''' # 方式一:直接连 res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id, Hobby.id >= 2).all() # 方式二:join连 res = session.query(Person).join(Hobby).filter(Person.id >= 2).all()  '''多对多关系,基于链表的跨表查''' # 方式一:直接连 res = session.query(Boy, Girl, Boy2Girl).filter(Boy.id == Boy2Girl.boy_id, Girl.id == Boy2Girl.girl_id).all() # 方式二:join连 res = session.query(Boy).join(Boy2Girl).join(Girl).filter(Person.id >= 2).all()  print(res) session.commit() session.close() 

七、Flask-SQLAlchemy的使用

1.flask中集成sqlalchemy使用(自己做)

	'创建一个flask项目' 	''' 		项目结构 		src 			-__init__.py 			-db.py 			-models.py 			-settings.py 			-views.py 		templates 			-index.html 		app.py 	'''  	'app.py' 	from src import app 	if __name__ == '__main__': 	    app.run()  	'settings.py' 	DEBUG=True  	'__init__.py' 	from flask import Flask 	app = Flask(__name__, template_folder='../templates') 	app.config.from_pyfile('./settings.py') 	 	# 注册蓝图 	from src.views import bp_user 	app.register_blueprint(bp_user) 	 	'db.py' 	from sqlalchemy import create_engine 	from sqlalchemy.orm import sessionmaker 	from sqlalchemy.orm import scoped_session 	from sqlalchemy.sql import text 	engine = create_engine("mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, pool_size=5) 	Session = sessionmaker(bind=engine) 	session = scoped_session(Session)  # 线程安全  	'models.py' 	import datetime 	from sqlalchemy import create_engine 	from sqlalchemy.orm import declarative_base, relationship 	from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index 	Base = declarative_base()  # Base 当成 models.Model 	# 单表 	class User(Base): 	    __tablename__ = 'users'  # 表名 	    # 写字段 	    id = Column(Integer, primary_key=True, autoincrement=True)  # id 主键 	    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空 	    email = Column(String(32), unique=True) 	    # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 	    ctime = Column(DateTime, default=datetime.datetime.now) 	    extra = Column(Text) 	 	    def __str__(self): 	        return self.name 	 	    def __repr__(self): 	        return self.name 	 	if __name__ == '__main__': 	    # 创建引擎 	    engine = create_engine( 	        "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", 	        max_overflow=0,  # 超过连接池大小外最多创建的连接 	        pool_size=5,  # 连接池大小 	        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错 	        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置) 	    ) 	 	    # 把表模型同步到数据库中 	    Base.metadata.create_all(engine)  	'views.py' 	from flask import render_template,jsonify 	from flask import Blueprint 	bp_user = Blueprint('user',__name__) 	from .db import session 	from .models import User 	 	@bp_user.route('/') 	def index(): 	    # 把用户表中所有数据都查出来,返回给前端 	    res = session.query(User).all() 	    '''前后端分离传入数据''' 	    # 不能直接放入jsonify中,json序列化,只能序列化 数字、字符串、列表、布尔和字典 	    # res是列表套对象,是不能直接序列化的 	    # l = [] 	    # for i in res: 	    #     l.append({'name': i.name, 'email': i.email}) 	    # return jsonify({'code': 100, 'msg': '成功', 'results': l}) 	    '''前后端混合传入数据''' 	    return render_template('index.html',users=res) 	 	'index.html' 	<!DOCTYPE html> 	<html lang="en"> 	<head> 	    <meta charset="UTF-8"> 	    <title>Title</title> 	</head> 	<body> 	<h1>hello world</h1> 	{% for user in users %} 	<p>用户名:{{user.name}}----->邮箱:{{user.email}}</p> 	{% endfor %} 	</body> 	</html> 

2.flask中集成flask-sqlalchemy使用

	# 第三方: flask-sqlalchemy 封装了用起来,更简洁 	安装:pip install flask-sqlalchemy 	'''借助于flask-sqlalchemy''' 	1 导入 from flask_sqlalchemy import SQLAlchemy     2 实例化得到对象     	db = SQLAlchemy()     3  将db注册到app中     	db.init_app(app)     ------2,3 可以合并为db = SQLAlchemy(app)--------          4 视图函数中使用session     	全局的db.session  # 线程安全的     5 models.py 中继承Model     	db.Model     6 写字段      	username = db.Column(db.String(80), unique=True, nullable=False)     7 配置文件中加入     SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root@127.0.0.1:3306/ddd?charset=utf8"     SQLALCHEMY_POOL_SIZE = 5     SQLALCHEMY_POOL_TIMEOUT = 30     SQLALCHEMY_POOL_RECYCLE = -1     # 追踪对象的修改并且发送信号     SQLALCHEMY_TRACK_MODIFICATIONS = False 

八、Flask-Migrate的使用

	1.数据库肯定要自己创建 	2.创建表,增加删除字段,需要手动做处理,而django 有两个命令自动做 		python manage.py makemigrations	 # 记录变化     	python manage.py migrate  #把变化同步到数据库 		-那有没有种方案,跟djagno一样,自动记录,自动迁移? 	 	3.第三方模块:flask-migrate,就能完成跟django一样,只是命令稍微有些不同 	# https://github.com/miguelgrinberg/Flask-Migrate/ 	安装:pip install Flask-Migrate --upgrade 	 	4.使用步骤 		-导入 		from flask_migrate import Migrate 		-注册 		'需要先导入app和db才行' 		migrate = Migrate(app, db)  # flask就会多出几个命令 		''' 		flask --app manage:app db init  # 初始化,第一次执行,以后再也不执行了,它执行完,会出现一个migrations文件夹 		flask --app manage:app db migrate # django中的makemigrations 是一模一样 		flask --app manage:app db upgrade  # 跟django的migrate一样 		如果启动文件就是叫app则可以省略--app manage:app,直接flask db migrate这样使用即可,其他命令也是 		''' 

flask中还有和django中的模块类似的模块,如cors、cache、restful等等,具体可自行查找了解

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!