创建运行user数据表:
```
-- phpMyAdmin SQL Dump
-- version 5.0.2
-- https://www.phpmyadmin.net/
--
-- 主机: localhost
-- 生成日期: 2021-08-03 17:17:00
-- 服务器版本: 8.0.20
-- PHP 版本: 7.4.6
SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO";
START TRANSACTION;
SET time_zone = "+00:00";
/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */;
/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */;
/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */;
/*!40101 SET NAMES utf8mb4 */;
--
-- 数据库: `test1`
--
-- --------------------------------------------------------
--
-- 表的结构 `user`
--
CREATE TABLE `user` (
`id` int NOT NULL,
`username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`nickname` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL,
`email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
--
-- 转储表的索引
--
--
-- 表的索引 `user`
--
ALTER TABLE `user`
ADD PRIMARY KEY (`id`);
--
-- 在导出的表使用AUTO_INCREMENT
--
--
-- 使用表AUTO_INCREMENT `user`
--
ALTER TABLE `user`
MODIFY `id` int NOT NULL AUTO_INCREMENT, AUTO_INCREMENT=1;
COMMIT;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */;
/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */;
```
运行单个文件main.py:
```
"""
日期:2021-08-03
描述:Python Flask操作MySQL简单案例
"""
from flask_sqlalchemy import SQLAlchemy
from flask import Flask, jsonify, request
import configparser
import os
app = Flask(__name__)
# 使用ConfigParser 首选需要初始化实例,并读取配置文件:
my_config = configparser.ConfigParser()
# 连接数据库information_collection
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DEV_DATABASE_URL') or \
"mysql+pymysql://root:123456@127.0.0.1:3306/test1"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True
mydb = SQLAlchemy()
mydb.init_app(app)
# 用户模型
class User(mydb.Model):
id = mydb.Column(mydb.Integer, primary_key=True)
username = mydb.Column(mydb.String(60), nullable=False)
password = mydb.Column(mydb.String(30), nullable=False)
nickname = mydb.Column(mydb.String(50))
email = mydb.Column(mydb.String(30), nullable=False)
def __repr__(self):
return '<User %r>' % self.username
# 获取用户列表,所有数据
@app.route('/users', methods=['GET'])
def getUsers():
data = User.query.all()
datas = []
for user in data:
datas.append({'id': user.id, 'username': user.username, 'nickname': user.nickname, 'email': user.email})
return jsonify(data=datas)
# 获取单条数据
@app.route('/user/<int:userId>', methods=['GET'])
def getUser(userId):
user = User.query.filter_by(id=userId).first()
if (user is None):
result = {'msg': '找不到数据'}
else:
result = {'id': user.id, 'username': user.username, 'nickname': user.nickname, 'email': user.email}
return jsonify(data=result)
# 添加用户数据,一条一条添加
# @app.route('/user/add', methods=['POST'])
@app.route('/user/add', methods=['GET'])
def addUser():
username = request.args.get('username')
password = request.args.get('password')
nickname = request.args.get('nickname')
email = request.args.get('email')
user = User(username=username, password=password, nickname=nickname, email=email)
try:
mydb.session.add(user)
mydb.session.commit()
except:
mydb.session.rollback()
mydb.session.flush()
userId = user.id
if (user.id is None):
result = {'msg': '添加失败'}
return jsonify(data=result)
data = User.query.filter_by(id=userId).first()
result = {'id': user.id, 'username': user.username, 'nickname': user.nickname, 'email': user.email}
return jsonify(data=result)
# 修改用户数据
# @app.route('/user/update/<int:userId>', methods=['PATCH'])
@app.route('/user/update/<int:userId>', methods=['GET'])
def updateUser(userId):
username = request.args.get('username')
password = request.args.get('password')
nickname = request.args.get('nickname')
email = request.args.get('email')
try:
user = User.query.filter_by(id=userId).first()
if (user is None):
result = {'msg': '找不到要修改的记录'}
return jsonify(data=result)
else:
user.username = username
user.password = password
user.nickname = nickname
user.email = email
mydb.session.commit()
except:
mydb.session.rollback() # 回滚
mydb.session.flush() # 重置
userId = user.id
data = User.query.filter_by(id=userId).first()
result = {'id': user.id, 'username': user.username, 'password': user.password, 'nickname': user.nickname, 'email': user.email}
return jsonify(data=result)
# 删除用户数据
# @app.route('/user/delete/<int:userId>', methods=['DELETE'])
@app.route('/user/delete/<int:userId>', methods=['GET'])
def deleteUser(userId):
User.query.filter_by(id=userId).delete()
mydb.session.commit()
return getUsers()
if __name__ == '__main__':
app.run()
```