ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
goim 工作流程 ![](https://box.kancloud.cn/7c8590b2972e8cf89b6bcb1f4c30330c_1457x1173.png) 其中 comet服务 在最前面对外提供服务,提供 tcp 和 websocket 两种服务。 接受到消息之后 交给 logic 模块处理。logic 模块在调用 router 查询路由信息。将发送的消息交给 kafka,再由 job 服务读取kafka的消息,推给 comet服务。 拆分成这么多模块的好处是可以多实例部署。其中 在 comet,router,logic 模块都提供了性能监控 net/http/pprof 同时还提供了monitor 服务,服务启动之后访问 xxx:port/monitor/ping 返回 ok 说明服务启动正常,方便运维检查系统。 **各模块功能分析** **comet 模块** comet 模块是在最前端,主要负责和client的链接保持,同时接受,发送消息,通知到客户端。检查链接是否断开。 comet 属于接入层,非常容易扩展,直接开启多个comet节点,修改配置文件中的base节点下的server.id修改成不同值(注意一定要保证不同的comet进程值唯一),前端接入可以使用LVS 或者 DNS来转发。 **logic 模块** logic 模块是comet 模块调用的,接受 comet 模块的命令, 然后进行处理,再发送的消息的kafka队列上,同时链接 router 模块,记录用户的 uid server room 等信息。同时获得router模块的信息。 logic 属于无状态的逻辑层,可以随意增加节点,使用nginx upstream来扩展http接口,内部rpc部分,可以使用LVS四层转发。 **router 模块** router 主要记录了用户的session信息。存储在 Bucket 对象里面。 **job 模块** job服务每秒都在同步 comet的信息,然后再读取 kafka队列的信息。 push 到相关的comet服务器上。用户就接受到了消息。 job 根据kafka的partition来扩展多job工作方式,具体可以参考下kafka的partition负载。 **新添加功能** 1、用户通过websocket 发消息,comet模块使用 rpc 调用logic 将消息发送到kafka,job消费kafka的消息推送给用户。 2、将 kafka 消息队列改成 nsq 消息队列。kafka 与 nsq 对比 kafka消息会固化,存文件,nsq默认是不保存的 kafka消息因为固化下来,所以是保序的,nsq传递时候通常是无序的,当然你也可以保留下信息去check时间戳,因此nsq更适合处理数据量大但是彼此间没有顺序关系的消息。 nsq支持延时消息的投递,比如我想这条消息5分钟之后才被投递出去被客户端消费,较于普通的消息投递,多了个毫秒数。延时消息可用于以下场景,比如一个订单超过30分钟未付款,修改其状态 或者给客户发短信提醒。 3、router挂掉后如何处理? 完成把客户端的心跳消息,也通过logic,都发到router。如果router挂掉,可以根据这些心跳重建在线用户map。 心跳消息发给router,还有一个好处是,能清理太久没有下线的用户脏数据。 ![](https://box.kancloud.cn/b40b732fab910e2c9607138022004439_1413x984.png) **op状态** 握手:op = 0 握手返回: op = 1 心跳:op = 2 心跳返回: op = 3 发消息:op = 4 消息返回:op = 5 授权用户: op = 7 返回: op = 8 OP_PROTO_READY // 消息推送 OP_PROTO_FINISH // 关闭退出 websocket.go **客户端链接逻辑** ~~~ go acceptWebsocket 监听客户端链接 go serveWebsocket(server, conn, r) lAddr := conn.LocalAddr().String() 获取服务端ip rAddr := conn.RemoteAddr().String() 获取客户端ip server.serveWebsocket(conn, rp, wp, tr) ch.Reader.ResetBuffer(conn, rb.Bytes()) 读缓冲,重置IO,可读游标重置为0,且b.buf变为buf tr.Add 添加一个时钟对象 websocket.ReadRequest(rr) websocket,读取请求 wp.Get() 获取一个写缓冲区 ch.Writer.ResetBuffer(conn, wb.Bytes()) 重置IO,可写游标重置为0,句柄变为w,缓冲区变为buf websocket.Upgrade(conn, rr, wr, req) 交换协议 ch.CliProto.Set() 获取一个proto对象的引用,用于写,不会移动可写游标 key, roomId, hb, err = server.authWebsocket(ws, p) 握手->注册,返回 subkey,roomId,heartbeat 握手->注册逻辑 server.operator.Connect(p); PRC -> 调用 logic RPC.Connect 通过 token 获取 用户 userId、roomId rpc -> 调用 RouterRPC.Put 通过 UserId、ServerId、RoomId 存 session server.Bucket(key) 使用subkey根据CityHash32算法,分配到对应的Bucket b.Put(key, roomId, ch) 分配到chs和rooms tr.Set(trd, hb) 设置更新定时器数据。 DefaultWhitelist.Contains(key) 判断是否是白名单用户 server.Stat.IncrWsOnline() 增加 websocket online 统计 ~~~ **客户端链接逻辑流程图** ![](https://box.kancloud.cn/2be435084ad10ba3fda13b654ddfda5a_1225x1272.png) **收发消息逻辑** ~~~ 参数: conn net.Conn 链接 rp, wp *bytes.Pool 读写缓冲池 Pool内存组织如下,Pool是一个链式存储的栈,数据从栈顶出,同时数据也从栈顶回收。 tr *itime.Timer 最小堆算法实现的时钟对象 协程 go server.serveWebsocket() 处理读消息(接收心跳消息、接收用户发消息) for { ch.CliProto.Set() 从 Ring 上,获取一个proto对象的引用,用于写,不会移动可写游标 p.ReadWebsocket(ws) Read Websocket 读websocket发来的消息 判断是 心跳检测 or 发消息操作 p.Operation == define.OP_HEARTBEAT 心跳检测 p.Operation = define.OP_HEARTBEAT_REPLY 心跳回应 else 发消息操作 err = server.operator.Operate(p) p.Operation = define.OP_SEND_SMS_REPLY 发消息回应 ch.CliProto.SetAdv() 移动 Ring 的可写游标 ch.Signal() 将消息 写入 c.signal chan *proto.Proto } 协程 go server.dispatchWebsocket() 处理写消息(心跳回应、推送消息) for { ch.Ready() 消费 c.signal chan *proto.Proto 的消息 switch p 判断消息类型(ProtoReady消息回应、ProtoFinish页面关闭、default 推送消息) ProtoReady->自己发的消息 p, err = ch.CliProto.Get() 从 Ring 上 获取一个proto对象的引用,用于读,不会移动可读游标 p.WriteWebsocket(ws) 消息推送(发消息返回结果) ch.CliProto.GetAdv() 移动可读游标 ProtoFinish页面关闭 -->走断开链接逻辑 default push推送消息 err = p.WriteWebsocket(ws) 消息推送 ws.Flush() 释放写缓冲池 } ~~~ **收发消息流程图** ![](https://box.kancloud.cn/46b499792e47adbca7b1839e0e57fa63_1432x1596.png) **断开链接逻辑** ~~~ err = p.ReadWebsocket(ws); err != nil 浏览器断开链接 ws.ReadMessage(); err != nil err = ErrMessageClose b.Del(key) 根据subkey 删除 chs和rooms 上的Channel和room tr.Del(trd) 删除时间计数 ws.Close() 关闭websocket链接 ch.Close() 发送关闭channel信号 "proto.ProtoFinish" rp.Put(rb) 归还一个缓冲区 server.operator.Disconnect(key, roomId); 等待注销结果 注销过程 server.operator.Disconnect(key, roomId); rpc -> 调用 logic RPC.Disconnect 根据 subKey 返回UserId、seq rpc -> 调用 RouterRPC.Del 根据 UserId、Seq、RoomId 删除 session p = ch.Ready() (等待 return <-c.signal )接受到 "proto.ProtoFinish" 检测到浏览器断开 switch p -> proto.ProtoFinish: 进行退出操作 ws.Close() 关闭链接 wp.Put(wb) 归还一个缓冲区 dispatch goroutine exit 退出协程 go server.dispatchWebsocket() 注销结果返回 server tcp goroutine exit 退出协程 go server.serveWebsocket() server.Stat.DecrWsOnline() 减少 websocket online 统计 ~~~ **断开链接流程图** ![](https://box.kancloud.cn/82e87971bacdde40a5a9e870cd439d0a_786x1423.png) **几个重要的结构体** 做为典型代码即注释的开源项目,goim 基本无太多阅读障碍,几个逻辑点梳理下很快就会明白。 Bucket: 每个 Comet 程序拥有若干个 Bucket, 可以理解为 Session Management, 保存着当前 Comet 服务于哪些 Room 和 Channel. 长连接具体分布在哪个 Bucket 上呢?根据 SubKey 一致性 Hash 来选择。 Room: 可以理解为房间,群组或是一个 Group. 这个房间内维护 N 个 Channel, 即长连接用户。在该 Room 内广播消息,会发送给房间内的所有 Channel. Channel: 维护一个长连接用户,只能对应一个 Room. 推送的消息可以在 Room 内广播,也可以推送到指定的 Channel. Proto: 消息结构体,存放版本号,操作类型,消息序号和消息体。 **job模块消费kafka逻辑** pushRoutines []chan *proto.MPushMsgArg 多播 broadcastRoutines []chan *proto.BoardcastArg 广播 roomRoutines []chan *proto.BoardcastRoomArg 房播 ~~~ 消息从kafka集群消费,经过kafka模块转发至push模块,push模块对消息预处理/过滤/分类,然后发至不同的 comet 信道中 kafka -> push(msg.Value) 消费kafka消息 push -> push() 判断消息类型,多播、广播、房播 多播:define.KAFKA_MESSAGE_MULTI: -> go processPush(pushChs[i]) 根据启动的 go 协程取模分配 -> comet.mPushComet(serverId int32, subKeys []string, body json.RawMessage) 消费 pushChs []chan *pushArg chan 的消息 -> Push(arg *proto.MPushMsgArg) 将消息推到 pushRoutines []chan *proto.MPushMsgArg chan 中 广播:define.KAFKA_MESSAGE_BROADCAST: -> broadcast(m.Msg) 调用所有的 *Comet -> c.Broadcast(&args) 将消息推到 broadcastRoutines []chan *proto.BoardcastArg chan 中 房播:define.KAFKA_MESSAGE_BROADCAST_ROOM: -> roomBucket.Get(int32(m.RoomId)) 获取房间信息 -> NewRoom(roomId, b.round.Timer(b.roomNum), b.options) -> go r.pushproc(t, options.BatchNum, options.SignalTime, options.IdleTime) -> broadcastRoomBytes(r.id, buf.Buffer()) 根据roomid获取 *Comet -> c.BroadcastRoom(&args) 将消息推到 roomRoutines []chan *proto.BoardcastRoomArg chan 中(根据消息总数与options.RoutineSize取模) -> room.Push(0, define.OP_SEND_SMS_REPLY, m.Msg) 将消息保存到room.proto chan *proto.Proto chan 中 main.InitComet(addrs map[int32]string, options CometOptions) 初始化 comet -> go c.process(pushChan, roomChan, broadcastChan) 通过 rpc 调用 comet,作为pushChan,roomChan,broadcastChan信道的消费者 ~~~ **job模块消费kafka逻辑流程图** ![](https://box.kancloud.cn/d9f0e816b12f98f77f7b82684749f508_1431x1368.png) **logic 模块 push http推送接口逻辑** ~~~ http接口-->kafka生产者逻辑 单人推送:push -> r.URL.Query().Get("uid") 获取userId -> ioutil.ReadAll(r.Body) 获取body -> genSubKey(userId) 通过用户ID生成subKeys;同一个用户可以同时多处登陆或者同处多实例登陆,它们都会被同等对待 -> rpc 到 router 模块 RouterRPC.Get 获取指定用户的session信息 -> mpushKafka 根据serverId、subkeys、Msg ,push 到 kafka -> OP: define.KAFKA_MESSAGE_MULTI 单播 -> producer.Input() 多人推送:Pushs -> ioutil.ReadAll(r.Body) 获取body -> parsePushsBody(body []byte) 将body转成userIds、Msg -> genSubKeys(userIds) 并行获取多个用户信息,返回值为map[comet.serverId][]subkey. -> mpushKafka 根据serverId、subkeys、Msg ,push 到 kafka -> OP: define.KAFKA_MESSAGE_MULTI 单播 -> producer.Input() 房间推送:PushRoom -> ioutil.ReadAll(r.Body) 获取body -> param.Get("rid") 获取roomId -> strconv.ParseBool(param.Get("ensure")) 获取ensure,是否强推 -> broadcastRoomKafka 根据roomId、Msg、ensure ,push 到 kafka -> OP: define.KAFKA_MESSAGE_BROADCAST_ROOM 房间内广播 -> producer.Input() 广播:PushAll -> ioutil.ReadAll(r.Body) 获取body -> mpushKafka 根据Msg ,push 到 kafka -> OP: define.KAFKA_MESSAGE_BROADCAST 广播 -> producer.Input() ~~~ **logic 模块 push http推送接口逻辑流程图** ![](https://box.kancloud.cn/d804684a24b03da7b272797239c5cee0_1512x1341.png)