java 单机接口限流处理方案
392
2022-06-06
环境安装:
sudo pip install flask
Flask 是一个Python的微服务的框架,基于Werkzeug, 一个 WSGI 类库。
Flask 优点:
Written in Python (that can be an advantage);
Simple to use;
Flexible;
Multiple good deployment options;
RESTful request dispatching
RESOURCES
一个响应 /articles 和 /articles/:id的 API 服务:
from flask import Flask, url_for app = Flask(__name__) @app.route('/') def api_root(): return 'Welcome' @app.route('/articles') def api_articles(): return 'List of ' + url_for('api_articles') @app.route('/articles/<articleid>') def api_article(articleid): return 'You are reading ' + articleid if __name__ == '__main__': app.run()
请求:
curl http://127.0.0.1:5000/
响应:
GET /
Welcome
GET /articles
List of /articles
GET /articles/123
You are reading 123
REQUESTS
GET Parameters
from flask import request @app.route('/hello') def api_hello(): if 'name' in request.args: return 'Hello ' + request.args['name'] else: return 'Hello John Doe'
请求:
GET /hello
Hello John Doe
GET /hello?name=Luis
Hello Luis
Request Methods (HTTP Verbs)
@app.route('/echo', methods = ['GET', 'POST', 'PATCH', 'PUT', 'DELETE']) def api_echo(): if request.method == 'GET': return "ECHO: GET\n" elif request.method == 'POST': return "ECHO: POST\n" elif request.method == 'PATCH': return "ECHO: PACTH\n" elif request.method == 'PUT': return "ECHO: PUT\n" elif request.method == 'DELETE': return "ECHO: DELETE"
请求指定request type:
curl -X PATCH http://127.0.0.1:5000/echo
GET /echo
ECHO: GET
POST /ECHO
ECHO: POST
Request Data & Headers
from flask import json @app.route('/messages', methods = ['POST']) def api_message(): if request.headers['Content-Type'] == 'text/plain': return "Text Message: " + request.data elif request.headers['Content-Type'] == 'application/json': return "JSON Message: " + json.dumps(request.json) elif request.headers['Content-Type'] == 'application/octet-stream': f = open('./binary', 'wb') f.write(request.data) f.close() return "Binary message written!" else: return "415 Unsupported Media Type ;)"
请求指定content type:
curl -H "Content-type: application/json" \
-X POST http://127.0.0.1:5000/messages -d '{"message":"Hello Data"}'
curl -H "Content-type: application/octet-stream" \
-X POST http://127.0.0.1:5000/messages --data-binary @message.bin
RESPONSES
from flask import Response @app.route('/hello', methods = ['GET']) def api_hello(): data = { 'hello' : 'world', 'number' : 3 } js = json.dumps(data) resp = Response(js, status=200, mimetype='application/json') resp.headers['Link'] = 'http://luisrei.com' return resp
查看response HTTP headers:
curl -i http://127.0.0.1:5000/hello
优化代码:
from flask import jsonify
使用
resp = jsonify(data) resp.status_code = 200
替换
resp = Response(js, status=200, mimetype='application/json')
Status Codes & Errors
@app.errorhandler(404) def not_found(error=None): message = { 'status': 404, 'message': 'Not Found: ' + request.url, } resp = jsonify(message) resp.status_code = 404 return resp @app.route('/users/<userid>', methods = ['GET']) def api_users(userid): users = {'1':'john', '2':'steve', '3':'bill'} if userid in users: return jsonify({userid:users[userid]}) else: return not_found()
请求:
GET /users/2
HTTP/1.0 200 OK
{
"2": "steve"
}
GET /users/4
HTTP/1.0 404 NOT FOUND
{
"status": 404,
"message": "Not Found: http://127.0.0.1:5000/users/4"
}
AUTHORIZATION
from functools import wraps def check_auth(username, password): return username == 'admin' and password == 'secret' def authenticate(): message = {'message': "Authenticate."} resp = jsonify(message) resp.status_code = 401 resp.headers['WWW-Authenticate'] = 'Basic realm="Example"' return resp def requires_auth(f): @wraps(f) def decorated(*args, **kwargs): auth = request.authorization if not auth: return authenticate() elif not check_auth(auth.username, auth.password): return authenticate() return f(*args, **kwargs) return decorated
replacing the check_auth function and using the requires_auth decorator:
@app.route('/secrets')
@requires_auth
def api_hello():
return "Shhh this is top secret spy stuff!"
HTTP basic authentication:
curl -v -u "admin:secret" http://127.0.0.1:5000/secrets
SIMPLE DEBUG & LOGGING
Debug:
app.run(debug=True)
Logging:
import logging file_handler = logging.FileHandler('app.log') app.logger.addHandler(file_handler) app.logger.setLevel(logging.INFO) @app.route('/hello', methods = ['GET']) def api_hello(): app.logger.info('informing') app.logger.warning('warning') app.logger.error('screaming bloody murder!') return "check your logs\n"
以上这篇使用Python & Flask 实现RESTful Web API的实例就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~