ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
[TOC] # 简单实现 ## 命令 使用Redis的 SETNX 命令可以实现分布式锁,下文介绍其实现方法。 SETNX命令简介 命令格式 SETNX key value 将 key 的值设为 value,当且仅当 key 不存在。 若给定的 key 已经存在,则 SETNX 不做任何动作。 SETNX 是SET if Not eXists的简写。 返回值 返回整数,具体为 - 1,当 key 的值被设置 - 0,当 key 的值没被设置 ~~~ redis> SETNX mykey “hello” (integer) 1 redis> SETNX mykey “hello” (integer) 0 redis> GET mykey “hello” redis> ~~~ ## 使用SETNX实现分布式锁 多个进程执行以下Redis命令: ~~~ SETNX lock.foo <current Unix time + lock timeout + 1> ~~~ 如果 SETNX 返回1,说明该进程获得锁,SETNX将键 lock.foo 的值设置为锁的超时时间(当前时间 + 锁的有效时间)。 如果 SETNX 返回0,说明其他进程已经获得了锁,进程不能进入临界区。进程可以在一个循环中不断地尝试 SETNX 操作,以获得锁。 然而,锁超时时,我们不能简单地使用 DEL 命令删除键 lock.foo 以释放锁。考虑以下情况,进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。进程P2,P3正在不断地检测锁是否已释放或者已超时,执行流程如下: * P2和P3进程读取键 lock.foo 的值,检测锁是否已超时(通过比较当前时间和键 lock.foo 的值来判断是否超时) * P2和P3进程发现锁 lock.foo 已超时 * P2执行 DEL lock.foo命令 * P2执行 SETNX lock.foo命令,并返回1,即P2获得锁 * P3执行 DEL lock.foo命令将P2刚刚设置的键 lock.foo 删除(这步是由于P3刚才已检测到锁已超时) * P3执行 SETNX lock.foo命令,并返回1,即P3获得锁 * P2和P3同时获得了锁 从上面的情况可以得知,在检测到锁超时后,进程不能直接简单地执行 DEL 删除键的操作以获得锁。 为了解决上述算法可能出现的多个进程同时获得锁的问题,我们再来看以下的算法。 我们同样假设进程P1已经首先获得了锁 lock.foo,然后进程P1挂掉了。接下来的情况: * 进程P4执行 SETNX lock.foo 以尝试获取锁 * 由于进程P1已获得了锁,所以P4执行 SETNX lock.foo 返回0,即获取锁失败 * P4执行 GET lock.foo 来检测锁是否已超时,如果没超时,则等待一段时间,再次检测 * 如果P4检测到锁已超时,即当前的时间大于键 lock.foo 的值,P4会执行以下操作 * GETSET lock.foo <current Unix timestamp + lock timeout + 1> * 由于 GETSET 操作在设置键的值的同时,还会返回键的旧值,通过比较键 lock.foo 的旧值是否小于当前时间,可以判断进程是否已获得锁 * 假如另一个进程P5也检测到锁已超时,并在P4之前执行了 GETSET 操作,那么P4的 GETSET 操作返回的是一个大于当前时间的时间戳,这样P4就不会获得锁而继续等待。注意到,即使P4接下来将键 lock.foo 的值设置了比P5设置的更大的值也没影响。 另外,值得注意的是,在进程释放锁,即执行 DEL lock.foo 操作前,需要先判断锁是否已超时。如果锁已超时,那么锁可能已由其他进程获得,这时直接执行 DEL lock.foo 操作会导致把其他进程已获得的锁释放掉。 ## 程序代码 用以下python代码来实现上述的使用 SETNX 命令作分布式锁的算法。 ~~~ LOCK_TIMEOUT = 3 lock = 0 lock_timeout = 0 lock_key = 'lock.foo' # 获取锁 while lock != 1: now = int(time.time()) lock_timeout = now + LOCK_TIMEOUT + 1 lock = redis_client.setnx(lock_key, lock_timeout) if lock == 1 or (now > int(redis_client.get(lock_key))) and now > int(redis_client.getset(lock_key, lock_timeout)): break else: time.sleep(0.001) # 已获得锁 do_job() # 释放锁 now = int(time.time()) if now < lock_timeout: redis_client.delete(lock_key) ~~~ # 配合lua实现 ## 简介 在分布式系统当中, Redis锁是一个很常用的工具. 举个很常见的例子就是: 某个接口需要去查询数据库的数据, 但是请求量却又很大, 所以我们一般会加一层缓存, 并且设定过期时间. 但是这里存在一个问题就是当并发量很大的情况下, 在缓存过期的瞬间, 会有大量的请求穿透去数据库请求数据, 造成缓存雪崩效应. 这时候如果有锁的机制, 那么就可以控制单个请求去更新缓存. 其实对于Redis锁的看法, 网上已经有很多了, 只是大部分都是基于Java来实现的, 这里给出一个PHP实现的版本. 这里考虑的只是单机部署Redis的情况, 相对会简单好理解, 而且也更加的实用. 如果有分布式Redis部署的情况, 可以参考下Redlock算法的实现. ## 基本要求 实现一个分布式锁定, 我们至少要考虑它能满足一下的这些需求: * 互斥, 就是要在任何的时刻, 同一个锁只能够有一个客户端用户锁定. * 不会死锁, 就算持有锁的客户端在持有期间崩溃了, 但是也不会影响后续的客户端加锁 * 谁加锁谁解锁, 很好理解, 加锁和解锁的必须是同一个客户端 ## 加锁 我们这里使用的是Predis这个这个PHP的客户端, 其他客户端也是同理. 先来看看代码: ~~~ class RedisTool { const LOCK_SUCCESS = 'OK'; const IF_NOT_EXIST = 'NX'; const MILLISECONDS_EXPIRE_TIME = 'PX'; const RELEASE_SUCCESS = 1; /** * 尝试获取锁 * @param \Predis\Client $redis redis客户端 * @param String $key 锁 * @param String $requestId 请求id * @param int $expireTime 过期时间 * @return bool 是否获取成功 */ public static function tryGetLock(\Predis\Client $redis, String $key, String $requestId, int $expireTime) { $result = $redis->set($key, $requestId, self::MILLISECONDS_EXPIRE_TIME, $expireTime, self::IF_NOT_EXIST); return self::LOCK_SUCCESS === (string)$result; } } ~~~ 定义一些Redis的操作符作为常量, 加锁的代码其实很简单, 一行代码即可. 简单解释下这个set方法的五个参数: * 第一个key是锁的名字, 这个由具体业务逻辑控制, 保证唯一即可 * 第二个是请求ID, 可能不好理解. 这样做的目的主要是为了保证加解锁的唯一性. 这样我们就可以知道该锁是哪个客户端加的. * 第三个参数是一个标识符, 标识时间戳以毫秒为最小单位 * 具体的过期时间 * 这个参数是NX, 表示当key不存在时我们才进行set操作 PS. 请求的唯一性ID生成方式很多, 可以参考下这个[chronos](https://github.com/XiaoMi/chronos), 该库是Java版本的 简单解释下上面的那段代码, 设置NX保证了只能有一个客户端获取到锁, 满足互斥性; 加入了过期时间, 保证在客户端崩溃后不会造成死锁; 请求ID的作用是用来标识客户端, 这样客户端在解锁的时候可以进行校验是否同一个客户端. # 解锁 当锁拥有的客户端完成了对共享资源的操作后, 释放锁需要用到Lua脚本, 也很简单: ~~~ if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end ~~~ PHP代码实现: ~~~ class RedisTool { const RELEASE_SUCCESS = 1; public static function releaseLock(\Predis\Client $redis, String $key, String $requestId) { $lua = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; $result = $redis->eval($lua, 1, $key, $requestId); return self::RELEASE_SUCCESS === $result; } } ~~~ 没想到一个简单的解锁操作也要用到Lua脚本, 待会会说说常见的几种错误解锁的方式. 其实为什么要用Lua脚本来实现, 主要是为了保证原子性. Redis的eval可以保证原子性, 主要还是源于Redis的特性, 可以看看官网的介绍 ## 常见错误# 1. 错误加锁 ~~~ public static function wrong1(\Predis\Client $redis, String $key, String $requestId, int $expireTime) { $result = $redis->setnx($key, $requestId); if ($result == 1) { // 这里程序挂了或者expire操作失败,则无法设置过期时间,将发生死锁 $redis->expire($key, $expireTime); } } ~~~ 这是比较常见的一种错误实现, 先通过setnx加锁, 然后在通过expire设置过期时间. 这样乍一看和上面的不都一样吗? 其实不然, 这是两条Redis命令, 不具有原子性, 如果在setnx之后程序挂了, 会使得锁没有设置过期时间, 这样就会发生死锁定. 2. 错误加锁 ~~~ public static function wrong2(\Predis\Client $redis, String $key, int $expireTime) { $expires = floor(microtime(true) * 1000) + $expireTime; // 如果当前锁不存在,返回加锁成功 if ($redis->setnx($key, $expires) == 1) { return true; } // 如果锁存在,获取锁的过期时间 $currentValue = floor($redis->get($key)); if ($currentValue != null && $currentValue < floor(microtime(true) * 1000)) { // 锁已过期,获取上一个锁的过期时间,并设置现在锁的过期时间 $oldValue = floor($redis->getSet($key, $expires)); if ($oldValue != null && $oldValue === $currentValue) { // 考虑并发的情况,只有设置值和当前值相同,它才有权利加锁 return true; } } // 其他情况,一律返回加锁失败 return false; } ~~~ 这个例子实现原理是使用setnx来加锁, 如果锁已经存在的话则获取锁的过期时间并且与当前的时间比较, 过期则设置新的时间, 并且返回加锁成功. 虽然这样也可以加锁, 但是会存在几个问题: * 因为时间是客户端生成的, 这样就必须要保证在分布式环境下客户端的时间必须要同步 * 当锁过期后, 多个客户端同时执行getSet方法, 虽然可以保证互斥性, 只适合这个锁的过期时间在高并发或者多线程的情况下有一定的可能被其他客户端给覆盖 * 锁没有客户端的标识, 这样任何一个客户端都能够解锁 3. 错误解锁 ~~~ public static function wrongRelease1(\Predis\Client $redis, String $key) { $redis->del([$key]); } ~~~ 这是最典型的错误了, 这样的做法没判断锁的拥有者, 会使得任何一个客户端都可以解锁, 甚至会把别人的锁给解除了. 4. 错误解锁 ~~~ public static function wrongRelease2(\Predis\Client $redis, String $key, String $requestId) { // 判断加锁与解锁是不是同一个客户端 if ($requestId === $redis->get($key)) { // 若在此时,这把锁突然不是这个客户端的,则会误解锁 $redis->del([$key]); } ~~~ 上面的解锁也是没有保证原子性, 注释说的很明白了, 有这样的场景来复现: 客户端A加锁成功后一段时间再来解锁, 在执行删除del操作的时候锁过期了, 而且这时候又有其他客户端B来加锁(这时候加锁是肯定成功的, 因为客户端A的锁过期了), 这是客户端A再执行删除del操作, 会把客户端B的锁给清了.