[TOC]
## RWMUTEX
标准库中的 RWMutex 是一个 reader/writer 互斥锁。RWMutex 在某一时刻只能由任意数量的 reader 持有,或者是只被单个的 writer 持有。
读写锁的设计一般满足如下规则:
* 写写互斥:这个很好理解,与一般的互斥锁语义相同;
* 读写互斥,包含两部分含义,都是为了避免读不一致的情况产生:(类似mysql的幻读)
* 在拥有写锁的时候其他协程不能获取到读锁;
* 在拥有读锁的时候其他协程不能获取到写锁;
* 读读不互斥:不同的协程可以同时获取到读锁,这个是提高读性能的关键所在。
### 提供的方法
RWMutex 对外共提供了五个方法,分别是:
* Lock/Unlock:写操作时调用的方法。如果锁已经被 reader 或者 writer 持有,那么,Lock 方法会一直阻塞,直到能获取到锁;Unlock 则是配对的释放锁的方法。
* RLock/RUnlock:读操作时调用的方法。如果锁已经被 writer 持有的话,RLock 方法会一直阻塞,直到能获取到锁,否则就直接返回;而 RUnlock 是 reader 释放锁的方法。
* RLocker:这个方法的作用是为读操作返回一个 Locker 接口的对象。它的 Lock 方法会调用 RWMutex 的 RLock 方法,它的 Unlock 方法会调用 RWMutex 的 RUnlock 方法。
### RWMutex 实现
通过记录 readerCount 读锁的数量来进行控制,当有一个写锁的时候,会将读 锁数量设置为负数 1<<30。目的是让新进入的读锁等待之前的写锁释放通知读 锁。同样的当有写锁进行抢占时,也会等待之前的读锁都释放完毕,才会开始 21 进行后续的操作。 而等写锁释放完之后,会将值重新加上 1<<30, 并通知刚才 新进入的读锁(rw.readerSem),两者互相限制。
### RWMutex 结构体
~~~
type RWMutex struct {
w Mutex // 复用互斥锁能力
//写锁信号量 当阻塞写操作的读操作goroutine释放读锁时,通过该信号量通知阻塞的写操作的goroutine;
writerSem uint32
// 读锁信号量 当写操作goroutine释放写锁时,通过该信号量通知阻塞的读操作的goroutine
readerSem uint32
// 当前读操作的数量,包含所有已经获取到读锁或者被写操作阻塞的等待获取读锁的读操作数量
readerCount int32
// 获取写锁需要等待读锁释放的数量
readerWait int32
}
~~~
RWMutex 利用 readerCount / readerWait 属性,提供了一个非常巧妙的思路。写操作到来的时候,会把 readerCount 的值复制给 readerWait,用来标记在当前写操作之前的读操作的数量。当读锁释放的时候,会递减 readerWait,当它变成 0 的时候,就代表前面的读操作全部完成了,此时写操作会被唤醒。
### 写锁
~~~
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
// 写锁也就是互斥锁,复用互斥锁的能力来解决与其他写锁的竞争
// 如果写锁已经被获取了,其他goroutine在获取写锁时会进入自旋或者休眠
rw.w.Lock()
// 将readerCount设置为负值,告诉读锁现在有一个正在等待运行的写锁(获取互斥锁成功)
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 获取互斥锁成功并不代表goroutine获取写锁成功,我们默认最大有2^30的读操作数目,减去这个最大数目
// 后仍然不为0则表示前面还有读锁,需要等待读锁释放并更新写操作被阻塞时等待的读操作goroutine个数;
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
~~~
* 获取互斥锁,写锁也就是互斥锁,这里我们复用互斥锁mutex的加锁能力,当互斥锁加锁成功后,其他写锁goroutine再次尝试获取锁时就会进入自旋休眠等待;
* 判断获取写锁是否成功,这里有一个变量rwmutexMaxReaders = 1 << 30表示最大支持2^30个并发读,互斥锁加锁成功后,假设2^30个读操作都已经释放了读锁,通过原子操作将readerCount设置为负数在加上2^30,如果此时r仍然不为0说面还有读操作正在进行,则写锁需要等待,同时通过原子操作更新readerWait字段,也就是更新写操作被阻塞时等待的读操作goroutine个数;readerWait在上文的读锁释放锁时会进行判断,进行递减,当前readerWait递减到0时就会唤醒写锁。
~~~
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
// 将readerCount的恢复为正数,也就是解除对读锁的互斥
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 如果后面还有读操作的goroutine则需要唤醒他们
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 释放互斥锁,写操作的goroutine和读操作的goroutine同时竞争
rw.w.Unlock()
}
~~~
### 读锁
~~~
func (rw *RWMutex) RLock() {
// 原子操作readerCount 只要值不是负数就表示获取读锁成功
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有一个正在等待的写锁,为了避免饥饿后面进来的读锁进行阻塞等待
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}
~~~
精简了竞态检测的方法,读锁方法就只有两行代码了,逻辑如下:
使用原子操作更新readerCount,将readercount值加1,只要原子操作后值不为负数就表示加读锁成功,如果值为负数表示已经有写锁获取互斥锁成功,写锁goroutine正在等待或运行,所以为了避免饥饿后面进来的读锁要进行阻塞等待,调用runtime\_SemacquireMutex阻塞等待。
释放读锁代码主要分为两部分,第一部分:
~~~
func (rw *RWMutex) RUnlock() {
// 将readerCount的值减1,如果值等于等于0直接退出即可;否则进入rUnlockSlow处理
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}
~~~
readerCount的值代表当前正在执行的读操作goroutine数量,执行递减操作后的值大于等于0表示当前没有异常场景或写锁阻塞等待,所以直接退出即可,否则需要处理这两个逻辑:
rUnlockSlow逻辑如下:
~~~
func (rw *RWMutex) rUnlockSlow(r int32) {
// r+1等于0表示没有加读锁就释放读锁,异常场景要抛出异常
// r+1 == -rwmutexMaxReaders 也表示没有加读锁就是释放读锁
// 因为写锁加锁成功后会将readerCout的值减去rwmutexMaxReaders
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 如果有写锁正在等待读锁时会更新readerWait的值,所以一步递减rw.readerWait值
// 如果readerWait在原子操作后的值等于0了说明当前阻塞写锁的读锁都已经释放了,需要唤醒等待的写锁
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}
~~~
代码解读:
* r+1等于0说明当前goroutine没有加读锁就进行释放读锁操作,属于非法操作
* r+1 == -rwmutexMaxReaders 说明写锁加锁成功了会将readerCount的减去rwmutexMaxReaders变成负数,如果此前没有加读锁,那么直接释放读锁就会造成这个等式成立,也属于没有加读锁就进行释放读锁操作,属于非法操作;
* readerWait代表写操作被阻塞时读操作的goroutine数量,如果有写锁正在等待时就会更新readerWait的值,读锁释放锁时需要readerWait进行递减,如果递减后等于0说明当前阻塞写锁的读锁都已经释放了,需要唤醒等待的写锁。(看下文写锁的代码就呼应上了)
### 读写锁插队的策略
假设现在有5个goroutine分别是G1、G2、G3、G4、G5,现在G1、G2获取读锁成功,还没释放读锁,G3要执行写操作,获取写锁失败就会阻塞等待,当前阻塞写锁的读锁goroutine数量为2
![](https://img.kancloud.cn/2c/07/2c07e600ae2d5c72d1ccef46c804d3bd_520x267.png)
后续G4进来想要获取读锁,这时她就会判断如果当前有写锁的goroutine正在阻塞等待,为了避免写锁饥饿,那这个G4也会进入阻塞等待,后续G5进来想要获取写锁,因为G3在占用互斥锁,所以G5会进入自旋/休眠 阻塞等待;
![](https://img.kancloud.cn/af/49/af4982854dda237a67f34559bc5b6cd5_774x261.png)
现在G1、G2释放了读锁,当释放读锁是判断如果阻塞写锁goroutine的读锁goroutine数量为0了并且有写锁等待就会唤醒正在阻塞等待的写锁G3,G3得到了唤醒:
![](https://img.kancloud.cn/6e/ec/6eec72bcd8f4e414c964802ad0763085_767x266.png)
G3处理完写操作后会释放写锁,这一步会同时唤醒等待的读锁/写锁的goroutine,至于G4、G5谁能先获取锁就看谁比较快
### 总结
* 读写锁提供四种操作:读上锁,读解锁,写上锁,写解锁;加锁规则是读读共享,写写互斥,读写互斥,写读互斥;
* 读写锁中的读锁是一定要存在的,其目的是也是为了规避原子性问题,只有写锁没有读锁的情况下会导致我们读取到中间值;
* Go语言的读写锁在设计上也避免了写锁饥饿的问题,通过字段readerCount、readerWait进行控制,当写锁的goroutine被阻塞时,后面进来想要获取读锁的goroutine也都会被阻塞住,当写锁释放时,会将后面的读操作goroutine、写操作的goroutine都唤醒,剩下的交给他们竞争吧;
> **读锁获取锁流程:**
* 锁空闲时,读锁可以立马被获取
* 如果当前有写锁正在阻塞,那么想要获取读锁的goroutine就会被休眠
>**释放读锁流程:**
* 判断是否没加锁就释放,如果是就抛出异常;
* 写锁被读锁阻塞等待的场景下,会将readerWait的值进行递减,readerWait表示阻塞写操作goroutine的读操作goroutine数量,当readerWait减到0时则可以唤醒被阻塞写操作的goroutine了;
>**写锁获取锁流程**
* 写锁复用了mutex互斥锁的能力,首先尝试获取互斥锁,获取互斥锁失败就会进入自旋/休眠;
* 获取互斥锁成功并不代表写锁加锁成功,此时如果还有占用读锁的goroutine,那么就会阻塞住,否则就会加写锁成功
>**释放写锁流程**
* 释放写锁会将负值的readerCount变成正值,解除对读锁的互斥
* 唤醒当前阻塞住的所有读锁
* 释放互斥锁
### RWMutex 注意事项
1. RWMutex 是单写多读锁,该锁可以加多个读锁或者一个写锁
2. 读锁占用的情况下会阻止写,不会阻止读,多个 Goroutine 可以同时获取 读锁
3. 写锁会阻止其他 Goroutine(无论读和写)进来,整个锁由该 Goroutine 独占
4. 适用于读多写少的场景
5. RWMutex 类型变量的零值是一个未锁定状态的互斥锁
6. RWMutex 在首次被使用之后就不能再被拷贝
7. RWMutex 的读锁或写锁在未锁定状态,解锁操作都会引发 panic
8. RWMutex 的一个写锁去锁定临界区的共享资源,如果临界区的共享资源已 被(读锁或写锁)锁定,这个写锁操作的 goroutine 将被阻塞直到解锁
9. RWMutex 的读锁不要用于递归调用,比较容易产生死锁
10. RWMutex 的锁定状态与特定的 goroutine 没有关联。一个 goroutine 可 以 RLock(Lock),另一个 goroutine 可以 RUnlock(Unlock)
11. 写锁被解锁后,所有因操作锁定读锁而被阻塞的 goroutine 会被唤醒,并 都可以成功锁定读锁
12. 读锁被解锁后,在没有被其他读锁锁定的前提下,所有因操作锁定写锁而 被阻塞的 Goroutine,其中等待时间最长的一个 Goroutine 会被唤醒
- Go准备工作
- 依赖管理
- Go基础
- 1、变量和常量
- 2、基本数据类型
- 3、运算符
- 4、流程控制
- 5、数组
- 数组声明和初始化
- 遍历
- 数组是值类型
- 6、切片
- 定义
- slice其他内容
- 7、map
- 8、函数
- 函数基础
- 函数进阶
- 9、指针
- 10、结构体
- 类型别名和自定义类型
- 结构体
- 11、接口
- 12、反射
- 13、并发
- 14、网络编程
- 15、单元测试
- Go常用库/包
- Context
- time
- strings/strconv
- file
- http
- Go常用第三方包
- Go优化
- Go问题排查
- Go框架
- 基础知识点的思考
- 面试题
- 八股文
- 操作系统
- 整理一份资料
- interface
- array
- slice
- map
- MUTEX
- RWMUTEX
- Channel
- waitGroup
- context
- reflect
- gc
- GMP和CSP
- Select
- Docker
- 基本命令
- dockerfile
- docker-compose
- rpc和grpc
- consul和etcd
- ETCD
- consul
- gin
- 一些小点
- 树
- K8s
- ES
- pprof
- mycat
- nginx
- 整理后的面试题
- 基础
- Map
- Chan
- GC
- GMP
- 并发
- 内存
- 算法
- docker