多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
创建运行user数据表: ``` -- phpMyAdmin SQL Dump -- version 5.0.2 -- https://www.phpmyadmin.net/ -- -- 主机: localhost -- 生成日期: 2021-08-03 17:17:00 -- 服务器版本: 8.0.20 -- PHP 版本: 7.4.6 SET SQL_MODE = "NO_AUTO_VALUE_ON_ZERO"; START TRANSACTION; SET time_zone = "+00:00"; /*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; /*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; /*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; /*!40101 SET NAMES utf8mb4 */; -- -- 数据库: `test1` -- -- -------------------------------------------------------- -- -- 表的结构 `user` -- CREATE TABLE `user` ( `id` int NOT NULL, `username` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL, `password` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `nickname` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL, `email` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; -- -- 转储表的索引 -- -- -- 表的索引 `user` -- ALTER TABLE `user` ADD PRIMARY KEY (`id`); -- -- 在导出的表使用AUTO_INCREMENT -- -- -- 使用表AUTO_INCREMENT `user` -- ALTER TABLE `user` MODIFY `id` int NOT NULL AUTO_INCREMENT, AUTO_INCREMENT=1; COMMIT; /*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; /*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */; /*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; ``` 运行单个文件main.py: ``` """ 日期:2021-08-03 描述:Python Flask操作MySQL简单案例 """ from flask_sqlalchemy import SQLAlchemy from flask import Flask, jsonify, request import configparser import os app = Flask(__name__) # 使用ConfigParser 首选需要初始化实例,并读取配置文件: my_config = configparser.ConfigParser() # 连接数据库information_collection app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DEV_DATABASE_URL') or \ "mysql+pymysql://root:123456@127.0.0.1:3306/test1" app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True mydb = SQLAlchemy() mydb.init_app(app) # 用户模型 class User(mydb.Model): id = mydb.Column(mydb.Integer, primary_key=True) username = mydb.Column(mydb.String(60), nullable=False) password = mydb.Column(mydb.String(30), nullable=False) nickname = mydb.Column(mydb.String(50)) email = mydb.Column(mydb.String(30), nullable=False) def __repr__(self): return '<User %r>' % self.username # 获取用户列表,所有数据 @app.route('/users', methods=['GET']) def getUsers(): data = User.query.all() datas = [] for user in data: datas.append({'id': user.id, 'username': user.username, 'nickname': user.nickname, 'email': user.email}) return jsonify(data=datas) # 获取单条数据 @app.route('/user/<int:userId>', methods=['GET']) def getUser(userId): user = User.query.filter_by(id=userId).first() if (user is None): result = {'msg': '找不到数据'} else: result = {'id': user.id, 'username': user.username, 'nickname': user.nickname, 'email': user.email} return jsonify(data=result) # 添加用户数据,一条一条添加 # @app.route('/user/add', methods=['POST']) @app.route('/user/add', methods=['GET']) def addUser(): username = request.args.get('username') password = request.args.get('password') nickname = request.args.get('nickname') email = request.args.get('email') user = User(username=username, password=password, nickname=nickname, email=email) try: mydb.session.add(user) mydb.session.commit() except: mydb.session.rollback() mydb.session.flush() userId = user.id if (user.id is None): result = {'msg': '添加失败'} return jsonify(data=result) data = User.query.filter_by(id=userId).first() result = {'id': user.id, 'username': user.username, 'nickname': user.nickname, 'email': user.email} return jsonify(data=result) # 修改用户数据 # @app.route('/user/update/<int:userId>', methods=['PATCH']) @app.route('/user/update/<int:userId>', methods=['GET']) def updateUser(userId): username = request.args.get('username') password = request.args.get('password') nickname = request.args.get('nickname') email = request.args.get('email') try: user = User.query.filter_by(id=userId).first() if (user is None): result = {'msg': '找不到要修改的记录'} return jsonify(data=result) else: user.username = username user.password = password user.nickname = nickname user.email = email mydb.session.commit() except: mydb.session.rollback() # 回滚 mydb.session.flush() # 重置 userId = user.id data = User.query.filter_by(id=userId).first() result = {'id': user.id, 'username': user.username, 'password': user.password, 'nickname': user.nickname, 'email': user.email} return jsonify(data=result) # 删除用户数据 # @app.route('/user/delete/<int:userId>', methods=['DELETE']) @app.route('/user/delete/<int:userId>', methods=['GET']) def deleteUser(userId): User.query.filter_by(id=userId).delete() mydb.session.commit() return getUsers() if __name__ == '__main__': app.run() ```