# 5.2 互斥锁
```
type Mutex struct {
state int32 // 表示 mutex 锁当前的状态
sema uint32 // 信号量,用于唤醒 goroutine
}
```
## 状态
```
const (
mutexLocked = 1 << iota // 互斥锁已锁住
mutexWoken
mutexStarving
mutexWaiterShift = iota
starvationThresholdNs = 1e6
)
```
Mutex 可能处于两种不同的模式:正常模式和饥饿模式。
在正常模式中,等待者按照 FIFO 的顺序排队获取锁,但是一个被唤醒的等待者有时候并不能获取 mutex, 它还需要和新到来的 goroutine 们竞争 mutex 的使用权。 新到来的 goroutine 存在一个优势,它们已经在 CPU 上运行且它们数量很多, 因此一个被唤醒的等待者有很大的概率获取不到锁,在这种情况下它处在等待队列的前面。 如果一个 goroutine 等待 mutex 释放的时间超过 1ms,它就会将 mutex 切换到饥饿模式
在饥饿模式中,mutex 的所有权直接从解锁的 goroutine 递交到等待队列中排在最前方的 goroutine。 新到达的 goroutine 们不要尝试去获取 mutex,即使它看起来是在解锁状态,也不要试图自旋, 而是排到等待队列的尾部。
如果一个等待者获得 mutex 的所有权,并且看到以下两种情况中的任一种:
1. 它是等待队列中的最后一个,
2. 它等待的时间少于 1ms,它便将 mutex 切换回正常操作模式
正常模式下的性能会更好,因为一个 goroutine 能在即使有很多阻塞的等待者时多次连续的获得一个 mutex,饥饿模式的重要性则在于避免了病态情况下的尾部延迟。
## 加锁
Lock 对申请锁的情况分为三种:
1. 无冲突,通过 CAS 操作把当前状态设置为加锁状态
2. 有冲突,开始自旋,并等待锁释放,如果其他 goroutine 在这段时间内释放该锁,直接获得该锁;如果没有释放则为下一种情况
3. 有冲突,且已经过了自旋阶段,通过调用 semrelease 让 goroutine 进入等待状态
```
func (m *Mutex) Lock() {
// 快速路径: 抓取并锁上未锁住状态的互斥锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
(...)
return
}
m.lockSlow()
}
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.state
for {
// Don't spin in starvation mode, ownership is handed off to waiters
// so we won't be able to acquire the mutex anyway.
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
// Active spinning makes sense.
// Try to set mutexWoken flag to inform Unlock
// to not wake other blocked goroutines.
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {
new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}
if awoke {
// The goroutine has been woken from sleep,
// so we need to reset the flag in either case.
if new&mutexWoken == 0 {
throw("sync: inconsistent mutex state")
}
new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {
if old&(mutexLocked|mutexStarving) == 0 {
break // locked the mutex with CAS
}
// If we were already waiting before, queue at the front of the queue.
queueLifo := waitStartTime != 0
if waitStartTime == 0 {
waitStartTime = runtime_nanotime()
}
runtime_SemacquireMutex(&m.sema, queueLifo, 1)
starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {
(...)
delta := int32(mutexLocked - 1<<mutexWaiterShift)
if !starving || old>>mutexWaiterShift == 1 {
// Exit starvation mode.
// Critical to do it here and consider wait time.
// Starvation mode is so inefficient, that two goroutines
// can go lock-step infinitely once they switch mutex
// to starvation mode.
delta -= mutexStarving
}
atomic.AddInt32(&m.state, delta)
break
}
awoke = true
iter = 0
} else {
old = m.state
}
}
(...)
}
```
## 解锁
```
func (m *Mutex) Unlock() {
(...)
// Fast path: drop lock bit.
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
// Outlined slow path to allow inlining the fast path.
// To hide unlockSlow during tracing we skip one extra frame when tracing GoUnblock.
m.unlockSlow(new)
}
}
func (m *Mutex) unlockSlow(new int32) {
(...)
if new&mutexStarving == 0 {
old := new
for {
// If there are no waiters or a goroutine has already
// been woken or grabbed the lock, no need to wake anyone.
// In starvation mode ownership is directly handed off from unlocking
// goroutine to the next waiter. We are not part of this chain,
// since we did not observe mutexStarving when we unlocked the mutex above.
// So get off the way.
// 如果没有等待着,或者已经存在一个 goroutine 被唤醒或者得到锁,
// 或处于饥饿模式,无需唤醒任何等待状态的 goroutine
if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
return
}
// Grab the right to wake someone.
new = (old - 1<<mutexWaiterShift) | mutexWoken
if atomic.CompareAndSwapInt32(&m.state, old, new) {
// 唤醒一个阻塞的 goroutine,但不是唤醒第一个等待者
runtime_Semrelease(&m.sema, false, 1)
return
}
old = m.state
}
} else {
// Starving mode: handoff mutex ownership to the next waiter.
// Note: mutexLocked is not set, the waiter will set it after wakeup.
// But mutex is still considered locked if mutexStarving is set,
// so new coming goroutines won't acquire it.
// 饥饿模式: 直接将 mutex 所有权交给等待队列最前端的 goroutine
runtime_Semrelease(&m.sema, true, 1)
}
}
```
## 例:并发安全的单例模式
利用原子操作和互斥锁我们可以轻松实现一个非常简单的并发安全单例模式,即 sync.Once。
sync.Once 用来保证绝对一次执行的对象,例如可在单例的初始化中使用。 它内部的结构也相对简单:
```
// Once 对象可以保证一个动作的绝对一次执行。
type Once struct {
// done 表明某个动作是否被执行
// 由于其使用频繁(热路径),故将其放在结构体的最上方
// 热路径在每个调用点进行内嵌
// 将 done 放在第一位,在某些架构下(amd64/x86)能获得更加紧凑的指令,
// 而在其他架构下能更少的指令(用于计算其偏移量)。
done uint32
m Mutex
}
```
注意,这个结构在 Go 1.13 中得到了重新调整,在其之前`done`字段在`m`之后。
源码也非常简单:
```
// Do 当且仅当第一次调用时,f 会被执行。换句话说,给定
// var once Once
// 如果 once.Do(f) 被多次调用则只有第一次会调用 f,即使每次提供的 f 不同。
// 每次执行必须新建一个 Once 实例。
//
// Do 用于变量的一次初始化,由于 f 是无参数的,因此有必要使用函数字面量来捕获参数:
// config.once.Do(func() { config.init(filename) })
//
// 因为该调用无返回值,因此如果 f 调用了 Do,则会导致死锁。
//
// 如果 f 发生 panic,则 Do 认为 f 已经返回;之后的调用也不会调用 f。
//
func (o *Once) Do(f func()) {
// 原子读取 Once 内部的 done 属性,是否为 0,是则进入慢速路径,否则直接调用
if atomic.LoadUint32(&o.done) == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
// 注意,我们只使用原子读读取了 o.done 的值,这是最快速的路径执行原子操作,即 fast-path
// 但当我们需要确保在并发状态下,是不是有多个人读到 0,因此必须加锁,这个操作相对昂贵,即 slow-path
o.m.Lock()
defer o.m.Unlock()
// 正好我们有一个并发的 goroutine 读到了 0,那么立即执行 f 并在结束时候调用原子写,将 o.done 修改为 1
if o.done == 0 {
defer atomic.StoreUint32(&o.done, 1)
f()
}
// 当 o.done 为 0 的 goroutine 解锁后,其他人会继续加锁,这时会发现 o.done 已经为了 1 ,于是 f 已经不用在继续执行了
}
```
- 第一部分 :基础篇
- 第1章 Go语言的前世今生
- 1.2 Go语言综述
- 1.3 顺序进程通讯
- 1.4 Plan9汇编语言
- 第2章 程序生命周期
- 2.1 从go命令谈起
- 2.2 Go程序编译流程
- 2.3 Go 程序启动引导
- 2.4 主Goroutine的生与死
- 第3 章 语言核心
- 3.1 数组.切片与字符串
- 3.2 散列表
- 3.3 函数调用
- 3.4 延迟语句
- 3.5 恐慌与恢复内建函数
- 3.6 通信原语
- 3.7 接口
- 3.8 运行时类型系统
- 3.9 类型别名
- 3.10 进一步阅读的参考文献
- 第4章 错误
- 4.1 问题的演化
- 4.2 错误值检查
- 4.3 错误格式与上下文
- 4.4 错误语义
- 4.5 错误处理的未来
- 4.6 进一步阅读的参考文献
- 第5章 同步模式
- 5.1 共享内存式同步模式
- 5.2 互斥锁
- 5.3 原子操作
- 5.4 条件变量
- 5.5 同步组
- 5.6 缓存池
- 5.7 并发安全散列表
- 5.8 上下文
- 5.9 内存一致模型
- 5.10 进一步阅读的文献参考
- 第二部分 运行时篇
- 第6章 并发调度
- 6.1 随机调度的基本概念
- 6.2 工作窃取式调度
- 6.3 MPG模型与并发调度单
- 6.4 调度循环
- 6.5 线程管理
- 6.6 信号处理机制
- 6.7 执行栈管理
- 6.8 协作与抢占
- 6.9 系统监控
- 6.10 网络轮询器
- 6.11 计时器
- 6.12 非均匀访存下的调度模型
- 6.13 进一步阅读的参考文献
- 第7章 内存分配
- 7.1 设计原则
- 7.2 组件
- 7.3 初始化
- 7.4 大对象分配
- 7.5 小对象分配
- 7.6 微对象分配
- 7.7 页分配器
- 7.8 内存统计
- 第8章 垃圾回收
- 8.1 垃圾回收的基本想法
- 8.2 写屏幕技术
- 8.3 调步模型与强弱触发边界
- 8.4 扫描标记与标记辅助
- 8.5 免清扫式位图技术
- 8.6 前进保障与终止检测
- 8.7 安全点分析
- 8.8 分代假设与代际回收
- 8.9 请求假设与实务制导回收
- 8.10 终结器
- 8.11 过去,现在与未来
- 8.12 垃圾回收统一理论
- 8.13 进一步阅读的参考文献
- 第三部分 工具链篇
- 第9章 代码分析
- 9.1 死锁检测
- 9.2 竞争检测
- 9.3 性能追踪
- 9.4 代码测试
- 9.5 基准测试
- 9.6 运行时统计量
- 9.7 语言服务协议
- 第10章 依赖管理
- 10.1 依赖管理的难点
- 10.2 语义化版本管理
- 10.3 最小版本选择算法
- 10.4 Vgo 与dep之争
- 第12章 泛型
- 12.1 泛型设计的演进
- 12.2 基于合约的泛型
- 12.3 类型检查技术
- 12.4 泛型的未来
- 12.5 进一步阅读的的参考文献
- 第13章 编译技术
- 13.1 词法与文法
- 13.2 中间表示
- 13.3 优化器
- 13.4 指针检查器
- 13.5 逃逸分析
- 13.6 自举
- 13.7 链接器
- 13.8 汇编器
- 13.9 调用规约
- 13.10 cgo与系统调用
- 结束语: Go去向何方?