心跳是并发进程向外界发出信号的一种方式。命名者从人体解剖学中受到启发,使用心跳一词表示被观察者的生命体征。心跳在Go语言出现前就已被广泛使用。
在并发中使用心跳是有原因的。心跳能够让我们更加深入的了解系统,并且在系统存在不确定性的时候对其测试。
我们将在本节中讨论两种不同类型的心跳:
* 以固定时间间隔产生的心跳。
* 在工作单元开始时产生的心跳。
固定时间间隔产生的心跳对于并发来说很有用,它可能在等待处理某个工作单元执行某个任务时发生。由于你不知道这项工作什么时候会进行,所以你的goroutine可能会持续等待。心跳是一种向监听者发出信号的方式,即一切都很好,当前静默是正常的。
以下代码演示了会产生心跳的goroutine:
```
doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{}) //1
results := make(chan time.Time)
go func() {
defer close(heartbeat)
defer close(results)
pulse := time.Tick(pulseInterval) //2
workGen := time.Tick(2 * pulseInterval) //3
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default: //4
}
}
sendResult := func(r time.Time) {
for {
select {
case <-done:
return
case <-pulse: //5
sendPulse()
case results <- r:
return
}
}
}
for {
select {
case <-done:
return
case <-pulse: //5
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
}()
return heartbeat, results
}
```
1. 在这里,我们设置了一个发送心跳信号的通道。doWork会返回该通道。
2. 我们按传入的pulseInterval值定时发送心跳,每次心跳都意味着可以从该通道上读取到内容。
3. 这只是用来模拟进入的工作的另一处代码。我们选择一个比pulseInterval更长的持续时间,以便我们可以看到来自goroutine的心跳。
4. 请注意,我们包含一个default子句。我们必须考虑如果没有人接受到心跳的情况。从goroutine发出的结果是至关重要的,但心跳不是。
5. 就像done通道,无论何时执行发送或接收,你都需要考虑心跳发送的情况。
请注意,由于我们可能在等待输入时发送多个pulse,或者在等待发送结果时发送多个pulse,所有select语句都需要在for循环内。 目前看起来不错; 我们如何利用这个函数并消费它发出的事件? 让我们来看看:
```
done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) }) //1
const timeout = 2 * time.Second //2
heartbeat, results := doWork(done, timeout/2) //3
for {
select {
case _, ok := <-heartbeat: //4
if ok == false {
return
}
fmt.Println("pulse")
case r, ok := <-results: //5
if ok == false {
return
}
fmt.Printf("results %v\n", r.Second())
case <-time.After(timeout): //6
return
}
}
```
1. 我们设置done通道并在10秒后关闭它。
2. 我们在这里设定超时时间 我们将用它将心跳间隔与超时时间相耦合。
3. 我们向dowork传入超时时间的一半。
4. 我们将hearbeat的读取放入select语句中。每间隔 timeout/2 获取一次来自心跳通道的消息。如果我们没有收到消息,那就说明该goroutine存在问题。
5. 我们从result通道获取数据,没有什么特别的。
6. 如果我们没有收到心跳或result,程序就会超时结束。
这会输出:
```
pulse
pulse
results 52
pulse
pulse
results 54
pulse
pulse
results 56
pulse
pulse
results 58
pulse
```
和预期的一样,每次从result中接收到信息,都会收到两次心跳。
我们可能会使用这样的功能来收集系统的统计参数,当你的goroutine没有像预期那样运行,那么基于固定时间的心跳信号的作用会非常明显。
考虑下一个例子。 我们将在两次迭代后停止goroutine来模拟循环中断,然后不关闭任何一个通道;
```
doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) {
heartbeat := make(chan interface{})
results := make(chan time.Time)
go func() {
pulse := time.Tick(pulseInterval)
workGen := time.Tick(2 * pulseInterval)
sendPulse := func() {
select {
case heartbeat <- struct{}{}:
default:
}
}
sendResult := func(r time.Time) {
for {
select {
case <-pulse:
sendPulse()
case results <- r:
return
}
}
}
for i := 0; i < 2; i++ { //1
select {
case <-done:
return
case <-pulse:
sendPulse()
case r := <-workGen:
sendResult(r)
}
}
}()
return heartbeat, results
}
done := make(chan interface{})
time.AfterFunc(10*time.Second, func() { close(done) })
const timeout = 2 * time.Second
heartbeat, results := doWork(done, timeout/2)
for {
select {
case _, ok := <-heartbeat:
if ok == false {
return
}
fmt.Println("pulse")
case r, ok := <-results:
if ok == false {
return
}
fmt.Printf("results %v\n", r)
case <-time.After(timeout):
fmt.Println("worker goroutine is not healthy!")
return
}
}
```
1. 这里我们简单模拟循环中断。前面的例子中,未收到通知会无限循环。这里我们只循环两次。
这会输出:
```
pulse
pulse
worker goroutine is not healthy!
```
效果很不错。在两秒钟之内,我们的系统意识到goroutine未能正确读取,并且打破了for-select循环。通过使用心跳,我们已经成功地避免了死锁,并且不必通过依赖较长的超时而保持稳定性。 我们将在“Goroutines异常行为修复”中进一步理解这个概念。
另外请注意,心跳会帮助处理相反的情况:它让我们知道长时间运行的goroutine依然存在,但花了一段时间才产生一个值并发送至通道。
接下来让我们看看另一个场景:在工作单元开始时产生的心跳。这对测试非常有用。下面是个例子:
```
doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) {
heartbeatStream := make(chan interface{}, 1) //1
workStream := make(chan int)
go func() {
defer close(heartbeatStream)
defer close(workStream)
for i := 0; i < 10; i++ {
select { //2
case heartbeatStream <- struct{}{}:
default: //3
}
select {
case <-done:
return
case workStream <- rand.Intn(10):
}
}
}()
return heartbeatStream, workStream
}
done := make(chan interface{})
defer close(done)
heartbeat, results := doWork(done)
for {
select {
case _, ok := <-heartbeat:
if ok {
fmt.Println("pulse")
} else {
return
}
case r, ok := <-results:
if ok {
fmt.Printf("results %v\n", r)
} else {
return
}
}
}
```
1. 这里我们用一个缓冲区创建心跳通道。这确保即使没有人及时监听发送,也总会发送至少一个pulse。
2. 在这里,我们为心跳设置了一个单独的select块。我们不希望将它与发送结果一起包含在同一个select块中,因为如果接收器未准备好,它们将接收到一个pulse,而result的当前值将会丢失。我们也没有为done通道提供case语句,因为我们有一个default可以处理这种情况。
3. 我们再次处理如果没有人监听到心头。因为我们的心跳通道是用缓冲区创建的,如果有人在监听,但没有及时处理第一个心跳,仍会被通知。
这会输出:
```
pulse
results 1
pulse
results 7
pulse
results 7
pulse
results 9
pulse
results 1
pulse
results 8
pulse
results 5
pulse
results 0
pulse
results 6
pulse
results 0
```
如预期一致,每个结果都会有一个心跳。
至于测试的编写。考虑下面的代码:
```
func DoWork( done <-chan interface {}, nums ...int ) (<-chan interface{}, <-chan int) {
heartbeat := make(chan interface{}, 1)
intStream := make(chan int)
go func () {
defer close(heartbeat)
defer close(intStream)
time.Sleep(2*time.Second) // 1
for _, n := range nums {
select {
case heartbeat <- struct{}{}:
default:
}
select {
case <-done:
return
case intStream <- n:
}
}
}()
return heartbeat, intStream
}
```
1. 我们在goroutine开始工作之前模拟延迟。在实践中,延迟可以由各种各样的原因导致,例如CPU负载,磁盘争用,网络延迟和bug。
DoWork函数是一个相当简单的生成器,它将传入的数字转换为它返回通道上的数据流。我们来试试这个函数。下面提供了一个测试的反例:
```
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
done := make(chan interface{})
defer close(done)
intSlice := []int{0, 1, 2, 3, 5}
_, results := DoWork(done, intSlice...)
for i, expected := range intSlice {
select {
case r := <-results:
if r != expected {
t.Errorf(
"index %v: expected %v, but received %v,", i,
expected, r,
)
}
case <-time.After(1 * time.Second): // 1
t.Fatal("test timed out")
}
}
}
```
1. 在这里,我们设置超时,以防止goroutine出现问题导致死锁。
运行结果为:
```
go test ./bad_concurrent_test.go
--- FAIL: TestDoWork_GeneratesAllNumbers (1.00s) bad_concurrent_test.go:46: test timed out
FAIL
FAIL command-line-arguments 1.002s
```
这个测试之所以不好,是因为它的不确定性。如果移除time.Sleep情况会变得更糟:这个测试会有时通过,有时失败。
我们之前提到过程中的外部因素可能会导致goroutine花费更长的时间才能完成第一次迭代。关键在于我们不能保证在超时之前第一次迭代会完成,所以我们开始考虑:这时候超时会有多大意义?我们可以增加超时时间,但这意味着测试时失败也需要很长时间,从而减慢我们的测试效率。
这种情况很可怕,项目组甚至会对测试的正确性及必要性产生怀疑。
幸运的是这种情况并非无解。这是一个正确的测试:
```
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
done := make(chan interface{})
defer close(done)
intSlice := []int{0, 1, 2, 3, 5}
heartbeat, results := DoWork(done, intSlice...)
<-heartbeat //1
i := 0
for r := range results {
if expected := intSlice[i]; r != expected {
t.Errorf("index %v: expected %v, but received %v,", i, expected, r)
}
i++
}
}
```
1. 在这里,我们等待goroutine发出信号表示它正在开始处理迭代。 运行此测试会产生以下输出
```
ok command-line-arguments 2.002s
```
使用心跳我们可以安全地编写该测试,而不会超时。运行的唯一风险是我们的一次迭代花费了过多的时间。 如果这对我们很重要,我们可以利用更安全的、基于间隔的心跳。
以下是使用基于间隔的心跳的测试示例:
```
func DoWork(done <-chan interface{}, pulseInterval time.Duration, nums ...int) (<-chan interface{}, <-chan int) {
heartbeat := make(chan interface{}, 1)
intStream := make(chan int)
go func() {
defer close(heartbeat)
defer close(intStream)
time.Sleep(2 * time.Second)
pulse := time.Tick(pulseInterval)
numLoop: //2
for _, n := range nums {
for { //1
select {
case <-done:
return
case <-pulse:
select {
case heartbeat <- struct{}{}: default:
}
case intStream <- n:
continue numLoop //3
}
}
}
}()
return heartbeat, intStream
}
func TestDoWork_GeneratesAllNumbers(t *testing.T) {
done := make(chan interface{})
defer close(done)
intSlice := []int{0, 1, 2, 3, 5}
const timeout = 2 * time.Second
heartbeat, results := DoWork(done, timeout/2, intSlice...)
<-heartbeat //4
i := 0
for {
select {
case r, ok := <-results:
if ok == false {
return
} else if expected := intSlice[i]; r != expected {
t.Errorf(
"index %v: expected %v, but received %v,", i,
expected, r,
)
}
i++
case <-heartbeat: //5
case <-time.After(timeout):
t.Fatal("test timed out")
}
}
}
```
1. 我们需要两个循环:一个用来覆盖我们的数字列表,并且这个内部循环会运行直到intStream上的数字成功发送。
2. 我们在这里使用一个标签来使内部循环继续更简单一些。
3. 这里我们继续执行外部循环。
4. 我们仍然等待第一次心跳出现,表明我们已经进入了goroutine的循环。
5. 我们在这里获取心跳以实现超时。
运行此测试会输出:
```
ok command-line-arguments 3.002s
```
你可能已经注意到这个版本的逻辑有点混乱。如果你确信goroutine的循环在启动后不会停止执行,我建议只阻塞第一次心跳,然后进入循环语句。你可以编写单独的测试,专门来测试如未能关闭通道,循环迭代耗时过长以及其他与时间相关的情况。
在编写并发代码时,心跳不是绝对必要的,但本节将展示其的实用性。对于任何需要测试的长期运行的goroutines,我强烈推荐这种模式。
* * * * *
学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
- 前序
- 谁适合读这本书
- 章节导读
- 在线资源
- 第一章 并发编程介绍
- 摩尔定律,可伸缩网络和我们所处的困境
- 为什么并发编程如此困难
- 数据竞争
- 原子性
- 内存访问同步
- 死锁,活锁和锁的饥饿问题
- 死锁
- 活锁
- 饥饿
- 并发安全性
- 优雅的面对复杂性
- 第二章 代码建模:序列化交互处理
- 并发与并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并发哲学
- 第三章 Go的并发构建模块
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select语句
- GOMAXPROCS
- 结论
- 第四章 Go的并发编程范式
- 访问范围约束
- fo-select循环
- 防止Goroutine泄漏
- or-channel
- 错误处理
- 管道
- 构建管道的最佳实践
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 队列
- context包
- 小结
- 第五章 可伸缩并发设计
- 错误传递
- 超时和取消
- 心跳
- 请求并发复制处理
- 速率限制
- Goroutines异常行为修复
- 本章小结
- 第六章 Goroutines和Go运行时
- 任务调度