企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## `ccnet`从单线程演变为多线程 ccnet现状: * epoll * linux平台 * 事件循环 * 单线程 * reactor * 监听和新连接的事件都在主线程中 ## 单线程的`ccnet` ![rbx4XD.png](https://s3.ax1x.com/2020/12/29/rbx4XD.png) `ccnet`的单线程的角色: 1. `EventLoop`: 整个事件循环。 2. `EPollPoller`: 负责事件的收集。 3. `Channel`:负责事件的分发。 4. `Acceptor`:处理客户端新连接,绑定监听回调时间。 5. `TcpConnection`: 包含`EventLoop`组件,`Channel`组件,接收缓冲区和发送缓冲区,负责数据的收发。 6. `TcpServer`:包含`EventLoop`组件,`Acceptor`组件,客户端连接`map`,还有回调接口 7. `Buffer`: 缓冲区 `ccnet`的单线程模型只是对epoll进行了封装,然后根据reactor模型在代码上进行了组件的区分,但是整体操作还是单线程,不能充分利用硬件资源。`TcpServer`的回调函数中,数据接收和业务处理在同一条线程中。 ## 单线程的`reactor` ![rbbTJS.png](https://s3.ax1x.com/2020/12/29/rbbTJS.png) Reactor模型中定义的三种角色: 1. Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等。 2. Acceptor:处理客户端新连接,并分派请求到处理器中。 3. Handler:将自身与事件绑定,执行非阻塞读/写任务。 单线程`reactor`的消息处理流程: 1. `Reactor`通过 `select/poll/epoll` 监控连接事件,通过`dispatch`进行分发 2. 如果是连接建立的事件,则由`acceptor`处理,并创建`handler`处理后续事件。 3. 如果不是建立连接事件,则`Reactor`会分发调用`Handler`来响应。 4. `handler`会完成`read`->业务处理->`send`的完整业务流程。 `Reactor`单线程模型只是在代码上进行了组件的区分,但是整体操作还是单线程,不能充分利用硬件资源。`handler`业务处理部分没有异步。 对于一些小容量应用场景,可以使用单`Reactor`单线程模型。但是对于高负载、大并发的应用场景却不合适,主要原因如下: 1. 即便`Reactor`线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送。 2. 当`Reactor`线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重`Reactor`线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈。 3. 一旦`Reactor`线程意外中断或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障。 于是所线程 ## 主从`Reactor`多线程模型 ![rbjKAI.png](https://s3.ax1x.com/2020/12/29/rbjKAI.png) 消息处理流程: 1. 从主线程池中随机选择一个`Reactor`线程作为`acceptor`线程,用于绑定监听端口,接收客户端连接 2. `acceptor`线程接收客户端连接请求之后创建新的`SocketChannel`,将其注册到主线程池的其它`Reactor`线程上。 3. 步骤2完成之后,业务层的链路正式建立,将`SocketChannel`从主线程池的Reactor线程的多路复用器上摘除,重新注册到`SubReactor`线程池的线程上,并创建一个`Handler`用于处理各种**读写事件** 4. 当有新的事件发生时,`SubReactor`会调用连接对应的`Handler`进行响应 5. `Handler`通过`Read`读取数据后,会分发给后面的`Worker`线程池进行业务处理 6. `Worker`线程池会分配独立的线程完成真正的业务处理,如何将响应结果发给Handler进行处理 7. `Handler`收到响应结果后通过`Send`将响应结果返回给`Client` ## 多线程的`ccnet` ![rbzD8P.png](https://s3.ax1x.com/2020/12/29/rbzD8P.png) ![rbTFET.png](https://s3.ax1x.com/2020/12/29/rbTFET.png) 根据主从`Reactor`多线程模型,可以想到`ccnet`单线程模型中还缺少事件循环线程池`EventLoopThreadPool`组件和**worker线程池**,`EventLoopThreadPool`顾名思义,每个线程里都有一个事件循环。 可以把`ccnet`的单线程模型中 `TcpServer`组件修改一下,因为他包含事件循环`EventLoop`组件`和Acceptor`组件,也绑定了 `Acceptor`的新连接事件回调,`TcpServer`一个 `ccnet`多线程模型所需组件的集合,他还需要的组件如下: * `EventLoopThread`: on loop peer thread,负责开启一个线程一个事件循环。 * `EventLoopThreadPool`: 是事件循环线程池。 * `TcpServer`: 增加`EventLoopThreadPool`组件。 修改`TcpServer.cc`的代码: ``` void TcpServer::newConnection(int sockfd, const TcpAddr& peerAddr) { ... EventLoop *io_loop = thread_pool_->getNextLoop(); //增加的行 TcpConnectionPtr conn(new TcpConnection(io_loop, //修改的行 connName, sockfd, localaddr, peerAddr)); ... io_loop->runInLoop(std::bind(&TcpConnection::connectEstablished, conn));//修改的行 } ``` 修改`TcpServer.h`代码: ``` class TcpServer { ... //thread pool std::shared_ptr<EventLoopThreadPool> thread_pool_; ... } ``` 修改example中的`EchoServer.cc`代码: ``` server_.setNumThreads(4); //设置线程数量 ``` 详细代码请参考 github: https://github.com/diycat1024/ccnet