## 5) tcp链接与Message消息封装
好了,现在我们来将服务器的连接做一个简单的封装,在这之前,我们要将我我们所发的数据做一个规定,采用TLV的格式,来进行封装。目的是解决TCP传输的粘包问题。
### 5.1 Message消息封装
![](https://img.kancloud.cn/06/33/06336f097f9db897457e35f1b0399533_1024x768.jpeg)
先创建一个message.h头文件
> lars_reactor/include/message.h
```h
#pragma once
//解决tcp粘包问题的消息头
struct msg_head
{
int msgid;
int msglen;
};
//消息头的二进制长度,固定数
#define MESSAGE_HEAD_LEN 8
//消息头+消息体的最大长度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
```
接下来我们每次在server和 client之间传递数据的时候,都发送这种数据格式的头再加上后面的数据内容即可。
### 5.2 创建一个tcp_conn连接类
> lars_reactor/include/tcp_conn.h
```h
#pragma once
#include "reactor_buf.h"
#include "event_loop.h"
//一个tcp的连接信息
class tcp_conn
{
public:
//初始化tcp_conn
tcp_conn(int connfd, event_loop *loop);
//处理读业务
void do_read();
//处理写业务
void do_write();
//销毁tcp_conn
void clean_conn();
//发送消息的方法
int send_message(const char *data, int msglen, int msgid);
private:
//当前链接的fd
int _connfd;
//该连接归属的event_poll
event_loop *_loop;
//输出buf
output_buf obuf;
//输入buf
input_buf ibuf;
};
```
简单说明一下里面的成员和方法:
**成员**:
`_connfd`:server刚刚accept成功的套接字
`_loop`:当前链接所绑定的事件触发句柄.
`obuf`:链接输出缓冲,向对端写数据
`ibuf`:链接输入缓冲,从对端读数据
**方法**:
`tcp_client()`:构造,主要在里面实现初始化及创建链接链接的connect过程。
`do_read()`:读数据处理业务,主要是EPOLLIN事件触发。
`do_write()`:写数据处理业务,主要是EPOLLOUT事件触发。
`clean_conn()`:清空链接资源。
`send_message()`:将消息打包成TLV格式发送给对端。
接下来,实现以下`tcp_conn`类.
> lars_reactor/src/tcp_conn.cpp
```c
#include <unistd.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <string.h>
#include "tcp_conn.h"
#include "message.h"
//回显业务
void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn)
{
conn->send_message(data, len, msgid);
}
//连接的读事件回调
static void conn_rd_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_read();
}
//连接的写事件回调
static void conn_wt_callback(event_loop *loop, int fd, void *args)
{
tcp_conn *conn = (tcp_conn*)args;
conn->do_write();
}
//初始化tcp_conn
tcp_conn::tcp_conn(int connfd, event_loop *loop)
{
_connfd = connfd;
_loop = loop;
//1. 将connfd设置成非阻塞状态
int flag = fcntl(_connfd, F_GETFL, 0);
fcntl(_connfd, F_SETFL, O_NONBLOCK|flag);
//2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟
int op = 1;
setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h
//3. 将该链接的读事件让event_loop监控
_loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this);
//4 将该链接集成到对应的tcp_server中
//TODO
}
//处理读业务
void tcp_conn::do_read()
{
//1. 从套接字读取数据
int ret = ibuf.read_data(_connfd);
if (ret == -1) {
fprintf(stderr, "read data from socket\n");
this->clean_conn();
return ;
}
else if ( ret == 0) {
//对端正常关闭
printf("connection closed by peer\n");
clean_conn();
return ;
}
//2. 解析msg_head数据
msg_head head;
//[这里用while,可能一次性读取多个完整包过来]
while (ibuf.length() >= MESSAGE_HEAD_LEN) {
//2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN
memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
this->clean_conn();
break;
}
if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
//缓存buf中剩余的数据,小于实际上应该接受的数据
//说明是一个不完整的包,应该抛弃
break;
}
//2.2 再根据头长度读取数据体,然后针对数据体处理 业务
//TODO 添加包路由模式
//头部处理完了,往后偏移MESSAGE_HEAD_LEN长度
ibuf.pop(MESSAGE_HEAD_LEN);
//处理ibuf.data()业务数据
printf("read data: %s\n", ibuf.data());
//回显业务
callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
//消息体处理完了,往后便宜msglen长度
ibuf.pop(head.msglen);
}
ibuf.adjust();
return ;
}
//处理写业务
void tcp_conn::do_write()
{
//do_write是触发玩event事件要处理的事情,
//应该是直接将out_buf力度数据io写会对方客户端
//而不是在这里组装一个message再发
//组装message的过程应该是主动调用
//只要obuf中有数据就写
while (obuf.length()) {
int ret = obuf.write2fd(_connfd);
if (ret == -1) {
fprintf(stderr, "write2fd error, close conn!\n");
this->clean_conn();
return ;
}
if (ret == 0) {
//不是错误,仅返回0表示不可继续写
break;
}
}
if (obuf.length() == 0) {
//数据已经全部写完,将_connfd的写事件取消掉
_loop->del_io_event(_connfd, EPOLLOUT);
}
return ;
}
//发送消息的方法
int tcp_conn::send_message(const char *data, int msglen, int msgid)
{
printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid);
bool active_epollout = false;
if(obuf.length() == 0) {
//如果现在已经数据都发送完了,那么是一定要激活写事件的
//如果有数据,说明数据还没有完全写完到对端,那么没必要再激活等写完再激活
active_epollout = true;
}
//1 先封装message消息头
msg_head head;
head.msgid = msgid;
head.msglen = msglen;
//1.1 写消息头
int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN);
if (ret != 0) {
fprintf(stderr, "send head error\n");
return -1;
}
//1.2 写消息体
ret = obuf.send_data(data, msglen);
if (ret != 0) {
//如果写消息体失败,那就回滚将消息头的发送也取消
obuf.pop(MESSAGE_HEAD_LEN);
return -1;
}
if (active_epollout == true) {
//2. 激活EPOLLOUT写事件
_loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this);
}
return 0;
}
//销毁tcp_conn
void tcp_conn::clean_conn()
{
//链接清理工作
//1 将该链接从tcp_server摘除掉
//TODO
//2 将该链接从event_loop中摘除
_loop->del_io_event(_connfd);
//3 buf清空
ibuf.clear();
obuf.clear();
//4 关闭原始套接字
int fd = _connfd;
_connfd = -1;
close(fd);
}
```
具体每个方法的实现,都很清晰。其中`conn_rd_callback()`和`conn_wt_callback()`是注册读写事件的回调函数,设置为static是因为函数类型没有this指针。在里面分别再调用`do_read()`和`do_write()`方法。
### 5.3 修正tcp_server对accept之后的处理方法
> lars_reactor/src/tcp_server.cpp
```c
//...
//开始提供创建链接服务
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept与客户端创建链接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立链接过多,资源不够
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error");
exit(1);
}
}
else {
//accept succ!
// ============= 将之前的触发回调的删掉,改成如下====
tcp_conn *conn = new tcp_conn(connfd, _loop);
if (conn == NULL) {
fprintf(stderr, "new tcp_conn error\n");
exit(1);
}
// ============================================
printf("get new connection succ!\n");
break;
}
}
}
//...
```
这样,每次accept成功之后,创建一个与当前客户端套接字绑定的tcp_conn对象。在构造里就完成了基本的对于EPOLLIN事件的监听和回调动作.
现在可以先编译一下,保证没有语法错误,但是如果想测试,就不能够使用`nc`指令测试了,因为现在服务端只能够接收我们自定义的TLV格式的报文。那么我们需要自己写一个客户端来完成基本的测试。
---
### 关于作者:
作者:`Aceld(刘丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)
![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)
>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**
- 一、Lars系统概述
- 第1章-概述
- 第2章-项目目录构建
- 二、Reactor模型服务器框架
- 第1章-项目结构与V0.1雏形
- 第2章-内存管理与Buffer封装
- 第3章-事件触发EventLoop
- 第4章-链接与消息封装
- 第5章-Client客户端模型
- 第6章-连接管理及限制
- 第7章-消息业务路由分发机制
- 第8章-链接创建/销毁Hook机制
- 第9章-消息任务队列与线程池
- 第10章-配置文件读写功能
- 第11章-udp服务与客户端
- 第12章-数据传输协议protocol buffer
- 第13章-QPS性能测试
- 第14章-异步消息任务机制
- 第15章-链接属性设置功能
- 三、Lars系统之DNSService
- 第1章-Lars-dns简介
- 第2章-数据库创建
- 第3章-项目目录结构及环境构建
- 第4章-Route结构的定义
- 第5章-获取Route信息
- 第6章-Route订阅模式
- 第7章-Backend Thread实时监控
- 四、Lars系统之Report Service
- 第1章-项目概述-数据表及proto3协议定义
- 第2章-获取report上报数据
- 第3章-存储线程池及消息队列
- 五、Lars系统之LoadBalance Agent
- 第1章-项目概述及构建
- 第2章-主模块业务结构搭建
- 第3章-Report与Dns Client设计与实现
- 第4章-负载均衡模块基础设计
- 第5章-负载均衡获取Host主机信息API
- 第6章-负载均衡上报Host主机信息API
- 第7章-过期窗口清理与过载超时(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-负载均衡获取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能测试工具
- 第12章- Lars启动工具脚本