阅读量:2
Python Flask 和 gRPC 示例项目
本文将介绍如何在 Python 中使用 Flask 和 gRPC 创建一个简单的示例应用程序,并使用 requests
库进行测试。
环境设置
首先,确保您已经安装了 Python。然后,创建一个虚拟环境以管理您的依赖项。
python -m venv myenv source myenv/bin/activate # Windows 使用 `myenv\Scripts\activate`
安装必要的包:
pip install Flask grpcio grpcio-tools requests
定义 gRPC 服务
创建一个 .proto
文件来定义 gRPC 服务。保存文件名为 service.proto
:
syntax = "proto3"; package demo; service DemoService { rpc SayHello (HelloRequest) returns (HelloResponse) {} } message HelloRequest { string name = 1; } message HelloResponse { string message = 1; }
从 Proto 文件生成 Python 代码
使用 grpc_tools
生成 Python 代码:
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. service.proto
这将生成两个文件:service_pb2.py
和 service_pb2_grpc.py
。
实现 gRPC 服务器
创建一个名为 grpc_server.py
的文件:
from concurrent import futures import grpc import service_pb2 import service_pb2_grpc class DemoService(service_pb2_grpc.DemoServiceServicer): def SayHello(self, request, context): return service_pb2.HelloResponse(message=f'Hello, {request.name}!') def serve(): server = grpc.server(futures.ThreadPoolExecutor(max_workers=10)) service_pb2_grpc.add_DemoServiceServicer_to_server(DemoService(), server) server.add_insecure_port('[::]:50051') server.start() print("gRPC server started on port 50051") server.wait_for_termination() if __name__ == '__main__': serve()
实现 Flask 服务器
创建一个名为 flask_server.py
的文件:
from flask import Flask, request, jsonify import grpc import service_pb2 import service_pb2_grpc app = Flask(__name__) def get_grpc_client(): channel = grpc.insecure_channel('localhost:50051') stub = service_pb2_grpc.DemoServiceStub(channel) return stub @app.route('/hello', methods=['POST']) def say_hello(): data = request.json name = data.get('name') stub = get_grpc_client() response = stub.SayHello(service_pb2.HelloRequest(name=name)) return jsonify({'message': response.message}) if __name__ == '__main__': app.run(port=5000, debug=True)
运行服务器
打开两个终端窗口或标签。
在第一个终端中启动 gRPC 服务器:
python grpc_server.py
在第二个终端中启动 Flask 服务器:
python flask_server.py
使用 requests
进行测试
创建一个名为 test_flask_server.py
的文件,并添加以下代码:
import requests def test_say_hello(): url = 'http://localhost:5000/hello' data = {'name': 'World'} response = requests.post(url, json=data) if response.status_code == 200: print('Success!') print('Response JSON:', response.json()) else: print('Failed!') print('Status Code:', response.status_code) print('Response Text:', response.text) if __name__ == '__main__': test_say_hello()
运行测试
确保
grpc_server.py
和flask_server.py
正在运行。在另一个终端中运行
test_flask_server.py
:python test_flask_server.py
您应该看到如下输出:
Success! Response JSON: {'message': 'Hello, World!'}
运行截图
这个测试脚本发送一个包含 name
为 "World"
的 JSON 对象的 POST 请求到 http://localhost:5000/hello
,然后打印出服务器返回的响应。如果服务器正常工作,您会看到成功消息和返回的 JSON 数据。