我们在之前的章节列举了管道的各种优点,但有时候,尽管管道没有准备好,我们的程序依然还是要干活的,这种处理方式,被称为“队列”。
这意味着,一旦管道的某个阶段完成了工作,将其存储在内存中的临时位置,以便其他阶段可以稍后检索它,而你无需保持其引用。在“Channels”章节,我们讨论了带缓冲的通道,你可以把它视作队列的一种。
虽然在系统中引入队列功能非常有用,但它通常是优化程序时希望采用的最后一种技术之一。过早地添加队列会隐藏同步问题,例如死锁和活锁,并且,随着程序不断重构,你可能会发现需要更多或更少的队列。
那么使用队列有什么好处呢?队列通常用来尝试解决性能问题。队列几乎不会减少程序的总运行时间,它只会让程序的行为有所不同。
让我们看个简单的管道例子:
```
done := make(chan interface{})
defer close(done)
zeros := take(done, 3, repeat(done, 0))
short := sleep(done, 1*time.Second, zeros)
long := sleep(done, 4*time.Second, short)
pipeline := long
```
这个管道链共有4个阶段:
1. 间隔0s,不间断生成数据流。
2. 在接收到3条数据后取消前置操作。
3. 休眠1秒,短耗时阶段。
4. 休眠3秒,长耗时阶段。
我们假设阶段1和阶段2是即时的,那么需要关注的是休眠如何影响管道的运行时间。
![](https://box.kancloud.cn/1aac976f616e1dfcbac544ba9cfd0386_681x489.jpg)
你可以看到,这个管道耗时13秒。短耗时阶段花费了大约9秒。
如果我们给管道加入缓存会怎么样?让我们试试在长短耗时阶段之间添加个缓冲:
```
done := make(chan interface{})
defer close(done)
zeros := take(done, 3, repeat(done, 0))
short := sleep(done, 1*time.Second, zeros)
buffer := buffer(done, 2, short) // Buffers sends from short by 2
long := sleep(done, 4*time.Second, short)
pipeline := long
```
![](https://box.kancloud.cn/6483094be023672db2c5cfb0d332a23f_577x316.jpg)
![](https://box.kancloud.cn/d9e52fa2faee1e2aac81ce77f98788ec_572x167.jpg)
整个管道依然是13秒,但短耗时阶段时长降低到了3秒,看来加入缓存是有效的。但是如果整个管道仍然需要13秒来执行,这对我们有什么帮助?
我们来看看下面这个操作:
```
p := processRequest(done, acceptConnection(done, httpHandler))
```
这条管道会持续运行直到被取消,并且在取消之前会持续接受连接。在这期间,你肯定不希望处理连接的processRequest因acceptConnection接受连接而阻塞,你会希望processRequest是持续可用的,否则程序的用户可能会发现连接请求被拒绝。
因此,队列的价值并不是减少了某个阶段的运行时间,而是减少了它处于阻塞状态的时间。 这可以让程序继续工作。 在这个例子中,用户可能会在他们的请求中感受到延迟,但不会被拒绝服务。
通过这种方式,队列的真正用途是将操作流程分离,以便一个阶段的运行时间不会影响另一个阶段的运行时间。以这种方式解耦来改变整个系统的运行时行为,这取决于你的程序,产生的结果可能是好的也可能是不好的。
接下来我们回到关于队列的讨论。 队列应该放在哪里? 缓冲区大小应该是多少? 这些问题的答案取决于管道的性质。
我们首先分析队列提高系统整体性能适用于哪些情况:
* 如果某个阶段执行批处理能够节省时间。
* 如果推迟某个阶段产生结果可以在程序中循环执行。
适用于第一种情况的一个例子是,将输入缓冲到内存而非硬盘中。实际上bufio包就是这么干的。下面这个例子比较了使用缓冲与非缓冲进行写操作:
```
func BenchmarkUnbufferedWrite(b *testing.B) {
performWrite(b, tmpFileOrFatal())
}
func BenchmarkBufferedWrite(b *testing.B) {
bufferredFile := bufio.NewWriter(tmpFileOrFatal())
performWrite(b, bufio.NewWriter(bufferredFile))
}
func tmpFileOrFatal() *os.File {
file, err := ioutil.TempFile("", "tmp")
if err != nil {
log.Fatal("error: %v", err)
}
return file
}
func performWrite(b *testing.B, writer io.Writer) {
done := make(chan interface{})
defer close(done)
b.ResetTimer()
for bt := range take(done, repeat(done, byte(0)), b.N) {
writer.Write([]byte{bt.(byte)})
}
}
```
执行命令行:
```
go test -bench=. src/concurrency-patterns-in-go/queuing/buffering_test.go
```
这会输出:
![](https://box.kancloud.cn/78e05c83bd14c27940261aa48159788e_563x118.jpg)
如预期的那样,有缓冲的写入比无缓冲更快。这是因为在bufio.Writer中,写入操作在内部缓冲区中进入队列,直到已经积累了足够长的数据块,该块才被写出。这个过程通常称为分块。
分块速度更快,因为bytes.Buffer必须增加其分配的内存以容纳存储的字节数据。出于各种原因,内存扩张操作代价高昂; 因此,我们需要增长的时间越少,整个系统的整体效率就越高。 于是,队列提高了整个系统的性能。
这只是一个简单的内存分块示例,但是你可能会频繁地进行分块。通常,执行任何操作都存在开销,分块可能会提高系统性能。例如打开数据库事务,计算消息校验和以及分配内存连续空间。
除了分块之外,如果程序算法支持向后查找或排序优化,队列也可以起到帮助作用。
第二种情况,某个阶段的延迟执行导致更多的数据进入管道,但没有被发现,这更加致命,因为它可能导致上游系统的崩溃。
这个想法通常被称为负反馈循环,甚至是死亡螺旋。 这是因为管道与上游系统之间存在经常性关系;上游系统提交新请求的速度在某种程度上与管道的有效性有关。
如果管道的效率降低到某个临界阈值以下,则管道上游的系统开始增加其对管道的输入,这导致管道损失更多效率,并且死亡螺旋开始。 如果没有安全防护,该系统将无法恢复。
通过在管道入口处引入队列,你可以延迟请求来打破反馈循环。从调用者的角度来看,请求似乎正在处理中,但需要很长时间。只要调用者不超时,管道将保持稳定。如果调用方超时,则需要确保你在出列时支持安全检查。如果不这样做,可能会无意中通过处理无效请求创建了另一个负馈循环,从而降低管道的效率。
>如果你曾尝试过一些热门的新系统(例如,新游戏服务器,用于产品发布的网站等),并且尽管开发人员尽了最大的努力,但该网站一直处于不稳定状态,恭喜!你可能目睹了一个负反馈循环。
>开发团队总是在尝试不同的解决方案,直到有人意识到他们需要一个队列,并且匆忙地将其实现。
>然后客户开始抱怨排队时间。
从以上的例子中,我们可以看到一种模式慢慢浮出水面,队列应该满足以下情况:
* 在管道的入口处。
* 在某个阶段进行批处理会更高效。
您可能会试图在其他地方添加队列,例如,某个阶段会执行密集计算。要避免这种诱惑!正如我们所知道的那样,只有少数情况下,对立会减少管道的运行时间。为排除干扰而尝试队列会产生灾难性后果。
为了理解为什么,我们必须讨论管道的吞吐量。别担心,这并不困难,这也将帮助我们回答关于如何确定队列应该多大的问题。
在队列理论中,有一条定律(进行足够的抽样)可以预测管道的吞吐量。这就是所谓的"最小原则",你只需要知道几点就可以理解和利用它。
我们以代数的方式定义“最小原则”,它通常表示为:L = λW,其中
* L = 系统中的平均单位数。
* λ = 单位的平均到达率。
* W = 单位在系统中花费的平均时间。
这个等式仅适用于所谓的稳定系统。 在一条管道中,一个稳定的系统就是数据进入管道或入口的速率等于它退出系统或出口的速率。 如果进入速率超过出口速度,那么你的系统就不稳定,并且已经进入死亡螺旋。 如果入口速率小于出口速率,则系统仍然不稳定,因为你的资源没有被完全利用。这不是世界上最糟糕的情况,但是如果发现资源利用严重不足(例如,集群或数据中心),也许你会关心这一点。
假设我们的管道是稳定的。如果我们想要减少单位花费在系统中的平均时间n,只有一个选择:减少系统中平均单位数:L/n = λW / n。 如果提高出口率,我们只能减少系统中的平均单位数量。还要注意,如果我们将队列添加到阶段,我们增加L,这会增加单位的到达率(nL = nλ* W)或增加单位在系统中的平均时间(nL = λ* nW)。通过最小原则,我们可以证明,队列对于减少系统花费的时间帮助不大。
同时请注意,由于我们正在观察整个管道,因此将W减少n倍将分布在我们管道的所有阶段。在我们的案例中,最小原则应该是这样定义的:
```
L = λΣiWi
```
不分青红皂白的优化,可能导致你的管道完全被最慢的执行阶段影响。
这个原则可以帮助我们分析管道的各个阶段。假设我们的管道有三个阶段。
让我们尝试确定管道每秒可以处理多少个请求。假设我们在管道上启用了采样,发现1个请求(r)需要约1秒才能通过管道。让我们向公式放入这些数字:
```
3r = λr/s * 1s
3r/s = λr/s
λr/s = 3r/s
```
我们将L设置为3,因为我们管道中的每个阶段都在处理请求。然后我们将W设置为1秒,做一个小代数计算,瞧!在这个管道中,我们每秒可以处理三个请求。
假设采样表明请求需要1 ms来处理。 我们的队列需要处理每秒100000次请求的大小是多少?
```
Lr-3r = 100,000r/s * 0.0001s
Lr-3r = 10r
Lr = 7r
```
我们的管道有三个阶段,所以我们将L递减3。将λ设置为100000 r/s,我们发现如果想要处理很多请求,我们的队列应该有7个容量。请记住, 如果增加队列的大小,它需要更长的时间才能完成! 你实际上在延迟降低系统利用率。
但这个公式也存在缺陷,它无法观察对失败的处理。请记住,如果由于某种原因管道发生混乱,你将丢失队列中的所有请求。为了缓解这种情况,你可以将队列大小保持为零,也可以将其移至持久队列中,该队列是一个持续存在的队列,可以在需要时再读取。
队列在你的系统中可能很有用,但由于它的复杂性,它通常是我建议实现的最后优化手段之一。
* * * * *
学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
- 前序
- 谁适合读这本书
- 章节导读
- 在线资源
- 第一章 并发编程介绍
- 摩尔定律,可伸缩网络和我们所处的困境
- 为什么并发编程如此困难
- 数据竞争
- 原子性
- 内存访问同步
- 死锁,活锁和锁的饥饿问题
- 死锁
- 活锁
- 饥饿
- 并发安全性
- 优雅的面对复杂性
- 第二章 代码建模:序列化交互处理
- 并发与并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并发哲学
- 第三章 Go的并发构建模块
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select语句
- GOMAXPROCS
- 结论
- 第四章 Go的并发编程范式
- 访问范围约束
- fo-select循环
- 防止Goroutine泄漏
- or-channel
- 错误处理
- 管道
- 构建管道的最佳实践
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 队列
- context包
- 小结
- 第五章 可伸缩并发设计
- 错误传递
- 超时和取消
- 心跳
- 请求并发复制处理
- 速率限制
- Goroutines异常行为修复
- 本章小结
- 第六章 Goroutines和Go运行时
- 任务调度