### **列方法**
~~~
# 类属性
@hybrid_property
def fullname(self):
return self.firstname + ' ' + self.lastname # 这样就可以用user.fullname访问该属性
# 验证列
@validates('email')
def validate_email(self, key, address):
assert '@' in address
return address
~~~
### **CRUD**
* 像`join`自身类似的需求,可以使用别名`user_model1 = aliased(UserModel)`
* 目前没有找到合适的方法去返回影响的行数,但是在`UPDATE/DELETE`方法中可以使用`result.rowcount`来返回SQL中where语句匹配到的行数,折衷方案是可以多加一个where条件去返回实际的影响行数。
<br />
* **执行原生语句,返回的是`ResultProxy`对象:**
~~~
result = conn.execute("INSERT INTO user (name) VALUES ('haofly')")
result = conn.execute("INSERT INTO user (name) VALUES ('haofly') RETURNING id") # 插入并拿到插入的id
result.fetchall()
~~~
* **执行原生语句的时候,防止SQL注入:**
~~~
bind_sql = 'SELECT * FROM xxx WHERE field = :value'
session.execute(bind_sql, {'value': 'value1'})
// 或者用下面的方式插入一个字典或者列表
session.execute(MyModel.__table__.insert(), modelDict)
session.execute(MyModel.__table__.insert(), modelDicts)
~~~
<br />
### **查询**
>[success]* `filter_by`只能用`=`,而`filter`可以用`==,!=`等多种取值方式,且必须带表名
~~~
# 查询表
query = session.query(User)
print(query) // 得到sql语句
query.statement // 同上
query.count() // COUNT操作
query.get(2) // 根据主键获取的简便写法
query.first() // 只获取第一条
query.all() // 获取所有数据
session.query(User.id).distinct().all() // DISTINCT操作
query.limit(2).offset(2).all() // limit offset要注意如果page相乘的时候page-1
// 筛选
query.filter(
getattr(User, 'icon_id') == 3, // 通过字段名的字符串形式获取属性
User.id==2,
User.age>10, // 大于、小于、等于直接写
User.deleted_at == None, // IS NULL用None代替
User.name.in_(['hao', 'fly']) // IN操作
).first().name
query.filter('id = 2').first() // 复杂的filter
query.filter_by(deleted_at == None) // flask-sqlalchemy的查询方式
query.order_by('user_name').all() // 排序
query.order_by(desc('name')).all() // 倒序排序,from sqlalchemy import desc
// 使用功能函数
query(func.count('*')).all()
query(func.json_contains(User.age, '{"A":"B"}')).all() // 使用JSON_CONTAINS
// 查询列
session.query(User.name) // 去除指定列
session.query(User.id, User.name) // 去除指定列
session.query.with_entities(User.id, User.name) // 获取指定列
// 拼接
query2.filter(or_(User.id == 1)) // or操作,or ...
query2.filter(or_(User.id == 1, User.name.like(''))) // or操作,or (xxx AND xxx)
// 关联查询
query(User).join(Post, User.id == Post.user_id).all() // join查询
query(User).join(Post, and_(User.id == Post.user_id, User.deleted_at==None)) // JOIN ... ON (xxx AND xxx),join的and操作
// 关联查询外键
query.filter(Post.user == user)
query.filter(Post.user == None)
query.filter(User.posts.contains(post))
query.filter(User.posts.any(title='hao'))
query.filter(Post.user.has(name='haofly'))
from sqlalchemy.sql import exists
stmt = exists().where(Post.user_id==User.id)
for name, in session.query(User.name).filter(stmt): // 查询存在Post的user
print(name)
// LIKE查询
query.filter(User.name.like('%王%'))
MyModel.query.filter(User.name.like('%hao%'))
~~~
<br />
## **其他新增方式**
>[warning] 注意在连接数据库时`autoflush`参数默认为`True`,但是并不是`add`之后就自动将语句`flush`到数据库,
而是指每次查询前回自动`flush`,所以无论`autoflush`是否为`True`,`add`之后都需要手动`session.flush()`
~~~
session.add(User(name='haofly')) # 直接插入一条数据
session.flush() # 必须手动flush
// 批量插入ORM版
session.bulk_save_objects([User(name="wang") for i in xrange(1000)])
// 批量插入非ORM版
result = session.execute(
User.__table__.insert(),
[{'name': 'wang', 'age': 10}, {}]
)
session.commit()
result.lastrowid // 获取上一次插入的主键id
modelobj.id // 如果是ORM,那么直接在add后获取主键id值就行了
~~~
<br />
### **修改**
~~~
query.filter(...).update({User.age: 10})
session.flush()
user.name = 'new'
session.commit()
~~~
### **删除**
~~~
session.delete(user)
session.flush()
~~~
### **自定义SQL构造**
~~~
// 在所有的Insert语句前加上指定的前缀/后缀,例如加上ON DUPLICATE KEY UPDATE。
// 例如下面这个例子,当传入append_string参数时会将指定的字符串添加到后面
from sqlalchemy.sql.expression import Insert
@compiles(Insert)
def prefix_inserts(insert, compiler, **kw):
s = compiler.visit_insert(insert, **kw)
if 'append_string' in insert.kwargs:
return s + " " + insert.kwargs['append_string']
return s
session.execute(MyModel.__table__.insert(append_string = 'ON DUPLICATE KEY UPDATE fieldname="abc"'), objects)
~~~