企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 5) 存储线程池及消息队列 ​ 我们现在的reporter_service的io入库操作,完全是在消息的callback中进行的,那么实际上,这回占用我们server的工作线程的阻塞时间,从而浪费cpu。所以我们应该将io的入库操作,交给一个专门做入库的消息队列线程池来做,这样我们的callback就会立刻返回该业务,从而可以继续处理下一个conn链接的消息事件业务。 ​ 所以我们就要在此给reporter_service设计一个存储数据的线程池及配套的消息队列。当然这里面我们还是直接用写好的`lars_reactor`框架里的接口即可。 > lars_reporter/src/reporter_service.cpp ```c #include "lars_reactor.h" #include "lars.pb.h" #include "store_report.h" #include <string> thread_queue<lars::ReportStatusRequest> **reportQueues = NULL; int thread_cnt = 0; void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { lars::ReportStatusRequest req; req.ParseFromArray(data, len); //将上报数据存储到db StoreReport sr; sr.store(req); //轮询将消息平均发送到每个线程的消息队列中 static int index = 0; //将消息发送给某个线程消息队列 reportQueues[index]->send(req); index ++; index = index % thread_cnt; } void create_reportdb_threads() { thread_cnt = config_file::instance()->GetNumber("reporter", "db_thread_cnt", 3); //开线程池的消息队列 reportQueues = new thread_queue<lars::ReportStatusRequest>*[thread_cnt]; if (reportQueues == NULL) { fprintf(stderr, "create thread_queue<lars::ReportStatusRequest>*[%d], error", thread_cnt) ; exit(1); } for (int i = 0; i < thread_cnt; i++) { //给当前线程创建一个消息队列queue reportQueues[i] = new thread_queue<lars::ReportStatusRequest>(); if (reportQueues == NULL) { fprintf(stderr, "create thread_queue error\n"); exit(1); } pthread_t tid; int ret = pthread_create(&tid, NULL, store_main, reportQueues[i]); if (ret == -1) { perror("pthread_create"); exit(1); } pthread_detach(tid); } } int main(int argc, char **argv) { event_loop loop; //加载配置文件 config_file::setPath("./conf/lars_reporter.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 7779); //创建tcp server tcp_server server(&loop, ip.c_str(), port); //添加数据上报请求处理的消息分发处理业务 server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status); //为了防止在业务中出现io阻塞,那么需要启动一个线程池对IO进行操作的,接受业务的请求存储消息 create_reportdb_threads(); //启动事件监听 loop.event_process(); return 0; } ``` ​ 这里主线程启动了线程池,根据配置文件的`db_thread_cnt`数量来开辟。每个线程都会执行`store_main`方法,我们来看一下实现 > lars_reporter/src/store_thread.cpp ```c #include "lars.pb.h" #include "lars_reactor.h" #include "store_report.h" struct Args { thread_queue<lars::ReportStatusRequest>* first; StoreReport *second; }; //typedef void io_callback(event_loop *loop, int fd, void *args); void thread_report(event_loop *loop, int fd, void *args) { //1. 从queue里面取出需要report的数据(需要thread_queue) thread_queue<lars::ReportStatusRequest>* queue = ((Args*)args)->first; StoreReport *sr = ((Args*)args)->second; std::queue<lars::ReportStatusRequest> report_msgs; //1.1 从消息队列中取出全部的消息元素集合 queue->recv(report_msgs); while ( !report_msgs.empty() ) { lars::ReportStatusRequest msg = report_msgs.front(); report_msgs.pop(); //2. 将数据存储到DB中(需要StoreReport) sr->store(msg); } } void *store_main(void *args) { //得到对应的thread_queue thread_queue<lars::ReportStatusRequest> *queue = (thread_queue<lars::ReportStatusRequest>*)args; //定义事件触发机制 event_loop loop; //定义一个存储对象 StoreReport sr; Args callback_args; callback_args.first = queue; callback_args.second = &sr; queue->set_loop(&loop); queue->set_callback(thread_report, &callback_args); //启动事件监听 loop.event_process(); return NULL; } ``` ​ 每个线程都会绑定一个`thread_queue<lars::ReportStatusRequest>`,然后一个线程里面有一个loop,来监控消息队列是否有消息事件过来,如果有消息实现过来,针对每个消息会触发`thread_report()`方法, 在`thread_report()`中,我们就直接将`lars::ReportStatusRequest`消息存储到db中。 ​ 那么,由谁来给每个线程的`thread_queue`发送消息呢,就是agent/客户端发送的请求,我们在处理`lars::ID_ReportStatusRequest` 消息分发业务的时候调用`get_report_status()`来触发。 > lars_reporter/src/reporter_service.cpp ```c void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data) { lars::ReportStatusRequest req; req.ParseFromArray(data, len); //将上报数据存储到db StoreReport sr; sr.store(req); //轮询将消息平均发送到每个线程的消息队列中 static int index = 0; //将消息发送给某个线程消息队列 reportQueues[index]->send(req); index ++; index = index % thread_cnt; } ``` ​ 这里的分发机制,是采用最轮询的方式,是每个线程依次分配,去调用`thread_queue`的`send()`方法,将消息发送给消息队列。 ​ 最后我们进行测试,效果跟之前的效果是一样的。我们现在已经集成进来了存储线程池,现在就不用担心在处理业务的时候,因为DB等的io阻塞,使cpu得不到充分利用了。 --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**