开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(五)

avatar
作者
猴君
阅读量:3

一、前言

    使用 FastAPI 可以帮助我们更简单高效地部署 AI 交互业务。FastAPI 提供了快速构建 API 的能力,开发者可以轻松地定义模型需要的输入和输出格式,并编写好相应的业务逻辑。

    FastAPI 的异步高性能架构,可以有效支持大量并发的预测请求,为用户提供流畅的交互体验。此外,FastAPI 还提供了容器化部署能力,开发者可以轻松打包 AI 模型为 Docker 镜像,实现跨环境的部署和扩展。

    总之,使用 FastAPI 可以大大提高 AI 应用程序的开发效率和用户体验,为 AI 模型的部署和交互提供全方位的支持。

    LangChain基础入门:开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(一),本篇学习如何集成LangChain进行模型交互,并使用工具获取实时信息


二、术语

2.1.FastAPI

    FastAPI 是一个用于构建 API 的现代、快速(高性能)的 Python Web 框架。它是基于标准 Python 类型注释的 ASGI (Asynchronous Server Gateway Interface) 框架。

FastAPI 具有以下主要特点:

  1. 快速: FastAPI 使用 ASGI 服务器和 Starlette 框架,在性能测试中表现出色。它可以与 Uvicorn 一起使用,提供非常高的性能。

  2. 简单: FastAPI 利用 Python 类型注释,使 API 定义变得简单且直观。开发人员只需要定义输入和输出模型,FastAPI 会自动生成 API 文档。

  3. 现代: FastAPI 支持 OpenAPI 标准,可以自动生成 API 文档和交互式文档。它还支持 JSON Schema 和数据验证。

  4. 全功能: FastAPI 提供了路由、依赖注入、数据验证、安全性、测试等功能,是一个功能齐全的 Web 框架。

  5. 可扩展: FastAPI 被设计为可扩展的。开发人员可以轻松地集成其他库和组件,如数据库、身份验证等。

2.2.WebSocket

    是一种计算机通信协议,它提供了在单个 TCP 连接上进行全双工通信的机制。它是 HTML5 一个重要的组成部分。

WebSocket 协议主要有以下特点:

  1. 全双工通信:WebSocket 允许客户端和服务器之间进行双向实时通信,即数据可以同时在两个方向上流动。这与传统的 HTTP 请求-响应模型不同,HTTP 中数据只能单向流动。

  2. 持久性连接:WebSocket 连接是一种持久性的连接,一旦建立就会一直保持,直到客户端或服务器主动关闭连接。这与 HTTP 的连接是短暂的不同。

  3. 低开销:相比 HTTP 请求-响应模型,WebSocket 在建立连接时需要较少的数据交换,因此网络开销较小。

  4. 实时性:由于 WebSocket 连接是持久性的,且数据可以双向流动,因此 WebSocket 非常适用于需要实时、低延迟数据交互的应用场景,如聊天应用、实时游戏、股票行情等。

2.3.Tool

    Tool(工具)是为了增强其语言模型的功能和实用性而设计的一系列辅助手段,用于扩展模型的能力。例如代码解释器(Code Interpreter)和知识检索(Knowledge Retrieval)等都属于其工具。

2.4.langchain预置的tools

    https://github.com/langchain-ai/langchain/tree/v0.1.16/docs/docs/integrations/tools

   基本这些工具能满足大部分需求,具体使用参见:


三、前置条件

3.1. 创建虚拟环境&安装依赖

  增加Google Search的依赖包

conda create -n fastapi_test python=3.10 conda activate fastapi_test pip install fastapi websockets uvicorn pip install --quiet  langchain-core langchain-community langchain-openai pip install google-search-results

3.2. 注册Google Search API账号

1. 输入注册信息

可以使用Google账号登录,但仍要执行下面的认证操作

2. 需要认证邮箱

3. 需要认证手机

4. 认证成功

3.3. 生成Google Search API的KEY


四、技术实现

4.1. Google Search小试

# -*- coding: utf-8 -*- import os  from langchain_community.utilities.serpapi import SerpAPIWrapper  os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" serp = SerpAPIWrapper() result = serp.run("广州的实时气温如何?") print("实时搜索结果:", result)

调用结果:

4.2. 非流式输出

    本章代码将开源模型应用落地-FastAPI-助力模型交互-WebSocket篇(三)基础上进行拓展

服务端:

import uvicorn import os  from typing import Annotated from fastapi import (     Depends,     FastAPI,     WebSocket,     WebSocketException,     WebSocketDisconnect,     status, ) from langchain.agents import create_structured_chat_agent, AgentExecutor from langchain_community.utilities import SerpAPIWrapper  from langchain_core.prompts import ChatPromptTemplate, SystemMessagePromptTemplate, HumanMessagePromptTemplate from langchain_core.tools import tool from langchain_openai import ChatOpenAI  os.environ["OPENAI_API_KEY"] = 'sk-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'    #你的Open AI Key os.environ["SERPAPI_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"   class ConnectionManager:     def __init__(self):         self.active_connections: list[WebSocket] = []      async def connect(self, websocket: WebSocket):         await websocket.accept()         self.active_connections.append(websocket)      def disconnect(self, websocket: WebSocket):         self.active_connections.remove(websocket)      async def send_personal_message(self, message: str, websocket: WebSocket):         await websocket.send_text(message)      async def broadcast(self, message: str):         for connection in self.active_connections:             await connection.send_text(message)  manager = ConnectionManager()  app = FastAPI()  async def authenticate(     websocket: WebSocket,     userid: str,     secret: str, ):     if userid is None or secret is None:         raise WebSocketException(code=status.WS_1008_POLICY_VIOLATION)      print(f'userid: {userid},secret: {secret}')     if '12345' == userid and 'xxxxxxxxxxxxxxxxxxxxxxxxxx' == secret:         return 'pass'     else:         return 'fail'  @tool def search(query:str):     """只有需要了解实时信息或不知道的事情的时候才会使用这个工具,需要传入要搜索的内容。"""     serp = SerpAPIWrapper()     result = serp.run(query)     print("实时搜索结果:", result)     return result   def get_prompt():     template='''     Respond to the human as helpfully and accurately as possible. You have access to the following tools:          {tools}          Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).          Valid "action" values: "Final Answer" or {tool_names}          Provide only ONE action per $JSON_BLOB, as shown:          ```          {{            "action": $TOOL_NAME,            "action_input": $INPUT          }}          ```          Follow this format:          Question: input question to answer          Thought: consider previous and subsequent steps          Action:          ```          $JSON_BLOB          ```          Observation: action result          ... (repeat Thought/Action/Observation N times)          Thought: I know what to respond          Action:          ```          {{            "action": "Final Answer",            "action_input": "Final response to human"          }}          Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Format is Action:```$JSON_BLOB```then Observation     '''     system_message_prompt = SystemMessagePromptTemplate.from_template(template)     human_template='''     {input}          {agent_scratchpad}           (reminder to respond in a JSON blob no matter what)     '''     human_message_prompt = HumanMessagePromptTemplate.from_template(human_template)     prompt = ChatPromptTemplate.from_messages([system_message_prompt, human_message_prompt])      return prompt  async def chat(query):     global llm,tools     agent = create_structured_chat_agent(         llm, tools, get_prompt()     )      agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True, handle_parsing_errors=True)      result = agent_executor.invoke({"input": query})     print(result['output'])     yield result['output']  @app.websocket("/ws") async def websocket_endpoint(*,websocket: WebSocket,userid: str,permission: Annotated[str, Depends(authenticate)],):     await manager.connect(websocket)     try:         while True:             text = await websocket.receive_text()              if 'fail' == permission:                 await manager.send_personal_message(                     f"authentication failed", websocket                 )             else:                 if text is not None and len(text) > 0:                     async for msg in chat(text):                         await manager.send_personal_message(msg, websocket)      except WebSocketDisconnect:         manager.disconnect(websocket)         print(f"Client #{userid} left the chat")         await manager.broadcast(f"Client #{userid} left the chat")  if __name__ == '__main__':     tools = [search]     llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, max_tokens=512)     uvicorn.run(app, host='0.0.0.0',port=7777) 

客户端:

<!DOCTYPE html> <html>     <head>         <title>Chat</title>     </head>     <body>         <h1>WebSocket Chat</h1>         <form action="" onsubmit="sendMessage(event)">             <label>USERID: <input type="text" id="userid" autocomplete="off" value="12345"/></label>             <label>SECRET: <input type="text" id="secret" autocomplete="off" value="xxxxxxxxxxxxxxxxxxxxxxxxxx"/></label>             <br/>             <button onclick="connect(event)">Connect</button>             <hr>             <label>Message: <input type="text" id="messageText" autocomplete="off"/></label>             <button>Send</button>         </form>         <ul id='messages'>         </ul>         <script>             var ws = null;             function connect(event) {                 var userid = document.getElementById("userid")                 var secret = document.getElementById("secret")                 ws = new WebSocket("ws://localhost:7777/ws?userid="+userid.value+"&secret=" + secret.value);                 ws.onmessage = function(event) {                     var messages = document.getElementById('messages')                     var message = document.createElement('li')                     var content = document.createTextNode(event.data)                     message.appendChild(content)                     messages.appendChild(message)                 };                 event.preventDefault()             }             function sendMessage(event) {                 var input = document.getElementById("messageText")                 ws.send(input.value)                 input.value = ''                 event.preventDefault()             }         </script>     </body> </html>

调用结果:

用户输入:你好

不需要触发工具调用

模型输出:你好!有什么我可以帮忙的吗?

用户输入:广州现在天气如何?

需要调用工具

模型输出:The current weather in Guangzhou is partly cloudy with a temperature of 95°F, 66% chance of precipitation, 58% humidity, and wind speed of 16 mph. This information was last updated on Monday at 1:00 PM.

PS:

1. 在AI交互中,LangChain框架并不是必须引入,此处引用仅用于简化Openai的交互流程。

2. 页面输出的样式可以根据实际需要进行调整,此处仅用于演示效果。

3. 目前还遗留两个问题,一是如何实现流式输出,二是如何更好维护prompt模版,篇幅有限,下回分解


五、附带说明

5.1. 如何避免模型用英文回复

在提示词模版加入:Remember to answer in Chinese.  暗示模型一定要以中文进行回复。

修改后的提示语为:

Respond to the human as helpfully and accurately as possible. You have access to the following tools:          {tools}          Use a json blob to specify a tool by providing an action key (tool name) and an action_input key (tool input).          Valid "action" values: "Final Answer" or {tool_names}          Provide only ONE action per $JSON_BLOB, as shown:          ```          {{            "action": $TOOL_NAME,            "action_input": $INPUT          }}          ```          Follow this format:          Question: input question to answer          Thought: consider previous and subsequent steps          Action:          ```          $JSON_BLOB          ```          Observation: action result          ... (repeat Thought/Action/Observation N times)          Thought: I know what to respond          Action:          ```          {{            "action": "Final Answer",            "action_input": "Final response to human"          }}          Begin! Reminder to ALWAYS respond with a valid json blob of a single action. Use tools if necessary. Respond directly if appropriate. Remember to answer in Chinese.Format is Action:```$JSON_BLOB```then Observation

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!