多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# Mysql 驱动 ## 数据库表结构 *Driver/queue.sql* ``` CREATE TABLE `queues` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `tube` varchar(30) NOT NULL DEFAULT 'default', `status` enum('ready','reserved') DEFAULT 'ready', `job_data` text NOT NULL, `attempts` int(11) NOT NULL DEFAULT '0', `sort` int(10) NOT NULL DEFAULT '100', `reserved_at` int(11) DEFAULT NULL, `available_at` int(11) DEFAULT NULL, `created_at` int(11) DEFAULT NULL, PRIMARY KEY (`id`), KEY `queues_index` (`tube`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ``` > 字段意义可以参考《实现框架》一节中的 Job.php 注释 > 为tube加上索引,可以提升根据 tube 接收 job 的效率 ## 驱动逻辑 *Driver/MysqlDriver.php* ``` <?php // +---------------------------------------------------------------------- // | MysqlDriver.php // +---------------------------------------------------------------------- // | Description: mysql驱动 // +---------------------------------------------------------------------- // | Time: 2018/12/19 上午11:17 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- namespace Driver; class MysqlDriver implements QueueI { private $conn; // 数据库连接 private $config; // 配置 private $table; // 表 private $select_suffix; // 查询的前缀 private $delete_suffix; // 删除的前缀 private $update_suffix; // 更新的前缀 private $insert_suffix; // 插入的前缀 public function __construct($options = []) { $this->config = $options; $this->conn = new \PDO( $this->config['dsn'], $this->config['username'], $this->config['password'] ); $field_string = Job::$field_string; $this->table = $this->config['table']; $this->select_suffix = "SELECT {$field_string} FROM {$this->table}"; $this->delete_suffix = "DELETE FROM {$this->table}"; $this->update_suffix = "UPDATE {$this->table}"; $this->insert_suffix = "INSERT INTO {$this->table}"; } public function tubes(): array { $sql = "SELECT `tube` FROM {$this->table} GROUP BY `tube`"; $res = $this->conn->query($sql); if (!$res) { throw new \PDOException('查询错误:' . $sql . '-错误提示:' . json_encode($statement->errorInfo())); } return $res->fetchAll(\PDO::FETCH_ASSOC); } public function delete(Job $job): bool { if (!$job->id) { throw new \Exception('job id 不能为空'); } $sql = "{$this->delete_suffix} WHERE id = :id"; $statement = $this->conn->prepare($sql); $res = $statement->execute([':id' => $job->id]); return $res; } public function jobs(string $tube): array { $sql = "{$this->select_suffix} WHERE tube = :tube"; $statement = $this->conn->prepare($sql); $res = $statement->execute([':tube' => $tube]); if (!$res) { throw new \PDOException('查询错误:' . $sql . '-错误提示:' . json_encode($statement->errorInfo())); } return Job::arr2job($statement->fetchAll(\PDO::FETCH_ASSOC)); } public function put(Job $job): Job { // 组装sql $sql = "{$this->insert_suffix}"; $field = ''; $prepare = ''; $value = []; foreach (Job::$field as $v) { if ($job->$v) { $field .= "{$v},"; $prepare .= ":{$v},"; $value[":{$v}"] = $job->$v; } } $field = '(' . trim($field, ',') . ')'; $prepare = '(' . trim($prepare, ',') . ')'; $sql = "{$sql} {$field} VALUES {$prepare}"; // 执行sql $statement = $this->conn->prepare($sql); $res = $statement->execute($value); // 结果 if (!$res) { throw new \PDOException("插入错误:" . $sql . '-错误提示:' . json_encode($statement->errorInfo())); } $job->id = $this->conn->lastInsertId(); return $job; } public function reserve(string $tube): Job { $time = time(); $over_time = $time - $this->config['ttr']; $sql = "{$this->select_suffix} WHERE (status = 'ready' OR (status = 'reserved' AND reserved_at <= {$over_time})) AND available_at <= {$time} AND tube = :tube ORDER BY sort limit 1"; $statement = $this->conn->prepare($sql); $res = $statement->execute([':tube' => $tube]); if (!$res) { throw new \PDOException('查询错误:', $sql); } if ($data = $statement->fetch()) { $job = new Job($data); $attempts = $job->attempts + 1; $time = time(); $sql = "{$this->update_suffix} SET status='reserved',attempts = {$attempts},reserved_at = {$time} WHERE id = {$job->id}"; $rows = $this->conn->exec($sql); if ($rows <= 0) { throw new \PDOException('更新出错:' . $sql . '-错误提示:' . json_encode($statement->errorInfo())); } return $job; } return new Job(); } } ``` > 关于mysql的实现,主要观察其实现接口每个方法的sql与流程,特别是reserve方法,当接收成功之后,还需更新该条消息的状态。 # Redis 驱动 在这里,我们使用 redis 的 list 和 sorted-set 来实现数据的存储,因为它本身具备顺序和去重的特性。 > redis 服务的搭建与操作,此处不做介绍,这里主要模拟实现消息队列 另外,你的 PHP 环境还需开启 Redis 扩展。 接下来,是驱动的实现。 *Driver/RedisDriver.php* ``` <?php // +---------------------------------------------------------------------- // | RedisDriver.php // +---------------------------------------------------------------------- // | Description: redis驱动 // +---------------------------------------------------------------------- // | Time: 2018/12/19 上午11:17 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- namespace Driver; class RedisDriver implements QueueI { private $conn; private $config; private $tubes_key; public function __construct($options = []) { $this->conn = new \Redis(); $this->conn->connect($options['ip'], $options['port']); if (isset($options['password'])) { $this->conn->auth($options['password']); } $this->tubes_key = $options['tubes']; } public function tubes(): array { // 使用 sorted-set 存储当前拥有的队列,比如你 default、test、sms 队列 return $this->conn->zRange($this->tubes_key, 0, -1); } public function jobs(string $tube): array { return Job::arr2job($this->conn->lRange($tube, 0, -1)); } public function put(Job $job): Job { // 维护 tube 集合,可实现不重复 $this->conn->zAdd($this->tubes_key, 1, $job->tube); // 用 list 存储队列内容,返回的队列长度,就是这个 job 在 list 中的下标 if ($id = $this->conn->lPush($job->tube, json_encode($job))) { $job->id = $id; } else { throw new \RedisException('插入失败'); } return $job; } public function delete(Job $job): bool { // 在 redis 的 list 中不可使用 lRem 来删除具体项,具体原因,在后面测试一节描述 return true; } public function reserve(string $tube): Job { // redis 的rPop在接收时就会将 job 从 list 中删除,所以,没有 reserve 状态 if ($data = $this->conn->rPop($tube)) { $job = json_decode($data, true); } return new Job($job ?? []); } } ```