💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
### 10.1 消息任务类型 > lars_reactor/include/task_msg.h ```c #pragma once #include "event_loop.h" struct task_msg { enum TASK_TYPE { NEW_CONN, //新建链接的任务 NEW_TASK, //一般的任务 }; TASK_TYPE type; //任务类型 //任务的一些参数 union { //针对 NEW_CONN新建链接任务,需要传递connfd int connfd; /*==== 暂时用不上 ==== */ //针对 NEW_TASK 新建任务, //那么可以给一个任务提供一个回调函数 struct { void (*task_cb)(event_loop*, void *args); void *args; }; }; }; ``` ​ 这里面task_msg一共有两个类型的type,一个是新链接的任务,一个是普通任务。两个任务所携带的参数不同,所以用了一个union。 ​ ### 10.2 消息任务队列 > lars_reactor/include/thread_queue.h ```c #pragma once #include <queue> #include <pthread.h> #include <sys/eventfd.h> #include <stdio.h> #include <unistd.h> #include "event_loop.h" /* * * 每个thread对应的 消息任务队列 * * */ template <typename T> class thread_queue { public: thread_queue() { _loop = NULL; pthread_mutex_init(&_queue_mutex, NULL); _evfd = eventfd(0, EFD_NONBLOCK); if (_evfd == -1) { perror("evenfd(0, EFD_NONBLOCK)"); exit(1); } } ~thread_queue() { pthread_mutex_destroy(&_queue_mutex); close(_evfd); } //向队列添加一个任务 void send(const T& task) { //触发消息事件的占位传输内容 unsigned long long idle_num = 1; pthread_mutex_lock(&_queue_mutex); //将任务添加到队列 _queue.push(task); //向_evfd写,触发对应的EPOLLIN事件,来处理该任务 int ret = write(_evfd, &idle_num, sizeof(unsigned long long)); if (ret == -1) { perror("_evfd write"); } pthread_mutex_unlock(&_queue_mutex); } //获取队列,(当前队列已经有任务) void recv(std::queue<T>& new_queue) { unsigned int long long idle_num = 1; pthread_mutex_lock(&_queue_mutex); //把占位的数据读出来,确保底层缓冲没有数据存留 int ret = read(_evfd, &idle_num, sizeof(unsigned long long)); if (ret == -1) { perror("_evfd read"); } //将当前的队列拷贝出去,将一个空队列换回当前队列,同时清空自身队列,确保new_queue是空队列 std::swap(new_queue, _queue); pthread_mutex_unlock(&_queue_mutex); } //设置当前thead_queue是被哪个事件触发event_loop监控 void set_loop(event_loop *loop) { _loop = loop; } //设置当前消息任务队列的 每个任务触发的回调业务 void set_callback(io_callback *cb, void *args = NULL) { if (_loop != NULL) { _loop->add_io_event(_evfd, cb, EPOLLIN, args); } } //得到当前loop event_loop * get_loop() { return _loop; } private: int _evfd; //触发消息任务队列读取的每个消息业务的fd event_loop *_loop; //当前消息任务队列所绑定在哪个event_loop事件触发机制中 std::queue<T> _queue; //队列 pthread_mutex_t _queue_mutex; //进行添加任务、读取任务的保护锁 }; ``` ​ 一个模板类,主要是消息任务队列里的元素类型未必一定是`task_msg`类型。 `thread_queue`需要绑定一个`event_loop`。来触发消息到达,捕获消息并且触发处理消息业务的动作。 ​ 这里面有个`_evfd`是为了触发消息队列消息到达,处理该消息作用的,将`_evfd`加入到对应线程的`event_loop`中,然后再通过`set_callback`设置一个通用的该queue全部消息所触发的处理业务call_back,在这个call_back里开发者可以自定义实现一些处理业务流程。 1. 通过`send`将任务发送给消息队列。 2. 通过`event_loop`触发注册的io_callback得到消息队列里的任务。 3. 在io_callback中调用`recv`取得`task`任务,根据任务的不同类型,处理自定义不同业务流程。 ### 10.3 线程池 ​ 接下来,我们定义线程池,将`thread_queue`和`thread_pool`进行关联。 > lars_reactor/include/thread_pool.h ```c #pragma once #include <pthread.h> #include "task_msg.h" #include "thread_queue.h" class thread_pool { public: //构造,初始化线程池, 开辟thread_cnt个 thread_pool(int thread_cnt); //获取一个thead thread_queue<task_msg>* get_thread(); private: //_queues是当前thread_pool全部的消息任务队列头指针 thread_queue<task_msg> ** _queues; //当前线程池中的线程个数 int _thread_cnt; //已经启动的全部therad编号 pthread_t * _tids; //当前选中的线程队列下标 int _index; }; ``` **属性**: `_queues`:是`thread_queue`集合,和当前线程数量一一对应,每个线程对应一个queue。里面存的元素是`task_msg`。 `_tids`:保存线程池中每个线程的ID。 `_thread_cnt`:当前线程的个数. `_index`:表示外层在选择哪个thead处理任务时的一个下标,因为是轮询处理,所以需要一个下标记录。 **方法**: `thread_pool()`:构造函数,初始化线程池。 `get_thread()`:通过轮询方式,获取一个线程的thread_queue. > lars_reactor/src/thread_pool.cpp ```c #include "thread_pool.h" #include "event_loop.h" #include "tcp_conn.h" #include <unistd.h> #include <stdio.h> /* * 一旦有task消息过来,这个业务是处理task消息业务的主流程 * * 只要有人调用 thread_queue:: send()方法就会触发次函数 */ void deal_task_message(event_loop *loop, int fd, void *args) { //得到是哪个消息队列触发的 thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args; //将queue中的全部任务取出来 std::queue<task_msg> tasks; queue->recv(tasks); while (tasks.empty() != true) { task_msg task = tasks.front(); //弹出一个元素 tasks.pop(); if (task.type == task_msg::NEW_CONN) { //是一个新建链接的任务 //并且将这个tcp_conn加入当当前线程的loop中去监听 tcp_conn *conn = new tcp_conn(task.connfd, loop); if (conn == NULL) { fprintf(stderr, "in thread new tcp_conn error\n"); exit(1); } printf("[thread]: get new connection succ!\n"); } else if (task.type == task_msg::NEW_TASK) { //是一个新的普通任务 //TODO } else { //其他未识别任务 fprintf(stderr, "unknow task!\n"); } } } //一个线程的主业务main函数 void *thread_main(void *args) { thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args; //每个线程都应该有一个event_loop来监控客户端链接的读写事件 event_loop *loop = new event_loop(); if (loop == NULL) { fprintf(stderr, "new event_loop error\n"); exit(1); } //注册一个触发消息任务读写的callback函数 queue->set_loop(loop); queue->set_callback(deal_task_message, queue); //启动阻塞监听 loop->event_process(); return NULL; } thread_pool::thread_pool(int thread_cnt) { _index = 0; _queues = NULL; _thread_cnt = thread_cnt; if (_thread_cnt <= 0) { fprintf(stderr, "_thread_cnt < 0\n"); exit(1); } //任务队列的个数和线程个数一致 _queues = new thread_queue<task_msg>*[thread_cnt]; _tids = new pthread_t[thread_cnt]; int ret; for (int i = 0; i < thread_cnt; ++i) { //创建一个线程 printf("create %d thread\n", i); //给当前线程创建一个任务消息队列 _queues[i] = new thread_queue<task_msg>(); ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]); if (ret == -1) { perror("thread_pool, create thread"); exit(1); } //将线程脱离 pthread_detach(_tids[i]); } } thread_queue<task_msg>* thread_pool::get_thread() { if (_index == _thread_cnt) { _index = 0; } return _queues[_index]; } ``` ​ 这里主要看`deal_task_message()`方法,是处理收到的task任务的。目前我们只对`NEW_CONN`类型的任务进行处理,一般任务先不做处理,因为暂时用不上。 ​ `NEW_CONN`的处理主要是让当前线程创建链接,并且将该链接由当前线程的event_loop接管。 ​ 接下来我们就要将线程池添加到reactor框架中去。 ### 10.4 reactor线程池关联 ​ 将线程池添加到`tcp_server`中。 > lars_reactor/include/tcp_server.h ```c #pragma once #include <netinet/in.h> #include "event_loop.h" #include "tcp_conn.h" #include "message.h" #include "thread_pool.h" class tcp_server { public: // ... // ... private: // ... //线程池 thread_pool *_thread_pool; }; ``` 在构造函数中,添加_thread_pool的初始化工作。并且在accept成功之后交给线程处理客户端的读写事件。 ```c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <strings.h> #include <unistd.h> #include <signal.h> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <arpa/inet.h> #include <errno.h> #include "tcp_server.h" #include "tcp_conn.h" #include "reactor_buf.h" //server的构造函数 tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port) { // ... //6 创建链接管理 _max_conns = MAX_CONNS; //创建链接信息数组 conns = new tcp_conn*[_max_conns+3];//3是因为stdin,stdout,stderr 已经被占用,再新开fd一定是从3开始,所以不加3就会栈溢出 if (conns == NULL) { fprintf(stderr, "new conns[%d] error\n", _max_conns); exit(1); } //7 =============创建线程池================= int thread_cnt = 3;//TODO 从配置文件中读取 if (thread_cnt > 0) { _thread_pool = new thread_pool(thread_cnt); if (_thread_pool == NULL) { fprintf(stderr, "tcp_server new thread_pool error\n"); exit(1); } } // ======================================== //8 注册_socket读事件-->accept处理 _loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this); } //开始提供创建链接服务 void tcp_server::do_accept() { int connfd; while(true) { //accept与客户端创建链接 printf("begin accept\n"); connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen); if (connfd == -1) { if (errno == EINTR) { fprintf(stderr, "accept errno=EINTR\n"); continue; } else if (errno == EMFILE) { //建立链接过多,资源不够 fprintf(stderr, "accept errno=EMFILE\n"); } else if (errno == EAGAIN) { fprintf(stderr, "accept errno=EAGAIN\n"); break; } else { fprintf(stderr, "accept error\n"); exit(1); } } else { //accept succ! int cur_conns; get_conn_num(&cur_conns); //1 判断链接数量 if (cur_conns >= _max_conns) { fprintf(stderr, "so many connections, max = %d\n", _max_conns); close(connfd); } else { // ========= 将新连接由线程池处理 ========== if (_thread_pool != NULL) { //启动多线程模式 创建链接 //1 选择一个线程来处理 thread_queue<task_msg>* queue = _thread_pool->get_thread(); //2 创建一个新建链接的消息任务 task_msg task; task.type = task_msg::NEW_CONN; task.connfd = connfd; //3 添加到消息队列中,让对应的thread进程event_loop处理 queue->send(task); // ===================================== } else { //启动单线程模式 tcp_conn *conn = new tcp_conn(connfd, _loop); if (conn == NULL) { fprintf(stderr, "new tcp_conn error\n"); exit(1); } printf("[tcp_server]: get new connection succ!\n"); break; } } } } } ``` ### 10.5 完成Lars ReactorV0.8开发 ​ 0.8版本的server.cpp和client.cpp是不用改变的。开启服务端和客户端观察执行结果即可。 服务端: ```bash $ ./server msg_router init... create 0 thread create 1 thread create 2 thread add msg cb msgid = 1 add msg cb msgid = 2 begin accept begin accept [thread]: get new connection succ! read data: Hello Lars! call msgid = 1 call data = Hello Lars! call msglen = 11 callback_busi ... ======= ``` 客户端 ```bash $ ./client msg_router init... do_connect EINPROGRESS add msg cb msgid = 1 add msg cb msgid = 101 connect 127.0.0.1:7777 succ! do write over, del EPOLLOUT call msgid = 101 call data = welcome! you online.. call msglen = 21 recv server: [welcome! you online..] msgid: [101] len: [21] ======= call msgid = 1 call data = Hello Lars! call msglen = 11 recv server: [Hello Lars!] msgid: [1] len: [11] ======= ``` ​ 我们会发现,链接已经成功创建成功,并且是由于线程处理的读写任务。 --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**