企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] 有时候我们的代码中可能会存在多个 goroutine 同时操作一个资源(临界区)的情况,这种情况下就会发生竞态问题(数据竞态)。这就好比现实生活中十字路口被各个方向的汽车竞争,还有火车上的卫生间被车厢里的人竞争。 示例如下: ```go package main import ( "fmt" "sync" ) var ( total int swg sync.WaitGroup ) func sum(j int) { defer swg.Done() for i := 0; i < j; i++ { total += i } } func main() { swg.Add(2) go sum(20000) go sum(20000) swg.Wait() fmt.Printf("total: %v\n", total) } // 运行10次的结果: // total: 337931955 // total: 228638665 // total: 256345036 // total: 378517386 // total: 283447322 // total: 366658153 // total: 248299535 // total: 251935294 // total: 399980000 // total: 313504210 ``` >[info] 出现上面的结果出现累加和不一致。是因为开了两个协程执行 sum 函数,当两个协程同时执行 `total += i` 时,两个读到的 total 变量指都是一样的,加上i的值。所以启动一个加的i就丢失了, 出现上面的状况... ## 互斥锁 互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 `goroutine` 可以访问共享资源。Go 语言中使用 `sync` 包中提供的 `Mutex` 类型来实现互斥锁。 `sync.Mutex` 提供了两个方法供我们使用。 | 方法名 | 功能 | | :-: | :-: | | func (m *Mutex) Lock() | 获取互斥锁 | | func (m *Mutex) Unlock() | 释放互斥锁 | 优化上述示例的代码 ```go package main import ( "fmt" "sync" ) var ( total int swg sync.WaitGroup sm sync.Mutex ) func sum(j int) { defer swg.Done() for i := 0; i < j; i++ { sm.Lock() total += i sm.Unlock() } } func main() { for i := 0; i < 2; i++ { swg.Add(1) go sum(20000) } swg.Wait() fmt.Printf("total: %v\n", total) } // 运行10次的结果: // total: 399980000 // total: 399980000 // total: 399980000 // total: 399980000 // total: 399980000 // total: 399980000 // total: 399980000 // total: 399980000 // total: 399980000 // total: 399980000 ``` 使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。 ## 读写互斥锁 互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用sync包中的RWMutex类型。 当goroutine对资源加读锁,其他goroutine也可以读该资源,但不能对该资源做写操作;当goroutine对资源加写锁,其他goroutine不能够对该资源进行读和写操作。 `sync.RWMutex` 提供了以下5个方法。 | 方法名 | 功能 | | :-: | :-: | | func (rw *RWMutex) Lock() | 获取写锁 | | func (rw *RWMutex) Unlock() | 释放写锁 | | func (rw *RWMutex) RLock() | 获取读锁 | | func (rw *RWMutex) RUnlock() | 释放读锁 | | func (rw *RWMutex) RLocker() Locker | 返回一个实现Locker接口的读写锁 | 写个互斥锁和读写互斥锁的耗时对比。 ```go package main import ( "fmt" "sync" "time" ) var ( total int swg sync.WaitGroup sm sync.Mutex srwm sync.RWMutex ) func getTotalMutex() { defer swg.Done() // 假设读操作需要 1 毫秒 sm.Lock() time.Sleep(time.Millisecond) fmt.Sprintln(total) sm.Unlock() } func setTotalMutex() { defer swg.Done() // 假设写操作需要 3 毫秒 sm.Lock() time.Sleep(time.Millisecond * 3) total++ sm.Unlock() } func getTotalRWMutex() { defer swg.Done() // 假设读操作需要 1 毫秒 srwm.RLock() time.Sleep(time.Millisecond) fmt.Sprintln(total) srwm.RUnlock() } func setTotalRWMutex() { defer swg.Done() // 假设写操作需要 3 毫秒 srwm.Lock() time.Sleep(time.Millisecond * 3) total++ srwm.Unlock() } func main() { // Mutex 锁执行花费所用的时间 t1 := time.Now() for i := 0; i < 50; i++ { swg.Add(1) go func() { setTotalMutex() }() } for i := 0; i < 500; i++ { swg.Add(1) go func() { getTotalMutex() }() } swg.Wait() fmt.Printf("Mutex -> total: %v, cost: %v\n", total, time.Since(t1)) // RWMutex 锁执行花费所用的时间 t2 := time.Now() for i := 0; i < 50; i++ { swg.Add(1) go func() { setTotalRWMutex() }() } for i := 0; i < 500; i++ { swg.Add(1) go func() { getTotalRWMutex() }() } swg.Wait() fmt.Printf("RWMutex -> total: %v, cost: %v\n", total, time.Since(t2)) } // 运行结果 // Mutex -> total: 50, cost: 2.628357908s // RWMutex -> total: 100, cost: 248.810775ms ``` ## sync.WaitGroup(等待锁) 在代码中生硬的使用 `time.Sleep` 肯定是不合适的,Go语言中可以使用 `sync.WaitGroup` 来实现并发任务的同步。`sync.WaitGroup` 有以下几个方法: | 方法名 | 功能 | | :-: | :-: | | func (wg * WaitGroup) Add(delta int) | 计数器+delta | | (wg *WaitGroup) Done() | 计数器-1 | | (wg *WaitGroup) Wait() | 阻塞直到计数器变为0 | `sync.WaitGroup` 内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。 ```go var swg sync.WaitGroup func demo(i int) { defer swg.Done() fmt.Printf("Hello demo%d\n", i) } func main(){ fmt.Println("hello main begin~") for i := 1; i <= 5; i++ { swg.Add(1) go demo(i) } swg.Wait() fmt.Println("hello main end~") } // 运行结果 // hello main begin~ // Hello demo5 // Hello demo2 // Hello demo3 // Hello demo4 // Hello demo1 // hello main end~ ``` ## sync.Once Once 常常用来初始化单例资源,或者并发访问只需初始化一次的共享资源 `sync.Once` 主要用于以下场景: - 单例模式:确保全局只有一个实例对象,避免重复创建资源。 - 延迟初始化:在程序运行过程中需要用到某个资源时,通过 sync.Once 动态地初始化该资源。 - 只执行一次的操作:例如只需要执行一次的配置加载、数据清理等操作。 示例代码如下 ```go var ( swg sync.WaitGroup once sync.Once ) func main() { swg.Add(1) for i := 0; i < 155555; i++ { go once.Do(func() { defer swg.Done() fmt.Println(i) }) } swg.Wait() } // 运行结果 // i: 137 ``` >[info] 注意:`sync.WaitGroup` 只能 `Add()` 函数,无论并发有多少次,都只能为1。不然就会出现 `fatal error: all goroutines are asleep - deadlock!` 报错 ## sync.map golang默认的map不是线程安全的,并发写操作map时会panic `fatal error: concurrent map writes`。sync包提供了并发安全map的功能。采用了空间换时间的方法。在读多写少,写入后不需要频繁更新的场景比较适合使用。 sync.Map 内置常用几种方法 | 方法名 | 功能 | | :-: | :-: | | func (m *Map) Store(key, value interface{}) | 存储key-value数据 | | func (m *Map) Load(key interface{}) (value interface{}, ok bool) | 查询key对应的value | | func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) | 查询或存储key对应的value | | func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) | 查询并删除key | | func (m *Map) Delete(key interface{}) | 删除key | | func (m *Map) Range(f func(key, value interface{}) bool) | 对map中的每个key-value依次调用f | 下面的代码示例演示了并发读写sync.Map ```go func mian() { m := sync.Map{} for i := 0; i < 5; i++ { swg.Add(1) go func(i int) { defer swg.Done() // 新增、修改 m.Store(fmt.Sprintf("name%d", i), i) }(i) } swg.Wait() // 查询 key := "name0" // value, ok := m.Load(key) if value, ok := m.Load(key); ok { fmt.Printf("value: %v\n", value) } else { fmt.Printf("%v is not exist", key) } // 删除 key = "name4" value, ok := m.LoadAndDelete(key) if ok { fmt.Printf("delete %v: %v\n", key, value) } else { fmt.Printf("%v is not exist\n", key) } // 遍历变量m的key-value m.Range(func(key, value interface{}) bool { fmt.Println(key, value) return true }) } // 运行结果 // value: 0 // delete name4: 4 // name2 2 // name3 3 // name0 0 // name1 1 ```