# Mysql 驱动
## 数据库表结构
*Driver/queue.sql*
```
CREATE TABLE `queues` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`tube` varchar(30) NOT NULL DEFAULT 'default',
`status` enum('ready','reserved') DEFAULT 'ready',
`job_data` text NOT NULL,
`attempts` int(11) NOT NULL DEFAULT '0',
`sort` int(10) NOT NULL DEFAULT '100',
`reserved_at` int(11) DEFAULT NULL,
`available_at` int(11) DEFAULT NULL,
`created_at` int(11) DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `queues_index` (`tube`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
```
> 字段意义可以参考《实现框架》一节中的 Job.php 注释
> 为tube加上索引,可以提升根据 tube 接收 job 的效率
## 驱动逻辑
*Driver/MysqlDriver.php*
```
<?php
// +----------------------------------------------------------------------
// | MysqlDriver.php
// +----------------------------------------------------------------------
// | Description: mysql驱动
// +----------------------------------------------------------------------
// | Time: 2018/12/19 上午11:17
// +----------------------------------------------------------------------
// | Author: Object,半醒的狐狸<2252390865@qq.com>
// +----------------------------------------------------------------------
namespace Driver;
class MysqlDriver implements QueueI
{
private $conn; // 数据库连接
private $config; // 配置
private $table; // 表
private $select_suffix; // 查询的前缀
private $delete_suffix; // 删除的前缀
private $update_suffix; // 更新的前缀
private $insert_suffix; // 插入的前缀
public function __construct($options = [])
{
$this->config = $options;
$this->conn = new \PDO(
$this->config['dsn'],
$this->config['username'],
$this->config['password']
);
$field_string = Job::$field_string;
$this->table = $this->config['table'];
$this->select_suffix = "SELECT {$field_string} FROM {$this->table}";
$this->delete_suffix = "DELETE FROM {$this->table}";
$this->update_suffix = "UPDATE {$this->table}";
$this->insert_suffix = "INSERT INTO {$this->table}";
}
public function tubes(): array
{
$sql = "SELECT `tube` FROM {$this->table} GROUP BY `tube`";
$res = $this->conn->query($sql);
if (!$res) {
throw new \PDOException('查询错误:' . $sql . '-错误提示:' . json_encode($statement->errorInfo()));
}
return $res->fetchAll(\PDO::FETCH_ASSOC);
}
public function delete(Job $job): bool
{
if (!$job->id) {
throw new \Exception('job id 不能为空');
}
$sql = "{$this->delete_suffix} WHERE id = :id";
$statement = $this->conn->prepare($sql);
$res = $statement->execute([':id' => $job->id]);
return $res;
}
public function jobs(string $tube): array
{
$sql = "{$this->select_suffix} WHERE tube = :tube";
$statement = $this->conn->prepare($sql);
$res = $statement->execute([':tube' => $tube]);
if (!$res) {
throw new \PDOException('查询错误:' . $sql . '-错误提示:' . json_encode($statement->errorInfo()));
}
return Job::arr2job($statement->fetchAll(\PDO::FETCH_ASSOC));
}
public function put(Job $job): Job
{
// 组装sql
$sql = "{$this->insert_suffix}";
$field = '';
$prepare = '';
$value = [];
foreach (Job::$field as $v) {
if ($job->$v) {
$field .= "{$v},";
$prepare .= ":{$v},";
$value[":{$v}"] = $job->$v;
}
}
$field = '(' . trim($field, ',') . ')';
$prepare = '(' . trim($prepare, ',') . ')';
$sql = "{$sql} {$field} VALUES {$prepare}";
// 执行sql
$statement = $this->conn->prepare($sql);
$res = $statement->execute($value);
// 结果
if (!$res) {
throw new \PDOException("插入错误:" . $sql . '-错误提示:' . json_encode($statement->errorInfo()));
}
$job->id = $this->conn->lastInsertId();
return $job;
}
public function reserve(string $tube): Job
{
$time = time();
$over_time = $time - $this->config['ttr'];
$sql = "{$this->select_suffix} WHERE (status = 'ready' OR (status = 'reserved' AND reserved_at <= {$over_time})) AND available_at <= {$time} AND tube = :tube ORDER BY sort limit 1";
$statement = $this->conn->prepare($sql);
$res = $statement->execute([':tube' => $tube]);
if (!$res) {
throw new \PDOException('查询错误:', $sql);
}
if ($data = $statement->fetch()) {
$job = new Job($data);
$attempts = $job->attempts + 1;
$time = time();
$sql = "{$this->update_suffix} SET status='reserved',attempts = {$attempts},reserved_at = {$time} WHERE id = {$job->id}";
$rows = $this->conn->exec($sql);
if ($rows <= 0) {
throw new \PDOException('更新出错:' . $sql . '-错误提示:' . json_encode($statement->errorInfo()));
}
return $job;
}
return new Job();
}
}
```
> 关于mysql的实现,主要观察其实现接口每个方法的sql与流程,特别是reserve方法,当接收成功之后,还需更新该条消息的状态。
# Redis 驱动
在这里,我们使用 redis 的 list 和 sorted-set 来实现数据的存储,因为它本身具备顺序和去重的特性。
> redis 服务的搭建与操作,此处不做介绍,这里主要模拟实现消息队列
另外,你的 PHP 环境还需开启 Redis 扩展。
接下来,是驱动的实现。
*Driver/RedisDriver.php*
```
<?php
// +----------------------------------------------------------------------
// | RedisDriver.php
// +----------------------------------------------------------------------
// | Description: redis驱动
// +----------------------------------------------------------------------
// | Time: 2018/12/19 上午11:17
// +----------------------------------------------------------------------
// | Author: Object,半醒的狐狸<2252390865@qq.com>
// +----------------------------------------------------------------------
namespace Driver;
class RedisDriver implements QueueI
{
private $conn;
private $config;
private $tubes_key;
public function __construct($options = [])
{
$this->conn = new \Redis();
$this->conn->connect($options['ip'], $options['port']);
if (isset($options['password'])) {
$this->conn->auth($options['password']);
}
$this->tubes_key = $options['tubes'];
}
public function tubes(): array
{
// 使用 sorted-set 存储当前拥有的队列,比如你 default、test、sms 队列
return $this->conn->zRange($this->tubes_key, 0, -1);
}
public function jobs(string $tube): array
{
return Job::arr2job($this->conn->lRange($tube, 0, -1));
}
public function put(Job $job): Job
{
// 维护 tube 集合,可实现不重复
$this->conn->zAdd($this->tubes_key, 1, $job->tube);
// 用 list 存储队列内容,返回的队列长度,就是这个 job 在 list 中的下标
if ($id = $this->conn->lPush($job->tube, json_encode($job))) {
$job->id = $id;
} else {
throw new \RedisException('插入失败');
}
return $job;
}
public function delete(Job $job): bool
{
// 在 redis 的 list 中不可使用 lRem 来删除具体项,具体原因,在后面测试一节描述
return true;
}
public function reserve(string $tube): Job
{
// redis 的rPop在接收时就会将 job 从 list 中删除,所以,没有 reserve 状态
if ($data = $this->conn->rPop($tube)) {
$job = json_decode($data, true);
}
return new Job($job ?? []);
}
}
```