💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
[TOC] # 队列服务配置 在配置文件queue.php的connections中已经默认定义了redis的连接: ~~~ 'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'expire' => 60, ], ~~~ 在.env环境配置文件中把默认的队列驱动改成redis: ~~~ QUEUE_DRIVER=redis ~~~ 为了避免配置缓存的影响,执行以下命令清除并重建配置缓存: ~~~ php artisan config:cache ~~~ # 新建Queueable Jobs 使用命令: ~~~ php artisan make:job MyJob ~~~ 新建一个名为MyJob的队列处理类,在App/Jobs目录下自动生成一个MyJob.php文件。 MyJob.php需要实现handle方法,用来具体执行队列任务,构造函数可以用来传递需要的参数,handle方法支持依赖注入。 这里handle方法随便写了一个,就是往一个list类型数据结构中存一个key-value数据,测试消费队列的时候有没有起作用,构造函数传两个参数就是key和value. ~~~ <?php namespace App\Jobs; use App\Jobs\Job; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Support\Facades\Redis; class MyJob extends Job implements ShouldQueue { use InteractsWithQueue, SerializesModels; private $key; private $value; /** * Create a new job instance. * * @return void */ public function __construct($key, $value) { $this->key = $key; $this->value = $value; } /** * Execute the job. * * @return void */ public function handle() { Redis::hset('queue.test', $this->key, $this->value); } public function failed() { dump('failed'); } } ~~~ 在控制器中使用dispatch方法调用队列,将任务放入队列中,控制器中写个方法如public function test(),new MyJob的构造函数传两个随机生成的字符串参数: ~~~ $queueId = $this->dispatch(new MyJob('key_'.str_random(4), str_random(10))); dd($queueId); ~~~ 也可以指定分发 ~~~ //发送消息 public function SendMessage(Request $request){ ... $this->dispatch((new SendMessage($sendParams))->onQueue('snail:SendMessage')); } ~~~ ## 内部实现 ![](https://box.kancloud.cn/3d35b6574ed12d38784a85b45547b25b_1386x1230.png) 配置并访问路由,可以多访问几次,然后到Redis中查看,会发现把队列存入了一个queue::queue:default的List结构中 其中value内容如下,这是转换成json格式的,而实际上是经过序列号的字符串: ~~~ { "job": "Illuminate\\Queue\\CallQueuedHandler@call", "data": { "command": "O:14:\"App\\Jobs\\MyJob\":6:{s:19:\"\u0000App\\Jobs\\MyJob\u0000key\";i:1;s:21:\"\u0000App\\Jobs\\MyJob\u0000value\";i:2;s:10:\"connection\";N;s:5:\"queue\";N;s:5:\"delay\";N;s:6:\"\u0000*\u0000job\";N;}" }, "id": "EV2bhqUlx0T8pRCVHw1qT0fkP8AQcyI8", "attempts": 1 } ~~~ 这里data参数里包含了队列服务Job的名称,构造函数的参数等信息,消费者执行任务的依据。attempts表示重试的次数,往往执行队列任务失败了会重试,可以设置最多尝试次数 # 消费队列 这个时候任务只是入了队列,但并没有消费,执行: ~~~ php artisan queue:listen ~~~ 这个命令,Laravel就开始消费队列。 可以看到这几个任务以此被消费,再去Redis看看有没有实现预期要达到的效果,每个任务往一个List类型的结构写入数据 也可以这样 ~~~ home/niuyufu/php/bin/php /home/niuyufu/webroot/snail_api/artisan queue:work --queue=snail:SendMessage --tries=3 --memory=512 --daemon ~~~ ## 内部实现 ![](https://box.kancloud.cn/162b10a96e37f094429b09d7415b8167_1414x1298.png) 队列消息分析 监控redis对应队列消息具体产生的消息操作如下 ~~~ tail -f | redis-cli -h 10.94.120.13 -p 6380 monitor | grep "queues:snail" 1492446053.406282 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:delayed" 1492446053.406452 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:delayed" "-inf" "1492446053" 1492446053.406754 [0 10.95.117.155:57132] "WATCH" "queues:snail:SendMessage:reserved" 1492446053.406842 [0 10.95.117.155:57132] "ZRANGEBYSCORE" "queues:snail:SendMessage:reserved" "-inf" "1492446053" 1492446053.407029 [0 10.95.117.155:57132] "LPOP" "queues:snail:SendMessage" 1492446053.407700 [0 10.95.117.155:57132] "ZADD" "queues:snail:SendMessage:reserved" "1492446113" "{job}" 1492446053.463953 [0 10.95.117.155:57132] "ZREM" "queues:snail:SendMessage:reserved" "{job}" ~~~ PS如果你的redis是codis的话注意了因为codis禁用方法列表 ~~~ KEYS, MOVE, OBJECT, RENAME, RENAMENX, SORT, SCAN, BITOP,MSETNX, BLPOP, BRPOP, BRPOPLPUSH, PSUBSCRIBEPUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, DISCARD, EXEC, MULTI, UNWATCH, WATCH, SCRIPT EXISTS, SCRIPT FLUSH, SCRIPT KILL, SCRIPT LOAD, AUTH, ECHO, SELECT, BGREWRITEAOF, BGSAVE, CLIENT KILL, CLIENT LIST, CONFIG GET, CONFIG SET, CONFIG RESETSTAT, DBSIZE, DEBUG OBJECT, DEBUG SEGFAULT, FLUSHALL, FLUSHDB, INFO, LASTSAVE, MONITOR, SAVE, SHUTDOWN, SLAVEOF, SLOWLOG, SYNC, TIME ~~~ 所以执行消费任务会有以下错误 ~~~ [Predis\Connection\ConnectionException] Error while reading line from the server. [tcp://100.90.154.39:3000] ~~~ 解决方法为修改config/queue.php ~~~ 'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => 'default', 'expire' => null, //禁用即可 ], ~~~ # 优化日志处理 如果你的系统有切割文件日志操作。会发现虽然日志被切分了但程序却没有往新文件里写入。如2017-04-18 13:50启动的项目日志会一直打到 snail.log.2017041813上。 改进方案 app/Jobs/Job.php文件中添加如下方法 ~~~ public function releaseLoggerFile(){ $handles=\Log::getMonolog()->getHandlers(); if(!is_array($handles) || empty($handles)){ return; } foreach($handles as $handle){ if(method_exists($handle, "close")){ $handle->close(); } } return; } ~~~ 在具体job实现类中的handle方法结尾添加 ~~~ public function handle() { ... $this->releaseLoggerFile(); //释放log文件 } ~~~ # 线上部署 创建任务shell ~~~ #!/bin/sh day=$(date +'%y%m%d') now=$(date +"%F %T") php_command="/home/niuyufu/php/bin/php" command="/home/niuyufu/webroot/snail_api/artisan" log="/home/niuyufu/webroot/log/wave" queue_name_arr=("snail:SendMessage" "snail:SendMessageToApp") case $1 in "run") for queue_name in ${queue_name_arr[*]} do count=$(ps aux | grep artisan |grep queue=${queue_name} | grep -v grep | wc -l) if [ ${count} -lt 1 ];then echo "${now}:${queue_name} process has exit ${count}\n"; nohup $php_command $command queue:work --queue=${queue_name} --tries=3 --memory=512 --daemon >> $log/queueMonitor.log 2>&1 & else echo "${now}:${queue_name} process is runing"; fi done ;; "stop") kill -9 $(ps -ef | grep "queue:work" | grep -v grep | awk '{print $2}' | tr -s '\n' ' ') echo ${now}."Queue process all stop"; ;; "list") ps -ef | grep "queue:work" | grep -v grep ;; *) echo " Usage: QueueMonitorCommandShell.sh [run|stop|list] " ;; esac ~~~ # 总结 laravel这边的延迟队列使用了三个队列。 ~~~ queue:default:delayed // 存储延迟任务 queue:default // 存储“生”任务就是未处理任务 queue:default:reserved // 存储待处理任务 ~~~ 任务在三个队列中进行轮转最后一定进入到queue:default:reserved并且成功后把任务从这个队列中删除。 laravel5.1 使用了watch来控制队列的原子操作但由于codis本身不支持 watch 方法。所以使用codis不能完全体验队列功能延迟队列不支持、不支持数据重跑对线上数据比较严格操作谨慎使用。 laravel5.3 之后redis队列 开始使用lua脚本支持的队列原子操作它没有使用 watch multi等操作所以如果线上codis 支持lua的话可以完整体验到队列功能。