心跳是并发进程向外界发出信号的一种方式。命名者从人体解剖学中受到启发,使用心跳一词表示被观察者的生命体征。心跳在Go语言出现前就已被广泛使用。 在并发中使用心跳是有原因的。心跳能够让我们更加深入的了解系统,并且在系统存在不确定性的时候对其测试。 我们将在本节中讨论两种不同类型的心跳: * 以固定时间间隔产生的心跳。 * 在工作单元开始时产生的心跳。 固定时间间隔产生的心跳对于并发来说很有用,它可能在等待处理某个工作单元执行某个任务时发生。由于你不知道这项工作什么时候会进行,所以你的goroutine可能会持续等待。心跳是一种向监听者发出信号的方式,即一切都很好,当前静默是正常的。 以下代码演示了会产生心跳的goroutine: ``` doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) //1 results := make(chan time.Time) go func() { defer close(heartbeat) defer close(results) pulse := time.Tick(pulseInterval) //2 workGen := time.Tick(2 * pulseInterval) //3 sendPulse := func() { select { case heartbeat <- struct{}{}: default: //4 } } sendResult := func(r time.Time) { for { select { case <-done: return case <-pulse: //5 sendPulse() case results <- r: return } } } for { select { case <-done: return case <-pulse: //5 sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } ``` 1. 在这里,我们设置了一个发送心跳信号的通道。doWork会返回该通道。 2. 我们按传入的pulseInterval值定时发送心跳,每次心跳都意味着可以从该通道上读取到内容。 3. 这只是用来模拟进入的工作的另一处代码。我们选择一个比pulseInterval更长的持续时间,以便我们可以看到来自goroutine的心跳。 4. 请注意,我们包含一个default子句。我们必须考虑如果没有人接受到心跳的情况。从goroutine发出的结果是至关重要的,但心跳不是。 5. 就像done通道,无论何时执行发送或接收,你都需要考虑心跳发送的情况。 请注意,由于我们可能在等待输入时发送多个pulse,或者在等待发送结果时发送多个pulse,所有select语句都需要在for循环内。 目前看起来不错; 我们如何利用这个函数并消费它发出的事件? 让我们来看看: ``` done := make(chan interface{}) time.AfterFunc(10*time.Second, func() { close(done) }) //1 const timeout = 2 * time.Second //2 heartbeat, results := doWork(done, timeout/2) //3 for { select { case _, ok := <-heartbeat: //4 if ok == false { return } fmt.Println("pulse") case r, ok := <-results: //5 if ok == false { return } fmt.Printf("results %v\n", r.Second()) case <-time.After(timeout): //6 return } } ``` 1. 我们设置done通道并在10秒后关闭它。 2. 我们在这里设定超时时间 我们将用它将心跳间隔与超时时间相耦合。 3. 我们向dowork传入超时时间的一半。 4. 我们将hearbeat的读取放入select语句中。每间隔 timeout/2 获取一次来自心跳通道的消息。如果我们没有收到消息,那就说明该goroutine存在问题。 5. 我们从result通道获取数据,没有什么特别的。 6. 如果我们没有收到心跳或result,程序就会超时结束。 这会输出: ``` pulse pulse results 52 pulse pulse results 54 pulse pulse results 56 pulse pulse results 58 pulse ``` 和预期的一样,每次从result中接收到信息,都会收到两次心跳。 我们可能会使用这样的功能来收集系统的统计参数,当你的goroutine没有像预期那样运行,那么基于固定时间的心跳信号的作用会非常明显。 考虑下一个例子。 我们将在两次迭代后停止goroutine来模拟循环中断,然后不关闭任何一个通道; ``` doWork := func(done <-chan interface{}, pulseInterval time.Duration) (<-chan interface{}, <-chan time.Time) { heartbeat := make(chan interface{}) results := make(chan time.Time) go func() { pulse := time.Tick(pulseInterval) workGen := time.Tick(2 * pulseInterval) sendPulse := func() { select { case heartbeat <- struct{}{}: default: } } sendResult := func(r time.Time) { for { select { case <-pulse: sendPulse() case results <- r: return } } } for i := 0; i < 2; i++ { //1 select { case <-done: return case <-pulse: sendPulse() case r := <-workGen: sendResult(r) } } }() return heartbeat, results } done := make(chan interface{}) time.AfterFunc(10*time.Second, func() { close(done) }) const timeout = 2 * time.Second heartbeat, results := doWork(done, timeout/2) for { select { case _, ok := <-heartbeat: if ok == false { return } fmt.Println("pulse") case r, ok := <-results: if ok == false { return } fmt.Printf("results %v\n", r) case <-time.After(timeout): fmt.Println("worker goroutine is not healthy!") return } } ``` 1. 这里我们简单模拟循环中断。前面的例子中,未收到通知会无限循环。这里我们只循环两次。 这会输出: ``` pulse pulse worker goroutine is not healthy! ``` 效果很不错。在两秒钟之内,我们的系统意识到goroutine未能正确读取,并且打破了for-select循环。通过使用心跳,我们已经成功地避免了死锁,并且不必通过依赖较长的超时而保持稳定性。 我们将在“Goroutines异常行为修复”中进一步理解这个概念。 另外请注意,心跳会帮助处理相反的情况:它让我们知道长时间运行的goroutine依然存在,但花了一段时间才产生一个值并发送至通道。 接下来让我们看看另一个场景:在工作单元开始时产生的心跳。这对测试非常有用。下面是个例子: ``` doWork := func(done <-chan interface{}) (<-chan interface{}, <-chan int) { heartbeatStream := make(chan interface{}, 1) //1 workStream := make(chan int) go func() { defer close(heartbeatStream) defer close(workStream) for i := 0; i < 10; i++ { select { //2 case heartbeatStream <- struct{}{}: default: //3 } select { case <-done: return case workStream <- rand.Intn(10): } } }() return heartbeatStream, workStream } done := make(chan interface{}) defer close(done) heartbeat, results := doWork(done) for { select { case _, ok := <-heartbeat: if ok { fmt.Println("pulse") } else { return } case r, ok := <-results: if ok { fmt.Printf("results %v\n", r) } else { return } } } ``` 1. 这里我们用一个缓冲区创建心跳通道。这确保即使没有人及时监听发送,也总会发送至少一个pulse。 2. 在这里,我们为心跳设置了一个单独的select块。我们不希望将它与发送结果一起包含在同一个select块中,因为如果接收器未准备好,它们将接收到一个pulse,而result的当前值将会丢失。我们也没有为done通道提供case语句,因为我们有一个default可以处理这种情况。 3. 我们再次处理如果没有人监听到心头。因为我们的心跳通道是用缓冲区创建的,如果有人在监听,但没有及时处理第一个心跳,仍会被通知。 这会输出: ``` pulse results 1 pulse results 7 pulse results 7 pulse results 9 pulse results 1 pulse results 8 pulse results 5 pulse results 0 pulse results 6 pulse results 0 ``` 如预期一致,每个结果都会有一个心跳。 至于测试的编写。考虑下面的代码: ``` func DoWork( done <-chan interface {}, nums ...int ) (<-chan interface{}, <-chan int) { heartbeat := make(chan interface{}, 1) intStream := make(chan int) go func () { defer close(heartbeat) defer close(intStream) time.Sleep(2*time.Second) // 1 for _, n := range nums { select { case heartbeat <- struct{}{}: default: } select { case <-done: return case intStream <- n: } } }() return heartbeat, intStream } ``` 1. 我们在goroutine开始工作之前模拟延迟。在实践中,延迟可以由各种各样的原因导致,例如CPU负载,磁盘争用,网络延迟和bug。 DoWork函数是一个相当简单的生成器,它将传入的数字转换为它返回通道上的数据流。我们来试试这个函数。下面提供了一个测试的反例: ``` func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} _, results := DoWork(done, intSlice...) for i, expected := range intSlice { select { case r := <-results: if r != expected { t.Errorf( "index %v: expected %v, but received %v,", i, expected, r, ) } case <-time.After(1 * time.Second): // 1 t.Fatal("test timed out") } } } ``` 1. 在这里,我们设置超时,以防止goroutine出现问题导致死锁。 运行结果为: ``` go test ./bad_concurrent_test.go --- FAIL: TestDoWork_GeneratesAllNumbers (1.00s) bad_concurrent_test.go:46: test timed out FAIL FAIL command-line-arguments 1.002s ``` 这个测试之所以不好,是因为它的不确定性。如果移除time.Sleep情况会变得更糟:这个测试会有时通过,有时失败。 我们之前提到过程中的外部因素可能会导致goroutine花费更长的时间才能完成第一次迭代。关键在于我们不能保证在超时之前第一次迭代会完成,所以我们开始考虑:这时候超时会有多大意义?我们可以增加超时时间,但这意味着测试时失败也需要很长时间,从而减慢我们的测试效率。 这种情况很可怕,项目组甚至会对测试的正确性及必要性产生怀疑。 幸运的是这种情况并非无解。这是一个正确的测试: ``` func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} heartbeat, results := DoWork(done, intSlice...) <-heartbeat //1 i := 0 for r := range results { if expected := intSlice[i]; r != expected { t.Errorf("index %v: expected %v, but received %v,", i, expected, r) } i++ } } ``` 1. 在这里,我们等待goroutine发出信号表示它正在开始处理迭代。 运行此测试会产生以下输出 ``` ok command-line-arguments 2.002s ``` 使用心跳我们可以安全地编写该测试,而不会超时。运行的唯一风险是我们的一次迭代花费了过多的时间。 如果这对我们很重要,我们可以利用更安全的、基于间隔的心跳。 以下是使用基于间隔的心跳的测试示例: ``` func DoWork(done <-chan interface{}, pulseInterval time.Duration, nums ...int) (<-chan interface{}, <-chan int) { heartbeat := make(chan interface{}, 1) intStream := make(chan int) go func() { defer close(heartbeat) defer close(intStream) time.Sleep(2 * time.Second) pulse := time.Tick(pulseInterval) numLoop: //2 for _, n := range nums { for { //1 select { case <-done: return case <-pulse: select { case heartbeat <- struct{}{}: default: } case intStream <- n: continue numLoop //3 } } } }() return heartbeat, intStream } func TestDoWork_GeneratesAllNumbers(t *testing.T) { done := make(chan interface{}) defer close(done) intSlice := []int{0, 1, 2, 3, 5} const timeout = 2 * time.Second heartbeat, results := DoWork(done, timeout/2, intSlice...) <-heartbeat //4 i := 0 for { select { case r, ok := <-results: if ok == false { return } else if expected := intSlice[i]; r != expected { t.Errorf( "index %v: expected %v, but received %v,", i, expected, r, ) } i++ case <-heartbeat: //5 case <-time.After(timeout): t.Fatal("test timed out") } } } ``` 1. 我们需要两个循环:一个用来覆盖我们的数字列表,并且这个内部循环会运行直到intStream上的数字成功发送。 2. 我们在这里使用一个标签来使内部循环继续更简单一些。 3. 这里我们继续执行外部循环。 4. 我们仍然等待第一次心跳出现,表明我们已经进入了goroutine的循环。 5. 我们在这里获取心跳以实现超时。 运行此测试会输出: ``` ok command-line-arguments 3.002s ``` 你可能已经注意到这个版本的逻辑有点混乱。如果你确信goroutine的循环在启动后不会停止执行,我建议只阻塞第一次心跳,然后进入循环语句。你可以编写单独的测试,专门来测试如未能关闭通道,循环迭代耗时过长以及其他与时间相关的情况。 在编写并发代码时,心跳不是绝对必要的,但本节将展示其的实用性。对于任何需要测试的长期运行的goroutines,我强烈推荐这种模式。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。