# job消息队列实现
ThinkPHP的Queue内置了 Redis、Database、Topthink、Sync四种驱动,这里使用的是 Redis,也推荐使用 Redis
think-queue 队列消息可以进行任务的发布、获取、执行、删除、重新发布、延迟发布、超时控制等操作
## config配置query.php
~~~
<?php
return [
'default' => 'redis',
'connector' => 'sync',
'connections' => [
'sync' => [
'type' => 'sync',
],
'database' => [
'type' => 'database',
'queue' => 'default',
'table' => 'jobs',
'connection' => null,
],
'redis' => [
'type' => 'redis',
'queue' => 'queue',
'host' => '127.0.0.1',
'port' => 6379,
'password' => 'niushop123!@#',
'select' => 0,
'timeout' => 0,
'persistent' => false,
],
],
'failed' => [
'type' => 'none',
'table' => 'failed_jobs',
],
];
~~~
## 生产者:写入消息队列
~~~
Queue::push($job_handler_classname, $params);//$job_handler_classname指需要执行的消息类 $params指传入参数
Queue::later($later_time, $job_handler_classname, $params);//$later_time指延迟时间
~~~
例如计划任务
~~~
public function execute()
{
$system_config_model = new SystemConfig();
$config = $system_config_model->getSystemConfig()[ 'data' ] ?? [];
$is_open_queue = $config[ 'is_open_queue' ] ?? 0;
$query_execute_time = $is_open_queue == 1 ? time() + 60 : time();
$list = model('cron')->getList([ [ 'execute_time', '<=', $query_execute_time ] ]);
if (!empty($list)) {
foreach ($list as $k => $v) {
$event_res = checkQueue($v, function($params) {
//加入消息队列
$job_handler_classname = 'Cronexecute';
try {
if ($params[ 'execute_time' ] <= time()) {
Queue::push($job_handler_classname, $params);
} else {
Queue::later($params[ 'execute_time' ] - time(), $job_handler_classname, $params);
}
} catch (\Exception $e) {
$res = $this->error($e->getMessage());
}
return $res ?? $this->success();
}, function($params) {
try {
$res = event($params[ 'event' ], [ 'relate_id' => $params[ 'relate_id' ] ]);
} catch (\Exception $e) {
$res = $this->error($e->getMessage());
}
$data_log = [
'name' => $params[ 'name' ],
'event' => $params[ 'event' ],
'relate_id' => $params[ 'relate_id' ],
'message' => json_encode($res)
];
$this->addCronLog($data_log);
return $res;
});
$event_code = $event_res[ 'code' ] ?? 0;
if ($event_code < 0) {
Log::write('自动任务888');
Log::write($event_res);
continue;
}
//循环任务
if ($v[ 'type' ] == 2) {
$period = $v[ 'period' ] == 0 ? 1 : $v[ 'period' ];
switch ( $v[ 'period_type' ] ) {
case 0://分
$execute_time = $v[ 'execute_time' ] + $period * 60;
break;
case 1://天
$execute_time = strtotime('+' . $period . 'day', $v[ 'execute_time' ]);
break;
case 2://周
$execute_time = strtotime('+' . $period . 'week', $v[ 'execute_time' ]);
break;
case 3://月
$execute_time = strtotime('+' . $period . 'month', $v[ 'execute_time' ]);
break;
}
model('cron')->update([ 'execute_time' => $execute_time ], [ [ 'id', '=', $v[ 'id' ] ] ]);
} else {
model('cron')->delete([ [ 'id', '=', $v[ 'id' ] ] ]);
}
}
}
~~~
## 消费者:指实现消息队列执行,统一在系统或者插件的job文件夹下
~~~
/**
* 事件通过队列异步调用
* Class Eventasync
* @package app\job
*/
class Cronexecute
{
public function fire(Job $job, $data)
{
$job->delete();
try {
$res = event($data[ 'event' ], [ 'relate_id' => $data[ 'relate_id' ] ]);
$data_log = [
'name' => $data[ 'name' ],
'event' => $data[ 'event' ],
'relate_id' => $data[ 'relate_id' ],
'message' => json_encode($res)
];
Log::write("计划任务:{$data[ 'event' ]} relate_id: {$data[ 'relate_id' ]}执行结果:" . json_encode($res, JSON_UNESCAPED_UNICODE));
$cron_model = new Cron();
$cron_model->addCronLog($data_log);
} catch (\Exception $e) {
Log::write($e->getMessage());
$job->delete();
}
}
}
~~~
## 消息队列启动
~~~
php think queue:work
~~~
- 序言
- 安装教程
- 运行环境
- 安装手册
- 基础
- 前期准备
- 伪静态配置
- 后台目录结构
- uniapp(手机端)目录结构
- 开发命名规范
- 控制器命名规范
- model层命名规范
- 前端(管理页面)命名规范
- 提示面板
- 表单
- uniapp(手机端)命名规范
- api接口命名规范
- 架构
- 入口文件
- config设置
- app应用目录
- component(自定义模板组件)
- model层(数据业务层)
- 数据库操作
- job(消息队列)
- event(事件)
- request(请求对象)
- common(公共函数)
- log(日志处理)
- lang(语言包)
- addon插件
- 数据字典
- 系统基础表
- 配送相关表
- 商品相关表
- 网站设置相关
- 会员相关表
- 订单相关表
- 营销(组合套餐)
- 营销(砍价)
- 营销(优惠券)
- 营销(满减)
- 营销(拼团)
- 营销(秒杀)
- 店铺相关表
- 微信相关表
- 门店相关表
- 结算相关表
- 应用(分销)
- 功能模块
- 商品模块
- 会员模块
- 订单模块
- 数据统计
- 消息队列
- 支付模块
- 短信模块
- 客服
- api接口
- 接口开发
- 插件开发
- 事件开发
- 常用事件
- 插件目录与开发
- 常用插件
- 支付插件
- 拼团插件
- 新人礼