💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
锁是一种同步机制,用于在多任务环境中限制资源的访问,以满足互斥需求。 go源码sync包中经常用于同步操作的方式: * 原子操作 * 互斥锁 * 读写锁 * waitgroup 我们着重来分析下互斥锁和读写锁. 互斥锁: 下面是互斥锁的数据结构: ~~~go // A Mutex is a mutual exclusion lock. // The zero value for a Mutex is an unlocked mutex. // // A Mutex must not be copied after first use. type Mutex struct { state int32 // 互斥锁上锁状态枚举值如下所示 sema uint32 // 信号量,向处于Gwaitting的G发送信号 } const ( mutexLocked = 1 << iota // 值为1,表示在state中由低向高第1位,意义:锁是否可用,0可用,1不可用,锁定中 mutexWoken // 值为2,表示在state中由低向高第2位,意义:mutex是否被唤醒 mutexStarving // 当前的互斥锁进入饥饿状态; mutexWaiterShift = iota //值为2,表示state中统计阻塞在此mutex上goroutine的数目需要位移的偏移量 starvationThresholdNs = 1e6 ~~~ state和sema两个加起来只占 8 字节空间的结构体表示了 Go 语言中的互斥锁。 互斥锁的状态比较复杂,如下图所示,最低三位分别表示 mutexLocked、mutexWoken 和 mutexStarving,剩下的位置用来表示当前有多少个 Goroutine 等待互斥锁的释放. [![](https://github.com/KeKe-Li/data-structures-questions/raw/master/src/images/138.jpg)](https://github.com/KeKe-Li/data-structures-questions/blob/master/src/images/138.jpg) 在默认情况下,互斥锁的所有状态位都是 0,int32 中的不同位分别表示了不同的状态: * mutexLocked 表示互斥锁的锁定状态; * mutexWoken 表示从正常模式被从唤醒; * mutexStarving 当前的互斥锁进入饥饿状态; * waitersCount 当前互斥锁上等待的 Goroutine 个数; sync.Mutex 有两种模式,正常模式和饥饿模式。 在正常模式下,锁的等待者会按照先进先出的顺序获取锁。 但是刚被唤起的`Goroutine`与新创建的`Goroutine`竞争时,大概率会获取不到锁,为了减少这种情况的出现,一旦 Goroutine 超过 1ms 没有获取到锁,它就会将当前互斥锁切换饥饿模式,防止部分 Goroutine 被饿死。 饥饿模式是在 Go 语言 1.9 版本引入的优化的,引入的目的是保证互斥锁的公平性(Fairness)。 在饥饿模式中,互斥锁会直接交给等待队列最前面的 Goroutine。新的 Goroutine 在该状态下不能获取锁、也不会进入自旋状态,它们只会在队列的末尾等待。 如果一个 Goroutine 获得了互斥锁并且它在队列的末尾或者它等待的时间少于 1ms,那么当前的互斥锁就会被切换回正常模式。 相比于饥饿模式,正常模式下的互斥锁能够提供更好地性能,饥饿模式的能避免 Goroutine 由于陷入等待无法获取锁而造成的高尾延时。 互斥锁的加锁是靠 sync.Mutex.Lock 方法完成的, 当锁的状态是 0 时,将`mutexLocked`位置成 1: ~~~go // Lock locks m. // If the lock is already in use, the calling goroutine // blocks until the mutex is available. func (m *Mutex) Lock() { // Fast path: grab unlocked mutex. if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return } // Slow path (outlined so that the fast path can be inlined) m.lockSlow() } ~~~ 如果互斥锁的状态不是 0 时就会调用`sync.Mutex.lockSlow`尝试通过自旋(Spinnig)等方式等待锁的释放, 这个方法是一个非常大 for 循环,它获取锁的过程: 1. 判断当前 Goroutine 能否进入自旋; 2. 通过自旋等待互斥锁的释放; 3. 计算互斥锁的最新状态; 4. 更新互斥锁的状态并获取锁; 那么互斥锁是如何判断当前 Goroutine 能否进入自旋等互斥锁的释放,是通过它的lockSlow方法, 由于自旋是一种多线程同步机制,所以呢当前的进程在进入自旋的过程中会一直保持对 CPU 的占用,持续检查某个条件是否为真。 通常在多核的 CPU 上,自旋可以避免 Goroutine 的切换,使用得当会对性能带来很大的增益,但是往往使用的不得当就会拖慢整个程序. 所以 Goroutine 进入自旋的条件非常苛刻: * 互斥锁只有在普通模式才能进入自旋; * `runtime.sync_runtime_canSpin`需要返回 true: a. 需要运行在多 CPU 的机器上; b. 当前的Goroutine 为了获取该锁进入自旋的次数小于四次; c. 当前机器上至少存在一个正在运行的处理器 P 并且处理的运行队列为空; 一旦当前 Goroutine 能够进入自旋就会调用`runtime.sync_runtime_doSpin`和`runtime.procyield`并执行 30 次的 PAUSE 指令,该指令只会占用 CPU 并消耗 CPU 时间. 处理了自旋相关的特殊逻辑之后,互斥锁会根据上下文计算当前互斥锁最新的状态。 通过几个不同的条件分别会更新 state 字段中存储的不同信息,`mutexLocked`、`mutexStarving`、`mutexWoken`和`mutexWaiterShift`: ~~~go new := old if old&mutexStarving == 0 { new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { new &^= mutexWoken } ~~~ 计算了新的互斥锁状态之后,就会使用 CAS 函数 sync/atomic.CompareAndSwapInt32 更新该状态: ~~~go if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // 通过 CAS 函数获取了锁 } ... runtime_SemacquireMutex(&m.sema, queueLifo, 1) starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } } ~~~ 如果我们没有通过 CAS 获得锁,会调用`runtime.sync_runtime_SemacquireMutex`使用信号量保证资源不会被两个 Goroutine 获取。 `runtime.sync_runtime_SemacquireMutex`会在方法中不断调用尝试获取锁并休眠当前 Goroutine 等待信号量的释放,一旦当前 Goroutine 可以获取信号量,它就会立刻返回,`sync.Mutex.Lock`方法的剩余代码也会继续执行。 在正常模式下,这段代码会设置唤醒和饥饿标记、重置迭代次数并重新执行获取锁的循环. 在饥饿模式下,当前 Goroutine 会获得互斥锁,如果等待队列中只存在当前 Goroutine,互斥锁还会从饥饿模式中退出. 互斥锁的解锁过程`sync.Mutex.Unlock`与加锁过程相比就很简单,该过程会先使用`sync/atomic.AddInt32`函数快速解锁,这时会发生下面的两种情况: * 如果该函数返回的新状态等于 0,当前 Goroutine 就成功解锁了互斥锁; * 如果该函数返回的新状态不等于 0,这段代码会调用`sync.Mutex.unlockSlow`方法开始慢速解锁: ~~~go func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. new := atomic.AddInt32(&m.state, -mutexLocked) if new != 0 { // Outlined slow path to allow inlining the fast path. // To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock. m.unlockSlow(new) } } ~~~ `sync.Mutex.unlockSlow`方法首先会校验锁状态的合法性, 如果当前互斥锁已经被解锁过了就会直接抛出异常`sync: unlock of unlocked mutex`中止当前程序。 在正常情况下会根据当前互斥锁的状态,分别处理正常模式和饥饿模式下的互斥锁. ~~~go func (m *Mutex) unlockSlow(new int32) { if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } if new&mutexStarving == 0 { old := new for { // If there are no waiters or a goroutine has already // been woken or grabbed the lock, no need to wake anyone. // In starvation mode ownership is directly handed off from unlocking // goroutine to the next waiter. We are not part of this chain, // since we did not observe mutexStarving when we unlocked the mutex above. // So get off the way. if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // Grab the right to wake someone. new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state } } else { // Starving mode: handoff mutex ownership to the next waiter, and yield // our time slice so that the next waiter can start to run immediately. // Note: mutexLocked is not set, the waiter will set it after wakeup. // But mutex is still considered locked if mutexStarving is set, // so new coming goroutines won't acquire it. runtime_Semrelease(&m.sema, true, 1) } } ~~~ 在正常模式下,这段代码会分别处理以下两种情况处理: * 如果互斥锁不存在等待者或者互斥锁的`mutexLocked`、`mutexStarving`、`mutexWoken`状态不都为 0,那么当前方法就可以直接返回,不需要唤醒其他等待者; * 如果互斥锁存在等待者,会通过`sync.runtime_Semrelease`唤醒等待者并移交锁的所有权; 在饥饿模式下,上述代码会直接调用`sync.runtime_Semrelease`方法将当前锁交给下一个正在尝试获取锁的等待者,等待者被唤醒后会得到锁,在这时互斥锁还不会退出饥饿状态; 互斥锁的加锁过程比较复杂,它涉及自旋、信号量以及调度等概念: * 如果互斥锁处于初始化状态,就会直接通过置位 mutexLocked 加锁; * 如果互斥锁处于 mutexLocked 并且在普通模式下工作,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放; * 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式; * 互斥锁在正常情况下会通过`runtime.sync_runtime_SemacquireMutex`函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine; * 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式; 互斥锁的解锁过程与之相比就比较简单,其代码行数不多、逻辑清晰,也比较容易理解: * 当互斥锁已经被解锁时,那么调用`sync.Mutex.Unlock`会直接抛出异常; * 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置`mutexLocked`标志位; * 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过`sync.runtime_Semrelease`唤醒对应的 Goroutine. 读写锁: 读写互斥锁`sync.RWMutex`是细粒度的互斥锁,它不限制资源的并发读,但是读写、写写操作无法并行执行。 sync.RWMutex 中总共包含5 个字段: ~~~go type RWMutex struct { w Mutex // 复用互斥锁提供的能力 writerSem uint32 // 写等待读 readerSem uint32 // 读等待写 readerCount int32 // 存储了当前正在执行的读操作的数量 readerWait int32 // 当写操作被阻塞时等待的读操作个数 } ~~~ 我们从写锁开始分析: 当我们想要获取写锁时,需要调用`sync.RWMutex.Lock`方法: ~~~go func (rw *RWMutex) Lock() { if race.Enabled { _ = rw.w.state race.Disable() } // First, resolve competition with other writers. rw.w.Lock() // Announce to readers there is a pending writer. r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // Wait for active readers. if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_SemacquireMutex(&rw.writerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) race.Acquire(unsafe.Pointer(&rw.writerSem)) } } ~~~ * 这里调用结构体持有的`sync.Mutex`的`sync.Mutex.Lock`方法阻塞后续的写操作; 因为互斥锁已经被获取,其他 Goroutine 在获取写锁时就会进入自旋或者休眠; * 调用`sync/atomic.AddInt32`方法阻塞后续的读操作: 如果仍然有其他 Goroutine 持有互斥锁的读锁`(r != 0)`,该 Goroutine 会调用`runtime.sync_runtime_SemacquireMutex`进入休眠状态等待所有读锁所有者执行结束后释放`writerSem`信号量将当前协程唤醒。 写锁的释放会调用`sync.RWMutex.Unlock`方法: ~~~go func (rw *RWMutex) Unlock() { if race.Enabled { _ = rw.w.state race.Release(unsafe.Pointer(&rw.readerSem)) race.Disable() } // Announce to readers there is no active writer. r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race.Enable() throw("sync: Unlock of unlocked RWMutex") } // Unblock blocked readers, if any. for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) } // Allow other writers to proceed. rw.w.Unlock() if race.Enabled { race.Enable() } } ~~~ 解锁与加锁的过程正好相反,写锁的释放分为以下几个步骤: 1. 调用`sync/atomic.AddInt32`函数将`readerCount`变回正数,释放读锁; 2. 通过 for 循环触发所有由于获取读锁而陷入等待的 Goroutine: 3. 调用`sync.Mutex.Unlock`方法释放写锁; 获取写锁时会先阻塞写锁的获取,后阻塞读锁的获取,这种策略能够保证读操作不会被连续的写操作饿死。 接着是读锁: 读锁的加锁方法`sync.RWMutex.RLock`就比较简单了,该方法会通过`sync/atomic.AddInt32`将`readerCount`加一: ~~~go func (rw *RWMutex) RLock() { if race.Enabled { _ = rw.w.state race.Disable() } if atomic.AddInt32(&rw.readerCount, 1) < 0 { // A writer is pending, wait for it. runtime_SemacquireMutex(&rw.readerSem, false, 0) } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(&rw.readerSem)) } } ~~~ 如果`RLock`该方法返回负数,其他 Goroutine 获得了写锁,当前 Goroutine 就会调用`runtime.sync_runtime_SemacquireMutex`陷入休眠等待锁的释放; 如果`RLock`该方法的结果为非负数,没有 Goroutine 获得写锁,当前方法就会成功返回. 当 Goroutine 想要释放读锁时,会调用如下所示的`RUnlock`方法: ~~~go func (rw *RWMutex) RUnlock() { if race.Enabled { _ = rw.w.state race.ReleaseMerge(unsafe.Pointer(&rw.writerSem)) race.Disable() } if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { // Outlined slow-path to allow the fast-path to be inlined rw.rUnlockSlow(r) } if race.Enabled { race.Enable() } } ~~~ 该方法会先减少正在读资源的`readerCount`整数,根据`sync/atomic.AddInt32`的返回值不同会分别进行处理: * 如果返回值大于等于零,表示读锁直接解锁成功. * 如果返回值小于零 ,表示有一个正在执行的写操作,在这时会调用`rUnlockSlow`方法. ~~~go func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race.Enable() throw("sync: RUnlock of unlocked RWMutex") } // A writer is pending. if atomic.AddInt32(&rw.readerWait, -1) == 0 { // The last reader unblocks the writer. runtime_Semrelease(&rw.writerSem, false, 1) } } ~~~ `rUnlockSlow`该方法会减少获取锁的写操作等待的读操作数`readerWait`并在所有读操作都被释放之后触发写操作的信号量,`writerSem`,该信号量被触发时,调度器就会唤醒尝试获取写锁的 Goroutine。 其实读写互斥锁(sync.RWMutex),虽然提供的功能非常复杂,不过因为它是在互斥锁( sync.Mutex)的基础上,所以整体的实现上会简单很多。 因此呢: * 调用`sync.RWMutex.Lock`尝试获取写锁时; 每次`sync.RWMutex.RUnlock`都会将`readerCount`其减一,当它归零时该 Goroutine 就会获得写锁, 将`readerCount`减少`rwmutexMaxReaders`个数以阻塞后续的读操作. * 调用`sync.RWMutex.Unlock`释放写锁时,会先通知所有的读操作,然后才会释放持有的互斥锁; 读写互斥锁在互斥锁之上提供了额外的更细粒度的控制,能够在读操作远远多于写操作时提升性能。