企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# **管道** 管道是一个虚拟的方法用来连接 goroutines 和 通道,使一个 goroutine 的输出成为另一个的输入,使用通道传递数据。 使用管道的一个好处是程序中有不变的数据流,因此 goroutine 和 通道不必等所有都就绪才开始执行。另外,因为你不必把所有内容都保存为变量,就节省了变量和内存空间的使用。最后,管道简化了程序设计并提升了维护性。 我们使用 `pipeline.go` 代码来说明管道的使用。这个程序分六部分来介绍。`pipeling.go` 程序执行的任务是在给定范围内产生随机数,当任何数字随机出现第二次时就结束。但在终止前,程序将打印第一个随机数出现第二次之前的所有随机数之和。你需要三个函数来连接程序的通道。程序的逻辑在这三个函数中,但数据流在管道的通道内。 这个程序有两个通道。第一个(通道A)用于从第一个函数获取随机数并发送它们到第二个函数。第二个(通道B)被第二个函数用来发送可接受的随机数到第三个函数。第三个函数负责从通道 B 获取数据,并计算结果和展示。 `pipeline.go` 的第一段代码如下: ```go package main import ( "fmt" "math/rand" "os" "strconv" "time" ) var CLOSEA = false var DATA = make(map[int]bool) ``` 因为 `second()` 函数需要通过一种方式告诉 `first()` 函数关闭第一个通道,所以我使用一个全局变量 `CLOSEA` 来处理。`CLOSEA` 变量只在 `first()` 函数中检查,并只能在 `second()` 函数中修改。 `pipeline.go` 的第二段代码如下: ```go func random(min, max int) int { return rand.Intn(max-min) + min } func first(min, max int, out chan<- int) { for { if CLOSEA { close(out) return } out <- random(min, max) } } ``` 上面的代码展示了 `random` 和 `first` 函数的实现。你已经对 `random()` 函数比较熟悉了,它在一定范围内产生随机数。但真正有趣的是 `first()` 函数。它使用 `for` 循环持续运行,直到 `CLOSEA` 变为 `true`。那样的话,它就关闭 `out` 通道。 `pipeline.go` 的第三段代码如下: ```go func second(out chan<- int, in <-chan int) { for x := range in { fmt.Print(x, " ") _, ok := DATA[x] if ok { CLOSEA = true } else { DATA[x] = true out <- x } } fmt.Println() close(out) } ``` `second()` 函数从 `in` 通道接收数据,并发送该数据到 `out` 通道。但是,`second()` 函数一旦发现 `DATA` map 中已经存在了该随机数,它就把 `CLOSEA` 全局变量设为 `true` 并停止发送任何数据到 `out` 通道。然后,它就关闭 `out` 通道。 `pipeline.go` 的第四段代码如下: ```go func third(in <-chan int){ var sum int sum = 0 for x2 := range in { sum = sum + x2 } fmt.Println("The sum of the random numbers is %d\n", sum) } ``` `third` 函数持续从 `in` 通道读取数据。当通道被 `second()` 函数关闭时,`for` 循环停止获取数据,函数打印输出。从这很清楚的看到 `second()` 函数控制许多事情。 `pipeline.go` 的第五段代码如下: ```go func main() { if len(os.Args) != 3 { fmt.Println("Need two integer paramters!") os.Exit(1) } n1, _ := strconv.Atoi(os.Args[1]) n2, _ := strconv.Atoi(os.Args[2]) if n1 > n2 { fmt.Printf("%d should be smaller than %d\n", n1, n2) return } ``` `pipeline.go` 的最后一段如下: ```go rand.Seed(time.Now().UnixNano()) A := make(chan int) B := make(chan int) go first(n1, n2, A) go second(B, A) third(B) } ``` 这里,定义了需要的通道,并执行两个 `goroutines` 和一个函数。`third()` 函数阻止 `main` 返回,因为它不作为 `goroutine` 执行。 执行 `pipeline.go` 产生如下输出: ```shell $go run pipeline.go 1 10 2 2 The sum of the random numbers is 2 $go run pipeline.go 1 10 9 7 8 4 3 3 The sum of the random numbers is 31 $go run pipeline.go 1 10 1 6 9 7 1 The sum of the random numbers is 23 $go run pipeline.go 10 20 16 19 16 The sum of the random numbers is 35 $go run pipeline.go 10 20 10 16 17 11 15 10 The sum of the random numbers is 69 $go run pipeline.go 10 20 12 11 14 15 10 15 The sum of the random numbers is 62 ``` 这里重点是尽管 `first` 函数按自己的节奏持续产生随机数,并且 `second()` 函数把它们打印在屏幕上,不需要的随机数也就是已经出现的随机数,不会发送给 `third()` 函数,因此就不会包含在最终的总和中。