企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
有时候我们需要花费长时间运行某个任务(如:发送短信,发送邮件),我们可以不必在用户的请求中等待任务结束,那么可以使用任务队列发送必须的数据给另一个进程。这样就可以在后台运行任务,立即返回请求。 Celery 是强大的任务队列库,它可以用于简单的后台任务,也可用于复杂的多阶段应用的计划。 安装依赖 ``` (.venv) root@airvip:~/python_app/flask-demo# pip install celery ``` 我们单独创建一个文件夹存放我们的任务队列,在 `app` 目录下新建 `tasks` 目录 ``` root@airvip:~/python_app/flask-demo/app# pwd /root/python_app/flask-demo/app root@airvip:~/python_app/flask-demo/app# mkdir tasks root@airvip:~/python_app/flask-demo/app# ``` 我们选用 redis 作为我们的消息中间件,celery 也支持 **RabbitMQ**,**Amazon SQS**,**Zookeeper** 在 `app/tasks` 目录下新建 `__init__.py` , `config.py`文件以及 `compute` 目录 `config.py` 文件内容如下 ``` #!/usr/bin/env python3 # -*- encoding: utf-8 -*- BROKER_URL = "redis://127.0.0.1:6379/1" CELERY_RESULT_BACKEND = "redis://127.0.0.1:6379/2" ``` `config.py`文件内容如下 ``` #!/usr/bin/env python3 # -*- encoding: utf-8 -*- from celery import Celery from app.tasks import config # 定义 celery 对象 celery_app = Celery("tasks_app") # 引入配置信息 celery_app.config_from_object(config) # 自动搜寻异步任务 celery_app.autodiscover_tasks(["app.tasks.compute"]) ``` 在 `app/tasks/compute` 目录下新建 `tasks.py` 文件,内容如下 ``` #!/usr/bin/env python3 # -*- encoding: utf-8 -*- from app.tasks import celery_app import time @celery_app.task def mycomputer(num): time.sleep(5) return num ** 9 ``` 引入了时间模块,睡眠一段时间,主要表示这是一个很耗时的操作 在 `app/api_1_0/controller/index.py` 文件下添加方法来调用耗时任务 ``` #!/usr/bin/env python3 # -*- encoding: utf-8 -*- from app.api_1_0 import bp from app.tasks.compute.tasks import mycomputer @bp.route('/jisuan') def jisuan(): mycomputer.delay(9) return 'ok \n' ``` ## 开启 Celery 的后台任务 ``` (.venv) root@airvip:~/python_app/flask-demo# celery -A app.tasks worker -l info /root/python_app/flask-demo/.venv/lib/python3.6/site-packages/celery/platforms.py:797: RuntimeWarning: You're running the worker with superuser privileges: this is absolutely not recommended! Please specify a different user using the --uid option. User information: uid=0 euid=0 gid=0 egid=0 uid=uid, euid=euid, gid=gid, egid=egid, -------------- celery@airvip v5.0.5 (singularity) --- ***** ----- -- ******* ---- Linux-4.15.0-99-generic-x86_64-with-Ubuntu-18.04-bionic 2021-01-06 09:31:34 - *** --- * --- - ** ---------- [config] - ** ---------- .> app: tasks_app:0x7fbf0d06c278 - ** ---------- .> transport: redis://127.0.0.1:6379/1 - ** ---------- .> results: redis://127.0.0.1:6379/2 - *** --- * --- .> concurrency: 1 (prefork) -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker) --- ***** ----- -------------- [queues] .> celery exchange=celery(direct) key=celery [tasks] . app.tasks.compute.tasks.mycomputer ``` 可以看到,任务正常启动 ## 测试 ``` root@airvip:~# curl 127.0.0.1:5000/api/v1.0/jisuan ok ``` 我们可以在我们 redis 第 2 号库中查看结果 ![celery 任务结果](https://img.kancloud.cn/a8/fa/a8fa01f6e29ed3ba7e905b9d7b05127f_421x50.png)