💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
## 5) tcp链接与Message消息封装 ​ 好了,现在我们来将服务器的连接做一个简单的封装,在这之前,我们要将我我们所发的数据做一个规定,采用TLV的格式,来进行封装。目的是解决TCP传输的粘包问题。 ### 5.1 Message消息封装 ![](https://img.kancloud.cn/06/33/06336f097f9db897457e35f1b0399533_1024x768.jpeg) ​ 先创建一个message.h头文件 > lars_reactor/include/message.h ```h #pragma once //解决tcp粘包问题的消息头 struct msg_head { int msgid; int msglen; }; //消息头的二进制长度,固定数 #define MESSAGE_HEAD_LEN 8 //消息头+消息体的最大长度限制 #define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN) ``` ​ 接下来我们每次在server和 client之间传递数据的时候,都发送这种数据格式的头再加上后面的数据内容即可。 ### 5.2 创建一个tcp_conn连接类 > lars_reactor/include/tcp_conn.h ```h #pragma once #include "reactor_buf.h" #include "event_loop.h" //一个tcp的连接信息 class tcp_conn { public: //初始化tcp_conn tcp_conn(int connfd, event_loop *loop); //处理读业务 void do_read(); //处理写业务 void do_write(); //销毁tcp_conn void clean_conn(); //发送消息的方法 int send_message(const char *data, int msglen, int msgid); private: //当前链接的fd int _connfd; //该连接归属的event_poll event_loop *_loop; //输出buf output_buf obuf; //输入buf input_buf ibuf; }; ``` 简单说明一下里面的成员和方法: **成员**: `_connfd`:server刚刚accept成功的套接字 `_loop`:当前链接所绑定的事件触发句柄. `obuf`:链接输出缓冲,向对端写数据 `ibuf`:链接输入缓冲,从对端读数据 **方法**: `tcp_client()`:构造,主要在里面实现初始化及创建链接链接的connect过程。 `do_read()`:读数据处理业务,主要是EPOLLIN事件触发。 `do_write()`:写数据处理业务,主要是EPOLLOUT事件触发。 `clean_conn()`:清空链接资源。 `send_message()`:将消息打包成TLV格式发送给对端。 ​ 接下来,实现以下`tcp_conn`类. > lars_reactor/src/tcp_conn.cpp ```c #include <unistd.h> #include <fcntl.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <netinet/tcp.h> #include <string.h> #include "tcp_conn.h" #include "message.h" //回显业务 void callback_busi(const char *data, uint32_t len, int msgid, void *args, tcp_conn *conn) { conn->send_message(data, len, msgid); } //连接的读事件回调 static void conn_rd_callback(event_loop *loop, int fd, void *args) { tcp_conn *conn = (tcp_conn*)args; conn->do_read(); } //连接的写事件回调 static void conn_wt_callback(event_loop *loop, int fd, void *args) { tcp_conn *conn = (tcp_conn*)args; conn->do_write(); } //初始化tcp_conn tcp_conn::tcp_conn(int connfd, event_loop *loop) { _connfd = connfd; _loop = loop; //1. 将connfd设置成非阻塞状态 int flag = fcntl(_connfd, F_GETFL, 0); fcntl(_connfd, F_SETFL, O_NONBLOCK|flag); //2. 设置TCP_NODELAY禁止做读写缓存,降低小包延迟 int op = 1; setsockopt(_connfd, IPPROTO_TCP, TCP_NODELAY, &op, sizeof(op));//need netinet/in.h netinet/tcp.h //3. 将该链接的读事件让event_loop监控 _loop->add_io_event(_connfd, conn_rd_callback, EPOLLIN, this); //4 将该链接集成到对应的tcp_server中 //TODO } //处理读业务 void tcp_conn::do_read() { //1. 从套接字读取数据 int ret = ibuf.read_data(_connfd); if (ret == -1) { fprintf(stderr, "read data from socket\n"); this->clean_conn(); return ; } else if ( ret == 0) { //对端正常关闭 printf("connection closed by peer\n"); clean_conn(); return ; } //2. 解析msg_head数据 msg_head head; //[这里用while,可能一次性读取多个完整包过来] while (ibuf.length() >= MESSAGE_HEAD_LEN) { //2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN); if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) { fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen); this->clean_conn(); break; } if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) { //缓存buf中剩余的数据,小于实际上应该接受的数据 //说明是一个不完整的包,应该抛弃 break; } //2.2 再根据头长度读取数据体,然后针对数据体处理 业务 //TODO 添加包路由模式 //头部处理完了,往后偏移MESSAGE_HEAD_LEN长度 ibuf.pop(MESSAGE_HEAD_LEN); //处理ibuf.data()业务数据 printf("read data: %s\n", ibuf.data()); //回显业务 callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this); //消息体处理完了,往后便宜msglen长度 ibuf.pop(head.msglen); } ibuf.adjust(); return ; } //处理写业务 void tcp_conn::do_write() { //do_write是触发玩event事件要处理的事情, //应该是直接将out_buf力度数据io写会对方客户端 //而不是在这里组装一个message再发 //组装message的过程应该是主动调用 //只要obuf中有数据就写 while (obuf.length()) { int ret = obuf.write2fd(_connfd); if (ret == -1) { fprintf(stderr, "write2fd error, close conn!\n"); this->clean_conn(); return ; } if (ret == 0) { //不是错误,仅返回0表示不可继续写 break; } } if (obuf.length() == 0) { //数据已经全部写完,将_connfd的写事件取消掉 _loop->del_io_event(_connfd, EPOLLOUT); } return ; } //发送消息的方法 int tcp_conn::send_message(const char *data, int msglen, int msgid) { printf("server send_message: %s:%d, msgid = %d\n", data, msglen, msgid); bool active_epollout = false; if(obuf.length() == 0) { //如果现在已经数据都发送完了,那么是一定要激活写事件的 //如果有数据,说明数据还没有完全写完到对端,那么没必要再激活等写完再激活 active_epollout = true; } //1 先封装message消息头 msg_head head; head.msgid = msgid; head.msglen = msglen; //1.1 写消息头 int ret = obuf.send_data((const char *)&head, MESSAGE_HEAD_LEN); if (ret != 0) { fprintf(stderr, "send head error\n"); return -1; } //1.2 写消息体 ret = obuf.send_data(data, msglen); if (ret != 0) { //如果写消息体失败,那就回滚将消息头的发送也取消 obuf.pop(MESSAGE_HEAD_LEN); return -1; } if (active_epollout == true) { //2. 激活EPOLLOUT写事件 _loop->add_io_event(_connfd, conn_wt_callback, EPOLLOUT, this); } return 0; } //销毁tcp_conn void tcp_conn::clean_conn() { //链接清理工作 //1 将该链接从tcp_server摘除掉 //TODO //2 将该链接从event_loop中摘除 _loop->del_io_event(_connfd); //3 buf清空 ibuf.clear(); obuf.clear(); //4 关闭原始套接字 int fd = _connfd; _connfd = -1; close(fd); } ``` ​ 具体每个方法的实现,都很清晰。其中`conn_rd_callback()`和`conn_wt_callback()`是注册读写事件的回调函数,设置为static是因为函数类型没有this指针。在里面分别再调用`do_read()`和`do_write()`方法。 ### 5.3 修正tcp_server对accept之后的处理方法 > lars_reactor/src/tcp_server.cpp ```c //... //开始提供创建链接服务 void tcp_server::do_accept() { int connfd; while(true) { //accept与客户端创建链接 printf("begin accept\n"); connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen); if (connfd == -1) { if (errno == EINTR) { fprintf(stderr, "accept errno=EINTR\n"); continue; } else if (errno == EMFILE) { //建立链接过多,资源不够 fprintf(stderr, "accept errno=EMFILE\n"); } else if (errno == EAGAIN) { fprintf(stderr, "accept errno=EAGAIN\n"); break; } else { fprintf(stderr, "accept error"); exit(1); } } else { //accept succ! // ============= 将之前的触发回调的删掉,改成如下==== tcp_conn *conn = new tcp_conn(connfd, _loop); if (conn == NULL) { fprintf(stderr, "new tcp_conn error\n"); exit(1); } // ============================================ printf("get new connection succ!\n"); break; } } } //... ``` ​ 这样,每次accept成功之后,创建一个与当前客户端套接字绑定的tcp_conn对象。在构造里就完成了基本的对于EPOLLIN事件的监听和回调动作. ​ 现在可以先编译一下,保证没有语法错误,但是如果想测试,就不能够使用`nc`指令测试了,因为现在服务端只能够接收我们自定义的TLV格式的报文。那么我们需要自己写一个客户端来完成基本的测试。 --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**