引言
使用Flask框架可以轻松实现一个类似chatgpt流式响应的接口,该接口通过POST请求接收prompt和history参数,分别对应为聊天系统的提示词和对话历史,server则给予server-sent event (SSE)返回聊天系统的响应。
服务端
开发前的思考
我们梳理一下,为了实现流式API接口,Server端需要完成的主要工作
1、创建一个flask app, 检查传入的POST请求是否包含JSON,提取必要的数据,并进行验证。
2、响应为SSE: 设置适当的响应头以适应服务器发送事件(text/event-stream),并保持连接活动状态。
3、生成函数: 提供了一个占位符generate()函数。这个函数应包含根据prompt和history生成响应的逻辑。在这个示例中,它简单地流回输入数据。实际开发应当采用真正的LLM大模型,此处从简。
4、流式传输数据: generate()函数设计为持续流式传输数据。在实际应用中,你需要用实际的事件生成逻辑替换循环和time.sleep()。
其中,1比较简单,因为很容易想象处理json数据是Flask的主要工作。3也比较简单,如果使用过大语言模型的产品,你多半会见过打字机式的的UI效果。实际上的大语言模型输出不见得是严格一个个字输出,大概是几个字为单位输出,这是由于其模型输出采用分词器的缘故,即所谓Tokenizer。这不是本文主题,所以我们仅采用一个循环打印来模拟这种返回效果。2和4相对要陌生一些,不过我们可以理解它们就是实现流式接口的必备技术基础,即使没有深入理解也可以实现。正所谓自顶向下的理解一个问题,我们先从最表层的行动:实现(Implementation),开始。
代码实现
严谨起见,首先安装 Flask:
pip install Flask
server部分代码如下
from flask import Flask, request, jsonify, Response app = Flask(__name__) @app.route('/api/stream-chat', methods=['POST']) def stream_chat(): # Check if the request contains JSON data if request.is_json: # Get JSON data data = request.get_json() # Extract 'prompt' field; return an error if it's missing prompt = data.get('prompt') if prompt is None: return jsonify({"error": "Missing 'prompt' field"}), 400 # Extract 'history' field; return an error if it's missing history = data.get('history') if history is None: return jsonify({"error": "Missing 'history' field"}), 400 # Ensure 'history' is a list if not isinstance(history, list): return jsonify({"error": "'history' field must be a list"}), 400 # Call a generate function to process the prompt and history into a response response = Response(generate(message=prompt, chat_history=history), mimetype='application/json') response.headers['Content-Type'] = 'text/event-stream' response.headers['Cache-Control'] = 'no-cache' response.destroy_headers['Connection'] = 'keep-alive' return response else: # If the request does not contain JSON data, return an error return jsonify({"error": "Request body must be JSON"}), 400 def generate(message, chat_history): # This function should generate the events. Here is a simple example that just echoes the input. # Normally, you might have more complex logic here. import json import time while True: data = { "prompt": message, "history": chat_history, "response": "Generated response based on input" } yield f"data:{json.dumps(data)}\n\n" time.sleep(1) # Delay for demonstration; remove or adjust in production if __name__ == '__main__': app.run(debug=True, threaded=True)
接下来我们可以实现一个客户端以测试该接口。测试前请运行刚刚实现的server脚本并保持服务器开启状态。Flask默认使用的端口是5000。
为了与上述 Flask 服务器进行交互,并处理服务器端事件流(SSE),我们使用 Python 的 requests
库来创建一个客户端。这个客户端将发送 POST 请求到 Flask 应用,并监听返回的事件流。
客户端
客户端工作:
- 发送 POST 请求:使用
requests.post
发送一个含有 JSON 数据的 POST 请求到服务器。stream=True
参数允许我们处理持续的响应数据流。 - 处理响应:循环读取服务器的响应。当检测到以
data:
开头的行时,解析该行并打印出数据。这里使用了 JSON 解析来处理数据。
测试客户端和服务器:
- 确保你的 Flask 服务器正在运行。
- 运行上述客户端代码。
首先安装 requests
库
pip install requests
下面是客户端代码的示例:
import requests import json def send_post_request(url, payload): # 将字典转换为JSON格式的字符串 headers = {'Content-Type': 'application/json'} response = requests.post(url, data=json.dumps(payload), headers=headers, stream=True) # 检查请求是否成功 if response.status_code == 200: try: print("Connected to server, listening for events...") for line in response.iter_lines(): if line: decoded_line = line.decode('utf-8') if decoded_line.startswith('data:'): # 提取数据部分 json_data = json.loads(decoded_line.split(':', 1)[1].strip()) print("Received data:", json_data) except Exception as e: print("Error:", e) else: print("Failed to connect:", response.status_code) # URL指向你的Flask应用 url = 'http://127.0.0.1:5000/api/stream-chat' # 构造请求的数据 payload = { 'prompt': 'Example prompt', 'history': ['First action', 'Second action'] } # 发送POST请求并处理SSE send_post_request(url, payload)
这样,你的客户端会发送一个 POST 请求到 Flask 服务器,并实时接收并打印从服务器发送的数据。