用户连接流程:
客户端:
![](https://box.kancloud.cn/7b6831b06ba6a50edc74f63f9d692300_1078x858.png)
~~~
***获取节点 getScript() http://localhost:8090/1/server/get?k=Terry-Mao&p=2
[先根据用户的key获取连接节点,然后再根据对应的节点创建长连接(参数p=1 websocket,p=2 tcp)]
创建websocket连接
获取离线消息
建立心跳任务
初始化完成(等待接收在线消息 or 心跳应答)
~~~
服务端:
~~~
建立连接:
SubscribeHandle() 监听websocket请求
addr := ws.Request().RemoteAddr 获取用户ip地址
params := ws.Request().URL.Query() 获取url参数 key、heartbeat、token、version
UserChannel.Get(key, true) 在ChannelList上添加一个Channel,并返回这个用户的 Channel
c.AuthToken(key, token) token验证(默认不验证)
c.AddConn(key, &Connection{Conn: ws, Proto: WebsocketProto, Version: version})创建一个用户连接,
1.判断是否超过最大连接数
2.用户链接应答
3.conn.HandleWrite(key) 开启 goroutine 从 Connection.Buf chan []byte 中读取消息,推送给用户
4.c.conn.PushFront(conn) root *Element 上添加一个 *Element
5.ConnStat.IncrAdd() 总连接数+1
for{} 阻塞等待心跳->心跳应答
获取离线消息:
GetOfflineMsg() 获取离线消息
r.Method != "GET" 判断请求方式
params := r.URL.Query() 获取url参数 key、msgId、callback
myrpc.MessageRPC.Get() 随机获取一个rpc连接
client.Call(myrpc.MessageServiceGetPrivate, args, reply) rpc 到message GetPrivate()方法
UseStorage.GetPrivate(m.Key, m.MsgId) 根据key和msgId从redis or mysql中获取消息
redis:
1.conn := s.getConn(key) 根据key从连接池获取一个redis连接(hash算法)
2.redis.Values(conn.Do("ZRANGEBYSCORE", key, fmt.Sprintf("(%d", mid), "+inf", "WITHSCORES")) 返回所有符合条件 mid < msgId <= +inf(最大值)) 的成员及成员的 message
3.redis.Scan(values, &b, &cmid) 遍历消息
4.json.Unmarshal(b, rm) 消息反序列化
5.rm.Expire < now 判断消息是否过期
6.s.delCH <- &RedisDelMessage{Key: key, MIds: delMsgs}: 删除unmarshal失败的消息和过期消息(clean() 方法)
mysql:
1.s.getConn(key) 获取mysql连接
2.db.Query(getPrivateMsgSQL, key, mid) 查询skey=key and mid>msgId 的消息
"SELECT mid, ttl, msg FROM private_msg WHERE skey=? AND mid>? ORDER BY mid"
3.for rows.Next() 遍历消息
4.now > expire 判断消息是否过期
~~~
推送单个私信流程:
![](https://box.kancloud.cn/c0318cdbc8d3deb6b45f24f1aec906f1_1046x1070.png)
~~~
PushPrivate() 推送单个私信
r.Method != "POST" 判断是否是post请求
ioutil.ReadAll(r.Body) 读取请求内容
params := r.URL.Query() 获取url参数 key、expire
node := myrpc.GetComet(key) 根据key获取comet节点 ???[判断key连接的comet]
client := node.Rpc.Get() 随机获取一个rpc连接
client.Call(myrpc.CometServicePushPrivate, args, &ret) rpc 调用 comet 推送私信
PushPrivate()
UserChannel.New(args.Key) 获取用户channel
ch.PushMsg(args.Key, m, args.Expire) 推送私信
1.client := myrpc.MessageRPC.Get() 随机获取一个RPC连接
2.m.MsgId = id.Get() 生成msgId (时间戳/100)
3.m.GroupId != myrpc.PublicGroupId && expire > 0 判断是是否需要message保存(私信+过期时间>0)
4.client.Call(myrpc.MessageServiceSavePrivate, args, &ret) rpc 调用 message 模块,保存消息
SavePrivate() 保存私信
UseStorage.SavePrivate(m.Key, m.Msg, m.MsgId, m.Expire)
redis:
1.conn := s.getConn(key) 根据key 通过hash算法从连接池获取一个连接
2.conn.Send("ZADD", key, mid, m) 操作写入缓冲区
3.conn.Send("ZREMRANGEBYRANK", key, 0, -1*(Conf.RedisMaxStore+1))
// conn.Send("ZREMRANGEBYRANK", key, 0, -21) 有序集只剩下最后写入的20个成员
4.conn.Flush() 提交操作
5.conn.Receive() 接受redis应答
mysql:
1.db := s.getConn(key) 根据key 通过hash算法从连接池获取一个连接
2.b.Exec(savePrivateMsgSQL, key, mid, now.Unix()+int64(expire), []byte(msg), now, now) 存mysql
5. c.writeMsg(key, m) 推送在线消息
~~~
批量推送私信流程:
![](https://box.kancloud.cn/f5a22da3945d948ef58e6c43fe3ba2df_1072x1040.png)
~~~
PushMultiPrivate() 批量推送私信
r.Method != "POST" 判断是否是post请求
ioutil.ReadAll(r.Body) 读取请求内容
parseMultiPrivate(bodyBytes) 获取 keys 、 message 和 ret(错误码)
根据key获取node,通过node获取rpc 链接
rpc -> comet
PushPrivates() 向多个key推送私信
UserChannel.New(key) 根据key获取一个channel 和 ChannelBucket
并保存到 bucketMap
遍历 bucketMap 开启 goroutine 存储消息 和 推送消息
1.获取请求消息
2.根据请求消息解析出keys和message
3.根据key匹配节点,存储到 map[node]keys
4.rpc 调用用节点的 PushPrivates() 方法
5.判断 key 所在的bucketChannel 返回 map[*ChannelBucket]*batchChannel
6.遍历 ChannelBucket 给keys 发消息,并rpc 调用 Message模块 存储消息
~~~
writeMsg:
1.调用每一个 用户 Element 元素
2.将消息写入Connection.Buf chan []byte
3.HandleWrite() 消费 Buf chan 将消息推给用户
MsgId:
通过当前时间的纳秒时间戳(除) / 100 获得
存储时随存储到redis中,用于在用户获取离线消息时,匹配 MsgId 大于 用户所传参的值
- 序言
- 目录
- 环境搭建
- Linux搭建golang环境
- Windows搭建golang环境
- Mac搭建golang环境
- 介绍
- 1.Go语言的主要特征
- 2.golang内置类型和函数
- 3.init函数和main函数
- 4.包
- 1.工作空间
- 2.源文件
- 3.包结构
- 4.文档
- 5.编写 Hello World
- 6.Go语言 “ _ ”(下划线)
- 7.运算符
- 8.命令
- 类型
- 1.变量
- 2.常量
- 3.基本类型
- 1.基本类型介绍
- 2.字符串String
- 3.数组Array
- 4.类型转换
- 4.引用类型
- 1.引用类型介绍
- 2.切片Slice
- 3.容器Map
- 4.管道Channel
- 5.指针
- 6.自定义类型Struct
- 编码格式转换
- 流程控制
- 1.条件语句(if)
- 2.条件语句 (switch)
- 3.条件语句 (select)
- 4.循环语句 (for)
- 5.循环语句 (range)
- 6.循环控制Goto、Break、Continue
- 函数
- 1.函数定义
- 2.参数
- 3.返回值
- 4.匿名函数
- 5.闭包、递归
- 6.延迟调用 (defer)
- 7.异常处理
- 8.单元测试
- 压力测试
- 方法
- 1.方法定义
- 2.匿名字段
- 3.方法集
- 4.表达式
- 5.自定义error
- 接口
- 1.接口定义
- 2.执行机制
- 3.接口转换
- 4.接口技巧
- 面向对象特性
- 并发
- 1.并发介绍
- 2.Goroutine
- 3.Chan
- 4.WaitGroup
- 5.Context
- 应用
- 反射reflection
- 1.获取基本类型
- 2.获取结构体
- 3.Elem反射操作基本类型
- 4.反射调用结构体方法
- 5.Elem反射操作结构体
- 6.Elem反射获取tag
- 7.应用
- json协议
- 1.结构体转json
- 2.map转json
- 3.int转json
- 4.slice转json
- 5.json反序列化为结构体
- 6.json反序列化为map
- 终端读取
- 1.键盘(控制台)输入fmt
- 2.命令行参数os.Args
- 3.命令行参数flag
- 文件操作
- 1.文件创建
- 2.文件写入
- 3.文件读取
- 4.文件删除
- 5.压缩文件读写
- 6.判断文件或文件夹是否存在
- 7.从一个文件拷贝到另一个文件
- 8.写入内容到Excel
- 9.日志(log)文件
- server服务
- 1.服务端
- 2.客户端
- 3.tcp获取网页数据
- 4.http初识-浏览器访问服务器
- 5.客户端访问服务器
- 6.访问延迟处理
- 7.form表单提交
- web模板
- 1.渲染终端
- 2.渲染浏览器
- 3.渲染存储文件
- 4.自定义io.Writer渲染
- 5.模板语法
- 时间处理
- 1.格式化
- 2.运行时间
- 3.定时器
- 锁机制
- 互斥锁
- 读写锁
- 性能比较
- sync.Map
- 原子操作
- 1.原子增(减)值
- 2.比较并交换
- 3.导入、导出、交换
- 加密解密
- 1.md5
- 2.base64
- 3.sha
- 4.hmac
- 常用算法
- 1.冒泡排序
- 2.选择排序
- 3.快速排序
- 4.插入排序
- 5.睡眠排序
- 设计模式
- 创建型模式
- 单例模式
- 抽象工厂模式
- 工厂方法模式
- 原型模式
- 结构型模式
- 适配器模式
- 桥接模式
- 合成/组合模式
- 装饰模式
- 外观模式
- 享元模式
- 代理模式
- 行为性模式
- 职责链模式
- 命令模式
- 解释器模式
- 迭代器模式
- 中介者模式
- 备忘录模式
- 观察者模式
- 状态模式
- 策略模式
- 模板模式
- 访问者模式
- 数据库操作
- golang操作MySQL
- 1.mysql使用
- 2.insert操作
- 3.select 操作
- 4.update 操作
- 5.delete 操作
- 6.MySQL事务
- golang操作Redis
- 1.redis介绍
- 2.golang链接redis
- 3.String类型 Set、Get操作
- 4.String 批量操作
- 5.设置过期时间
- 6.list队列操作
- 7.Hash表
- 8.Redis连接池
- golang操作ETCD
- 1.etcd介绍
- 2.链接etcd
- 3.etcd存取
- 4.etcd监听Watch
- golang操作kafka
- 1.kafka介绍
- 2.写入kafka
- 3.kafka消费
- golang操作ElasticSearch
- 1.ElasticSearch介绍
- 2.kibana介绍
- 3.写入ElasticSearch
- NSQ
- 安装
- 生产者
- 消费者
- beego框架
- 1.beego框架环境搭建
- 2.参数配置
- 1.默认参数
- 2.自定义配置
- 3.config包使用
- 3.路由设置
- 1.自动匹配
- 2.固定路由
- 3.正则路由
- 4.注解路由
- 5.namespace
- 4.多种数据格式输出
- 1.直接输出字符串
- 2.模板数据输出
- 3.json格式数据输出
- 4.xml格式数据输出
- 5.jsonp调用
- 5.模板处理
- 1.模板语法
- 2.基本函数
- 3.模板函数
- 6.请求处理
- 1.GET请求
- 2.POST请求
- 3.文件上传
- 7.表单验证
- 1.表单验证
- 2.定制错误信息
- 3.struct tag 验证
- 4.XSRF过滤
- 8.静态文件处理
- 1.layout设计
- 9.日志处理
- 1.日志处理
- 2.logs 模块
- 10.会话控制
- 1.会话控制
- 2.session 包使用
- 11.ORM 使用
- 1.链接数据库
- 2. CRUD 操作
- 3.原生 SQL 操作
- 4.构造查询
- 5.事务处理
- 6.自动建表
- 12.beego 验证码
- 1.验证码插件
- 2.验证码使用
- beego admin
- 1.admin安装
- 2.admin开发
- beego 热升级
- gin框架
- 安装使用
- 项目
- 秒杀项目
- 日志收集
- 面试题
- 面试题一
- 面试题二
- 错题集
- Go语言陷阱和常见错误
- 常见语法错误
- 初级
- 中级
- 高级
- Go高级应用
- goim
- goim 启动流程
- goim 工作流程
- goim 结构体
- gopush
- gopush工作流程
- gopush启动流程
- gopush业务流程
- gopush应用
- gopush新添功能
- rpc
- HTTP RPC
- TCP RPC
- JSON RPC
- 常见RPC开源框架
- pprof
- pprof介绍
- pprof应用
- 封装 websocket
- zookeeper
- 基本操作测试
- 简单的分布式server
- Zookeeper命令行使用
- cgo
- Go语言 demo
- 用Go语言计算一个人的年龄,生肖,星座
- 超简易Go语言实现的留言板代码
- 信号处理模块,可用于在线加载配置,配置动态加载的信号为SIGHUP
- 阳历和阴历相互转化的工具类 golang版本
- 错误总结