协议
websocket是个二进制协议,需要先通过Http协议进行握手,从而协商完成从Http协议向websocket协议的转换。一旦握手结束,当前的TCP连接后续将采用二进制websocket协议进行双向双工交互,自此与Http协议无关。
可以通过这篇知乎了解一下websocket协议的基本原理:[《WebSocket 是什么原理?为什么可以实现持久连接?》](https://www.zhihu.com/question/20215561)。
粘包
我们开发过TCP服务的都知道,需要通过协议decode从TCP字节流中解析出一个一个请求,那么websocket又怎么样呢?
websocket以message为单位进行通讯,本身就是一个在TCP层上的一个分包协议,其实并不需要我们再进行粘包处理。但是因为单个message可能很大很大(比如一个视频文件),那么websocket显然不适合把一个视频作为一个message传输(中途断了前功尽弃),所以websocket协议其实是支持1个message分多个frame帧传输的。
我们的浏览器提供的编程API都是message粒度的,把frame拆帧的细节对开发者隐蔽了,而服务端websocket框架一般也做了同样的隐藏,会自动帮我们收集所有的frame后拼成messasge再回调,所以结论就是:
websocket以message为单位通讯,不需要开发者自己处理粘包问题。
golang实现
golang官方标准库里有一个websocket的包,但是它提供的就是frame粒度的API,压根不能用。
不过官方其实已经认可了一个准标准库实现,它实现了message粒度的API,让开发者不需要关心websocket协议细节,开发起来非常方便,其文档地址:https://godoc.org/github.com/gorilla/websocket。
开发websocket服务时,首先要基于http库对外暴露接口,然后由websocket库接管TCP连接进行协议升级,然后进行websocket协议的数据交换,所以开发时总是要用到http库和websocket库。
上述websocket文档中对开发websocket服务有明确的注意事项要求,主要是指:
读和写API不是并发安全的,需要启动单个goroutine串行处理。
关闭API是线程安全的,一旦调用则阻塞的读和写API会出错返回,从而终止处理。
在我的实现中,我对websocket进行了封装,简化应用层开发的复杂度,主要思路是:
请求和应答都放入管道中排队。
读协程阻塞读websocket,将message放入请求队列。
写协程阻塞读应答channel,将message写给websocket。
如何处理websocket错误和主动关闭websocket呢?
读/写协程调用websocket若返回错误,那么直接调用websocket的Close关闭连接,协程退出。(此时用户可能仍旧持有连接对象,继续向下阅读!)
websocket连接关闭后,用户通常正阻塞在读/写channel上而不知情,所以每个连接配套一个closeChan专门用于唤醒用户代码,关闭websocket连接同时关闭closeChan,这会令<-closeChan总是立即返回。
因为上一条设计,所以用户读/写channel时总是select同时监听channel和closeChan,以便实时感知到websocket连接的关闭。
用户可以主动关闭连接,websocket连接重复Close没有影响,而closeChan重复关闭会报错,所以通过一个上锁的状态位判重处理。
描述比较繁琐,实际并不复杂,看看我的代码吧:
https://github.com/owenliang/go-websocket。
server.go
~~~
package main
import (
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/gorilla/websocket"
)
// http升级websocket协议的配置
var wsUpgrader = websocket.Upgrader{
// 允许所有CORS跨域请求
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// 客户端读写消息
type wsMessage struct {
messageType int
data []byte
}
// 客户端连接
type wsConnection struct {
wsSocket *websocket.Conn // 底层websocket
inChan chan *wsMessage // 读队列
outChan chan *wsMessage // 写队列
mutex sync.Mutex // 避免重复关闭管道
isClosed bool
closeChan chan byte // 关闭通知
}
func (wsConn *wsConnection) wsReadLoop() {
for {
// 读一个message
msgType, data, err := wsConn.wsSocket.ReadMessage()
if err != nil {
goto error
}
req := &wsMessage{
msgType,
data,
}
// 放入请求队列
select {
case wsConn.inChan <- req:
case <-wsConn.closeChan:
goto closed
}
}
error:
wsConn.wsClose()
closed:
}
func (wsConn *wsConnection) wsWriteLoop() {
for {
select {
// 取一个应答
case msg := <-wsConn.outChan:
// 写给websocket
if err := wsConn.wsSocket.WriteMessage(msg.messageType, msg.data); err != nil {
goto error
}
case <-wsConn.closeChan:
goto closed
}
}
error:
wsConn.wsClose()
closed:
}
func (wsConn *wsConnection) procLoop() {
// 启动一个gouroutine发送心跳
go func() {
for {
time.Sleep(2 * time.Second)
if err := wsConn.wsWrite(websocket.TextMessage, []byte("heartbeat from server")); err != nil {
fmt.Println("heartbeat fail")
wsConn.wsClose()
break
}
}
}()
// 这是一个同步处理模型(只是一个例子),如果希望并行处理可以每个请求一个gorutine,注意控制并发goroutine的数量!!!
for {
msg, err := wsConn.wsRead()
if err != nil {
fmt.Println("read fail")
break
}
fmt.Println(string(msg.data))
err = wsConn.wsWrite(msg.messageType, msg.data)
if err != nil {
fmt.Println("write fail")
break
}
}
}
func wsHandler(resp http.ResponseWriter, req *http.Request) {
// 应答客户端告知升级连接为websocket
wsSocket, err := wsUpgrader.Upgrade(resp, req, nil)
if err != nil {
return
}
wsConn := &wsConnection{
wsSocket: wsSocket,
inChan: make(chan *wsMessage, 1000),
outChan: make(chan *wsMessage, 1000),
closeChan: make(chan byte),
isClosed: false,
}
// 处理器
go wsConn.procLoop()
// 读协程
go wsConn.wsReadLoop()
// 写协程
go wsConn.wsWriteLoop()
}
func (wsConn *wsConnection) wsWrite(messageType int, data []byte) error {
select {
case wsConn.outChan <- &wsMessage{messageType, data}:
case <-wsConn.closeChan:
return errors.New("websocket closed")
}
return nil
}
func (wsConn *wsConnection) wsRead() (*wsMessage, error) {
select {
case msg := <-wsConn.inChan:
return msg, nil
case <-wsConn.closeChan:
}
return nil, errors.New("websocket closed")
}
func (wsConn *wsConnection) wsClose() {
wsConn.wsSocket.Close()
wsConn.mutex.Lock()
defer wsConn.mutex.Unlock()
if !wsConn.isClosed {
wsConn.isClosed = true
close(wsConn.closeChan)
}
}
func main() {
http.HandleFunc("/ws", wsHandler)
http.ListenAndServe("0.0.0.0:7777", nil)
}
~~~
client.html
~~~
<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<script>
window.addEventListener("load", function(evt) {
var output = document.getElementById("output");
var input = document.getElementById("input");
var ws;
var print = function(message) {
var d = document.createElement("div");
d.innerHTML = message;
output.appendChild(d);
};
document.getElementById("open").onclick = function(evt) {
if (ws) {
return false;
}
ws = new WebSocket("ws://localhost:7777/ws");
ws.onopen = function(evt) {
print("OPEN");
}
ws.onclose = function(evt) {
print("CLOSE");
ws = null;
}
ws.onmessage = function(evt) {
print("RESPONSE: " + evt.data);
}
ws.onerror = function(evt) {
print("ERROR: " + evt.data);
}
return false;
};
document.getElementById("send").onclick = function(evt) {
if (!ws) {
return false;
}
print("SEND: " + input.value);
ws.send(input.value);
return false;
};
document.getElementById("close").onclick = function(evt) {
if (!ws) {
return false;
}
ws.close();
return false;
};
});
</script>
</head>
<body>
<table>
<tr><td valign="top" width="50%">
<p>Click "Open" to create a connection to the server,
"Send" to send a message to the server and "Close" to close the connection.
You can change the message and send multiple times.
</p>
<form>
<button id="open">Open</button>
<button id="close">Close</button>
<input id="input" type="text" value="Hello world!">
<button id="send">Send</button>
</form>
</td><td valign="top" width="50%">
<div id="output"></div>
</td></tr></table>
</body>
</html>
~~~
首先运行server.go,然后打开client.html页面,即可体验所有流程:
- 序言
- 目录
- 环境搭建
- Linux搭建golang环境
- Windows搭建golang环境
- Mac搭建golang环境
- 介绍
- 1.Go语言的主要特征
- 2.golang内置类型和函数
- 3.init函数和main函数
- 4.包
- 1.工作空间
- 2.源文件
- 3.包结构
- 4.文档
- 5.编写 Hello World
- 6.Go语言 “ _ ”(下划线)
- 7.运算符
- 8.命令
- 类型
- 1.变量
- 2.常量
- 3.基本类型
- 1.基本类型介绍
- 2.字符串String
- 3.数组Array
- 4.类型转换
- 4.引用类型
- 1.引用类型介绍
- 2.切片Slice
- 3.容器Map
- 4.管道Channel
- 5.指针
- 6.自定义类型Struct
- 编码格式转换
- 流程控制
- 1.条件语句(if)
- 2.条件语句 (switch)
- 3.条件语句 (select)
- 4.循环语句 (for)
- 5.循环语句 (range)
- 6.循环控制Goto、Break、Continue
- 函数
- 1.函数定义
- 2.参数
- 3.返回值
- 4.匿名函数
- 5.闭包、递归
- 6.延迟调用 (defer)
- 7.异常处理
- 8.单元测试
- 压力测试
- 方法
- 1.方法定义
- 2.匿名字段
- 3.方法集
- 4.表达式
- 5.自定义error
- 接口
- 1.接口定义
- 2.执行机制
- 3.接口转换
- 4.接口技巧
- 面向对象特性
- 并发
- 1.并发介绍
- 2.Goroutine
- 3.Chan
- 4.WaitGroup
- 5.Context
- 应用
- 反射reflection
- 1.获取基本类型
- 2.获取结构体
- 3.Elem反射操作基本类型
- 4.反射调用结构体方法
- 5.Elem反射操作结构体
- 6.Elem反射获取tag
- 7.应用
- json协议
- 1.结构体转json
- 2.map转json
- 3.int转json
- 4.slice转json
- 5.json反序列化为结构体
- 6.json反序列化为map
- 终端读取
- 1.键盘(控制台)输入fmt
- 2.命令行参数os.Args
- 3.命令行参数flag
- 文件操作
- 1.文件创建
- 2.文件写入
- 3.文件读取
- 4.文件删除
- 5.压缩文件读写
- 6.判断文件或文件夹是否存在
- 7.从一个文件拷贝到另一个文件
- 8.写入内容到Excel
- 9.日志(log)文件
- server服务
- 1.服务端
- 2.客户端
- 3.tcp获取网页数据
- 4.http初识-浏览器访问服务器
- 5.客户端访问服务器
- 6.访问延迟处理
- 7.form表单提交
- web模板
- 1.渲染终端
- 2.渲染浏览器
- 3.渲染存储文件
- 4.自定义io.Writer渲染
- 5.模板语法
- 时间处理
- 1.格式化
- 2.运行时间
- 3.定时器
- 锁机制
- 互斥锁
- 读写锁
- 性能比较
- sync.Map
- 原子操作
- 1.原子增(减)值
- 2.比较并交换
- 3.导入、导出、交换
- 加密解密
- 1.md5
- 2.base64
- 3.sha
- 4.hmac
- 常用算法
- 1.冒泡排序
- 2.选择排序
- 3.快速排序
- 4.插入排序
- 5.睡眠排序
- 设计模式
- 创建型模式
- 单例模式
- 抽象工厂模式
- 工厂方法模式
- 原型模式
- 结构型模式
- 适配器模式
- 桥接模式
- 合成/组合模式
- 装饰模式
- 外观模式
- 享元模式
- 代理模式
- 行为性模式
- 职责链模式
- 命令模式
- 解释器模式
- 迭代器模式
- 中介者模式
- 备忘录模式
- 观察者模式
- 状态模式
- 策略模式
- 模板模式
- 访问者模式
- 数据库操作
- golang操作MySQL
- 1.mysql使用
- 2.insert操作
- 3.select 操作
- 4.update 操作
- 5.delete 操作
- 6.MySQL事务
- golang操作Redis
- 1.redis介绍
- 2.golang链接redis
- 3.String类型 Set、Get操作
- 4.String 批量操作
- 5.设置过期时间
- 6.list队列操作
- 7.Hash表
- 8.Redis连接池
- golang操作ETCD
- 1.etcd介绍
- 2.链接etcd
- 3.etcd存取
- 4.etcd监听Watch
- golang操作kafka
- 1.kafka介绍
- 2.写入kafka
- 3.kafka消费
- golang操作ElasticSearch
- 1.ElasticSearch介绍
- 2.kibana介绍
- 3.写入ElasticSearch
- NSQ
- 安装
- 生产者
- 消费者
- beego框架
- 1.beego框架环境搭建
- 2.参数配置
- 1.默认参数
- 2.自定义配置
- 3.config包使用
- 3.路由设置
- 1.自动匹配
- 2.固定路由
- 3.正则路由
- 4.注解路由
- 5.namespace
- 4.多种数据格式输出
- 1.直接输出字符串
- 2.模板数据输出
- 3.json格式数据输出
- 4.xml格式数据输出
- 5.jsonp调用
- 5.模板处理
- 1.模板语法
- 2.基本函数
- 3.模板函数
- 6.请求处理
- 1.GET请求
- 2.POST请求
- 3.文件上传
- 7.表单验证
- 1.表单验证
- 2.定制错误信息
- 3.struct tag 验证
- 4.XSRF过滤
- 8.静态文件处理
- 1.layout设计
- 9.日志处理
- 1.日志处理
- 2.logs 模块
- 10.会话控制
- 1.会话控制
- 2.session 包使用
- 11.ORM 使用
- 1.链接数据库
- 2. CRUD 操作
- 3.原生 SQL 操作
- 4.构造查询
- 5.事务处理
- 6.自动建表
- 12.beego 验证码
- 1.验证码插件
- 2.验证码使用
- beego admin
- 1.admin安装
- 2.admin开发
- beego 热升级
- gin框架
- 安装使用
- 项目
- 秒杀项目
- 日志收集
- 面试题
- 面试题一
- 面试题二
- 错题集
- Go语言陷阱和常见错误
- 常见语法错误
- 初级
- 中级
- 高级
- Go高级应用
- goim
- goim 启动流程
- goim 工作流程
- goim 结构体
- gopush
- gopush工作流程
- gopush启动流程
- gopush业务流程
- gopush应用
- gopush新添功能
- rpc
- HTTP RPC
- TCP RPC
- JSON RPC
- 常见RPC开源框架
- pprof
- pprof介绍
- pprof应用
- 封装 websocket
- zookeeper
- 基本操作测试
- 简单的分布式server
- Zookeeper命令行使用
- cgo
- Go语言 demo
- 用Go语言计算一个人的年龄,生肖,星座
- 超简易Go语言实现的留言板代码
- 信号处理模块,可用于在线加载配置,配置动态加载的信号为SIGHUP
- 阳历和阴历相互转化的工具类 golang版本
- 错误总结