企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# job消息队列实现

ThinkPHP的Queue内置了 Redis、Database、Topthink、Sync四种驱动,这里使用的是 Redis,也推荐使用 Redis

think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作

## config配置queue.php

~~~
<?php
return [
    'default' => 'redis',
    'connector' => 'sync',
    'connections' => [
        'sync' => [
            'type' => 'sync',
        ],
        'database' => [
            'type' => 'database',
            'queue' => 'default',
            'table' => 'jobs',
            'connection' => null,
        ],
        'redis' => [
            'type' => 'redis',
            'queue' => 'queue',
            'host' => '127.0.0.1',
            'port' => 6379,
            // Sample value: replace with the password of your own Redis instance.
            'password' => 'niushop123!@#',
            'select' => 0,
            'timeout' => 0,
            'persistent' => false,
        ],
    ],
    'failed' => [
        'type' => 'none',
        'table' => 'failed_jobs',
    ],
];
~~~

## 生产者:写入消息队列

~~~
// $job_handler_classname: the job class to execute; $params: the arguments passed to it
Queue::push($job_handler_classname, $params);
// $later_time: delay before the job is executed (seconds, see the example below)
Queue::later($later_time, $job_handler_classname, $params);
~~~

例如计划任务

~~~
/**
 * Dispatch every cron entry that is due.
 *
 * Each due entry is handed to checkQueue() together with two callbacks: one that
 * publishes the entry to the message queue, and one that fires the event directly
 * and writes a cron log. Which callback runs is decided inside checkQueue()
 * (presumably based on whether the queue is enabled - confirm against checkQueue()).
 * Afterwards recurring entries (type == 2) are rescheduled and all others are deleted.
 */
public function execute()
{
    $system_config_model = new SystemConfig();
    $config = $system_config_model->getSystemConfig()[ 'data' ] ?? [];
    $is_open_queue = $config[ 'is_open_queue' ] ?? 0;
    // With the queue enabled, also fetch entries due within the next 60 seconds;
    // those are published as delayed jobs by the first callback below.
    $query_execute_time = $is_open_queue == 1 ? time() + 60 : time();
    $list = model('cron')->getList([ [ 'execute_time', '<=', $query_execute_time ] ]);
    if (!empty($list)) {
        foreach ($list as $v) {
            $event_res = checkQueue($v, function($params) {
                // Publish to the message queue
                $job_handler_classname = 'Cronexecute';
                try {
                    if ($params[ 'execute_time' ] <= time()) {
                        Queue::push($job_handler_classname, $params);
                    } else {
                        // Not due yet: delay the job by the remaining number of seconds
                        Queue::later($params[ 'execute_time' ] - time(), $job_handler_classname, $params);
                    }
                } catch (\Exception $e) {
                    $res = $this->error($e->getMessage());
                }
                // $res is only set when publishing failed
                return $res ?? $this->success();
            }, function($params) {
                // Fire the event directly and record the outcome in the cron log
                try {
                    $res = event($params[ 'event' ], [ 'relate_id' => $params[ 'relate_id' ] ]);
                } catch (\Exception $e) {
                    $res = $this->error($e->getMessage());
                }
                $data_log = [
                    'name' => $params[ 'name' ],
                    'event' => $params[ 'event' ],
                    'relate_id' => $params[ 'relate_id' ],
                    'message' => json_encode($res)
                ];
                $this->addCronLog($data_log);
                return $res;
            });
            $event_code = $event_res[ 'code' ] ?? 0;
            if ($event_code < 0) {
                // Dispatch failed: log it and leave the entry untouched so it is picked up again
                Log::write('自动任务888');
                Log::write($event_res);
                continue;
            }
            // Recurring task
            if ($v[ 'type' ] == 2) {
                // A period of 0 is treated as 1
                $period = $v[ 'period' ] == 0 ? 1 : $v[ 'period' ];
                // Stays null when period_type is not one of the known values
                $execute_time = null;
                switch ( $v[ 'period_type' ] ) {
                    case 0:// minutes
                        $execute_time = $v[ 'execute_time' ] + $period * 60;
                        break;
                    case 1:// days
                        $execute_time = strtotime('+' . $period . 'day', $v[ 'execute_time' ]);
                        break;
                    case 2:// weeks
                        $execute_time = strtotime('+' . $period . 'week', $v[ 'execute_time' ]);
                        break;
                    case 3:// months
                        $execute_time = strtotime('+' . $period . 'month', $v[ 'execute_time' ]);
                        break;
                }
                if ($execute_time === null) {
                    // Unknown period_type: do not write an undefined value into execute_time
                    Log::write('计划任务:未知的 period_type,id: ' . $v[ 'id' ]);
                    continue;
                }
                model('cron')->update([ 'execute_time' => $execute_time ], [ [ 'id', '=', $v[ 'id' ] ] ]);
            } else {
                // One-off task: remove it once it has been dispatched
                model('cron')->delete([ [ 'id', '=', $v[ 'id' ] ] ]);
            }
        }
    }
}
~~~

## 消费者:指实现消息队列执行,统一在系统或者插件的job文件夹下

~~~
/**
 * Queue consumer that executes a scheduled (cron) event.
 * Class Cronexecute
 * @package app\job
 */
class Cronexecute
{
    /**
     * Fire the event described by the job payload and record the result.
     *
     * @param Job $job the queue job being processed
     * @param array $data payload pushed by the producer; the keys read here are
     *                    'event', 'relate_id' and 'name'
     */
    public function fire(Job $job, $data)
    {
        // Delete the job up front, before the event runs
        $job->delete();
        try {
            $res = event($data[ 'event' ], [ 'relate_id' => $data[ 'relate_id' ] ]);
            $data_log = [
                'name' => $data[ 'name' ],
                'event' => $data[ 'event' ],
                'relate_id' => $data[ 'relate_id' ],
                'message' => json_encode($res)
            ];
            Log::write("计划任务:{$data[ 'event' ]} relate_id: {$data[ 'relate_id' ]}执行结果:" . json_encode($res, JSON_UNESCAPED_UNICODE));
            $cron_model = new Cron();
            $cron_model->addCronLog($data_log);
        } catch (\Exception $e) {
            // The job has already been deleted above, so only the failure is logged here
            Log::write($e->getMessage());
        }
    }
}
~~~

## 消息队列启动

~~~
php think queue:work
~~~