[TOC]

写代码也是一种艺术,结构层次感一定要好,这样做出来的才是一个好作品。

## 代码管理

git

### 目录结构

```
SmartOps
├── app
│   ├── factory.py
│   ├── __init__.py
│   ├── models.py
│   ├── server.py
│   ├── soapi
│   ├── static
│   └── templates
├── config.py
├── manage.py
├── migrations
├── requirements.txt
├── tests
└── venv
```

### 目录结构用途说明

| 目录 | 用途 |
| ------------ | ------------ |
| app | 存放flask应用代码 |
| migrations | 数据库迁移脚本 |
| tests | 单元测试 |
| venv | 虚拟环境 |

### 目录文件说明

| 文件 | 用途 |
| ------------ | ------------ |
| manage.py | 启动程序以及其他的程序任务。 |
| config.py | 存储配置 |
| requirements.txt | python依赖包 |

## 配置文件

配置文件决定程序做出什么样的行为,比较常用的就是开发环境,测试环境,正式环境三种,下面是个例子

config.py

```python
import logging
import os
from datetime import timedelta

CONFIG = {
    "development": "config.DevelopmentConfig",
    "testing": "config.TestingConfig",
    "production": "config.ProductionConfig",
    "default": "config.ProductionConfig"
}

USER_SECRET_KEY = 'asdfoasdjgio'
VERIFY_DEBUG = True


class BaseConfig(object):
    """Base class for default set of configs."""

    DEBUG = False
    TESTING = False
    SECURITY_PASSWORD_HASH = 'pbkdf2_sha512'
    SECURITY_TRACKABLE = True
    LOGGING_FORMAT = "[%(asctime)s] [%(funcName)-30s] +\
    [%(levelname)-6s] %(message)s"
    LOGGING_LOCATION = 'web.log'
    LOGGING_LEVEL = logging.DEBUG
    SECURITY_TOKEN_MAX_AGE = 60 * 30
    SECURITY_CONFIRMABLE = False
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    CACHE_TYPE = 'simple'
    SECURITY_PASSWORD_SALT = 'super-secret-stuff-here'
    COMPRESS_MIMETYPES = ['text/html', 'text/css', 'text/xml',
                          'application/json', 'application/javascript']
    WTF_CSRF_ENABLED = False
    COMPRESS_LEVEL = 6
    COMPRESS_MIN_SIZE = 500

    # Change it based on your admin user, should ideally read from DB.
    ADMIN_USER = 'admin'
    ADMIN_PASSWORD = 'admin'
    JWT_EXPIRES = timedelta(minutes=10)


class DevelopmentConfig(BaseConfig):
    """Default set of configurations for development mode."""

    DEBUG = True
    TESTING = False
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASEDIR, 'app.db')
    SECRET_KEY = 'not-so-super-secret'
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'
    SQLALCHEMY_TRACK_MODIFICATIONS = True


class ProductionConfig(BaseConfig):
    """Default set of configurations for prod mode."""

    DEBUG = False
    TESTING = False
    BASEDIR = os.path.abspath(os.path.dirname(__file__))
    SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(BASEDIR, 'app.db')
    SECRET_KEY = 'Super-awesome-secret-stuff'
    SQLALCHEMY_TRACK_MODIFICATIONS = True
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'


class TestingConfig(BaseConfig):
    """Default set of configurations for test mode."""

    DEBUG = False
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite://'
    SECRET_KEY = '792842bc-c4df-4de1-9177-d5207bd9faa6'
    JWT_SECRET_KEY = 'another_super_awesome_secret_stuff_yo.'
```

## 使用工厂来创建app

app/\_\_init\_\_.py

```python
import os
from flask import Flask
from config import CONFIG
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


def create_app(config_name):
    """Configure the app w.r.t Flask-security, databases, loggers."""
    app = Flask(__name__)
    app.config.from_object(CONFIG[config_name])
    db.init_app(app)
    return app
```

## 使用manage来管理

manage.py

```python
from app import create_app, db
from app.models import User
from flask_script import Manager, Shell
from flask_migrate import Migrate, MigrateCommand
from flask_jsonrpc import JSONRPC
import os

app = create_app(os.getenv('FLASK_CONFIG') or 'development')
jsonrpc = JSONRPC(app, '/api', enable_web_browsable_api=True, auth_backend=User.authenticate)
manager = Manager(app)
migrate = Migrate(app, db)


def make_shell_context():
    return dict(app=app, db=db, User=User)


manager.add_command("shell", Shell(make_context=make_shell_context))
manager.add_command('db', MigrateCommand)

import app.soapi.user

if __name__ == '__main__':
    manager.run()
```

## 使用manage创建数据库

### 初始化

```shell
python manage.py db init
```

### 创建历史版本

```shell
python manage.py db migrate -m 'first'
```

### 创建数据库

```shell
python manage.py db upgrade
```

## jsonrpc模块化

app/soapi/user.py

```python
from flask import Blueprint

mod = Blueprint('user', __name__)

from manage import jsonrpc

jsonrpc.register_blueprint(mod)

from app.models import User
from app import db


@jsonrpc.method('user.register(username=str,password=str)')
def user_register(username, password):
    if not User.query.filter_by(username=username).first():
        user = User(username=username)
        user.password(password)
        db.session.add(user)
        db.session.commit()
        return {'status': 0, 'message': u'注册成功'}
    else:
        return {'status': 1, 'message': u'用户已存在'}


@jsonrpc.method('user.verify(username=str,password=str)')
def user_verify(username, password):
    user = User.query.filter_by(username=username).first()
    if not user:
        return {'status': 1, 'message': u'用户名不存在'}
    if user.verify_password(password):
        token = user.generate_auth_token()
        return {'status': 0, 'message': u'欢迎%s' % username, 'token': token}
    return {'status': 1, 'message': u'密码错误'}
```

然后在manage中import过来

```python
import app.soapi.user
```

## 数据模型拆分

app/models.py

```python
from itsdangerous import BadSignature, SignatureExpired, TimedJSONWebSignatureSerializer as Serializer
import werkzeug
from functools import wraps
from flask_jsonrpc import InvalidCredentialsError, InvalidParamsError
from app import db
from config import USER_SECRET_KEY, VERIFY_DEBUG


class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    password_hash = db.Column(db.String(164))

    def password(self, password):
        """
        设置密码hash值
        """
        self.password_hash = werkzeug.security.generate_password_hash(password)

    def verify_password(self, password):
        """
        将用户输入的密码明文与数据库比对
        """
        if self.password_hash:
            return werkzeug.security.check_password_hash(self.password_hash, password)
        return None

    def generate_auth_token(self, expiration=600):
        s = Serializer(USER_SECRET_KEY, expires_in=expiration)
        return bytes.decode(s.dumps({'id': self.id}))

    @staticmethod
    def verify_auth_token(token):
        s = Serializer(USER_SECRET_KEY)
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user

    @staticmethod
    def authenticate(f, f_check_auth):
        @wraps(f)
        def _f(*args, **kwargs):
            is_auth = False
            try:
                creds = args[:2]
                is_auth = f_check_auth(creds[0], creds[1])
                if is_auth:
                    args = args[2:]
            except IndexError:
                if 'token' in kwargs:
                    is_auth = f_check_auth(kwargs['token'])
                    if is_auth:
                        kwargs.pop('token')
                else:
                    raise InvalidParamsError('Authenticated methods require at least '
                                             '[token] or {token: } arguments')
            if not is_auth:
                raise InvalidCredentialsError()
            return f(*args, **kwargs)
        return _f

    @staticmethod
    def check_auth(token):
        # 启用debug模式不需要进行token认证
        if VERIFY_DEBUG:
            return True
        user = User.verify_auth_token(token)
        if user:
            return True
        return False

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        return '<User %r>' % self.username
```