企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
> # 进程,线程,协程 - **进程**:操作系统中资源分配的基本单位,具有独立的地址空间, 上下文切换开销大,适合需要隔离性和独立性高的任务 - **线程**:进程中的执行单元,同一进程的线程共享内存和资源,适合需要大量并发且能够共享资源的任务 - **协程**:比线程更轻量级的执行单元,由用户态调度管理。协程的调度由程序自行控制,性能开销更低,适合大量并发场景。 > # GMP 原理和调度 - [GMP模型](https://go.cyub.vip/gmp/gmp-model/) - G是goroutine,M是线程,P是处理器。 P负责调度goroutine到线程上,维护了一个本地队列,存储了所有需要它来调度的G - P 的数量通常与逻辑 CPU 数量相等,可以通过 `runtime.GOMAXPROCS` 来调整。P 的数量越多,并发能力越强,但也会导致调度开销增加 - `runtime.Gosched()` 可以主动让出执行权 - 当创建新的 Goroutine 时,它通常被加入到当前 P 的本地队列中。如果当前 P 的本地队列已满(如超过 256 个 Goroutine),部分 Goroutine 会被移至全局队列 - P 先尝试从本地队列中获取 Goroutine。如果本地队列为空,P 会尝试从全局队列中获取 Goroutine。如果全局队列也为空,P 会从其他 P 的本地队列中“窃取” Goroutine 执行(窃取 Goroutine 时,通常会窃取一半数量的 Goroutine) - M 执行一个 G 时,必须绑定一个 P。如果 M 因阻塞操作(如系统调用)而无法继续执行,P 会解除与 M 的绑定,寻找另一个可用的 M 执行其他 G。原 M 完成阻塞操作后,将试图重新获取一个 P 继续工作 > # 协程的调度不是随机的 - Go 协程调度**不是随机的**,但它也**不是按顺序**的。它是一种基于信号的抢占式、工作窃取的调度模型 - 调度器的目的是在多个协程之间公平分配 CPU 资源,并在必要时暂停某些协程,让其他协程有机会运行 ~~~ package main import ( "fmt" "time" ) func main() { ch := make(chan struct{}) for i := 0; i < 10; i++ { go func(num int) { for { <-ch fmt.Println(num) } }(i) time.Sleep(time.Millisecond) } time.Sleep(time.Second) for j := 0; j < 10; j++ { ch <- struct{}{} //**不加 `time.Sleep(time.Millisecond)`** 时,多个 Goroutine 几乎同时启动,调度器会随机选择哪个 Goroutine 先得到 CPU,因此打印顺序不确定。 //**加了 `time.Sleep(time.Millisecond)`** 时,每个 Goroutine 启动后,主协程会休眠 1 毫秒。这个时间足够让调度器有机会依次启动每个 Goroutine,导致它们的打印顺序更加有序 //time.Sleep(time.Millisecond) 改变输出结果 } time.Sleep(time.Minute) } ~~~ > # CSP 模型 * **CSP**(Communicating Sequential Processes,通信顺序进程) 是一种并发编程模型,其核心理念是**不要通过共享内存来通信,而要通过通信来实现内存共享**。 * 在 Go 语言中,CSP 模型主要通过 **Goroutine** 和 **Channel** 来实现。多个 Goroutine 之间的通信通常使用 **Channel**,从而避免了共享内存导致的并发问题。 > # 协程 (捕获异常 和 协程池) - 直接用go关键字开协程,不捕获异常的话, 如果出现异常,会导致整个程序结束 - Go 语言中的 **Goroutine** 相较于系统线程来说非常轻量级,其初始栈大小仅为 **2KB**。然而,在高并发场景下,大量的 Goroutine 被频繁创建和销毁,可能会对性能产生负面影响,并增加 **GC(垃圾回收)** 的压力。 - 为了减少 Goroutine 的创建和销毁所带来的性能损耗,建议充分 **复用 Goroutine**。通过使用 **Goroutine 池** 或者其他方式来复用已经存在的 Goroutine,可以有效地降低系统的开销 - goroutine.go ~~~ package main import ( "fmt" ) // WorkerPool 定义一个工作池结构体 type WorkerPool struct { maxWorkers int taskQueue chan func() } // NewWorkerPool 创建一个新的工作池 func NewWorkerPool(maxWorkers int) *WorkerPool { return &WorkerPool{ maxWorkers: maxWorkers, taskQueue: make(chan func()), } } // Start 启动工作池 func (wp *WorkerPool) Start() { for i := 0; i < wp.maxWorkers; i++ { go wp.worker() } } // worker 执行任务的工作者 goroutine func (wp *WorkerPool) worker() { for task := range wp.taskQueue { safeExecute(task) } } // safeExecute 安全执行任务,捕获异常 func safeExecute(task func()) { defer func() { if r := recover(); r != nil { fmt.Println("Recovered from panic:", r) } }() task() } // Submit 提交任务到工作池 func (wp *WorkerPool) Submit(task func()) { wp.taskQueue <- task } ~~~ - main.go ~~~ package main import ( "fmt" "time" ) var pool *WorkerPool func init() { pool = NewWorkerPool(20) pool.Start() } func SafeGo(f func()) { pool.Submit(f) } func main() { for i := 0; i < 10; i++ { SafeGo(func(num int) func() { return func() { fmt.Println("A", num) } }(i)) } for i := 0; i < 10; i++ { SafeGo(func(num int) func() { return func() { fmt.Println("B", num) } }(i)) } time.Sleep(time.Second * 3) } ~~~