背景
通过官方文档学习
官方文档地址 (2.0.x版本) Welcome to Flask — Flask Documentation (2.0.x) (palletsprojects.com)
quickstart
第一个app
代码
文件命名为hello.py
windows执行set FLASK_APP=hello,linux执行export FLASK_APP=hello,然后执行flask run即可运行app
默认运行的app是127.0.0.1:5000,可通过-h -p参数改ip地址和端口。其中,127.0.0.1只能本电脑访问,如果希望其他局域网也可访问,可将host设为0.0.0.0。如果在linux,需要放通对应的端口才能访问:iptables -A INPUT -p tcp --dport 8089 -j ACCEPT; iptables -A INPUT -p udp --dport 8089 -j ACCEPT;
部署时如果是测试环境会有提示,页面也会显示详细报错,可通过环境变量FLASK_ENV设置
http转译
将用户提供的所有值转化为转义字符,防止注入攻击
from markupsafe import escape @app.route("<name>") def hello_world(name): return f"hello, {escape(name)}"
<name>可捕获url的参数作为变量传给url处理函数
路由
使用app的方法.route来进行路由,如@app.route("<test_var>")
一个url处理函数可有多个route装饰,即一个url处理函数,匹配多个url
路由变量类型
可以对路由捕获的参数进行类型限定,如@app.route("<int:test_var>")
关于url末尾的斜杠
resource是文件夹名这种时,可以在url后加个/,当访问没加/,也会重定向到对应url
resource是文件时,url后一般不加/,此时如果访问时加了/,会报错,这防止了对resource重复进行重定向
url_for函数
该函数可用于建立url,用于将所有url汇总并做些自己的处理,但每个url都需要调用一次,函数传入第一个参数是route函数名,返回对应的url,后面可跟任意多关键字参数
可通过该函数,新建某个url,指向已有的url处理函数
注意,route是根据url找处理函数,url_for是根据函数名找url,可以用在<a>的href
请求方法
可通过@app.route("/test", methods=['GET', 'POST'])指定url允许的请求方法,如果不带method参数,默认只支持get方法,但如果显式规定了GET方法,也会自动支持HEAD和OPTIONS方法
静态文件
当需要js,css等文件,可通过静态文件部署
可通过执行url_for('static', filename='test.css')实现静态文件对应的访问endpoint,在这之前需要先在目录下手动创建static文件夹,filename对应的文件应该也放在目录下
from flask import Flask from flask import url_for app = Flask(__name__) @app.route("/hello") def hello_world(): return "<p>hello world</p>" with app.test_request_context(): url_for('static', filename='test.css')
模板渲染
可通过render_template函数渲染,需要提供模板名,模板需要放在项目的templates目录下
{% if name %} <p>hello, {{ name }}</p> {% else %} <p>hello, world</p> {% endif %}
from flask import Flask from flask import url_for, render_template app = Flask(__name__) @app.route("/hello/<name>") def hello_world(name): return render_template("test.html", name=name) #return "<p>hello world</p>" with app.test_request_context(): url_for('static', filename='test.css')
访问请求数据
可通过访问request对象对请求数据进行访问
request.form 表单数据
request.method 请求方法
request.args.get('key_name', '') 访问url里的参数,如果get报错会,服务器最终会返回400
文件上传
上传文件时不要忘了enctype="multipart/form-data"就行,否则浏览器不会上传你的文件
可执行request.files访问文件,文件对象和python内置file对象差不多,多了个save方法
可执行request.files['filename'].filename获取文件原始文件名,一般不可以新人原始文件名,如果要信任,需要使用secure_filename函数处理
cookie
request.cookies访问cookie,是字典形式。建议使用session带的cookie而不是直接使用request.cookie,因为更安全
注意用法,make_response(render_template(**kwargs))
重定向与抛出错误
可以使用flask.redirect, flask.abort进行重定向与返回错误代码
error还可以使用app的errorhandler设定错误码
关于响应对象
函数返回的值会被自动转化成response对象,转化规则如下:1如果返回类型是response类型直接返回 2如果是字符串则作为入参给response 3如果返回字典,最后直接返回调jsonify处理的结果,而不是resp对象 4如果返回是元组,元组里的status code会覆盖 5如果以上都不是,flask会认为返回值是个合法的,直接入参给response
make_response可以封装返回值和状态码,可对make_response对象设定响应头
session对象
除了request对象还有session对象,session可以针对某个特定用户存储一些信息等功能
密钥方法有很多,一个是import secret as s; s.token_hex()
tutorial
app setup
flask以app方式运作,app是flask.Flask的实例,可以创全局app,也可以在函数创app,通过函数调用返回app。
数据库配置
对接sqlite3,缺点是不支持并发,对于并发请求只能串行写入
连接db
g是全局对象,db操作也是全局的,不是每个request都会创db连接这种
创建表
flask会把user数据存在user表,post数据存在post表,先要创建这些表,框架不会自动创建
再去写个函数调用这个schema.sql
open_resource的文件是基于instance目录下的文件,这么写不需要显式指定文件目录
click.command定义命令行,会调用被装饰的函数
注册app
close_db和init_db_command函数需要给app注册,否则app不会用到这两个函数
teardown_appcontext是返回响应给client前需要做的事情
add_command添加了可以让flask命令调用的命令
此函数需要在app工厂函数里调用
初始化数据库
至此,可在flaskr上级目录下调用flask init-db,即可执行上述db初始化动作
# flaskr/db.py import sqlite3 import click from flask import current_app, g from flask.cli import with_appcontext def get_db(): if 'db' not in g: g.db = sqlite3.connect(current_app.config['DATABASE'], detect_types=sqlite3.PARSE_DECLTYPES) g.db.row_factory = sqlite3.Row return g.db def close_db(e=None): db = g.pop('db', None) if db is not None: db.close() def init_db(): db = get_db() with current_app.open_resource('schema.sql') as f: db.executescript(f.read().decode('utf-8')) @click.command('init-db') @with_appcontext def init_db_command(): init_db() click.echo('Initialized the database') def init_app(app): app.teardown_appcontext(close_db) app.cli.add_command(init_db_command)
# flaskr/schema.sql DROP TABLE IF EXISTS user; DROP TABLE IF EXISTS post; CREATE TABLE user ( id INTEGER PRIMARY KEY AUTOINCREMENT, username TEXT UNIQUE NOT NULL, password TEXT NOT NULL ); CREATE TABLE post( id INTEGER PRIMARY KEY AUTOINCREMENT, author_id INTEGER NOT NULL, created TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, title TEXT NOT NULL, body TEXT NOT NULL, FOREIGN KEY (author_id) REFERENCES user (id) );
# flask/__init__.py import os from flask import Flask def create_app(test_config=None): app = Flask(__name__, instance_relative_config=True) app.config.from_mapping(SECRET_KEY='dev',DATABASE=os.path.join(app.instance_path, 'flaskr.sqlite')) if test_config is None: app.config.from_pyfile('config.py', silent=True) else: app.config.from_mapping(test_config) try: os.makedirs(app.instance_path) except OSError: pass @app.route('/hello') def hello(): return 'hello, world' from . import db db.init_app(app) return app
view和blueprints
blueprint可以组织view和其他相关代码,view需要注册给blueprint
创建blueprint
__name__表示blueprint的相对未知,url_prefix用来关联url
蓝图可以在工厂函数中通过调用app.register_blueprint()函数来注册并使用
第一个视图 注册
写一个注册视图
import functools from flask import Blueprint, flash, g, redirect, render_template, request, session, url_for from werkzeug import check_password_hash, generate_password_hash from flaskr.db import get_db bp = Blueprint('auth', __name__, url_prefix='/auth') @bp.route('/register', methods=('GET', 'POST')) def register(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] db = get_db error = None if not username: error = 'username is required' elif not password: error = 'password is required' if error is None: try: db.execute("INSERT INFO user (username, password) VALUES (?, ?)", (username, generate_password_hash(password))) db.commit() except db.IntegrityError: error = f'User {username} is already registerd' else: return redirect(url_for('auth.login')) flash(error) return render_template('auth/register.html)
bp.route将url注册关联到auth blueprint下
request.form是个dict类型的数据
注意数据库插入数据语句采用?作为占位符
pwd直接存数据库不安全,用generate_password_hash生成加密pwd
如果username已存在会报IntegrityError
注册完成后,调用redirect可重定向到登录界面
flash方法可存储error信息,在模板渲染时可拿来用
登录视图
@bp.route('/login', methods=('GET', 'POST')) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] db = get_db error = None user = db.execute('SELECT * from user WHERE username = ?', (username, )).fetchone() if user is None: error = 'incorrect username' elif not check_password_hash(user['password'], password): error = 'incorrect password' if error is None: session.clear() session['user_id'] = user['id'] return redirect(url_for('index')) flash(error) return render_template('auth/login.html')
fetchone返回查询结果的一个,如果查询为空则返回None
check_password_hash将密码加密后和已存储的密码对比是否一致
注意登录成功时会清除当前session,然后将登入用户id存在session,后续请求可通过判断session有无用户id,直接用此session,从而省去后续登录,这需要再写一个逻辑用来在开始视图函数前判断是否已登录
@bp.before_app_request def load_logged_in_user(): user_id = session.get('user_id') if user_id is None: g.user = None else: g.user = get_db().execute('SELECT * FROM user WHERE id = ?', (user_id, )).fetchone()
注销视图
@bp.route('/logout') def logout(): session.clear() return redirect(url_for('index'))
其他视图的认证
其他视图的增删改查都需要用户登入,在auth写一个检测用户登入信息的函数,当装饰器用
def login_required(view): @functools.wrap(view) def wrapped_view(**kwargs): if g.user is None: return redirect(url_for('auth.login')) return view(**kwargs) return wrapped_view
模板
基本模板
此模板用作其他模板基础模板,其他模板会在此基础扩展
<!doctype html> <title>{% block title %}{% endblock %} - Flaskr</title> <link rel="stylesheet" href="{{ url_for('static', filename='style.css') }}"> <nav> <h1>Flaskr</h1> <ul> {% if g.user %} <li><span>{{ g.user['username'] }}</span></li> <li><a href="{{ url_for('auth.logout') }}">Log Out</a></li> {% else %} <li><a href="{{ url_for('auth.register') }}">Register</a></li> <li><a href="{{ url_for('auth.login') }}">Register</a></li> {% endif %} </ul> </nav> <section> <header> {% block header %}{% endblock %} </header> {% for message in get_flashed_messages() %} <div class="flash">{{ message }}</div> {% endfor %} {% block content %}{% endblock %} </section>
全局对象g在模板也同样可以使用
注意,在视图用的flash方法,在视图里可以调用方法get_flashed_messages()方法获取flash信息
注册模板
{% extends 'base.html' %} {% block header %} <h1>{% block title %}Register{% endblock %}</h1> {% endblock %} {% block content %} <form> <label for="username">Username</label> <input name="username" id="username" required /> <label for="password">Password</label> <input type="password" name="password" id="password" required /> <input type="submit" value="Register" /> </form> {% endblock %}
extend扩展了base模板
登录模板
注册用户
现在,让我们到注册界面,注册一个用户
如果不填信息会报错
两个input去掉required的话,不填信息再次点击注册,报错
用已注册的名字再注册提示已注册
登录报错,查看后台日志,因为还没实现index
静态文件
在base.py模板中已经放好样式表,通过url_for('static', filename='style.css')访问,看看效果
blog blueprint
类似auth blueprint,再开发一个blog
blueprint
参考auth blueprint写一个blog的blueprint注意,blog的没有url_prefix,也就是说直接在根路径访问
注意这么写了以后ur_for('index') url_for('blog.index')效果都是一样的
from flask import url_for, Blueprint, flash, g, redirect, render_template, request from werkzeug.exceptions import abort from flaskr.auth import login_required from flaskr.db import get_db bp = Blueprint('blog', __name__)
# init db from . import db db.init_app(app) # init blueprint auth from . import auth app.register_blueprint(auth.bp) # init blueprint blog from . impor blog app.register_blueprint(blog.bp) app.add_url_rule('/', endpoint='index') return app
index
@bp.route('/') def index(): db = get_db() posts = db.execute('SELECT p.id, title, body, created, author_id, username FROM post p JOIN user u ON p.author_id = u.id ORDER BY created DESC').fetchall() return render_template('blog/index.html', posts=posts)
{% extends 'base.html' %} {% block header %} <h1>{% block title %}Posts{% endblock %}</h1> {% if g.user %} <a class="action" href="{{ url_for('blog.create') }}">New</a> {% endif %} {% endblock %} {% block content %} {% for post in posts %} <article> <header> <div> <h1> {{ post['title'] }} </h1> <div class="about"> by {{ post['username'] }} on {{ post['created'].strftime('%Y-%m-%d') }} </div> </div> {% if g.user['id'] == post['author_id'] %} <a class="action" href="{{ url_for('blog.update', id=post['id']) }}"> Edit </a> {% endif %} </header> <p class='body'> {{ post['body'] }} </p> </article> {% if not loop.last %} <hr> {% endif %} {% endfor %} {% endblock %}
create
login_required是之前检查g.user的,若为空则跳转登录界面
@bp.route('/create', methods=('GET', 'POST')) @login_required def create(): if request.method == 'POST': title = request.form['title'] body = request.form['body'] error = None if not title: error = 'Titie is required' else: db = get_db() db.execute('INSERT INTO post (title, body, author_id) VALUES (?, ?, ?)', (title, body, g.user['id'])) db.commit() return redirect(url_for('blog.index')) return render_template('blog/create.html')
{% extends 'base.html' %} {% block header %} <h1>{% block title %}New Post{% endblock %}</h1> {% endblock %} {% block content %} <form method="post"> <label for="title">Title</label> <input name="title" id="title" value="{{ request.form['title'] }}" required> <label for="body">Body</label> <textarea name="body" id="body">{{ request.form['body'] }}</textarea> <input type="submit" value="Save"> </form> {% endblock %}
update
def get_post(id, check_author=True): post = get_db().execute('SELECT p.id, title, body, create, author_id, username FROM post p JOIN user u ON p.author_id = u.id WHERE p.id = ?', (id, )).fetchone() if post is None: abort(404, f'Post id {id} doesn\'t exist.') if check_author and post['author_id'] != g.user['id']: abort(403) return post @bp.route('/<int:id>/update', methods=('GET', 'POST')) @login_required def update(id): post = get_post(id) if request.method == 'POST': title = request.form['title'] body = request.form['body'] error = None if not title: error = 'Title is required.' if error is not None: flash(error) else: db = get_db() db.execute( 'UPDATE post SET title = ?, body = ?' ' WHERE id = ?', (title, body, id) ) db.commit() return redirect(url_for('blog.index')) return render_template('blog/update.html', post=post)
注意,对于route有参数的url,用url_for时要加参数,比如url_for('blog.update', id=post['id'])
{% extends 'base.html' %} {% block header %} <h1>{% block title %}Edit "{{ post['title'] }}"{% endblock %}</h1> {% endblock %} {% block content %} <form method="post"> <label for="title">Title</label> <input name="title" id="title" value="{{ request.form['title'] or post['title'] }}" required> <label for="body">Body</label> <textarea name="body" id="body">{{ request.form['body'] or post['body'] }}</textarea> <input type="submit" value="Save"> </form> <hr> <form action="{{ url_for('blog.delete', id=post['id']) }}" method="post"> <input class="danger" type="submit" value="Delete" onclick="return confirm('Are you sure?');"> </form> {% endblock %}
delete
没有视图,嵌在update里了
@bp.route('/<int:id>/delete', methods=('POST',)) @login_required def delete(id): get_post(id) db = get_db() db.execute('DELETE FROM post WHERE id = ?', (id,)) db.commit() return redirect(url_for('blog.index'))