ThinkChat🤖让你学习和工作更高效,注册即送10W Token,即刻开启你的AI之旅 广告
协议 websocket是个二进制协议,需要先通过Http协议进行握手,从而协商完成从Http协议向websocket协议的转换。一旦握手结束,当前的TCP连接后续将采用二进制websocket协议进行双向双工交互,自此与Http协议无关。 可以通过这篇知乎了解一下websocket协议的基本原理:[《WebSocket 是什么原理?为什么可以实现持久连接?》](https://www.zhihu.com/question/20215561)。 粘包 我们开发过TCP服务的都知道,需要通过协议decode从TCP字节流中解析出一个一个请求,那么websocket又怎么样呢? websocket以message为单位进行通讯,本身就是一个在TCP层上的一个分包协议,其实并不需要我们再进行粘包处理。但是因为单个message可能很大很大(比如一个视频文件),那么websocket显然不适合把一个视频作为一个message传输(中途断了前功尽弃),所以websocket协议其实是支持1个message分多个frame帧传输的。 我们的浏览器提供的编程API都是message粒度的,把frame拆帧的细节对开发者隐蔽了,而服务端websocket框架一般也做了同样的隐藏,会自动帮我们收集所有的frame后拼成messasge再回调,所以结论就是: websocket以message为单位通讯,不需要开发者自己处理粘包问题。 golang实现 golang官方标准库里有一个websocket的包,但是它提供的就是frame粒度的API,压根不能用。 不过官方其实已经认可了一个准标准库实现,它实现了message粒度的API,让开发者不需要关心websocket协议细节,开发起来非常方便,其文档地址:https://godoc.org/github.com/gorilla/websocket。 开发websocket服务时,首先要基于http库对外暴露接口,然后由websocket库接管TCP连接进行协议升级,然后进行websocket协议的数据交换,所以开发时总是要用到http库和websocket库。 上述websocket文档中对开发websocket服务有明确的注意事项要求,主要是指: 读和写API不是并发安全的,需要启动单个goroutine串行处理。 关闭API是线程安全的,一旦调用则阻塞的读和写API会出错返回,从而终止处理。 在我的实现中,我对websocket进行了封装,简化应用层开发的复杂度,主要思路是: 请求和应答都放入管道中排队。 读协程阻塞读websocket,将message放入请求队列。 写协程阻塞读应答channel,将message写给websocket。 如何处理websocket错误和主动关闭websocket呢? 读/写协程调用websocket若返回错误,那么直接调用websocket的Close关闭连接,协程退出。(此时用户可能仍旧持有连接对象,继续向下阅读!) websocket连接关闭后,用户通常正阻塞在读/写channel上而不知情,所以每个连接配套一个closeChan专门用于唤醒用户代码,关闭websocket连接同时关闭closeChan,这会令<-closeChan总是立即返回。 因为上一条设计,所以用户读/写channel时总是select同时监听channel和closeChan,以便实时感知到websocket连接的关闭。 用户可以主动关闭连接,websocket连接重复Close没有影响,而closeChan重复关闭会报错,所以通过一个上锁的状态位判重处理。 描述比较繁琐,实际并不复杂,看看我的代码吧: https://github.com/owenliang/go-websocket。 server.go ~~~ package main import ( "errors" "fmt" "net/http" "sync" "time" "github.com/gorilla/websocket" ) // http升级websocket协议的配置 var wsUpgrader = websocket.Upgrader{ // 允许所有CORS跨域请求 CheckOrigin: func(r *http.Request) bool { return true }, } // 客户端读写消息 type wsMessage struct { messageType int data []byte } // 客户端连接 type wsConnection struct { wsSocket *websocket.Conn // 底层websocket inChan chan *wsMessage // 读队列 outChan chan *wsMessage // 写队列 mutex sync.Mutex // 避免重复关闭管道 isClosed bool closeChan chan byte // 关闭通知 } func (wsConn *wsConnection) wsReadLoop() { for { // 读一个message msgType, data, err := wsConn.wsSocket.ReadMessage() if err != nil { goto error } req := &wsMessage{ msgType, data, } // 放入请求队列 select { case wsConn.inChan <- req: case <-wsConn.closeChan: goto closed } } error: wsConn.wsClose() closed: } func (wsConn *wsConnection) wsWriteLoop() { for { select { // 取一个应答 case msg := <-wsConn.outChan: // 写给websocket if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil { goto error } case <-wsConn.closeChan: goto closed } } error: wsConn.wsClose() closed: } func (wsConn *wsConnection) procLoop() { // 启动一个gouroutine发送心跳 go func() { for { time.Sleep(2 * time.Second) if err := wsConn.wsWrite(websocket.TextMessage, []byte("heartbeat from server")); err != nil { fmt.Println("heartbeat fail") wsConn.wsClose() break } } }() // 这是一个同步处理模型(只是一个例子),如果希望并行处理可以每个请求一个gorutine,注意控制并发goroutine的数量!!! for { msg, err := wsConn.wsRead() if err != nil { fmt.Println("read fail") break } fmt.Println(string(msg.data)) err = wsConn.wsWrite(msg.messageType, msg.data) if err != nil { fmt.Println("write fail") break } } } func wsHandler(resp http.ResponseWriter, req *http.Request) { // 应答客户端告知升级连接为websocket wsSocket, err := wsUpgrader.Upgrade(resp, req, nil) if err != nil { return } wsConn := &wsConnection{ wsSocket: wsSocket, inChan: make(chan *wsMessage, 1000), outChan: make(chan *wsMessage, 1000), closeChan: make(chan byte), isClosed: false, } // 处理器 go wsConn.procLoop() // 读协程 go wsConn.wsReadLoop() // 写协程 go wsConn.wsWriteLoop() } func (wsConn *wsConnection) wsWrite(messageType int, data []byte) error { select { case wsConn.outChan <- &wsMessage{messageType, data}: case <-wsConn.closeChan: return errors.New("websocket closed") } return nil } func (wsConn *wsConnection) wsRead() (*wsMessage, error) { select { case msg := <-wsConn.inChan: return msg, nil case <-wsConn.closeChan: } return nil, errors.New("websocket closed") } func (wsConn *wsConnection) wsClose() { wsConn.wsSocket.Close() wsConn.mutex.Lock() defer wsConn.mutex.Unlock() if !wsConn.isClosed { wsConn.isClosed = true close(wsConn.closeChan) } } func main() { http.HandleFunc("/ws", wsHandler) http.ListenAndServe("0.0.0.0:7777", nil) } ~~~ client.html ~~~ <!DOCTYPE html> <html> <head> <meta charset="utf-8"> <script> window.addEventListener("load", function(evt) { var output = document.getElementById("output"); var input = document.getElementById("input"); var ws; var print = function(message) { var d = document.createElement("div"); d.innerHTML = message; output.appendChild(d); }; document.getElementById("open").onclick = function(evt) { if (ws) { return false; } ws = new WebSocket("ws://localhost:7777/ws"); ws.onopen = function(evt) { print("OPEN"); } ws.onclose = function(evt) { print("CLOSE"); ws = null; } ws.onmessage = function(evt) { print("RESPONSE: " + evt.data); } ws.onerror = function(evt) { print("ERROR: " + evt.data); } return false; }; document.getElementById("send").onclick = function(evt) { if (!ws) { return false; } print("SEND: " + input.value); ws.send(input.value); return false; }; document.getElementById("close").onclick = function(evt) { if (!ws) { return false; } ws.close(); return false; }; }); </script> </head> <body> <table> <tr><td valign="top" width="50%"> <p>Click "Open" to create a connection to the server, "Send" to send a message to the server and "Close" to close the connection. You can change the message and send multiple times. </p> <form> <button id="open">Open</button> <button id="close">Close</button> <input id="input" type="text" value="Hello world!"> <button id="send">Send</button> </form> </td><td valign="top" width="50%"> <div id="output"></div> </td></tr></table> </body> </html> ~~~ 首先运行server.go,然后打开client.html页面,即可体验所有流程: