goim 工作流程
![](https://box.kancloud.cn/7c8590b2972e8cf89b6bcb1f4c30330c_1457x1173.png)
其中 comet服务 在最前面对外提供服务,提供 tcp 和 websocket 两种服务。
接受到消息之后 交给 logic 模块处理。logic 模块在调用 router 查询路由信息。将发送的消息交给 kafka,再由 job 服务读取kafka的消息,推给 comet服务。
拆分成这么多模块的好处是可以多实例部署。其中 在 comet,router,logic 模块都提供了性能监控 net/http/pprof
同时还提供了monitor 服务,服务启动之后访问 xxx:port/monitor/ping
返回 ok 说明服务启动正常,方便运维检查系统。
**各模块功能分析**
**comet 模块**
comet 模块是在最前端,主要负责和client的链接保持,同时接受,发送消息,通知到客户端。检查链接是否断开。
comet 属于接入层,非常容易扩展,直接开启多个comet节点,修改配置文件中的base节点下的server.id修改成不同值(注意一定要保证不同的comet进程值唯一),前端接入可以使用LVS 或者 DNS来转发。
**logic 模块**
logic 模块是comet 模块调用的,接受 comet 模块的命令,
然后进行处理,再发送的消息的kafka队列上,同时链接 router 模块,记录用户的 uid server room 等信息。同时获得router模块的信息。
logic 属于无状态的逻辑层,可以随意增加节点,使用nginx upstream来扩展http接口,内部rpc部分,可以使用LVS四层转发。
**router 模块**
router 主要记录了用户的session信息。存储在 Bucket 对象里面。
**job 模块**
job服务每秒都在同步 comet的信息,然后再读取 kafka队列的信息。
push 到相关的comet服务器上。用户就接受到了消息。
job 根据kafka的partition来扩展多job工作方式,具体可以参考下kafka的partition负载。
**新添加功能**
1、用户通过websocket 发消息,comet模块使用 rpc 调用logic 将消息发送到kafka,job消费kafka的消息推送给用户。
2、将 kafka 消息队列改成 nsq 消息队列。kafka 与 nsq 对比
kafka消息会固化,存文件,nsq默认是不保存的
kafka消息因为固化下来,所以是保序的,nsq传递时候通常是无序的,当然你也可以保留下信息去check时间戳,因此nsq更适合处理数据量大但是彼此间没有顺序关系的消息。
nsq支持延时消息的投递,比如我想这条消息5分钟之后才被投递出去被客户端消费,较于普通的消息投递,多了个毫秒数。延时消息可用于以下场景,比如一个订单超过30分钟未付款,修改其状态 或者给客户发短信提醒。
3、router挂掉后如何处理?
完成把客户端的心跳消息,也通过logic,都发到router。如果router挂掉,可以根据这些心跳重建在线用户map。
心跳消息发给router,还有一个好处是,能清理太久没有下线的用户脏数据。
![](https://box.kancloud.cn/b40b732fab910e2c9607138022004439_1413x984.png)
**op状态**
握手:op = 0
握手返回: op = 1
心跳:op = 2
心跳返回: op = 3
发消息:op = 4
消息返回:op = 5
授权用户: op = 7
返回: op = 8
OP_PROTO_READY // 消息推送
OP_PROTO_FINISH // 关闭退出
websocket.go
**客户端链接逻辑**
~~~
go acceptWebsocket 监听客户端链接
go serveWebsocket(server, conn, r)
lAddr := conn.LocalAddr().String() 获取服务端ip
rAddr := conn.RemoteAddr().String() 获取客户端ip
server.serveWebsocket(conn, rp, wp, tr)
ch.Reader.ResetBuffer(conn, rb.Bytes()) 读缓冲,重置IO,可读游标重置为0,且b.buf变为buf
tr.Add 添加一个时钟对象
websocket.ReadRequest(rr) websocket,读取请求
wp.Get() 获取一个写缓冲区
ch.Writer.ResetBuffer(conn, wb.Bytes()) 重置IO,可写游标重置为0,句柄变为w,缓冲区变为buf
websocket.Upgrade(conn, rr, wr, req) 交换协议
ch.CliProto.Set() 获取一个proto对象的引用,用于写,不会移动可写游标
key, roomId, hb, err = server.authWebsocket(ws, p) 握手->注册,返回 subkey,roomId,heartbeat
握手->注册逻辑
server.operator.Connect(p);
PRC -> 调用 logic RPC.Connect 通过 token 获取 用户 userId、roomId
rpc -> 调用 RouterRPC.Put 通过 UserId、ServerId、RoomId 存 session
server.Bucket(key) 使用subkey根据CityHash32算法,分配到对应的Bucket
b.Put(key, roomId, ch) 分配到chs和rooms
tr.Set(trd, hb) 设置更新定时器数据。
DefaultWhitelist.Contains(key) 判断是否是白名单用户
server.Stat.IncrWsOnline() 增加 websocket online 统计
~~~
**客户端链接逻辑流程图**
![](https://box.kancloud.cn/2be435084ad10ba3fda13b654ddfda5a_1225x1272.png)
**收发消息逻辑**
~~~
参数:
conn net.Conn 链接
rp, wp *bytes.Pool 读写缓冲池
Pool内存组织如下,Pool是一个链式存储的栈,数据从栈顶出,同时数据也从栈顶回收。
tr *itime.Timer 最小堆算法实现的时钟对象
协程 go server.serveWebsocket() 处理读消息(接收心跳消息、接收用户发消息)
for {
ch.CliProto.Set() 从 Ring 上,获取一个proto对象的引用,用于写,不会移动可写游标
p.ReadWebsocket(ws) Read Websocket 读websocket发来的消息
判断是 心跳检测 or 发消息操作
p.Operation == define.OP_HEARTBEAT 心跳检测
p.Operation = define.OP_HEARTBEAT_REPLY 心跳回应
else 发消息操作
err = server.operator.Operate(p)
p.Operation = define.OP_SEND_SMS_REPLY 发消息回应
ch.CliProto.SetAdv() 移动 Ring 的可写游标
ch.Signal() 将消息 写入 c.signal chan *proto.Proto
}
协程 go server.dispatchWebsocket() 处理写消息(心跳回应、推送消息)
for {
ch.Ready() 消费 c.signal chan *proto.Proto 的消息
switch p 判断消息类型(ProtoReady消息回应、ProtoFinish页面关闭、default 推送消息)
ProtoReady->自己发的消息
p, err = ch.CliProto.Get() 从 Ring 上 获取一个proto对象的引用,用于读,不会移动可读游标
p.WriteWebsocket(ws) 消息推送(发消息返回结果)
ch.CliProto.GetAdv() 移动可读游标
ProtoFinish页面关闭
-->走断开链接逻辑
default push推送消息
err = p.WriteWebsocket(ws) 消息推送
ws.Flush() 释放写缓冲池
}
~~~
**收发消息流程图**
![](https://box.kancloud.cn/46b499792e47adbca7b1839e0e57fa63_1432x1596.png)
**断开链接逻辑**
~~~
err = p.ReadWebsocket(ws); err != nil 浏览器断开链接
ws.ReadMessage(); err != nil
err = ErrMessageClose
b.Del(key) 根据subkey 删除 chs和rooms 上的Channel和room
tr.Del(trd) 删除时间计数
ws.Close() 关闭websocket链接
ch.Close() 发送关闭channel信号 "proto.ProtoFinish"
rp.Put(rb) 归还一个缓冲区
server.operator.Disconnect(key, roomId); 等待注销结果
注销过程
server.operator.Disconnect(key, roomId);
rpc -> 调用 logic RPC.Disconnect 根据 subKey 返回UserId、seq
rpc -> 调用 RouterRPC.Del 根据 UserId、Seq、RoomId 删除 session
p = ch.Ready() (等待 return <-c.signal )接受到 "proto.ProtoFinish" 检测到浏览器断开
switch p -> proto.ProtoFinish: 进行退出操作
ws.Close() 关闭链接
wp.Put(wb) 归还一个缓冲区
dispatch goroutine exit 退出协程 go server.dispatchWebsocket()
注销结果返回
server tcp goroutine exit 退出协程 go server.serveWebsocket()
server.Stat.DecrWsOnline() 减少 websocket online 统计
~~~
**断开链接流程图**
![](https://box.kancloud.cn/82e87971bacdde40a5a9e870cd439d0a_786x1423.png)
**几个重要的结构体**
做为典型代码即注释的开源项目,goim 基本无太多阅读障碍,几个逻辑点梳理下很快就会明白。
Bucket: 每个 Comet 程序拥有若干个 Bucket, 可以理解为 Session Management, 保存着当前 Comet 服务于哪些 Room 和 Channel. 长连接具体分布在哪个 Bucket 上呢?根据 SubKey 一致性 Hash 来选择。
Room: 可以理解为房间,群组或是一个 Group. 这个房间内维护 N 个 Channel, 即长连接用户。在该 Room 内广播消息,会发送给房间内的所有 Channel.
Channel: 维护一个长连接用户,只能对应一个 Room. 推送的消息可以在 Room 内广播,也可以推送到指定的 Channel.
Proto: 消息结构体,存放版本号,操作类型,消息序号和消息体。
**job模块消费kafka逻辑**
pushRoutines []chan *proto.MPushMsgArg 多播
broadcastRoutines []chan *proto.BoardcastArg 广播
roomRoutines []chan *proto.BoardcastRoomArg 房播
~~~
消息从kafka集群消费,经过kafka模块转发至push模块,push模块对消息预处理/过滤/分类,然后发至不同的 comet 信道中
kafka
-> push(msg.Value) 消费kafka消息
push
-> push() 判断消息类型,多播、广播、房播
多播:define.KAFKA_MESSAGE_MULTI:
-> go processPush(pushChs[i]) 根据启动的 go 协程取模分配
-> comet.mPushComet(serverId int32, subKeys []string, body json.RawMessage) 消费 pushChs []chan *pushArg chan 的消息
-> Push(arg *proto.MPushMsgArg) 将消息推到 pushRoutines []chan *proto.MPushMsgArg chan 中
广播:define.KAFKA_MESSAGE_BROADCAST:
-> broadcast(m.Msg) 调用所有的 *Comet
-> c.Broadcast(&args) 将消息推到 broadcastRoutines []chan *proto.BoardcastArg chan 中
房播:define.KAFKA_MESSAGE_BROADCAST_ROOM:
-> roomBucket.Get(int32(m.RoomId)) 获取房间信息
-> NewRoom(roomId, b.round.Timer(b.roomNum), b.options)
-> go r.pushproc(t, options.BatchNum, options.SignalTime, options.IdleTime)
-> broadcastRoomBytes(r.id, buf.Buffer()) 根据roomid获取 *Comet
-> c.BroadcastRoom(&args) 将消息推到 roomRoutines []chan *proto.BoardcastRoomArg chan 中(根据消息总数与options.RoutineSize取模)
-> room.Push(0, define.OP_SEND_SMS_REPLY, m.Msg) 将消息保存到room.proto chan *proto.Proto chan 中
main.InitComet(addrs map[int32]string, options CometOptions) 初始化 comet
-> go c.process(pushChan, roomChan, broadcastChan) 通过 rpc 调用 comet,作为pushChan,roomChan,broadcastChan信道的消费者
~~~
**job模块消费kafka逻辑流程图**
![](https://box.kancloud.cn/d9f0e816b12f98f77f7b82684749f508_1431x1368.png)
**logic 模块 push http推送接口逻辑**
~~~
http接口-->kafka生产者逻辑
单人推送:push
-> r.URL.Query().Get("uid") 获取userId
-> ioutil.ReadAll(r.Body) 获取body
-> genSubKey(userId) 通过用户ID生成subKeys;同一个用户可以同时多处登陆或者同处多实例登陆,它们都会被同等对待
-> rpc 到 router 模块 RouterRPC.Get 获取指定用户的session信息
-> mpushKafka 根据serverId、subkeys、Msg ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_MULTI 单播
-> producer.Input()
多人推送:Pushs
-> ioutil.ReadAll(r.Body) 获取body
-> parsePushsBody(body []byte) 将body转成userIds、Msg
-> genSubKeys(userIds) 并行获取多个用户信息,返回值为map[comet.serverId][]subkey.
-> mpushKafka 根据serverId、subkeys、Msg ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_MULTI 单播
-> producer.Input()
房间推送:PushRoom
-> ioutil.ReadAll(r.Body) 获取body
-> param.Get("rid") 获取roomId
-> strconv.ParseBool(param.Get("ensure")) 获取ensure,是否强推
-> broadcastRoomKafka 根据roomId、Msg、ensure ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_BROADCAST_ROOM 房间内广播
-> producer.Input()
广播:PushAll
-> ioutil.ReadAll(r.Body) 获取body
-> mpushKafka 根据Msg ,push 到 kafka
-> OP: define.KAFKA_MESSAGE_BROADCAST 广播
-> producer.Input()
~~~
**logic 模块 push http推送接口逻辑流程图**
![](https://box.kancloud.cn/d804684a24b03da7b272797239c5cee0_1512x1341.png)
- 序言
- 目录
- 环境搭建
- Linux搭建golang环境
- Windows搭建golang环境
- Mac搭建golang环境
- 介绍
- 1.Go语言的主要特征
- 2.golang内置类型和函数
- 3.init函数和main函数
- 4.包
- 1.工作空间
- 2.源文件
- 3.包结构
- 4.文档
- 5.编写 Hello World
- 6.Go语言 “ _ ”(下划线)
- 7.运算符
- 8.命令
- 类型
- 1.变量
- 2.常量
- 3.基本类型
- 1.基本类型介绍
- 2.字符串String
- 3.数组Array
- 4.类型转换
- 4.引用类型
- 1.引用类型介绍
- 2.切片Slice
- 3.容器Map
- 4.管道Channel
- 5.指针
- 6.自定义类型Struct
- 编码格式转换
- 流程控制
- 1.条件语句(if)
- 2.条件语句 (switch)
- 3.条件语句 (select)
- 4.循环语句 (for)
- 5.循环语句 (range)
- 6.循环控制Goto、Break、Continue
- 函数
- 1.函数定义
- 2.参数
- 3.返回值
- 4.匿名函数
- 5.闭包、递归
- 6.延迟调用 (defer)
- 7.异常处理
- 8.单元测试
- 压力测试
- 方法
- 1.方法定义
- 2.匿名字段
- 3.方法集
- 4.表达式
- 5.自定义error
- 接口
- 1.接口定义
- 2.执行机制
- 3.接口转换
- 4.接口技巧
- 面向对象特性
- 并发
- 1.并发介绍
- 2.Goroutine
- 3.Chan
- 4.WaitGroup
- 5.Context
- 应用
- 反射reflection
- 1.获取基本类型
- 2.获取结构体
- 3.Elem反射操作基本类型
- 4.反射调用结构体方法
- 5.Elem反射操作结构体
- 6.Elem反射获取tag
- 7.应用
- json协议
- 1.结构体转json
- 2.map转json
- 3.int转json
- 4.slice转json
- 5.json反序列化为结构体
- 6.json反序列化为map
- 终端读取
- 1.键盘(控制台)输入fmt
- 2.命令行参数os.Args
- 3.命令行参数flag
- 文件操作
- 1.文件创建
- 2.文件写入
- 3.文件读取
- 4.文件删除
- 5.压缩文件读写
- 6.判断文件或文件夹是否存在
- 7.从一个文件拷贝到另一个文件
- 8.写入内容到Excel
- 9.日志(log)文件
- server服务
- 1.服务端
- 2.客户端
- 3.tcp获取网页数据
- 4.http初识-浏览器访问服务器
- 5.客户端访问服务器
- 6.访问延迟处理
- 7.form表单提交
- web模板
- 1.渲染终端
- 2.渲染浏览器
- 3.渲染存储文件
- 4.自定义io.Writer渲染
- 5.模板语法
- 时间处理
- 1.格式化
- 2.运行时间
- 3.定时器
- 锁机制
- 互斥锁
- 读写锁
- 性能比较
- sync.Map
- 原子操作
- 1.原子增(减)值
- 2.比较并交换
- 3.导入、导出、交换
- 加密解密
- 1.md5
- 2.base64
- 3.sha
- 4.hmac
- 常用算法
- 1.冒泡排序
- 2.选择排序
- 3.快速排序
- 4.插入排序
- 5.睡眠排序
- 设计模式
- 创建型模式
- 单例模式
- 抽象工厂模式
- 工厂方法模式
- 原型模式
- 结构型模式
- 适配器模式
- 桥接模式
- 合成/组合模式
- 装饰模式
- 外观模式
- 享元模式
- 代理模式
- 行为性模式
- 职责链模式
- 命令模式
- 解释器模式
- 迭代器模式
- 中介者模式
- 备忘录模式
- 观察者模式
- 状态模式
- 策略模式
- 模板模式
- 访问者模式
- 数据库操作
- golang操作MySQL
- 1.mysql使用
- 2.insert操作
- 3.select 操作
- 4.update 操作
- 5.delete 操作
- 6.MySQL事务
- golang操作Redis
- 1.redis介绍
- 2.golang链接redis
- 3.String类型 Set、Get操作
- 4.String 批量操作
- 5.设置过期时间
- 6.list队列操作
- 7.Hash表
- 8.Redis连接池
- golang操作ETCD
- 1.etcd介绍
- 2.链接etcd
- 3.etcd存取
- 4.etcd监听Watch
- golang操作kafka
- 1.kafka介绍
- 2.写入kafka
- 3.kafka消费
- golang操作ElasticSearch
- 1.ElasticSearch介绍
- 2.kibana介绍
- 3.写入ElasticSearch
- NSQ
- 安装
- 生产者
- 消费者
- beego框架
- 1.beego框架环境搭建
- 2.参数配置
- 1.默认参数
- 2.自定义配置
- 3.config包使用
- 3.路由设置
- 1.自动匹配
- 2.固定路由
- 3.正则路由
- 4.注解路由
- 5.namespace
- 4.多种数据格式输出
- 1.直接输出字符串
- 2.模板数据输出
- 3.json格式数据输出
- 4.xml格式数据输出
- 5.jsonp调用
- 5.模板处理
- 1.模板语法
- 2.基本函数
- 3.模板函数
- 6.请求处理
- 1.GET请求
- 2.POST请求
- 3.文件上传
- 7.表单验证
- 1.表单验证
- 2.定制错误信息
- 3.struct tag 验证
- 4.XSRF过滤
- 8.静态文件处理
- 1.layout设计
- 9.日志处理
- 1.日志处理
- 2.logs 模块
- 10.会话控制
- 1.会话控制
- 2.session 包使用
- 11.ORM 使用
- 1.链接数据库
- 2. CRUD 操作
- 3.原生 SQL 操作
- 4.构造查询
- 5.事务处理
- 6.自动建表
- 12.beego 验证码
- 1.验证码插件
- 2.验证码使用
- beego admin
- 1.admin安装
- 2.admin开发
- beego 热升级
- gin框架
- 安装使用
- 项目
- 秒杀项目
- 日志收集
- 面试题
- 面试题一
- 面试题二
- 错题集
- Go语言陷阱和常见错误
- 常见语法错误
- 初级
- 中级
- 高级
- Go高级应用
- goim
- goim 启动流程
- goim 工作流程
- goim 结构体
- gopush
- gopush工作流程
- gopush启动流程
- gopush业务流程
- gopush应用
- gopush新添功能
- rpc
- HTTP RPC
- TCP RPC
- JSON RPC
- 常见RPC开源框架
- pprof
- pprof介绍
- pprof应用
- 封装 websocket
- zookeeper
- 基本操作测试
- 简单的分布式server
- Zookeeper命令行使用
- cgo
- Go语言 demo
- 用Go语言计算一个人的年龄,生肖,星座
- 超简易Go语言实现的留言板代码
- 信号处理模块,可用于在线加载配置,配置动态加载的信号为SIGHUP
- 阳历和阴历相互转化的工具类 golang版本
- 错误总结