[TOC]
## waitGroup
### 基本结构
```
type WaitGroup struct {
// 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则,辅助Vet检查
// 这个在 go sync 包中有很多它出现的身影
noCopy noCopy
// 64 位:高 32 位作为计数器,低 32 位作为 waiter 计数
// 64 位的原子操作要求 64 位对齐,但 32 位编译器无法保证这个要求
// 因此分配 12 字节,然后将其中对齐的 8 字节作为状态,其他 4 字节用于存储原语
state1 [3]uint32
}
```
state1的方法:对64和32位的系统进行兼容
~~~
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
~~~
### 基本方法
#### **Add**
来增加设置计数器的值,对协程进行计数
~~~
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 竞态检查
if race.Enabled {
_ = *statep // trigger nil deref early
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
// delta 左移 32 位添加到计数器上面
state := atomic.AddUint64(statep, uint64(delta)<<32)
// v 代表 Add() 完之后当前计数器的值,取高 32 位的值
v := int32(state >> 32)
// w 代表当前调用 Wait 被阻塞的数量
w := uint32(state)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(semap))
}
// 非法
if v < 0 {
panic("sync: negative WaitGroup counter")
}
// w != 0,说明已经执行了 Wait() 操作,此时不允许再执行 Add()
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// v == 0 && w > 0
// 此时不能再有一些状态的并发改变的问题:
// - Add() 和 Wait() 操作不能并发执行
// - 如果计数器的值已经是 0 了,此时不能再执行 Wait() 操作
if *statep != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 将 waiter 计数设置为 0,并且唤醒所有 waiter
// 由于 v 和 w 都是 0,所以这里直接将 *statep 设置为 0 就行
*statep = 0
// 唤醒所有 waiter
for ; w != 0; w-- {
// 释放信号量
runtime_Semrelease(semap, false, 0)
}
}
~~~
#### **Done**
Done() 方法比较简单,内部就是简单的调用了 Add() 方法,参数传 -1,将计数器的值减 1,代表当前协程工作完毕。
```
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
```
#### **Wait**
Wait() 方法在子 goroutine 执行完毕之前需要阻塞主 goroutine,其实现就是内部开了一个死循环,不停检查计数器的值,直到其为 0 才结束。
~~~
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
// 竞态检查
if race.Enabled {
_ = *statep // trigger nil deref early
race.Disable()
}
// 启动循环
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// 计数器已经变成 0 了,不需要再等待,直接返回
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
// 增加 waiter 数量(CAS)
// 直接在 state 的低位加就行,也就是直接 +1
if atomic.CompareAndSwapUint64(statep, state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(semap))
}
// 等待信号量唤醒
runtime_Semacquire(semap)
// 这种情况说明在上一轮 Wait() 返回之前,wg 被重新使用了(重新进行了 Add() / Wait())
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
~~~
### WaitGroup 使用的一些注意事项
1.在任何时候都不要使计数器的值小于 0 ,这会引发程序的 panic。
2.Add() 方法的首次调用,与对它的 Wait() 方法的调用不能同时发生,例如在两个不同的 goroutine 中分别调用这两个方法,否则也会引发 panic。因此我们在声明完 WaitGroup 的时候要尽早调用 Add() 方法。
3.如果想要重复使用 WaitGroup,我们需要等待前一轮调用 Wait() 返回之后再发起下一轮的调用。
4.调用 Done() 方法的次数要与 Add() 的计数器值相等,否则将会 panic。
- Go准备工作
- 依赖管理
- Go基础
- 1、变量和常量
- 2、基本数据类型
- 3、运算符
- 4、流程控制
- 5、数组
- 数组声明和初始化
- 遍历
- 数组是值类型
- 6、切片
- 定义
- slice其他内容
- 7、map
- 8、函数
- 函数基础
- 函数进阶
- 9、指针
- 10、结构体
- 类型别名和自定义类型
- 结构体
- 11、接口
- 12、反射
- 13、并发
- 14、网络编程
- 15、单元测试
- Go常用库/包
- Context
- time
- strings/strconv
- file
- http
- Go常用第三方包
- Go优化
- Go问题排查
- Go框架
- 基础知识点的思考
- 面试题
- 八股文
- 操作系统
- 整理一份资料
- interface
- array
- slice
- map
- MUTEX
- RWMUTEX
- Channel
- waitGroup
- context
- reflect
- gc
- GMP和CSP
- Select
- Docker
- 基本命令
- dockerfile
- docker-compose
- rpc和grpc
- consul和etcd
- ETCD
- consul
- gin
- 一些小点
- 树
- K8s
- ES
- pprof
- mycat
- nginx
- 整理后的面试题
- 基础
- Map
- Chan
- GC
- GMP
- 并发
- 内存
- 算法
- docker