企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
#### 8.2 创建及启动Worker工作池 现在添加Worker工作池,先定义一些启动工作池的接口 > zinx/ziface/imsghandler.go ```go /* 消息管理抽象层 */ type IMsgHandle interface{ DoMsgHandler(request IRequest) //马上以非阻塞方式处理消息 AddRouter(msgId uint32, router IRouter) //为消息添加具体的处理逻辑 StartWorkerPool() //启动worker工作池 SendMsgToTaskQueue(request IRequest) //将消息交给TaskQueue,由worker进行处理 } ``` > zinx/znet/msghandler.go ```go //启动一个Worker工作流程 func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) { fmt.Println("Worker ID = ", workerID, " is started.") //不断的等待队列中的消息 for { select { //有消息则取出队列的Request,并执行绑定的业务方法 case request := <-taskQueue: mh.DoMsgHandler(request) } } } //启动worker工作池 func (mh *MsgHandle) StartWorkerPool() { //遍历需要启动worker的数量,依此启动 for i:= 0; i < int(mh.WorkerPoolSize); i++ { //一个worker被启动 //给当前worker对应的任务队列开辟空间 mh.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) //启动当前Worker,阻塞的等待对应的任务队列是否有消息传递进来 go mh.StartOneWorker(i, mh.TaskQueue[i]) } } ``` `StartWorkerPool()`方法是启动Worker工作池,这里根据用户配置好的`WorkerPoolSize`的数量来启动,然后分别给每个Worker分配一个`TaskQueue`,然后用一个goroutine来承载一个Worker的工作业务。 `StartOneWorker()`方法就是一个Worker的工作业务,每个worker是不会退出的\(目前没有设定worker的停止工作机制\),会永久的从对应的TaskQueue中等待消息,并处理。