一、前言
使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。
FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。
总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。
基础入门:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(一),本篇学习如何接收认证参数以及处理断开连接
二、术语
2.1.FastAPI
FastAPI 是一个用于构建 API 的现代、快速(高性能)的 Python Web 框架。它是基于标准 Python 类型注释的 ASGI (Asynchronous Server Gateway Interface) 框架。
FastAPI 具有以下主要特点:
快速: FastAPI 使用 ASGI 服务器和 Starlette 框架,在性能测试中表现出色。它可以与 Uvicorn 一起使用,提供非常高的性能。
简单: FastAPI 利用 Python 类型注释,使 API 定义变得简单且直观。开发人员只需要定义输入和输出模型,FastAPI 会自动生成 API 文档。
现代: FastAPI 支持 OpenAPI 标准,可以自动生成 API 文档和交互式文档。它还支持 JSON Schema 和数据验证。
全功能: FastAPI 提供了路由、依赖注入、数据验证、安全性、测试等功能,是一个功能齐全的 Web 框架。
可扩展: FastAPI 被设计为可扩展的。开发人员可以轻松地集成其他库和组件,如数据库、身份验证等。
2.2.WebSocket
是一种计算机通信协议,它提供了在单个 TCP 连接上进行全双工通信的机制。它是 HTML5 一个重要的组成部分。
WebSocket 协议主要有以下特点:
全双工通信:WebSocket 允许客户端和服务器之间进行双向实时通信,即数据可以同时在两个方向上流动。这与传统的 HTTP 请求-响应模型不同,HTTP 中数据只能单向流动。
持久性连接:WebSocket 连接是一种持久性的连接,一旦建立就会一直保持,直到客户端或服务器主动关闭连接。这与 HTTP 的连接是短暂的不同。
低开销:相比 HTTP 请求-响应模型,WebSocket 在建立连接时需要较少的数据交换,因此网络开销较小。
实时性:由于 WebSocket 连接是持久性的,且数据可以双向流动,因此 WebSocket 非常适用于需要实时、低延迟数据交互的应用场景,如聊天应用、实时游戏、股票行情等。
三、前置条件
3.1. 创建虚拟环境&安装依赖
conda create -n fastapi_test python=3.10 conda activate fastapi_test pip install fastapi websockets uvicorn
四、技术实现
4.1. 接收认证参数
在使用 FastAPI 和 WebSocket 构建 API 接口时,存在一些安全风险需要注意,例如:身份验证和授权:确保API有适当的身份验证和授权机制,以防止未经授权的访问
服务端:
from typing import Annotated from fastapi import ( Depends, FastAPI, WebSocket, WebSocketException, status, ) import uvicorn app = FastAPI() async def authenticate( websocket: WebSocket, userid: str, secret: str, ): if userid is None or secret is None: raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) print(f'userid: {userid},secret: {secret}') if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret: return 'pass' else: return 'fail' @app.websocket("/ws") async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],): await websocket.accept() while True: text = await websocket.receive_text() if 'fail' == permission: await websocket.send_text( f"authentication failed" ) else: await websocket.send_text(f"receive message from: {userid}, text was: {text}") if __name__ == '__main__': uvicorn.run(app, host='0.0.0.0',port=7777)
客户端:
<!DOCTYPE html> <html> <head> <title>Chat</title> </head> <body> <h1>WebSocket Chat</h1> <form action="" onsubmit="sendMessage(event)"> <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label> <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label> <br/> <button onclick="connect(event)">Connect</button> <hr> <label>Message: <input type="text" id="messageText" autocomplete="off"/></label> <button>Send</button> </form> <ul id='messages'> </ul> <script> var ws = null; function connect(event) { var userid = document.getElementById("userid") var secret = document.getElementById("secret") ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value); ws.onmessage = function(event) { var messages = document.getElementById('messages') var message = document.createElement('li') var content = document.createTextNode(event.data) message.appendChild(content) messages.appendChild(message) }; event.preventDefault() } function sendMessage(event) { var input = document.getElementById("messageText") ws.send(input.value) input.value = '' event.preventDefault() } </script> </body> </html>
调用结果:
输入正确的userid和secret
输入错误的userid和secret
4.2. 处理断开连接
在4.1.接收认证参数的代码基础上,进行修改
from typing import Annotated from fastapi import ( Depends, FastAPI, WebSocket, WebSocketException, WebSocketDisconnect, status, ) import uvicorn class ConnectionManager: def __init__(self): self.active_connections: list[WebSocket] = [] async def connect(self, websocket: WebSocket): await websocket.accept() self.active_connections.append(websocket) def disconnect(self, websocket: WebSocket): self.active_connections.remove(websocket) async def send_personal_message(self, message: str, websocket: WebSocket): await websocket.send_text(message) async def broadcast(self, message: str): for connection in self.active_connections: await connection.send_text(message) manager = ConnectionManager() app = FastAPI() async def authenticate( websocket: WebSocket, userid: str, secret: str, ): if userid is None or secret is None: raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION) print(f'userid: {userid},secret: {secret}') if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret: return 'pass' else: return 'fail' @app.websocket("/ws") async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],): await manager.connect(websocket) try: while True: text = await websocket.receive_text() if 'fail' == permission: await manager.send_personal_message( f"authentication failed", websocket ) else: await manager.send_personal_message(f"receive message from: {userid}, text was: {text}", websocket) except WebSocketDisconnect: manager.disconnect(websocket) print(f"Client #{userid} left the chat") await manager.broadcast(f"Client #{userid} left the chat") if __name__ == '__main__': uvicorn.run(app, host='0.0.0.0',port=7777)
调用结果:
1. 打开会话一:
2. 打开会话二:
3.关闭其中一个会话
五、附带说明
5.1. FastAPI框架中Depends关键字的作用
依赖管理:定义函数之间的依赖关系,将一个函数的输出作为另一个函数的输入。
授权和认证:检查用户的身份和权限,确保只有授权的用户才能访问特定的路由或执行特定的操作。
数据校验:对输入数据进行校验,确保输入数据满足特定的格式或约束条件。
依赖注入:实现依赖注入,将依赖项注入到函数中,而不是在函数内部创建依赖项。