> # 内容
* [内容](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_0)
* [G的状态](https://www.kancloud.cn/book/xiaohuamao/go100/edit#G_4)
* [P的状态](https://www.kancloud.cn/book/xiaohuamao/go100/edit#P_29)
* [GMP调度模型](https://www.kancloud.cn/book/xiaohuamao/go100/edit#GMP_40)
* [GMP数据结构](https://www.kancloud.cn/book/xiaohuamao/go100/edit#GMP_57)
* [协程的调度不是随机的](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_396)
* [CSP 模型](https://www.kancloud.cn/book/xiaohuamao/go100/edit#CSP__431)
* [协程 (捕获异常 和 协程池)](https://www.kancloud.cn/book/xiaohuamao/go100/edit#____436)
* [channel 数据结构](https://www.kancloud.cn/book/xiaohuamao/go100/edit#channel__535)
* [操作 nil channel, close channel, 正常 channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_nil_channel_close_channel__channel_556)
* [channel 发送和接收数据的本质](https://www.kancloud.cn/book/xiaohuamao/go100/edit#channel__563)
* [range channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#range_channel_581)
* [使用 select 来多路复用 channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_select__channel_606)
* [如何优雅的关闭 channel](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_channel_654)
* [交替打印](https://www.kancloud.cn/book/xiaohuamao/go100/edit#_789)
> # G的状态
~~~
const (
_Gidle = iota // 0: 刚创建但尚未初始化,未分配栈空间
_Grunnable // 1: 已就绪(创建后/被唤醒时),等待调度执行,位于运行队列
_Grunning // 2: 正正在运行,绑定到M和P,其栈被锁定,无法被GC或其他线程访问
_Gsyscall // 3: 正在执行系统调用(文件I/O、网络I/O、线程同步操作等),处于内核态,栈被锁定,不在运行队列
_Gwaiting // 4: 被阻塞,等待资源(如通道、锁、网络等),不在运行队列
_Gmoribund_unused // 5: 未使用状态(保留,占位符)
_Gdead // 6: 已结束生命周期
_Genqueue_unused // 7: 未使用状态(保留,占位符)
_Gcopystack // 8: 栈正在动态扩容/收缩,禁止调度(每G的栈初始大小为2KB,当栈空间不足时会分配更大的栈,并将现有的栈内容复制到新栈中)
_Gpreempted // 9: 被调度器抢占暂停,等待重新调度, 在运行队列
// GC 扫描相关标志
_Gscan = 0x1000 // 标志位,用于叠加在其他状态上,表示 GC 正在扫描 Goroutine 栈
_Gscanrunnable = _Gscan + _Grunnable // 0x1001: GC 正在扫描运行队列中的 Goroutine 的栈
_Gscanrunning = _Gscan + _Grunning // 0x1002: GC 正在扫描正在运行的 Goroutine 的栈
_Gscansyscall = _Gscan + _Gsyscall // 0x1003: GC 正在扫描处于系统调用中的 Goroutine 的栈
_Gscanwaiting = _Gscan + _Gwaiting // 0x1004: GC 正在扫描处于等待状态的 Goroutine 的栈
_Gscanpreempted = _Gscan + _Gpreempted // 0x1009: GC 正在扫描被抢占暂停的 Goroutine 的栈
)
~~~
> # P的状态
~~~
const (
_Pidle = iota // 0: 空闲状态,未被 M 使用,未运行用户代码或调度工作
_Prunning // 1: 正在运行,被 M 持有,运行用户代码或调度 Goroutine
_Psyscall // 2: 与执行系统调用的 M 关联,暂不运行用户代码
_Pgcstop // 3: 暂停状态,GC 的 STW 阶段被停止
_Pdead // 4: 不可用状态,通常因 GOMAXPROCS 设置减少
)
~~~
> # GMP调度模型
* 用户级线程(G):内核级线程(M) 多对多模型
* 数量
* P的数量:默认情况下P的数量与逻辑CPU核心数相同, 通过runtime.GOMAXPROCS(n)来修改, 通过runtime.NumCPU()查看
* M和G数量:由调度器动态管理, 通过runtime.NumGoroutine()查看G数量, M的最大数量:sched.maxmcount = 10000
* 流程
* 创建G:先加入P的本地队列, 如果本地队列已满, 则加入到全局队列
* 获取G:优先从P的本地运列获取G, 如果本地队列为空,从全局队列获取G, 如果全局队列也为空,从其他P的本地队列窃取任务
* 执行G:
* \_Gsyscall时: 不在运行队列, 当前的P释放M,绑定新的M继续调度, 旧M上调度的旧G执行完后, 旧G加入某个P的本地队列或全局队列,旧M限制的时候会加入全局空闲队列(旧M在没有执行完系统调用前不会被其它P绑定)
* \_Gwaiting时:不在运行队列, 释放G的绑定, 单独的G等待事件完成后唤醒, 然后加个某个P的本地队列或全局队列
* \_Gpreempted时:在运行队列,被抢占的 G因为时间片用尽或调度器的其他策略,暂时让出 CPU
* \_Gdead时: 当前的G不需要执行,销毁
![](https://git.kancloud.cn/repos/xiaohuamao/go100/raw/e34f0b206fb87426acef5cface4f91a47035c35c/images/18-go-func%E8%B0%83%E5%BA%A6%E5%91%A8%E6%9C%9F.jpeg?access-token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJleHAiOjE3MzQ4OTQ5NTQsImlhdCI6MTczNDg1MTc1NCwicmVwb3NpdG9yeSI6InhpYW9odWFtYW9cL2dvMTAwIiwidXNlciI6eyJ1c2VybmFtZSI6InhpYW9odWFtYW8iLCJuYW1lIjoiXHU0ZjFmXHU2ZDJhd2lubmllIiwiZW1haWwiOiIyOTg3NDIzMTY0QHFxLmNvbSIsInRva2VuIjoiMzA4ZjlmN2UwYTZkZTY5MzQyNTk0Y2YzMjFkNzA2ZTYiLCJhdXRob3JpemUiOnsicHVsbCI6dHJ1ZSwicHVzaCI6dHJ1ZSwiYWRtaW4iOnRydWV9fX0.Gfne-tXSRr9KoDkXSKBtBUVW-CNbVQJK6yWyFXbCTHI)
(图片来源:[https://github.com/aceld/golang/blob/main/images/18-go-func调度周期.jpeg](https://github.com/aceld/golang/blob/main/images/18-go-func%E8%B0%83%E5%BA%A6%E5%91%A8%E6%9C%9F.jpeg))
> # GMP数据结构
* G:任务同单位, 通过m字段绑定所属的M
* P:持有G队列, 通过m字段绑定所属的M
* M: curg当前的G, p字段 当前的P
* schedt:管控全局资源分配,调度行为,以及 GC 协调
~~~
type g struct {
// 栈相关字段
stack stack // Goroutine 栈的范围:[stack.lo, stack.hi)
stackguard0 uintptr // 检测栈溢出的基准指针,通常为 stack.lo + StackGuard
stackguard1 uintptr // 特殊栈的基准指针(如 g0 和 gsignal 的系统栈)
// 异常处理
_panic *_panic // 指向当前 Goroutine 最近的 panic 信息,用于异常恢复
_defer *_defer // 指向当前 Goroutine 最近的 defer 结构,用于延迟调用管理
// 调度相关
m *m // 当前 Goroutine 所属的线程 m
sched gobuf // Goroutine 的寄存器上下文信息,保存执行状态
atomicstatus atomic.Uint32 // Goroutine 的状态,使用原子操作管理(如 _Grunning、_Gwaiting 等)
goid uint64 // Goroutine 唯一标识符
schedlink guintptr // 调度链表指针,用于 Goroutine 的队列操作
waitsince int64 // 阻塞时间戳,用于监控 Goroutine 的等待时间
waitreason waitReason // 阻塞原因(如通道操作、锁等待等)
// 抢占与栈调整
preempt bool // 是否触发抢占,通常由调度器根据时间片或栈检查触发
preemptStop bool // 是否在抢占后进入 _Gpreempted 状态
preemptShrink bool // 是否在安全点缩减栈空间
asyncSafePoint bool // 是否停在异步安全点,栈中可能存在不精确指针
paniconfault bool // 遇到异常地址时是否 panic(而不是崩溃)
gcscandone bool // 是否已扫描过当前 Goroutine 的栈,避免重复扫描
throwsplit bool // 是否禁止栈分裂,某些 Goroutine 可能要求完整栈
activeStackChans bool // 是否有指向该 Goroutine 栈的未锁定通道,影响栈复制
// 通道与同步状态
parkingOnChan atomic.Bool // 当前 Goroutine 是否停在通道操作上
inMarkAssist bool // 是否参与 GC 标记辅助工作
coroexit bool // 是否在协程切换时退出
// 性能统计与状态
raceignore int8 // 是否忽略竞态检测,用于屏蔽特定 Goroutine 的事件
nocgocallback bool // 是否禁用从 C 回调到 Go
tracking bool // 是否追踪此 Goroutine 的调度延迟
trackingSeq uint8 // 调度追踪序列号
trackingStamp int64 // 上次开始追踪的时间戳
runnableTime int64 // 累计的可运行时间,仅用于性能追踪
lockedm muintptr // 锁定的线程 m,表示 Goroutine 被绑定到特定线程
// 信号处理
sig uint32 // 接收到的信号值
writebuf []byte // 用于信号处理的缓冲区
sigcode0 uintptr // 信号的附加信息
sigcode1 uintptr // 信号的附加信息
sigpc uintptr // 触发信号的程序计数器
// Goroutine 关系
parentGoid uint64 // 父 Goroutine 的 ID
gopc uintptr // 创建此 Goroutine 的 `go` 指令的地址
ancestors *[]ancestorInfo // 祖先信息(用于调试)
startpc uintptr // Goroutine 执行的入口函数地址
// 性能调试与分析
racectx uintptr // 竞态检测上下文,用于调试竞态条件
waiting *sudog // 当前阻塞的 sudog 指针
cgoCtxt []uintptr // Cgo 调用栈上下文信息
labels unsafe.Pointer // 分析器标签,用于性能统计
timer *timer // 与当前 Goroutine 关联的定时器
sleepWhen int64 // 睡眠的截止时间(Unix 时间戳)
selectDone atomic.Uint32 // 是否参与了 select 操作,以及是否完成
// 性能分析与协程状态
goroutineProfiled goroutineProfileStateHolder // 当前 Goroutine 的性能分析状态
coroarg *coro // 协程切换时的参数
trace gTraceState // 当前 Goroutine 的跟踪信息
// GC 辅助
gcAssistBytes int64 // 当前 Goroutine 的 GC 辅助字节计数,为负值时需参与垃圾回收
}
type m struct {
// 基础信息
g0 *g // 默认 Goroutine(系统级 g0,用于调度和运行时管理)
morebuf gobuf // 当前 Goroutine 调度的上下文缓冲区
divmod uint32 // ARM 架构的除法/取模分母(liblink 使用)
_ uint32 // 对齐填充字段
// 线程及信号相关
procid uint64 // 操作系统线程 ID(供调试器使用)
gsignal *g // 用于信号处理的 Goroutine
goSigStack gsignalStack // Go 分配的信号处理栈
sigmask sigset // 保存的信号屏蔽集
tls [tlsSlots]uintptr // 线程局部存储数据
// 启动及当前 Goroutine 状态
mstartfn func() // m 启动时的函数
curg *g // 当前正在运行的 Goroutine
catchsig guintptr // 捕获信号的 Goroutine(在致命信号期间运行)
// 调度器相关
p puintptr // 当前线程绑定的 P
nextp puintptr // 即将绑定的 P
oldp puintptr // 上次绑定的 P(如从系统调用恢复时)
id int64 // m 的唯一标识符
mallocing int32 // 是否正在进行内存分配
throwing throwType // 是否正在执行异常(panic)操作
preemptoff string // 禁止抢占的原因(为空则允许抢占)
locks int32 // 当前 m 持有的锁数量
dying int32 // 是否标记为销毁状态
profilehz int32 // 性能分析的采样频率
spinning bool // 是否处于自旋状态(等待任务)
blocked bool // 是否阻塞在某个通知(note)
newSigstack bool // 是否初始化了新的信号栈
printlock int8 // 打印调试日志时的锁
incgo bool // 是否正在执行 cgo 调用
isextra bool // 是否为额外线程(辅助线程)
isExtraInC bool // 是否为 C 中的辅助线程
isExtraInSig bool // 是否为信号处理中的辅助线程
freeWait atomic.Uint32 // 是否可以安全地释放 g0 并删除 m
needextram bool // 是否需要额外的 m
traceback uint8 // 是否启用追踪调试
ncgocall uint64 // 累计 cgo 调用次数
ncgo int32 // 当前进行中的 cgo 调用数量
cgoCallersUse atomic.Uint32 // 临时标记 cgoCallers 是否在使用
cgoCallers *cgoCallers // cgo 堆栈跟踪
park note // 用于同步的等待通知
alllink *m // 全局 m 链表中的下一个 m
schedlink muintptr // 调度器链表中的下一个 m
lockedg guintptr // 当前锁定的 Goroutine
createstack [32]uintptr // 创建此线程的调用栈(用于调试)
lockedExt uint32 // 外部锁定计数(用于 LockOSThread)
lockedInt uint32 // 内部锁定计数
nextwaitm muintptr // 等待锁的下一个 m
// 锁与等待
mLockProfile mLockProfile // 锁性能分析信息
profStack []uintptr // 用于内存、阻塞或互斥锁的栈跟踪
waitunlockf func(*g, unsafe.Pointer) bool // 解锁时的回调函数
waitlock unsafe.Pointer // 等待锁的指针
waitTraceSkip int // 跟踪时跳过的栈帧数
waitTraceBlockReason traceBlockReason // 阻塞的具体原因
// 系统调用
syscalltick uint32 // 系统调用的计时器
freelink *m // 已释放 m 链表的下一项
trace mTraceState // m 的跟踪状态
// 库调用
libcall libcall // 库调用信息
libcallpc uintptr // 库调用的程序计数器
libcallsp uintptr // 库调用的栈指针
libcallg guintptr // 执行库调用的 Goroutine
winsyscall winlibcall // Windows 系统调用参数
// VDSO(虚拟动态共享对象)
vdsoSP uintptr // VDSO 调用的栈指针
vdsoPC uintptr // VDSO 调用的程序计数器
// 抢占与信号
preemptGen atomic.Uint32 // 已完成的抢占信号计数
signalPending atomic.Uint32 // 是否有待处理的抢占信号
// 缓存
pcvalueCache pcvalueCache // PC 值查找缓存
// 随机数
chacha8 chacha8rand.State // ChaCha8 随机数生成器状态
cheaprand uint64 // 简易随机数生成器
// 锁信息
locksHeldLen int // 持有的锁数量
locksHeld [10]heldLockInfo // 持有的锁的详细信息(最多记录 10 个)
}
type p struct {
id int32 // P 的唯一标识符
status uint32 // P 的状态(pidle、prunning 等)
link puintptr // 链接到下一个空闲的 P
schedtick uint32 // 每次调度器调用时递增
syscalltick uint32 // 每次系统调用时递增
sysmontick sysmontick // sysmon 上次观察到的计数
m muintptr // 关联的 M(空闲时为 nil)
mcache *mcache // P 的内存缓存
pcache pageCache // 页面缓存
raceprocctx uintptr // Race Detector 的处理上下文
// 延迟池
deferpool []*_defer // 可用的延迟结构池
deferpoolbuf [32]*_defer // 缓存用于延迟结构池的缓冲区
// Goroutine ID 缓存
goidcache uint64 // 缓存的 Goroutine ID 起始值
goidcacheend uint64 // 缓存的 Goroutine ID 结束值
// 可运行的 Goroutine 队列
runqhead uint32 // 可运行队列的头部
runqtail uint32 // 可运行队列的尾部
runq [256]guintptr // 可运行队列,存储 Goroutine 的指针
// runnext 是当前 Goroutine 已准备好下一个运行的 Goroutine
// 它会继承当前 Goroutine 的剩余时间片,优化调度延迟。
runnext guintptr
// 已死亡的 Goroutine 列表
gFree struct {
gList
n int32 // 列表中 Goroutine 的数量
}
sudogcache []*sudog // Sudog 对象的缓存
sudogbuf [128]*sudog // Sudog 缓存的缓冲区
// 堆上的 mspan 对象缓存
mspancache struct {
len int // 缓存的 mspan 对象数量
buf [128]*mspan // 缓存的 mspan 对象
}
// Pinner 对象缓存,用于减少重复创建的分配
pinnerCache *pinner
trace pTraceState // 跟踪状态
palloc persistentAlloc // 持久性分配,用于避免互斥锁
// 垃圾回收 (GC) 状态
gcAssistTime int64 // assistAlloc 的时间(纳秒)
gcFractionalMarkTime int64 // fractional mark worker 的时间(纳秒)
limiterEvent limiterEvent // GC CPU 限制器的事件
gcMarkWorkerMode gcMarkWorkerMode // 下一个标记工作线程的模式
gcMarkWorkerStartTime int64 // 最近标记工作线程开始的时间
gcw gcWork // P 的 GC 工作缓存
wbBuf wbBuf // P 的 GC 写屏障缓冲区
runSafePointFn uint32 // 如果为 1,则在下一个安全点运行 sched.safePointFn
statsSeq atomic.Uint32 // P 的统计信息写入状态(偶数:未写入,奇数:写入中)
// 定时器堆
timers timers
// 累积的 goroutine 栈扫描大小
maxStackScanDelta int64 // 栈扫描增量(触发 gcController.maxStackScan 时刷新)
scannedStackSize uint64 // GC 时间扫描的栈大小
scannedStacks uint64 // GC 时间扫描的 Goroutine 数量
preempt bool // 标志是否需要尽快进入调度器
gcStopTime int64 // 上次进入 _Pgcstop 的时间戳
}
type schedt struct {
goidgen atomic.Uint64 // 全局 Goroutine ID 生成器
lastpoll atomic.Int64 // 上次网络轮询的时间,0 表示当前正在轮询
pollUntil atomic.Int64 // 当前轮询的睡眠时间
lock mutex // 调度器全局锁
// M 相关状态
midle muintptr // 空闲 M 列表,等待工作的 M
nmidle int32 // 空闲 M 的数量
nmidlelocked int32 // 被锁定且空闲的 M 数量
mnext int64 // 已创建的 M 数量及下一个 M 的 ID
maxmcount int32 // 允许的最大 M 数量
nmsys int32 // 系统级别的 M 数量,不计入死锁检测
nmfreed int64 // 已释放的 M 总数
ngsys atomic.Int32 // 系统 Goroutine 的数量
// P 相关状态
pidle puintptr // 空闲 P 列表
npidle atomic.Int32 // 空闲 P 的数量
nmspinning atomic.Int32 // 当前自旋中的 M 数量
needspinning atomic.Uint32 // 是否需要更多自旋 M 的标志,需持有 sched.lock 才可设置
// 全局可运行的 Goroutine 队列
runq gQueue // 全局队列
runqsize int32 // 队列大小
// 调度器禁用控制
disable struct {
user bool // 是否禁用用户 Goroutine 调度
runnable gQueue // 等待调度的 Goroutine 队列
n int32 // 队列长度
}
// 全局 G 缓存
gFree struct {
lock mutex // 缓存锁
stack gList // 带栈的 G 列表
noStack gList // 无栈的 G 列表
n int32 // 缓存的 G 数量
}
// sudog 对象的全局缓存
sudoglock mutex
sudogcache *sudog
// 延迟对象的全局缓存
deferlock mutex
deferpool *_defer
// 等待释放的 M 列表,通过 m.freelink 链接
freem *m
// GC 相关状态
gcwaiting atomic.Bool // 是否正在等待 GC
stopwait int32 // 停止等待的计数器
stopnote note // 停止的通知
sysmonwait atomic.Bool // 系统监控是否等待中
sysmonnote note // 系统监控通知
// 下一个 GC 安全点需执行的函数
safePointFn func(*p) // 每个 P 在安全点需要运行的函数
safePointWait int32 // 等待安全点的计数器
safePointNote note // 安全点的通知
profilehz int32 // CPU 性能分析的频率
// GOMAXPROCS 调整的时间统计
procresizetime int64 // 上次调整 GOMAXPROCS 的时间(纳秒)
totaltime int64 // 累计 GOMAXPROCS 的积分时间
sysmonlock mutex // 系统监控锁,阻止系统监控与 runtime 的交互
// 调度延迟分布
timeToRun timeHistogram // G 在 _Grunnable 到 _Grunning 状态的延迟分布
// P 的空闲时间
idleTime atomic.Int64 // 当前周期的空闲时间,GC 周期重置
// Goroutine 互斥锁等待时间
totalMutexWaitTime atomic.Int64 // Goroutine 在 _Gwaiting 状态下等待锁的总时间
// 停止世界(STW)延迟分布
stwStoppingTimeGC timeHistogram // GC 相关的 STW 停止延迟
stwStoppingTimeOther timeHistogram // 非 GC 的 STW 停止延迟
// STW 总延迟分布
stwTotalTimeGC timeHistogram // GC 相关的 STW 总延迟
stwTotalTimeOther timeHistogram // 非 GC 的 STW 总延迟
// 运行时锁等待时间
totalRuntimeLockWaitTime atomic.Int64 // G 在 _Grunnable 状态下等待 runtime 锁的总时间
}
~~~
> # 协程的调度不是随机的
* Go 协程调度**不是随机的**,但它也**不是按顺序**的。它是一种基于信号的抢占式、工作窃取的调度模型
* 调度器的目的是在多个协程之间公平分配 CPU 资源,并在必要时暂停某些协程,让其他协程有机会运行
~~~
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan struct{})
for i := 0; i < 10; i++ {
go func(num int) {
for {
<-ch
fmt.Println(num)
}
}(i)
time.Sleep(time.Millisecond)
}
time.Sleep(time.Second)
for j := 0; j < 10; j++ {
ch <- struct{}{}
//**不加 `time.Sleep(time.Millisecond)`** 时,多个 Goroutine 几乎同时启动,调度器会随机选择哪个 Goroutine 先得到 CPU,因此打印顺序不确定。
//**加了 `time.Sleep(time.Millisecond)`** 时,每个 Goroutine 启动后,主协程会休眠 1 毫秒。这个时间足够让调度器有机会依次启动每个 Goroutine,导致它们的打印顺序更加有序
//time.Sleep(time.Millisecond) 改变输出结果
}
time.Sleep(time.Minute)
}
~~~
> # CSP 模型
* **CSP**(Communicating Sequential Processes,通信顺序进程) 是一种并发编程模型,其核心理念是**不要通过共享内存来通信,而要通过通信来实现内存共享**。
* 在 Go 语言中,CSP 模型主要通过**Goroutine**和**Channel**来实现。多个 Goroutine 之间的通信通常使用**Channel**,从而避免了共享内存导致的并发问题。
> # 协程 (捕获异常 和 协程池)
* 直接用go关键字开协程,不捕获异常的话, 如果出现异常,会导致整个程序结束
* Go 语言中的**Goroutine**相较于系统线程来说非常轻量级,其初始栈大小仅为**2KB**。然而,在高并发场景下,大量的 Goroutine 被频繁创建和销毁,可能会对性能产生负面影响,并增加**GC(垃圾回收)**的压力。
* 为了减少 Goroutine 的创建和销毁所带来的性能损耗,建议充分**复用 Goroutine**。通过使用**Goroutine 池**或者其他方式来复用已经存在的 Goroutine,可以有效地降低系统的开销
* goroutine.go
~~~
package main
import (
"fmt"
)
// WorkerPool 定义一个工作池结构体
type WorkerPool struct {
maxWorkers int
taskQueue chan func()
}
// NewWorkerPool 创建一个新的工作池
func NewWorkerPool(maxWorkers int) *WorkerPool {
return &WorkerPool{
maxWorkers: maxWorkers,
taskQueue: make(chan func()),
}
}
// Start 启动工作池
func (wp *WorkerPool) Start() {
for i := 0; i < wp.maxWorkers; i++ {
go wp.worker()
}
}
// worker 执行任务的工作者 goroutine
func (wp *WorkerPool) worker() {
for task := range wp.taskQueue {
safeExecute(task)
}
}
// safeExecute 安全执行任务,捕获异常
func safeExecute(task func()) {
defer func() {
if r := recover(); r != nil {
fmt.Println("Recovered from panic:", r)
}
}()
task()
}
// Submit 提交任务到工作池
func (wp *WorkerPool) Submit(task func()) {
wp.taskQueue <- task
}
~~~
* main.go
~~~
package main
import (
"fmt"
"time"
)
var pool *WorkerPool
func init() {
pool = NewWorkerPool(20)
pool.Start()
}
func SafeGo(f func()) {
pool.Submit(f)
}
func main() {
for i := 0; i < 10; i++ {
SafeGo(func(num int) func() {
return func() {
fmt.Println("A", num)
}
}(i))
}
for i := 0; i < 10; i++ {
SafeGo(func(num int) func() {
return func() {
fmt.Println("B", num)
}
}(i))
}
time.Sleep(time.Second * 3)
}
~~~
> # channel 数据结构
* chan T // 可以接收和发送类型为 T 的数据
* chan<- T // 只可以用来发送 T 类型的数据
* <-chan T // 只可以用来接收 T 类型的数据
~~~
type hchan struct {
qcount uint // 通道中当前元素个数
dataqsiz uint // 缓冲区的大小 (无缓冲时为0)
buf unsafe.Pointer // (环形缓冲区)指向缓冲区的指针,缓冲区通过这个指针来存储数据,已经发送但还未被接收的数据
elemsize uint16 // 单个元素的大小(如果通道是 chan int 类型,那么每个元素就是一个 int,elemsize 表示 int 类型的字节大小。)
closed uint32 // 标识 channel 是否已关闭 (0 表示未关闭,1 表示已关闭)
timer *timer // 用于处理与该通道相关的超时操作
elemtype *_type // 指向类型描述符的指针,它包含了通道中传输数据类型的所有信息
sendx uint // 下一个要发送元素的位置
recvx uint // 下一个要接收元素的位置 (每次读取数据后,recvx 都会递增,当到达缓冲区末尾时,recvx 会重置为 0,形成环形读取操作)
recvq waitq // 等待接收数据的 goroutine 队列
sendq waitq // 等待发送数据的 goroutine 队列
lock mutex // 互斥锁,用于保护 channel 的操作
}
~~~
> # 操作 nil channel, close channel, 正常 channel
| 操作 | nil channel | close channel | 正常 channel |
| --- | --- | --- | --- |
| close | panic: close of nil channel | panic: close of closed channel | 正常关闭 |
| 读操作 | fatal error: all goroutines are asleep - deadlock! | 有未接收的值可以正常读, 没有的话读到对应类型的零值 | 阻塞/正常读数据 |
| 写操作 | fatal error: all goroutines are asleep - deadlock! | panic: send on closed channel | 阻塞/正常写数据 |
> # channel 发送和接收数据的本质
~~~
//向 channel 发送值类型会拷贝, 发送引用类型拷贝的是引用
package main
import "fmt"
func main() {
ch := make(chan []int, 1)
s := make([]int, 1)
s[0] = 1
ch <- s
s[0] = 2
fmt.Println(<-ch) //输出2
}
~~~
> # range channel
~~~
package main
import (
"fmt"
"time"
)
func main() {
ch := make(chan int)
go func() {
time.Sleep(time.Second * 10)
close(ch)
}()
//如果通道没有关闭,但生产者没有再发送数据,range 会持续阻塞,直到有新的数据发送到通道或通道被关闭
for v := range ch {
fmt.Println(v)
}
fmt.Println("Done")
}
~~~
> # 使用 select 来多路复用 channel
~~~
package main
import (
"fmt"
"time"
)
// 随机选择:当多个 channel 同时满足条件时,select 会随机选择一个执行。(一个channel先准备,另一个channel后准备好,select也是随机选择)
// 默认分支:可以在 select 中添加 default 分支,当所有的 channel 都没有数据时,select 可以立即执行 default 分支而不阻塞。
func main() {
ch1 := make(chan struct{})
ch2 := make(chan struct{})
go func() {
ch1 <- struct{}{}
}()
go func() {
ch2 <- struct{}{}
}()
go func() {
for {
select {
case <-ch1:
fmt.Println("ch1")
time.Sleep(1 * time.Second)
case <-ch2:
fmt.Println("ch2")
time.Sleep(1 * time.Second)
case <-time.After(2 * time.Second):
fmt.Println("超时")
time.Sleep(1 * time.Second)
default:
fmt.Println("default")
time.Sleep(1 * time.Second)
}
}
}()
time.Sleep(time.Second * 10)
close(ch1) //通道关闭后,select 还会执行
time.Sleep(time.Second * 10)
}
~~~
> # 如何优雅的关闭 channel
~~~
//channel关闭原则: 不要在消费端关闭channel,不要再生产端有多个并行的时候执行关闭操作
//判断channel是否关闭: v, ok := <-ch 取值的时候加判断, 关闭的channel, v 返回对应类型的零值, ok 返回 false。用这种方式去判断channel 是否关闭有副作用, 会读出channel里的元素
//如何优雅的关闭channel
//--- 1.直接关闭channel用recover捕获异常
//----2.sync.Once来确保channel只关闭一次
//--- 3.增加状态字段存channel是否关闭(需要用到锁)
//--- 4.增加一个channel作为关闭信号(如下代码一)
//--- 5.使用context(如下代码二)
//用完的channel, 没有发送者, 没有接收者, 也没有关闭会一直占用内存吗? 会的
//已关闭的channel, 还有未接收的数据, 但是没有发送者和接受者会一直占用内存吗? 不会, 如果没有任何引用指向已关闭的通道,它会被垃圾回收器回收,并释放内存
//为什么go不提供一个函数来判断 channel 是否关闭? 避免竞争条件(获取的那一时刻没关闭,后续关闭问题)
~~~
* 代码1
~~~
package main
import (
"fmt"
"sync"
"time"
)
func main() {
ch := make(chan int, 100)
chClose := make(chan struct{})
var (
wgSend sync.WaitGroup
)
wgSend.Add(10)
// 启动生产者
for i := 0; i < 10; i++ {
go func(num int) {
defer wgSend.Done()
for {
select {
case <-chClose:
fmt.Println("发送关闭", num)
return
case ch <- num:
}
}
}(i)
}
// 启动消费者
for i := 0; i < 10; i++ {
go func(num int) {
for {
select {
case <-chClose:
fmt.Println("接收关闭", num)
return
case v := <-ch:
fmt.Println("接收到的数据", v)
}
}
}(i)
}
time.Sleep(time.Second * 10)
close(chClose)
wgSend.Wait()
close(ch)
}
~~~
* 代码2
~~~
package main
import (
"context"
"fmt"
"sync"
"time"
)
func main() {
ch := make(chan int, 100)
ctx, cancel := context.WithCancel(context.Background())
var (
wgSend sync.WaitGroup
)
wgSend.Add(10)
// 启动生产者
for i := 0; i < 10; i++ {
go func(ctx context.Context, num int) {
defer wgSend.Done()
for {
select {
case <-ctx.Done():
fmt.Println("发送关闭", num)
return
case ch <- num:
}
}
}(ctx, i)
}
// 启动消费者
for i := 0; i < 10; i++ {
go func(ctx context.Context, num int) {
for {
select {
case <-ctx.Done():
fmt.Println("接收关闭", num)
return
case v := <-ch:
fmt.Println("接收到的数据", v)
}
}
}(ctx, i)
}
time.Sleep(time.Second * 10)
cancel() // 通知所有协程关闭
wgSend.Wait()
close(ch)
}
~~~
> # 交替打印
~~~
package main
import (
"fmt"
"time"
)
func main() {
//无缓冲通道的特点是发送和接收必须同时发生,否则操作会阻塞(不能是ch := make(chan struct{}, 1),有缓冲了会出现抢占)
ch := make(chan struct{})
go func() {
for {
<-ch
fmt.Println(1)
ch <- struct{}{}
}
}()
go func() {
for {
<-ch
fmt.Println(2)
ch <- struct{}{}
}
}()
ch <- struct{}{}
time.Sleep(time.Hour)
}
~~~
- 草稿
- Golang
- 切片 slice
- 数组和切片的区别
- 左闭右开
- make([]int, 5) 和 make([]int, 0, 5) 区别
- 切片非线程安全,并发操作为啥不会像map一样报错
- []struct{} 如何遍历
- 切片如何删除某个元素
- append 一个nil 切片
- 哈希表 map
- 并发操作
- 并发写报错
- 并发读不会报错
- 并发读有写报错
- 并发迭代有写报错
- 自制并发安全字典
- 官方并发安全字典
- 对未初始化的 map 进行赋值操作
- map的底层
- 无序输出
- 等量扩容
- 实现集合
- map的key可以使哪些值
- 协程 go
- 协程相关阅读
- 进程、线程、协程
- 协程 (捕获异常 和 协程池)
- GPM 模型
- CSP模型
- channel
- channel 相关操作
- 交替打印
- 如何让channel 只能接收/只能发送
- channel 常见报错
- channel 死锁
- nil channel 和 已关闭的 channel
- 使用 select 来多路复用 channel
- channel 的使用
- 接口和结构体
- 简单使用
- 两个结构体能否比较
- 工厂模式
- 概念
- 简单工厂
- 方法工厂
- 堆和栈,值类型和引用类型,内存逃逸,垃圾回收
- 栈和堆
- 内存逃逸
- 值类型和引用类型
- 垃圾回收方式
- 性能优化分析工具 pprof
- golang 代码片段
- 片段一 defer
- 片段二 channel
- Golang 相关
- Golang 相关阅读
- Golang 1-10
- make 和 new 的区别
- 使用指针的场景
- Go语言的context包
- 位运算
- Copy 是浅拷贝还是深拷贝
- init 函数 和 sync.Once
- select 多路复用
- Golang 其它
- MongoDB
- 可比较类型 与 可转json 类型
- Gorm
- 面向对象和面向过程
- go语言实现-面向对象
- go语言实现-面向过程
- 限流,熔断,降级
- 了解
- 熔断配置
- 熔断例子
- 服务降级
- github.com/alibaba/sentinel-golang
- 互斥锁 读写锁 原子锁
- 为什么需要锁
- 互斥锁
- 读写锁
- 原子锁
- 互斥锁性能对比
- 原子锁性能对比
- 互斥锁 or 原子锁?
- 条件锁
- 计数器
- GoFrame
- GF1.16版本
- 修改使用的表
- 按天、周、月、年
- GoFrame 文档
- 配置文件
- 生成脚本
- 排序算法
- 相关排序
- 冒泡排序
- 选择排序
- 插入排序
- 快速排序
- 归并排序
- 堆排序
- 数据库
- 分布式怎么保证线程安全
- 数据库实现方式
- 基于表记录
- 乐观锁
- 悲观锁
- Redis实现方式
- Zookeeper实现方式
- Mysql 相关
- group_concat
- 索引优化
- 索引优化1
- 定期分析和优化索引
- 覆盖索引
- 组合索引
- 聚簇索引和非聚簇索引
- 索引类型与方式、聚簇与非聚簇索引
- 事务特征和隔离级别
- 查询优化
- mysql自增表插入数据时,Id不连续问题
- InnoDB引擎 和 MyISAM引擎区别
- 锁
- 悲观锁和乐观锁
- 查询,更新,插入语句
- 什么是死锁
- 怎么处理死锁
- MySQL 隔离级别
- 事务特征
- 隔离级别
- 废弃3
- 索引
- 索引类型和方式、聚簇和非聚簇索引(上)
- 索引类型和方式、聚簇和非聚簇索引(下)
- 回表、覆盖索引、最左前缀、联合索引、索引下推、索引合并
- Mysql 优化
- 索引的原理
- 千万级表修改表结构
- Redis
- 获取随机三条数据
- Redis 持久化方式
- 全量模式 RDB 冷备份(内存快照)
- 增量模式 AOF 热备份(文件追加)
- 过期key的删除策略、内存淘汰机制
- 数据结构
- 位图
- 网络
- 网络相关
- 游戏同步方式:帧同步和状态同步
- Websocket
- OSI模型
- TCP 与 UDP
- 三次握手四次挥手
- Http 状态码
- 1xx(信息性状态码)
- 101 服务端代码
- 101 客户端代码
- 2xx(成功状态码)
- 3xx(重定向状态码)
- 302 服务端代码
- 302 客户端代码
- 4xx(客户端错误状态码)
- 5xx(服务器错误状态码)
- 如何排查接口问题
- 网络请求和响应过程
- time_wait
- keep-alive
- http 和 rpc 的区别
- I/O多路复用 select和poll
- too many open file
- 其它技术
- git 相关操作
- 修改提交备注
- 多个提交合并成一个提交
- 回退版本
- 小程序和公众号
- 消息模板
- 获取code
- 静默登录
- 其它技术相关
- C盘空间不足
- 生成式人工智能AIGC
- 共享文件
- 接口文档, mock提供测试数据
- 抓包工具
- Python
- 安装包失败
- 自动化测试 Scrapy
- AIGC:人工智能生成内容
- PHP
- xhprof 性能分析
- 一键安装
- 哈希冲突的解决方式
- 链地址法(拉链法)
- 开放地址法
- 再哈希
- 概念1
- Nginx
- 负载均衡方式
- 加密解密
- 简单了解
- 签名算法例子
- 码例子1
- 代码例子2
- Linux
- netstat (用于查看和管理网络连接和路由表)
- ps 用于查看和管理进程
- ab 压测
- nohup 守护进程
- lsof (List Open File 获取被进程打开文件的信息)
- tail 查看日志
- 各类linux同步机制
- Socket 服务端的实现,select 和epoll的区别?
- scp 传输,awk 是一个强大的文本分析工具
- pidof
- 项目
- 棋牌
- 牌的编码
- 出牌规则
- 洗牌
- 股票
- 股票知识
- 龙虎榜数据缓存方式
- 单日龙虎榜数据
- 单只股票的历史上榜
- 遇到的问题
- 浮点数精度问题
- Mysql Sum 精度问题(float, double精度问题)
- 分页问题(数据重复)
- 工具包
- v3
- common.go
- common_test.go
- customized.go
- customized_test.go
- slice.go
- slice_test.go
- time.go
- time_test.go
- v4
- common.go
- common_test.go
- customized.go
- customized_test.go
- slice.go
- time.go
- time_test.go
- 相关阅读
- 协程 goroutine
- 通道 channel
- json 和 gob 序列化和反序列化
- redis 有序集合
- mysql22
- 相关阅读 s
- pyTorch
- defer
- 内存泄漏
- 数据传输
- 杂项
- 一提
- gogogoo
- 内容