我早些时候承诺会演示一些可能广泛使用的有趣的生成器。我们来看看一个名为repeat的生成器: ``` repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} { valueStream := make(chan interface{}) go func() { defer close(valueStream) for { for _, v := range values { select { case <-done: return case valueStream <- v: } } } }() return valueStream } ``` 这个函数会重复你传给它的值,直到你告诉它停止。 让我们来看看另一个函数take,它在与repeat结合使用时很有用: ``` take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i := 0; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream } ``` 这个函数会从其传入的valueStream中取出第一个元素然后退出。二者组合起来会怎么样呢? ``` done := make(chan interface{}) defer close(done) for num := range take(done, repeat(done, 1), 10) { fmt.Printf("%v ", num) } ``` 这会输出: ``` 1 1 1 1 1 1 1 1 1 1 ``` 在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。 我们可以扩展这一点。让我们创建另一个生成器,但是这次我们创建一个重复调用函数的生成器repeatFn: ``` repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} { valueStream := make(chan interface{}) go func() { defer close(valueStream) for { select { case <-done: return case valueStream <- fn(): } } }() return valueStream } ``` 我们用它来生成10个随机数: ``` done := make(chan interface{}) defer close(done) rand := func() interface{} { return rand.Int() } for num := range take(done, repeatFn(done, rand), 10) { fmt.Println(num) } ``` 这会输出: ``` 5577006791947779410 8674665223082153551 6129484611666145821 4037200794235010051 3916589616287113937 6334824724549167320 605394647632969758 1443635317331776148 894385949183117216 2775422040480279449 ``` 您可能想知道为什么所有这些发生器通道类型都是interface{}。 Go中的空接口有点争议,但我认为处理interface的通道方便使用标准的管道模式。 正如我们前面所讨论的,管道的强大来自可重用的阶段。当阶段以适合自身的特异性水平进行操作时,这是最好的。在repeat和repeatFn生成器中,我们需要关注的是通过在列表或运算符上循环来生成数据流。这些操作都不需要关于处理的类型,而只需要知道参数的类型。 当需要处理特定的类型时,可以放置一个执行类型断言的阶段。有一个额外的管道阶段和类型断言的性能开销可以忽略不计,正如我们稍后会看到的。 以下是一个介绍toString管道阶段的小例子: ``` toString := func(done <-chan interface{}, valueStream <-chan interface{}, ) <-chan string { stringStream := make(chan string) go func() { defer close(stringStream) for v := range valueStream { select { case <-done: return case stringStream <- v.(string): } } }() return stringStream } ``` 可以这样使用它: ``` done := make(chan interface{}) defer close(done) var message string for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) { message += token } fmt.Printf("message: %s...", message) ``` 这会输出: ``` message: Iam.Iam.I... ``` 现在让我们证明刚才提到的性能问题。我们将编写两个基准测试函数:一个测试通用阶段,一个测试类型特定阶段: ``` func BenchmarkGeneric(b *testing.B) { done := make(chan interface{}) defer close(done) b.ResetTimer() for range toString(done, take(done, repeat(done, "a"), b.N)) { } } func BenchmarkTyped(b *testing.B) { repeat := func(done <-chan interface{}, values ...string) <-chan string { valueStream := make(chan string) go func() { defer close(valueStream) for { for _, v := range values { select { case <-done: return case valueStream <- v: } } } }() return valueStream } take := func(done <-chan interface{}, valueStream <-chan string, num int, ) <-chan string { takeStream := make(chan string) go func() { defer close(takeStream) for i := num; i > 0 || i == -1; { if i != -1 { i-- } select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream } done := make(chan interface{}) defer close(done) b.ResetTimer() for range take(done, repeat(done, "a"), b.N) { } } ``` 这会输出: | BenchmarkGeneric-4 | 1000000 | 2266 ns/op | | --- | --- | --- | | BenchmarkTyped-4 | 1000000 | 1181 ns/op | | PASS ok | command-line-arguments | 3.486s | 可以看到,特定类型的速度是接口类型的2倍。一般来说,管道上的限制因素将是生成器,或者是密集计算的某个阶段。如果生成器不像repeat和repeatFn生成器那样从内存中创建流,则可能会受I/O限制。从磁盘或网络读取数据可能会超出此处显示的性能开销。 那么,如果真是在计算上存在性能瓶颈,我们该怎么办?基于这种情况,让我们来讨论扇出扇入技术。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。