企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
### **列方法** ~~~ # 类属性 @hybrid_property def fullname(self): return self.firstname + ' ' + self.lastname # 这样就可以用user.fullname访问该属性 # 验证列 @validates('email') def validate_email(self, key, address): assert '@' in address return address ~~~ ### **CRUD** * 像`join`自身类似的需求,可以使用别名`user_model1 = aliased(UserModel)` * 目前没有找到合适的方法去返回影响的行数,但是在`UPDATE/DELETE`方法中可以使用`result.rowcount`来返回SQL中where语句匹配到的行数,折衷方案是可以多加一个where条件去返回实际的影响行数。 <br /> * **执行原生语句,返回的是`ResultProxy`对象:** ~~~ result = conn.execute("INSERT INTO user (name) VALUES ('haofly')") result = conn.execute("INSERT INTO user (name) VALUES ('haofly') RETURNING id") # 插入并拿到插入的id result.fetchall() ~~~ * **执行原生语句的时候,防止SQL注入:** ~~~ bind_sql = 'SELECT * FROM xxx WHERE field = :value' session.execute(bind_sql, {'value': 'value1'}) // 或者用下面的方式插入一个字典或者列表 session.execute(MyModel.__table__.insert(), modelDict) session.execute(MyModel.__table__.insert(), modelDicts) ~~~ <br /> ### **查询** >[success]* `filter_by`只能用`=`,而`filter`可以用`==,!=`等多种取值方式,且必须带表名 ~~~ # 查询表 query = session.query(User) print(query) // 得到sql语句 query.statement // 同上 query.count() // COUNT操作 query.get(2) // 根据主键获取的简便写法 query.first() // 只获取第一条 query.all() // 获取所有数据 session.query(User.id).distinct().all() // DISTINCT操作 query.limit(2).offset(2).all() // limit offset要注意如果page相乘的时候page-1 // 筛选 query.filter( getattr(User, 'icon_id') == 3, // 通过字段名的字符串形式获取属性 User.id==2, User.age>10, // 大于、小于、等于直接写 User.deleted_at == None, // IS NULL用None代替 User.name.in_(['hao', 'fly']) // IN操作 ).first().name query.filter('id = 2').first() // 复杂的filter query.filter_by(deleted_at == None) // flask-sqlalchemy的查询方式 query.order_by('user_name').all() // 排序 query.order_by(desc('name')).all() // 倒序排序,from sqlalchemy import desc // 使用功能函数 query(func.count('*')).all() query(func.json_contains(User.age, '{"A":"B"}')).all() // 使用JSON_CONTAINS // 查询列 session.query(User.name) // 去除指定列 session.query(User.id, User.name) // 去除指定列 session.query.with_entities(User.id, User.name) // 获取指定列 // 拼接 query2.filter(or_(User.id == 1)) // or操作,or ... query2.filter(or_(User.id == 1, User.name.like(''))) // or操作,or (xxx AND xxx) // 关联查询 query(User).join(Post, User.id == Post.user_id).all() // join查询 query(User).join(Post, and_(User.id == Post.user_id, User.deleted_at==None)) // JOIN ... ON (xxx AND xxx),join的and操作 // 关联查询外键 query.filter(Post.user == user) query.filter(Post.user == None) query.filter(User.posts.contains(post)) query.filter(User.posts.any(title='hao')) query.filter(Post.user.has(name='haofly')) from sqlalchemy.sql import exists stmt = exists().where(Post.user_id==User.id) for name, in session.query(User.name).filter(stmt): // 查询存在Post的user print(name) // LIKE查询 query.filter(User.name.like('%王%')) MyModel.query.filter(User.name.like('%hao%')) ~~~ <br /> ## **其他新增方式** >[warning] 注意在连接数据库时`autoflush`参数默认为`True`,但是并不是`add`之后就自动将语句`flush`到数据库, 而是指每次查询前回自动`flush`,所以无论`autoflush`是否为`True`,`add`之后都需要手动`session.flush()` ~~~ session.add(User(name='haofly')) # 直接插入一条数据 session.flush() # 必须手动flush // 批量插入ORM版 session.bulk_save_objects([User(name="wang") for i in xrange(1000)]) // 批量插入非ORM版 result = session.execute( User.__table__.insert(), [{'name': 'wang', 'age': 10}, {}] ) session.commit() result.lastrowid // 获取上一次插入的主键id modelobj.id // 如果是ORM,那么直接在add后获取主键id值就行了 ~~~ <br /> ### **修改** ~~~ query.filter(...).update({User.age: 10}) session.flush() user.name = 'new' session.commit() ~~~ ### **删除** ~~~ session.delete(user) session.flush() ~~~ ### **自定义SQL构造** ~~~ // 在所有的Insert语句前加上指定的前缀/后缀,例如加上ON DUPLICATE KEY UPDATE。 // 例如下面这个例子,当传入append_string参数时会将指定的字符串添加到后面 from sqlalchemy.sql.expression import Insert @compiles(Insert) def prefix_inserts(insert, compiler, **kw): s = compiler.visit_insert(insert, **kw) if 'append_string' in insert.kwargs: return s + " " + insert.kwargs['append_string'] return s session.execute(MyModel.__table__.insert(append_string = 'ON DUPLICATE KEY UPDATE fieldname="abc"'), objects) ~~~