企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## waitGroup ### 基本结构 ``` type WaitGroup struct { // 避免复制使用的一个技巧,可以告诉vet工具违反了复制使用的规则,辅助Vet检查 // 这个在 go sync 包中有很多它出现的身影 noCopy noCopy // 64 位:高 32 位作为计数器,低 32 位作为 waiter 计数 // 64 位的原子操作要求 64 位对齐,但 32 位编译器无法保证这个要求 // 因此分配 12 字节,然后将其中对齐的 8 字节作为状态,其他 4 字节用于存储原语 state1 [3]uint32 } ``` state1的方法:对64和32位的系统进行兼容 ~~~ func (wg *WaitGroup) state() (statep *uint64, semap *uint32) { if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2] } else { return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0] } } ~~~ ### 基本方法 #### **Add** 来增加设置计数器的值,对协程进行计数 ~~~ func (wg *WaitGroup) Add(delta int) { statep, semap := wg.state() // 竞态检查 if race.Enabled { _ = *statep // trigger nil deref early if delta < 0 { // Synchronize decrements with Wait. race.ReleaseMerge(unsafe.Pointer(wg)) } race.Disable() defer race.Enable() } // delta 左移 32 位添加到计数器上面 state := atomic.AddUint64(statep, uint64(delta)<<32) // v 代表 Add() 完之后当前计数器的值,取高 32 位的值 v := int32(state >> 32) // w 代表当前调用 Wait 被阻塞的数量 w := uint32(state) if race.Enabled && delta > 0 && v == int32(delta) { // The first increment must be synchronized with Wait. // Need to model this as a read, because there can be // several concurrent wg.counter transitions from 0. race.Read(unsafe.Pointer(semap)) } // 非法 if v < 0 { panic("sync: negative WaitGroup counter") } // w != 0,说明已经执行了 Wait() 操作,此时不允许再执行 Add() if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } if v > 0 || w == 0 { return } // v == 0 && w > 0 // 此时不能再有一些状态的并发改变的问题: // - Add() 和 Wait() 操作不能并发执行 // - 如果计数器的值已经是 0 了,此时不能再执行 Wait() 操作 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 将 waiter 计数设置为 0,并且唤醒所有 waiter // 由于 v 和 w 都是 0,所以这里直接将 *statep 设置为 0 就行 *statep = 0 // 唤醒所有 waiter for ; w != 0; w-- { // 释放信号量 runtime_Semrelease(semap, false, 0) } } ~~~ #### **Done** Done() 方法比较简单,内部就是简单的调用了 Add() 方法,参数传 -1,将计数器的值减 1,代表当前协程工作完毕。 ``` func (wg *WaitGroup) Done() { wg.Add(-1) } ``` #### **Wait** Wait() 方法在子 goroutine 执行完毕之前需要阻塞主 goroutine,其实现就是内部开了一个死循环,不停检查计数器的值,直到其为 0 才结束。 ~~~ func (wg *WaitGroup) Wait() { statep, semap := wg.state() // 竞态检查 if race.Enabled { _ = *statep // trigger nil deref early race.Disable() } // 启动循环 for { state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // 计数器已经变成 0 了,不需要再等待,直接返回 if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // 增加 waiter 数量(CAS) // 直接在 state 的低位加就行,也就是直接 +1 if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(semap)) } // 等待信号量唤醒 runtime_Semacquire(semap) // 这种情况说明在上一轮 Wait() 返回之前,wg 被重新使用了(重新进行了 Add() / Wait()) if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } } } ~~~ ### WaitGroup 使用的一些注意事项 1.在任何时候都不要使计数器的值小于 0 ,这会引发程序的 panic。 2.Add() 方法的首次调用,与对它的 Wait() 方法的调用不能同时发生,例如在两个不同的 goroutine 中分别调用这两个方法,否则也会引发 panic。因此我们在声明完 WaitGroup 的时候要尽早调用 Add() 方法。 3.如果想要重复使用 WaitGroup,我们需要等待前一轮调用 Wait() 返回之后再发起下一轮的调用。 4.调用 Done() 方法的次数要与 Add() 的计数器值相等,否则将会 panic。