在诸如守护进程这样的长期进程中,拥有一组长生命周期的goroutines非常普遍。这些goroutines通常被阻塞,等待被某种方式唤醒以继续工作。有时候,这些例程依赖于你没有很好控制的资源。也许一个goroutine会接收到Web服务中希望获取数据的请求,或者它正在监视一个临时文件。 如果程序处理不够健壮,goroutine会很容易陷入一个糟糕的状态。在长期运行的过程中,如果能创建一种机制来确保goroutine的健康状况良好,并在健康状况不佳时重新启动,那么我们的项目想必能活得久一点。 我们将在本节讨论对goroutines异常行为进行修复的话题。 我们将使用心跳来检查正在监测的goroutine的活跃程度。心跳的类型将取决于你想要监控的内容,但是如果你的goroutine可能会产生活锁,请确保心跳包含某种信息,以表明该goroutine不仅没死掉,而且还可以正常执行任务。在本节中,为了简单起见,我们只会考虑goroutines是活的还是死的。 下面这段代码建立一个管理者监视一个goroutine的健康状况,以及它的子例程。如果例程变得不健康,管理者将重新启动子例程。为此,它需要引用一个可以启动goroutine的函数。让我们看看管理程序是什么样子的: ``` type startGoroutineFn func(done <-chan interface{}, pulseInterval time.Duration) (heartbeat <-chan interface{}) //1 newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { //2 return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} { heartbeat := make(chan interface{}) go func() { defer close(heartbeat) var wardDone chan interface{} var wardHeartbeat <-chan interface{} startWard := func() { //3 wardDone = make(chan interface{}) //4 wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) //5 } startWard() pulse := time.Tick(pulseInterval) monitorLoop: for { //6 timeoutSignal := time.After(timeout) for { select { case <-pulse: select { case heartbeat <- struct{}{}: default: } case <-wardHeartbeat: //7 continue monitorLoop case <-timeoutSignal: //8 log.Println("steward: ward unhealthy; restarting") close(wardDone) startWard() continue monitorLoop case <-done: return } } } }() return heartbeat } } ``` 1. 这里我们定义一个可以监控和重新启动的goroutine的函数签名。 我们看到熟悉的done通道,以及熟悉的心跳模式写法。 2. 在这里我们设置了超时时间,并使用函数startGoroutine来启动它正在监控的goroutine。有趣的是,监控器本身返回一个startGoroutineFn,表示监控器自身也是可监控的。 3. 在这里我们定义一个闭包,它以同样的的方式来启动我们正在监视的goroutine。 4. 这是我们创建一个新通道,我们会将其传递给监控通道,以响应发出的停止信号。 5. 在这里,我们开启对目标goroutine的监控。如果监控器停止工作,或者监控器想要停止被监控区域,我们希望监控者也停止,因此我们将两个done通道都包含在逻辑中。我们传入的心跳间隔是超时时间的一半,但正如我们在“心跳”中讨论的那样,这可以调整。 6. 这是我们的内部循环,它确保监控者可以发出自己的心跳。 7. 在这里我们如果接收到监控者的心跳,就会知道它还处于正常工作状态,程序会继续监测循环。 8. 这里如果我们发现监控者超时,我们要求监控者停下来,并开始一个新的goroutine。然后开始新的监测。 我们的for循环有点杂乱,但如果你阅读过前面的章节,熟悉其中的模式,那么理解起来会相对简单。 接下来让我们试试看如果监控一个行为异常的goroutine,会发生什么: ``` log.SetOutput(os.Stdout) log.SetFlags(log.Ltime | log.LUTC) doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} { log.Println("ward: Hello, I'm irresponsible!") go func() { <-done // 1 log.Println("ward: I am halting.") }() return nil } doWorkWithSteward := newSteward(4*time.Second, doWork) // 2 done := make(chan interface{}) time.AfterFunc(9*time.Second, func() { // 3 log.Println("main: halting steward and ward.") close(done) }) for range doWorkWithSteward(done, 4*time.Second) { // 4 } log.Println("Done") ``` 1. 可以看到这个goroutine什么都没干,持续阻塞等待被取消,它同样不会发出任何表明自己正常信号。 2. 这里开始建立被监控的例程,其4秒后会超时。 3. 这里我们9秒后向done通道发出信号停止整个程序。 4. 最后,我们启动监控器并在其心跳范围内防止示例停止。 这会输出: ``` 18:28:07 ward: Hello, I'm irresponsible! 18:28:11 steward: ward unhealthy; restarting 18:28:11 ward: Hello, I'm irresponsible! 18:28:11 ward: I am halting. 18:28:15 steward: ward unhealthy; restarting 18:28:15 ward: Hello, I'm irresponsible! 18:28:15 ward: I am halting. 18:28:16 main: halting steward and ward. 18:28:16 ward: I am halting. 18:28:16 Done ``` 看起来工作正常。我们的监控器比较简单,除了取消操作和心跳所需信息之外不接收也不返回任何参数。我们可以用闭包强化一下: ``` doWorkFn := func(done <-chan interface{}, intList ...int) (startGoroutineFn, <-chan interface{}) {//1 intChanStream := make(chan (<-chan interface{}))//2 intStream := bridge(done, intChanStream) doWork := func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {//3 intStream := make(chan interface{})//4 heartbeat := make(chan interface{}) go func() { defer close(intStream) select { case intChanStream <- intStream://5 case <-done: return } pulse := time.Tick(pulseInterval) for { valueLoop: for _, intVal := range intList { if intVal < 0 { log.Printf("negative value: %v\n", intVal)//6 return } for { select { case <-pulse: select { case heartbeat <- struct{}{}: default: } case intStream <- intVal: continue valueLoop case <-done: return } } } } }() return heartbeat } return doWork, intStream } ``` 1. 我们将监控器关闭的内容放入返回值,并返回所有监控器用来交流数据的通道。 2. 我们建立通道的通道,这是我们在前面章节中"bridge"模式的应用。 3. 这里我们建立闭包控制监控器的启动和关闭。 4. 这是各通道与监控器交互数据的实例。 5. 这里我们向起数据交互作用的通道传入数据。 6. 这里我们返回负数并从goroutine返回以模拟不正常的工作状态。 由于我们可能会启动监控器的多个副本,因此我们使用"bridge"模式来帮助向doWorkFn的调用者呈现单个不间断的通道。通过这样的方式,我们的监控器可以简单地通过组成模式而变得任意复杂。让我们看看如何调用: ``` log.SetFlags(log.Ltime | log.LUTC) log.SetOutput(os.Stdout) done := make(chan interface{}) defer close(done) doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5) //1 doWorkWithSteward := newSteward(1*time.Millisecond, doWork) //2 doWorkWithSteward(done, 1*time.Hour) //3 for intVal := range take(done, intStream, 6) { //4 fmt.Printf("Received: %v\n", intVal) } ``` 1. 这里我们调用该函数,它会将传入的不定长整数参数转换为可通信的流。 2. 在这里,我们创建了一个检查doWork关闭的监视器。我们预计这里会极快的进入失败流程,所以将监控时间设置为一毫秒。 3. 我们通知 steward 开启监测。 4. 最后,我们使用该管道,并从intStream中取出前六个值。 这会输出: ``` Received: 1 23:25:33 negative value: -1 Received: 2 23:25:33 steward: ward unhealthy; restarting Received: 1 23:25:33 negative value: -1 Received: 2 23:25:33 steward: ward unhealthy; restarting Received: 1 23:25:33 negative value: -1 Received: 2 ``` 我们可以看到监控器发现错误并重启。你可能还会注意到我们只接收到了1和2,这证明了重启功能正常。如果你的系统对重复值很敏感,一定要考虑对其进行处理。你也可以考虑在一定次数的失败后退出。比如在这样的位置: ``` valueLoop: for _, intVal := range intList { // ... } ``` 稍作修改: ``` valueLoop: for { intVal := intList[0] intList = intList[1:] // ... } ``` 尽管我们依然停留在返回的无效负数上,尽管我们的监控器将继续失败,但这会记录在重新启动前的位置,你可以在这个思路上扩展。 使用这样的方式可以确保你的系统保持健康,此外,相信系统崩溃的减少也能大幅度降低开发过程中猝死的几率。 愿诸君健康工作,准点下班。 * * * * * 学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。