[TOC]
# Building an automated operations platform from scratch with Flask
## Installation
Set up a Python environment first, then install Flask:
```
pip install flask
```
## Development IDE
PyCharm is recommended.
![](https://box.kancloud.cn/51f6398a74ef18cd6397610d78011b2f_1680x1050.png)
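## Development approach
There were two paths to choose from: Django-style MVC (Model View Controller), or the increasingly popular separation of frontend and backend, where the two are developed independently.

I prefer the separated approach. That raises a second choice between RPC (Remote Procedure Call) and REST (Representational State Transfer); each has its advantages. I prefer a centralized interface, that is, a single URL that handles all interaction between frontend and backend.

Weighing these considerations, I settled on the Flask framework.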
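
To make the "single URL" idea concrete, here is a minimal sketch of a JSON-RPC style dispatcher in plain Python. It is not how flask-jsonrpc is implemented; `METHODS`, `rpc_method`, and `handle` are hypothetical names chosen for illustration, and the sketch leaves out validation and batch requests.

```python
import json

# Hypothetical registry: maps JSON-RPC method names to Python callables.
METHODS = {}


def rpc_method(name):
    """Register a function under a JSON-RPC method name."""
    def decorator(func):
        METHODS[name] = func
        return func
    return decorator


@rpc_method('App.index')
def index():
    return 'Hello World!'


@rpc_method('App.hello')
def hello(name):
    return 'Hello %s!' % name


def handle(raw_body):
    """Dispatch one JSON-RPC 2.0 request body and return the response body."""
    request = json.loads(raw_body)
    func = METHODS.get(request['method'])
    if func is None:
        # -32601 is the JSON-RPC 2.0 error code for "Method not found".
        error = {'code': -32601, 'message': 'Method not found'}
        return json.dumps({'jsonrpc': '2.0', 'id': request.get('id'), 'error': error})
    params = request.get('params', [])
    # JSON-RPC allows params by position (array) or by name (object).
    result = func(**params) if isinstance(params, dict) else func(*params)
    return json.dumps({'jsonrpc': '2.0', 'id': request.get('id'), 'result': result})
```

Every call goes through `handle`, so concerns such as logging or authentication can be added in one place instead of once per route.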
## Flask extensions used
| Extension | URL | Description |
| ------------ | ------------ | ------------ |
| flask-jsonrpc | [git](http://github.com/cenobites/flask-jsonrpc)| JSON-RPC extension for Flask |
| flask_sqlalchemy| | ORM for Flask |
|flask-marshmallow | |Serializes SQLAlchemy query results to JSON|
|flask_script | |Command-line extension for Flask|
|flask_migrate | |Database migration extension|
## A basic hello world interface
```python
from flask import Flask
from flask_jsonrpc import JSONRPC

app = Flask(__name__)
# Enable the web-browsable API page for debugging
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True)


# Register a method that the frontend can call
@jsonrpc.method('App.index')
def index():
    return u'Hello World!'


if __name__ == '__main__':
    app.run(port=2001, debug=True)
```
Request from the frontend:
```
POST /api
{
    "date": "Wed, 21 Mar 2018 02:23:22 GMT",
    "server": "Werkzeug/0.14.1 Python/3.6.2",
    "content-length": "101",
    "content-type": "application/json",
    "data": {
        "jsonrpc": "2.0",
        "method": "App.index",
        "params": [],
        "id": "3f46e50f-b46b-49e3-b16b-56af7ab21867"
    }
}
```
Response from the backend:
```
HTTP 200
{
    "id": "3f46e50f-b46b-49e3-b16b-56af7ab21867",
    "jsonrpc": "2.0",
    "result": "Hello World!"
}
```
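
The exchange above follows the JSON-RPC 2.0 request and response shape. As a rough client-side sketch, the helpers below build such a request and read the result back; `build_request` and `extract_result` are hypothetical names, and the actual HTTP POST is left out so the example stays standard-library only.

```python
import json
import uuid


def build_request(method, params=None):
    """Build a JSON-RPC 2.0 request object for the given method."""
    return {
        'jsonrpc': '2.0',
        'method': method,
        'params': params if params is not None else [],
        # The id lets the caller match a response to its request.
        'id': str(uuid.uuid4()),
    }


def extract_result(request, response_text):
    """Return the result from a response body, checking that the ids match."""
    response = json.loads(response_text)
    if response.get('id') != request['id']:
        raise ValueError('response id does not match request id')
    if 'error' in response:
        raise RuntimeError(response['error'])
    return response['result']


request = build_request('App.index')
body = json.dumps(request)  # this string is what would be POSTed to /api
```

Any HTTP client can send `body` to `/api` with the `application/json` content type and pass the response text to `extract_result`.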
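## Adding authentication
Resources should be reachable only after the caller's identity has been verified, which also makes access control possible. This article uses token authentication, a widely used approach.

The token is a signed string that carries a small payload and an expiry time. Note that it is signed rather than encrypted: the payload is only base64-encoded, so it should not contain secrets. While the token is valid, presenting it grants access to resources; once it expires, access is refused.

The required imports and configuration: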
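
To illustrate the idea of a token with a payload and an expiry time, here is a minimal standard-library sketch based on an HMAC signature. It is not the itsdangerous format used later in this article; `SECRET_KEY`, `make_token`, and `read_token` are hypothetical names for illustration only.

```python
import base64
import hashlib
import hmac
import json
import time

SECRET_KEY = b'change-me'  # hypothetical key, for illustration only


def _b64(data):
    """Encode bytes as unpadded URL-safe base64 text."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def _unb64(text):
    """Decode unpadded URL-safe base64 text back to bytes."""
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def make_token(payload, expires_in=600, now=None):
    """Return 'body.signature'; the body carries the payload and an expiry time."""
    now = time.time() if now is None else now
    claims = {'data': payload, 'exp': int(now) + expires_in}
    body = _b64(json.dumps(claims).encode('utf-8'))
    sig = hmac.new(SECRET_KEY, body.encode('ascii'), hashlib.sha256).digest()
    return body + '.' + _b64(sig)


def read_token(token, now=None):
    """Return the payload, or None if the token was tampered with or has expired."""
    now = time.time() if now is None else now
    try:
        body, sig = token.split('.')
        expected = hmac.new(SECRET_KEY, body.encode('ascii'), hashlib.sha256).digest()
        # compare_digest avoids leaking information through timing differences.
        if not hmac.compare_digest(_unb64(sig), expected):
            return None
        claims = json.loads(_unb64(body))
    except ValueError:
        return None
    if claims['exp'] < now:
        return None
    return claims['data']
```

The server needs no session storage with this approach: anyone holding the secret key can check the signature and the expiry time from the token alone.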
```python
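from functools import wraps

import werkzeug.security
from flask import Flask
from flask_jsonrpc import JSONRPC
from flask_jsonrpc.exceptions import InvalidCredentialsError, InvalidParamsError
from flask_sqlalchemy import SQLAlchemy
# TimedJSONWebSignatureSerializer was removed in itsdangerous 2.1, so an older release is required
from itsdangerous import BadSignature, SignatureExpired
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

app = Flask(__name__)
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# Demo value only; use a long random secret in production
app.config['SECRET_KEY'] = '1q2w3e'
db = SQLAlchemy(app)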
```
### 1. Create a user table with the ORM
[Flask-SQLAlchemy tutorial](http://docs.jinkan.org/docs/flask-sqlalchemy/quickstart.html "Flask-SQLAlchemy tutorial")

Define the data model:
```python
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def password(self, password):
        """
        Store the hash of the password
        """
        self.password_hash = werkzeug.security.generate_password_hash(password)

    def verify_password(self, password):
        """
        Check the plaintext password supplied by the user against the stored hash
        """
        return werkzeug.security.check_password_hash(self.password_hash, password)

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username
```
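
The model stores a hash rather than the password itself. As a rough illustration of what salted password hashing involves, here is a standard-library sketch using PBKDF2. It is not Werkzeug's implementation or storage format; `hash_password` and `check_password` are hypothetical names, and the iteration count is an arbitrary choice for the example.

```python
import hashlib
import hmac
import os


def hash_password(password, iterations=100000):
    """Return 'iterations$salt$digest' for the given plaintext password."""
    salt = os.urandom(16)  # a fresh random salt for every password
    digest = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, iterations)
    return '%d$%s$%s' % (iterations, salt.hex(), digest.hex())


def check_password(stored, password):
    """Recompute the digest with the stored salt and compare in constant time."""
    iterations, salt_hex, digest_hex = stored.split('$')
    digest = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'),
                                 bytes.fromhex(salt_hex), int(iterations))
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password('123456')
print(check_password(stored, '123456'))
print(check_password(stored, 'wrong'))
```

Checking a password always needs two inputs, the stored value and the candidate password, because the salt used at hashing time has to be read back from the stored value.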
Create the table structure from the model (here the application module is named `SmartOps`):
```
from SmartOps import db
db.create_all()
```
### 2. Design the user registration interface
Frontend:
```
POST /api
{
    "date": "Wed, 21 Mar 2018 06:24:12 GMT",
    "server": "Werkzeug/0.14.1 Python/3.6.2",
    "content-length": "158",
    "content-type": "application/json",
    "data": {
        "jsonrpc": "2.0",
        "method": "user.register",
        "params": {
            "username": "test",
            "password": "123456"
        },
        "id": "cde5749c-de2f-46ea-b9c5-824cf1f3fc92"
    }
}
```
Backend:
```
HTTP 200
{
    "id": "cde5749c-de2f-46ea-b9c5-824cf1f3fc92",
    "jsonrpc": "2.0",
    "result": {
        "message": "User already exists",
        "status": 1
    }
}
```
> `status` 0 means registration succeeded; 1 means it failed.

Backend code:
```python
@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    # Refuse to register a username that is already taken
    if not User.query.filter_by(username=username).first():
        user = User(username=username)
        user.password(password)
        db.session.add(user)
        db.session.commit()
        return {'status': 0, 'message': u'Registration successful'}
    else:
        return {'status': 1, 'message': u'User already exists'}
```
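
The handler checks for an existing username before inserting. Because the `username` column is declared with `unique=True`, the database enforces the same rule, which also covers the case where two requests race past the check. The sketch below shows that backstop with the standard-library `sqlite3` module instead of SQLAlchemy; the table definition is hand-written to approximate the model, and `register` is a hypothetical helper.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# Hand-written approximation of the table behind the User model.
conn.execute(
    'CREATE TABLE user ('
    'id INTEGER PRIMARY KEY, '
    'username VARCHAR(80) UNIQUE, '
    'password_hash VARCHAR(164))'
)


def register(conn, username, password_hash):
    """Insert a user and return a status dict shaped like user.register's result."""
    try:
        with conn:  # commits on success, rolls back if an exception is raised
            conn.execute(
                'INSERT INTO user (username, password_hash) VALUES (?, ?)',
                (username, password_hash),
            )
    except sqlite3.IntegrityError:
        # The UNIQUE constraint rejected a duplicate username.
        return {'status': 1, 'message': 'User already exists'}
    return {'status': 0, 'message': 'Registration successful'}


first = register(conn, 'test', 'hash-1')
second = register(conn, 'test', 'hash-2')
```

With SQLAlchemy the equivalent would be to catch the integrity error raised on commit and roll the session back, which is worth considering if duplicate registrations can arrive concurrently.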
### 3. Log in after registering
Frontend:
```
POST /api
{
    "date": "Wed, 21 Mar 2018 06:27:48 GMT",
    "server": "Werkzeug/0.14.1 Python/3.6.2",
    "content-length": "1148",
    "content-type": "application/json",
    "data": {
        "jsonrpc": "2.0",
        "method": "user.verify",
        "params": {
            "username": "test",
            "password": "123456"
        },
        "id": "36d8d630-4092-4add-abfc-55a257dae7d9"
    }
}
```
Backend:
```
HTTP 200
{
    "id": "945307c4-8009-49ed-ab43-96d29ae37294",
    "jsonrpc": "2.0",
    "result": {
        "message": "Welcome test2",
        "status": 0,
        "token": "eyJhbGciOiJIUzI1NiIsImlhdCI6MTUyMTYxNTYyNywiZXhwIjoxNTIxNjE2MjI3fQ.eyJpZCI6M30.fw8tSBcDfITEgG5mHeMpyFio821jlmVQgmlXlZxDadI"
    }
}
```
Backend code:
```python
@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    user = User.query.filter_by(username=username).first()
    if not user:
        return {'status': 1, 'message': u'Username does not exist'}
    if user.verify_password(password):
        # Issue a token that the frontend sends with later requests
        token = user.generate_auth_token()
        return {'status': 0, 'message': u'Welcome %s' % username, 'token': token}
    return {'status': 1, 'message': u'Wrong password'}
```
### 4. Generating and verifying tokens
Generation and verification functions (`SignatureExpired` and `BadSignature` are imported from `itsdangerous`):
```python
class User(db.Model):
    ...
    def generate_auth_token(self, expiration=600):
        # Sign the user id; the token expires after `expiration` seconds
        s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id}).decode('utf-8')

    @staticmethod
    def verify_auth_token(token):
        s = Serializer(app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user
    ...
```
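
It can help to look inside a token to see what it actually carries. The sketch below decodes the sample token from the login response in step 3 using only the standard library; `b64url_decode` is a hypothetical helper, and the signature segment is left unverified because checking it requires the secret key.

```python
import base64
import json


def b64url_decode(segment):
    """Decode one unpadded base64url segment of the token."""
    return base64.urlsafe_b64decode(segment + '=' * (-len(segment) % 4))


# Sample token copied from the login response shown in step 3.
token = ('eyJhbGciOiJIUzI1NiIsImlhdCI6MTUyMTYxNTYyNywiZXhwIjoxNTIxNjE2MjI3fQ'
         '.eyJpZCI6M30'
         '.fw8tSBcDfITEgG5mHeMpyFio821jlmVQgmlXlZxDadI')

header_b64, payload_b64, signature_b64 = token.split('.')
header = json.loads(b64url_decode(header_b64))
payload = json.loads(b64url_decode(payload_b64))

print(header)                         # {'alg': 'HS256', 'iat': 1521615627, 'exp': 1521616227}
print(payload)                        # {'id': 3}
print(header['exp'] - header['iat'])  # 600
```

The 600-second lifetime matches the default `expiration=600`, and the payload is readable by anyone who holds the token, so only non-sensitive data such as the user id belongs in it.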
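Here flask-jsonrpc's source code needs a change. By default, an authenticated method is verified with a username and password. Sending the username and password over the internet on every request is insecure, so the code is changed to verify a token instead.

Modify the `method` method: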
```python
def method(self, name, authenticated=False, safe=False, validate=False, **options):
    def decorator(f):
        arg_names = getargspec(f)[0]
        X = {'name': name, 'arg_names': arg_names}
        if authenticated:
            # TODO: this is an assumption
            # X['arg_names'] = ['username', 'password'] + X['arg_names']
            X['arg_names'] = ['token'] + X['arg_names']
            X['name'] = _inject_args(X['name'], ('String', 'String'))
            _f = self.auth_backend(f, authenticated)
        else:
            _f = f
        method, arg_types, return_type = _parse_sig(X['name'], X['arg_names'], validate)
        _f.json_args = X['arg_names']
        _f.json_arg_types = arg_types
        _f.json_return_type = return_type
        _f.json_method = method
        _f.json_safe = safe
        _f.json_sig = X['name']
        _f.json_validate = validate
        self.site.register(method, _f)
        return _f
    return decorator
```
Then write a new authentication backend. It accepts the token either as the first positional parameter or as a `token` keyword parameter, and strips it before calling the wrapped method:
```python
def authenticate(f, f_check_auth):
    @wraps(f)
    def _f(*args, **kwargs):
        is_auth = False
        if args:
            # Positional params: the token is the first argument
            is_auth = f_check_auth(args[0])
            if is_auth:
                args = args[1:]
        elif 'token' in kwargs:
            # Named params: the token arrives as a keyword argument
            is_auth = f_check_auth(kwargs['token'])
            if is_auth:
                kwargs.pop('token')
        else:
            raise InvalidParamsError('Authenticated methods require at least '
                                     '[token] or {token: } arguments')
        if not is_auth:
            raise InvalidCredentialsError()
        return f(*args, **kwargs)
    return _f
```
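
The backend above is an instance of a general decorator pattern: check a credential, strip it from the arguments, then call the real function. The sketch below shows that pattern in isolation with no Flask dependency; `InvalidCredentials`, `VALID_TOKENS`, `check_token`, and `require_token` are hypothetical names, and the in-memory token set merely stands in for real token verification.

```python
from functools import wraps


class InvalidCredentials(Exception):
    """Hypothetical stand-in for the library's credentials error."""


VALID_TOKENS = {'secret-token'}  # hypothetical token store, for illustration only


def check_token(token):
    """Return True if the token is known."""
    return token in VALID_TOKENS


def require_token(check):
    """Build a decorator that validates and strips a `token` keyword argument."""
    def decorator(func):
        @wraps(func)  # keeps the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            token = kwargs.pop('token', None)
            if token is None or not check(token):
                raise InvalidCredentials('missing or invalid token')
            # The wrapped function never sees the token.
            return func(*args, **kwargs)
        return wrapper
    return decorator


@require_token(check_token)
def hello(name):
    return 'Hello %s!' % name


print(hello(name='jack', token='secret-token'))  # Hello jack!
```

Because the token is removed before the call, business functions such as `hello` keep a signature that says nothing about authentication.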
Change the `JSONRPC` initialization so that it uses our own authentication backend:
```python
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True, auth_backend=authenticate)
```
### 5. Authentication is now complete
With a token, the frontend can now request protected resources.

Frontend:
```
POST /api
{
    "date": "Wed, 21 Mar 2018 07:29:49 GMT",
    "server": "Werkzeug/0.14.1 Python/3.6.2",
    "content-length": "100",
    "content-type": "application/json",
    "data": {
        "jsonrpc": "2.0",
        "method": "App.hello",
        "params": {
            "token": "eyJhbGciOiJIUzI1NiIsImlhdCI6MTUyMTYxNzMzMywiZXhwIjoxNTIxNjE3OTMzfQ.eyJpZCI6M30.pswj1Sbny8EM2u8T01s7gizS02LQ-RS2PSvme2jQQLs",
            "name": "jack"
        },
        "id": "61a746df-21c5-485c-9aa5-b7c9b3a938e6"
    }
}
```
Backend:
```
HTTP 200
{
    "id": "61a746df-21c5-485c-9aa5-b7c9b3a938e6",
    "jsonrpc": "2.0",
    "result": "Hello jack!"
}
```
Backend test code (`check_auth` is defined in the complete code below):
```python
@jsonrpc.method('App.hello(name=str)', authenticated=check_auth)
def index(name):
    return u'Hello %s!' % name
```
## Complete code
```python
from functools import wraps

import werkzeug.security
from flask import Flask
from flask_jsonrpc import JSONRPC
from flask_jsonrpc.exceptions import InvalidCredentialsError, InvalidParamsError
from flask_sqlalchemy import SQLAlchemy
# TimedJSONWebSignatureSerializer was removed in itsdangerous 2.1, so an older release is required
from itsdangerous import BadSignature, SignatureExpired
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer

app = Flask(__name__)


def authenticate(f, f_check_auth):
    @wraps(f)
    def _f(*args, **kwargs):
        is_auth = False
        if args:
            # Positional params: the token is the first argument
            is_auth = f_check_auth(args[0])
            if is_auth:
                args = args[1:]
        elif 'token' in kwargs:
            # Named params: the token arrives as a keyword argument
            is_auth = f_check_auth(kwargs['token'])
            if is_auth:
                kwargs.pop('token')
        else:
            raise InvalidParamsError('Authenticated methods require at least '
                                     '[token] or {token: } arguments')
        if not is_auth:
            raise InvalidCredentialsError()
        return f(*args, **kwargs)
    return _f


jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True, auth_backend=authenticate)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
# Demo value only; use a long random secret in production
app.config['SECRET_KEY'] = '1q2w3e'
# With debug on, check_auth below skips token validation; turn it off outside development
app.debug = True
db = SQLAlchemy(app)


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def password(self, password):
        """
        Store the hash of the password
        """
        self.password_hash = werkzeug.security.generate_password_hash(password)

    def verify_password(self, password):
        """
        Check the plaintext password supplied by the user against the stored hash
        """
        if self.password_hash:
            return werkzeug.security.check_password_hash(self.password_hash, password)
        return False

    def generate_auth_token(self, expiration=600):
        # Sign the user id; the token expires after `expiration` seconds
        s = Serializer(app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id}).decode('utf-8')

    @staticmethod
    def verify_auth_token(token):
        s = Serializer(app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username


@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    # Refuse to register a username that is already taken
    if not User.query.filter_by(username=username).first():
        user = User(username=username)
        user.password(password)
        db.session.add(user)
        db.session.commit()
        return {'status': 0, 'message': u'Registration successful'}
    else:
        return {'status': 1, 'message': u'User already exists'}


@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    user = User.query.filter_by(username=username).first()
    if not user:
        return {'status': 1, 'message': u'Username does not exist'}
    if user.verify_password(password):
        token = user.generate_auth_token()
        return {'status': 0, 'message': u'Welcome %s' % username, 'token': token}
    return {'status': 1, 'message': u'Wrong password'}


def check_auth(token):
    # In debug mode, token authentication is skipped
    if app.debug:
        return True
    user = User.verify_auth_token(token)
    if user:
        return True
    return False


@jsonrpc.method('App.hello(name=str)', authenticated=check_auth)
def index(name):
    return u'Hello %s!' % name


if __name__ == '__main__':
    app.run(port=2001)
```