企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 接口 *Driver/QueueI.php* ``` <?php // +---------------------------------------------------------------------- // | QueueI.php // +---------------------------------------------------------------------- // | Description: queue interface // +---------------------------------------------------------------------- // | Time: 2018/12/19 上午11:17 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- namespace Driver; interface QueueI { /** * @return array * 查询tubes列表 * 一个系统需要多个队列,它们可能分别用于存储短信、邮件等,它们互相隔离。 */ public function tubes(): array; /** * @param Job $job * @return Job * 向队列中存储一个消息(任务) */ public function put(Job $job): Job; /** * @param string $tube 需要指定从哪个队列接收任务 * @return Job * 从队列接收一个消息(任务) */ public function reserve(string $tube): Job; /** * @param Job $job * @return bool * 删除某个消息(任务) */ public function delete(Job $job): bool; /** * @param string $tube * @return array * 获取某个队列中的消息列表 */ public function jobs(string $tube): array; } ``` # 工具类 *Queue.php* ``` <?php // +---------------------------------------------------------------------- // | Queue.php // +---------------------------------------------------------------------- // | Description: 队列工具 // +---------------------------------------------------------------------- // | Time: 2018/12/19 上午11:15 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- class Queue { /** * @param string $driver * @param array $options * 初始化 */ public static function init($driver = 'Mysql',$options = []) { $class = "Driver\\$driver"; self::$driver = new $class($options); } public static function tubes(): array { return self::$driver->tubes(); } public static function put(Job $job): Job { return self::$driver->put($job); } public static function reserve(string $tube = 'default'): Job { return self::$driver->reserve($tube); } public static function jobs(string $tube = 'default'): array { return self::$driver->jobs($tube); } public static function delete(Job $job): bool { return self::$driver->delete($job); } } ``` # 数据对象 *Driver/Job.php* ``` <?php // +---------------------------------------------------------------------- // | Job.php // +---------------------------------------------------------------------- // | Description: 任务对象 // +---------------------------------------------------------------------- // | Time: 2018/12/19 下午3:19 // +---------------------------------------------------------------------- // | Author: Object,半醒的狐狸<2252390865@qq.com> // +---------------------------------------------------------------------- namespace Driver; class Job { public $id = null; public $tube; public $status; public $job_data; public $attempts; public $sort; public $reserved_at; public $available_at; public $created_at; public static $field = [ 'id', 'tube', 'status', 'job_data', 'attempts', 'sort', 'reserved_at', 'available_at', 'created_at', ]; public static $field_string = 'id,tube,status,job_data,attempts,sort,' . 'reserved_at,available_at,created_at'; public static function arr2job($jobs) { $real_jobs = []; foreach ($jobs as $v) { if (!is_array($v)) { $v = json_decode($v, true); } $real_jobs[] = new Job($v); } return $real_jobs; } public function __construct(array $data = []) { foreach ($data as $k => $v) { $this->$k = $v; } $this->created_at = time(); $this->available_at = $this->created_at; } public function isEmpty() { return $this->job_data ? false : true; } } ```