## 编写一个 WorkerPool 队列消费
> 队列消费是高并发系统中最常用的异步处理模型,通常我们是编写一个 Console 命令行程序在后台执行 Redis、RabbitMQ 等 MQ 的队列消费,并将处理结果落地到 mysql 等数据库中,由于这类需求的标准化比较容易,因此我们开发了 [workerpool](https://github.com/mix-go/workerpool) 库来处理这类需求,基本上大部分异步处理类需求都可使用
首先我们使用 `mix` 命令创建一个 Console 项目骨架:
~~~
mix new --name=hello
~~~
然后我们在 `commands` 新建 `commands/workerpool.go` 文件:
- 定义一个新的命令结构体 `WorkerPoolDaemonCommand`
- `WorkerPoolDaemonCommand.Main` 方法为默认执行的入口方法
- 代码中 `workerpool.NewDispatcher(jobQueue, 15, NewWorker)` 创建了一个调度器
- `NewWorker` 负责初始化执行任务的工作协程
- 任务数据会在 `worker.Do` 方法中触发,我们只需要将我们的业务逻辑写到该方法中即可
- 当程序接收到进程信号时,调度器能平滑控制所有的 Worker 在执行完队列里全部的任务后再退出调度,保证数据的完整性
~~~
package commands
import (
"context"
"fmt"
"github.com/mix-go/console-skeleton/globals"
"github.com/mix-go/console/catch"
"github.com/mix-go/workerpool"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
type worker struct {
workerpool.WorkerTrait
}
func (t *worker) Do(data interface{}) {
defer func() {
if err := recover(); err != nil {
catch.Error(err)
}
}()
// 执行业务处理
// ...
// 将处理结果落地到数据库
// ...
}
func NewWorker() workerpool.Worker {
return &worker{}
}
type WorkerPoolDaemonCommand struct {
}
func (t *WorkerPoolDaemonCommand) Main() {
redis := globals.Redis()
jobQueue := make(chan interface{}, 50)
d := workerpool.NewDispatcher(jobQueue, 15, NewWorker)
ch := make(chan os.Signal)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-ch
d.Stop()
}()
go func() {
for {
res, err := redis.BRPop(context.Background(), 3*time.Second, "foo").Result()
if err != nil {
if strings.Contains(err.Error(), "redis: nil") {
continue
}
fmt.Println(fmt.Sprintf("Redis Error: %s", err))
d.Stop();
return
}
// brPop命令最后一个键才是值
jobQueue <- res[1]
}
}()
d.Run() // 阻塞代码,直到任务全部执行完成并且全部 Worker 停止
}
~~~
然后我们把上面的 `WorkerPoolDaemonCommand` 在 `manifest/commands` 中注册,新增一个命令配置:
- 新建 `manifest/commands/workerpool.go` 文件
~~~
package commands
import (
"github.com/mix-go/console"
"github.com/mix-go/console-skeleton/commands"
)
func init() {
Commands = append(Commands,
console.CommandDefinition{
Name: "wpd",
Usage: "\tWorker pool daemon demo",
Options: []console.OptionDefinition{
{
Names: []string{"d", "daemon"},
Usage: "Run in the background",
},
},
Command: &commands.WorkerPoolDaemonCommand{},
},
)
}
~~~
## 编译与测试
> 也可以在 Goland Run 里配置 Program arguments 直接编译执行,[Goland 使用] 章节有详细介绍
接下来我们编译上面的程序:
~~~
// linux & macOS
go build -o bin/go_build_main_go main.go
// win
go build -o bin/go_build_main_go.exe main.go
~~~
执行 `wpd` 命令:
~~~
$ cd bin
$ ./go_build_main_go wpd
~~~
当我们想在服务器后台执行时,只需增加 `-d/--daemon` 参数:
~~~
$ ./go_build_main_go wpd -d
~~~