企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
前面一节,介绍了 pheanstalk 的接口文件,目的是为了更接近 pheanstalkd 的使用。 > 这同样是一个从 0 到 1 了解第三方包如何使用的过程,这是一个学习方法,在其它的包时,也有借鉴意义。 在过了一遍 contracts 之后,我们可以发现,pheanstalk 最接近用户层(开发者)的接口,就是 PheanstalkInterface 。 那么,使用手册,我们也通过解读 PheanstalkInterface 的具体实现 Pheanstalk 类来撰写。 # 建立连接 ``` include_once "vendor/autoload.php"; $conn = \Pheanstalk\Pheanstalk::create('127.0.0.1',11300,10); ``` 上面的 create 方法,第一个参数必填,需要传入目标 beanstalkd 服务器的 ip ,前提是,对于 beanstalkd server 而言,你的 ip (可能是内网,也可能是外网,也可能是本机),必须在它的监听列表中。 第二个参数是端口,可不填,默认为 11300 ,当然你可以修改。 第三个参数是连接超时时间,可不填,默认为 10 ,表示 10 秒之后还未连接成功就超时了。 **要注意的是:此 create 方法只是返回一个配置好的 pheanstalk 实例,但并非是一个连接好的实例。** 如何理解呢? 从实现上, pheanstalk 的流程是等到有指令了,也就是有 dispatch 指令后,才建立一个 socket 连接,而它与 beanstalkd 通信的底层,用的也是 socket 。 所以,如果你只写了 create 方法,并不能看出是否能够连接成功。 你还需要发送一个指令才能知道是否能够连接成功。 一旦发送过指令之后,Pheanstalk->connection->socket 就会指向一个 socket 资源。 # 关闭、重连 虽然,我们能看到 Pheanstalkd 类中,有 reconnect 方法,还有 $this->connection 的属性。 然而它们的访问权限是 private 。 这告诉我们,关闭和重连,不需要我们手动操作。 我们可以看到,这两个方法,只有在 pheanstalk 向 beanstalkd 发送指令时,会用到。 **因为 PHP 使用 socket 连接时,socket 是有生命周期的,超出一定时间 socket 就会失效,此时,Pheanstalk 的 dispatch 方法就会自动重连。** 我们知道,pheanstalk 的底层通信基于 php 的 socket ,那么,必然具备 socket 的特性,再深入的内容,可以参考 php 手册中的 socket 部分。 # 生产者操作 ## 选择 tube ``` $conn->useTube('myTube'); // 会返回 this ,因此支持链式操作 ``` ## put job ``` $job = $conn->put('this is my first job !'); ``` 这里传入的字符串,就是放到 beanstalkd 中 job 的 body 当然,关于 priority 、 delay 、 ttr 这些参数你可以在 body 后面传入,具体可参考方法接口。 put 会返回一个 Job 实例,如下: ``` Pheanstalk\Job Object ( [id:Pheanstalk\Job:private] => 1 [data:Pheanstalk\Job:private] => this is my first job ! ) ``` ## 链式操作 ``` $job = $conn->useTube('myTube')->put('this is my 2nd job !'); ``` ## 默认 Tube 我们知道,默认的 Tube 是 default ,如果我们不使用 useTube 的话,put 的 job 就会在 default tube 中。 # 消费者操作 ## 接收 job ### reserve ``` $job = $conn->reserve(); // 默认情况会接收 default tube 的 job print_r($job); // 如果 reserve 没能得到 job ,就会一直阻塞在上面 ``` 上面返回的一定是 Job 实例。 ### reserveWithTimeOut ``` try { $job2 = $conn->reserveWithTimeout(10); // 阻塞接收,10秒之后超时,就不再接收了 print_r($job2); if ($job2 === null) { throw new Exception('超时了'); } // 处理 job ... sleep(60); } catch (\Pheanstalk\Exception\DeadlineSoonException $e) { print_r('deadline soon' . $e->getMessage()); } catch (Exception $e) { print_r($e->getMessage()); } ``` 注意:这个方法,超时的话会返回 null ,成功的话,会返回一个 Job 实例。 还有,上面代码片段中有一行 sleep ,这一行的作用,是做一个小实验。 当,job 被成功返回过来,但没有 sleep 这一行,你会发现,你连续运行两次该代码,返回的 job 是同一个。 理论上,我们应该接收到下一个 job ,因为这个 job 被接收了之后,状态就变成 reserved 而不再是 ready 。 那如何来测试 job 是否真的变成 reserved 了呢? 这就是 sleep 的用处,步骤如下: - 先往你的 beanstalkd 加入两个 job - 再在两个 cli 窗口运行两次含 sleep 的消费者代码 你会发现这个情况,如下图: ![](https://box.kancloud.cn/bbcd9b836e8289007612aa75e859e21f_628x178.png) ![](https://box.kancloud.cn/fa0b5ee917e0be7fffdeb6222dac554d_1294x342.png) 它们分别接收了两个 job 。 > 这里不要用 web 测试,用 cli 才能更直观地看到效果。 这个现象,也很好理解: - reserve 成功之后,你的 php 进程还在于 beanstalkd 保持着 socket 连接,此连接不销毁,beanstalkd 都会为你冻结此 job 。 - 一旦 php 进程销毁(执行结束),beanstalkd 没有接收到其他对 job 的操作,自然就回到 ready 队列中了 ## 删除 job ``` $conn->delete($job2); ``` 这个方法没有返回值,但如果出错的话,会抛出异常。 一般在 job 被消费者处理完毕之后才调用 delete 方法。 ## 释放 reserved job ``` $conn->release($job,$priority,$delay); ``` 此方法没有返回值,如果出错会抛出异常。 此方法可以将 reserved job 重新放回 ready 队列中,一般在消费者逻辑处理失败后,才使用这个方法。 ## 预留 ``` $conn->bury($job,$priority); ``` 此方法没有返回值,如果出错会抛出异常。 此方法可以将一个 job 操作为 buried 状态,比如当你的消费者接收到这个 job 时,对 job 进行了一系列检查,经检查,发现这个 job 还不能被消费,此时可以将 job 操作为 buried ,直到有客户端对这个 job 发送了 kick 指令,才会被再次 reserve 。 ## 延迟 ttr ``` $conn->touch($job); ``` 此方法没有返回值,如果出错会抛出异常。 ## 添加监听的管道 ``` $conn->watch('test')->watch('sms'); ``` 此方法返回 $conn 本身,可链式操作,每次运行,都会为 watch list 添加一个 tube 。 ## 删除监听的管道 ``` $conn->ignore('email')->ignore('sms'); ``` 此方法返回 $conn 本身,可链式操作,每次运行,都会从 watch list 中移除一个 tube 。 > 默认,watch list 中会有一个 default ,所以,当你执行 watch 时,default 仍然在 watch list 中。 ## 仅监听一个管道 ``` $conn->watchOnly('sms'); ``` 此方法返回 $conn 本身,可链式操作。 通过此快捷方法,可以一次性移除 watch list 中所有的 tube ,除了此方法传入的 tube 以外。 # 其他命令 ## 单纯获取 job ``` $conn->peek(new \Pheanstalk\Job(1,'')); // 根据 id 获取 job ,id 在 beanstalkd 是唯一的,不论处于什么 tube $conn->useTube('order')->peekReady(); // 从 order tube 中获取排在最前面的 job(这个顺序,同 reserve 的顺序) $conn->peekDelayed(); // 从 default tube 中获取(最)即将变成 ready 的 delayed job $conn->peekBuried(); // 返回下一个 buried job ``` > 这里的 peekReady 、 peekDelayed 、 peekBuried 虽然在译文解释中,提到「下一个」,但它并非是一个指针,当你调用一起 peekReady ,下一次再调用 peekReady 就自动将指针移动到「下一个」。**如果你不手动将当前得到的 Job 操作为其他状态,或者延长延迟时间等,你多次调用 peek 命令,他将返回同一个 Job。** ## 批量操作 job 为 ready ``` $conn->useTube('order'); $kicked = $conn->kick(10); print_r($kicked); ``` 上面的代码片段,是针对 order tube 操作 10 条 buried 或 delayed 为 ready 。 具体可以看 beanstalkd 译文中的 kick 命令。 ## 操作指定 job 为 ready ``` $conn->useTube('sms')->kickJob($job); ``` 此方法没有返回值。 ## 获取 job 的统计信息 ``` $job = $conn->reserve(); $res = $conn->statsJob($job); print_r($res); ``` 此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。 $res 如下: ``` Pheanstalk\Response\ArrayResponse Object ( [name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [id] => 2 [tube] => default [state] => reserved [pri] => 1 [age] => 92865 [delay] => 0 [ttr] => 60 [time-left] => 59 [file] => 0 [reserves] => 8 [timeouts] => 0 [releases] => 0 [buries] => 0 [kicks] => 0 ) ) ``` ## 获取 tube 统计信息 ``` $res = $conn->statsTube('default'); print_r($res); ``` 此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。 $res 如下: ``` Pheanstalk\Response\ArrayResponse Object ( [name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [name] => default [current-jobs-urgent] => 1 [current-jobs-ready] => 2 [current-jobs-reserved] => 0 [current-jobs-delayed] => 0 [current-jobs-buried] => 0 [total-jobs] => 2 [current-using] => 1 [current-watching] => 1 [current-waiting] => 0 [cmd-delete] => 0 [cmd-pause-tube] => 0 [pause] => 0 [pause-time-left] => 0 ) ) ``` ## 获取 beanstalkd 服务器统计信息 ``` $res = $conn->stats(); print_r($res); ``` 此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。 $res 如下: ``` Pheanstalk\Response\ArrayResponse Object ( [name:Pheanstalk\Response\ArrayResponse:private] => OK [storage:ArrayObject:private] => Array ( [current-jobs-urgent] => 1 [current-jobs-ready] => 2 [current-jobs-reserved] => 0 [current-jobs-delayed] => 0 [current-jobs-buried] => 0 [cmd-put] => 2 [cmd-peek] => 10 [cmd-peek-ready] => 11 [cmd-peek-delayed] => 10 [cmd-peek-buried] => 10 [cmd-reserve] => 2 [cmd-reserve-with-timeout] => 16 [cmd-delete] => 0 [cmd-release] => 0 [cmd-use] => 7 [cmd-watch] => 4 [cmd-ignore] => 4 [cmd-bury] => 0 [cmd-kick] => 4 [cmd-touch] => 0 [cmd-stats] => 1 [cmd-stats-job] => 1 [cmd-stats-tube] => 1 [cmd-list-tubes] => 0 [cmd-list-tube-used] => 0 [cmd-list-tubes-watched] => 0 [cmd-pause-tube] => 0 [job-timeouts] => 0 [total-jobs] => 2 [max-job-size] => 65535 [current-tubes] => 1 [current-connections] => 1 [current-producers] => 0 [current-workers] => 0 [current-waiting] => 0 [total-connections] => 37 [pid] => 1 [version] => 1.10 [rusage-utime] => 0.020000 [rusage-stime] => 0.030000 [uptime] => 96714 [binlog-oldest-index] => 0 [binlog-current-index] => 0 [binlog-records-migrated] => 0 [binlog-records-written] => 0 [binlog-max-size] => 10485760 [id] => a1e58a6bbd4c3b8b [hostname] => 91db88742cda ) ) ``` ## 查看当前 tube 列表 ``` $res = $conn->listTubes(); print_r($res); ``` 此方法将返回一个数组,如下: ``` Array ( [0] => default [1] => myTube ) ``` ## 查看当前 use 的 tube 列表 ``` $res = $conn->listTubeUsed(); print_r($res); ``` 此方法返回一个字符串,为当前 used 的 tube 名。 注意,同一时间,只有一个 tube 会被 used 。 ## 查看当前 watch list ``` $res = $conn->listTubesWatched(); print_r($res); ``` 此方法将返回一个数组,如下: ``` Array ( [0] => default ) ``` ## 冻结 tube ``` $conn->pauseTube('default',90); ``` 此方法没有返回值,会将 tube 冻结 90 秒,冻结期间,消费者无法 reserve job ,如果 tube 冻结后,有客户端发送了 reserve 指令,则会阻塞,直到冻结结束,或 reserve time out 。