合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
为 `app/api_1_0/controller` 下的 `user.py` 控制器做查询操作 Flask-SQLAlchemy 在 Model 类上提供了 query 属性。通过 query 属性会得到一个所有记录的查询对象。在使用 all() 或者 first() 发起查询。 我们为了看到返回结果,返回数据只能是 `string`, `dict`, `tuple`,我们从 `flask` 导入了 `jsonify` ``` Python from app.api_1_0 import bp from app.api_1_0.model.user import UserModel from app import db from flask import jsonify @bp.route('/user', methods=['POST']) def add_user(): pass @bp.route('/user', methods=['GET']) def list_user(): try:         user_obj = UserModel.query.all() except Exception as e: return '查询失败'     user_list = [] for user in user_obj:         user_list.append(user.to_dict()) return jsonify(user_list) ``` 测试 ![通过模型查询](https://img.kancloud.cn/cd/5b/cd5b673ccfa995679af153580cd62a0e_570x156.png) ### 其他查询 **1. 根据条件查询** ``` # 查询id为1的记录 UserModel.query.filter_by(id=1).all() # 查询id为1的记录只取一条 UserModel.query.filter_by(id=1).limit(1).all() # 查询符合条件的记录根据id排序只取2条 UserModel.query.order_by(UserModel.id).limit(2).all() UserModel.query.order_by(UserModel.id.desc()).limit(2).all() UserModel.query.filter(UserModel.username.endswith('维奇')).order_by(UserModel.id.desc()).limit(2).all() UserModel.query.filter(UserModel.username.startswith('阿尔')).order_by(UserModel.id).limit(2).all() UserModel.query.filter(UserModel.username.contains('尔维')).order_by(UserModel.id).all() # 查询符合限制条的第一条 UserModel.query.filter_by(id=2).first() UserModel.query.filter_by(id=1).first_or_404() # 根据主键查询 UserModel.query.get(1) UserModel.query.get_or_404(1) ``` 使用 `get_or_404()` 来代替 `get()`,使用 `first_or_404()` 来代替 `first()`,对于不存在的条目返回一个 404 错误,而不是返回 None。 **2. 统计符合条件的数量** ``` UserModel.query.filter_by(id=1).count() ``` **3.group分组统计** 根据名字分组,查询名字,统计同名手机号个数,根据名字分组,原 sql 语句 ``` SELECT base_user.username AS base_user_username, count(base_user.mobile) AS count_1 FROM base_user GROUP BY base_user.username ``` 用 flask_sqlachemy 表示,返回的是一个 list ``` from sqlalchemy import func UserModel.query.with_entities(UserModel.username, func.count(UserModel.mobile)).group_by('username').all() ``` **4.原生查询** ``` from sqlalchemy import text UserModel.query.filter(text('id>=:value1 and id <:value2')).params(value1=2,value2=5).all() UserModel.query.from_statement(text("select * from base_user where id=:value")).params(value=1).all() db.session.execute("SELECT COUNT(*) FROM table").scalar() # 返回的数字 ``` **4.分页** ``` UserModel.query.paginate(page, per_page,Error_out) 1. page: 哪一个页 2. per_page: 每页多少条数据 3. Error_out: False 查不到不报错 4. pages: 共有多少页 5. items: 当前页数的所有对象 user_obj = UserModel.query.paginate(2,2,False) user_obj.page user_obj.per_page user_obj.pages ``` ### 查询条件 **and** ``` from sqlalchemy import and_ UserModel.query.filter(and_(UserModel.username == '阿尔维奇',UserModel.id==1)).all() ``` 或者 ``` UserModel.query.filter(UserModel.username == '阿尔维奇',UserModel.id == 1).all() ``` **or** ``` from sqlalchemy import or_ UserModel.query.filter(or_(UserModel.username == '阿尔维奇', UserModel.username == 'Python')).all() ``` **in** ``` UserModel.query.filter(UserModel.id.in_([1,2])).all() ``` **not in** ``` UserModel.query.filter(~UserModel.id.in_([1,2])).all() ``` **like** ``` UserModel.query.filter(UserModel.username.like('阿尔%')).all() ```