企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 简介 ~~~ var mailbox uint8 var lock sync.RWMutex sendCond := sync.NewCond(&lock) recvCond := sync.NewCond(lock.RLocker()) ~~~ **本身不是锁,要与锁结合使用** go标准库中的sync.Cond类型代表了条件变量. 条件变量要与锁(互斥锁,或者读写锁)一起使用.成员变量L代表与条件变量搭配使用的锁 ~~~ type Cond struct { noCopy noCopy L Locker notify notifyList checker copyChecker } ~~~ 对应有3个常用方法: Wait, Signal, Broadcast ~~~ func (c *Cond) Wait() ~~~ * 阻塞等待条件变量满足,等醒 * 释放已掌握的互斥锁相当于cond.L.Unlock().**注意:1,2两步为一个原子操作** * 当被唤醒的时候,Wait()返回,解除阻塞并重新获取互斥锁.相当于cond.L.Lock() 为什么wait要做那3步操作,因为你在等待的时候,把锁释放掉啊,让别人访问公共空间,然后你被唤醒的时候,你需要拿到锁,拿到锁才能对公共空间访问 ~~~ func (c *Cond) Signal() ~~~ **Signal()通知的顺序是根据原来加入通知列表(Wait())的先入先出** **若没有Wait(),也不会报错** **单发通知,一次一个**,给一个正在等待(阻塞)在该条件变量上的协程发送通知 ~~~ func (c *Cond) Broadcast() ~~~ **广播通知,都醒了,惊群**,给正在等待(阻塞)在该条件变量上的所有协程发送通知 # 生产者消费者 ![](https://box.kancloud.cn/e1a172592fa15c6ebd6943eaad94baa6_799x437.png) 代码注意点是,那里用for,不用for用if的haul,唤醒后往下执行,如果容量满的话是会阻塞的,如果是for的话,wait好的话会再次判断下的,if没有再次判断 用if的话,会出现问题而且是偶尔的出现,因为if里面如果唤醒,那么往下如果阻塞,阻塞的话,消费者无法唤醒他了,因为wait已经走过了 ~~~ //创建全局条件变量 var cond sync.Cond //生产者 func producer(out chan<- int, idx int) { for { //条件变量对应互斥锁加锁 cond.L.Lock() //注意这边用for不能用if //循环判断,如果条件不满足直接跳过,满足就等待,因为怕唤醒后有多个生产者一下子让他充满 //让他解开的同时,顺便判断下,怕其他生产者已经写到了3个 for len(out) == 3 { //产品区满,等待消费者 cond.Wait() //挂起当前协程,等待条件变量满足,被消费者唤醒 } num := rand.Intn(1000) //产生一个随机数 out <- num fmt.Println("---生产者---产生数据---剩余多少个---", idx, num, len(out)) cond.L.Unlock() //生产结束,解锁互斥锁 cond.Signal() //唤醒阻塞的消费者 time.Sleep(time.Second) } } //消费者 func consumer(in <-chan int, idx int) { for { //条件变量对应互斥锁加锁(与生产者是同一个) cond.L.Lock() //产品区为空,等待生产者生产 for len(in) == 0 { cond.Wait() } //将channel中的数据读取(消费) num := <-in fmt.Println("---消费者---消费数据---公共区剩余多少个---", idx, num, len(in)) //消费结束,解锁互斥锁 cond.L.Unlock() //唤醒阻塞的生产者 cond.Signal() //消费者休息一会儿,给其他协程机会 time.Sleep(time.Millisecond * 500) } } func main() { rand.Seed(time.Now().UnixNano()) //产品区(公共区)使用channel模拟 product := make(chan int, 3) //创建互斥锁和条件变量 cond.L = new(sync.Mutex) //生产者 for i := 0; i < 5; i++ { go producer(product, i+1) } //消费者 for i := 0; i < 3; i++ { go consumer(product, i+1) } for { ; } } ~~~ # 注意点 我们在利用条件变量等待通知的时候,需要在它基于的那个互斥锁保护下进行。而在进行单发通知或广播通知的时候,却是恰恰相反的,也就是说,需要在对应的互斥锁解锁之后再做这两种操作。 --- 条件变量并不是被用来保护临界区和共享资源的,它是用于协调想要访问共享资源的那些线程的。当共享资源的状态发生变化时,它可以被用来通知被互斥锁阻塞的线程。 --- 把调用它的 goroutine(也就是当前的 goroutine)加入到当前条件变量的通知队列中。 解锁当前的条件变量基于的那个互斥锁。 让当前的 goroutine 处于等待状态,等到通知到来时再决定是否唤醒它。此时,这个 goroutine 就会阻塞在调用这个Wait方法的那行代码上。 --- 如果通知到来并且决定唤醒这个 goroutine,那么就在唤醒它之后重新锁定当前条件变量基于的互斥锁。自此之后,当前的 goroutine 就会继续执行后面的代码了 --- 如果一个 goroutine 因收到通知而被唤醒,但却发现共享资源的状态,依然不符合它的要求,那么就应该再次调用条件变量的Wait方法,并继续等待下次通知的到来。 --- 条件变量的Wait方法总会把当前的 goroutine 添加到通知队列的队尾,而它的Signal方法总会从通知队列的队首开始,查找可被唤醒的 goroutine。所以,因Signal方法的通知,而被唤醒的 goroutine 一般都是最早等待的那一个。 --- 最后,请注意,**条件变量的通知具有即时性**。也就是说,如果发送通知的时候没有 goroutine 为此等待,那么该通知就会被直接丢弃。在这之后才开始等待的 goroutine 只可能被后面的通知唤醒。 # 适合什么 条件变量适合保护那些可执行两个对立操作的共享资源。比如,一个既可读又可写的共享文件。又比如,既有生产者又有消费者的产品池。 **尽量少的锁争** 相对应的,我们在调用条件变量的 Wait 方法的时候,应该处在其中的锁的保护之下。因为有同一个锁保护,所以不可能有多个 goroutine 同时执行到这个 Wait 方法调用,也就不可能存在针对其中锁的重复解锁。 对于同一个锁,多个 goroutine 对它重复锁定时只会有一个成功,其余的会阻塞;多个 goroutine 对它重复解锁时也只会有一个成功,但其余的会抛 panic