企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 6) tcp客户端触发模型 ​ 我们可以给客户端添加触发模型。同时也提供一系列的接口供开发者写客户端应用程序来使用。 ### 6.1 tcp_client类设计 > lars_reactor/include/tcp_client.h ```c #pragma once #include "io_buf.h" #include "event_loop.h" #include "message.h" #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> class tcp_client { public: //初始化客户端套接字 tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name); //发送message方法 int send_message(const char *data, int msglen, int msgid); //创建链接 void do_connect(); //处理读业务 int do_read(); //处理写业务 int do_write(); //释放链接资源 void clean_conn(); ~tcp_client(); //设置业务处理回调函数 void set_msg_callback(msg_callback *msg_cb) { this->_msg_callback = msg_cb; } bool connected; //链接是否创建成功 //server端地址 struct sockaddr_in _server_addr; io_buf _obuf; io_buf _ibuf; private: int _sockfd; socklen_t _addrlen; //客户端的事件处理机制 event_loop* _loop; //当前客户端的名称 用户记录日志 const char *_name; msg_callback *_msg_callback; }; ``` ​ 这里注意的是,tcp_client并不是tcp_server的一部分,而是单纯为写客户端提供的接口。所以这里也需要实现一套对读写事件处理的业务。 这里使用的读写缓冲是原始的`io_buf`,并不是服务器封装好的`reactor_buf`原因是后者是转为server做了一层封装,io_buf的基本方法比较全。 **关键成员**: `_sockfd`:当前客户端套接字。 `_server_addr`: 链接的服务端的IP地址。 `_loop`: 客户端异步触发事件机制event_loop句柄。 `_msg_callback`: 当前客户端处理服务端的回调业务。 `connected`:是否已经成功connect服务端的标致。 **方法**: `tcp_client()`:构造函数,主要是在里面完成基本的套接字初始化及connect操作. `do_connect()`:创建链接 `do_read()`:处理链接的读业务。 `do_write()`:处理链接的写业务。 `clean_conn()`:清空链接资源。 ### 6.2 创建链接 > lars_reactor/src/tcp_client.cpp ```c tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name): _ibuf(4194304), _obuf(4194304) { _sockfd = -1; _msg_callback = NULL; _name = name; _loop = loop; bzero(&_server_addr, sizeof(_server_addr)); _server_addr.sin_family = AF_INET; inet_aton(ip, &_server_addr.sin_addr); _server_addr.sin_port = htons(port); _addrlen = sizeof(_server_addr); this->do_connect(); } ``` ​ 这里初始化tcp_client链接信息,然后调用`do_connect()`创建链接. > lars_reactor/src/tcp_client.cpp ```c //创建链接 void tcp_client::do_connect() { if (_sockfd != -1) { close(_sockfd); } //创建套接字 _sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP); if (_sockfd == -1) { fprintf(stderr, "create tcp client socket error\n"); exit(1); } int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen); if (ret == 0) { //链接创建成功 connected = true; //注册读回调 _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this); //如果写缓冲去有数据,那么也需要触发写回调 if (this->_obuf.length != 0) { _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this); } printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port)); } else { if(errno == EINPROGRESS) { //fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败 //如果fd是可写状态,则为链接是创建成功的. fprintf(stderr, "do_connect EINPROGRESS\n"); //让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发 _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this); } else { fprintf(stderr, "connection error\n"); exit(1); } } } ``` ### 6.3 有关非阻塞客户端socket创建链接问题 ​ 这里转载一篇文章,是有关非阻塞套接字,connect返回-1,并且errno是`EINPROGRESS`的情况。因为我们的client是采用event_loop形式,socket需要被设置为非阻塞。所以需要针对这个情况做处理。下面是说明。 ​ 客户端测试程序时,由于出现很多客户端,经过connect成功后,代码卡在recv系统调用中,后来发现可能是由于socket默认是阻塞模式,所以会令很多客户端链接处于链接却不能传输数据状态。 ​ 后来修改socket为非阻塞模式,但在connect的时候,发现返回值为-1,刚开始以为是connect出现错误,但在服务器上看到了链接是ESTABLISED状态。证明链接是成功的 ​ 但为什么会出现返回值是-1呢? 经过查询资料,以及看stevens的APUE,也发现有这么一说。 ​ 当connect在非阻塞模式下,会出现返回`-1`值,错误码是`EINPROGRESS`,但如何判断connect是联通的呢?stevens书中说明要在connect后,继续判断该socket是否可写? ​ **若可写,则证明链接成功。** ​ 如何判断可写,有2种方案,一种是select判断是否可写,二用poll模型。 select: ```c int CheckConnect(int iSocket) { fd_set rset; FD_ZERO(&rset); FD_SET(iSocket, &rset); timeval tm; tm. tv_sec = 0; tm.tv_usec = 0; if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0) { close(iSocket); return -1; } if (FD_ISSET(iSocket, &rset)) { int err = -1; socklen_t len = sizeof(int); if ( getsockopt(iSocket, SOL_SOCKET, SO_ERROR ,&err, &len) < 0 ) { close(iSocket); printf("errno:%d %s\n", errno, strerror(errno)); return -2; } if (err) { errno = err; close(iSocket); return -3; } } return 0; } ``` poll: ```c int CheckConnect(int iSocket) { struct pollfd fd; int ret = 0; socklen_t len = 0; fd.fd = iSocket; fd.events = POLLOUT; while ( poll (&fd, 1, -1) == -1 ) { if( errno != EINTR ){ perror("poll"); return -1; } } len = sizeof(ret); if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) { perror("getsockopt"); return -1; } if(ret != 0) { fprintf (stderr, "socket %d connect failed: %s\n", iSocket, strerror (ret)); return -1; } return 0; } ``` ### 6.3 针对EINPROGRESS的连接创建处理 ​ 看上面`do_connect()`的代码其中一部分: ```c if(errno == EINPROGRESS) { //fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败 //如果fd是可写状态,则为链接是创建成功的. fprintf(stderr, "do_connect EINPROGRESS\n"); //让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发 _loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this); } ``` 这里是又触发一个写事件,直接让程序流程跳转到`connection_delay()`方法.那么我们需要在里面判断链接是否已经判断成功,并且做出一定的创建成功之后的业务动作。 > lars_reactor/src/tcp_client.cpp ```c //判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误 static void connection_delay(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client*)args; loop->del_io_event(fd); int result = 0; socklen_t result_len = sizeof(result); getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len); if (result == 0) { //链接是建立成功的 cli->connected = true; printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); //建立连接成功之后,主动发送send_message const char *msg = "hello lars!"; int msgid = 1; cli->send_message(msg, strlen(msg), msgid); loop->add_io_event(fd, read_callback, EPOLLIN, cli); if (cli->_obuf.length != 0) { //输出缓冲有数据可写 loop->add_io_event(fd, write_callback, EPOLLOUT, cli); } } else { //链接创建失败 fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port)); } } ``` ​ 这是一个事件回调,所以用的是static方法而不是成员方法。首先是利用`getsockopt`判断链接是否创建成功,如果成功,那么 我们当前这个版本的客户端是直接写死主动调用`send_message()`方法发送给服务端一个`hello lars!`字符串。然后直接交给我们的`read_callback()`方法处理,当然如果写缓冲有数据,我们也会触发写的`write_callback()`方法。 ​ 接下来,看看这两个callback以及send_message是怎么实现的。 **callback** > lars_reactor/src/tcp_client.cpp ```c static void write_callback(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client *)args; cli->do_write(); } static void read_callback(event_loop *loop, int fd, void *args) { tcp_client *cli = (tcp_client *)args; cli->do_read(); } //处理读业务 int tcp_client::do_read() { //确定已经成功建立连接 assert(connected == true); // 1. 一次性全部读取出来 //得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。 int need_read = 0; if (ioctl(_sockfd, FIONREAD, &need_read) == -1) { fprintf(stderr, "ioctl FIONREAD error"); return -1; } //确保_buf可以容纳可读数据 assert(need_read <= _ibuf.capacity - _ibuf.length); int ret; do { ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read); } while(ret == -1 && errno == EINTR); if (ret == 0) { //对端关闭 if (_name != NULL) { printf("%s client: connection close by peer!\n", _name); } else { printf("client: connection close by peer!\n"); } clean_conn(); return -1; } else if (ret == -1) { fprintf(stderr, "client: do_read() , error\n"); clean_conn(); return -1; } assert(ret == need_read); _ibuf.length += ret; //2. 解包 msg_head head; int msgid, length; while (_ibuf.length >= MESSAGE_HEAD_LEN) { memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN); msgid = head.msgid; length = head.msglen; /* if (length + MESSAGE_HEAD_LEN < _ibuf.length) { break; } */ //头部读取完毕 _ibuf.pop(MESSAGE_HEAD_LEN); //3. 交给业务函数处理 if (_msg_callback != NULL) { this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL); } //数据区域处理完毕 _ibuf.pop(length); } //重置head指针 _ibuf.adjust(); return 0; } //处理写业务 int tcp_client::do_write() { //数据有长度,切头部索引是起始位置 assert(_obuf.head == 0 && _obuf.length); int ret; while (_obuf.length) { //写数据 do { ret = write(_sockfd, _obuf.data, _obuf.length); } while(ret == -1 && errno == EINTR);//非阻塞异常继续重写 if (ret > 0) { _obuf.pop(ret); _obuf.adjust(); } else if (ret == -1 && errno != EAGAIN) { fprintf(stderr, "tcp client write \n"); this->clean_conn(); } else { //出错,不能再继续写 break; } } if (_obuf.length == 0) { //已经写完,删除写事件 printf("do write over, del EPOLLOUT\n"); this->_loop->del_io_event(_sockfd, EPOLLOUT); } return 0; } //释放链接资源,重置连接 void tcp_client::clean_conn() { if (_sockfd != -1) { printf("clean conn, del socket!\n"); _loop->del_io_event(_sockfd); close(_sockfd); } connected = false; //重新连接 this->do_connect(); } tcp_client::~tcp_client() { close(_sockfd); } ``` ​ 这里是基本的读数据和写数据的处理业务实现。我们重点看`do_read()`方法,里面有段代码: ```c //3. 交给业务函数处理 if (_msg_callback != NULL) { this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL); } ``` ​ 是将我们从服务端读取到的代码,交给了`_msg_callback()`方法来处理的,这个实际上是用户开发者自己在业务上注册的回调业务函数。在tcp_client.h中我们已经提供了`set_msg_callback`暴露给开发者注册使用。 ------ **send_message** > lars_reactor/src/tcp_client.cpp ```c //主动发送message方法 int tcp_client::send_message(const char *data, int msglen, int msgid) { if (connected == false) { fprintf(stderr, "no connected , send message stop!\n"); return -1; } //是否需要添加写事件触发 //如果obuf中有数据,没必要添加,如果没有数据,添加完数据需要触发 bool need_add_event = (_obuf.length == 0) ? true:false; if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) { fprintf(stderr, "No more space to Write socket!\n"); return -1; } //封装消息头 msg_head head; head.msgid = msgid; head.msglen = msglen; memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN); _obuf.length += MESSAGE_HEAD_LEN; memcpy(_obuf.data + _obuf.length, data, msglen); _obuf.length += msglen; if (need_add_event) { _loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this); } return 0; } ``` ​ 将发送的数据写给obuf,然后出发write_callback将obuf的数据传递给对方服务端。 ### 6.4 完成Lars Reactor V0.4开发 ​ 好了,现在我们框架部分已经完成,接下来我们就要实现一个serverapp 和 一个clientapp来进行测试. 我们创建`example/lars_reactor_0.4`文件夹。 **Makefile** ```makefile CXX=g++ CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated INC=-I../../include LIB=-L../../lib -llreactor -lpthread OBJS = $(addsuffix .o, $(basename $(wildcard *.cc))) all: $(CXX) -o server $(CFLAGS) server.cpp $(INC) $(LIB) $(CXX) -o client $(CFLAGS) client.cpp $(INC) $(LIB) clean: -rm -f *.o server client ``` 服务端代码: > server.cpp ```c #include "tcp_server.h" int main() { event_loop loop; tcp_server server(&loop, "127.0.0.1", 7777); loop.event_process(); return 0; } ``` 客户端代码: > client.cpp ```c #include "tcp_client.h" #include <stdio.h> #include <string.h> //客户端业务 void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data) { //得到服务端回执的数据 printf("recv server: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } int main() { event_loop loop; //创建tcp客户端 tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4"); //注册回调业务 client.set_msg_callback(busi); //开启事件监听 loop.event_process(); return 0; } ``` 编译并分别启动server 和client 服务端输出: ```bash $ ./server begin accept get new connection succ! read data: hello lars! server send_message: hello lars!:11, msgid = 1 ``` 客户端输出: ```bash $ ./client do_connect EINPROGRESS connect 127.0.0.1:7777 succ! do write over, del EPOLLOUT recv server: [hello lars!] msgid: [1] len: [11] ``` ​ 现在客户端已经成功的发送数据给服务端,并且回显的数据也直接被客户端解析,我们的框架到现在就可以做一个基本的客户端和服务端的完成了,但是还差很多,接下来我们继续优化。 --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**