## 8) 消息业务路由分发机制
现在我们发送的消息都是message结构的,有个message头里面其中有两个关键的字段,`msgid`和`msglen`,其中加入`msgid`的意义就是我们可以甄别是哪个消息,从而对这类消息做出不同的业务处理。但是现在我们无论是服务端还是客户端都是写死的两个业务,就是"回显业务",显然这并不满足我们作为服务器框架的需求。我们需要开发者可以注册自己的回调业务。所以我们需要提供一个注册业务的入口,然后在后端根据不同的`msgid`来激活不同的回调业务函数。
### 8.1 添加消息分发路由类msg_router
下面我们提供这样一个中转的router模块,在include/message.h添加
> lars_reactor/include/message.h
```c
#pragma once
#include <ext/hash_map>
//解决tcp粘包问题的消息头
struct msg_head
{
int msgid;
int msglen;
};
//消息头的二进制长度,固定数
#define MESSAGE_HEAD_LEN 8
//消息头+消息体的最大长度限制
#define MESSAGE_LENGTH_LIMIT (65535 - MESSAGE_HEAD_LEN)
//msg 业务回调函数原型
//===================== 消息分发路由机制 ==================
class tcp_client;
typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);
//消息路由分发机制
class msg_router
{
public:
msg_router():_router(),_args() {}
//给一个消息ID注册一个对应的回调业务函数
int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data)
{
if(_router.find(msgid) != _router.end()) {
//该msgID的回调业务已经存在
return -1;
}
_router[msgid] = msg_cb;
_args[msgid] = user_data;
return 0;
}
//调用注册的对应的回调业务函数
void call(int msgid, uint32_t msglen, const char *data, tcp_client *client)
{
//判断msgid对应的回调是否存在
if (_router.find(msgid) == _router.end()) {
fprintf(stderr, "msgid %d is not register!\n", msgid);
return;
}
//直接取出回调函数,执行
msg_callback *callback = _router[msgid];
void *user_data = _args[msgid];
callback(data, msglen, msgid, client, user_data);
}
private:
//针对消息的路由分发,key为msgID, value为注册的回调业务函数
__gnu_cxx::hash_map<int, msg_callback *> _router;
//回调业务函数对应的参数,key为msgID, value为对应的参数
__gnu_cxx::hash_map<int, void *> _args;
};
//===================== 消息分发路由机制 ==================
```
开发者需要注册一个`msg_callback`类型的函数,通过`msg_router`类的`register_msg_router()`方法来注册,同时通过`call()`方法来调用。
全部回调业务函数和msgid的对应关系保存在一个hash_map类型的`_router`map中,`_args`保存对应的参数。
但是这里有个小细节需要注意一下,`msg_callback`的函数类型声明是这样的。
```c
typedef void msg_callback(const char *data, uint32_t len, int msgid, tcp_client *client, void *user_data);
```
其中这里面第4个参数,只能是tcp_client类型的参数,也就是我们之前的设计的msg_callback只支持tcp_client的消息回调机制,但是很明显我们的需求是不仅是`tcp_client`要用,tcp_server中的`tcp_conn`也要用到这个机制,那么很显然这个参数在这就不是很合适,那么如果设定一个形参既能指向`tcp_client`又能能指向`tcp_conn`两个类型呢,当然答案就只能是将这两个类抽象出来一层,用父类指针指向子类然后通过多态特性来调用就可以了,所以我们需要先定义一个抽象类。
### 8.2 链接抽象类创建
经过分析,我们定义如下的抽象类,并提供一些接口。
> lars_reactor/include/net_connection.h
```c
#pragma once
/*
*
* 网络通信的抽象类,任何需要进行收发消息的模块,都可以实现该类
*
* */
class net_connection
{
public:
//发送消息的接口
virtual int send_message(const char *data, int datalen, int msgid) = 0;
};
```
然后让我们tcp_server端的`tcp_conn`类继承`net_connecton`, 客户端的`tcp_client` 继承`net_connection`
> lars_reactor/include/tcp_conn.h
```c
class tcp_conn : public net_connection
{
//...
};
```
> lars_reactor/include/tcp_client.h
```c
class tcp_client : public net_connection
{
//...
}
```
这样,我们就可以用一个net_connection指针指向这两种不同的对象实例了。
接下来我们将`msg_callback`回调业务函数类型改成
```c
typedef void msg_callback(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data);
```
这样这个业务函数就可以支持tcp_conn和tcp_client了。
所以修改之后,我们的`msg_router`类定义如下:
> lars_reactor/include/message.h
```c
//消息路由分发机制
class msg_router
{
public:
msg_router(): {
printf("msg router init ...\n");
}
//给一个消息ID注册一个对应的回调业务函数
int register_msg_router(int msgid, msg_callback *msg_cb, void *user_data)
{
if(_router.find(msgid) != _router.end()) {
//该msgID的回调业务已经存在
return -1;
}
printf("add msg cb msgid = %d\n", msgid);
_router[msgid] = msg_cb;
_args[msgid] = user_data;
return 0;
}
//调用注册的对应的回调业务函数
void call(int msgid, uint32_t msglen, const char *data, net_connection *net_conn)
{
printf("call msgid = %d\n", msgid);
//判断msgid对应的回调是否存在
if (_router.find(msgid) == _router.end()) {
fprintf(stderr, "msgid %d is not register!\n", msgid);
return;
}
//直接取出回调函数,执行
msg_callback *callback = _router[msgid];
void *user_data = _args[msgid];
callback(data, msglen, msgid, net_conn, user_data);
printf("=======\n");
}
private:
//针对消息的路由分发,key为msgID, value为注册的回调业务函数
__gnu_cxx::hash_map<int, msg_callback*> _router;
//回调业务函数对应的参数,key为msgID, value为对应的参数
__gnu_cxx::hash_map<int, void*> _args;
};
```
### 8.3 msg_router集成到tcp_server中
#### A. tcp_server添加msg_router静态成员变量
> lars_reactor/include/tcp_server.h
```c
class tcp_server
{
public:
// ...
//---- 消息分发路由 ----
static msg_router router;
// ...
};
```
同时定义及初始化
> lars_reactor/src/tcp_server.cpp
```c
//...
// ==== 消息分发路由 ===
msg_router tcp_server::router;
//...
```
#### B. tcp_server提供注册路由方法
> lars_reactor/include/tcp_server.c
```c
class tcp_server
{
public:
//...
//注册消息路由回调函数
void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
router.register_msg_router(msgid, cb, user_data);
}
//...
public:
//全部已经在线的连接信息
//---- 消息分发路由 ----
static msg_router router;
//...
};
```
#### C. 修正tcp_conn的do_read改成消息分发
> lars_reactor/src/tcp_conn.cpp
```c
//...
//处理读业务
void tcp_conn::do_read()
{
//1. 从套接字读取数据
int ret = ibuf.read_data(_connfd);
if (ret == -1) {
fprintf(stderr, "read data from socket\n");
this->clean_conn();
return ;
}
else if ( ret == 0) {
//对端正常关闭
printf("connection closed by peer\n");
clean_conn();
return ;
}
//2. 解析msg_head数据
msg_head head;
//[这里用while,可能一次性读取多个完整包过来]
while (ibuf.length() >= MESSAGE_HEAD_LEN) {
//2.1 读取msg_head头部,固定长度MESSAGE_HEAD_LEN
memcpy(&head, ibuf.data(), MESSAGE_HEAD_LEN);
if(head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0) {
fprintf(stderr, "data format error, need close, msglen = %d\n", head.msglen);
this->clean_conn();
break;
}
if (ibuf.length() < MESSAGE_HEAD_LEN + head.msglen) {
//缓存buf中剩余的数据,小于实际上应该接受的数据
//说明是一个不完整的包,应该抛弃
break;
}
//2.2 再根据头长度读取数据体,然后针对数据体处理 业务
//头部处理完了,往后偏移MESSAGE_HEAD_LEN长度
ibuf.pop(MESSAGE_HEAD_LEN);
//处理ibuf.data()业务数据
printf("read data: %s\n", ibuf.data());
//消息包路由模式
tcp_server::router.call(head.msgid, head.msglen, ibuf.data(), this);
////回显业务
//callback_busi(ibuf.data(), head.msglen, head.msgid, NULL, this);
//消息体处理完了,往后便宜msglen长度
ibuf.pop(head.msglen);
}
ibuf.adjust();
return ;
}
//...
```
### 8.4 msg_router集成到tcp_client中
> lars_reactor/include/tcp_client.h
```c
class tcp_client : public net_connection
{
public:
// ...
//设置业务处理回调函数
//void set_msg_callback(msg_callback *msg_cb)
//{
//this->_msg_callback = msg_cb;
//}
//注册消息路由回调函数
void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL) {
_router.register_msg_router(msgid, cb, user_data);
}
private:
//处理消息的分发路由
msg_router _router;
//msg_callback *_msg_callback; //单路由模式去掉
// ...
// ...
};
```
然后在修正`tcp_client`的`do_read()`方法。
> lars_reactor/src/tcp_client.cpp
```c
//处理读业务
int tcp_client::do_read()
{
//确定已经成功建立连接
assert(connected == true);
// 1. 一次性全部读取出来
//得到缓冲区里有多少字节要被读取,然后将字节数放入b里面。
int need_read = 0;
if (ioctl(_sockfd, FIONREAD, &need_read) == -1) {
fprintf(stderr, "ioctl FIONREAD error");
return -1;
}
//确保_buf可以容纳可读数据
assert(need_read <= _ibuf.capacity - _ibuf.length);
int ret;
do {
ret = read(_sockfd, _ibuf.data + _ibuf.length, need_read);
} while(ret == -1 && errno == EINTR);
if (ret == 0) {
//对端关闭
if (_name != NULL) {
printf("%s client: connection close by peer!\n", _name);
}
else {
printf("client: connection close by peer!\n");
}
clean_conn();
return -1;
}
else if (ret == -1) {
fprintf(stderr, "client: do_read() , error\n");
clean_conn();
return -1;
}
assert(ret == need_read);
_ibuf.length += ret;
//2. 解包
msg_head head;
int msgid, length;
while (_ibuf.length >= MESSAGE_HEAD_LEN) {
memcpy(&head, _ibuf.data + _ibuf.head, MESSAGE_HEAD_LEN);
msgid = head.msgid;
length = head.msglen;
/*
if (length + MESSAGE_HEAD_LEN < _ibuf.length) {
break;
}
*/
//头部读取完毕
_ibuf.pop(MESSAGE_HEAD_LEN);
// ===================================
//3. 交给业务函数处理
//if (_msg_callback != NULL) {
//this->_msg_callback(_ibuf.data + _ibuf.head, length, msgid, this, NULL);
//}
// 消息路由分发
this->_router.call(msgid, length, _ibuf.data + _ibuf.head, this);
// ===================================
//数据区域处理完毕
_ibuf.pop(length);
}
//重置head指针
_ibuf.adjust();
return 0;
}
```
### 8.5 完成Lars Reactor V0.6开发
我们现在重新写一下 server.cpp 和client.cpp的两个应用程序
> lars_reacor/example/lars_reactor_0.6/server.cpp
```c
#include "tcp_server.h"
//回显业务的回调函数
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback_busi ...\n");
//直接回显
conn->send_message(data, len, msgid);
}
//打印信息回调函数
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("recv client: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
tcp_server server(&loop, "127.0.0.1", 7777);
//注册消息业务路由
server.add_msg_router(1, callback_busi);
server.add_msg_router(2, print_busi);
loop.event_process();
return 0;
}
```
> lars_reacor/example/lars_reactor_0.6/client.cpp
```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客户端业务
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//得到服务端回执的数据
printf("recv server: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
int main()
{
event_loop loop;
//创建tcp客户端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
//注册消息路由业务
client.add_msg_router(1, busi);
//开启事件监听
loop.event_process();
return 0;
}
```
> lars_reactor/src/tcp_client.cpp
```c
//判断链接是否是创建链接,主要是针对非阻塞socket 返回EINPROGRESS错误
static void connection_delay(event_loop *loop, int fd, void *args)
{
tcp_client *cli = (tcp_client*)args;
loop->del_io_event(fd);
int result = 0;
socklen_t result_len = sizeof(result);
getsockopt(fd, SOL_SOCKET, SO_ERROR, &result, &result_len);
if (result == 0) {
//链接是建立成功的
cli->connected = true;
printf("connect %s:%d succ!\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
// ================ 发送msgid:1 =====
//建立连接成功之后,主动发送send_message
const char *msg = "hello lars!";
int msgid = 1;
cli->send_message(msg, strlen(msg), msgid);
// ================ 发送msgid:2 =====
const char *msg2 = "hello Aceld!";
msgid = 2;
cli->send_message(msg2, strlen(msg2), msgid);
// ================
loop->add_io_event(fd, read_callback, EPOLLIN, cli);
if (cli->_obuf.length != 0) {
//输出缓冲有数据可写
loop->add_io_event(fd, write_callback, EPOLLOUT, cli);
}
}
else {
//链接创建失败
fprintf(stderr, "connection %s:%d error\n", inet_ntoa(cli->_server_addr.sin_addr), ntohs(cli->_server_addr.sin_port));
}
}
```
运行结果:
服务端
```c
$ ./server
msg_router init...
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
get new connection succ!
read data: hello lars!
call msgid = 1
callback_busi ...
server send_message: hello lars!:11, msgid = 1
=======
read data: hello Aceld!
call msgid = 2
recv client: [hello Aceld!]
msgid: [2]
len: [12]
```
客户端
```c
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 1
recv server: [hello lars!]
msgid: [1]
len: [11]
=======
```
---
### 关于作者:
作者:`Aceld(刘丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)
![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)
>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**
- 一、Lars系统概述
- 第1章-概述
- 第2章-项目目录构建
- 二、Reactor模型服务器框架
- 第1章-项目结构与V0.1雏形
- 第2章-内存管理与Buffer封装
- 第3章-事件触发EventLoop
- 第4章-链接与消息封装
- 第5章-Client客户端模型
- 第6章-连接管理及限制
- 第7章-消息业务路由分发机制
- 第8章-链接创建/销毁Hook机制
- 第9章-消息任务队列与线程池
- 第10章-配置文件读写功能
- 第11章-udp服务与客户端
- 第12章-数据传输协议protocol buffer
- 第13章-QPS性能测试
- 第14章-异步消息任务机制
- 第15章-链接属性设置功能
- 三、Lars系统之DNSService
- 第1章-Lars-dns简介
- 第2章-数据库创建
- 第3章-项目目录结构及环境构建
- 第4章-Route结构的定义
- 第5章-获取Route信息
- 第6章-Route订阅模式
- 第7章-Backend Thread实时监控
- 四、Lars系统之Report Service
- 第1章-项目概述-数据表及proto3协议定义
- 第2章-获取report上报数据
- 第3章-存储线程池及消息队列
- 五、Lars系统之LoadBalance Agent
- 第1章-项目概述及构建
- 第2章-主模块业务结构搭建
- 第3章-Report与Dns Client设计与实现
- 第4章-负载均衡模块基础设计
- 第5章-负载均衡获取Host主机信息API
- 第6章-负载均衡上报Host主机信息API
- 第7章-过期窗口清理与过载超时(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-负载均衡获取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能测试工具
- 第12章- Lars启动工具脚本