我早些时候承诺会演示一些可能广泛使用的有趣的生成器。我们来看看一个名为repeat的生成器:
```
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
```
这个函数会重复你传给它的值,直到你告诉它停止。 让我们来看看另一个函数take,它在与repeat结合使用时很有用:
```
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {
takeStream := make(chan interface{})
go func() {
defer close(takeStream)
for i := 0; i < num; i++ {
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
```
这个函数会从其传入的valueStream中取出第一个元素然后退出。二者组合起来会怎么样呢?
```
done := make(chan interface{})
defer close(done)
for num := range take(done, repeat(done, 1), 10) {
fmt.Printf("%v ", num)
}
```
这会输出:
```
1 1 1 1 1 1 1 1 1 1
```
在这个基本的例子中,我们创建了一个repeat生成器来生成无限数量的重复生成器,但是只取前10个。repeat生成器由take接收。虽然我们可以生成无线数量的流,但只会生成n+1个实例,其中n是我们传入take的数量。
我们可以扩展这一点。让我们创建另一个生成器,但是这次我们创建一个重复调用函数的生成器repeatFn:
```
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
defer close(valueStream)
for {
select {
case <-done:
return
case valueStream <- fn():
}
}
}()
return valueStream
}
```
我们用它来生成10个随机数:
```
done := make(chan interface{})
defer close(done)
rand := func() interface{} {
return rand.Int()
}
for num := range take(done, repeatFn(done, rand), 10) {
fmt.Println(num)
}
```
这会输出:
```
5577006791947779410
8674665223082153551
6129484611666145821
4037200794235010051
3916589616287113937
6334824724549167320
605394647632969758
1443635317331776148
894385949183117216
2775422040480279449
```
您可能想知道为什么所有这些发生器通道类型都是interface{}。
Go中的空接口有点争议,但我认为处理interface的通道方便使用标准的管道模式。 正如我们前面所讨论的,管道的强大来自可重用的阶段。当阶段以适合自身的特异性水平进行操作时,这是最好的。在repeat和repeatFn生成器中,我们需要关注的是通过在列表或运算符上循环来生成数据流。这些操作都不需要关于处理的类型,而只需要知道参数的类型。
当需要处理特定的类型时,可以放置一个执行类型断言的阶段。有一个额外的管道阶段和类型断言的性能开销可以忽略不计,正如我们稍后会看到的。 以下是一个介绍toString管道阶段的小例子:
```
toString := func(done <-chan interface{}, valueStream <-chan interface{}, ) <-chan string {
stringStream := make(chan string)
go func() {
defer close(stringStream)
for v := range valueStream {
select {
case <-done:
return
case stringStream <- v.(string):
}
}
}()
return stringStream
}
```
可以这样使用它:
```
done := make(chan interface{})
defer close(done)
var message string
for token := range toString(done, take(done, repeat(done, "I", "am."), 5)) {
message += token
}
fmt.Printf("message: %s...", message)
```
这会输出:
```
message: Iam.Iam.I...
```
现在让我们证明刚才提到的性能问题。我们将编写两个基准测试函数:一个测试通用阶段,一个测试类型特定阶段:
```
func BenchmarkGeneric(b *testing.B) {
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range toString(done, take(done, repeat(done, "a"), b.N)) {
}
}
func BenchmarkTyped(b *testing.B) {
repeat := func(done <-chan interface{}, values ...string) <-chan string {
valueStream := make(chan string)
go func() {
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
take := func(done <-chan interface{}, valueStream <-chan string, num int, ) <-chan string {
takeStream := make(chan string)
go func() {
defer close(takeStream)
for i := num; i > 0 || i == -1; {
if i != -1 {
i--
}
select {
case <-done:
return
case takeStream <- <-valueStream:
}
}
}()
return takeStream
}
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for range take(done, repeat(done, "a"), b.N) {
}
}
```
这会输出:
| BenchmarkGeneric-4 | 1000000 | 2266 ns/op |
| --- | --- | --- |
| BenchmarkTyped-4 | 1000000 | 1181 ns/op |
| PASS ok | command-line-arguments | 3.486s |
可以看到,特定类型的速度是接口类型的2倍。一般来说,管道上的限制因素将是生成器,或者是密集计算的某个阶段。如果生成器不像repeat和repeatFn生成器那样从内存中创建流,则可能会受I/O限制。从磁盘或网络读取数据可能会超出此处显示的性能开销。
那么,如果真是在计算上存在性能瓶颈,我们该怎么办?基于这种情况,让我们来讨论扇出扇入技术。
* * * * *
学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
- 前序
- 谁适合读这本书
- 章节导读
- 在线资源
- 第一章 并发编程介绍
- 摩尔定律,可伸缩网络和我们所处的困境
- 为什么并发编程如此困难
- 数据竞争
- 原子性
- 内存访问同步
- 死锁,活锁和锁的饥饿问题
- 死锁
- 活锁
- 饥饿
- 并发安全性
- 优雅的面对复杂性
- 第二章 代码建模:序列化交互处理
- 并发与并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并发哲学
- 第三章 Go的并发构建模块
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select语句
- GOMAXPROCS
- 结论
- 第四章 Go的并发编程范式
- 访问范围约束
- fo-select循环
- 防止Goroutine泄漏
- or-channel
- 错误处理
- 管道
- 构建管道的最佳实践
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 队列
- context包
- 小结
- 第五章 可伸缩并发设计
- 错误传递
- 超时和取消
- 心跳
- 请求并发复制处理
- 速率限制
- Goroutines异常行为修复
- 本章小结
- 第六章 Goroutines和Go运行时
- 任务调度