企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 安装 > composer require topthink/think-queue ## 配置 > 配置文件位于 `config/queue.php` ### 公共配置 ~~~php [ 'default'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名 ] ~~~ 配置消息队列的驱动 根据选择的存储方式,在\\app\\config\\queue.php这个配置文件中,添加消息队列对应的驱动配置 ![](https://img.kancloud.cn/d2/e1/d2e15e42103b59bc2acacd6121bd597c_1716x1418.png) ## 创建任务类 > 单模块项目推荐使用 `app\job` 作为任务类的命名空间 多模块项目可用使用 `app\module\job` 作为任务类的命名空间 也可以放在任意可以自动加载到的地方 > > 任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个`fire`方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别 > 每个方法会传入两个参数 `think\queue\Job $job`(当前的任务对象) 和 `$data`(发布任务时自定义的数据) > > 还有个可选的任务失败执行的方法 `failed` 传入的参数为`$data`(发布任务时自定义的数据) ~~~ 例一:消息的消费与删除 namespace app\job; use think\queue\Job; class Job1{ public function fire(Job $job, $data){ //....这里执行具体的任务 if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 } //如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法 $job->delete(); // 也可以重新发布这个任务 $job->release($delay); //$delay为延迟时间 } public function failed($data){ // ...任务达到最大重试次数后,失败了 } } ~~~ ~~~ 例二: namespace app\lib\job; use think\queue\Job; class Job2{ public function task1(Job $job, $data){ } public function task2(Job $job, $data){ } public function failed($data){ } } ~~~ ~~~php 例三:多模块举例 ~~~ ~~~ <?php /** * 消息队列(queue)使用方法 * 使用redis实现消息队列demo */ namespace app\crm\job\export; use think\facade\Db; use think\queue\Job; class QueueJobTest { public function fire(Job $job,$data){ // 如有必要,可以根据业务需求和数据库中的最新数据,判断该任务是否仍有必要执行. $isJobStillNeedToBeDone = $this->checkJobNeedIfDone($data); if(!$isJobStillNeedToBeDone){ //删除任务 $job->delete(); return; } $isJobDone = $this->saveInfo($data); //如果任务执行完成,删除此队列任务, if($isJobDone){ //删除任务 $job->delete(); }else{ //通过这个方法可以检查这个任务已经重试了几次了 if ($job->attempts() > 3) { //如果大于设置的次数3,可执行删除任务或重新发布此任务 //此处直接执行删除任务 $job->delete(); //--begin-重新发布此任务------- //$job->release(2); //参数为设置的延迟时间,表示该任务延迟2秒后再执行 //--end-重新发布此任务--------—— } } } /** * 可选的任务失败执行的方法,此处只作列举 * 任务失败,执行failed方法 * @param $data 发布任务时自定义的数据 */ //public function failed($data){ //// ...任务达到最大重试次数后,失败了 //执行失败的业务代码 //} /** * 有些消息在到达消费者时,可能已经不再需要执行了 * @param array|mixed $data 发布任务时自定义的数据 * @return boolean 任务执行的结果 */ private function checkJobNeedIfDone($data){ //可查询数据库确认是否需要继续执行,比如订单支付的状态更改等后续操作 //此处省略业务逻辑,直接返回true,表示继续执行 return true; } /** * 任务执行的业务代码 * 如发送邮件,数据入库等 */ private function saveInfo($data){ //此处模拟数据入库 $res = Db::name('test')->insert( [ 'title' => $data['title'], 'content' => $data['content'], 'add_time' => time() ] ); if($res){ return true; }else{ return false; } } } ~~~ ## 发布任务 > 两个方法 > > `think\facade\Queue::push($job, $data = '', $queue = null);`       //立即执行 > > 或  > > `think\facade\Queue::later($delay, $job, $data = '', $queue = null);`  //延时执行,在参数 `$delay `秒后执行在`$delay`秒后执行 > > `$job` 是任务名 > 单模块的,且命名空间是`app\job`的,比如上面的例子一,写`Job1`类名即可 > 多模块的,且命名空间是`app\module\job`的,写`model/Job1`即可 > 其他的需要些完整的类名,比如上面的例子二,需要写完整的类名`app\lib\job\Job2` > 如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名`app\lib\job\Job2@task1`、`app\lib\job\Job2@task2` > > `$data` 是你要传到任务里的参数 > > `$queue` 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填 > > **例:** > > ~~~php > 文件路径: > > ~~~ > > **任务名(类名):app\\crm\\job\\export\\QueueJobTest** > > **传到任务里的参数:**\['title' => '队列任务入库demo','content' => 'xxxx内容'\]**** > > **队列名(可不填):queueJobTest** > > ****//生产者业务代码中把任务push到队列中:**** > > **think\\facade\\Queue::push('app\\crm\\job\\export\\QueueJobTest',  \['title' => '队列任务入库demo','content' => 'xxxx内容'\], '**queueJobTest**')** ![](https://img.kancloud.cn/f9/c9/f9c925d7d8abd5029b8533d7992bd79b_1700x537.png) **发布任务**  浏览器中访问:http://localhost/index.php/Test/actionQueueJobTest   ## 监听任务并执行 消费者进行处理队列 ~~~ &> php think queue:listen &> php think queue:work ~~~ 两种,具体的可选参数可以输入命令加 `--help` 查看 **处理任务** > 方式一: > > 切换当前终端窗口的目录到项目根目录下,执行 > > php think queue:work --queue queueJobTest > > 或 > > php think queue:listen --queue queueJobTest > 方式二: > > supervisor安装参考:https://www.cnblogs.com/chihuobao/p/15341719.html   > > 可配合supervisor使用,保证进程常驻 > > 添加queueJobTest.ini文件输入如下内容 > > > \[program:queueJobTest\] > user=root > command = php /home/wwwroot/test\_crm/think queue:listen --queue queueJobTest --timeout 0 --memory 1024 > autostart=true > autorestart=false > stderr\_logfile=/tmp/queue\_stderr.log > stdout\_logfile=/tmp/queue\_stdout.log **1、首先,你需要在你的框架中下载queue这个扩展** ~~~ composer require topthink/think-queue ~~~ **2.公共配置,配置文件位于:config/queue.php** [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") ~~~ return [ 'default' => 'redis', 'connections' => [ 'sync' => [ 'type' => 'sync', ], 'database' => [ 'type' => 'database', 'queue' => 'default', 'table' => 'jobs', 'connection' => null, ], 'redis' => [ 'type' => 'redis', 'queue' => 'default', 'host' => '127.0.0.1', 'port' => 6379, 'password' => '', 'select' => 0, 'timeout' => 0, 'persistent' => false, ], ], 'failed' => [ 'type' => 'none', 'table' => 'failed_jobs', ], ]; ~~~ [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") **你可以在配置文件中选择你需要使用数据库的类型,根据自己的需求使用,这里我选择的是redis** ~~~ 'default'=>'sync' //驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名 ~~~ **3.创建任务类** **单模块项目推荐使用 `app\job` 作为任务类的命名空间** **多模块项目可用使用 `app\module\job` 作为任务类的命名空间 也可以放在任意可以自动加载到的地方** **我这里是在  app\\admin\\job  创建了这个任务类** **里面执行了我发送邮件的任务,这里的例子可以实现延迟发送邮件的任务** **示例代码如下** [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") ~~~ <?php namespace app\admin\job; use think\facade\Log; use think\queue\Job; use tool\SendMail; class Test { public function fire(Job $job, $data){ if ($job->attempts()>3){ //执行失败写入错误日志 Log::error('fire执行失败'); //删除这个任务 $job->delete(); }else{ $toemail="407489255@qq.com"; //定义收件人的邮箱 $username="407486225@qq.com"; //发送方的邮箱地址 $password="qmxrvjgohiucbbgj"; //发送方邮箱的密码 $setFrom="1223524552@qq.com"; //设置发件人信息 $addReplyTo="1003729782@qq.com"; //设置收件人信息 $contrnt="邮件内容是 <b>您的验证码是:123456</b>,哈哈哈!111122"; //设置邮件内容 SendMail::sendMail($toemail,$username,$password,$setFrom,$addReplyTo,$contrnt); //执行成功删除这个任务 $job->delete(); } } } ~~~ [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") ~~~ 这个是发送邮件的封住类 配合上面的使用 ~~~ [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") ~~~ <?php namespace tool; use PHPMailer\PHPMailer\PHPMailer; class SendMail { public static function sendMail($toemail,$username,$password,$setFrom,$addReplyTo,$contrnt) { $toemail ="$toemail";//定义收件人的邮箱 $mail = new PHPMailer(); $mail->isSMTP();// 使用SMTP服务 $mail->CharSet = "utf8";// 编码格式为utf8,不设置编码的话,中文会出现乱码 $mail->Host = "smtp.qq.com";// 发送方的SMTP服务器地址 $mail->SMTPAuth = true;// 是否使用身份验证 $mail->Username = "$username";// 发送方的163邮箱用户名,就是你申请163的SMTP服务使用的163邮箱</span><span style="color:#333333;"> $mail->Password = "$password";// 发送方的邮箱密码,注意用163邮箱这里填写的是“客户端授权密码”而不是邮箱的登录密码!</span><span style="color:#333333;"> $mail->SMTPSecure = "ssl";// 使用ssl协议方式</span><span style="color:#333333;"> $mail->Port = 465;// 163邮箱的ssl协议方式端口号是465/994 $mail->setFrom("$setFrom", "Mailer");// 设置发件人信息,如邮件格式说明中的发件人,这里会显示为Mailer(xxxx@163.com),Mailer是当做名字显示 $mail->addAddress($toemail, 'Wang');// 设置收件人信息,如邮件格式说明中的收件人,这里会显示为Liang(yyyy@163.com) $mail->addReplyTo("$addReplyTo", "Reply");// 设置回复人信息,指的是收件人收到邮件后,如果要回复,回复邮件将发送到的邮箱地址 //$mail->addCC("xxx@163.com");// 设置邮件抄送人,可以只写地址,上述的设置也可以只写地址(这个人也能收到邮件) //$mail->addBCC("xxx@163.com");// 设置秘密抄送人(这个人也能收到邮件) //$mail->addAttachment("bug0.jpg");// 添加附件 $mail->Subject = "这是一个测试邮件";// 邮件标题 $mail->Body = "$contrnt";// 邮件正文 //$mail->AltBody = "This is the plain text纯文本";// 这个是设置纯文本方式显示的正文内容,如果不支持Html方式,就会用到这个,基本无用 if (!$mail->send()) {// 发送邮件 echo "Message could not be sent."; echo "Mailer Error: " . $mail->ErrorInfo;// 输出错误信息 } else { echo '发送成功'; } } } ~~~ [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") **4.发布任务   控制器调用这个  进入发布任务** [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") ~~~ <?php declare (strict_types = 1); namespace app\admin\controller; use think\facade\Queue; use think\Request; class index { /** * 显示资源列表 * * @return \think\Response */ public function index() { //设置执行这个任务 Queue::later('20','app\admin\job\Test@fire',['name'=>'Test'],'fire'); } ~~~ [![复制代码](https://common.cnblogs.com/images/copycode.gif)](javascript:void(0); "复制代码") ~~~ 注:一写参数的解释与使用方法 ~~~ > `think\facade\Queue::push($job, $data = '', $queue = null)` 和 `think\facade\Queue::later($delay, $job, $data = '', $queue = null)` 两个方法,前者是立即执行,后者是在`$delay`秒后执行 `$job` 是任务名 单模块的,且命名空间是`app\job`的,比如上面的例子一,写`Job1`类名即可 多模块的,且命名空间是`app\module\job`的,写`model/Job1`即可 其他的需要些完整的类名,比如上面的例子二,需要写完整的类名`app\lib\job\Job2` 如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名`app\lib\job\Job2@task1`、`app\lib\job\Job2@task2` `$data` 是你要传到任务里的参数 `$queue` 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填 ## 5.监听任务并执行 这是我的执行的方法 ~~~ php think queue:listen --queue fire ~~~ **参数解释** > php think queue:listen > php think queue:work 两种,具体的可选参数可以输入命令加 --help 查看 > 可配合supervisor使用,保证进程常驻