## 6) tcp客户端触发模型
我们可以给客户端添加触发模型。同时也提供一系列的接口供开发者写客户端应用程序来使用。
### 6.1 tcp_client类设计
> lars_reactor/include/tcp_client.h
```c
#pragma once
#include "io_buf.h"
#include "event_loop.h"
#include "message.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
class tcp_client
{
public:
//初始化客户端套接字
tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name);
//发送message方法
int send_message(const char *data, int msglen, int msgid);
//创建链接
void do_connect();
//处理读业务
int do_read();
//处理写业务
int do_write();
//释放链接资源
void clean_conn();
~tcp_client();
//设置业务处理回调函数
void set_msg_callback(msg_callback *msg_cb)
{
this->_msg_callback = msg_cb;
}
bool connected; //链接是否创建成功
//server端地址
struct sockaddr_in _server_addr;
io_buf _obuf;
io_buf _ibuf;
private:
int _sockfd;
socklen_t _addrlen;
//客户端的事件处理机制
event_loop* _loop;
//当前客户端的名称 用户记录日志
const char *_name;
msg_callback *_msg_callback;
};
```
这里注意的是,tcp_client并不是tcp_server的一部分,而是单纯为写客户端提供的接口。所以这里也需要实现一套对读写事件处理的业务。 这里使用的读写缓冲是原始的`io_buf`,并不是服务器封装好的`reactor_buf`原因是后者是转为server做了一层封装,io_buf的基本方法比较全。
**关键成员**:
`_sockfd`:当前客户端套接字。
`_server_addr`: 链接的服务端的IP地址。
`_loop`: 客户端异步触发事件机制event_loop句柄。
`_msg_callback`: 当前客户端处理服务端的回调业务。
`connected`:是否已经成功connect服务端的标致。
**方法**:
`tcp_client()`:构造函数,主要是在里面完成基本的套接字初始化及connect操作.
`do_connect()`:创建链接
`do_read()`:处理链接的读业务。
`do_write()`:处理链接的写业务。
`clean_conn()`:清空链接资源。
### 6.2 创建链接
> lars_reactor/src/tcp_client.cpp
```c
tcp_client::tcp_client(event_loop *loop, const char *ip, unsigned short port, const char *name):
_ibuf(4194304),
_obuf(4194304)
{
_sockfd = -1;
_msg_callback = NULL;
_name = name;
_loop = loop;
bzero(&_server_addr, sizeof(_server_addr));
_server_addr.sin_family = AF_INET;
inet_aton(ip, &_server_addr.sin_addr);
_server_addr.sin_port = htons(port);
_addrlen = sizeof(_server_addr);
this->do_connect();
}
```
这里初始化tcp_client链接信息,然后调用`do_connect()`创建链接.
> lars_reactor/src/tcp_client.cpp
```c
//创建链接
void tcp_client::do_connect()
{
if (_sockfd != -1) {
close(_sockfd);
}
//创建套接字
_sockfd = socket(AF_INET, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, IPPROTO_TCP);
if (_sockfd == -1) {
fprintf(stderr, "create tcp client socket error\n");
exit(1);
}
int ret = connect(_sockfd, (const struct sockaddr*)&_server_addr, _addrlen);
if (ret == 0) {
//链接创建成功
connected = true;
//注册读回调
_loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
//如果写缓冲去有数据,那么也需要触发写回调
if (this->_obuf.length != 0) {
_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
}
printf("connect %s:%d succ!\n", inet_ntoa(_server_addr.sin_addr), ntohs(_server_addr.sin_port));
}
else {
if(errno == EINPROGRESS) {
//fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败
//如果fd是可写状态,则为链接是创建成功的.
fprintf(stderr, "do_connect EINPROGRESS\n");
//让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发
_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
}
else {
fprintf(stderr, "connection error\n");
exit(1);
}
}
}
```
### 6.3 有关非阻塞客户端socket创建链接问题
这里转载一篇文章,是有关非阻塞套接字,connect返回-1,并且errno是`EINPROGRESS`的情况。因为我们的client是采用event_loop形式,socket需要被设置为非阻塞。所以需要针对这个情况做处理。下面是说明。
客户端测试程序时,由于出现很多客户端,经过connect成功后,代码卡在recv系统调用中,后来发现可能是由于socket默认是阻塞模式,所以会令很多客户端链接处于链接却不能传输数据状态。
后来修改socket为非阻塞模式,但在connect的时候,发现返回值为-1,刚开始以为是connect出现错误,但在服务器上看到了链接是ESTABLISED状态。证明链接是成功的
但为什么会出现返回值是-1呢? 经过查询资料,以及看stevens的APUE,也发现有这么一说。
当connect在非阻塞模式下,会出现返回`-1`值,错误码是`EINPROGRESS`,但如何判断connect是联通的呢?stevens书中说明要在connect后,继续判断该socket是否可写?
**若可写,则证明链接成功。**
如何判断可写,有2种方案,一种是select判断是否可写,二用poll模型。
select:
```c
int CheckConnect(int iSocket)
{
fd_set rset;
FD_ZERO(&rset);
FD_SET(iSocket, &rset);
timeval tm;
tm. tv_sec = 0;
tm.tv_usec = 0;
if ( select(iSocket + 1, NULL, &rset, NULL, &tval) <= 0)
{
close(iSocket);
return -1;
}
if (FD_ISSET(iSocket, &rset))
{
int err = -1;
socklen_t len = sizeof(int);
if ( getsockopt(iSocket, SOL_SOCKET, SO_ERROR ,&err, &len) < 0 )
{
close(iSocket);
printf("errno:%d %s\n", errno, strerror(errno));
return -2;
}
if (err)
{
errno = err;
close(iSocket);
return -3;
}
}
return 0;
}
```
poll:
```c
int CheckConnect(int iSocket) {
struct pollfd fd;
int ret = 0;
socklen_t len = 0;
fd.fd = iSocket;
fd.events = POLLOUT;
while ( poll (&fd, 1, -1) == -1 ) {
if( errno != EINTR ){
perror("poll");
return -1;
}
}
len = sizeof(ret);
if ( getsockopt (iSocket, SOL_SOCKET, SO_ERROR, &ret, &len) == -1 ) {
perror("getsockopt");
return -1;
}
if(ret != 0) {
fprintf (stderr, "socket %d connect failed: %s\n",
iSocket, strerror (ret));
return -1;
}
return 0;
}
```
### 6.3 针对EINPROGRESS的连接创建处理
看上面`do_connect()`的代码其中一部分:
```c
if(errno == EINPROGRESS) {
//fd是非阻塞的,可能会出现这个错误,但是并不表示链接创建失败
//如果fd是可写状态,则为链接是创建成功的.
fprintf(stderr, "do_connect EINPROGRESS\n");
//让event_loop去触发一个创建判断链接业务 用EPOLLOUT事件立刻触发
_loop->add_io_event(_sockfd, connection_delay, EPOLLOUT, this);
}
```
这里是又触发一个写事件,直接让程序流程跳转到`connection_delay()`方法.那么我们需要在里面判断链接是否已经判断成功,并且做出一定的创建成功之后的业务动作。
> lars_reactor/src/tcp_client.cpp
```c
//判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误
static void connection_delay(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client*)args;
loop->del_io_event(fd);
int result = 0;
socklen_t result_len = sizeof(result);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
if (result == 0) {
//链接是建立成功的
cli->connected = true;
printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
//建立连接成功之后,主动发送send_message
const char *msg = "hello lars!";
int msgid = 1;
cli->send_message(msg, strlen(msg), msgid);
loop->add_io_event(fd, read_callback, EPOLLIN, cli);
if (cli->_obuf.length != 0) {
//输出缓冲有数据可写
loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
}
}
else {
//链接创建失败
fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
}
}
```
这是一个事件回调,所以用的是static方法而不是成员方法。首先是利用`getsockopt`判断链接是否创建成功,如果成功,那么 我们当前这个版本的客户端是直接写死主动调用`send_message()`方法发送给服务端一个`hello lars!`字符串。然后直接交给我们的`read_callback()`方法处理,当然如果写缓冲有数据,我们也会触发写的`write_callback()`方法。
接下来,看看这两个callback以及send_message是怎么实现的。
**callback**
> lars_reactor/src/tcp_client.cpp
```c
static void write_callback(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client *)args;
cli->do_write();
}
static void read_callback(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client *)args;
cli->do_read();
}
//处理读业务
int tcp_client::do_read()
{
//确定已经成功建立连接
assert(connected == true);
// 1. 一次性全部读取出来
//得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。
int need_read = 0;
if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
fprintf(stderr, "ioctl FIONREAD error");
return -1;
}
//确保_buf可以容纳可读数据
assert(need_read <= _ibuf.capacity - _ibuf.length);
int ret;
do {
ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
} while(ret == -1 && errno == EINTR);
if (ret == 0) {
//对端关闭
if (_name != NULL) {
printf("%s client: connection close by peer!\n", _name);
}
else {
printf("client: connection close by peer!\n");
}
clean_conn();
return -1;
}
else if (ret == -1) {
fprintf(stderr, "client: do_read() , error\n");
clean_conn();
return -1;
}
assert(ret == need_read);
_ibuf.length += ret;
//2. 解包
msg_head head;
int msgid, length;
while (_ibuf.length >= MESSAGE_HEAD_LEN) {
memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
msgid = head.msgid;
length = head.msglen;
/*
if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
break;
}
*/
//头部读取完毕
_ibuf.pop(MESSAGE_HEAD_LEN);
//3. 交给业务函数处理
if (_msg_callback != NULL) {
this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
}
//数据区域处理完毕
_ibuf.pop(length);
}
//重置head指针
_ibuf.adjust();
return 0;
}
//处理写业务
int tcp_client::do_write()
{
//数据有长度,切头部索引是起始位置
assert(_obuf.head == 0 && _obuf.length);
int ret;
while (_obuf.length) {
//写数据
do {
ret = write(_sockfd, _obuf.data, _obuf.length);
} while(ret == -1 && errno == EINTR);//非阻塞异常继续重写
if (ret > 0) {
_obuf.pop(ret);
_obuf.adjust();
}
else if (ret == -1 && errno != EAGAIN) {
fprintf(stderr, "tcp client write \n");
this->clean_conn();
}
else {
//出错,不能再继续写
break;
}
}
if (_obuf.length == 0) {
//已经写完,删除写事件
printf("do write over, del EPOLLOUT\n");
this->_loop->del_io_event(_sockfd, EPOLLOUT);
}
return 0;
}
//释放链接资源,重置连接
void tcp_client::clean_conn()
{
if (_sockfd != -1) {
printf("clean conn, del socket!\n");
_loop->del_io_event(_sockfd);
close(_sockfd);
}
connected = false;
//重新连接
this->do_connect();
}
tcp_client::~tcp_client()
{
close(_sockfd);
}
```
这里是基本的读数据和写数据的处理业务实现。我们重点看`do_read()`方法,里面有段代码:
```c
//3. 交给业务函数处理
if (_msg_callback != NULL) {
this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
}
```
是将我们从服务端读取到的代码,交给了`_msg_callback()`方法来处理的,这个实际上是用户开发者自己在业务上注册的回调业务函数。在tcp_client.h中我们已经提供了`set_msg_callback`暴露给开发者注册使用。
------
**send_message**
> lars_reactor/src/tcp_client.cpp
```c
//主动发送message方法
int tcp_client::send_message(const char *data, int msglen, int msgid)
{
if (connected == false) {
fprintf(stderr, "no connected , send message stop!\n");
return -1;
}
//是否需要添加写事件触发
//如果obuf中有数据,没必要添加,如果没有数据,添加完数据需要触发
bool need_add_event = (_obuf.length == 0) ? true:false;
if (msglen + MESSAGE_HEAD_LEN > this->_obuf.capacity - _obuf.length) {
fprintf(stderr, "No more space to Write socket!\n");
return -1;
}
//封装消息头
msg_head head;
head.msgid = msgid;
head.msglen = msglen;
memcpy(_obuf.data + _obuf.length, &head, MESSAGE_HEAD_LEN);
_obuf.length += MESSAGE_HEAD_LEN;
memcpy(_obuf.data + _obuf.length, data, msglen);
_obuf.length += msglen;
if (need_add_event) {
_loop->add_io_event(_sockfd, write_callback, EPOLLOUT, this);
}
return 0;
}
```
将发送的数据写给obuf,然后出发write_callback将obuf的数据传递给对方服务端。
### 6.4 完成Lars Reactor V0.4开发
好了,现在我们框架部分已经完成,接下来我们就要实现一个serverapp 和 一个clientapp来进行测试.
我们创建`example/lars_reactor_0.4`文件夹。
**Makefile**
```makefile
CXX=g++
CFLAGS=-g -O2 -Wall -fPIC -Wno-deprecated
INC=-I../../include
LIB=-L../../lib -llreactor -lpthread
OBJS = $(addsuffix .o, $(basename $(wildcard *.cc)))
all:
$(CXX) -o server $(CFLAGS) server.cpp $(INC) $(LIB)
$(CXX) -o client $(CFLAGS) client.cpp $(INC) $(LIB)
clean:
-rm -f *.o server client
```
服务端代码:
> server.cpp
```c
#include "tcp_server.h"
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
loop.event_process();
return 0;
}
```
客户端代码:
> client.cpp
```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客户端业务
void busi(const char *data, uint32_t len, int msgid, tcp_client *conn, void *user_data)
{
//得到服务端回执的数据
printf("recv server: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
//创建tcp客户端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.4");
//注册回调业务
client.set_msg_callback(busi);
//开启事件监听
loop.event_process();
return 0;
}
```
编译并分别启动server 和client
服务端输出:
```bash
$ ./server
begin accept
get new connection succ!
read data: hello lars!
server send_message: hello lars!:11, msgid = 1
```
客户端输出:
```bash
$ ./client
do_connect EINPROGRESS
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
recv server: [hello lars!]
msgid: [1]
len: [11]
```
现在客户端已经成功的发送数据给服务端,并且回显的数据也直接被客户端解析,我们的框架到现在就可以做一个基本的客户端和服务端的完成了,但是还差很多,接下来我们继续优化。
---
### 关于作者:
作者:`Aceld(刘丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)
![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)
>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**
- 一、Lars系统概述
- 第1章-概述
- 第2章-项目目录构建
- 二、Reactor模型服务器框架
- 第1章-项目结构与V0.1雏形
- 第2章-内存管理与Buffer封装
- 第3章-事件触发EventLoop
- 第4章-链接与消息封装
- 第5章-Client客户端模型
- 第6章-连接管理及限制
- 第7章-消息业务路由分发机制
- 第8章-链接创建/销毁Hook机制
- 第9章-消息任务队列与线程池
- 第10章-配置文件读写功能
- 第11章-udp服务与客户端
- 第12章-数据传输协议protocol buffer
- 第13章-QPS性能测试
- 第14章-异步消息任务机制
- 第15章-链接属性设置功能
- 三、Lars系统之DNSService
- 第1章-Lars-dns简介
- 第2章-数据库创建
- 第3章-项目目录结构及环境构建
- 第4章-Route结构的定义
- 第5章-获取Route信息
- 第6章-Route订阅模式
- 第7章-Backend Thread实时监控
- 四、Lars系统之Report Service
- 第1章-项目概述-数据表及proto3协议定义
- 第2章-获取report上报数据
- 第3章-存储线程池及消息队列
- 五、Lars系统之LoadBalance Agent
- 第1章-项目概述及构建
- 第2章-主模块业务结构搭建
- 第3章-Report与Dns Client设计与实现
- 第4章-负载均衡模块基础设计
- 第5章-负载均衡获取Host主机信息API
- 第6章-负载均衡上报Host主机信息API
- 第7章-过期窗口清理与过载超时(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-负载均衡获取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能测试工具
- 第12章- Lars启动工具脚本