# 依赖
redis-server
redis php扩展
> 参考[此教程](http://www.yiibai.com/redis/)
# 安装
## composer
最好通过composer安装 `composer require topthink/think-queue 1.1.4`
## 手动安装
去 [https://github.com/top-think/think-queue](https://github.com/top-think/think-queue) 里
把包 下载下来 放入 项目的Vendor里
然后在 common.php 公共函数库里 加入这么一句:
`require './vendor/topthink/think-queue/src/common.php';`
当你在命令行里 切换到项目根目录后, 执行 `php think queue:work -h`
能出现以下 结果 就表示think-queue的 安装好了
![](https://box.kancloud.cn/5ea49d4cb20cd0936e84d1cf6aa3ec49_1482x460.png)
# 配置
> 配置文件位于 `application/extra/queue.php`
## 公共配置
~~~
[
'connector'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动,topthink:Topthink驱动
//或其他自定义的完整的类名
]
~~~
## 驱动配置
> 各个驱动的具体可用配置项在`think\queue\connector`目录下各个驱动类里的`options`属性中,写在上面的`queue`配置里即可覆盖
这里我们只需记住 redis 和sync类型的就好了,因为database是官方不推荐的,topthink 完全不开放。
下面是我的配置:
~~~
return [
'connector'=>'redis',
// 'connector'=>'sync',
'expire' => 0,
'default' => 'default',
'host' => '127.0.0.1',
'port' => 6379,
'password' => '',
'select' => 0,
'timeout' => 0,
'persistent' => false
];
~~~
# 使用
队列的特点是 先进先出,本质是将同步的调用,转成别的进程里的 异步任务。
think-queue 是基于cli 的php任务管理。
## 入队
首先,入队是queue的 push 和delay 两个方法。但是 两个方法的名称和参数不一样。因此,我的建议是 再包一层,封装成一个方法。
~~~
/**
* 添加队列任务
*
* @param string $job_name 队列执行的类路径 不带走类fire方法 带@方法 走类@的方法
* @param array $data 传入数据
* @param mixed $queue_name 队列名 null 或字符串
* @param integer $delay 延迟执行的时间 单位秒
* @return void
*/
public function push_job($job_name, $data, $queue_name = null, $delay = 0){
trace($queue_name);
config('default_return_type', 'json');
$class_name = \strstr($job_name, '@', true);
if(class_exists($class_name)){
if($delay > 0){
$ret = \think\Queue::later($delay, $job_name, $data, $queue_name);
}else{
trace($job_name);
$ret = \think\Queue::push($job_name, $data, $queue_name);
}
trace(sprintf("加入任务%s, 时间%s", $job_name, datetime()));
return $ret;
}
return $this->error('job类 '.$job_name.'不存在');
}
~~~
think-queue 支持普通队列和延迟队列两种,分别对应push 和delay。
然后 这两个方法都有共同参数 $job, $data, $queue。job是队列入队的任务执行路径,比如’A\B@c’
如果队列任务类只有一个方法需要队列执行,请定义为fire, $job里只要指向类路径就可以了。如果有多个 $job里 要@后接方法名。
> 如果是push的话返回的是一个队列任务的id ,延迟的话返回null
## 执行队列任务
队列任务 类 ,最好以job为后缀,放入相关job目录里。如果就一个任务,将队列执行方法 直接命名为fire, 多个子任务的话,方法可以定义多个。在入队时 用@指定消费的任务+方法>
且传递过来的 参数基本 `Job $job, $data`。
举例:
~~~
public function test(Job $job, $data){
// trace("正在执行队列:{$job->getName()}");
// $time = rand(0, 4);
// sleep($time);
$isJobDone = $this->send($data);
trace($data);
if ($isJobDone) {
//如果任务执行成功, 记得删除任务
$job->delete();
trace("队列执行完成:".__FUNCTION__." ". datetime());
} else {
$try_nums = $job->attempts();
trace("报警内容:".__FUNCTION__.",报警失败{$try_nums }次, 执行时间:". datetime());
// $job->release();
// $job->delete();
}
}
~~~
队列的数据data 用户自己定义,通常是一些查询需要的条件,或者第三方服务需要的参数。
这里执行的时候注意一定不要抛异常,容易把队列进程搞挂了。
## 出队
当队列任务执行完毕,最好将任务删除掉。不然堆积在队列里,不断重试。
`$job->delete()`
## 再入队
`$job->release()`
当默认开启队列 最大重试次数参数时, 如果一直失败 一直release 的话他会无限重试。
所以最好设置一个最大重试次数。
## 整个任务失败
当任务重试超过最大尝试次数后, 最后会执行当前任务类的 failed方法 但是参数只有$data, 当一个类里如果有多个子任务的话 不好区分哪个任务。
### 失败事件
首先,我们添加 `queue_failed` 事件标签, 及其对应的回调方法
~~~
// 文件路径: \application\tags.php
// 应用行为扩展定义文件
return [
// 应用初始化
'app_init' => [],
// 应用开始
'app_begin' => [],
// 模块初始化
'module_init' => [],
// 操作开始执行
'action_begin' => [],
// 视图内容过滤
'view_filter' => [],
// 日志写入
'log_write' => [],
// 应用结束
'app_end' => [],
// 任务失败统一回调,有四种定义方式
'queue.failed'=> [
// 数组形式,[ 'ClassName' , 'methodName']
['application\\behavior\\MyQueueFailedLogger', 'logAllFailedQueues']
// 字符串(静态方法),'StaicClassName::methodName'
// 'MyQueueFailedLogger::logAllFailedQueues'
// 字符串(对象方法),'ClassName',此时需在对应的ClassName类中添加一个名为 queueFailed 的方法
// 'application\\behavior\\MyQueueFailedLogger'
// 闭包形式
/*
function( &$jobObject , $extra){
// var_dump($jobObject);
return true;
}
*/
]
];
~~~
这里,我们选择数组形式的回调方式,新增 `\application\behavior\MyQueueFailedLogger` 类,添加一个 `logAllFailedQueues()` 方法
~~~
<?php
/**
* 文件路径: \application\behavior\MyQueueFailedLogger.php
* 这是一个行为类,用于处理所有的消息队列中的任务失败回调
*/
namespace application\behavior;
class MyQueueFailedLogger {
const should_run_hook_callback = true;
/**
* @param $jobObject \think\queue\Job //任务对象,保存了该任务的执行情况和业务数据
* @return bool true //是否需要删除任务并触发其failed() 方法
*/
public function logAllFailedQueues(&$jobObject){
$failedJobLog = [
'jobHandlerClassName' => $jobObject->getName(), // 'application\index\job\Hello'
'queueName' => $jobObject->getQueue(), // 'helloJobQueue'
'jobData' => $jobObject->getRawBody()['data'], // '{'a': 1 }'
'attempts' => $jobObject->attempts(), // 3
];
var_export(json_encode($failedJobLog,true));
// $jobObject->release(); //重发任务
//$jobObject->delete(); //删除任务
//$jobObject->failed(); //通知消费者类任务执行失败
return self::should_run_hook_callback;
}
}
~~~
需要注意该回调方法的返回值:
* 返回 true 时,系统会自动删除该任务,并且自动调用消费者类中的 `failed()` 方法
* 返回 false 时,系统不会自动删除该任务,也不会自动调用消费者类中的 `failed()` 方法,需要开发者另行处理失败任务的删除和通知。
最后,在消费者类中,添加 `failed()` 方法
~~~
/**
* 文件路径: \application\index\job\HelloJob.php
*/
/**
* 该方法用于接收任务执行失败的通知,你可以发送邮件给相应的负责人员
* @param $jobData string|array|... //发布任务时传递的 jobData 数据
*/
public function failed($jobData){
send_mail_to_somebody() ;
print("Warning: Job failed after max retries. job data is :".var_export($data,true)."\n";
}
~~~
这样,就可以做到任务失败的记录与告警
## 启动队列监听服务
* Work 模式
~~~
php think queue:work \
--daemon //是否循环执行,如果不加该参数,则该命令处理完下一个消息就退出
--queue helloJobQueue //要处理的队列的名称
--delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--force \ //系统处于维护状态时是否仍然处理任务,并未找到相关说明
--memory 128 \ //该进程允许使用的内存上限,以 M 为单位
--sleep 3 \ //如果队列中无任务,则sleep多少秒后重新检查(work+daemon模式)或者退出(listen或非daemon模式)
--tries 2 //如果任务已经超过尝试次数上限,则触发‘任务尝试次数超限’事件,默认为0
~~~
* Listen 模式
~~~
php think queue:listen \
--queue helloJobQueue \ //监听的队列的名称
--delay 0 \ //如果本次任务执行抛出异常且任务未被删除时,设置其下次执行前延迟多少秒,默认为0
--memory 128 \ //该进程允许使用的内存上限,以 M 为单位
--sleep 3 \ //如果队列中无任务,则多长时间后重新检查,daemon模式下有效
--tries 0 \ //如果任务已经超过重发次数上限,则进入失败处理逻辑,默认为0
--timeout 60 //创建的work子进程的允许执行的最长时间,以秒为单位
~~~
可以看到 listen 模式下,不包含 `--deamon` 参数,原因下面会说明
# 注意点
## 安全
redis 默认是没密码的,可以去服务器配置 auth password
## 模式区别
两者都可以用于处理消息队列中的任务
区别在于:
* 执行原理不同
* work 命令是单进程的处理模式。
按照是否设置了 `--daemon` 参数,work命令又可分为单次执行和循环执行两种模式。
* 单次执行:不添加 `--daemon`参数,该模式下,work进程在处理完下一个消息后直接结束当前进程。当不存在新消息时,会sleep一段时间然后退出。
* 循环执行:添加了 `--daemon`参数,该模式下,work进程会循环地处理队列中的消息,直到内存超出参数配置才结束进程。当不存在新消息时,会在每次循环中sleep一段时间。
* listen 命令是 父进程 + 子进程 的处理模式。
listen命令所在的父进程会创建一个单次执行模式的work子进程,并通过该work子进程来处理队列中的下一个消息,当这个work子进程退出之后,listen命令所在的父进程会监听到该子进程的退出信号,并重新创建一个新的单次执行的work子进程
* 退出时机不同
* work 命令的退出时机在上面的执行原理部分已叙述,此处不再重复
* listen 命令中,listen所在的父进程正常情况会一直运行,除非遇到下面两种情况:
* 创建的某个work子进程的执行时间超过了 listen命令行中的`--timeout` 参数配置,此时work子进程会被强制结束,listen所在的父进程也会抛出一个 `ProcessTimeoutException` 异常并退出。开发者可以选择捕获该异常,让父进程继续执行,也可以选择通过 supervisor 等监控软件重启一个新的listen命令。
* listen 命令所在的父进程因某种原因存在内存泄露,则当父进程本身占用的内存超过了命令行中的 `--memory` 参数配置时,父子进程均会退出。正常情况下,listen进程本身占用的内存是稳定不变的。
* 性能不同
* work 命令是在脚本内部做循环,框架脚本在命令执行的初期就已加载完毕;
* 而listen模式则是处理完一个任务之后新开一个work进程,此时会重新加载框架脚本。
因此: work 模式的性能会比listen模式高。
注意:当代码有更新时,work 模式下需要手动去执行 `php think queue:restart` 命令重启队列来使改动生效;而listen 模式会自动生效,无需其他操作。
* 超时控制能力
* work 模式本质上既不能控制进程自身的运行时间,也无法限制执行中的任务的执行时间。
举例来说,假如你在某次上线之后,在上文中的 `\application\index\job\Hello.php` 消费者的`fire`方法中添加了一段死循环 :
~~~
public function fire(){
while(true){ //死循环
$consoleOutPut->writeln("<info>I am looping forever inside a job.</info> \n");
sleep(1);
}
}
~~~
那么这个循环将永远不能停止,直到任务所在的进程超过内存限制或者由管理员手动结束。这个过程不会有任何的告警。更严重的是,如果你配置了expire ,那么这个死循环的任务可能会污染到同样处理 `helloJobQueue` 队列的其他work进程,最后好几个work进程将被卡死在这段死循环中。详情后文会说明。
work 模式下的超时控制能力,实际上应该理解为 多个work 进程配合下的过期任务重发能力。
* 而 listen命令可以限制其创建的work子进程的超时时间。
listen 命令可通过 `--timeout` 参数限制work子进程允许运行的最长时间,超过该时间限制仍未结束的子进程会被强制结束;
* 这里有必要补充一下 expire 和 timeout 之间的区别:
* expire 在配置文件中设置,timeout 在 listen命令 的命令行参数中设置,而且,expire 和 timeout 是两个不同层次上的概念:
* expire 是指任务的过期时间。这个时间是全局的,影响到所有的work进程。(不管是独立的work命令还是 listen 模式下创建的的work子进程) 。expire 针对的对象是 任务。
* timeout 是指work子进程的超时时间。这个时间只对当前执行的listen 命令有效。timeout 针对的对象是 work子进程。
* 使用场景不同
根据上面的介绍,可以看到,
work 命令的适用场景是:
* 任务数量较多
* 性能要求较高
* 任务的执行时间较短
* 消费者类中不存在死循环,sleep() ,exit() ,die() 等容易导致bug的逻辑
listen命令的适用场景是:
* 任务数量较少
* 任务的执行时间较长(如生 成大型的excel报表等),
* 任务的执行时间需要有严格限制
## 调试方式
首先,队列的日志在runtime里cli.log 而不是web的log
因此最好的方式 时自定义一个日志统一存储位置。
然后,队列由两部分组成,业务代码和队列部分。
### 辅助终端神器vscode
编辑代码时直接 ctrl+` 打开终端 可以开启多个终端 添加多个队列
### 业务测试
最方便的方式是本地将队列改为同步, 将队列的任务从推入到消费。都给测试一便。
确保队列业务逻辑部分没有异常。
### 队列的测试
主要测试 业务执行失败后,会不会无限循环入队列,队列失败后,有没有进入报警和写入错误日志。
还有队列进程的内存,是否会挂掉。
## 思考
队列传送数据格式的定义?
什么时候该用队列重试,什么时候直接报警让技术查问题? 是不是应该在告警时留有手动补救执行队列的方式的url?
延迟队列是不是 sleep设为0 才精确
大队列数据时 是不是多几个进程能加速?
参考文档:
[think-queue 笔记](https://github.com/coolseven/notes/tree/master/thinkphp-queue)
[官方文档](https://github.com/top-think/think-queue)