## 15) 异步消息任务机制
我们之前在`include/task_msg.h`中, 其中task的消息类型我们只是实现了`NEW_CONN`,目的是`thread_pool`选择一个线程,让一个线程里的`thread_queue`去创建一个连接对象。但是并没有对`NEW_TASK`的任务类型进行定义。这种类型是允许服务端去执行某项具体的业务。并不是根据客户端来消息去被动回复的业务,而是服务端主动发送的业务给到客户端。
### 15.1 任务函数类型
我们先定义task的回调函数类型
> lars_reactor/include/event_loop.h
```c
//...
//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);
//...
```
为了防止循环头文件引用,我们把typedef定义在`event_loop.h`中。
> lars_reactor/include/task_msg.h
```c
#pragma once
#include "event_loop.h"
//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);
struct task_msg
{
enum TASK_TYPE
{
NEW_CONN, //新建链接的任务
NEW_TASK, //一般的任务
};
TASK_TYPE type; //任务类型
//任务的一些参数
union {
//针对 NEW_CONN新建链接任务,需要传递connfd
int connfd;
//针对 NEW_TASK 新建任务,
//可以给一个任务提供一个回调函数
struct {
task_func task_cb; //注册的任务函数
void *args; //任务函数对应的形参
};
};
};
```
`task_func`是我们定义的一个任务的回调函数类型,第一个参数当然就是让哪个loop机制去执行这个task任务。很明显,一个loop是对应一个thread线程的。也就是让哪个thread去执行这个task任务。args是`task_func`的函数形参。
### 15.2 event_loop模块添加task任务机制
我们知道,task绑定一个loop,很明显,一个`event_loop`应该拥有需要被执行的task集合。
在这里,我们将event_loop加上已经就绪的task任务的属性
> lars_reactor/include/event_loop.h
```c
#pragma once
/*
*
* event_loop事件处理机制
*
* */
#include <sys/epoll.h>
#include <ext/hash_map>
#include <ext/hash_set>
#include <vector>
#include "event_base.h"
#include "task_msg.h"
#define MAXEVENTS 10
// map: fd->io_event
typedef __gnu_cxx::hash_map<int, io_event> io_event_map;
//定义指向上面map类型的迭代器
typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it;
//全部正在监听的fd集合
typedef __gnu_cxx::hash_set<int> listen_fd_set;
//定义异步任务回调函数类型
typedef void (*task_func)(event_loop *loop, void *args);
class event_loop
{
public:
//构造,初始化epoll堆
event_loop();
//阻塞循环处理事件
void event_process();
//添加一个io事件到loop中
void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL);
//删除一个io事件从loop中
void del_io_event(int fd);
//删除一个io事件的EPOLLIN/EPOLLOUT
void del_io_event(int fd, int mask);
// ===========================================
//获取全部监听事件的fd集合
void get_listen_fds(listen_fd_set &fds) {
fds = listen_fds;
}
//=== 异步任务task模块需要的方法 ===
//添加一个任务task到ready_tasks集合中
void add_task(task_func func, void *args);
//执行全部的ready_tasks里面的任务
void execute_ready_tasks();
// ===========================================
private:
int _epfd; //epoll fd
//当前event_loop 监控的fd和对应事件的关系
io_event_map _io_evs;
//当前event_loop 一共哪些fd在监听
listen_fd_set listen_fds;
//一次性最大处理的事件
struct epoll_event _fired_evs[MAXEVENTS];
// ===========================================
//需要被执行的task集合
typedef std::pair<task_func, void*> task_func_pair;
std::vector<task_func_pair> _ready_tasks;
// ===========================================
};
```
添加了两个属性:
`task_func_pair`: 回调函数和参数的键值对.
`_ready_tasks`: 所有已经就绪的待执行的任务集合。
同时添加了两个主要方法:
`void add_task(task_func func, void *args)`: 添加一个任务到_ready_tasks中.
`void execute_ready_tasks()`:执行全部的_ready_tasks任务。
将这两个方法实现如下:
> lars_reactor/src/event_loop.cpp
```c
//...
//添加一个任务task到ready_tasks集合中
void event_loop::add_task(task_func func, void *args)
{
task_func_pair func_pair(func, args);
_ready_tasks.push_back(func_pair);
}
//执行全部的ready_tasks里面的任务
void event_loop::execute_ready_tasks()
{
std::vector<task_func_pair>::iterator it;
for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) {
task_func func = it->first;//任务回调函数
void *args = it->second;//回调函数形参
//执行任务
func(this, args);
}
//全部执行完毕,清空当前的_ready_tasks
_ready_tasks.clear();
}
//...
```
那么`execute_ready_tasks()`函数需要在一个恰当的时候被执行,我们这里就放在每次event_loop一次`epoll_wait()`处理完一组fd事件之后,触发一次额外的task任务。
> lars_reactor/src/event_loop.cpp
```c
//阻塞循环处理事件
void event_loop::event_process()
{
while (true) {
io_event_map_it ev_it;
int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10);
for (int i = 0; i < nfds; i++) {
//...
//...
}
//每次处理完一组epoll_wait触发的事件之后,处理异步任务
this->execute_ready_tasks();
}
}
```
这里补充一下,因为在task的回调函数中,有形参`event_loop *loop`,可能会使用当前loop中监控的fd信息,所以我们应该给event_loop补充一个获取当前loop监控的全部fd信息的方法
```c
class event_loop{
//...
//获取全部监听事件的fd集合
void get_listen_fds(listen_fd_set &fds) {
fds = listen_fds;
}
//...
};
```
### 15.3 thread_pool模块添加task任务机制
接下来我们就要用thread_pool来想每个thread所绑定的event_pool中去发送task任务,很明显thread_pool应该具备能够将task加入到event_pool中的_ready_task集合的功能。
> lars_reactor/include/thread_pool.h
```c
#pragma once
#include <pthread.h>
#include "task_msg.h"
#include "thread_queue.h"
class thread_pool
{
public:
//构造,初始化线程池, 开辟thread_cnt个
thread_pool(int thread_cnt);
//获取一个thead
thread_queue<task_msg>* get_thread();
//发送一个task任务给thread_pool里的全部thread
void send_task(task_func func, void *args = NULL);
private:
//_queues是当前thread_pool全部的消息任务队列头指针
thread_queue<task_msg> ** _queues;
//当前线程池中的线程个数
int _thread_cnt;
//已经启动的全部therad编号
pthread_t * _tids;
//当前选中的线程队列下标
int _index;
};
```
`send_task()`方法就是发送给线程池中全部的thread去执行task任务.
> lars_reactor/src/thread_pool.cpp
```c
void thread_pool::send_task(task_func func, void *args)
{
task_msg task;
//给当前thread_pool中的每个thread里的pool添加一个task任务
for (int i = 0; i < _thread_cnt; i++) {
//封装一个task消息
task.type = task_msg::NEW_TASK;
task.task_cb = func;
task.args = args;
//取出第i个thread的消息队列
thread_queue<task_msg> *queue = _queues[i];
//发送task消息
queue->send(task);
}
}
```
`send_task()`的实现实际上是告知全部的thread,封装一个`NEW_TASK`类型的消息,通过`task_queue`告知对应的thread.很明显当我们进行 `queue->send(task)`的时候,当前的thread绑定的loop,就会触发`deal_task_message()`回调了。
> lars_reactor/src/thread_pool.cpp
```c
/*
* 一旦有task消息过来,这个业务是处理task消息业务的主流程
*
* 只要有人调用 thread_queue:: send()方法就会触发次函数
*/
void deal_task_message(event_loop *loop, int fd, void *args)
{
//得到是哪个消息队列触发的
thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
//将queue中的全部任务取出来
std::queue<task_msg> tasks;
queue->recv(tasks);
while (tasks.empty() != true) {
task_msg task = tasks.front();
//弹出一个元素
tasks.pop();
if (task.type == task_msg::NEW_CONN) {
//是一个新建链接的任务
//并且将这个tcp_conn加入当当前线程的loop中去监听
tcp_conn *conn = new tcp_conn(task.connfd, loop);
if (conn == NULL) {
fprintf(stderr, "in thread new tcp_conn error\n");
exit(1);
}
printf("[thread]: get new connection succ!\n");
}
else if (task.type == task_msg::NEW_TASK) {
//===========是一个新的普通任务===============
//当前的loop就是一个thread的事件监控loop,让当前loop触发task任务的回调
loop->add_task(task.task_cb, task.args);
//==========================================
}
else {
//其他未识别任务
fprintf(stderr, "unknow task!\n");
}
}
}
```
我们判断task.type如果是`NEW_TASK`就将该task加入到当前loop中去.
通过上面的设计,可以看出来,thread_pool的`send_task()`应该是一个对外的开发者接口,所以我们要让服务器的`tcp_server`能够获取到`thread_pool`属性.
> lars_reactor/include/tcp_server.h
```c
class tcp_server {
//...
//获取当前server的线程池
thread_pool *thread_poll() {
return _thread_pool;
}
//...
};
```
ok,这样我们基本上完成的task异步处理业务的机制. 下面我们来测试一下这个功能.
### 15.4 完成Lars Reactor V0.11开发
> server.cpp
```c
#include "tcp_server.h"
#include <string>
#include <string.h>
#include "config_file.h"
tcp_server *server;
void print_lars_task(event_loop *loop, void *args)
{
printf("======= Active Task Func! ========\n");
listen_fd_set fds;
loop->get_listen_fds(fds);//不同线程的loop,返回的fds是不同的
//可以向所有fds触发
listen_fd_set::iterator it;
//遍历fds
for (it = fds.begin(); it != fds.end(); it++) {
int fd = *it;
tcp_conn *conn = tcp_server::conns[fd]; //取出fd
if (conn != NULL) {
int msgid = 101;
const char *msg = "Hello I am a Task!";
conn->send_message(msg, strlen(msg), msgid);
}
}
}
//回显业务的回调函数
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("callback_busi ...\n");
//直接回显
conn->send_message(data, len, msgid);
}
//打印信息回调函数
void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
printf("recv client: [%s]\n", data);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//新客户端创建的回调
void on_client_build(net_connection *conn, void *args)
{
int msgid = 101;
const char *msg = "welcome! you online..";
conn->send_message(msg, strlen(msg), msgid);
//创建链接成功之后触发任务
server->thread_poll()->send_task(print_lars_task);
}
//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args)
{
printf("connection is lost !\n");
}
int main()
{
event_loop loop;
//加载配置文件
config_file::setPath("./serv.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 8888);
printf("ip = %s, port = %d\n", ip.c_str(), port);
server = new tcp_server(&loop, ip.c_str(), port);
//注册消息业务路由
server->add_msg_router(1, callback_busi);
server->add_msg_router(2, print_busi);
//注册链接hook回调
server->set_conn_start(on_client_build);
server->set_conn_close(on_client_lost);
loop.event_process();
return 0;
}
```
我们在每次建立连接成功之后,触发任务机制。其中`print_lars_task()`方法就是我们的异步任务。由于是全部thead都出发,所以该方法会被每个thread执行。但是不同的thread中的pool所返回的fd是不一样的,这里在`print_lars_task()`中,我们给对应的客户端做了一个简单的消息发送。
> client.cpp
```c
#include "tcp_client.h"
#include <stdio.h>
#include <string.h>
//客户端业务
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
//得到服务端回执的数据
char *str = NULL;
str = (char*)malloc(len+1);
memset(str, 0, len+1);
memcpy(str, data, len);
printf("recv server: [%s]\n", str);
printf("msgid: [%d]\n", msgid);
printf("len: [%d]\n", len);
}
//客户端销毁的回调
void on_client_build(net_connection *conn, void *args)
{
int msgid = 1;
const char *msg = "Hello Lars!";
conn->send_message(msg, strlen(msg), msgid);
}
//客户端销毁的回调
void on_client_lost(net_connection *conn, void *args)
{
printf("on_client_lost...\n");
printf("Client is lost!\n");
}
int main()
{
event_loop loop;
//创建tcp客户端
tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6");
//注册消息路由业务
client.add_msg_router(1, busi);
client.add_msg_router(101, busi);
//设置hook函数
client.set_conn_start(on_client_build);
client.set_conn_close(on_client_lost);
//开启事件监听
loop.event_process();
return 0;
}
```
客户端代码无差别。
编译并运行
服务端:
```bash
$ ./server
msg_router init...
ip = 127.0.0.1, port = 7777
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
callback_busi ...
======= Active Task Func! ========
======= Active Task Func! ========
======= Active Task Func! ========
======= Active Task Func! ========
======= Active Task Func! ========
```
客户端:
```c
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
recv server: [welcome! you online..]
msgid: [101]
len: [21]
recv server: [Hello Lars!]
msgid: [1]
len: [11]
recv server: [Hello I am a Task!]
msgid: [101]
len: [18]
```
task机制已经集成完毕,lars_reactor功能更加强大了。
---
### 关于作者:
作者:`Aceld(刘丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)
![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)
>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**
- 一、Lars系统概述
- 第1章-概述
- 第2章-项目目录构建
- 二、Reactor模型服务器框架
- 第1章-项目结构与V0.1雏形
- 第2章-内存管理与Buffer封装
- 第3章-事件触发EventLoop
- 第4章-链接与消息封装
- 第5章-Client客户端模型
- 第6章-连接管理及限制
- 第7章-消息业务路由分发机制
- 第8章-链接创建/销毁Hook机制
- 第9章-消息任务队列与线程池
- 第10章-配置文件读写功能
- 第11章-udp服务与客户端
- 第12章-数据传输协议protocol buffer
- 第13章-QPS性能测试
- 第14章-异步消息任务机制
- 第15章-链接属性设置功能
- 三、Lars系统之DNSService
- 第1章-Lars-dns简介
- 第2章-数据库创建
- 第3章-项目目录结构及环境构建
- 第4章-Route结构的定义
- 第5章-获取Route信息
- 第6章-Route订阅模式
- 第7章-Backend Thread实时监控
- 四、Lars系统之Report Service
- 第1章-项目概述-数据表及proto3协议定义
- 第2章-获取report上报数据
- 第3章-存储线程池及消息队列
- 五、Lars系统之LoadBalance Agent
- 第1章-项目概述及构建
- 第2章-主模块业务结构搭建
- 第3章-Report与Dns Client设计与实现
- 第4章-负载均衡模块基础设计
- 第5章-负载均衡获取Host主机信息API
- 第6章-负载均衡上报Host主机信息API
- 第7章-过期窗口清理与过载超时(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-负载均衡获取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能测试工具
- 第12章- Lars启动工具脚本