前面一节,介绍了 pheanstalk 的接口文件,目的是为了更接近 pheanstalkd 的使用。
> 这同样是一个从 0 到 1 了解第三方包如何使用的过程,这是一个学习方法,在其它的包时,也有借鉴意义。
在过了一遍 contracts 之后,我们可以发现,pheanstalk 最接近用户层(开发者)的接口,就是 PheanstalkInterface 。
那么,使用手册,我们也通过解读 PheanstalkInterface 的具体实现 Pheanstalk 类来撰写。
# 建立连接
```
include_once "vendor/autoload.php";
$conn = \Pheanstalk\Pheanstalk::create('127.0.0.1',11300,10);
```
上面的 create 方法,第一个参数必填,需要传入目标 beanstalkd 服务器的 ip ,前提是,对于 beanstalkd server 而言,你的 ip (可能是内网,也可能是外网,也可能是本机),必须在它的监听列表中。
第二个参数是端口,可不填,默认为 11300 ,当然你可以修改。
第三个参数是连接超时时间,可不填,默认为 10 ,表示 10 秒之后还未连接成功就超时了。
**要注意的是:此 create 方法只是返回一个配置好的 pheanstalk 实例,但并非是一个连接好的实例。**
如何理解呢?
从实现上, pheanstalk 的流程是等到有指令了,也就是有 dispatch 指令后,才建立一个 socket 连接,而它与 beanstalkd 通信的底层,用的也是 socket 。
所以,如果你只写了 create 方法,并不能看出是否能够连接成功。
你还需要发送一个指令才能知道是否能够连接成功。
一旦发送过指令之后,Pheanstalk->connection->socket 就会指向一个 socket 资源。
# 关闭、重连
虽然,我们能看到 Pheanstalkd 类中,有 reconnect 方法,还有 $this->connection 的属性。
然而它们的访问权限是 private 。
这告诉我们,关闭和重连,不需要我们手动操作。
我们可以看到,这两个方法,只有在 pheanstalk 向 beanstalkd 发送指令时,会用到。
**因为 PHP 使用 socket 连接时,socket 是有生命周期的,超出一定时间 socket 就会失效,此时,Pheanstalk 的 dispatch 方法就会自动重连。**
我们知道,pheanstalk 的底层通信基于 php 的 socket ,那么,必然具备 socket 的特性,再深入的内容,可以参考 php 手册中的 socket 部分。
# 生产者操作
## 选择 tube
```
$conn->useTube('myTube'); // 会返回 this ,因此支持链式操作
```
## put job
```
$job = $conn->put('this is my first job !');
```
这里传入的字符串,就是放到 beanstalkd 中 job 的 body
当然,关于 priority 、 delay 、 ttr 这些参数你可以在 body 后面传入,具体可参考方法接口。
put 会返回一个 Job 实例,如下:
```
Pheanstalk\Job Object
(
[id:Pheanstalk\Job:private] => 1
[data:Pheanstalk\Job:private] => this is my first job !
)
```
## 链式操作
```
$job = $conn->useTube('myTube')->put('this is my 2nd job !');
```
## 默认 Tube
我们知道,默认的 Tube 是 default ,如果我们不使用 useTube 的话,put 的 job 就会在 default tube 中。
# 消费者操作
## 接收 job
### reserve
```
$job = $conn->reserve(); // 默认情况会接收 default tube 的 job
print_r($job); // 如果 reserve 没能得到 job ,就会一直阻塞在上面
```
上面返回的一定是 Job 实例。
### reserveWithTimeOut
```
try {
$job2 = $conn->reserveWithTimeout(10); // 阻塞接收,10秒之后超时,就不再接收了
print_r($job2);
if ($job2 === null) {
throw new Exception('超时了');
}
// 处理 job ...
sleep(60);
} catch (\Pheanstalk\Exception\DeadlineSoonException $e) {
print_r('deadline soon' . $e->getMessage());
} catch (Exception $e) {
print_r($e->getMessage());
}
```
注意:这个方法,超时的话会返回 null ,成功的话,会返回一个 Job 实例。
还有,上面代码片段中有一行 sleep ,这一行的作用,是做一个小实验。
当,job 被成功返回过来,但没有 sleep 这一行,你会发现,你连续运行两次该代码,返回的 job 是同一个。
理论上,我们应该接收到下一个 job ,因为这个 job 被接收了之后,状态就变成 reserved 而不再是 ready 。
那如何来测试 job 是否真的变成 reserved 了呢?
这就是 sleep 的用处,步骤如下:
- 先往你的 beanstalkd 加入两个 job
- 再在两个 cli 窗口运行两次含 sleep 的消费者代码
你会发现这个情况,如下图:
![](https://box.kancloud.cn/bbcd9b836e8289007612aa75e859e21f_628x178.png)
![](https://box.kancloud.cn/fa0b5ee917e0be7fffdeb6222dac554d_1294x342.png)
它们分别接收了两个 job 。
> 这里不要用 web 测试,用 cli 才能更直观地看到效果。
这个现象,也很好理解:
- reserve 成功之后,你的 php 进程还在于 beanstalkd 保持着 socket 连接,此连接不销毁,beanstalkd 都会为你冻结此 job 。
- 一旦 php 进程销毁(执行结束),beanstalkd 没有接收到其他对 job 的操作,自然就回到 ready 队列中了
## 删除 job
```
$conn->delete($job2);
```
这个方法没有返回值,但如果出错的话,会抛出异常。
一般在 job 被消费者处理完毕之后才调用 delete 方法。
## 释放 reserved job
```
$conn->release($job,$priority,$delay);
```
此方法没有返回值,如果出错会抛出异常。
此方法可以将 reserved job 重新放回 ready 队列中,一般在消费者逻辑处理失败后,才使用这个方法。
## 预留
```
$conn->bury($job,$priority);
```
此方法没有返回值,如果出错会抛出异常。
此方法可以将一个 job 操作为 buried 状态,比如当你的消费者接收到这个 job 时,对 job 进行了一系列检查,经检查,发现这个 job 还不能被消费,此时可以将 job 操作为 buried ,直到有客户端对这个 job 发送了 kick 指令,才会被再次 reserve 。
## 延迟 ttr
```
$conn->touch($job);
```
此方法没有返回值,如果出错会抛出异常。
## 添加监听的管道
```
$conn->watch('test')->watch('sms');
```
此方法返回 $conn 本身,可链式操作,每次运行,都会为 watch list 添加一个 tube 。
## 删除监听的管道
```
$conn->ignore('email')->ignore('sms');
```
此方法返回 $conn 本身,可链式操作,每次运行,都会从 watch list 中移除一个 tube 。
> 默认,watch list 中会有一个 default ,所以,当你执行 watch 时,default 仍然在 watch list 中。
## 仅监听一个管道
```
$conn->watchOnly('sms');
```
此方法返回 $conn 本身,可链式操作。
通过此快捷方法,可以一次性移除 watch list 中所有的 tube ,除了此方法传入的 tube 以外。
# 其他命令
## 单纯获取 job
```
$conn->peek(new \Pheanstalk\Job(1,'')); // 根据 id 获取 job ,id 在 beanstalkd 是唯一的,不论处于什么 tube
$conn->useTube('order')->peekReady(); // 从 order tube 中获取排在最前面的 job(这个顺序,同 reserve 的顺序)
$conn->peekDelayed(); // 从 default tube 中获取(最)即将变成 ready 的 delayed job
$conn->peekBuried(); // 返回下一个 buried job
```
> 这里的 peekReady 、 peekDelayed 、 peekBuried 虽然在译文解释中,提到「下一个」,但它并非是一个指针,当你调用一起 peekReady ,下一次再调用 peekReady 就自动将指针移动到「下一个」。**如果你不手动将当前得到的 Job 操作为其他状态,或者延长延迟时间等,你多次调用 peek 命令,他将返回同一个 Job。**
## 批量操作 job 为 ready
```
$conn->useTube('order');
$kicked = $conn->kick(10);
print_r($kicked);
```
上面的代码片段,是针对 order tube 操作 10 条 buried 或 delayed 为 ready 。
具体可以看 beanstalkd 译文中的 kick 命令。
## 操作指定 job 为 ready
```
$conn->useTube('sms')->kickJob($job);
```
此方法没有返回值。
## 获取 job 的统计信息
```
$job = $conn->reserve();
$res = $conn->statsJob($job);
print_r($res);
```
此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。
$res 如下:
```
Pheanstalk\Response\ArrayResponse Object
(
[name:Pheanstalk\Response\ArrayResponse:private] => OK
[storage:ArrayObject:private] => Array
(
[id] => 2
[tube] => default
[state] => reserved
[pri] => 1
[age] => 92865
[delay] => 0
[ttr] => 60
[time-left] => 59
[file] => 0
[reserves] => 8
[timeouts] => 0
[releases] => 0
[buries] => 0
[kicks] => 0
)
)
```
## 获取 tube 统计信息
```
$res = $conn->statsTube('default');
print_r($res);
```
此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。
$res 如下:
```
Pheanstalk\Response\ArrayResponse Object
(
[name:Pheanstalk\Response\ArrayResponse:private] => OK
[storage:ArrayObject:private] => Array
(
[name] => default
[current-jobs-urgent] => 1
[current-jobs-ready] => 2
[current-jobs-reserved] => 0
[current-jobs-delayed] => 0
[current-jobs-buried] => 0
[total-jobs] => 2
[current-using] => 1
[current-watching] => 1
[current-waiting] => 0
[cmd-delete] => 0
[cmd-pause-tube] => 0
[pause] => 0
[pause-time-left] => 0
)
)
```
## 获取 beanstalkd 服务器统计信息
```
$res = $conn->stats();
print_r($res);
```
此方法返回一个 ArrayResponse 实例,具体字段意义,需参考 beanstalkd 译文。
$res 如下:
```
Pheanstalk\Response\ArrayResponse Object
(
[name:Pheanstalk\Response\ArrayResponse:private] => OK
[storage:ArrayObject:private] => Array
(
[current-jobs-urgent] => 1
[current-jobs-ready] => 2
[current-jobs-reserved] => 0
[current-jobs-delayed] => 0
[current-jobs-buried] => 0
[cmd-put] => 2
[cmd-peek] => 10
[cmd-peek-ready] => 11
[cmd-peek-delayed] => 10
[cmd-peek-buried] => 10
[cmd-reserve] => 2
[cmd-reserve-with-timeout] => 16
[cmd-delete] => 0
[cmd-release] => 0
[cmd-use] => 7
[cmd-watch] => 4
[cmd-ignore] => 4
[cmd-bury] => 0
[cmd-kick] => 4
[cmd-touch] => 0
[cmd-stats] => 1
[cmd-stats-job] => 1
[cmd-stats-tube] => 1
[cmd-list-tubes] => 0
[cmd-list-tube-used] => 0
[cmd-list-tubes-watched] => 0
[cmd-pause-tube] => 0
[job-timeouts] => 0
[total-jobs] => 2
[max-job-size] => 65535
[current-tubes] => 1
[current-connections] => 1
[current-producers] => 0
[current-workers] => 0
[current-waiting] => 0
[total-connections] => 37
[pid] => 1
[version] => 1.10
[rusage-utime] => 0.020000
[rusage-stime] => 0.030000
[uptime] => 96714
[binlog-oldest-index] => 0
[binlog-current-index] => 0
[binlog-records-migrated] => 0
[binlog-records-written] => 0
[binlog-max-size] => 10485760
[id] => a1e58a6bbd4c3b8b
[hostname] => 91db88742cda
)
)
```
## 查看当前 tube 列表
```
$res = $conn->listTubes();
print_r($res);
```
此方法将返回一个数组,如下:
```
Array
(
[0] => default
[1] => myTube
)
```
## 查看当前 use 的 tube 列表
```
$res = $conn->listTubeUsed();
print_r($res);
```
此方法返回一个字符串,为当前 used 的 tube 名。
注意,同一时间,只有一个 tube 会被 used 。
## 查看当前 watch list
```
$res = $conn->listTubesWatched();
print_r($res);
```
此方法将返回一个数组,如下:
```
Array
(
[0] => default
)
```
## 冻结 tube
```
$conn->pauseTube('default',90);
```
此方法没有返回值,会将 tube 冻结 90 秒,冻结期间,消费者无法 reserve job ,如果 tube 冻结后,有客户端发送了 reserve 指令,则会阻塞,直到冻结结束,或 reserve time out 。