ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
# Celery Celery是一个基于Python开发的分布式异步消息队列,可以轻松实现任务的异步处理。它的基本工作就是管理分配任务到不同的服务器,并且取得结果。至于说服务器之间是如何进行通信的?这个Celery本身不能解决。 Celery在执行任务时需要一个消息中间件来接收和发送任务消息,以及存储任务结果,一般使用RabbitMQ 或 Redis,我们这里只讨论Celery+RabbitMQ,其他的组合方式可以查阅更多资料。 RabbitMQ是一个由Erlang语言开发的AMQP的开源实现。AMQP即Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。 在Celery+RabbitMQ组合中,RabbitMQ作为一个消息队列管理工具被引入到和Celery集成,负责处理服务器之间的通信任务。 那么有一个疑问:RabbitMQ作为消息管理系统已经可以实现异步的发送消息,为什么还要使用Celery? Celery相当于包装了一个现成的系统,可以方便的在项目中操作RabbitMQ这个消息队列介质,减少在RabbitMQ上编写脚本的任务。最直接的例子就是在Celery Python里,只需要config一下settings,然后就可以用decorator轻松使用消息队列,而不用在RabbitMQ上编写复杂的脚本。 当然,Celery也支持和Redis、MongoDB之类的组合,原因是RabbitMQ尽管足够强大,但对于一些相对简单的业务环境来说可能太多(复杂)了一些。 # Celery+RabbitMQ是如何工作的? 关于Celery和RabbitMQ的协作方式,可以通过工作上的一些案例来说明: 假设A公司最近在开下年度工作会议,会议上要确定下一年的工作内容和计划,参会人员有老板(下发任务者)、部门主管(Celery分配任务者)、部门员工(工作者)、老板秘书(沟通协调者RabbitMQ)。 那么这场会议首先需要确定的是下一年的具体工作内容,这里就称之为“任务内容”。比如老板说我们下一年要开发出一款社交类APP产品,部门主管表示赞同,于是便愉快地定下了具体的工作任务(task),当然开发一款社交类APP产品是这个项目的总任务,其中可以细分成很多小的任务,比如业务流程是怎么样的?界面怎么设计等。 在确定了具体工作任务后,老板便把这个项目交给了部门主管(Celery),部门主管确定部门员工中谁去完成这项任务,于是指定某个人(Worker),也可以多个人。  发布工作任务的人是老板(下发任务者),他指定了部门主管(Celery)什么时候去完成哪些任务,并要求获取反馈信息。但有一点需要注意,老板只管布置任务,不参与具体的任务分配,这个任务分配的工作是交给部门主管(Celery)去执行。  项目之初,老板(下发任务者)通过公司会议将任务传递给部门主管(Celery),部门主管通过部门会议将任务分配给员工(Worker),过段时间再将任务结果反馈给老板。然而随着任务越来越多,部门主管发现任务太多,每个任务都要反馈结果,记不住,也容易弄乱,导致效率下降。  在召开会议商量了一番后,老板秘书(沟通协调者RabbitMQ)站起来说:“我有个提议,老板每天将布置的任务写成一张纸条放到我这,然后部门主管每天早上来取并交给员工,至于纸条上的任务如何分配,部门主管决定就行,但是要将结果同样写一张纸条反馈给我,我再交给老板。这样老板只负责下发任务,我只负责保管任务纸条,部门主管只负责分配任务并获取反馈,员工只负责按任务工作。大家职责都很明确,效率肯定会更高。”至此,老板与员工的沟通问题也解决了。 ![](https://box.kancloud.cn/b96f66f466c29b8ac469df328f0dfade_658x353.png) # Celery介绍和基本使用 Celery是一个基于Python开发的分布式异步消息任务队列,它简单、灵活、可靠,是一个专注于实时处理的任务队列,同时也支持任务调度。通过它可以轻松的实现任务的异步处理,如果你的业务场景中需要用到异步任务,就可以考虑使用Celery。举几个适用场景: 1)可以在 Request-Response 循环之外执行的操作:发送邮件、推送消息。 2)耗时的操作:调用第三方 API、视频处理(前端通过 AJAX 展示进度和结果)。 3)周期性任务:取代 crontab。 ## Celery有以下几个优点 简单:一旦熟悉了Celery的工作流程后,配置和使用是比较简单的。 高可用:当任务执行失败或执行过程中发生连接中断,Celery 会自动尝试重新执行任务。 快速:一个单进程的Celery每分钟可处理上百万个任务。 灵活: Celery的大部分组件都可以被扩展及自定制。 # 选择Broker Celery的基本架构和工作流程如下图 ![](https://box.kancloud.cn/5d8cf93818f7bc7ad2c04306c4221354_1000x115.png) 常用的Broker有RabbitMQ、Redis、数据库等,我们这里使用的是RabbitMQ ![](https://box.kancloud.cn/155ab4d48f8625d34409f0f6a7e8da72_1000x768.png) # Celery安装使用 Celery是一个Python的应用,而且已经上传到了PyPi,所以可以使用pip或easy_install安装: ``` pip install celery pip install eventlet ``` # 创建Application和Task Celery的默认broker是RabbitMQ,仅需配置一行就可以: ``` broker_url = 'amqp://guest:guest@localhost:5672//' ``` 创建一个Celery Application用来定义任务列表。 实例化一个Celery对象app,然后通过@app.task 装饰器注册一个 task。任务文件就叫tasks.py: ``` from celery import Celery app = Celery(__name__, broker='amqp://guest:guest@localhost:5672//') @app.task def add(x, y):            return x + y ``` # 运行 worker,启动Celery Worker来开始监听并执行任务 在 tasks.py 文件所在目录运行 ``` celery worker -A tasks.app -l INFO -P eventlet ``` 这个命令会开启一个在前台运行的 worker,解释这个命令的意义: worker: 运行 worker 模块。 -A: –app=APP, 指定使用的 Celery 实例。 -l: –loglevel=INFO, 指定日志级别,可选:DEBUG, INFO, WARNING, ERROR, CRITICAL, FATAL 其它常用的选项: -P: –pool=prefork, 并发模型,可选:prefork (默认,multiprocessing), eventlet, gevent, threads. -c: –concurrency=10, 并发级别,prefork 模型下就是子进程数量,默认等于 CPU 核心数 完整的命令行选项可以这样查看: ``` celery worker --help ``` # 调用Task 再打开一个终端, 进行命令行模式,调用任务 ``` from tasks import add add.delay(1,2) add.apply_async(args=(1,2)) ``` 上面两种调用方式等价,delay() 方法是 apply_async() 方法的简写。这个调用会把 add 操作放入到队列里,然后立即返回一个 AsyncResult 对象。如果关心处理结果,需要给 app 配置 CELERY_RESULT_BACKEND,指定一个存储后端保存任务的返回值。 # 在项目中的简单使用流程 1)RabbitMQ所在服务器,启动crontab设置 crontable -user user -e设置定时执行celery application应用。 ``` python tasks.py day ``` 在task.py文件里面启动一个叫做app的Celery Application,编写一个app.task函数来produce 任务到rabbitmq。 ``` app = Celery() app.config_from_object(celeryconfig) ``` 在每个worker里面通过命令启动worker消费任务 ``` celery worker -A tasks.app -l INFO -P eventlet ```