合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
[TOC] ## 第6章 Token与HTTPBasic验证 —— 用令牌来管理用户 在我的TP5课程里,我们使用令牌的方式是服务器缓存的方式。那么在Python Flask中我们换一种令牌的发放方式。我们将用户的信息加密后作为令牌返回到客户端,客户端在访问服务器API时必须以HTTP Basic的方式携带令牌,我们再读取令牌信息后,将用户信息存入到g变量中,共业务代码全局使用... ### 6-1 Token概述 ![](https://ws1.sinaimg.cn/large/006tNc79gy1fz8dul72bxj31uq0sq4bc.jpg) ![](https://ws3.sinaimg.cn/large/006tNbRwgy1fys47a8u44j31tk0u0jyn.jpg) ### 6-2 获取Token令牌 理论上来说 get\_token 的 HTTP 动词应该设为 get,但是由于获取 token 需要传入账号和密码,所以这里要使用 POST 方法。此外相对于 get 方法,post 方法的传参相对安全。如果使用 get 的方式来传递用户的账号和密码,我们只能够把这两个参数放在 url 后面的?(问号)里作为查询参数来传递,但是如果使用post 的话,我们就可以把账号和密码放到 HTTP 的 body 里面来传递。 先编写 ginger/app/api/v1/token.py,思路: 1. 创建 token 红图,将 token 红图注册到 v1蓝图中去 2. 注册 token 路由,使用 POST 方法 3. 实例化 ClientForm,验证 validate\_for\_api(验证数据格式) 4. 使用 promise 辨别客户端类型 5. 使用 promise 验证客户端提交的账户密码,查询用户返回用户 id 6. 生成 token,使用 TimedJSONWebSignatureSerializer 序列化器生成 token 生成的 token 是byte 类型的字符串,需要使用 `decode('ascii')`解码 7. 将 token 序列化返回,并返回 HTTP 状态码 ~~~  from flask import current_app, jsonify  ​  from app.libs.enums import ClientTypeEnum  from app.libs.red_print import RedPrint  from app.models.user import User  from app.validators.forms import ClientForm  from itsdangerous import TimedJSONWebSignatureSerializer as Serializer  ​  api = RedPrint('token')  ​  ​  @api.route('', methods=['POST'])  def get_token():      form = ClientForm().validate_for_api()      promise = {          ClientTypeEnum.USER_EMAIL: User.verify_by_email,          ClientTypeEnum.USER_MOBILE: User.verify_by_mobile,          ClientTypeEnum.USER_MINA: User.verify_by_mina,          ClientTypeEnum.USER_WX: User.verify_by_wx,     }      identity = promise[ClientTypeEnum(form.type.data)](form.account.data, form.secret.data)      expiration = current_app.config['TOKEN_EXPIRATION']      token = generate_auth_token(identity['uid'], form.type.data, None, expiration)      t = {          'token': token.decode('ascii')     }      return jsonify(t), 201  ​  ​  def generate_auth_token(uid, ac_type, scope=None, expiration=7200):      s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)      return s.dumps({          'uid': uid,          'token': ac_type.value     }) ~~~ 在 User 模型内编写验证客户端账户密码的方法: 1. 先查询用户 2. 如果用户不存在,返回找不到 3. 使用 check\_password\_hash 比对客户端传入的密码与数据库中保存的密码 4. 如果用户密码错误,返回授权失败 ~~~  @staticmethod      def verify_by_email(email, password):          user = User.query.filter_by(email=email).first()          if not user:              raise NotFound(msg='user not found')          if not user.check_password(password):              raise AuthFailed()          return {'uid': user.id}  ​      def check_password(self, raw):          if not self.password:              return False          return check_password_hash(self.password, raw) ~~~ 在配置文件中配置 TOKEN\_EXPIRATION,这是作为开发用途,所以过期时间比较长,正式上线之后过期时间设置为两小时比较合适 ~~~  TOKEN_EXPIRATION = 30 * 24 * 3600 ~~~ ### 6-3 Token的用处 ### 6-4 @auth拦截器执行流程 本节主要了解一下 @auth.login\_required 和 @auth.verify\_password 两个装饰器的作用。 现在需要保护的视图函数前打上 @auth.login\_required 装饰器, ~~~  @api.route('', methods=['GET'])  @auth.login_required  def get_user():      return 'user' ~~~ 再在 ginger/app/libs/token\_auth.py 内为 verify\_password 函数打上 @verify\_password 装饰器。 ~~~  from flask_httpauth import HTTPBasicAuth  ​  auth = HTTPBasicAuth()  ​  @auth.verify_password  def verify_password(account, password):      pass ~~~ @auth 拦截器的请求流程: 1. 先访问到 get\_user 视图函数, 2. 再转到 verify\_password 函数, * 如果 verify\_password 函数返回 True,则返回到 get\_user 视图函数内执行 get\_user 的内容,最后返回给客户端 * 否则,直接给客户端返回 Unauthorized Access ### 6-5 HTTPBasicAuth基本原理 在 verify\_password 函数中需要接收 account、password 两个参数,那么如何获取这两个参数呢? 之前在 ClientForm 中已经可以接收到客户端传来的 account、password 了,这种接收账号和密码的方式是我们自己定义的一种方式,毫无疑问账号和密码只是两个普通的参数,当然可以通过自定义的方式传送到服务器。 除了自定义的方式传参方式,HTTP 这种协议本身就有一种规范,这种规范允许我们传递账号和密码。HTTP 自带的发送账号和密码的方式有很多种,其中一种就是 HTTPBasicAuth,还有其他的诸如 HTTPDigestAuth 等其他规范。 HTTPBasicAuth 规定: 必须将账号、密码放在 HTTP 的 Headers 里面。(之前 ClientForm 是将账户和密码放在 HTTP 的 body 里面发送的) HTTP 的头是一组一组的 key:value 的键值对。 * 需要在 HTTP 的头中设置一个固定的 key,key 的名字叫做 Authorization, * key 的 value 是 basic base64(account:password) > 注意: > > 1. 前面是 basic + 一个空格 > > 2. base64() 表示 base64 加密 > postman 演示: ![](https://ws4.sinaimg.cn/large/006tNc79gy1fyt6wrmr1sj32eq0p6ae0.jpg) ![](https://ws1.sinaimg.cn/large/006tNc79gy1fyt6yah56pj31p0086dh6.jpg) 只要 HTTP 传入的参数符合规范,verify\_password 函数就能正确的获到账户和密码。 > 面试问题: > > 以上就是 HTTPBasicAuth 的基本原理和传递规范,关于 HTTP 的协议在很多服务器面试的时候都会经常问到,这种 HTTPBasicAuth 面试的问题也是非常多的,一定要熟记。 ### 6-6 以BasicAuth的方式发送Token 第一次登陆的时候需要账户密码,但是后续的访问需要的是 token 而不是 account、password,但是我们可以通过相同的方式传递 token,我们只需要将 token 当做 account 来传就可以了,密码不传或者传空。 ### 6-7 验证Token 1. 我们需要编写 verify\_auth\_token 函数来验证 token 1. 使用 SECRET\_KEY 实例化一个序列化器 2. 使用 `s.loads(token)`将 token 反序列化赋给 data * 如果遇到 BadSignature,则表示 token 错误,返回 token 非法的错误信息 * 如果遇到 SignatureExpired,则表示 token 过期,返回 token 过期的错误信息 3. 从 data 中取出 uid 和 ac\_type(这两个是之前我们生产 token 的时候放进去的) 4. 然后将 uid 和 ac\_type 打包返回,我们可以选择元组、字典的方式返回,但是最好的方式是使用一个对象式的结构返回回去: ~~~  User = namedtuple('User', ['uid', 'ac_type', 'scope']) ~~~ 这里使用 namedtuple 有什么优势,我们在后面时候 User 对象的时候就知道了。 2. 将 verify\_auth\_token 的返回值赋给 user\_info 3. 如果 user\_info 为空,则表示验证失败,返回 False 4. 否则将 user\_info 保存到 g 变量的 user 属性中。 (这个 g 变量就跟 flask 的 request 对象是一样的,它们都是一个代理模式的实现,至于 g 变量怎么用后面再说,线程安全。) ~~~  from collections import namedtuple  ​  from flask import current_app, g  from flask_httpauth import HTTPBasicAuth  from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired  ​  from app.libs.error_code import AuthFailed  ​  auth = HTTPBasicAuth()  User = namedtuple('User', ['uid', 'ac_type', 'scope'])  ​  ​  @auth.verify_password  def verify_password(token, password):      user_info = verify_auth_token(token)      if not user_info:          return False      else:          g.user = user_info          return True  ​  ​  def verify_auth_token(token):      s = Serializer(current_app.config['SECRET_KEY'])      try:          data = s.loads(token)      except BadSignature:          raise AuthFailed(msg='token is invalid', error_code=1002)      except SignatureExpired:          raise AuthFailed(msg='token is expired', error_code=1003)      uid = data['uid']      ac_type = data['type']      return User(uid, ac_type, '') ~~~ ### 6-8 重写first\_or\_404与get\_or\_404 前面对 get\_user 视图函数的保护工作已经做完了,现在我们来正式编写 get\_user 视图函数。 1. 查询用户 2. 判断查询的用户是否存在,不存在则返回 NotFound ~~~  @api.route('/<int:uid>', methods=['GET'])  @auth.login_required  def get_user(uid):      user = User.query.get(uid)      if not user:          raise NotFound()      return user ~~~ 那么问题来了,我们每次查询数据库的时候都需要判断查询的内容是否存在,这是一件很麻烦的事情。我们可不可以不写呢?好在有一个 get\_or\_404 方法,但是 get\_or\_404 方法放回的是系统内部的错误信息,并不是 APIException,不是 APIException 的话那么返回的格式就不是我们要求的 JSON 格式,所以我们需要重写 get\_or\_404 方法,让它返回 APIException。get\_or\_404 是 query 的方法,我们找到 query 在 ginger/models/base.py 中: ![](https://ws2.sinaimg.cn/large/006tNc79gy1fyt9lxdzrdj31b908nq4y.jpg) ![](https://ws1.sinaimg.cn/large/006tNc79gy1fyt9lallwjj31s00u0te5.jpg) 我们按照 BaseQuery.get\_or\_404 仿写就可以了,其他都一样,只是`abort(404)`那里改成 `raise APIException`就可以了。同理仿写 first\_or\_404。 ~~~  class Query(BaseQuery):      def filter_by(self, **kwargs):          if 'status' not in kwargs.keys():              kwargs['status'] = 1          return super(Query, self).filter_by(**kwargs)  ​      def get_or_404(self, ident):          rv = self.get(ident)          if not rv:              raise NotFound()          return rv  ​      def first_or_404(self):          rv = self.first()          if not rv:              raise NotFound()          return rv ~~~ 以下是需要修改的地方: 1. 第一处![](https://ws4.sinaimg.cn/large/006tNc79gy1fyt9pud0lvj31cy0degof.jpg)改成:![](https://ws2.sinaimg.cn/large/006tNc79gy1fyt9qsyi8bj31b60a276i.jpg) 2. 第二处 ![](https://ws4.sinaimg.cn/large/006tNc79gy1fyt9sxqni5j31ei0bytal.jpg) 改成: ![](https://ws3.sinaimg.cn/large/006tNc79gy1fyt9tiu3foj318b08lta4.jpg)