企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 15) 异步消息任务机制 ​ 我们之前在`include/task_msg.h`中, 其中task的消息类型我们只是实现了`NEW_CONN`,目的是`thread_pool`选择一个线程,让一个线程里的`thread_queue`去创建一个连接对象。但是并没有对`NEW_TASK`的任务类型进行定义。这种类型是允许服务端去执行某项具体的业务。并不是根据客户端来消息去被动回复的业务,而是服务端主动发送的业务给到客户端。 ### 15.1 任务函数类型 ​ 我们先定义task的回调函数类型 > lars_reactor/include/event_loop.h ```c //... //定义异步任务回调函数类型 typedef void (*task_func)(event_loop *loop, void *args); //... ``` ​ 为了防止循环头文件引用,我们把typedef定义在`event_loop.h`中。 > lars_reactor/include/task_msg.h ```c #pragma once #include "event_loop.h" //定义异步任务回调函数类型 typedef void (*task_func)(event_loop *loop, void *args); struct task_msg { enum TASK_TYPE { NEW_CONN, //新建链接的任务 NEW_TASK, //一般的任务 }; TASK_TYPE type; //任务类型 //任务的一些参数 union { //针对 NEW_CONN新建链接任务,需要传递connfd int connfd; //针对 NEW_TASK 新建任务, //可以给一个任务提供一个回调函数 struct { task_func task_cb; //注册的任务函数 void *args; //任务函数对应的形参 }; }; }; ``` ​ `task_func`是我们定义的一个任务的回调函数类型,第一个参数当然就是让哪个loop机制去执行这个task任务。很明显,一个loop是对应一个thread线程的。也就是让哪个thread去执行这个task任务。args是`task_func`的函数形参。 ​ ### 15.2 event_loop模块添加task任务机制 ​ 我们知道,task绑定一个loop,很明显,一个`event_loop`应该拥有需要被执行的task集合。 ​ 在这里,我们将event_loop加上已经就绪的task任务的属性 > lars_reactor/include/event_loop.h ```c #pragma once /* * * event_loop事件处理机制 * * */ #include <sys/epoll.h> #include <ext/hash_map> #include <ext/hash_set> #include <vector> #include "event_base.h" #include "task_msg.h" #define MAXEVENTS 10 // map: fd->io_event typedef __gnu_cxx::hash_map<int, io_event> io_event_map; //定义指向上面map类型的迭代器 typedef __gnu_cxx::hash_map<int, io_event>::iterator io_event_map_it; //全部正在监听的fd集合 typedef __gnu_cxx::hash_set<int> listen_fd_set; //定义异步任务回调函数类型 typedef void (*task_func)(event_loop *loop, void *args); class event_loop { public: //构造,初始化epoll堆 event_loop(); //阻塞循环处理事件 void event_process(); //添加一个io事件到loop中 void add_io_event(int fd, io_callback *proc, int mask, void *args=NULL); //删除一个io事件从loop中 void del_io_event(int fd); //删除一个io事件的EPOLLIN/EPOLLOUT void del_io_event(int fd, int mask); // =========================================== //获取全部监听事件的fd集合 void get_listen_fds(listen_fd_set &fds) { fds = listen_fds; } //=== 异步任务task模块需要的方法 === //添加一个任务task到ready_tasks集合中 void add_task(task_func func, void *args); //执行全部的ready_tasks里面的任务 void execute_ready_tasks(); // =========================================== private: int _epfd; //epoll fd //当前event_loop 监控的fd和对应事件的关系 io_event_map _io_evs; //当前event_loop 一共哪些fd在监听 listen_fd_set listen_fds; //一次性最大处理的事件 struct epoll_event _fired_evs[MAXEVENTS]; // =========================================== //需要被执行的task集合 typedef std::pair<task_func, void*> task_func_pair; std::vector<task_func_pair> _ready_tasks; // =========================================== }; ``` 添加了两个属性: `task_func_pair`: 回调函数和参数的键值对. `_ready_tasks`: 所有已经就绪的待执行的任务集合。 同时添加了两个主要方法: `void add_task(task_func func, void *args)`: 添加一个任务到_ready_tasks中. `void execute_ready_tasks()`:执行全部的_ready_tasks任务。 将这两个方法实现如下: > lars_reactor/src/event_loop.cpp ```c //... //添加一个任务task到ready_tasks集合中 void event_loop::add_task(task_func func, void *args) { task_func_pair func_pair(func, args); _ready_tasks.push_back(func_pair); } //执行全部的ready_tasks里面的任务 void event_loop::execute_ready_tasks() { std::vector<task_func_pair>::iterator it; for (it = _ready_tasks.begin(); it != _ready_tasks.end(); it++) { task_func func = it->first;//任务回调函数 void *args = it->second;//回调函数形参 //执行任务 func(this, args); } //全部执行完毕,清空当前的_ready_tasks _ready_tasks.clear(); } //... ``` ​ 那么`execute_ready_tasks()`函数需要在一个恰当的时候被执行,我们这里就放在每次event_loop一次`epoll_wait()`处理完一组fd事件之后,触发一次额外的task任务。 > lars_reactor/src/event_loop.cpp ```c //阻塞循环处理事件 void event_loop::event_process() { while (true) { io_event_map_it ev_it; int nfds = epoll_wait(_epfd, _fired_evs, MAXEVENTS, 10); for (int i = 0; i < nfds; i++) { //... //... } //每次处理完一组epoll_wait触发的事件之后,处理异步任务 this->execute_ready_tasks(); } } ``` ​ 这里补充一下,因为在task的回调函数中,有形参`event_loop *loop`,可能会使用当前loop中监控的fd信息,所以我们应该给event_loop补充一个获取当前loop监控的全部fd信息的方法 ```c class event_loop{ //... //获取全部监听事件的fd集合 void get_listen_fds(listen_fd_set &fds) { fds = listen_fds; } //... }; ``` ### 15.3 thread_pool模块添加task任务机制 ​ 接下来我们就要用thread_pool来想每个thread所绑定的event_pool中去发送task任务,很明显thread_pool应该具备能够将task加入到event_pool中的_ready_task集合的功能。 > lars_reactor/include/thread_pool.h ```c #pragma once #include <pthread.h> #include "task_msg.h" #include "thread_queue.h" class thread_pool { public: //构造,初始化线程池, 开辟thread_cnt个 thread_pool(int thread_cnt); //获取一个thead thread_queue<task_msg>* get_thread(); //发送一个task任务给thread_pool里的全部thread void send_task(task_func func, void *args = NULL); private: //_queues是当前thread_pool全部的消息任务队列头指针 thread_queue<task_msg> ** _queues; //当前线程池中的线程个数 int _thread_cnt; //已经启动的全部therad编号 pthread_t * _tids; //当前选中的线程队列下标 int _index; }; ``` ​ `send_task()`方法就是发送给线程池中全部的thread去执行task任务. > lars_reactor/src/thread_pool.cpp ```c void thread_pool::send_task(task_func func, void *args) { task_msg task; //给当前thread_pool中的每个thread里的pool添加一个task任务 for (int i = 0; i < _thread_cnt; i++) { //封装一个task消息 task.type = task_msg::NEW_TASK; task.task_cb = func; task.args = args; //取出第i个thread的消息队列 thread_queue<task_msg> *queue = _queues[i]; //发送task消息 queue->send(task); } } ``` ​ `send_task()`的实现实际上是告知全部的thread,封装一个`NEW_TASK`类型的消息,通过`task_queue`告知对应的thread.很明显当我们进行 `queue->send(task)`的时候,当前的thread绑定的loop,就会触发`deal_task_message()`回调了。 > lars_reactor/src/thread_pool.cpp ```c /* * 一旦有task消息过来,这个业务是处理task消息业务的主流程 * * 只要有人调用 thread_queue:: send()方法就会触发次函数 */ void deal_task_message(event_loop *loop, int fd, void *args) { //得到是哪个消息队列触发的 thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args; //将queue中的全部任务取出来 std::queue<task_msg> tasks; queue->recv(tasks); while (tasks.empty() != true) { task_msg task = tasks.front(); //弹出一个元素 tasks.pop(); if (task.type == task_msg::NEW_CONN) { //是一个新建链接的任务 //并且将这个tcp_conn加入当当前线程的loop中去监听 tcp_conn *conn = new tcp_conn(task.connfd, loop); if (conn == NULL) { fprintf(stderr, "in thread new tcp_conn error\n"); exit(1); } printf("[thread]: get new connection succ!\n"); } else if (task.type == task_msg::NEW_TASK) { //===========是一个新的普通任务=============== //当前的loop就是一个thread的事件监控loop,让当前loop触发task任务的回调 loop->add_task(task.task_cb, task.args); //========================================== } else { //其他未识别任务 fprintf(stderr, "unknow task!\n"); } } } ``` ​ 我们判断task.type如果是`NEW_TASK`就将该task加入到当前loop中去. 通过上面的设计,可以看出来,thread_pool的`send_task()`应该是一个对外的开发者接口,所以我们要让服务器的`tcp_server`能够获取到`thread_pool`属性. > lars_reactor/include/tcp_server.h ```c class tcp_server { //... //获取当前server的线程池 thread_pool *thread_poll() { return _thread_pool; } //... }; ``` ​ ok,这样我们基本上完成的task异步处理业务的机制. 下面我们来测试一下这个功能. ### 15.4 完成Lars Reactor V0.11开发 > server.cpp ```c #include "tcp_server.h" #include <string> #include <string.h> #include "config_file.h" tcp_server *server; void print_lars_task(event_loop *loop, void *args) { printf("======= Active Task Func! ========\n"); listen_fd_set fds; loop->get_listen_fds(fds);//不同线程的loop,返回的fds是不同的 //可以向所有fds触发 listen_fd_set::iterator it; //遍历fds for (it = fds.begin(); it != fds.end(); it++) { int fd = *it; tcp_conn *conn = tcp_server::conns[fd]; //取出fd if (conn != NULL) { int msgid = 101; const char *msg = "Hello I am a Task!"; conn->send_message(msg, strlen(msg), msgid); } } } //回显业务的回调函数 void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("callback_busi ...\n"); //直接回显 conn->send_message(data, len, msgid); } //打印信息回调函数 void print_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { printf("recv client: [%s]\n", data); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } //新客户端创建的回调 void on_client_build(net_connection *conn, void *args) { int msgid = 101; const char *msg = "welcome! you online.."; conn->send_message(msg, strlen(msg), msgid); //创建链接成功之后触发任务 server->thread_poll()->send_task(print_lars_task); } //客户端销毁的回调 void on_client_lost(net_connection *conn, void *args) { printf("connection is lost !\n"); } int main() { event_loop loop; //加载配置文件 config_file::setPath("./serv.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 8888); printf("ip = %s, port = %d\n", ip.c_str(), port); server = new tcp_server(&loop, ip.c_str(), port); //注册消息业务路由 server->add_msg_router(1, callback_busi); server->add_msg_router(2, print_busi); //注册链接hook回调 server->set_conn_start(on_client_build); server->set_conn_close(on_client_lost); loop.event_process(); return 0; } ``` ​ 我们在每次建立连接成功之后,触发任务机制。其中`print_lars_task()`方法就是我们的异步任务。由于是全部thead都出发,所以该方法会被每个thread执行。但是不同的thread中的pool所返回的fd是不一样的,这里在`print_lars_task()`中,我们给对应的客户端做了一个简单的消息发送。 ​ > client.cpp ```c #include "tcp_client.h" #include <stdio.h> #include <string.h> //客户端业务 void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { //得到服务端回执的数据 char *str = NULL; str = (char*)malloc(len+1); memset(str, 0, len+1); memcpy(str, data, len); printf("recv server: [%s]\n", str); printf("msgid: [%d]\n", msgid); printf("len: [%d]\n", len); } //客户端销毁的回调 void on_client_build(net_connection *conn, void *args) { int msgid = 1; const char *msg = "Hello Lars!"; conn->send_message(msg, strlen(msg), msgid); } //客户端销毁的回调 void on_client_lost(net_connection *conn, void *args) { printf("on_client_lost...\n"); printf("Client is lost!\n"); } int main() { event_loop loop; //创建tcp客户端 tcp_client client(&loop, "127.0.0.1", 7777, "clientv0.6"); //注册消息路由业务 client.add_msg_router(1, busi); client.add_msg_router(101, busi); //设置hook函数 client.set_conn_start(on_client_build); client.set_conn_close(on_client_lost); //开启事件监听 loop.event_process(); return 0; } ``` ​ 客户端代码无差别。 编译并运行 服务端: ```bash $ ./server msg_router init... ip = 127.0.0.1, port = 7777 create 0 thread create 1 thread create 2 thread create 3 thread create 4 thread add msg cb msgid = 1 add msg cb msgid = 2 begin accept begin accept [thread]: get new connection succ! callback_busi ... ======= Active Task Func! ======== ======= Active Task Func! ======== ======= Active Task Func! ======== ======= Active Task Func! ======== ======= Active Task Func! ======== ``` 客户端: ```c $ ./client msg_router init... do_connect EINPROGRESS add msg cb msgid = 1 add msg cb msgid = 101 connect 127.0.0.1:7777 succ! recv server: [welcome! you online..] msgid: [101] len: [21] recv server: [Hello Lars!] msgid: [1] len: [11] recv server: [Hello I am a Task!] msgid: [101] len: [18] ``` ​ task机制已经集成完毕,lars_reactor功能更加强大了。 --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**