ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## 编写一个 WorkerPool 队列消费 > 队列消费是高并发系统中最常用的异步处理模型,通常我们是编写一个 Console 命令行程序在后台执行 Redis、RabbitMQ 等 MQ 的队列消费,并将处理结果落地到 mysql 等数据库中,由于这类需求的标准化比较容易,因此我们开发了 [workerpool](https://github.com/mix-go/workerpool) 库来处理这类需求,基本上大部分异步处理类需求都可使用 首先我们使用 `mix` 命令创建一个 Console 项目骨架: ~~~ mix new --name=hello ~~~ 然后我们在 `commands` 新建 `commands/workerpool.go` 文件: - 定义一个新的命令结构体 `WorkerPoolDaemonCommand` - `WorkerPoolDaemonCommand.Main` 方法为默认执行的入口方法 - 代码中 `workerpool.NewDispatcher(jobQueue, 15, NewWorker)` 创建了一个调度器 - `NewWorker` 负责初始化执行任务的工作协程 - 任务数据会在 `worker.Do` 方法中触发,我们只需要将我们的业务逻辑写到该方法中即可 - 当程序接收到进程信号时,调度器能平滑控制所有的 Worker 在执行完队列里全部的任务后再退出调度,保证数据的完整性 ~~~ package commands import ( "context" "fmt" "github.com/mix-go/console-skeleton/globals" "github.com/mix-go/console/catch" "github.com/mix-go/workerpool" "os" "os/signal" "strings" "syscall" "time" ) type worker struct { workerpool.WorkerTrait } func (t *worker) Do(data interface{}) { defer func() { if err := recover(); err != nil { catch.Error(err) } }() // 执行业务处理 // ... // 将处理结果落地到数据库 // ... } func NewWorker() workerpool.Worker { return &worker{} } type WorkerPoolDaemonCommand struct { } func (t *WorkerPoolDaemonCommand) Main() { redis := globals.Redis() jobQueue := make(chan interface{}, 50) d := workerpool.NewDispatcher(jobQueue, 15, NewWorker) ch := make(chan os.Signal) signal.Notify(ch, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) go func() { <-ch d.Stop() }() go func() { for { res, err := redis.BRPop(context.Background(), 3*time.Second, "foo").Result() if err != nil { if strings.Contains(err.Error(), "redis: nil") { continue } fmt.Println(fmt.Sprintf("Redis Error: %s", err)) d.Stop(); return } // brPop命令最后一个键才是值 jobQueue <- res[1] } }() d.Run() // 阻塞代码,直到任务全部执行完成并且全部 Worker 停止 } ~~~ 然后我们把上面的 `WorkerPoolDaemonCommand` 在 `manifest/commands` 中注册,新增一个命令配置: - 新建 `manifest/commands/workerpool.go` 文件 ~~~ package commands import ( "github.com/mix-go/console" "github.com/mix-go/console-skeleton/commands" ) func init() { Commands = append(Commands, console.CommandDefinition{ Name: "wpd", Usage: "\tWorker pool daemon demo", Options: []console.OptionDefinition{ { Names: []string{"d", "daemon"}, Usage: "Run in the background", }, }, Command: &commands.WorkerPoolDaemonCommand{}, }, ) } ~~~ ## 编译与测试 > 也可以在 Goland Run 里配置 Program arguments 直接编译执行,[Goland 使用] 章节有详细介绍 接下来我们编译上面的程序: ~~~ // linux & macOS go build -o bin/go_build_main_go main.go // win go build -o bin/go_build_main_go.exe main.go ~~~ 执行 `wpd` 命令: ~~~ $ cd bin $ ./go_build_main_go wpd ~~~ 当我们想在服务器后台执行时,只需增加 `-d/--daemon` 参数: ~~~ $ ./go_build_main_go wpd -d ~~~