channel 是 CSP 模式的具体实现,用于多个 goroutine 通讯。 其内部实现了同步,确保并发安全。多个goroutine同时访问,不需要加锁。 由于管道容量是5,开启go写入10个数据,再写入5个数据,会阻塞,然而read每秒会读取一个,然后在会写入一个数据。 ~~~ package main import ( "fmt" "time" ) func write(ch chan int) { for i := 0; i < 10; i++ { ch <- i fmt.Println("put data:", i) } } func read(ch chan int) { for { var b int b = <-ch fmt.Println(b) time.Sleep(time.Second) } } func main() { intChan := make(chan int, 5) go write(intChan) go read(intChan) time.Sleep(10 * time.Second) } ~~~ 输出结果: ~~~ put data: 0 put data: 1 put data: 2 put data: 3 put data: 4 put data: 5 0 1 put data: 6 2 put data: 7 3 put data: 8 4 put data: 9 5 6 7 8 9 ~~~ 默认为同步模式,需要发送和接收配对。否则会被阻塞,直到另一方准备好后被唤醒。 ~~~ package main import "fmt" func main() { data := make(chan int) // 数据交换队列 exit := make(chan bool) // 退出通知 go func() { for d := range data { // 从队列迭代接收数据,直到 close 。 fmt.Println(d) } fmt.Println("recv over.") exit <- true // 发出退出通知。 }() data <- 1 // 发送数据。 data <- 2 data <- 3 close(data) // 关闭队列。 fmt.Println("send over.") <-exit // 等待退出通知。 } ~~~ 输出结果: ~~~ 1 2 send over. 3 recv over. ~~~ 异步方式通过判断缓冲区来决定是否阻塞。如果缓冲区已满,发送被阻塞;缓冲区为空,接收被阻塞。 通常情况下,异步 channel 可减少排队阻塞,具备更高的效率。但应该考虑使用指针规避大对象拷贝,将多个元素打包,减小缓冲区大小等。 ~~~ package main import ( "fmt" ) func main() { data := make(chan int, 3) // 缓冲区可以存储 3 个元素 exit := make(chan bool) data <- 1 // 在缓冲区未满前,不会阻塞。 data <- 2 data <- 3 go func() { for d := range data { // 在缓冲区未空前,不会阻塞。 fmt.Println(d) } exit <- true }() data <- 4 // 如果缓冲区已满,阻塞。 data <- 5 close(data) <-exit } ~~~ 输出结果: ~~~ 1 2 3 4 5 ~~~ channel选择 : 如果需要同时处理多个 channel,可使用 select 语句。它随机选择一个可用 channel 做收发操作,或执行 default case。 ~~~ package main import ( "fmt" "os" ) func main() { a, b := make(chan int, 3), make(chan int) go func() { v, ok, s := 0, false, "" for { select { case v, ok = <-a: s = "a" case v, ok = <-b: s = "b" } if ok { fmt.Println(s, v) } else { os.Exit(0) } } }() for i := 0; i < 5; i++ { select { // 随机选择可 channel,接收数据。 case a <- i: case b <- i: } } close(a) select {} // 没有可用 channel,阻塞 main goroutine。 } ~~~ 输出结果: ~~~ // 每次运行输出结果都不同 b 3 a 0 a 1 a 2 b 4 ~~~ 在循环中使用 select default case 需要小心,避免形成洪水。 模式 :用简单工厂模式打包并发任务和 channel。 ~~~ package main import ( "math/rand" "time" ) func NewTest() chan int { c := make(chan int) rand.Seed(time.Now().UnixNano()) go func() { time.Sleep(time.Second) c <- rand.Int() }() return c } func main() { t := NewTest() println(<-t) // 等待 goroutine 结束返回。 } ~~~ 用 channel 实现信号量 (semaphore)。 ~~~ package main import ( "fmt" "runtime" "sync" ) func main() { runtime.GOMAXPROCS(2) wg := sync.WaitGroup{} wg.Add(3) sem := make(chan int, 1) for i := 0; i < 3; i++ { go func(id int) { defer wg.Done() sem <- 1 // 向 sem 发送数据,阻塞或者成功。 for x := 0; x < 3; x++ { fmt.Println(id, x) } <-sem // 接收数据,使得其他阻塞 goroutine 可以发送数据。 }(i) } wg.Wait() } ~~~ 输出结果: ~~~ // 每次运行输出结果都不同 0 0 0 1 0 2 2 0 2 1 2 2 1 0 1 1 1 2 ~~~ 用 closed channel 发出退出通知。 ~~~ package main import ( "sync" "time" ) func main() { var wg sync.WaitGroup quit := make(chan bool) for i := 0; i < 2; i++ { wg.Add(1) go func(id int) { defer wg.Done() task := func() { println(id, time.Now().Nanosecond()) time.Sleep(time.Second) } for { select { case <-quit: // closed channel 不会阻塞,因此可用作退出通知。 return default: // 执行正常任务。 task() } } }(i) } time.Sleep(time.Second * 5) // 让测试 goroutine 运行一会。 close(quit) // 发出退出通知。 wg.Wait() } ~~~ 用 select 实现超时 (timeout)。 ~~~ package main import ( "fmt" "time" ) func main() { w := make(chan bool) c := make(chan int, 2) go func() { select { case v := <-c: fmt.Println(v) case <-time.After(time.Second * 3): fmt.Println("timeout.") } w <- true }() // c <- 1 // 注释掉,引发 timeout。 <-w } ~~~