# Celery介绍和基本使用
**需求场景**
1. 对100台命令执行一条批量命令,命令执行需要很长时间,但是不想让主程序等着结果返回,而是给主程序返回一个任务ID,task_id
主程序过一段时间根据task_id,获取执行结果即可,再命令执行期间,主程序 可以继续做其他事情
2. 定时任务,比如每天检测一下所有的客户资料,发现是客户的生日,发个祝福短信
**解决方案**
1. 逻辑view 中启一个进程
父进程结束,子进程跟着结束,子进程任务没有完成,不符合需求
父进程结束,等着子进程结束,父进程需等着结果返回,不符合需求
小结:该方案解决不了阻塞问题,即需要等待
2. 启动 subprocess,任务托管给操作系统执行
实现task_id,实现异步,解决阻塞
小结:大批量高并发,主服务器会出现问题,解决不了并发
3. celery
celery提供多子节点,解决并发问题
## celery介绍
celery是一个基于python开发的分布式异步消息队列,轻松实现任务的异步处理
celery在执行任务时需要一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用RabbitMQ 或 Redis
## celery优点
简单:熟悉celery的工作流程后,配置使用简单
高可用:当任务执行失败或执行过程中发生连接中断,celery会自动尝试重新执行任务
快速:一个单进程的celery每分钟可处理上百万个任务
灵活:几乎celery的各个组件都可以被扩展及自定制
## celery基本工作流程
![](https://box.kancloud.cn/32d19c7420eb9c8c072f1b38ff0f8e41_670x319.png)
其中中间队列用于分配任务以及存储执行结果
## celery安装及使用
1. 安装python模块
```
pip3 install celery
pip3 install redis
```
2. 安装redis服务
```
wget http://download.redis.io/releases/redis-3.2.8.tar.gz
tar -zxvf redis-3.2.8.tar.gz
cd redis-3.2.8
make
src/redis-server # 启动redis 服务
```
3. 创建一个celery application 用来定义任务列表
创建一个任务 tasks.py
```
from celery import Celery
app = Celery('TASK',
broker='amqp://guest:guest@localhost:5672//',
backend='redis://localhost')
@app.task
def add(x,y):
print("running...",x,y)
return x+y
```
4. 启动celery worker 来开始监听并执行任务
```
celery -A tasks worker --loglevel=info
```
tasks 任务文件名,worker 任务角色,--loglevel=info 任务日志级别
5. 调用任务
打开另外终端,进入命令行模式,调用任务
![](https://box.kancloud.cn/ccf1e9529cc2e20728b3e0ab1c6baa08_573x167.png)
6. celery常用接口
* tasks.add(4,6) ---> 本地执行
* tasks.add.delay(3,4) --> worker执行
* t=tasks.add.delay(3,4) --> t.get() 获取结果,或卡住,阻塞
* t.ready()---> False:未执行完,True:已执行完
* t.get(propagate=False) 抛出简单异常,但程序不会停止
* t.traceback 追踪完整异常
# 项目中使用Celery
## 1. 项目目录结构
```
project
|-- __init__.py
|-- celery.py # 配置文档
|-- tasks.py # 任务函数
|-- tasks2.py # 任务函数
```
## 2. 项目文件
project/celery.py
```
# from celery import Celery 默认当前路径,更改为绝对路径(当前路径有个celery.py文件啦)
from celery import Celery
app = Celery('project',
broker='redis://localhost',
backend='redis://localhost',
include=['project.tasks','project.tasks2']) # 配置文件和任务文件分开了,可以写多个任务文件
# app 扩展配置
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
```
celery.py作用相当于配置文件
project/tasks.py
```
from .celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
```
project/tasks.py
```
from .celery import app
@app.task
def hello():
return 'Hello World'
```
3. 启动项目worker
```
celery -A project worker -l info
```
另启终端,与project同目录进入python3
![](https://box.kancloud.cn/4abe47350dd74f51919d94120f8a98b4_585x245.png)
4. 实现分布式
当启动多个时 celery -A project worker -l info,去broker去相应任务,实现分布式
5. 后台启动woker
```
celery multi start w1 -A project -l info
celery multi start w2 -A project -l info
celery multi start w3 -A project -l info
celery multi restart w1 -A project -l info
celery multi stop w1 w2 w3 # 任务立刻停止
celery multi stopwait w1 w2 w3 # 任务执行完,停止
```
# Celery定时任务
celery支持定时任务,设定好任务的执行时间,celery就会定时帮你执行,这个定时任务模块叫 celery beat
项目目录结构
```
project
|-- __init__.py
|-- celery.py # 配置文件
|-- periodic_task.py # 定时任务文件
```
脚本celery.py
```
from celery import Celery
app = Celery('project',
broker='redis://localhost',
backend='redis://localhost',
include=['project.periodic_task',])
app.conf.update(
result_expires=3600,
)
if __name__ == '__main__':
app.start()
```
脚本periodic_task.py
```
from .celery import app
from celery.schedules import crontab
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
# 每10s调用 test('hello')
sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')
# 每20s调用 test('world')
sender.add_periodic_task(20.0, test.s('world'), expires=10)
# 每周一早上7:30 执行 test('Happy Mondays!')
sender.add_periodic_task(
crontab(hour=7, minute=30, day_of_week=1), # 可灵活修改
test.s('Happy Mondays!'),
)
@app.task
def test(arg):
print(arg)
```
启动角色 worker 执行任务
```
celery -A project worker -l info
```
启动角色 beat 将定时任务放到队列中
```
celery -A project.periodic_task beat -l debug
```
也可以在配置文件celery.py 里添加定时任务
```
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'project.tasks.add',
'schedule': 30.0,
'args': (16, 16)
},
}
app.conf.timezone = 'UTC'
```
# Celery与Django结合
```
LearnCelery
|-- app1
|-- tasks.py
|-- models.py
|-- app2
|-- tasks.py
|-- models.py
|-- LearnCelery
|-- __init__.py
|-- celery.py
|-- settings.py
```
脚本代码
LearnCelery/app/tasks.py # 必须叫这个名字
```
from celery import shared_task
import time
# 所有的app都可以调用
@shared_task
def add(x, y):
time.sleep(10)
return x + y
@shared_task
def mul(x, y):
time.sleep(10)
return x * y
```
LearnCelery/LearnCelery/__init__.py
```
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ['celery_app']
```
LearnCelery/LearnCelery/celery.py
```
import os
from celery import Celery
# 单独脚本调用Django内容时,需配置脚本的环境变量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mysite.settings')
app = Celery('mysite')
# CELERY_ 作为前缀,在settings中写配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 到Django各个app下,自动发现tasks.py 任务脚本
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
```
LearnCelery/LearnCelery/settings.py
```
# For celery
CELERY_BROKER_URL = 'redis://localhost'
CELERY_RESULT_BACKEND = 'redis://localhost'
```
3. 启动celery
```
celery -A LearnCelery worker -l debug
```
4. urls.py 视图处理
```
urlpatterns = [
url(r'^celery_call/$', views.celery_call),
url(r'^celery_res/$', views.celery_res),
]
```
# django中使用计划任务
1. 安装插件
```
pip3 install django-celery-beat
```
2. 修改配置 settings.py
```
INSTALLED_APPS = [
'django_celery_beat',
]
```
3.数据库迁移
```
python manage.py migrate
```
4. 启动 celery beat
```
celery -A LearnCelery beat -l info -S django
```
定时任务存到数据库里,启动beat定时取任务放到队列里执行
5. admin管理
![](https://box.kancloud.cn/33d360707ed8e3019c407402435e8eac_875x323.png)
![](https://box.kancloud.cn/b884ec26e2cd1c98d8bf5c480e91a478_1680x1594.png)
此时启动你的celery beat 和worker,会发现每隔2分钟,beat会发起一个任务消息让worker执行scp_task任务
注意,经测试,每添加或修改一个任务,celery beat都需要重启一次,要不然新的配置不会被celery beat进程读到