阅读量:1
文章目录
- 一、SQLAlchemy介绍和快速使用
- 二、sqlalchemy操作表
- 三、SQL表模型:一对多关系
- 四、SQL表模型:多对多关系
- 五、基于Scoped_Session实现线程安全
- 六、基本增删查改和高级查询
- 七、Flask-SQLAlchemy的使用
- 八、Flask-Migrate的使用
一、SQLAlchemy介绍和快速使用
1.介绍
SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
SQLAlchemy是一个企业级的orm框架
,Django的orm框架,只能在Django框架中使用,不能在别的地方使用,而SQLAlchemy可以单独使用,也可以用在其他框架中
-flask : sqlalchemy -fastapi:sqlalchemy python界的orm框架 -django orm -sqlalchemy -peewee
组成部分:
- Engine,框架的引擎
- Connection Pooling ,数据库连接池
- Dialect,选择连接数据库的DB API种类
- Schema/Types,架构和类型
- SQL Exprression Language,SQL表达式语言
安装
pip install sqlalchemy
SQLAlchemy本身无法操作数据库,其必须依赖pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...] 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html # django中如何反向生成models python manage.py inspectdb > app/models.py
2.sqlalchemy原生操作
'使用sqlalchemy操作原生sql' import pymysql 1.导入 from sqlalchemy import create_engine from sqlalchemy.engine.base import Engine 2.创建引擎 engine = create_engine( "mysql+pymysql://root:1234@127.0.0.1:3306/cnblogs", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) '================================' 3.使用引擎拿到链接 conn = engine.raw_connection() 4.剩下的和之前操作sql一样,使用pymysql cursor=conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select * from article limit 10') res = cursor.fetchall() print(res) '使用多线程测试:' from threading import Thread def task(arg): conn = engine.raw_connection() cursor = conn.cursor() cursor.execute('select * from article') res = cursor.fetchall() print(res) cursor.close() conn.close() for i in range(20): t = Thread(target=task,args=(i,)) t.start()
二、sqlalchemy操作表
1.创建、删除表
1.导入 import datetime from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() # Base当成models.Model 2.创建表模型 class User(Base): __tablename__ = 'users' # 创建的表名 # 写字段 id = Column(Integer,primary_key=True,autoincrement=True) # 主键、自增 name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空 普通索引 email = Column(String(32), unique=True) # 唯一索引 # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 ctime = Column(DateTime, default=datetime.datetime.now) extra = Column(Text) # 大文本 3.没有命令,后期可以使用第三方模块,可以有命令操作 # 目前需要手动做 # sqlalchemy 不能创建数据库,能创建表,不能删除增加字段(第三方模块) 3.1.创建引擎 engine = create_engine( # 需要自己手动创建库 "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy01", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) 3.2.把表模型同步到数据库中(能创建删除,不能修改) Base.metadata.create_all(engine) 3.3.删除表 Base.metadata.drop_all(engine)
2.简单操作(orm)
from models import User 1.创建引擎 from sqlalchemy import create_engine engine = create_engine( # 需要自己手动创建库 "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy01", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) 2.orm操作,借助于engine 得到session(conn)对象 from sqlalchemy.orm import sessionmaker Connection = sessionmaker(bind=engine) conn = Connection() # 不是flask中的session,而是会话,conn的 3. 拿到(conn)会话对象后,进行orm操作 3.1 增加数据 user = User(name='jack',email='1@qq.com') # 插入到数据库 conn.add(user) # 放个对象 # 提交 conn.commit() # 关闭连接 conn.close() 3.2 查询数据 # 查询User表中id为1的所有数据-->是一个列表 res = conn.query(User).filter_by(id=1).all() print(res) 3.3 删除数据 res = conn.query(User).filter_by(name='jack').delete() print(res) # 返回删除的条数 conn.commit() # 提交 3.4 修改 res = conn.query(User).filter_by(name='jack').update({'name':'oscar','extra':'xxxx'}) conn.commit()
三、SQL表模型:一对多关系
1.表模型
import datetime from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base, relationship from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() # Base 当成 models.Model # 一对多 :一个兴趣被多个人喜欢 一个人只喜欢一个兴趣 class Hobby(Base): __tablename__ = 'hobby' id = Column(Integer, primary_key=True) caption = Column(String(50), default='篮球') def __str__(self): return self.caption def __repr__(self): return self.caption class Person(Base): __tablename__ = 'person' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) # hobby指的是tablename而不是类名,uselist=False # 外键关联--》强外键--》物理外键 hobby_id = Column(Integer, ForeignKey("hobby.id")) # 加一个unique=True就是一对一的外键关系 # 跟数据库无关,不会新增字段,只用于快速连表操作 # 基于对象的跨表查询:就要加这个字段,取对象 person.hobby pserson.hobby_id # 类名,backref用于反向查询 hobby = relationship('Hobby', backref='pers') def __str__(self): return self.name def __repr__(self): return self.name if __name__ == '__main__': # 创建引擎 engine = create_engine( "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 把表模型同步到数据库中 Base.metadata.create_all(engine)
2.新增和基于对象的跨表查询
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import Hobby, Person engine = create_engine( "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Session = sessionmaker(bind=engine) session=Session() '''一对多关系新增''' # 增加 Hobby hobby = Hobby(caption='足球') hobby1 = Hobby() session.add_all([hobby,hobby1]) session.commit() '''三种新增方案''' # 方案一 person = Person(name='jack',hobby_id=1) person1 = Person(name='tom',hobby_id=2) session.add_all([person,person1]) session.commit() # 方案二 # 增加person表数据时直接增加关联的hobby表数据 hobby = Hobby(caption='乒乓球') person = Person(name='oscar',hobby=hobby) # 这里的直接使用对象的方式,前提是表模型中必须有relationship session.add(person) person1 = Person(name='kami',hobby=Hobby(caption='棒球')) session.add(person1) session.commit() # 方案三 hobby = session.query(Hobby).filter_by(id=1).first() print(hobby) person = Person(name='ankn',hobby=hobby) person1 = Person(name='tank',hobby_id=hobby.id) session.add_all([person,person1]) session.commit() '''基于对象的跨表查询--正向''' person = session.query(Person).filter_by(id=2).first() # person = session.query(Person).filter_by(name='tom').first() print(person) # tom # 基于对象的跨表查询,正向查询 print(person.hobby.caption) # 篮球 print(person.hobby_id) # 2 '''基于对象的跨表查询--反向''' hobby = session.query(Hobby).filter_by(id=1).first() # hobby = session.query(Hobby).filter_by(caption='足球').first() print(hobby) # 足球 # 基于对象的跨表查询,反向查询 print(hobby.pers) # [jack, ankn, tank] 是一个列表套对象 print(hobby.pers[1].name) # for i in hobby.pers: # 循环拿出每一个对象 # print(i.name)
四、SQL表模型:多对多关系
1.表模型
import datetime from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base, relationship from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() # Base 当成 models.Model '''多对多''' class Boy2Girl(Base): __tablename__ = 'boy2girl' id = Column(Integer, primary_key=True, autoincrement=True) girl_id = Column(Integer, ForeignKey('girl.id')) boy_id = Column(Integer, ForeignKey('boy.id')) # boy = relationship('Boy', backref='boy') class Girl(Base): __tablename__ = 'girl' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) def __str__(self): return self.name def __repr__(self): return self.name class Boy(Base): __tablename__ = 'boy' id = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(64), unique=True, nullable=False) def __str__(self): return self.name def __repr__(self): return self.name # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以--等同于manytomany # 方便快速查询,写了这个字段,相当于django 的manytomany,快速使用基于对象的跨表查询 girls = relationship('Girl', secondary='boy2girl', backref='boys') if __name__ == '__main__': # 创建引擎 engine = create_engine( "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 把表模型同步到数据库中 Base.metadata.create_all(engine)
2.新增和基于对象的跨表查询
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from models import Girl, Boy, Boy2Girl engine = create_engine( "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) Session = sessionmaker(bind=engine) session=Session() '''多对多关系新增''' 1.手动操作第三张表,建立关系 # 增加Boy boy = Boy(name='彭于晏') boy1 = Boy(name='吴彦祖') boy2 = Boy(name='金城武') session.add(boy) session.add_all([boy1, boy2]) # 增加Girl girl = Girl(name='金玟庭') girl1 = Girl(name='田振姬') girl2 = Girl(name='赵美延') session.add(girl) session.add_all([girl1, girl2]) session.commit() # 建立外键关系:手动操作第三张表 obj = Boy2Girl(boy_id=3, girl_id=1) obj1 = Boy2Girl(boy_id=3, girl_id=2) obj2 = Boy2Girl(boy_id=3, girl_id=3) session.add_all([obj, obj1, obj2]) session.commit() 2.通过关联关系 girl = Girl(name='沈小婷') girl1 = Girl(name='柳智敏') boy = Boy(name='古天乐',girls=[girl,girl1]) # 可以多条 session.add(boy) session.commit() '''基于对象的跨表查询--正向''' boy = session.query(Boy).filter_by(name='古天乐').first() print(boy) # 基于对象的跨表查询,正向查询 print(boy.girls) # 列表套对象 print(boy.girls[0]) for i in boy.girls: 循环取出对象 print(i.name) '''基于对象的跨表查询--反向''' girl = session.query(Girl).filter_by(name='赵美延').first() print(girl) # 基于对象的跨表查询,反向查询 print(girl.boys) # 列表套对象
五、基于Scoped_Session实现线程安全
session对象
,集成到flask中去,要把session对象做成全局(大家公用),还是每个视图函数独有一个(每次都要实例化得到这个session对象)不是session_key,而是链接会话session
每个视图函数独有一个,每次都要实例化,
sqlalchemy提供了一种方式
,让咱们使用全局的一个session,但是每个视图函数中使用的都是不同的request,session都是这种实现机制
sqlalchemy提供了一种,在不同线程中,虽然使用全局session,实际上每个线程自己独有一个session
1.线程安全
from sqlalchemy.orm import sessionmaker from sqlalchemy import create_engine engine = create_engine("mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) """ session是链接对象,如果集成到flask中,我们是把session定义成全局还是每个视图函数一个session呢? 正常来讲要每个视图函数定义一个session,有些麻烦。 sqlalchemy 帮咱提供了一个只要定义一次的session,能够做到在不同线程中,使用的是自己的session,底层基于local。 """ 1.原来获得 session对象 # session=Session() # 放在flask中,这样如果做成全局,线程就不安全,想要安全就得在视图函数中每次都实例化得到 # print(type(session)) from sqlalchemy.orm import scoped_session from threading import local from models import User 2.使用scoped_session得到session对象,保证线程安全,本意就是每个线程使用自己独立的session session=scoped_session(Session) print(type(session)) ############# 执行ORM操作 ############# obj = User(name='jack',email='1@qq.com') session.add(obj) # 提交事务 session.commit() # # 关闭session session.close()
2.研究
''' 研究1:session对象,现在不是原来的session对象了 session=Session() <class 'sqlalchemy.orm.session.Session'> session=scoped_session(Session) <class 'sqlalchemy.orm.scoping.scoped_session'> ''' ''' 研究2:类上加装饰器,能够给对象,增加属性或方法 ''' # 加个showName方法和name属性 def showName(): print('xxxxx') def add(func): def inner(*args,**kwargs): res = func(*args,**kwargs) # Person() res.name = 'jack' res.showName=showName return res return inner @add # Person=add(Person) class Person: pass p = Person() print(p.name) # jack p.showName() ''' 研究3:如何保证线程安全 补充from threading import local,local是一个类,类加括号实例对象 l=local() # 而local对象线程是安全的,不需要加锁 这个l,可以被多个线程并发操作, 而且不会数据错乱 如何实现的?内部通过线程id号作区分,取值赋值用的都是自己的那部分,例如 线程1:l.name='jack' 线程2:l.name='tom' # 线程1上取name,永远取出来的是线程1当时放入的值,线程二也是一样 session:scoped_session独享 session.add------》return self._proxied.add() self._proxied----》return self.registry() self.registry = ThreadLocalRegistry(session_factory) ThreadLocalRegistry(session_factory)--》这个对象加括号,会触发它的__call__ def __call__(self) -> _T: try: # 这个register就是self.registry = threading.local(),所以它是线程安全的 # 所以不同线程都是使用自己的value,也就是session对象 return self.registry.value # 不同线程有自己的一个session对象 except AttributeError: val = self.registry.value = self.createfunc() # 所以这个就是Session() return val '''
六、基本增删查改和高级查询
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session from models import Boy, Girl, Boy2Girl, Hobby, User, Person from sqlalchemy.sql import text engine = create_engine("mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = scoped_session(Session) # 线程安全 '''1.基本增加:add、add_all''' # add_all hobby = Hobby(caption='羽毛球') hobby1 = Hobby(caption='橄榄球') session.add_all([hobby,hobby1]) # add person = Person(name='彭于晏',hobby=hobby) session.add(person) session.commit() '''2.删除''' # 方式一 session.query(User).filter_by(id=1).delete() # 方式二: session.delete(对象) user = session.query(User).filter_by(id=1).first() session.delete(user) '''3.修改,更新''' # 方式一 session.query(User).filter_by(id=1).update({'name':'oscar'}) session.commit() # 方式二:类名.属性名,作为要修改的key session.query(Boy).filter_by(id=4).update({Boy.name:'xxxx'}) session.commit() # 方式三: # 对象.name='xxx' session.add(对象) boy=session.query(Boy).filter_by(id=1).first() boy.name='xxl' session.add(boy) # 有id就是修改,没有就是新增 session.commit() # 类似于django的F查询 # 当字符串拼接 session.query(User).filter_by(id=1).update({'name':User.name+'_nb'},synchronize_session=False) # 当字符串相加 session.query(User).filter_by(id=1).update({'id':User.id+6}, synchronize_session="evaluate") session.commit() '''4.查询--->基本查询''' # 1.filter_by 写条件 res=session.query(User).filter_by(name='jack',id=1).first() res=session.query(User).filter_by(name='jack').all() # 放在列表中 不是queryset对象 print(res) # 2.filter 写表达式 res=session.query(User).filter(User.name=='jack').first() res=session.query(User).filter(User.id>=3).all() res=session.query(User).filter(User.name!='xxxx').all() print(res) # 3.只查表中某几个字段,并重命名 # select name as xx,age from user; res=session.query(User.name.label('xx'), User.email) # select name,email from user; res=session.query(User.name, User.email).all() res=session.query(User) # 4.条件可以使用text自己拼凑 占位符 # select * from user where id< 224 and name=jack order by id res = session.query(User).filter(text("id<:value and name=:name")).params(value=224, name='jack').order_by(User.id).all() print(res) # 5.直接写原生sql # SELECT * FROM users where name=jack res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='jack').all() res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='张三') print(res) print('=======================================') '高级查询' '''表达式,and条件连接''' res = session.query(User).filter(User.id > 1, User.name == 'jack').all() # and条件 '''between''' # select * from User where user.id between 1 and 3 and name=jack res = session.query(User).filter(User.id.between(1, 3), User.name == 'jack').all() '''in 条件''' res = session.query(User).filter(User.id.in_([1, 3, 4])).all() res = session.query(User).filter(User.email.in_(['1@qq.com', '3@qq.com'])).all() '''~非,除。。外''' res = session.query(User).filter(~User.id.in_([1, 3, 4])).all() '''二次筛选''' # select * from User where id not in (select id from User where name=jack) res = session.query(User).filter(~User.id.in_(session.query(User.id).filter_by(name='jack'))).all() '''and or条件''' from sqlalchemy import and_, or_ # or_包裹的都是or条件,and_包裹的都是and条件 res = session.query(User).filter(and_(User.id >= 3, User.name == 'jack')).all() # and条件 # 默认就是and条件,结果和上面一样 res = session.query(User).filter(User.id >= 3,User.name=='jack').all() # select * from User where id < 2 or name = jack; res = session.query(User).filter(or_(User.id < 2, User.name == 'jack')).all() # or条件 res = session.query(User).filter( or_( User.id < 2, and_(User.name == 'jack', User.id > 3), User.extra != "" )).all() '''通配符,模糊查询''' # select * from User where email like %@%; res = session.query(User).filter(User.email.like('%@%')).all() res = session.query(User.id,User.name).filter(~User.name.like('j%')).all() # 不以什么开头 '''限制,用于分页,区间''' res = session.query(User)[2 * 5:2 * 5 + 2] # 切片可以做运算 res = session.query(User)[2:4] # 从零开始,顾头不顾尾 '''排序,根据email降序排列(从大到小)''' res = session.query(User).order_by(User.email.desc()).all() res = session.query(User).order_by(User.email.asc()).all() # 第一个条件重复后,再按第二个条件升序排 res = session.query(User).order_by(User.name.desc(), User.id.asc()).all() '''分组查询''' from sqlalchemy.sql import func # select name from User group by name; 一旦分组后,就只能查询 分组后的字段和聚合函数的字段 res = session.query(User.name).group_by(User.name).all() # 分组之后取最大id,id之和,最小id res = session.query( func.max(User.id), # 最大 func.sum(User.id), # 总和 func.min(User.id), # 最小 func.count(User.id), # 数量 User.name).group_by(User.name).all() for item in res: # 循环拿出列表里面的每一个元组 print(item) '''having筛选''' # select max(id),sum(id),min(id) from User group by name having max(id)>2; res = session.query( func.max(User.id), func.sum(User.id), func.min(User.id)).group_by(User.name).having(func.max(User.id) > 2).all() '''链表操作''' # select * from person,hobby where person.hobby_id=hobby.id; # 原生sql res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id).all() # join表,默认是inner join,自动按外键关联 内连接 # select * from Person inner Hobby on Person.hobby_id=Hobby.id; # 原生sql res = session.query(Person).join(Hobby).all() # isouter=True 外连,表示Person left join Favor,没有右连接,反过来即可 # select * from Person left Hobby on Person.hobby_id=Hobby.id; # 原生sql res = session.query(Person).join(Hobby, isouter=True).all() res = session.query(Hobby).join(Person, isouter=True).all() # 反过来顺序就是右链接 # 自己指定on条件(连表条件),第二个参数,支持on多个条件,用and_,同上 res = session.query(Person).join(Hobby, Person.id == Hobby.id, isouter=True) # sql本身有问题,只是给你讲, 自己指定链接字段 '''union和union all''' # 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集 q1 = session.query(Boy.id,Boy.name).filter(Boy.id > 1) q2 = session.query(Girl.id,Girl.name).filter(Girl.id < 5) # union 去重 res = q1.union(q2).all() print(res) print(len(res)) q1 = session.query(Boy.name).filter(Boy.id > 1) q2 = session.query(Girl.name).filter(Girl.id < 5) res = q1.union_all(q2).all() print(res) print(len(res)) '''一对多,基于链表跨表查(__链表)''' # 方式一:直接连 res = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id, Hobby.id >= 2).all() # 方式二:join连 res = session.query(Person).join(Hobby).filter(Person.id >= 2).all() '''多对多关系,基于链表的跨表查''' # 方式一:直接连 res = session.query(Boy, Girl, Boy2Girl).filter(Boy.id == Boy2Girl.boy_id, Girl.id == Boy2Girl.girl_id).all() # 方式二:join连 res = session.query(Boy).join(Boy2Girl).join(Girl).filter(Person.id >= 2).all() print(res) session.commit() session.close()
七、Flask-SQLAlchemy的使用
1.flask中集成sqlalchemy使用(自己做)
'创建一个flask项目' ''' 项目结构 src -__init__.py -db.py -models.py -settings.py -views.py templates -index.html app.py ''' 'app.py' from src import app if __name__ == '__main__': app.run() 'settings.py' DEBUG=True '__init__.py' from flask import Flask app = Flask(__name__, template_folder='../templates') app.config.from_pyfile('./settings.py') # 注册蓝图 from src.views import bp_user app.register_blueprint(bp_user) 'db.py' from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker from sqlalchemy.orm import scoped_session from sqlalchemy.sql import text engine = create_engine("mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, pool_size=5) Session = sessionmaker(bind=engine) session = scoped_session(Session) # 线程安全 'models.py' import datetime from sqlalchemy import create_engine from sqlalchemy.orm import declarative_base, relationship from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index Base = declarative_base() # Base 当成 models.Model # 单表 class User(Base): __tablename__ = 'users' # 表名 # 写字段 id = Column(Integer, primary_key=True, autoincrement=True) # id 主键 name = Column(String(32), index=True, nullable=False) # name列,索引,不可为空 email = Column(String(32), unique=True) # datetime.datetime.now不能加括号,加了括号,以后永远是当前时间 ctime = Column(DateTime, default=datetime.datetime.now) extra = Column(Text) def __str__(self): return self.name def __repr__(self): return self.name if __name__ == '__main__': # 创建引擎 engine = create_engine( "mysql+pymysql://root:1234@127.0.0.1:3306/sqlalchemy02", max_overflow=0, # 超过连接池大小外最多创建的连接 pool_size=5, # 连接池大小 pool_timeout=30, # 池中没有线程最多等待的时间,否则报错 pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置) ) # 把表模型同步到数据库中 Base.metadata.create_all(engine) 'views.py' from flask import render_template,jsonify from flask import Blueprint bp_user = Blueprint('user',__name__) from .db import session from .models import User @bp_user.route('/') def index(): # 把用户表中所有数据都查出来,返回给前端 res = session.query(User).all() '''前后端分离传入数据''' # 不能直接放入jsonify中,json序列化,只能序列化 数字、字符串、列表、布尔和字典 # res是列表套对象,是不能直接序列化的 # l = [] # for i in res: # l.append({'name': i.name, 'email': i.email}) # return jsonify({'code': 100, 'msg': '成功', 'results': l}) '''前后端混合传入数据''' return render_template('index.html',users=res) 'index.html' <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <h1>hello world</h1> {% for user in users %} <p>用户名:{{user.name}}----->邮箱:{{user.email}}</p> {% endfor %} </body> </html>
2.flask中集成flask-sqlalchemy使用
# 第三方: flask-sqlalchemy 封装了用起来,更简洁 安装:pip install flask-sqlalchemy '''借助于flask-sqlalchemy''' 1 导入 from flask_sqlalchemy import SQLAlchemy 2 实例化得到对象 db = SQLAlchemy() 3 将db注册到app中 db.init_app(app) ------2,3 可以合并为db = SQLAlchemy(app)-------- 4 视图函数中使用session 全局的db.session # 线程安全的 5 models.py 中继承Model db.Model 6 写字段 username = db.Column(db.String(80), unique=True, nullable=False) 7 配置文件中加入 SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root@127.0.0.1:3306/ddd?charset=utf8" SQLALCHEMY_POOL_SIZE = 5 SQLALCHEMY_POOL_TIMEOUT = 30 SQLALCHEMY_POOL_RECYCLE = -1 # 追踪对象的修改并且发送信号 SQLALCHEMY_TRACK_MODIFICATIONS = False
八、Flask-Migrate的使用
1.数据库肯定要自己创建 2.创建表,增加删除字段,需要手动做处理,而django 有两个命令自动做 python manage.py makemigrations # 记录变化 python manage.py migrate #把变化同步到数据库 -那有没有种方案,跟djagno一样,自动记录,自动迁移? 3.第三方模块:flask-migrate,就能完成跟django一样,只是命令稍微有些不同 # https://github.com/miguelgrinberg/Flask-Migrate/ 安装:pip install Flask-Migrate --upgrade 4.使用步骤 -导入 from flask_migrate import Migrate -注册 '需要先导入app和db才行' migrate = Migrate(app, db) # flask就会多出几个命令 ''' flask --app manage:app db init # 初始化,第一次执行,以后再也不执行了,它执行完,会出现一个migrations文件夹 flask --app manage:app db migrate # django中的makemigrations 是一模一样 flask --app manage:app db upgrade # 跟django的migrate一样 如果启动文件就是叫app则可以省略--app manage:app,直接flask db migrate这样使用即可,其他命令也是 '''
flask中还有和django中的模块类似的模块,如cors、cache、restful等等,具体可自行查找了解