## 简介 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 ## 和定时任务区别 >延时任务有别于定时任务,定时任务往往是固定周期的,有明确的触发时间。 >[warning] 而延时任务一般没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件。 > 任务事件生成时并不想让消费者立即拿到,而是延迟一定时间后才接收到该事件进行消费。 ## 业务场景 - 订单超时,用户下单后进入支付页面(通常会有超时限制)超过15分钟没有进行操作,那么这个订单就需要作废处理。 - 如何定期检查处于退款状态的订单是否已经退款成功? - 注册后到现在已经一周的用户,如何发短信撩动。 - 交易信息双重效验防止因系统级/应用级/用户级等各种异常情况发生后导致的全部/部分丢失的订单信息。 - 实现重复通知,默认失败连续通知10次(通知间隔为`n*2+1/min`),直到消费方正确响应,超出推送上限次数后标记为异常状态,可进行恢复! ## 使用场景 > 延迟队列多用于需要延迟工作的场景。 最常见的是以下两种场景: ### 1、延迟消费 1. 用户生成订单之后,需要过一段时间校验订单的支付状态,如果订单仍未支付则需要及时地关闭订单。 2. 用户注册成功之后,需要过一段时间比如一周后校验用户的使用情况,如果发现用户活跃度较低,则发送邮件或者短信来提醒用户使用。 ### 2、延迟重试 比如消费者从队列里消费消息时失败了,但是想要延迟一段时间后自动重试。 >[warning] 如果不使用延迟队列,那么我们只能通过一个轮询扫描程序去完成。 ### 扫表存在的问题是 - 扫表与数据库长时间连接,在数量量大的情况容易出现连接异常中断,需要更多的异常处理,对程序健壮性要求高 - 在数据量大的情况下延时较高,规定内处理不完,影响业务,虽然可以启动多个进程来处理,这样会带来额外的维护成本,不能从根本上解决。 - 每个业务都要维护一个自己的扫表逻辑。 当业务越来越多时,发现扫表部分的逻辑会重复开发,但是非常类似 ## 缓存队列设计 ![](https://img.kancloud.cn/9e/bd/9ebdf012c01e3f385b8cac8c9a218cfa_1392x742.png) ## 场景设计 实际的生产场景是笔者负责的某个系统需要对接一个外部的资金方,每一笔资金下单后需要延时30分钟推送对应的附件。 这里简化为一个订单信息数据延迟处理的场景,就是每一笔下单记录一条订单消息(暂时叫做`OrderMessage`),订单消息需要延迟5到15秒后进行异步处理。 ![](https://img.kancloud.cn/08/68/08683eb976b047dac0d1a0b72f7ad4d8_1021x173.png) ## 延时队列的实现 选用了基于`Redis`的有序集合`Sorted Set`和`Crontab`短轮询进行实现。 ### 具体方案是: 1. 订单创建的时候,订单ID和当前时间戳分别作为`Sorted Set`的`member`和`score`添加到订单队列`Sorted Set`中。 2. 订单创建的时候,订单ID和推送内容`JSON`字符串分别作为`field`和`value`添加到订单队列内容`Hash`中。 3. 第1步和第2步操作的时候用`Lua`脚本保证原子性。 4. 使用一个异步线程通过`Sorted Set`的命令`ZREVRANGEBYSCORE`弹出指定数量的`订单ID`对应的订单队列内容`Hash`中的订单推送内容数据进行处理。 ### 对于第4点处理有两种方案: > 处理方案一 弹出订单内容数据的同时进行数据删除,也就是`ZREVRANGEBYSCORE`、`ZREM`和`HDEL`命令要在同一个`Lua`脚本中执行,这样的话`Lua`脚本的编写难度大,并且由于弹出数据已经在`Redis`中删除,如果数据处理失败则可能需要从数据库重新查询补偿。 > 处理方案二 弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列`Sorted Set`和订单队列内容`Hash`中对应的数据,这样的话需要控制并发,有重复执行的可能性。 >[warning] 选用了方案一,也就是从`Sorted Set`弹出订单ID并且从Hash中获取完推送数据之后马上删除这两个集合中对应的数据。 方案的流程图大概是这样: ![](https://img.kancloud.cn/c9/0a/c90afd71fb917ba12b8878138cd578d4_1094x565.png) ## 相关Redis命令 ### Sorted Set相关命令 >[success] `ZADD`命令 - 将一个或多个成员元素及其分数值加入到有序集当中。 ``` ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN ``` >[success] `ZREVRANGEBYSCORE`命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。 ``` ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count] ``` - max:分数区间 - 最大分数。 - min:分数区间 - 最小分数。 - WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。 - LIMIT:可选参数,offset和count原理和`MySQL`的`LIMIT offset,size`一致,如果不指定此参数则返回整个集合的数据。 >[success] `ZREM`命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。 ``` ZREM key member [member ...] ``` ### Hash相关命令 >[success] `HMSET`命令 - 同时将多个field-value(字段-值)对设置到哈希表中。 ``` HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN ``` >[success] `HDEL`命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。 ``` HDEL KEY_NAME FIELD1.. FIELDN ``` ### Lua 语法 * 加载`Lua`脚本并且返回脚本的`SHA-1`字符串:`SCRIPT LOAD script`。 * 执行已经加载的`Lua`脚本:`EVALSHA sha1 numkeys key [key ...] arg [arg ...]`。 * `unpack`函数可以把`table`类型的参数转化为可变参数,不过需要注意的是`unpack`函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见`Stackoverflow`的提问[table.unpack() only returns the first element](https://stackoverflow.com/questions/32439689/table-unpack-only-returns-the-first-element)。 >[warning] 如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。 ## Lua 脚本 ### 入队` enqueue.lua` ```lua local zset_key = KEYS[1] local hash_key = KEYS[2] local zset_value = ARGV[1] local zset_score = ARGV[2] local hash_field = ARGV[3] local hash_value = ARGV[4] redis.call('ZADD', zset_key, zset_score, zset_value) redis.call('HSET', hash_key, hash_field, hash_value) return nil ``` > 将任务的执行时间作为score,要执行的任务数据作为value,存放在zset中 ### 出队 `dequeue.lua` ```lua local zset_key = KEYS[1] local hash_key = KEYS[2] local min_score = ARGV[1] local max_score = ARGV[2] local offset = ARGV[3] local limit = ARGV[4] -- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代 local status, type = next(redis.call('TYPE', zset_key)) if status ~= nil and status == 'ok' then if type == 'zset' then local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit) if list ~= nil and #list > 0 then -- unpack函数能把table转化为可变参数 redis.call('ZREM', zset_key, unpack(list)) local result = redis.call('HMGET', hash_key, unpack(list)) redis.call('HDEL', hash_key, unpack(list)) return result end end end return nil ``` > 如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询。 >[danger] 注意:这里其实有一个性能隐患,命令`ZREVRANGEBYSCORE`的时间复杂度可以视为为O(N),N是集合的元素个数,由于这里把所有的订单信息都放进了同一个Sorted Set(ORDER_QUEUE)中,所以在一直有新增数据的时候,`dequeue`脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案 这里的出队使用`Crontab` 作为轮训去查询消费 ## 业务核心代码 ### 延迟队列类 RedisDelayQueue.php ```php <?php /** * @desc Redis 延迟任务队列 * @author Tinywan(ShaoBo Wan) * @date 2021/03/02 11:36 */ declare(strict_types=1); namespace redis; class RedisDelayQueue { // 生产者 脚本sha值 const DELAY_QUEUE_PRODUCER_SCRIPT_SHA = 'DELAY:QUEUE:PRODUCER:SCRIPT:SHA'; // 消费者 脚本sha值 const DELAY_QUEUE_CONSUMER_SCRIPT_SHA = 'DELAY:QUEUE:CONSUMER:SCRIPT:SHA'; // 订单关闭 const DELAY_QUEUE_ORDER_CLOSE = 'DELAY:QUEUE:ORDER:CLOSE'; // 订单关闭详情哈希 const DELAY_QUEUE_ORDER_CLOSE_HASH = 'DELAY:QUEUE:ORDER:CLOSE:HASH'; /** * Redis 静态实例 * @return \Redis */ private static function _redis() { $redis = \redis\BaseRedis::server(); $redis->select(3); return $redis; } /** * @desc: 延迟队列 生产者 * @param string $keys1 * @param string $keys2 * @param string $member * @param int $score * @param array $message * @return mixed */ public static function producer(string $keys1, string $keys2, string $member, int $score, array $message) { $redis = self::_redis(); $scriptSha = $redis->get(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA); if (!$scriptSha) { $script = <<<luascript redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2]) redis.call('HSET', KEYS[2], ARGV[2], ARGV[3]) return 1 luascript; $scriptSha = $redis->script('load', $script); $redis->set(self::DELAY_QUEUE_PRODUCER_SCRIPT_SHA, $scriptSha); } $hashValue = json_encode($message, JSON_UNESCAPED_UNICODE); return $redis->evalSha($scriptSha, [$keys1, $keys2, $score, $member, $hashValue], 2); } /** * @desc: 延迟队列 消费者 * @param string $keys1 * @param string $keys2 * @param int $maxScore * @return mixed */ public static function consumer(string $keys1, string $keys2, int $maxScore) { $redis = self::_redis(); $scriptSha = $redis->get(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA); if (!$scriptSha) { $script = <<<luascript local status, type = next(redis.call('TYPE', KEYS[1])) if status ~= nil and status == 'ok' then if type == 'zset' then local list = redis.call('ZREVRANGEBYSCORE', KEYS[1], ARGV[1], ARGV[2], 'LIMIT', ARGV[3], ARGV[4]) if list ~= nil and #list > 0 then redis.call('ZREM', KEYS[1], unpack(list)) local result = redis.call('HMGET', KEYS[2], unpack(list)) redis.call('HDEL', KEYS[2], unpack(list)) return result end end end luascript; $scriptSha = $redis->script('load', $script); $redis->set(self::DELAY_QUEUE_CONSUMER_SCRIPT_SHA, $scriptSha); } return $redis->evalSha($scriptSha, [$keys1, $keys2, $maxScore, 0, 0, 10], 2); } } ``` > 用redis来实现可以依赖于redis自身的持久化来实现持久化,redis的集群来支持高并发和高可用。因此开发成本很小,可以做到很实时。 ## 脚本命令行 #### 生产者消息 ```php private function delayQueueOrderClose() { $orderId = time(); $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE; $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH; $score = time() + 60; // 延迟60秒执行 $message = [ 'event' => RedisDelayQueue::EVENT_ORDER_CLOSE, 'order_id' => $orderId, 'create_time' => time() ]; $res = RedisDelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message); var_dump($res); } ``` > 如果是ThinkPHP6 框架,执行该命令则可以生产消息,`php think crontab delay-queue-order-producer` 循环 ```php private function delayOrderProducer() { $keys1 = DelayQueue::KEY_ORDER_CLOSE; $keys2 = DelayQueue::KEY_ORDER_CLOSE_HASH; for ($i = 1; $i <= 10; $i++) { $orderId = 'S' . $i; $score = time(); // 延迟60秒执行 $message = [ 'event' => DelayQueue::EVENT_ORDER_CLOSE, 'order_id' => $orderId, 'create_time' => time() ]; $res = DelayQueue::producer($keys1, $keys2, (string) $orderId, $score, $message); var_dump($res); } } ``` #### 消费者消息 >1、通过Crontab 轮询执行 ```php private function delayQueueOrderConsumer() { $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE; $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH; $maxScore = time(); $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore); if (false === $queueList) { echo ' [x] Message List is Empty, Try Again ', "\n"; return; } var_dump($queueList); } ``` >[warning] 说明:如果最小的分数小于等于当前时间戳,就将该任务取出来执行,否则休眠一段时间后再查询 > 2、阻塞执行 ```php private function delayQueueOrderConsumerWhile() { $keys1 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE; $keys2 = RedisDelayQueue::DELAY_QUEUE_ORDER_CLOSE_HASH; while (true) { $maxScore = time(); $queueList = RedisDelayQueue::consumer($keys1, $keys2, $maxScore); if (false === $queueList) { echo ' [x] Message List is Empty, Try Again ', "\n"; sleep(1); continue; } // 处理业务 foreach ($queueList as $queue) { $messageArray = json_decode($queue, true); } } } ``` ## 数据删除为处理问题 >[danger] 方案一:弹出订单内容数据的同时进行数据删除,也就是ZREVRANGEBYSCORE、ZREM和HDEL命令要在同一个Lua脚本中执行,这样的话Lua脚本的编写难度大,并且由于弹出数据已经在Redis中删除,如果数据处理失败则可能需要从数据库重新查询补偿。 针对以上的解决方案就是:**消息进入到延迟队列后,保证至少被消费一次。** - 消费延迟队列消息后(zset结构中扫描到期的消息),不及时消费 - 把读取的消息放入一个 redis stream 队列,同时加入消费组 - 通过消费组消费 redis stream 消费,处理业务逻辑 - Redis Stream 消费组,读取消息处理并且 `ACK(将消息标记为"已处理")` - 如果消息读取但是没处理,则进入XPENDING 列表,进行二次消费并且 `ACK(将消息标记为"已处理")` ## Redis Stream