💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
## 4.14\. Multiplexing 基于管道,我们可以很容易实现一个支持多路客户端的服务器程序。采用的技巧是将每个客户端私有的通信管道 作为消息的一部分发送给服务器,然后服务器通过这些管道和客户端独立通信。现实中的服务器实现都很复杂, 我们这里只给出一个服务器的简单实现来展现前面描述的技巧。首先定义一个"request"类型,里面包含一个 客户端的通信管道。 ``` 09 type request struct { 10 a, b int 11 replyc chan int 12 } ``` 服务器对客户端发送过来的两个整数进行运算。下面是具体的函数,函数在运算完之后将结构通过结构中的 管道返回给客户端。 ``` 14 type binOp func(a, b int) int 16 func run(op binOp, req *request) { 17 reply := op(req.a, req.b) 18 req.replyc <- reply 19 } ``` 第14行现定义一个"binOp"函数类型,用于对两个整数进行运算。 服务器routine线程是一个无限循环,它接受客户端请求。然后为每个客户端启动一个独立的routine线程, 用于处理客户数据(不会被某个客户端阻塞)。 ``` 21 func server(op binOp, service chan *request) { 22 for { 23 req := <-service 24 go run(op, req) // don't wait for it 25 } 26 } ``` 启动服务器的方法也是一个类似的routine线程,然后返回服务器的请求管道。 ``` 28 func startServer(op binOp) chan *request { 29 req := make(chan *request) 30 go server(op, req) 31 return req 32 } ``` 这里是一个简单的测试。首先启动服务器,处理函数为计算两个整数的和。接着向服务器发送"N"个请求(无阻塞)。 当所有请求都发送完了之后,再进行验证返回结果。 ``` 34 func main() { 35 adder := startServer(func(a, b int) int { return a + b }) 36 const N = 100 37 var reqs [N]request 38 for i := 0; i < N; i++ { 39 req := &reqs[i] 40 req.a = i 41 req.b = i + N 42 req.replyc = make(chan int) 43 adder <- req 44 } 45 for i := N-1; i >= 0; i-- { // doesn't matter what order 46 if <-reqs[i].replyc != N + 2*i { 47 fmt.Println("fail at", i) 48 } 49 } 50 fmt.Println("done") 51 } ``` 前面的服务器程序有个小问题:当main函数退出之后,服务器没有关闭,而且可能有一些客户端被阻塞在 管道通信中。为了处理这个问题,我们可给服务器增加一个控制管道,用于退出服务器。 ``` 32 func startServer(op binOp) (service chan *request, quit chan bool) { 33 service = make(chan *request) 34 quit = make(chan bool) 35 go server(op, service, quit) 36 return service, quit 37 } ``` 首先给"server"函数增加一个控制管道参数,然后这样使用: ``` 21 func server(op binOp, service chan *request, quit chan bool) { 22 for { 23 select { 24 case req := <-service: 25 go run(op, req) // don't wait for it 26 case <-quit: 27 return 28 } 29 } 30 } ``` 在服务器函数中,"select"操作服用于从多个通讯管道中选择一个就绪的管道。如果所有的管道都没有数据, 那么将等待知道有任意一个管道有数据。如果有多个管道就绪,则随即选择一个。服务器处理客户端请求,如果 有退出消息则退出。 最后是在main函数中保存"quit"管道,然后在退出的时候向服务线程发送停止命令。 ``` 40 adder, quit := startServer(func(a, b int) int { return a + b }) ... 55 quit <- true ``` 当然,Go语言及并行编程要讨论的问题很多。这个入门只是给出一些简单的例子。