[TOC]

# Building an Automated Ops Platform from Scratch with Flask

## Installation

Set up a Python environment first, then install Flask:

```
pip install flask
```

## IDE

PyCharm is recommended.

![](https://box.kancloud.cn/51f6398a74ef18cd6397610d78011b2f_1680x1050.png)

## Design approach

There are two paths to choose from: the Django-style MVC (Model View Controller) approach, or the increasingly popular separation of front end and back end, where the two are developed independently.

I prefer the separated approach. That leaves a second choice between RPC (Remote Procedure Call) and REST (Representational State Transfer), each of which has its strengths. I prefer a centralized interface, where a single URL handles every interaction between the front end and the back end, which favors RPC.

Weighing all of this, I settled on the Flask framework.

## Flask extensions used

| Extension | Link | Description |
| ------------ | ------------ | ------------ |
| flask-jsonrpc | [git](http://github.com/cenobites/flask-jsonrpc) | JSON-RPC extension for Flask |
| flask_sqlalchemy | | ORM for Flask |
| flask-marshmallow | | Converts SQLAlchemy query results to JSON |
| flask_script | | Command-line extension for Flask |
| flask_migrate | | Database migration extension |

## Designing a basic hello world endpoint

```python
from flask import Flask
from flask_jsonrpc import JSONRPC

app = Flask(__name__)
# Enable the web-browsable API for debugging
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True)


# Register a method that the front end can call
@jsonrpc.method('App.index')
def index():
    return u'Hello World!'


if __name__ == '__main__':
    app.run(port=2001, debug=True)
```

Front-end request:

```
POST /api
{
  "date": "Wed, 21 Mar 2018 02:23:22 GMT",
  "server": "Werkzeug/0.14.1 Python/3.6.2",
  "content-length": "101",
  "content-type": "application/json",
  "data": {
    "jsonrpc": "2.0",
    "method": "App.index",
    "params": [],
    "id": "3f46e50f-b46b-49e3-b16b-56af7ab21867"
  }
}
```

Back-end response:

```
HTTP 200
{
  "id": "3f46e50f-b46b-49e3-b16b-56af7ab21867",
  "jsonrpc": "2.0",
  "result": "Hello World!"
}
```

## Adding authentication

Resources should only be reachable after the caller's identity has been verified, which also makes permission control possible. This project uses token authentication, a popular approach at the moment.

The token is a signed string (the payload is encoded and signed rather than encrypted) that carries a small payload and an expiry time. Within the validity period, presenting the token grants access to resources; once it has expired, access is denied.

Required imports and configuration:

```python
from flask import Flask
from flask_jsonrpc import JSONRPC
from flask_sqlalchemy import SQLAlchemy
from itsdangerous import BadSignature, SignatureExpired
# Note: TimedJSONWebSignatureSerializer was removed in itsdangerous 2.1,
# so this import requires an older itsdangerous release.
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer
import werkzeug.security

app = Flask(__name__)
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# Demo value only; use a long random secret in production
app.config['SECRET_KEY'] = '1q2w3e'
db = SQLAlchemy(app)
```

### 1. Create a user table with the ORM

[Flask-SQLAlchemy tutorial](http://docs.jinkan.org/docs/flask-sqlalchemy/quickstart.html "Flask-SQLAlchemy tutorial")

Define the data model:

```python
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def password(self, password):
        """Set the password hash."""
        self.password_hash = werkzeug.security.generate_password_hash(password)

    def verify_password(self, password):
        """Compare the plaintext password supplied by the user with the stored hash."""
        return werkzeug.security.check_password_hash(self.password_hash, password)

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username
```

Create the table structure from the model:

```
from SmartOps import db
db.create_all()
```

### 2. Design the user registration endpoint

Front end:

```
POST /api
{
  "date": "Wed, 21 Mar 2018 06:24:12 GMT",
  "server": "Werkzeug/0.14.1 Python/3.6.2",
  "content-length": "158",
  "content-type": "application/json",
  "data": {
    "jsonrpc": "2.0",
    "method": "user.register",
    "params": {
      "username": "test",
      "password": "123456"
    },
    "id": "cde5749c-de2f-46ea-b9c5-824cf1f3fc92"
  }
}
```

Back end:

```
HTTP 200
{
  "id": "cde5749c-de2f-46ea-b9c5-824cf1f3fc92",
  "jsonrpc": "2.0",
  "result": {
    "message": "User already exists",
    "status": 1
  }
}
```

> 0: registration succeeded; 1: registration failed

Back-end code:

```python
@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    # Only create the account if the username is not taken yet
    if not User.query.filter_by(username=username).first():
        user = User(username=username)
        user.password(password)
        db.session.add(user)
        db.session.commit()
        return {'status': 0, 'message': u'Registration successful'}
    else:
        return {'status': 1, 'message': u'User already exists'}
```

### 3. Log in after registration

Front end:

```
POST /api
{
  "date": "Wed, 21 Mar 2018 06:27:48 GMT",
  "server": "Werkzeug/0.14.1 Python/3.6.2",
  "content-length": "1148",
  "content-type": "application/json",
  "data": {
    "jsonrpc": "2.0",
    "method": "user.verify",
    "params": {
      "username": "test",
      "password": "123456"
    },
    "id": "36d8d630-4092-4add-abfc-55a257dae7d9"
  }
}
```

Back end:

```
HTTP 200
{
  "id": "945307c4-8009-49ed-ab43-96d29ae37294",
  "jsonrpc": "2.0",
  "result": {
    "message": "Welcome test2",
    "status": 0,
    "token": "eyJhbGciOiJIUzI1NiIsImlhdCI6MTUyMTYxNTYyNywiZXhwIjoxNTIxNjE2MjI3fQ.eyJpZCI6M30.fw8tSBcDfITEgG5mHeMpyFio821jlmVQgmlXlZxDadI"
  }
}
```

Back-end code:

```python
@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    user = User.query.filter_by(username=username).first()
    if not user:
        return {'status': 1, 'message': u'Username does not exist'}
    if user.verify_password(password):
        # Credentials are correct: issue a token for later requests
        token = user.generate_auth_token()
        return {'status': 0, 'message': u'Welcome %s' % username, 'token': token}
    return {'status': 1, 'message': u'Wrong password'}
```

### 4. Generating and verifying tokens

The generation and verification functions:

```python
class User(db.Model):
    ...

    def generate_auth_token(self, expiration=600):
        # Sign the user id together with an expiry time (in seconds)
        s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id}).decode('utf-8')

    @staticmethod
    def verify_auth_token(token):
        s = Serializer(app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user
    ...
```

This step requires modifying the flask-jsonrpc source. By default it authenticates with a username and password, and sending those over the internet on every request is insecure, so change it to verify a token instead.

Modify the `method` method:

```python
# Inside flask-jsonrpc's JSONRPC class: prepend 'token' instead of
# 'username' and 'password' to the argument list of authenticated methods.
def method(self, name, authenticated=False, safe=False, validate=False, **options):
    def decorator(f):
        arg_names = getargspec(f)[0]
        X = {'name': name, 'arg_names': arg_names}
        if authenticated:
            # TODO: this is an assumption
            # X['arg_names'] = ['username', 'password'] + X['arg_names']
            X['arg_names'] = ['token'] + X['arg_names']
            X['name'] = _inject_args(X['name'], ('String', 'String'))
            _f = self.auth_backend(f, authenticated)
        else:
            _f = f
        method, arg_types, return_type = _parse_sig(X['name'], X['arg_names'], validate)
        _f.json_args = X['arg_names']
        _f.json_arg_types = arg_types
        _f.json_return_type = return_type
        _f.json_method = method
        _f.json_safe = safe
        _f.json_sig = X['name']
        _f.json_validate = validate
        self.site.register(method, _f)
        return _f
    return decorator
```

Next, write a new authentication backend:

```python
from functools import wraps

from flask_jsonrpc.exceptions import InvalidCredentialsError, InvalidParamsError


def authenticate(f, f_check_auth):
    @wraps(f)
    def _f(*args, **kwargs):
        is_auth = False
        if args:
            # Positional params: the token is the first argument
            is_auth = f_check_auth(args[0])
            if is_auth:
                args = args[1:]
        elif 'token' in kwargs:
            # Named params: the token arrives as a keyword argument
            is_auth = f_check_auth(kwargs['token'])
            if is_auth:
                kwargs.pop('token')
        else:
            raise InvalidParamsError('Authenticated methods require at least '
                                     '[token] or {token: } arguments')
        if not is_auth:
            raise InvalidCredentialsError()
        return f(*args, **kwargs)
    return _f
```

Then update the JSONRPC initialization so that it picks up the custom authentication backend:

```python
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True, auth_backend=authenticate)
```

### 5. Authentication is now complete

The front end can now access resources by authenticating with the token.

Front end:

```
POST /api
{
  "date": "Wed, 21 Mar 2018 07:29:49 GMT",
  "server": "Werkzeug/0.14.1 Python/3.6.2",
  "content-length": "100",
  "content-type": "application/json",
  "data": {
    "jsonrpc": "2.0",
    "method": "App.hello",
    "params": {
      "token": "eyJhbGciOiJIUzI1NiIsImlhdCI6MTUyMTYxNzMzMywiZXhwIjoxNTIxNjE3OTMzfQ.eyJpZCI6M30.pswj1Sbny8EM2u8T01s7gizS02LQ-RS2PSvme2jQQLs",
      "name": "jack"
    },
    "id": "61a746df-21c5-485c-9aa5-b7c9b3a938e6"
  }
}
```

Back end:

```
HTTP 200
{
  "id": "61a746df-21c5-485c-9aa5-b7c9b3a938e6",
  "jsonrpc": "2.0",
  "result": "Hello jack!"
}
```

Back-end test code (`check_auth` is defined in the complete listing):

```python
@jsonrpc.method('App.hello(name=str)', authenticated=check_auth)
def index(name):
    return u'Hello %s!' % name
```

## Complete code

```python
from functools import wraps

import werkzeug.security
from flask import Flask
from flask_jsonrpc import JSONRPC
from flask_jsonrpc.exceptions import InvalidCredentialsError, InvalidParamsError
from flask_sqlalchemy import SQLAlchemy
from itsdangerous import BadSignature, SignatureExpired
# Note: TimedJSONWebSignatureSerializer was removed in itsdangerous 2.1,
# so this import requires an older itsdangerous release.
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

app = Flask(__name__)


def authenticate(f, f_check_auth):
    @wraps(f)
    def _f(*args, **kwargs):
        is_auth = False
        if args:
            # Positional params: the token is the first argument
            is_auth = f_check_auth(args[0])
            if is_auth:
                args = args[1:]
        elif 'token' in kwargs:
            # Named params: the token arrives as a keyword argument
            is_auth = f_check_auth(kwargs['token'])
            if is_auth:
                kwargs.pop('token')
        else:
            raise InvalidParamsError('Authenticated methods require at least '
                                     '[token] or {token: } arguments')
        if not is_auth:
            raise InvalidCredentialsError()
        return f(*args, **kwargs)
    return _f


jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True, auth_backend=authenticate)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# Demo value only; use a long random secret in production
app.config['SECRET_KEY'] = '1q2w3e'
app.debug = True
db = SQLAlchemy(app)


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def password(self, password):
        """Set the password hash."""
        self.password_hash = werkzeug.security.generate_password_hash(password)

    def verify_password(self, password):
        """Compare the plaintext password supplied by the user with the stored hash."""
        if self.password_hash:
            return werkzeug.security.check_password_hash(self.password_hash, password)
        return False

    def generate_auth_token(self, expiration=600):
        # Sign the user id together with an expiry time (in seconds)
        s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id}).decode('utf-8')

    @staticmethod
    def verify_auth_token(token):
        s = Serializer(app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username


@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    # Only create the account if the username is not taken yet
    if not User.query.filter_by(username=username).first():
        user = User(username=username)
        user.password(password)
        db.session.add(user)
        db.session.commit()
        return {'status': 0, 'message': u'Registration successful'}
    else:
        return {'status': 1, 'message': u'User already exists'}


@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    user = User.query.filter_by(username=username).first()
    if not user:
        return {'status': 1, 'message': u'Username does not exist'}
    if user.verify_password(password):
        # Credentials are correct: issue a token for later requests
        token = user.generate_auth_token()
        return {'status': 0, 'message': u'Welcome %s' % username, 'token': token}
    return {'status': 1, 'message': u'Wrong password'}


def check_auth(token):
    # Token authentication is skipped in debug mode;
    # make sure app.debug is off in production.
    if app.debug:
        return True
    return User.verify_auth_token(token) is not None


@jsonrpc.method('App.hello(name=str)', authenticated=check_auth)
def index(name):
    return u'Hello %s!' % name


if __name__ == '__main__':
    app.run(port=2001)
```
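
The idea behind the token in step 4 (a small payload plus an expiry time, protected by a signature) can be illustrated with nothing but the standard library. This is a minimal sketch and not the itsdangerous implementation: the helper names `make_token` and `read_token`, the token layout, and the optional `now` parameter are assumptions made for illustration.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = b'1q2w3e'  # demo key from this tutorial; use a random secret in production


def _sign(body):
    """Return a hex HMAC-SHA256 signature of the given bytes."""
    return hmac.new(SECRET_KEY, body, hashlib.sha256).hexdigest()


def make_token(user_id, expires_in=600, now=None):
    """Hypothetical helper: encode {'id', 'exp'} and append a signature."""
    now = time.time() if now is None else now
    payload = {'id': user_id, 'exp': int(now) + expires_in}
    body = base64.urlsafe_b64encode(json.dumps(payload).encode('utf-8'))
    return body.decode('ascii') + '.' + _sign(body)


def read_token(token, now=None):
    """Hypothetical helper: return the user id, or None if invalid or expired."""
    now = time.time() if now is None else now
    try:
        body, signature = token.rsplit('.', 1)
    except ValueError:
        return None  # malformed token: no signature part
    expected = _sign(body.encode('utf-8'))
    # Constant-time comparison, checked before the payload is decoded
    if not hmac.compare_digest(signature.encode('utf-8'), expected.encode('utf-8')):
        return None  # tampered payload or wrong key
    payload = json.loads(base64.urlsafe_b64decode(body))
    if payload['exp'] < now:
        return None  # valid signature, but expired
    return payload['id']
```

Note that the payload is only base64-encoded, so anyone holding the token can read it; the signature prevents modification, not disclosure. The same holds for the tokens shown in the responses above, which is why only the user id is stored in them.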
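
The request bodies shown throughout this tutorial all share the same JSON-RPC 2.0 envelope, so a small client helper can build them. The following is a sketch under assumptions: the names `build_request` and `call` are hypothetical, and `API_URL` assumes the application is running locally on port 2001 as in the listings above.

```python
import json
import uuid
from urllib import request as urlrequest

API_URL = 'http://127.0.0.1:2001/api'  # assumption: the app is running locally


def build_request(method, params=None, token=None):
    """Build a JSON-RPC 2.0 request body; add the token for authenticated methods."""
    params = dict(params or {})
    if token is not None:
        params['token'] = token
    return {
        'jsonrpc': '2.0',
        'method': method,
        'params': params,
        'id': str(uuid.uuid4()),  # lets the caller match the response to the request
    }


def call(method, params=None, token=None, url=API_URL):
    """POST the request to the single /api endpoint and return the decoded response."""
    body = json.dumps(build_request(method, params, token)).encode('utf-8')
    req = urlrequest.Request(url, data=body,
                             headers={'Content-Type': 'application/json'})
    with urlrequest.urlopen(req) as resp:
        return json.loads(resp.read().decode('utf-8'))
```

With the server running, a login followed by an authenticated call would look like `login = call('user.verify', {'username': 'test', 'password': '123456'})` and then `call('App.hello', {'name': 'jack'}, token=login['result']['token'])`.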