阅读量:3
路径操作的高级配置
OpenAPI 的 operationId
from fastapi import FastAPI app = FastAPI() # 通过 operation_id 参数设置 @app.get("/items/", operation_id="some_specific_id_you_define") async def read_items(): return [{"item_id": "Foo"}]
使用 路径操作函数 的函数名作为 operationId
from fastapi import FastAPI from fastapi.routing import APIRoute app = FastAPI() @app.get("/items/") async def read_items(): return [{"item_id": "Foo"}] def use_route_names_as_operation_ids(app: FastAPI) -> None: """ Simplify operation IDs so that generated API clients have simpler function names. Should be called only after all routes have been added. """ for route in app.routes: if isinstance(route, APIRoute): # 个人觉得这种操作挺不错,可以在一个地方统一处理,更有利于建立规范 且方便管理 route.operation_id = route.name # in this case, 'read_items' use_route_names_as_operation_ids(app)
从 OpenAPI 中排除
from fastapi import FastAPI app = FastAPI() # 使用 include_in_schema 参数并将其设置为 False, # openapi 文档中就看不到这个接口的信息 # # 个人认为,可以作为是否完成的开关使用,默认设置 False,当接口完成开发后设置为 True @app.get("/items/", include_in_schema=False) async def read_items(): return [{"item_id": "Foo"}]
docstring 的高级描述
from typing import Set, Union from fastapi import FastAPI from pydantic import BaseModel app = FastAPI() class Item(BaseModel): name: str description: Union[str, None] = None price: float tax: Union[float, None] = None tags: Set[str] = set() @app.post("/items/", response_model=Item, summary="Create an item") async def create_item(item: Item): """ Create an item with all the information: - **name**: each item must have a name - **description**: a long description - **price**: required - **tax**: if the item doesn't have tax, you can omit this - **tags**: a set of unique tag strings for this item > \\f : 换页 的转义字符 1 <br/> > 比较有意思的是,这里可以按照 Markdown 的语法写文档注释 > Swagger UI 只支持一些简单的 Markdown 语法 > 比如现在的 `>` Swagger UI就不支持,但是 ReDoc 支持 > 但是比较遗憾的是 这两者对 Markdown 的语法支持的不够完善 但是比较遗憾的是 这两者都不支持换行 但是比较遗憾的是 这两者都不支持换行 但是比较遗憾的是 这两者都不支持换行 `\n` 和 `<br/>` 两者的换行还不一样 对于 `>` 中的换行 要使用 `<br/>`, 不然会作为两段 引用, 只能说对于 Markdown语法的支持两者都有提升空间啊 \f \\f : 换页 的转义字符 2 换页的内容,不会再 上面提到的两者中 进行展示 比较有意思的是,这里可以按照 Markdown 的语法写文档注释 然后在文档中就会按照 Markdown 的样式进行展示 :param item: User input. """ return item
自定义响应
使用 ORJSONResponse
如果你需要压榨性能,你可以安装并使用
orjson
并将响应设置为ORJSONResponse
from fastapi import FastAPI from fastapi.responses import ORJSONResponse app = FastAPI() # 使用 ORJSONResponse 代替 JSONResponse @app.get("/items/", response_class=ORJSONResponse) async def read_items(): return ORJSONResponse([{"item_id": "Foo"}])
StreamingResponse
如果您有类似文件的对象(例如,由 open() 返回的对象),则可以在 StreamingResponse 中将其返回
from fastapi import FastAPI from fastapi.responses import StreamingResponse some_file_path = "large-video-file.mp4" app = FastAPI() @app.get("/") def main(): def iterfile(): # (1) with open(some_file_path, mode="rb") as file_like: # (2) yield from file_like # (3) # 使用 StreamingResponse 返回数据 return StreamingResponse(iterfile(), media_type="video/mp4")
FileResponse
异步传输文件作为响应。
from fastapi import FastAPI from fastapi.responses import FileResponse some_file_path = "large-video-file.mp4" app = FastAPI() @app.get("/") async def main(): return FileResponse(some_file_path)
响应 Cookies
使用 Response
参数
app = FastAPI() @app.post("/cookie-and-object/") def create_cookie(response: Response): # 定义一个 response: Response 的参数 # 调用 set_cookie response.set_cookie(key="fakesession", value="fake-cookie-session-value") return {"message": "Come to the dark side, we have cookies"}
直接响应 Response
from fastapi import FastAPI from fastapi.responses import JSONResponse app = FastAPI() @app.post("/cookie/") def create_cookie(): content = {"message": "Come to the dark side, we have cookies"} # 创建一个 Response 或其子类 对象 response = JSONResponse(content=content) # 调用 set_cookie response.set_cookie(key="fakesession", value="fake-cookie-session-value") return response
响应头
和 响应Cookies 类似也有两种方式
使用 Response
参数
from fastapi import FastAPI, Response app = FastAPI() @app.get("/headers-and-object/") def get_headers(response: Response): # 声明一个 Response 类的形参 # 像字典添加 键值对 一样,往 headers中添加键值对 response.headers["X-Cat-Dog"] = "alone in the world" return {"message": "Hello World"}
直接返回 Response
from fastapi import FastAPI from fastapi.responses import JSONResponse app = FastAPI() @app.get("/headers/") def get_headers(): content = {"message": "Hello World"} # 将要返回的响应头信息先存到字典中 header_dict header_dict = {"X-Cat-Dog": "alone in the world", "Content-Language": "en-US"} # 通过 kv 形式设置 headers, headers=header_dict return JSONResponse(content=content, headers=header_dict )
直接使用请求
from fastapi import FastAPI, Request app = FastAPI() @app.get("/items/{item_id}") def read_root(item_id: str, request: Request): # 声明一个形参 request: Request # 可以从 request 中得到请求信息 client_host = request.client.host return {"client_host": client_host, "item_id": item_id}
添加 ASGI 中间件
from fastapi import FastAPI from unicorn import UnicornMiddleware app = FastAPI() # 通过 FastAPI 的实例 app 调用 add_middleware 方法添加中间件 app.add_middleware(UnicornMiddleware, some_config="rainbow")
使用代理
在 FastAPI 应用里设置 root_path
from fastapi import FastAPI, Request # 将 "/api/v1" 赋值给 root_path app = FastAPI(root_path="/api/v1") @app.get("/app") def read_main(request: Request): return {"message": "Hello World", "root_path": request.scope.get("root_path")}
附加的服务器(多环境部署时)
from fastapi import FastAPI, Request # 可以通过 servers,将多个环境的地址以列表的形式赋值给 servers app = FastAPI( servers=[ {"url": "https://stag.example.com", "description": "Staging environment"}, {"url": "https://prod.example.com", "description": "Production environment"}, ], root_path="/api/v1", # 将 root_path 从 servers 剔除,root_path_in_servers=False # 默认 servers 会包含 root_path, root_path_in_servers=True # root_path_in_servers=False, ) @app.get("/app") def read_main(request: Request): return {"message": "Hello World", "root_path": request.scope.get("root_path")}
WebSockets
安装 WebSockets
pip install websockets
创建 websocket
from fastapi import FastAPI, WebSocket from fastapi.responses import HTMLResponse app = FastAPI() html = """ <!DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <form action="" οnsubmit="sendMessage(event)"> <input type="text" id="messageText" autocomplete="off"/> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = new WebSocket("ws://localhost:8000/ws"); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html> """ @app.get("/") async def get(): return HTMLResponse(html) # 通过 app.websocket 创建一个 websocket @app.websocket("/ws") async def websocket_endpoint(websocket: WebSocket): # 接收消息 await websocket.accept() while True: data = await websocket.receive_text() # 发送消息 await websocket.send_text(f"Message text was: {data}")
事件:启动 - 关闭
startup
事件
from fastapi import FastAPI app = FastAPI() items = {} # 通过 app.on_event("startup") ”声明“一个函数为启动函数 # 可以做一些预处理工作,比如 往数据库中写入一些初始数据 @app.on_event("startup") async def startup_event(): items["foo"] = {"name": "Fighters"} items["bar"] = {"name": "Tenders"} @app.get("/items/{item_id}") async def read_items(item_id: str): return items[item_id]
shutdown
事件
from fastapi import FastAPI app = FastAPI() # 通过 app.on_event("shutdown") ”声明“一个函数为关闭函数 @app.on_event("shutdown") def shutdown_event(): with open("log.txt", mode="a") as log: log.write("Application shutdown") @app.get("/items/") async def read_items(): return [{"name": "Foo"}]
到此结 DragonFangQy 2024.07.18