企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## MUTEX ### 互斥锁的实现机制 是 临界区 在并发编程中,如果程序中的一部分会被并发访问或修改,为了避免并发访问导致的意想不到的结果,这部分程序需要被保护起来,这部分被保护起来的程序,就叫做临界区。 ### 对外暴漏的方法和结构 Locker接口: ~~~ type Locker interface { Lock() Unlock() } ~~~ Mutex 就实现了这个接口,Lock请求锁,Unlock释放锁 ~~~ type Mutex struct { state int32 //锁状态,保护四部分含义 sema uint32 //信号量,用于阻塞等待或者唤醒 } ~~~ ![](https://img.kancloud.cn/90/a1/90a19a9aeb2c5b940d7f75532e163288_667x368.png) **Locked**:表示该 mutex 是否被锁定,0 表示没有,1 表示处于锁定状态; **Woken**:表示是否有协程被唤醒,0 表示没有,1 表示有协程处于唤醒状态,并且在加锁过程中; **Starving**:Go1.9 版本之后引入,表示 mutex 是否处于饥饿状态,0 表示没有,1 表示有协程处于饥饿状态; **Waiter**: 等待锁的协程数量。 #### 方法解析 ~~~ const ( // mutex is locked ,在低位,值 1 mutexLocked = 1 << iota //标识有协程被唤醒,处于 state 中的第二个 bit 位,值 2 mutexWoken //标识 mutex 处于饥饿模式,处于 state 中的第三个 bit 位,值 4 mutexStarving // 值 3,state 值通过右移三位可以得到 waiter 的数量 // 同理,state += 1 << mutexWaiterShift,可以累加 waiter 的数量 mutexWaiterShift = iota // 标识协程处于饥饿状态的最长阻塞时间,当前被设置为 1ms starvationThresholdNs = 1e6 ) ~~~ #### Lock ~~~ func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. //运气好,直接加锁成功 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) //内联,加锁失败,就得去自旋竞争或者饥饿模式下竞争 m.lockSlow() } ~~~ ~~~ func (m *Mutex) lockSlow() { var waitStartTime int64 // 标识是否处于饥饿模式 starving := false // 唤醒标记 awoke := false // 自旋次数 iter := 0 old := m.state for { // 非饥饿模式下,开启自旋操作 // 从 runtime_canSpin(iter) 的实现中(runtime/proc.sync_runtime_canSpin)可以知道, // 如果 iter 的值大于 4,将返回 false if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 如果没有其他 waiter 被唤醒,那么将当前协程置为唤醒状态,同时 CAS 更新 mutex 的 Woken 位 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 开启自旋 runtime_doSpin() iter++ // 重新检查 state 的值 old = m.state continue } new := old // 非饥饿状态 if old&mutexStarving == 0 { // 当前协程可以直接加锁 new |= mutexLocked } // mutex 已经被锁住或者处于饥饿模式 // 那么当前协程不能获取到锁,将会进入等待状态 if old&(mutexLocked|mutexStarving) != 0 { // waiter 数量加 1,当前协程处于等待状态 new += 1 << mutexWaiterShift } // 当前协程处于饥饿状态并且 mutex 依然被锁住,那么设置 mutex 为饥饿模式 if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // 清除唤醒标记 // &^ 与非操作,mutexWoken: 10 -> 01 // 此操作之后,new 的 Locked 位值是 1,如果能够成功写入到 m.state 字段,那么当前协程获取锁成功 new &^= mutexWoken } // CAS 设置新状态成功 if atomic.CompareAndSwapInt32(&m.state, old, new) { // 旧的锁状态已经被释放并且处于非饥饿状态 // 这个时候当前协程正常请求到了锁,就可以直接返回了 if old&(mutexLocked|mutexStarving) == 0 { break } // 处理当前协程的饥饿状态 // 如果之前已经处于等待状态了(已经在队列里面),那么将其加入到队列头部,从而可以被高优唤醒 queueLifo := waitStartTime != 0 if waitStartTime == 0 { // 阻塞开始时间 waitStartTime = runtime_nanotime() } // P 操作,阻塞等待 runtime_SemacquireMutex(&m.sema, queueLifo, 1) // 唤醒之后,如果当前协程等待超过 1ms,那么标识当前协程处于饥饿状态 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state // mutex 已经处于饥饿模式 if old&mutexStarving != 0 { // 1. 如果当前协程被唤醒但是 mutex 还是处于锁住状态 // 那么 mutex 处于非法状态 // // 2. 或者如果此时 waiter 数量是 0,并且 mutex 未被锁住 // 代表当前协程没有在 waiters 中,但是却想要获取到锁,那么 mutex 状态非法 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } // delta 代表加锁并且将 waiter 数量减 1 两步操作 delta := int32(mutexLocked - 1<<mutexWaiterShift) // 非饥饿状态 或者 当前只剩下一个 waiter 了(就是当前协程本身) if !starving || old>>mutexWaiterShift == 1 { // 那么 mutex 退出饥饿模式 delta -= mutexStarving } // 设置新的状态 atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } if race.Enabled { race.Acquire(unsafe.Pointer(m)) } } ~~~ ### 加锁过程: (1)协程调用Lock()函数。 (2)判断这个锁是否未初始化,如果该锁没有初始化,则对该锁进行初始化设置,然后获取锁,Lock()函数返回。如果该锁已被初始化过,则进入下一步操作。 (3)判断锁与协程的当前状态,如果该锁处于加锁状态且当前协程不处于饥饿状态,则尝试获取该锁的协程会进入自旋状态(最多自旋4次)。 (4)自旋完成后,计算当前锁的状态,协程尝试获取锁,如果获取成功,Lock()函数返回。如果获取失败,则进入下一步操作。 (5)自旋完成后尝试获取锁失败,该协程进入休眠状态,协程进入阻塞状态,并不断尝试获取信号。 (6)休眠协程获取到信号(有其他协程释放了锁),被唤醒,此时go会判断当前协程是否处于饥饿状态(如果协程从调用Lock()函数起,超过1ms后都没有成功获取到锁,则该协程进入饥饿状态)。如果该协程处于饥饿状态,不会启动自旋过程,而是再次被阻塞,一旦有其他的协程释放了锁,那么该饥饿协程就会被唤醒并直接获取锁,Lock()函数返回。如果该协程处于正常状态,则进入下一步操作。 (7)如果协程被判定处于正常状态,即满足再次启动自旋条件(协程从调用Lock()函数起到现在被唤醒的总时间不超过1ms),则重置自旋次数,再次启动自旋,尝试获取锁。如果自旋结束后,该协程还是没有获取到锁,则回到步骤(3),开始新一轮循环。 在加锁过程中有两种情况: * Mutex 处于非饥饿模式:这个时候如果协程加锁不成功不会立刻进入阻塞队列,而是判断自己是否满足自旋的条件,如果满足,则启动自旋,在自旋过程中尝试获取锁。 * Mutex 处于饥饿模式:如果一个协程阻塞等待的时间过长(超过 1ms),那么 mutex 会被标记为饥饿模式,此时协程抢锁过程中不会开启自旋,而是一旦有协程释放了锁,那么一定会唤醒协程,被唤醒的协程将会成功获取到锁。 > 非饥饿模式下,协程会开启自旋操作。自旋能够避免协程切换,使当前自旋的协程有机会更快获取到锁。自旋对应 CPU 的 PAUSE 指令,相当于 CPU 进行空转。 由于自旋操作会很大程度上给 CPU 带来一定的压力,因此自旋不能无限制进行下去,所有在这里,会通过 `sync_runtime_canSpin(int)` 判断能否进入自旋, >允许自旋的条件如下: * 自旋次数要不超过 4 次,这个是根据入参确定的; * CPU 的核数要大于 1,否则自旋没有意义,因为没有其他的协程能够在当前协程自旋期间获取到时间片而释放锁,自旋只会白白浪费时间; * gomaxprocs 要大于 1,也就是 GMP 中的 P (Processor); * 至少有一个本地的 P 队列,并且其可运行的 G 队列为空。 #### UnLock ~~~ // 解锁操作 func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // mutexLocked 位设置为 0,解锁 new := atomic.AddInt32(&m.state, -mutexLocked) // 如果此时 state 值不是 0,代表其他位不是 0(或者出现异常使用导致 mutexLocked 位也不是 0) // 此时需要进一步做一些其他操作,比如唤醒等待中的协程等 if new != 0 { m.unlockSlow(new) } } ~~~ 解锁操作会根据 Mutex.state 的状态来判断需不需要去唤醒其他等待中的协程。 ~~~ func (m *Mutex) unlockSlow(new int32) { // new - state 字段原子减 1 之后的值,如果之前是处于加锁状态,那么此时 new 的末位应该是 0 // 此时 new+mutexLocked 正常情况下会将 new 末位变成 1 // 那么如果和 mutexLocked 做与运算之后的结果是 0,代表 new 值非法,解锁了一个未加锁的 mutex if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 如果不是处于饥饿状态 if new&mutexStarving == 0 { old := new for { // old>>mutexWaiterShift == 0 代表没有等待加锁的协程了,自然不需要执行唤醒操作 // old&mutexLocked != 0 代表已经有协程加锁成功,此时没有必要再唤醒一个协程(因为它不可能加锁成功) // old&mutexWoken != 0 代表已经有协程被唤醒并且在加锁过程中,此时不需要再执行唤醒操作了 // old&mutexStarving != 0 代表已经进入了饥饿状态, // 以上四种情况,皆不需要执行唤醒操作 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 唤醒一个等待中的协程,将 state woken 位置为 1 // old - 1<<mutexWaiterShift waiter 数量减 1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // 饥饿模式 // 将 mutex 的拥有权转移给下一个 waiter,并且交出 CPU 时间片,从而能够让下一个 waiter 立刻开始执行 runtime_Semrelease(&m.sema, true, 1) } } ~~~ 饥饿模式的最大等待时间阈值设置成了 1 毫秒,一旦等待者等待的时间超过了这个阈值,Mutex 的处理就有可能进入饥饿模式,优先让等待者先获取到锁,新来的让一下,给之前的一些机会。 通过加入饥饿模式,可以避免把机会全都留给新来的 goroutine,保证了请求锁的goroutine 获取锁的公平性。 ### 解锁过程 1、先判断当前是否加锁了,如果没加锁就解锁,直接报错 2、分两种情况: >1、正常模式下,判断是否需要唤醒g,如果不需要,就直接返回,如果需要就把waiter-1操作 >2、饥饿模式,直接把锁交给等待队列头部g,让他运行 #### 注意事项 1、**Lock() / Unlock() 方法一定要成对出现** 2、**小心拷贝 Mutex**,拷贝的话会带上之前的mutex的数据,导致出现问题 3、**Lock() / Unlock() 尽量在同一个协程里面**,遵循“**谁申请,谁释放**” ### 饥饿模式和正常模式 > Mutex 可能处于两种操作模式下:正常模式和饥饿模式 #### 正常模式(非公平锁) 正常模式下waiter 都是进入先入先出队列,被唤醒的 waiter 并不会直接持有锁,而是要和新来的 goroutine 进行竞争。新来的 goroutine 有先天的优势,它们正在 CPU 中运行,可能它们的数量还不少,所以,在高并发情况下,被唤醒的 waiter 可能比较悲剧地获取不到锁,这时,它会被插入到队列的前面。 ##### 正常锁-->饥饿锁触发条件: >如果 waiter 获取不到锁的时间超过阈值 1 毫秒,那么,这个 Mutex 就进入到了饥饿模式。 #### 饥饿模式(公平锁) Mutex 的拥有者将直接把锁交给队列最前面的 waiter。新来的 goroutine不会尝试获取锁,即使看起来锁没有被持有,它也不会去抢,也不会 spin,它会乖乖地加入到等待队列的尾部。 ##### 饥饿-->正常 转换: >* 此 waiter 已经是队列中的最后一个 waiter 了,没有其它的等待锁的 goroutine 了; >* 此 waiter 的等待时间小于 1 毫秒。 ### 总结 正常模式拥有更好的性能,因为即使有等待抢锁的 waiter,goroutine 也可以连续多次获取到锁。 饥饿模式是对公平性和性能的一种平衡,它避免了某些 goroutine 长时间的等待锁。在饥饿模式下,优先对待的是那些一直在等待的 waiter。