企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 入队 ~~~ <?php $redis = new Redis(); $redis->connect("127.0.0.1", "6379"); //php客户端设置的ip及端口 $redis->auth('root'); //密码 $redis->zAdd('goods:delay:task', time() + 50, json_encode(['id' => 1, 'cid' => 1, 'name' => 'php'])); $redis->zAdd('goods:delay:task', time() + 60, json_encode(['id' => 2, 'cid' => 2, 'name' => 'js'])); $redis->zAdd('goods:delay:task', time() + 70, json_encode(['id' => 3, 'cid' => 3, 'name' => 'py'])); /** * 尝试3秒内获取锁 * @param string $lockName * @param int $timeout * @return bool|string */ function acquireLock($lockName, $timeout = 3) { global $redis; //唯一id $identifier = uniqid(); //当前时间+3秒 $end = time() + $timeout; //当这个时间大于等于当前时间就开始循环 while ($end >= time()) { //设置锁,key是锁的名字,值是唯一id,只有键key不存在的时候才会设置key的值 //设置成功就返回唯一id if ($redis->set($lockName, $identifier, ['nx'])) { return $identifier; } //暂停1000微秒 usleep(1000); } return false; } /** * 释放锁 * @param $lockName * @param $identifier * @return bool */ function releaseLock($lockName, $identifier) { global $redis; while (true) { //监控这个key的变化 $redis->watch($lockName); //如果redis中获取的这个key等于这个唯一ID if ($redis->get($lockName) == $identifier) { //开启事务 $redis->multi(Redis::MULTI); //删除这个key $redis->del($lockName); //执行事务 $res = $redis->exec(); //如果事务返回值是存在,并且等于1,表示事务成功 if (isset($res[0]) && $res[0] == 1) { return true; } } else { //如果获取的key不等于这个唯一索引 //取消监控 $redis->unwatch(); return false; } } } while (true) { // 因为是有序集合,只要判断第一条记录的延时时间,例如第一条未到执行时间 // 相对说明集合的其他任务未到执行时间 //返回有序集 key 中,指定区间内的成员。其中成员的位置按 score 值递增(从小到大)来排序 $rs = $redis->zRange('goods:delay:task', 0, 0, true);//["{"id":1,"cid":1,"name":"php"}" => 1512713765 ] // 集合没有任务,睡眠时间设置为5秒 if (empty($rs)) { echo 'no tasks , sleep 5 seconds' . PHP_EOL; sleep(5); continue; } //获得key $taskJson = key($rs);//{"id":1,"cid":1,"name":"php"} $delay = $rs[$taskJson];//获得时间 //把存储的json解析下来 $task = json_decode($taskJson, true); $now = time(); // 到时间执行延时任务 if ($delay <= $now) { // 对当前任务加锁,避免移动移动延时任务到任务队列时被其他客户端修改 //加锁成功会返回唯一id,不成功会返回false if (!($identifier = acquireLock($task['id']))) { continue; } // 移动延时任务到任务队列 //删,移除有序集key中的一个或多个成员,不存在的成员将被忽略 $redis->zRem('goods:delay:task', $taskJson); //增,只能将一个值value插入到列表key的表尾 $redis->rPush('goods:task', $taskJson); echo $task['id'] . ' run ' . PHP_EOL; // 释放锁,执行完了开始释放锁 releaseLock($task['id'], $identifier); } else { // 延时任务未到执行时间 $sleep = $delay - $now; // 最大值设置为2秒,保证如果有新的任务(延时时间1秒)进入集合时能够及时的被处理 // $sleep = $sleep > 2 ? 2 :$sleep; echo 'wait ' . $sleep . ' seconds ' . PHP_EOL; sleep($sleep); } } ~~~ # 出队 ~~~ <?php $redis = new Redis(); $redis->connect("127.0.0.1", "6379"); //php客户端设置的ip及端口 $redis->auth('root'); //密码 // 出队 while (true) { // 阻塞设置超时时间为3秒 //删除和获取列表中的第一个元素,或阻塞直到有可用 $task = $redis->blPop(['goods:task'], 3); if ($task) { $redis->rPush('goods:success:task', $task[1]); $task = json_decode($task[1], true); echo $task['id'] . ':' . $task['cid'] . ':' . 'handle success'; echo PHP_EOL; } else { echo 'nothing' . PHP_EOL; sleep(5); } } ~~~