### 10.1 消息任务类型
> lars_reactor/include/task_msg.h
```c
#pragma once
#include "event_loop.h"
struct task_msg
{
enum TASK_TYPE
{
NEW_CONN, //新建链接的任务
NEW_TASK, //一般的任务
};
TASK_TYPE type; //任务类型
//任务的一些参数
union {
//针对 NEW_CONN新建链接任务,需要传递connfd
int connfd;
/*==== 暂时用不上 ==== */
//针对 NEW_TASK 新建任务,
//那么可以给一个任务提供一个回调函数
struct {
void (*task_cb)(event_loop*, void *args);
void *args;
};
};
};
```
这里面task_msg一共有两个类型的type,一个是新链接的任务,一个是普通任务。两个任务所携带的参数不同,所以用了一个union。
### 10.2 消息任务队列
> lars_reactor/include/thread_queue.h
```c
#pragma once
#include <queue>
#include <pthread.h>
#include <sys/eventfd.h>
#include <stdio.h>
#include <unistd.h>
#include "event_loop.h"
/*
*
* 每个thread对应的 消息任务队列
*
* */
template <typename T>
class thread_queue
{
public:
thread_queue()
{
_loop = NULL;
pthread_mutex_init(&_queue_mutex, NULL);
_evfd = eventfd(0, EFD_NONBLOCK);
if (_evfd == -1) {
perror("evenfd(0, EFD_NONBLOCK)");
exit(1);
}
}
~thread_queue()
{
pthread_mutex_destroy(&_queue_mutex);
close(_evfd);
}
//向队列添加一个任务
void send(const T& task) {
//触发消息事件的占位传输内容
unsigned long long idle_num = 1;
pthread_mutex_lock(&_queue_mutex);
//将任务添加到队列
_queue.push(task);
//向_evfd写,触发对应的EPOLLIN事件,来处理该任务
int ret = write(_evfd, &idle_num, sizeof(unsigned long long));
if (ret == -1) {
perror("_evfd write");
}
pthread_mutex_unlock(&_queue_mutex);
}
//获取队列,(当前队列已经有任务)
void recv(std::queue<T>& new_queue) {
unsigned int long long idle_num = 1;
pthread_mutex_lock(&_queue_mutex);
//把占位的数据读出来,确保底层缓冲没有数据存留
int ret = read(_evfd, &idle_num, sizeof(unsigned long long));
if (ret == -1) {
perror("_evfd read");
}
//将当前的队列拷贝出去,将一个空队列换回当前队列,同时清空自身队列,确保new_queue是空队列
std::swap(new_queue, _queue);
pthread_mutex_unlock(&_queue_mutex);
}
//设置当前thead_queue是被哪个事件触发event_loop监控
void set_loop(event_loop *loop) {
_loop = loop;
}
//设置当前消息任务队列的 每个任务触发的回调业务
void set_callback(io_callback *cb, void *args = NULL)
{
if (_loop != NULL) {
_loop->add_io_event(_evfd, cb, EPOLLIN, args);
}
}
//得到当前loop
event_loop * get_loop() {
return _loop;
}
private:
int _evfd; //触发消息任务队列读取的每个消息业务的fd
event_loop *_loop; //当前消息任务队列所绑定在哪个event_loop事件触发机制中
std::queue<T> _queue; //队列
pthread_mutex_t _queue_mutex; //进行添加任务、读取任务的保护锁
};
```
一个模板类,主要是消息任务队列里的元素类型未必一定是`task_msg`类型。
`thread_queue`需要绑定一个`event_loop`。来触发消息到达,捕获消息并且触发处理消息业务的动作。
这里面有个`_evfd`是为了触发消息队列消息到达,处理该消息作用的,将`_evfd`加入到对应线程的`event_loop`中,然后再通过`set_callback`设置一个通用的该queue全部消息所触发的处理业务call_back,在这个call_back里开发者可以自定义实现一些处理业务流程。
1. 通过`send`将任务发送给消息队列。
2. 通过`event_loop`触发注册的io_callback得到消息队列里的任务。
3. 在io_callback中调用`recv`取得`task`任务,根据任务的不同类型,处理自定义不同业务流程。
### 10.3 线程池
接下来,我们定义线程池,将`thread_queue`和`thread_pool`进行关联。
> lars_reactor/include/thread_pool.h
```c
#pragma once
#include <pthread.h>
#include "task_msg.h"
#include "thread_queue.h"
class thread_pool
{
public:
//构造,初始化线程池, 开辟thread_cnt个
thread_pool(int thread_cnt);
//获取一个thead
thread_queue<task_msg>* get_thread();
private:
//_queues是当前thread_pool全部的消息任务队列头指针
thread_queue<task_msg> ** _queues;
//当前线程池中的线程个数
int _thread_cnt;
//已经启动的全部therad编号
pthread_t * _tids;
//当前选中的线程队列下标
int _index;
};
```
**属性**:
`_queues`:是`thread_queue`集合,和当前线程数量一一对应,每个线程对应一个queue。里面存的元素是`task_msg`。
`_tids`:保存线程池中每个线程的ID。
`_thread_cnt`:当前线程的个数.
`_index`:表示外层在选择哪个thead处理任务时的一个下标,因为是轮询处理,所以需要一个下标记录。
**方法**:
`thread_pool()`:构造函数,初始化线程池。
`get_thread()`:通过轮询方式,获取一个线程的thread_queue.
> lars_reactor/src/thread_pool.cpp
```c
#include "thread_pool.h"
#include "event_loop.h"
#include "tcp_conn.h"
#include <unistd.h>
#include <stdio.h>
/*
* 一旦有task消息过来,这个业务是处理task消息业务的主流程
*
* 只要有人调用 thread_queue:: send()方法就会触发次函数
*/
void deal_task_message(event_loop *loop, int fd, void *args)
{
//得到是哪个消息队列触发的
thread_queue<task_msg>* queue = (thread_queue<task_msg>*)args;
//将queue中的全部任务取出来
std::queue<task_msg> tasks;
queue->recv(tasks);
while (tasks.empty() != true) {
task_msg task = tasks.front();
//弹出一个元素
tasks.pop();
if (task.type == task_msg::NEW_CONN) {
//是一个新建链接的任务
//并且将这个tcp_conn加入当当前线程的loop中去监听
tcp_conn *conn = new tcp_conn(task.connfd, loop);
if (conn == NULL) {
fprintf(stderr, "in thread new tcp_conn error\n");
exit(1);
}
printf("[thread]: get new connection succ!\n");
}
else if (task.type == task_msg::NEW_TASK) {
//是一个新的普通任务
//TODO
}
else {
//其他未识别任务
fprintf(stderr, "unknow task!\n");
}
}
}
//一个线程的主业务main函数
void *thread_main(void *args)
{
thread_queue<task_msg> *queue = (thread_queue<task_msg>*)args;
//每个线程都应该有一个event_loop来监控客户端链接的读写事件
event_loop *loop = new event_loop();
if (loop == NULL) {
fprintf(stderr, "new event_loop error\n");
exit(1);
}
//注册一个触发消息任务读写的callback函数
queue->set_loop(loop);
queue->set_callback(deal_task_message, queue);
//启动阻塞监听
loop->event_process();
return NULL;
}
thread_pool::thread_pool(int thread_cnt)
{
_index = 0;
_queues = NULL;
_thread_cnt = thread_cnt;
if (_thread_cnt <= 0) {
fprintf(stderr, "_thread_cnt < 0\n");
exit(1);
}
//任务队列的个数和线程个数一致
_queues = new thread_queue<task_msg>*[thread_cnt];
_tids = new pthread_t[thread_cnt];
int ret;
for (int i = 0; i < thread_cnt; ++i) {
//创建一个线程
printf("create %d thread\n", i);
//给当前线程创建一个任务消息队列
_queues[i] = new thread_queue<task_msg>();
ret = pthread_create(&_tids[i], NULL, thread_main, _queues[i]);
if (ret == -1) {
perror("thread_pool, create thread");
exit(1);
}
//将线程脱离
pthread_detach(_tids[i]);
}
}
thread_queue<task_msg>* thread_pool::get_thread()
{
if (_index == _thread_cnt) {
_index = 0;
}
return _queues[_index];
}
```
这里主要看`deal_task_message()`方法,是处理收到的task任务的。目前我们只对`NEW_CONN`类型的任务进行处理,一般任务先不做处理,因为暂时用不上。
`NEW_CONN`的处理主要是让当前线程创建链接,并且将该链接由当前线程的event_loop接管。
接下来我们就要将线程池添加到reactor框架中去。
### 10.4 reactor线程池关联
将线程池添加到`tcp_server`中。
> lars_reactor/include/tcp_server.h
```c
#pragma once
#include <netinet/in.h>
#include "event_loop.h"
#include "tcp_conn.h"
#include "message.h"
#include "thread_pool.h"
class tcp_server
{
public:
// ...
// ...
private:
// ...
//线程池
thread_pool *_thread_pool;
};
```
在构造函数中,添加_thread_pool的初始化工作。并且在accept成功之后交给线程处理客户端的读写事件。
```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <signal.h>
#include <sys/types.h> /* See NOTES */
#include <sys/socket.h>
#include <arpa/inet.h>
#include <errno.h>
#include "tcp_server.h"
#include "tcp_conn.h"
#include "reactor_buf.h"
//server的构造函数
tcp_server::tcp_server(event_loop *loop, const char *ip, uint16_t port)
{
// ...
//6 创建链接管理
_max_conns = MAX_CONNS;
//创建链接信息数组
conns = new tcp_conn*[_max_conns+3];//3是因为stdin,stdout,stderr 已经被占用,再新开fd一定是从3开始,所以不加3就会栈溢出
if (conns == NULL) {
fprintf(stderr, "new conns[%d] error\n", _max_conns);
exit(1);
}
//7 =============创建线程池=================
int thread_cnt = 3;//TODO 从配置文件中读取
if (thread_cnt > 0) {
_thread_pool = new thread_pool(thread_cnt);
if (_thread_pool == NULL) {
fprintf(stderr, "tcp_server new thread_pool error\n");
exit(1);
}
}
// ========================================
//8 注册_socket读事件-->accept处理
_loop->add_io_event(_sockfd, accept_callback, EPOLLIN, this);
}
//开始提供创建链接服务
void tcp_server::do_accept()
{
int connfd;
while(true) {
//accept与客户端创建链接
printf("begin accept\n");
connfd = accept(_sockfd, (struct sockaddr*)&_connaddr, &_addrlen);
if (connfd == -1) {
if (errno == EINTR) {
fprintf(stderr, "accept errno=EINTR\n");
continue;
}
else if (errno == EMFILE) {
//建立链接过多,资源不够
fprintf(stderr, "accept errno=EMFILE\n");
}
else if (errno == EAGAIN) {
fprintf(stderr, "accept errno=EAGAIN\n");
break;
}
else {
fprintf(stderr, "accept error\n");
exit(1);
}
}
else {
//accept succ!
int cur_conns;
get_conn_num(&cur_conns);
//1 判断链接数量
if (cur_conns >= _max_conns) {
fprintf(stderr, "so many connections, max = %d\n", _max_conns);
close(connfd);
}
else {
// ========= 将新连接由线程池处理 ==========
if (_thread_pool != NULL) {
//启动多线程模式 创建链接
//1 选择一个线程来处理
thread_queue<task_msg>* queue = _thread_pool->get_thread();
//2 创建一个新建链接的消息任务
task_msg task;
task.type = task_msg::NEW_CONN;
task.connfd = connfd;
//3 添加到消息队列中,让对应的thread进程event_loop处理
queue->send(task);
// =====================================
}
else {
//启动单线程模式
tcp_conn *conn = new tcp_conn(connfd, _loop);
if (conn == NULL) {
fprintf(stderr, "new tcp_conn error\n");
exit(1);
}
printf("[tcp_server]: get new connection succ!\n");
break;
}
}
}
}
}
```
### 10.5 完成Lars ReactorV0.8开发
0.8版本的server.cpp和client.cpp是不用改变的。开启服务端和客户端观察执行结果即可。
服务端:
```bash
$ ./server
msg_router init...
create 0 thread
create 1 thread
create 2 thread
add msg cb msgid = 1
add msg cb msgid = 2
begin accept
begin accept
[thread]: get new connection succ!
read data: Hello Lars!
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======
```
客户端
```bash
$ ./client
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 1
add msg cb msgid = 101
connect 127.0.0.1:7777 succ!
do write over, del EPOLLOUT
call msgid = 101
call data = welcome! you online..
call msglen = 21
recv server: [welcome! you online..]
msgid: [101]
len: [21]
=======
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
```
我们会发现,链接已经成功创建成功,并且是由于线程处理的读写任务。
---
### 关于作者:
作者:`Aceld(刘丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)
![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)
>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**
- 一、Lars系统概述
- 第1章-概述
- 第2章-项目目录构建
- 二、Reactor模型服务器框架
- 第1章-项目结构与V0.1雏形
- 第2章-内存管理与Buffer封装
- 第3章-事件触发EventLoop
- 第4章-链接与消息封装
- 第5章-Client客户端模型
- 第6章-连接管理及限制
- 第7章-消息业务路由分发机制
- 第8章-链接创建/销毁Hook机制
- 第9章-消息任务队列与线程池
- 第10章-配置文件读写功能
- 第11章-udp服务与客户端
- 第12章-数据传输协议protocol buffer
- 第13章-QPS性能测试
- 第14章-异步消息任务机制
- 第15章-链接属性设置功能
- 三、Lars系统之DNSService
- 第1章-Lars-dns简介
- 第2章-数据库创建
- 第3章-项目目录结构及环境构建
- 第4章-Route结构的定义
- 第5章-获取Route信息
- 第6章-Route订阅模式
- 第7章-Backend Thread实时监控
- 四、Lars系统之Report Service
- 第1章-项目概述-数据表及proto3协议定义
- 第2章-获取report上报数据
- 第3章-存储线程池及消息队列
- 五、Lars系统之LoadBalance Agent
- 第1章-项目概述及构建
- 第2章-主模块业务结构搭建
- 第3章-Report与Dns Client设计与实现
- 第4章-负载均衡模块基础设计
- 第5章-负载均衡获取Host主机信息API
- 第6章-负载均衡上报Host主机信息API
- 第7章-过期窗口清理与过载超时(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-负载均衡获取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能测试工具
- 第12章- Lars启动工具脚本