在诸如守护进程这样的长期进程中,拥有一组长生命周期的goroutines非常普遍。这些goroutines通常被阻塞,等待被某种方式唤醒以继续工作。有时候,这些例程依赖于你没有很好控制的资源。也许一个goroutine会接收到Web服务中希望获取数据的请求,或者它正在监视一个临时文件。 如果程序处理不够健壮,goroutine会很容易陷入一个糟糕的状态。在长期运行的过程中,如果能创建一种机制来确保goroutine的健康状况良好,并在健康状况不佳时重新启动,那么我们的项目想必能活得久一点。 我们将在本节讨论对goroutines异常行为进行修复的话题。
我们将使用心跳来检查正在监测的goroutine的活跃程度。心跳的类型将取决于你想要监控的内容,但是如果你的goroutine可能会产生活锁,请确保心跳包含某种信息,以表明该goroutine不仅没死掉,而且还可以正常执行任务。在本节中,为了简单起见,我们只会考虑goroutines是活的还是死的。
下面这段代码建立一个管理者监视一个goroutine的健康状况,以及它的子例程。如果例程变得不健康,管理者将重新启动子例程。为此,它需要引用一个可以启动goroutine的函数。让我们看看管理程序是什么样子的:
```
type startGoroutineFn func(done <-chan interface{},
pulseInterval time.Duration) (heartbeat <-chan interface{}) //1
newSteward := func(timeout time.Duration, startGoroutine startGoroutineFn) startGoroutineFn { //2
return func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {
heartbeat := make(chan interface{})
go func() {
defer close(heartbeat)
var wardDone chan interface{}
var wardHeartbeat <-chan interface{}
startWard := func() { //3
wardDone = make(chan interface{}) //4
wardHeartbeat = startGoroutine(or(wardDone, done), timeout/2) //5
}
startWard()
pulse := time.Tick(pulseInterval)
monitorLoop:
for { //6
timeoutSignal := time.After(timeout)
for {
select {
case <-pulse:
select {
case heartbeat <- struct{}{}:
default:
}
case <-wardHeartbeat: //7
continue monitorLoop
case <-timeoutSignal: //8
log.Println("steward: ward unhealthy; restarting")
close(wardDone)
startWard()
continue monitorLoop
case <-done:
return
}
}
}
}()
return heartbeat
}
}
```
1. 这里我们定义一个可以监控和重新启动的goroutine的函数签名。 我们看到熟悉的done通道,以及熟悉的心跳模式写法。
2. 在这里我们设置了超时时间,并使用函数startGoroutine来启动它正在监控的goroutine。有趣的是,监控器本身返回一个startGoroutineFn,表示监控器自身也是可监控的。
3. 在这里我们定义一个闭包,它以同样的的方式来启动我们正在监视的goroutine。
4. 这是我们创建一个新通道,我们会将其传递给监控通道,以响应发出的停止信号。
5. 在这里,我们开启对目标goroutine的监控。如果监控器停止工作,或者监控器想要停止被监控区域,我们希望监控者也停止,因此我们将两个done通道都包含在逻辑中。我们传入的心跳间隔是超时时间的一半,但正如我们在“心跳”中讨论的那样,这可以调整。
6. 这是我们的内部循环,它确保监控者可以发出自己的心跳。
7. 在这里我们如果接收到监控者的心跳,就会知道它还处于正常工作状态,程序会继续监测循环。
8. 这里如果我们发现监控者超时,我们要求监控者停下来,并开始一个新的goroutine。然后开始新的监测。
我们的for循环有点杂乱,但如果你阅读过前面的章节,熟悉其中的模式,那么理解起来会相对简单。 接下来让我们试试看如果监控一个行为异常的goroutine,会发生什么:
```
log.SetOutput(os.Stdout)
log.SetFlags(log.Ltime | log.LUTC)
doWork := func(done <-chan interface{}, _ time.Duration) <-chan interface{} {
log.Println("ward: Hello, I'm irresponsible!")
go func() {
<-done // 1
log.Println("ward: I am halting.")
}()
return nil
}
doWorkWithSteward := newSteward(4*time.Second, doWork) // 2
done := make(chan interface{})
time.AfterFunc(9*time.Second, func() { // 3
log.Println("main: halting steward and ward.")
close(done)
})
for range doWorkWithSteward(done, 4*time.Second) { // 4
}
log.Println("Done")
```
1. 可以看到这个goroutine什么都没干,持续阻塞等待被取消,它同样不会发出任何表明自己正常信号。
2. 这里开始建立被监控的例程,其4秒后会超时。
3. 这里我们9秒后向done通道发出信号停止整个程序。
4. 最后,我们启动监控器并在其心跳范围内防止示例停止。
这会输出:
```
18:28:07 ward: Hello, I'm irresponsible!
18:28:11 steward: ward unhealthy; restarting 18:28:11 ward: Hello, I'm irresponsible!
18:28:11 ward: I am halting.
18:28:15 steward: ward unhealthy; restarting 18:28:15 ward: Hello, I'm irresponsible!
18:28:15 ward: I am halting.
18:28:16 main: halting steward and ward.
18:28:16 ward: I am halting.
18:28:16 Done
```
看起来工作正常。我们的监控器比较简单,除了取消操作和心跳所需信息之外不接收也不返回任何参数。我们可以用闭包强化一下:
```
doWorkFn := func(done <-chan interface{}, intList ...int) (startGoroutineFn, <-chan interface{}) {//1
intChanStream := make(chan (<-chan interface{}))//2
intStream := bridge(done, intChanStream)
doWork := func(done <-chan interface{}, pulseInterval time.Duration) <-chan interface{} {//3
intStream := make(chan interface{})//4
heartbeat := make(chan interface{})
go func() {
defer close(intStream)
select {
case intChanStream <- intStream://5
case <-done:
return
}
pulse := time.Tick(pulseInterval)
for {
valueLoop:
for _, intVal := range intList {
if intVal < 0 {
log.Printf("negative value: %v\n", intVal)//6
return
}
for {
select {
case <-pulse:
select {
case heartbeat <- struct{}{}: default:
}
case intStream <- intVal:
continue valueLoop
case <-done:
return
}
}
}
}
}()
return heartbeat
}
return doWork, intStream
}
```
1. 我们将监控器关闭的内容放入返回值,并返回所有监控器用来交流数据的通道。
2. 我们建立通道的通道,这是我们在前面章节中"bridge"模式的应用。
3. 这里我们建立闭包控制监控器的启动和关闭。
4. 这是各通道与监控器交互数据的实例。
5. 这里我们向起数据交互作用的通道传入数据。
6. 这里我们返回负数并从goroutine返回以模拟不正常的工作状态。
由于我们可能会启动监控器的多个副本,因此我们使用"bridge"模式来帮助向doWorkFn的调用者呈现单个不间断的通道。通过这样的方式,我们的监控器可以简单地通过组成模式而变得任意复杂。让我们看看如何调用:
```
log.SetFlags(log.Ltime | log.LUTC)
log.SetOutput(os.Stdout)
done := make(chan interface{})
defer close(done)
doWork, intStream := doWorkFn(done, 1, 2, -1, 3, 4, 5) //1
doWorkWithSteward := newSteward(1*time.Millisecond, doWork) //2
doWorkWithSteward(done, 1*time.Hour) //3
for intVal := range take(done, intStream, 6) { //4
fmt.Printf("Received: %v\n", intVal)
}
```
1. 这里我们调用该函数,它会将传入的不定长整数参数转换为可通信的流。
2. 在这里,我们创建了一个检查doWork关闭的监视器。我们预计这里会极快的进入失败流程,所以将监控时间设置为一毫秒。
3. 我们通知 steward 开启监测。
4. 最后,我们使用该管道,并从intStream中取出前六个值。
这会输出:
```
Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2
23:25:33 steward: ward unhealthy; restarting Received: 1
23:25:33 negative value: -1
Received: 2
```
我们可以看到监控器发现错误并重启。你可能还会注意到我们只接收到了1和2,这证明了重启功能正常。如果你的系统对重复值很敏感,一定要考虑对其进行处理。你也可以考虑在一定次数的失败后退出。比如在这样的位置:
```
valueLoop:
for _, intVal := range intList {
// ...
}
```
稍作修改:
```
valueLoop:
for {
intVal := intList[0]
intList = intList[1:]
// ...
}
```
尽管我们依然停留在返回的无效负数上,尽管我们的监控器将继续失败,但这会记录在重新启动前的位置,你可以在这个思路上扩展。
使用这样的方式可以确保你的系统保持健康,此外,相信系统崩溃的减少也能大幅度降低开发过程中猝死的几率。
愿诸君健康工作,准点下班。
* * * * *
学识浅薄,错误在所难免。我是长风,欢迎来Golang中国的群(211938256)就本书提出修改意见。
- 前序
- 谁适合读这本书
- 章节导读
- 在线资源
- 第一章 并发编程介绍
- 摩尔定律,可伸缩网络和我们所处的困境
- 为什么并发编程如此困难
- 数据竞争
- 原子性
- 内存访问同步
- 死锁,活锁和锁的饥饿问题
- 死锁
- 活锁
- 饥饿
- 并发安全性
- 优雅的面对复杂性
- 第二章 代码建模:序列化交互处理
- 并发与并行
- 什么是CSP
- CSP在Go中的衍生物
- Go的并发哲学
- 第三章 Go的并发构建模块
- Goroutines
- sync包
- WaitGroup
- Mutex和RWMutex
- Cond
- Once
- Pool
- Channels
- select语句
- GOMAXPROCS
- 结论
- 第四章 Go的并发编程范式
- 访问范围约束
- fo-select循环
- 防止Goroutine泄漏
- or-channel
- 错误处理
- 管道
- 构建管道的最佳实践
- 便利的生成器
- 扇入扇出
- or-done-channel
- tee-channel
- bridge-channel
- 队列
- context包
- 小结
- 第五章 可伸缩并发设计
- 错误传递
- 超时和取消
- 心跳
- 请求并发复制处理
- 速率限制
- Goroutines异常行为修复
- 本章小结
- 第六章 Goroutines和Go运行时
- 任务调度