多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# AMQP异步任务系统 需要了解的知识。 * [Process](Process.md) * [RabbitMQ](http://www.rabbitmq.com/documentation.html) 本系统由SD框架和RabbitMQ搭建。 ## 创建异步作业进程 通过继承AMQPTaskProcess,我们来创建一个异步任务作业的进程类。 ``` class MyAMQPTaskProcess extends AMQPTaskProcess { public function start($process) { parent::start($process); $this->createDirectConsume('msgs'); } /** * 路由消息返回class名称 * @param $body * @return string */ protected function route($body) { return TestAMQPTask::class; } } ``` 通过createDirectConsume函数可以快速创建一个消费队列。 * createDirectConsume ``` function createDirectConsume($queue, $prefetch_count = 2, $global = false, $exchange = null, $consumerTag = null) ``` 一般情况我们只需要设置queue和prefetch_count这俩个参数。 queue为消费队列的名称,prefetch_count=2代表这个队列只能被这个进程同时消费2次,直到消费成功或者失败,简单的来说并发为2。 global参数代表这个并发是针对队列还是进程的。false是针对队列,true代表是进程。 我们可以多次调用createDirectConsume来消费不同的队列。 * route route路由的作用,$body是消费得到的值,这个函数需要返回一个class名。 ## 创建作业任务 创建类继承AMQPTask。 ``` class TestAMQPTask extends AMQPTask { /** * @var TestModel */ public $TestModel; public function initialization(AMQPMessage $message) { parent::initialization($message); $this->TestModel = $this->loader->model(TestModel::class, $this); } /** * handle * @param $body */ public function handle($body) { var_dump($body); $this->ack(); } } ``` * initialization 和Model,Controller一样用于初始化,或者进行loader * handle 处理任务,处理任务一定需要调用ack或者是reject。 * ack 任务处理完毕 * reject ``` function reject($requeue = true) ``` 任务被拒绝,requeue=true代表这个任务回到队列,false代表任务被抛弃。 ## 创建用户进程 在AppServer中创建进程 ``` /** * 用户进程 */ public function startProcess() { parent::startProcess(); for ($i=0;$i<5;$i++) { ProcessManager::getInstance()->addProcess(MyAMQPTaskProcess::class,true,$i); } } ``` 这样我们创建了5个异步任务进程。 # 注意 1. 消费队列必须存在,不然会报错 2. 一定在handle处理结束后调用ack或者reject 3. AMQPTask的initialization和handle均支持协程