🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
# 依赖 redis-server redis php扩展 > 参考[此教程](http://www.yiibai.com/redis/) # 安装 ## composer 最好通过composer安装 `composer require topthink/think-queue 1.1.4` ## 手动安装 去 [https://github.com/top-think/think-queue](https://github.com/top-think/think-queue) 里 把包 下载下来 放入 项目的Vendor里 然后在 common.php 公共函数库里 加入这么一句: `require './vendor/topthink/think-queue/src/common.php';` 当你在命令行里 切换到项目根目录后, 执行 `php think queue:work -h` 能出现以下 结果 就表示think-queue的 安装好了 ![](https://box.kancloud.cn/5ea49d4cb20cd0936e84d1cf6aa3ec49_1482x460.png) # 配置 > 配置文件位于 `application/extra/queue.php` ## 公共配置 ~~~ [ 'connector'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动 //或其他自定义的完整的类名 ] ~~~ ## 驱动配置 > 各个驱动的具体可用配置项在`think\queue\connector`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖 这里我们只需记住 redis 和sync类型的就好了,因为database是官方不推荐的,topthink 完全不开放。 下面是我的配置: ~~~ return [ 'connector'=>'redis', // 'connector'=>'sync', 'expire' => 0, 'default' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false ]; ~~~ # 使用 队列的特点是 先进先出,本质是将同步的调用,转成别的进程里的 异步任务。 think-queue 是基于cli 的php任务管理。 ## 入队 首先,入队是queue的 push 和delay 两个方法。但是 两个方法的名称和参数不一样。因此,我的建议是 再包一层,封装成一个方法。 ~~~ /** * 添加队列任务 * * @param string $job_name 队列执行的类路径 不带走类fire方法 带@方法 走类@的方法 * @param array $data 传入数据 * @param mixed $queue_name 队列名 null 或字符串 * @param integer $delay 延迟执行的时间 单位秒 * @return void */ public function push_job($job_name, $data, $queue_name = null, $delay = 0){ trace($queue_name); config('default_return_type', 'json'); $class_name = \strstr($job_name, '@', true); if(class_exists($class_name)){ if($delay > 0){ $ret = \think\Queue::later($delay, $job_name, $data, $queue_name); }else{ trace($job_name); $ret = \think\Queue::push($job_name, $data, $queue_name); } trace(sprintf("加入任务%s, 时间%s", $job_name, datetime())); return $ret; } return $this->error('job类 '.$job_name.'不存在'); } ~~~ think-queue 支持普通队列和延迟队列两种,分别对应push 和delay。 然后 这两个方法都有共同参数 $job, $data, $queue。job是队列入队的任务执行路径,比如’A\B@c’ 如果队列任务类只有一个方法需要队列执行,请定义为fire, $job里只要指向类路径就可以了。如果有多个 $job里 要@后接方法名。 > 如果是push的话返回的是一个队列任务的id ,延迟的话返回null ## 执行队列任务 队列任务 类 ,最好以job为后缀,放入相关job目录里。如果就一个任务,将队列执行方法 直接命名为fire, 多个子任务的话,方法可以定义多个。在入队时 用@指定消费的任务+方法> 且传递过来的 参数基本 `Job $job, $data`。 举例: ~~~ public function test(Job $job, $data){ // trace("正在执行队列:{$job->getName()}"); // $time = rand(0, 4); // sleep($time); $isJobDone = $this->send($data); trace($data); if ($isJobDone) { //如果任务执行成功, 记得删除任务 $job->delete(); trace("队列执行完成:".__FUNCTION__." ". datetime()); } else { $try_nums = $job->attempts(); trace("报警内容:".__FUNCTION__.",报警失败{$try_nums }次, 执行时间:". datetime()); // $job->release(); // $job->delete(); } } ~~~ 队列的数据data 用户自己定义,通常是一些查询需要的条件,或者第三方服务需要的参数。 这里执行的时候注意一定不要抛异常,容易把队列进程搞挂了。 ## 出队 当队列任务执行完毕,最好将任务删除掉。不然堆积在队列里,不断重试。 `$job->delete()` ## 再入队 `$job->release()` 当默认开启队列 最大重试次数参数时, 如果一直失败 一直release 的话他会无限重试。 所以最好设置一个最大重试次数。 ## 整个任务失败 当任务重试超过最大尝试次数后, 最后会执行当前任务类的 failed方法 但是参数只有$data, 当一个类里如果有多个子任务的话 不好区分哪个任务。 ### 失败事件 首先,我们添加 `queue_failed` 事件标签, 及其对应的回调方法 ~~~ // 文件路径: \application\tags.php // 应用行为扩展定义文件 return [ // 应用初始化 'app_init' => [], // 应用开始 'app_begin' => [], // 模块初始化 'module_init' => [], // 操作开始执行 'action_begin' => [], // 视图内容过滤 'view_filter' => [], // 日志写入 'log_write' => [], // 应用结束 'app_end' => [], // 任务失败统一回调,有四种定义方式 'queue.failed'=> [ // 数组形式,[ 'ClassName' , 'methodName'] ['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues'] // 字符串(静态方法),'StaicClassName::methodName' // 'MyQueueFailedLogger::logAllFailedQueues' // 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法 // 'application\\behavior\\MyQueueFailedLogger' // 闭包形式 /* function( &$jobObject , $extra){ // var_dump($jobObject); return true; } */ ] ]; ~~~ 这里,我们选择数组形式的回调方式,新增 `\application\behavior\MyQueueFailedLogger` 类,添加一个 `logAllFailedQueues()` 方法 ~~~ <?php /** * 文件路径: \application\behavior\MyQueueFailedLogger.php * 这是一个行为类,用于处理所有的消息队列中的任务失败回调 */ namespace application\behavior; class MyQueueFailedLogger { const should_run_hook_callback = true; /** * @param $jobObject \think\queue\Job //任务对象,保存了该任务的执行情况和业务数据 * @return bool true //是否需要删除任务并触发其failed() 方法 */ public function logAllFailedQueues(&$jobObject){ $failedJobLog = [ 'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello' 'queueName' => $jobObject->getQueue(), // 'helloJobQueue' 'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }' 'attempts' => $jobObject->attempts(), // 3 ]; var_export(json_encode($failedJobLog,true)); // $jobObject->release(); //重发任务 //$jobObject->delete(); //删除任务 //$jobObject->failed(); //通知消费者类任务执行失败 return self::should_run_hook_callback; } } ~~~ 需要注意该回调方法的返回值: * 返回 true 时,系统会自动删除该任务,并且自动调用消费者类中的 `failed()` 方法 * 返回 false 时,系统不会自动删除该任务,也不会自动调用消费者类中的 `failed()` 方法,需要开发者另行处理失败任务的删除和通知。 最后,在消费者类中,添加 `failed()` 方法 ~~~ /** * 文件路径: \application\index\job\HelloJob.php */ /** * 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员 * @param $jobData string|array|... //发布任务时传递的 jobData 数据 */ public function failed($jobData){ send_mail_to_somebody() ; print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n"; } ~~~ 这样,就可以做到任务失败的记录与告警 ## 启动队列监听服务 * Work 模式 ~~~ php think queue:work \ --daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出 --queue helloJobQueue //要处理的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式) --tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0 ~~~ * Listen 模式 ~~~ php think queue:listen \ --queue helloJobQueue \ //监听的队列的名称 --delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0 --memory 128 \ //该进程允许使用的内存上限,以 M 为单位 --sleep 3 \ //如果队列中无任务,则多长时间后重新检查,daemon模式下有效 --tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0 --timeout 60 //创建的work子进程的允许执行的最长时间,以秒为单位 ~~~ 可以看到 listen 模式下,不包含 `--deamon` 参数,原因下面会说明 # 注意点 ## 安全 redis 默认是没密码的,可以去服务器配置 auth password ## 模式区别 两者都可以用于处理消息队列中的任务 区别在于: * 执行原理不同 * work 命令是单进程的处理模式。 按照是否设置了 `--daemon` 参数,work命令又可分为单次执行和循环执行两种模式。 * 单次执行:不添加 `--daemon`参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出。 * 循环执行:添加了 `--daemon`参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间。 * listen 命令是 父进程 + 子进程 的处理模式。 listen命令所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后,listen命令所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程 * 退出时机不同 * work 命令的退出时机在上面的执行原理部分已叙述,此处不再重复 * listen 命令中,listen所在的父进程正常情况会一直运行,除非遇到下面两种情况: * 创建的某个work子进程的执行时间超过了 listen命令行中的`--timeout` 参数配置,此时work子进程会被强制结束,listen所在的父进程也会抛出一个 `ProcessTimeoutException` 异常并退出。开发者可以选择捕获该异常,让父进程继续执行,也可以选择通过 supervisor 等监控软件重启一个新的listen命令。 * listen 命令所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 `--memory` 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。 * 性能不同 * work 命令是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕; * 而listen模式则是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本。 因此: work 模式的性能会比listen模式高。 注意:当代码有更新时,work 模式下需要手动去执行 `php think queue:restart` 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。 * 超时控制能力 * work 模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。 举例来说,假如你在某次上线之后,在上文中的 `\application\index\job\Hello.php` 消费者的`fire`方法中添加了一段死循环 : ~~~ public function fire(){ while(true){ //死循环 $consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n"); sleep(1); } } ~~~ 那么这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会有任何的告警。更严重的是,如果你配置了expire ,那么这个死循环的任务可能会污染到同样处理 `helloJobQueue` 队列的其他work进程,最后好几个work进程将被卡死在这段死循环中。详情后文会说明。 work 模式下的超时控制能力,实际上应该理解为 多个work 进程配合下的过期任务重发能力。 * 而 listen命令可以限制其创建的work子进程的超时时间。 listen 命令可通过 `--timeout` 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束; * 这里有必要补充一下 expire 和 timeout 之间的区别: * expire 在配置文件中设置,timeout 在 listen命令 的命令行参数中设置,而且,expire 和 timeout 是两个不同层次上的概念: * expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独立的work命令还是 listen 模式下创建的的work子进程) 。expire 针对的对象是 任务。 * timeout 是指work子进程的超时时间。这个时间只对当前执行的listen 命令有效。timeout 针对的对象是 work子进程。 * 使用场景不同 根据上面的介绍,可以看到, work 命令的适用场景是: * 任务数量较多 * 性能要求较高 * 任务的执行时间较短 * 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑 listen命令的适用场景是: * 任务数量较少 * 任务的执行时间较长(如生 成大型的excel报表等), * 任务的执行时间需要有严格限制 ## 调试方式 首先,队列的日志在runtime里cli.log 而不是web的log 因此最好的方式 时自定义一个日志统一存储位置。 然后,队列由两部分组成,业务代码和队列部分。 ### 辅助终端神器vscode 编辑代码时直接 ctrl+` 打开终端 可以开启多个终端 添加多个队列 ### 业务测试 最方便的方式是本地将队列改为同步, 将队列的任务从推入到消费。都给测试一便。 确保队列业务逻辑部分没有异常。 ### 队列的测试 主要测试 业务执行失败后,会不会无限循环入队列,队列失败后,有没有进入报警和写入错误日志。 还有队列进程的内存,是否会挂掉。 ## 思考 队列传送数据格式的定义? 什么时候该用队列重试,什么时候直接报警让技术查问题? 是不是应该在告警时留有手动补救执行队列的方式的url? 延迟队列是不是 sleep设为0 才精确 大队列数据时 是不是多几个进程能加速? 参考文档: [think-queue 笔记](https://github.com/coolseven/notes/tree/master/thinkphp-queue) [官方文档](https://github.com/top-think/think-queue)