[TOC]
# 队列服务配置
在配置文件queue.php的connections中已经默认定义了redis的连接:
~~~
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'expire' => 60,
],
~~~
在.env环境配置文件中把默认的队列驱动改成redis:
~~~
QUEUE_DRIVER=redis
~~~
为了避免配置缓存的影响,执行以下命令清除并重建配置缓存:
~~~
php artisan config:cache
~~~
# 新建Queueable Jobs
使用命令:
~~~
php artisan make:job MyJob
~~~
新建一个名为MyJob的队列处理类,在App/Jobs目录下自动生成一个MyJob.php文件。
MyJob.php需要实现handle方法,用来具体执行队列任务,构造函数可以用来传递需要的参数,handle方法支持依赖注入。
这里handle方法随便写了一个,就是往一个list类型数据结构中存一个key-value数据,测试消费队列的时候有没有起作用,构造函数传两个参数就是key和value.
~~~
<?php
namespace App\Jobs;
use App\Jobs\Job;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\Redis;
class MyJob extends Job implements ShouldQueue
{
use InteractsWithQueue, SerializesModels;
private $key;
private $value;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($key, $value)
{
$this->key = $key;
$this->value = $value;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
Redis::hset('queue.test', $this->key, $this->value);
}
public function failed()
{
dump('failed');
}
}
~~~
在控制器中使用dispatch方法调用队列,将任务放入队列中,控制器中写个方法如public function test(),new MyJob的构造函数传两个随机生成的字符串参数:
~~~
$queueId = $this->dispatch(new MyJob('key_'.str_random(4), str_random(10)));
dd($queueId);
~~~
也可以指定分发
~~~
//发送消息
public function SendMessage(Request $request){
...
$this->dispatch((new SendMessage($sendParams))->onQueue('snail:SendMessage'));
}
~~~
## 内部实现
![](https://box.kancloud.cn/3d35b6574ed12d38784a85b45547b25b_1386x1230.png)
配置并访问路由,可以多访问几次,然后到Redis中查看,会发现把队列存入了一个queue::queue:default的List结构中
其中value内容如下,这是转换成json格式的,而实际上是经过序列号的字符串:
~~~
{
"job": "Illuminate\\Queue\\CallQueuedHandler@call",
"data": {
"command": "O:14:\"App\\Jobs\\MyJob\":6:{s:19:\"\u0000App\\Jobs\\MyJob\u0000key\";i:1;s:21:\"\u0000App\\Jobs\\MyJob\u0000value\";i:2;s:10:\"connection\";N;s:5:\"queue\";N;s:5:\"delay\";N;s:6:\"\u0000*\u0000job\";N;}"
},
"id": "EV2bhqUlx0T8pRCVHw1qT0fkP8AQcyI8",
"attempts": 1
}
~~~
这里data参数里包含了队列服务Job的名称,构造函数的参数等信息,消费者执行任务的依据。attempts表示重试的次数,往往执行队列任务失败了会重试,可以设置最多尝试次数
# 消费队列
这个时候任务只是入了队列,但并没有消费,执行:
~~~
php artisan queue:listen
~~~
这个命令,Laravel就开始消费队列。
可以看到这几个任务以此被消费,再去Redis看看有没有实现预期要达到的效果,每个任务往一个List类型的结构写入数据
也可以这样
~~~
home/niuyufu/php/bin/php /home/niuyufu/webroot/snail_api/artisan queue:work --queue=snail:SendMessage --tries=3 --memory=512 --daemon
~~~
## 内部实现
![](https://box.kancloud.cn/162b10a96e37f094429b09d7415b8167_1414x1298.png)
队列消息分析
监控redis对应队列消息具体产生的消息操作如下
~~~
tail -f | redis-cli -h 10.94.120.13 -p 6380 monitor | grep "queues:snail"
1492446053.406282 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:delayed"
1492446053.406452 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:delayed" "-inf" "1492446053"
1492446053.406754 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:reserved"
1492446053.406842 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:reserved" "-inf" "1492446053"
1492446053.407029 [0 10.95.117.155:57132] "LPOP" "queues:snail:SendMessage"
1492446053.407700 [0 10.95.117.155:57132] "ZADD" "queues:snail:SendMessage:reserved" "1492446113" "{job}"
1492446053.463953 [0 10.95.117.155:57132] "ZREM" "queues:snail:SendMessage:reserved" "{job}"
~~~
PS如果你的redis是codis的话注意了因为codis禁用方法列表
~~~
KEYS, MOVE, OBJECT, RENAME, RENAMENX, SORT, SCAN, BITOP,MSETNX, BLPOP, BRPOP, BRPOPLPUSH, PSUBSCRIBEPUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, DISCARD, EXEC, MULTI, UNWATCH, WATCH, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, AUTH, ECHO, SELECT, BGREWRITEAOF, BGSAVE, CLIENT KILL, CLIENT LIST, CONFIG GET, CONFIG SET, CONFIG RESETSTAT, DBSIZE, DEBUG OBJECT, DEBUG SEGFAULT, FLUSHALL, FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF, SLOWLOG, SYNC, TIME
~~~
所以执行消费任务会有以下错误
~~~
[Predis\Connection\ConnectionException]
Error while reading line from the server. [tcp://100.90.154.39:3000]
~~~
解决方法为修改config/queue.php
~~~
'redis' => [
'driver' => 'redis',
'connection' => 'default',
'queue' => 'default',
'expire' => null, //禁用即可
],
~~~
# 优化日志处理
如果你的系统有切割文件日志操作。会发现虽然日志被切分了但程序却没有往新文件里写入。如2017-04-18 13:50启动的项目日志会一直打到 snail.log.2017041813上。
改进方案
app/Jobs/Job.php文件中添加如下方法
~~~
public function releaseLoggerFile(){
$handles=\Log::getMonolog()->getHandlers();
if(!is_array($handles) || empty($handles)){
return;
}
foreach($handles as $handle){
if(method_exists($handle, "close")){
$handle->close();
}
}
return;
}
~~~
在具体job实现类中的handle方法结尾添加
~~~
public function handle()
{
...
$this->releaseLoggerFile(); //释放log文件
}
~~~
# 线上部署
创建任务shell
~~~
#!/bin/sh
day=$(date +'%y%m%d')
now=$(date +"%F %T")
php_command="/home/niuyufu/php/bin/php"
command="/home/niuyufu/webroot/snail_api/artisan"
log="/home/niuyufu/webroot/log/wave"
queue_name_arr=("snail:SendMessage" "snail:SendMessageToApp")
case $1 in
"run")
for queue_name in ${queue_name_arr[*]}
do
count=$(ps aux | grep artisan |grep queue=${queue_name} | grep -v grep | wc -l)
if [ ${count} -lt 1 ];then
echo "${now}:${queue_name} process has exit ${count}\n";
nohup $php_command $command queue:work --queue=${queue_name} --tries=3 --memory=512 --daemon >> $log/queueMonitor.log 2>&1 &
else
echo "${now}:${queue_name} process is runing";
fi
done
;;
"stop")
kill -9 $(ps -ef | grep "queue:work" | grep -v grep | awk '{print $2}' | tr -s '\n' ' ')
echo ${now}."Queue process all stop";
;;
"list")
ps -ef | grep "queue:work" | grep -v grep
;;
*)
echo "
Usage: QueueMonitorCommandShell.sh [run|stop|list]
"
;;
esac
~~~
# 总结
laravel这边的延迟队列使用了三个队列。
~~~
queue:default:delayed // 存储延迟任务
queue:default // 存储“生”任务就是未处理任务
queue:default:reserved // 存储待处理任务
~~~
任务在三个队列中进行轮转最后一定进入到queue:default:reserved并且成功后把任务从这个队列中删除。
laravel5.1 使用了watch来控制队列的原子操作但由于codis本身不支持 watch 方法。所以使用codis不能完全体验队列功能延迟队列不支持、不支持数据重跑对线上数据比较严格操作谨慎使用。
laravel5.3 之后redis队列 开始使用lua脚本支持的队列原子操作它没有使用 watch multi等操作所以如果线上codis 支持lua的话可以完整体验到队列功能。
- 配置
- composer安装
- composer用法
- composer版本约束表达
- phpstorm
- sftp文件同步
- php类型约束
- laradock
- 配置文件缓存详解
- git
- 自定义函数
- 核心概念
- IOC
- 服务提供者
- Facade
- 契约
- 生命周期
- 路由
- 请求
- 命名路由
- 路由分组
- 资源路由
- 控制器路由
- 响应宏
- 响应
- Command
- 创建命令
- 定时任务
- console路由
- 执行用户自定义的定时任务
- artisan命令
- 中间件
- 创建中间件
- 使用中间件
- 前置和后置
- 详细介绍
- 访问次数限制
- 为 VerifyCsrfToken 添加过滤条件
- 单点登录
- 事件
- 创建
- ORM
- 简介
- DB类
- 配置
- CURD
- queryScope和setAttribute
- 查看sql执行过程
- 关联关系
- 一对一
- 一对多
- 多对多
- 远程关联
- 多态一对多
- 多态多对多
- 关联数据库的调用
- withDefault
- 跨模型更新时间戳
- withCount,withSum ,withAvg, withMax,withMin
- SQL常见操作
- 模型事件
- 模型事件详解
- 模型事件与 Observer
- deleted 事件未被触发
- model validation
- ORM/代码片段
- Repository模式
- 多重where语句
- 中间表类型转换
- Collection集合
- 新增的一些方法
- 常见用法
- 求和例子
- 机场登机例子
- 计算github活跃度
- 转化评论格式
- 计算营业额
- 创建lookup数组
- 重新组织出表和字段关系并且字段排序
- 重构循环
- 其他例子
- 其他问题一
- 去重
- 第二个数组按第一个数组的键值排序
- 搜索ES
- 安装
- 表单
- Request
- sessiom
- Response
- Input
- 表单验证
- 简介
- Validator
- Request类
- 接口中的表单验证
- Lumen 中自定义表单验证返回消息
- redis
- 广播事件
- 发布订阅
- 队列
- 守护进程
- redis队列的坑
- beanstalkd
- rabbitmq
- redis队列
- 日志模块
- 错误
- 日志详解
- 数据填充与迁移
- 生成数据
- 数据填充seed
- migrate
- 常见错误
- Blade模板
- 流程控制
- 子视图
- URL
- 代码片段
- Carbon时间类
- 一些用法
- 邮件
- 分页
- 加密解密
- 缓存
- 文件上传
- 优化
- 随记
- 嵌套评论
- 判断字符串是否是合法的 json 字符串
- 单元测试
- 计算出两个日期的diff
- 自定义一个类文件让composer加载
- 时间加减
- 对象数组互转方法
- 用户停留过久自动退出登录
- optional 辅助方法
- 文件下载
- Api
- Dingo api
- auth.basic
- api_token
- Jwt-Auth
- passport
- Auth
- Authentication 和 Authorization
- Auth Facade
- 授权策略
- Gates
- composer包
- debug包
- idehelp包
- image处理
- 验证码
- jq插件
- 第三方登录
- 第三方支付
- log显示包
- 微信包
- xss过滤
- Excel包
- MongoDB
- php操作
- 聚合查询
- 发送带附件邮件
- 中文转拼音包
- clockwork网页调试
- emoji表情
- symfony组件
- swooletw/laravel-swoole
- 常见问题
- 跨域问题
- Laravel队列优先级的一个坑
- cache:clear清除缓存问题
- .env无法读取
- 源码相关基础知识
- __set和__get
- 依赖注入、控制反转和依赖倒置原则
- 控制反转容器(Ioc Container)
- 深入服务容器
- call_user_func
- compact
- 中间件简易实现
- array_reduce
- 中间件实现代码
- Pipeline管道操作
- composer自动加载
- redis延时队列
- 了解laravel redis队列
- cli
- 源码解读
- Facade分析
- Facade源码分析
- IOC服务容器
- 中间件原理
- 依赖注入浅析
- 微信
- 微信公众号
- 常用接收消息
- 6大接收接口
- 常用被动回复消息
- 接口调用凭证
- 自定义菜单
- 新增素材
- 客服消息
- 二维码
- 微信语音
- LBS定位
- 网页授权
- JSSDK
- easywechat
- 小程序
- 小程序配置app.json