阅读量:0
搭建python服务
from flask import Flask, request, jsonify from pymongo import MongoClient app = Flask(__name__) # 配置MongoDB连接 client = MongoClient('mongodb://localhost:27017/') def get_collection(db_name, collection_name): db = client[db_name] collection = db[collection_name] return collection # 插入一条记录 @app.route('/insert_one', methods=['POST']) def insert_one(): data = request.json db_name = data.pop('db') collection_name = data.pop('collection') collection = get_collection(db_name, collection_name) result = collection.insert_one(data) return jsonify({'inserted_id': str(result.inserted_id)}) # 插入多条记录 @app.route('/insert_many', methods=['POST']) def insert_many(): data = request.json db_name = data.pop('db') collection_name = data.pop('collection') documents = data.pop('documents') collection = get_collection(db_name, collection_name) result = collection.insert_many(documents) return jsonify({'inserted_ids': [str(id) for id in result.inserted_ids]}) # 查找一条记录 @app.route('/find_one', methods=['GET']) def find_one(): db_name = request.args.get('db') collection_name = request.args.get('collection') query = request.args.to_dict() query.pop('db') query.pop('collection') collection = get_collection(db_name, collection_name) result = collection.find_one(query) if result: result['_id'] = str(result['_id']) return jsonify(result) # 查找多条记录 @app.route('/find', methods=['GET']) def find(): db_name = request.args.get('db') collection_name = request.args.get('collection') query = request.args.to_dict() query.pop('db') query.pop('collection') collection = get_collection(db_name, collection_name) results = collection.find(query) result_list = [] for result in results: result['_id'] = str(result['_id']) result_list.append(result) return jsonify(result_list) # 统计记录数 @app.route('/count', methods=['GET']) def count(): db_name = request.args.get('db') collection_name = request.args.get('collection') query = request.args.to_dict() query.pop('db') query.pop('collection') collection = get_collection(db_name, collection_name) count = collection.count_documents(query) return jsonify({'count': count}) if __name__ == '__main__': app.run(debug=True)
启用python正式服务
首先安装Gunicorn:
pip install gunicorn
运行你的Flask应用:
gunicorn -w 4 -b 0.0.0.0:8000 app:app
这里,
-w 4
表示使用4个工作进程,-b 0.0.0.0:8000
表示绑定到所有网络接口的8000端口,app:app
表示Flask应用的入口点(文件名是app.py
,里面的Flask实例名称是app
)。
测试代码
curl -X POST -H "Content-Type: application/json" -d '{"db": "your_database", "collection": "your_collection", "name": "John Doe", "age": 30}' http://localhost:8000/insert_one