合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
> # 内容 * [内容](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_0) * [G的状态](https://www.kancloud.cn/book/xiaohuamao/go100/edit#G_4) * [P的状态](https://www.kancloud.cn/book/xiaohuamao/go100/edit#P_29) * [GMP调度模型](https://www.kancloud.cn/book/xiaohuamao/go100/edit#GMP_40) * [GMP数据结构](https://www.kancloud.cn/book/xiaohuamao/go100/edit#GMP_57) * [协程的调度不是随机的](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_396) * [CSP 模型](https://www.kancloud.cn/book/xiaohuamao/go100/edit#CSP__431) * [协程 (捕获异常 和 协程池)](https://www.kancloud.cn/book/xiaohuamao/go100/edit#____436) * [channel 数据结构](https://www.kancloud.cn/book/xiaohuamao/go100/edit#channel__535) * [操作 nil channel, close channel, 正常 channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_nil_channel_close_channel__channel_556) * [channel 发送和接收数据的本质](https://www.kancloud.cn/book/xiaohuamao/go100/edit#channel__563) * [range channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#range_channel_581) * [使用 select 来多路复用 channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_select__channel_606) * [如何优雅的关闭 channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_channel_654) * [交替打印](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_789) > # G的状态 ~~~ const ( _Gidle = iota // 0: 刚创建但尚未初始化,未分配栈空间 _Grunnable // 1: 已就绪(创建后/被唤醒时),等待调度执行,位于运行队列 _Grunning // 2: 正正在运行,绑定到M和P,其栈被锁定,无法被GC或其他线程访问 _Gsyscall // 3: 正在执行系统调用(文件I/O、网络I/O、线程同步操作等),处于内核态,栈被锁定,不在运行队列 _Gwaiting // 4: 被阻塞,等待资源(如通道、锁、网络等),不在运行队列 _Gmoribund_unused // 5: 未使用状态(保留,占位符) _Gdead // 6: 已结束生命周期 _Genqueue_unused // 7: 未使用状态(保留,占位符) _Gcopystack // 8: 栈正在动态扩容/收缩,禁止调度(每G的栈初始大小为2KB,当栈空间不足时会分配更大的栈,并将现有的栈内容复制到新栈中) _Gpreempted // 9: 被调度器抢占暂停,等待重新调度, 在运行队列 // GC 扫描相关标志 _Gscan = 0x1000 // 标志位,用于叠加在其他状态上,表示 GC 正在扫描 Goroutine 栈 _Gscanrunnable = _Gscan + _Grunnable // 0x1001: GC 正在扫描运行队列中的 Goroutine 的栈 _Gscanrunning = _Gscan + _Grunning // 0x1002: GC 正在扫描正在运行的 Goroutine 的栈 _Gscansyscall = _Gscan + _Gsyscall // 0x1003: GC 正在扫描处于系统调用中的 Goroutine 的栈 _Gscanwaiting = _Gscan + _Gwaiting // 0x1004: GC 正在扫描处于等待状态的 Goroutine 的栈 _Gscanpreempted = _Gscan + _Gpreempted // 0x1009: GC 正在扫描被抢占暂停的 Goroutine 的栈 ) ~~~ > # P的状态 ~~~ const ( _Pidle = iota // 0: 空闲状态,未被 M 使用,未运行用户代码或调度工作 _Prunning // 1: 正在运行,被 M 持有,运行用户代码或调度 Goroutine _Psyscall // 2: 与执行系统调用的 M 关联,暂不运行用户代码 _Pgcstop // 3: 暂停状态,GC 的 STW 阶段被停止 _Pdead // 4: 不可用状态,通常因 GOMAXPROCS 设置减少 ) ~~~ > # GMP调度模型 * 用户级线程(G):内核级线程(M) 多对多模型 * 数量 * P的数量:默认情况下P的数量与逻辑CPU核心数相同, 通过runtime.GOMAXPROCS(n)来修改, 通过runtime.NumCPU()查看 * M和G数量:由调度器动态管理, 通过runtime.NumGoroutine()查看G数量, M的最大数量:sched.maxmcount = 10000 * 流程 * 创建G:先加入P的本地队列, 如果本地队列已满, 则加入到全局队列 * 获取G:优先从P的本地运列获取G, 如果本地队列为空,从全局队列获取G, 如果全局队列也为空,从其他P的本地队列窃取任务 * 执行G: * \_Gsyscall时: 不在运行队列, 当前的P释放M,绑定新的M继续调度, 旧M上调度的旧G执行完后, 旧G加入某个P的本地队列或全局队列,旧M限制的时候会加入全局空闲队列(旧M在没有执行完系统调用前不会被其它P绑定) * \_Gwaiting时:不在运行队列, 释放G的绑定, 单独的G等待事件完成后唤醒, 然后加个某个P的本地队列或全局队列 * \_Gpreempted时:在运行队列,被抢占的 G因为时间片用尽或调度器的其他策略,暂时让出 CPU * \_Gdead时: 当前的G不需要执行,销毁 ![](https://git.kancloud.cn/repos/xiaohuamao/go100/raw/e34f0b206fb87426acef5cface4f91a47035c35c/images/18-go-func%E8%B0%83%E5%BA%A6%E5%91%A8%E6%9C%9F.jpeg?access-token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzQ4OTQ5NTQsImlhdCI6MTczNDg1MTc1NCwicmVwb3NpdG9yeSI6InhpYW9odWFtYW9cL2dvMTAwIiwidXNlciI6eyJ1c2VybmFtZSI6InhpYW9odWFtYW8iLCJuYW1lIjoiXHU0ZjFmXHU2ZDJhd2lubmllIiwiZW1haWwiOiIyOTg3NDIzMTY0QHFxLmNvbSIsInRva2VuIjoiMzA4ZjlmN2UwYTZkZTY5MzQyNTk0Y2YzMjFkNzA2ZTYiLCJhdXRob3JpemUiOnsicHVsbCI6dHJ1ZSwicHVzaCI6dHJ1ZSwiYWRtaW4iOnRydWV9fX0.Gfne-tXSRr9KoDkXSKBtBUVW-CNbVQJK6yWyFXbCTHI) (图片来源:[https://github.com/aceld/golang/blob/main/images/18-go-func调度周期.jpeg](https://github.com/aceld/golang/blob/main/images/18-go-func%E8%B0%83%E5%BA%A6%E5%91%A8%E6%9C%9F.jpeg)) > # GMP数据结构 * G:任务同单位, 通过m字段绑定所属的M * P:持有G队列, 通过m字段绑定所属的M * M: curg当前的G, p字段 当前的P * schedt:管控全局资源分配,调度行为,以及 GC 协调 ~~~ type g struct { // 栈相关字段 stack stack // Goroutine 栈的范围:[stack.lo, stack.hi) stackguard0 uintptr // 检测栈溢出的基准指针,通常为 stack.lo + StackGuard stackguard1 uintptr // 特殊栈的基准指针(如 g0 和 gsignal 的系统栈) // 异常处理 _panic *_panic // 指向当前 Goroutine 最近的 panic 信息,用于异常恢复 _defer *_defer // 指向当前 Goroutine 最近的 defer 结构,用于延迟调用管理 // 调度相关 m *m // 当前 Goroutine 所属的线程 m sched gobuf // Goroutine 的寄存器上下文信息,保存执行状态 atomicstatus atomic.Uint32 // Goroutine 的状态,使用原子操作管理(如 _Grunning、_Gwaiting 等) goid uint64 // Goroutine 唯一标识符 schedlink guintptr // 调度链表指针,用于 Goroutine 的队列操作 waitsince int64 // 阻塞时间戳,用于监控 Goroutine 的等待时间 waitreason waitReason // 阻塞原因(如通道操作、锁等待等) // 抢占与栈调整 preempt bool // 是否触发抢占,通常由调度器根据时间片或栈检查触发 preemptStop bool // 是否在抢占后进入 _Gpreempted 状态 preemptShrink bool // 是否在安全点缩减栈空间 asyncSafePoint bool // 是否停在异步安全点,栈中可能存在不精确指针 paniconfault bool // 遇到异常地址时是否 panic(而不是崩溃) gcscandone bool // 是否已扫描过当前 Goroutine 的栈,避免重复扫描 throwsplit bool // 是否禁止栈分裂,某些 Goroutine 可能要求完整栈 activeStackChans bool // 是否有指向该 Goroutine 栈的未锁定通道,影响栈复制 // 通道与同步状态 parkingOnChan atomic.Bool // 当前 Goroutine 是否停在通道操作上 inMarkAssist bool // 是否参与 GC 标记辅助工作 coroexit bool // 是否在协程切换时退出 // 性能统计与状态 raceignore int8 // 是否忽略竞态检测,用于屏蔽特定 Goroutine 的事件 nocgocallback bool // 是否禁用从 C 回调到 Go tracking bool // 是否追踪此 Goroutine 的调度延迟 trackingSeq uint8 // 调度追踪序列号 trackingStamp int64 // 上次开始追踪的时间戳 runnableTime int64 // 累计的可运行时间,仅用于性能追踪 lockedm muintptr // 锁定的线程 m,表示 Goroutine 被绑定到特定线程 // 信号处理 sig uint32 // 接收到的信号值 writebuf []byte // 用于信号处理的缓冲区 sigcode0 uintptr // 信号的附加信息 sigcode1 uintptr // 信号的附加信息 sigpc uintptr // 触发信号的程序计数器 // Goroutine 关系 parentGoid uint64 // 父 Goroutine 的 ID gopc uintptr // 创建此 Goroutine 的 `go` 指令的地址 ancestors *[]ancestorInfo // 祖先信息(用于调试) startpc uintptr // Goroutine 执行的入口函数地址 // 性能调试与分析 racectx uintptr // 竞态检测上下文,用于调试竞态条件 waiting *sudog // 当前阻塞的 sudog 指针 cgoCtxt []uintptr // Cgo 调用栈上下文信息 labels unsafe.Pointer // 分析器标签,用于性能统计 timer *timer // 与当前 Goroutine 关联的定时器 sleepWhen int64 // 睡眠的截止时间(Unix 时间戳) selectDone atomic.Uint32 // 是否参与了 select 操作,以及是否完成 // 性能分析与协程状态 goroutineProfiled goroutineProfileStateHolder // 当前 Goroutine 的性能分析状态 coroarg *coro // 协程切换时的参数 trace gTraceState // 当前 Goroutine 的跟踪信息 // GC 辅助 gcAssistBytes int64 // 当前 Goroutine 的 GC 辅助字节计数,为负值时需参与垃圾回收 } type m struct { // 基础信息 g0 *g // 默认 Goroutine(系统级 g0,用于调度和运行时管理) morebuf gobuf // 当前 Goroutine 调度的上下文缓冲区 divmod uint32 // ARM 架构的除法/取模分母(liblink 使用) _ uint32 // 对齐填充字段 // 线程及信号相关 procid uint64 // 操作系统线程 ID(供调试器使用) gsignal *g // 用于信号处理的 Goroutine goSigStack gsignalStack // Go 分配的信号处理栈 sigmask sigset // 保存的信号屏蔽集 tls [tlsSlots]uintptr // 线程局部存储数据 // 启动及当前 Goroutine 状态 mstartfn func() // m 启动时的函数 curg *g // 当前正在运行的 Goroutine catchsig guintptr // 捕获信号的 Goroutine(在致命信号期间运行) // 调度器相关 p puintptr // 当前线程绑定的 P nextp puintptr // 即将绑定的 P oldp puintptr // 上次绑定的 P(如从系统调用恢复时) id int64 // m 的唯一标识符 mallocing int32 // 是否正在进行内存分配 throwing throwType // 是否正在执行异常(panic)操作 preemptoff string // 禁止抢占的原因(为空则允许抢占) locks int32 // 当前 m 持有的锁数量 dying int32 // 是否标记为销毁状态 profilehz int32 // 性能分析的采样频率 spinning bool // 是否处于自旋状态(等待任务) blocked bool // 是否阻塞在某个通知(note) newSigstack bool // 是否初始化了新的信号栈 printlock int8 // 打印调试日志时的锁 incgo bool // 是否正在执行 cgo 调用 isextra bool // 是否为额外线程(辅助线程) isExtraInC bool // 是否为 C 中的辅助线程 isExtraInSig bool // 是否为信号处理中的辅助线程 freeWait atomic.Uint32 // 是否可以安全地释放 g0 并删除 m needextram bool // 是否需要额外的 m traceback uint8 // 是否启用追踪调试 ncgocall uint64 // 累计 cgo 调用次数 ncgo int32 // 当前进行中的 cgo 调用数量 cgoCallersUse atomic.Uint32 // 临时标记 cgoCallers 是否在使用 cgoCallers *cgoCallers // cgo 堆栈跟踪 park note // 用于同步的等待通知 alllink *m // 全局 m 链表中的下一个 m schedlink muintptr // 调度器链表中的下一个 m lockedg guintptr // 当前锁定的 Goroutine createstack [32]uintptr // 创建此线程的调用栈(用于调试) lockedExt uint32 // 外部锁定计数(用于 LockOSThread) lockedInt uint32 // 内部锁定计数 nextwaitm muintptr // 等待锁的下一个 m // 锁与等待 mLockProfile mLockProfile // 锁性能分析信息 profStack []uintptr // 用于内存、阻塞或互斥锁的栈跟踪 waitunlockf func(*g, unsafe.Pointer) bool // 解锁时的回调函数 waitlock unsafe.Pointer // 等待锁的指针 waitTraceSkip int // 跟踪时跳过的栈帧数 waitTraceBlockReason traceBlockReason // 阻塞的具体原因 // 系统调用 syscalltick uint32 // 系统调用的计时器 freelink *m // 已释放 m 链表的下一项 trace mTraceState // m 的跟踪状态 // 库调用 libcall libcall // 库调用信息 libcallpc uintptr // 库调用的程序计数器 libcallsp uintptr // 库调用的栈指针 libcallg guintptr // 执行库调用的 Goroutine winsyscall winlibcall // Windows 系统调用参数 // VDSO(虚拟动态共享对象) vdsoSP uintptr // VDSO 调用的栈指针 vdsoPC uintptr // VDSO 调用的程序计数器 // 抢占与信号 preemptGen atomic.Uint32 // 已完成的抢占信号计数 signalPending atomic.Uint32 // 是否有待处理的抢占信号 // 缓存 pcvalueCache pcvalueCache // PC 值查找缓存 // 随机数 chacha8 chacha8rand.State // ChaCha8 随机数生成器状态 cheaprand uint64 // 简易随机数生成器 // 锁信息 locksHeldLen int // 持有的锁数量 locksHeld [10]heldLockInfo // 持有的锁的详细信息(最多记录 10 个) } type p struct { id int32 // P 的唯一标识符 status uint32 // P 的状态(pidle、prunning 等) link puintptr // 链接到下一个空闲的 P schedtick uint32 // 每次调度器调用时递增 syscalltick uint32 // 每次系统调用时递增 sysmontick sysmontick // sysmon 上次观察到的计数 m muintptr // 关联的 M(空闲时为 nil) mcache *mcache // P 的内存缓存 pcache pageCache // 页面缓存 raceprocctx uintptr // Race Detector 的处理上下文 // 延迟池 deferpool []*_defer // 可用的延迟结构池 deferpoolbuf [32]*_defer // 缓存用于延迟结构池的缓冲区 // Goroutine ID 缓存 goidcache uint64 // 缓存的 Goroutine ID 起始值 goidcacheend uint64 // 缓存的 Goroutine ID 结束值 // 可运行的 Goroutine 队列 runqhead uint32 // 可运行队列的头部 runqtail uint32 // 可运行队列的尾部 runq [256]guintptr // 可运行队列,存储 Goroutine 的指针 // runnext 是当前 Goroutine 已准备好下一个运行的 Goroutine // 它会继承当前 Goroutine 的剩余时间片,优化调度延迟。 runnext guintptr // 已死亡的 Goroutine 列表 gFree struct { gList n int32 // 列表中 Goroutine 的数量 } sudogcache []*sudog // Sudog 对象的缓存 sudogbuf [128]*sudog // Sudog 缓存的缓冲区 // 堆上的 mspan 对象缓存 mspancache struct { len int // 缓存的 mspan 对象数量 buf [128]*mspan // 缓存的 mspan 对象 } // Pinner 对象缓存,用于减少重复创建的分配 pinnerCache *pinner trace pTraceState // 跟踪状态 palloc persistentAlloc // 持久性分配,用于避免互斥锁 // 垃圾回收 (GC) 状态 gcAssistTime int64 // assistAlloc 的时间(纳秒) gcFractionalMarkTime int64 // fractional mark worker 的时间(纳秒) limiterEvent limiterEvent // GC CPU 限制器的事件 gcMarkWorkerMode gcMarkWorkerMode // 下一个标记工作线程的模式 gcMarkWorkerStartTime int64 // 最近标记工作线程开始的时间 gcw gcWork // P 的 GC 工作缓存 wbBuf wbBuf // P 的 GC 写屏障缓冲区 runSafePointFn uint32 // 如果为 1,则在下一个安全点运行 sched.safePointFn statsSeq atomic.Uint32 // P 的统计信息写入状态(偶数:未写入,奇数:写入中) // 定时器堆 timers timers // 累积的 goroutine 栈扫描大小 maxStackScanDelta int64 // 栈扫描增量(触发 gcController.maxStackScan 时刷新) scannedStackSize uint64 // GC 时间扫描的栈大小 scannedStacks uint64 // GC 时间扫描的 Goroutine 数量 preempt bool // 标志是否需要尽快进入调度器 gcStopTime int64 // 上次进入 _Pgcstop 的时间戳 } type schedt struct { goidgen atomic.Uint64 // 全局 Goroutine ID 生成器 lastpoll atomic.Int64 // 上次网络轮询的时间,0 表示当前正在轮询 pollUntil atomic.Int64 // 当前轮询的睡眠时间 lock mutex // 调度器全局锁 // M 相关状态 midle muintptr // 空闲 M 列表,等待工作的 M nmidle int32 // 空闲 M 的数量 nmidlelocked int32 // 被锁定且空闲的 M 数量 mnext int64 // 已创建的 M 数量及下一个 M 的 ID maxmcount int32 // 允许的最大 M 数量 nmsys int32 // 系统级别的 M 数量,不计入死锁检测 nmfreed int64 // 已释放的 M 总数 ngsys atomic.Int32 // 系统 Goroutine 的数量 // P 相关状态 pidle puintptr // 空闲 P 列表 npidle atomic.Int32 // 空闲 P 的数量 nmspinning atomic.Int32 // 当前自旋中的 M 数量 needspinning atomic.Uint32 // 是否需要更多自旋 M 的标志,需持有 sched.lock 才可设置 // 全局可运行的 Goroutine 队列 runq gQueue // 全局队列 runqsize int32 // 队列大小 // 调度器禁用控制 disable struct { user bool // 是否禁用用户 Goroutine 调度 runnable gQueue // 等待调度的 Goroutine 队列 n int32 // 队列长度 } // 全局 G 缓存 gFree struct { lock mutex // 缓存锁 stack gList // 带栈的 G 列表 noStack gList // 无栈的 G 列表 n int32 // 缓存的 G 数量 } // sudog 对象的全局缓存 sudoglock mutex sudogcache *sudog // 延迟对象的全局缓存 deferlock mutex deferpool *_defer // 等待释放的 M 列表,通过 m.freelink 链接 freem *m // GC 相关状态 gcwaiting atomic.Bool // 是否正在等待 GC stopwait int32 // 停止等待的计数器 stopnote note // 停止的通知 sysmonwait atomic.Bool // 系统监控是否等待中 sysmonnote note // 系统监控通知 // 下一个 GC 安全点需执行的函数 safePointFn func(*p) // 每个 P 在安全点需要运行的函数 safePointWait int32 // 等待安全点的计数器 safePointNote note // 安全点的通知 profilehz int32 // CPU 性能分析的频率 // GOMAXPROCS 调整的时间统计 procresizetime int64 // 上次调整 GOMAXPROCS 的时间(纳秒) totaltime int64 // 累计 GOMAXPROCS 的积分时间 sysmonlock mutex // 系统监控锁,阻止系统监控与 runtime 的交互 // 调度延迟分布 timeToRun timeHistogram // G 在 _Grunnable 到 _Grunning 状态的延迟分布 // P 的空闲时间 idleTime atomic.Int64 // 当前周期的空闲时间,GC 周期重置 // Goroutine 互斥锁等待时间 totalMutexWaitTime atomic.Int64 // Goroutine 在 _Gwaiting 状态下等待锁的总时间 // 停止世界(STW)延迟分布 stwStoppingTimeGC timeHistogram // GC 相关的 STW 停止延迟 stwStoppingTimeOther timeHistogram // 非 GC 的 STW 停止延迟 // STW 总延迟分布 stwTotalTimeGC timeHistogram // GC 相关的 STW 总延迟 stwTotalTimeOther timeHistogram // 非 GC 的 STW 总延迟 // 运行时锁等待时间 totalRuntimeLockWaitTime atomic.Int64 // G 在 _Grunnable 状态下等待 runtime 锁的总时间 } ~~~ > # 协程的调度不是随机的 * Go 协程调度**不是随机的**,但它也**不是按顺序**的。它是一种基于信号的抢占式、工作窃取的调度模型 * 调度器的目的是在多个协程之间公平分配 CPU 资源,并在必要时暂停某些协程,让其他协程有机会运行 ~~~ package main import ( "fmt" "time" ) func main() { ch := make(chan struct{}) for i := 0; i < 10; i++ { go func(num int) { for { <-ch fmt.Println(num) } }(i) time.Sleep(time.Millisecond) } time.Sleep(time.Second) for j := 0; j < 10; j++ { ch <- struct{}{} //**不加 `time.Sleep(time.Millisecond)`** 时,多个 Goroutine 几乎同时启动,调度器会随机选择哪个 Goroutine 先得到 CPU,因此打印顺序不确定。 //**加了 `time.Sleep(time.Millisecond)`** 时,每个 Goroutine 启动后,主协程会休眠 1 毫秒。这个时间足够让调度器有机会依次启动每个 Goroutine,导致它们的打印顺序更加有序 //time.Sleep(time.Millisecond) 改变输出结果 } time.Sleep(time.Minute) } ~~~ > # CSP 模型 * **CSP**(Communicating Sequential Processes,通信顺序进程) 是一种并发编程模型,其核心理念是**不要通过共享内存来通信,而要通过通信来实现内存共享**。 * 在 Go 语言中,CSP 模型主要通过**Goroutine**和**Channel**来实现。多个 Goroutine 之间的通信通常使用**Channel**,从而避免了共享内存导致的并发问题。 > # 协程 (捕获异常 和 协程池) * 直接用go关键字开协程,不捕获异常的话, 如果出现异常,会导致整个程序结束 * Go 语言中的**Goroutine**相较于系统线程来说非常轻量级,其初始栈大小仅为**2KB**。然而,在高并发场景下,大量的 Goroutine 被频繁创建和销毁,可能会对性能产生负面影响,并增加**GC(垃圾回收)**的压力。 * 为了减少 Goroutine 的创建和销毁所带来的性能损耗,建议充分**复用 Goroutine**。通过使用**Goroutine 池**或者其他方式来复用已经存在的 Goroutine,可以有效地降低系统的开销 * goroutine.go ~~~ package main import ( "fmt" ) // WorkerPool 定义一个工作池结构体 type WorkerPool struct { maxWorkers int taskQueue chan func() } // NewWorkerPool 创建一个新的工作池 func NewWorkerPool(maxWorkers int) *WorkerPool { return &WorkerPool{ maxWorkers: maxWorkers, taskQueue: make(chan func()), } } // Start 启动工作池 func (wp *WorkerPool) Start() { for i := 0; i < wp.maxWorkers; i++ { go wp.worker() } } // worker 执行任务的工作者 goroutine func (wp *WorkerPool) worker() { for task := range wp.taskQueue { safeExecute(task) } } // safeExecute 安全执行任务,捕获异常 func safeExecute(task func()) { defer func() { if r := recover(); r != nil { fmt.Println("Recovered from panic:", r) } }() task() } // Submit 提交任务到工作池 func (wp *WorkerPool) Submit(task func()) { wp.taskQueue <- task } ~~~ * main.go ~~~ package main import ( "fmt" "time" ) var pool *WorkerPool func init() { pool = NewWorkerPool(20) pool.Start() } func SafeGo(f func()) { pool.Submit(f) } func main() { for i := 0; i < 10; i++ { SafeGo(func(num int) func() { return func() { fmt.Println("A", num) } }(i)) } for i := 0; i < 10; i++ { SafeGo(func(num int) func() { return func() { fmt.Println("B", num) } }(i)) } time.Sleep(time.Second * 3) } ~~~ > # channel 数据结构 * chan T // 可以接收和发送类型为 T 的数据 * chan<- T // 只可以用来发送 T 类型的数据 * <-chan T // 只可以用来接收 T 类型的数据 ~~~ type hchan struct { qcount uint // 通道中当前元素个数 dataqsiz uint // 缓冲区的大小 (无缓冲时为0) buf unsafe.Pointer // (环形缓冲区)指向缓冲区的指针,缓冲区通过这个指针来存储数据,已经发送但还未被接收的数据 elemsize uint16 // 单个元素的大小(如果通道是 chan int 类型,那么每个元素就是一个 int,elemsize 表示 int 类型的字节大小。) closed uint32 // 标识 channel 是否已关闭 (0 表示未关闭,1 表示已关闭) timer *timer // 用于处理与该通道相关的超时操作 elemtype *_type // 指向类型描述符的指针,它包含了通道中传输数据类型的所有信息 sendx uint // 下一个要发送元素的位置 recvx uint // 下一个要接收元素的位置 (每次读取数据后,recvx 都会递增,当到达缓冲区末尾时,recvx 会重置为 0,形成环形读取操作) recvq waitq // 等待接收数据的 goroutine 队列 sendq waitq // 等待发送数据的 goroutine 队列 lock mutex // 互斥锁,用于保护 channel 的操作 } ~~~ > # 操作 nil channel, close channel, 正常 channel | 操作 | nil channel | close channel | 正常 channel | | --- | --- | --- | --- | | close | panic: close of nil channel | panic: close of closed channel | 正常关闭 | | 读操作 | fatal error: all goroutines are asleep - deadlock! | 有未接收的值可以正常读, 没有的话读到对应类型的零值 | 阻塞/正常读数据 | | 写操作 | fatal error: all goroutines are asleep - deadlock! | panic: send on closed channel | 阻塞/正常写数据 | > # channel 发送和接收数据的本质 ~~~ //向 channel 发送值类型会拷贝, 发送引用类型拷贝的是引用 package main import "fmt" func main() { ch := make(chan []int, 1) s := make([]int, 1) s[0] = 1 ch <- s s[0] = 2 fmt.Println(<-ch) //输出2 } ~~~ > # range channel ~~~ package main import ( "fmt" "time" ) func main() { ch := make(chan int) go func() { time.Sleep(time.Second * 10) close(ch) }() //如果通道没有关闭,但生产者没有再发送数据,range 会持续阻塞,直到有新的数据发送到通道或通道被关闭 for v := range ch { fmt.Println(v) } fmt.Println("Done") } ~~~ > # 使用 select 来多路复用 channel ~~~ package main import ( "fmt" "time" ) // 随机选择:当多个 channel 同时满足条件时,select 会随机选择一个执行。(一个channel先准备,另一个channel后准备好,select也是随机选择) // 默认分支:可以在 select 中添加 default 分支,当所有的 channel 都没有数据时,select 可以立即执行 default 分支而不阻塞。 func main() { ch1 := make(chan struct{}) ch2 := make(chan struct{}) go func() { ch1 <- struct{}{} }() go func() { ch2 <- struct{}{} }() go func() { for { select { case <-ch1: fmt.Println("ch1") time.Sleep(1 * time.Second) case <-ch2: fmt.Println("ch2") time.Sleep(1 * time.Second) case <-time.After(2 * time.Second): fmt.Println("超时") time.Sleep(1 * time.Second) default: fmt.Println("default") time.Sleep(1 * time.Second) } } }() time.Sleep(time.Second * 10) close(ch1) //通道关闭后,select 还会执行 time.Sleep(time.Second * 10) } ~~~ > # 如何优雅的关闭 channel ~~~ //channel关闭原则: 不要在消费端关闭channel,不要再生产端有多个并行的时候执行关闭操作 //判断channel是否关闭: v, ok := <-ch 取值的时候加判断, 关闭的channel, v 返回对应类型的零值, ok 返回 false。用这种方式去判断channel 是否关闭有副作用, 会读出channel里的元素 //如何优雅的关闭channel //--- 1.直接关闭channel用recover捕获异常 //----2.sync.Once来确保channel只关闭一次 //--- 3.增加状态字段存channel是否关闭(需要用到锁) //--- 4.增加一个channel作为关闭信号(如下代码一) //--- 5.使用context(如下代码二) //用完的channel, 没有发送者, 没有接收者, 也没有关闭会一直占用内存吗? 会的 //已关闭的channel, 还有未接收的数据, 但是没有发送者和接受者会一直占用内存吗? 不会, 如果没有任何引用指向已关闭的通道,它会被垃圾回收器回收,并释放内存 //为什么go不提供一个函数来判断 channel 是否关闭? 避免竞争条件(获取的那一时刻没关闭,后续关闭问题) ~~~ * 代码1 ~~~ package main import ( "fmt" "sync" "time" ) func main() { ch := make(chan int, 100) chClose := make(chan struct{}) var ( wgSend sync.WaitGroup ) wgSend.Add(10) // 启动生产者 for i := 0; i < 10; i++ { go func(num int) { defer wgSend.Done() for { select { case <-chClose: fmt.Println("发送关闭", num) return case ch <- num: } } }(i) } // 启动消费者 for i := 0; i < 10; i++ { go func(num int) { for { select { case <-chClose: fmt.Println("接收关闭", num) return case v := <-ch: fmt.Println("接收到的数据", v) } } }(i) } time.Sleep(time.Second * 10) close(chClose) wgSend.Wait() close(ch) } ~~~ * 代码2 ~~~ package main import ( "context" "fmt" "sync" "time" ) func main() { ch := make(chan int, 100) ctx, cancel := context.WithCancel(context.Background()) var ( wgSend sync.WaitGroup ) wgSend.Add(10) // 启动生产者 for i := 0; i < 10; i++ { go func(ctx context.Context, num int) { defer wgSend.Done() for { select { case <-ctx.Done(): fmt.Println("发送关闭", num) return case ch <- num: } } }(ctx, i) } // 启动消费者 for i := 0; i < 10; i++ { go func(ctx context.Context, num int) { for { select { case <-ctx.Done(): fmt.Println("接收关闭", num) return case v := <-ch: fmt.Println("接收到的数据", v) } } }(ctx, i) } time.Sleep(time.Second * 10) cancel() // 通知所有协程关闭 wgSend.Wait() close(ch) } ~~~ > # 交替打印 ~~~ package main import ( "fmt" "time" ) func main() { //无缓冲通道的特点是发送和接收必须同时发生,否则操作会阻塞(不能是ch := make(chan struct{}, 1),有缓冲了会出现抢占) ch := make(chan struct{}) go func() { for { <-ch fmt.Println(1) ch <- struct{}{} } }() go func() { for { <-ch fmt.Println(2) ch <- struct{}{} } }() ch <- struct{}{} time.Sleep(time.Hour) } ~~~