ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## 4) 获取report上报数据

### 4.1 构建Lars-Reporter项目

创建Lars-Reporter项目目录

> Lars/lars_reporter/bin/
>
> Lars/lars_reporter/conf/
>
> Lars/lars_reporter/include/
>
> Lars/lars_reporter/src/
>
> Lars/lars_reporter/test/
>
> Lars/lars_reporter/Makefile

其中:

> lars_reporter/conf/lars_reporter.conf

```ini
[reactor]
maxConn = 1024
threadNum = 5
ip = 127.0.0.1
port = 7779

[mysql]
db_host = 127.0.0.1
db_port = 3306
db_user = root
db_passwd = aceld
db_name = lars_dns

[repoter]
db_thread_cnt = 3
```

> lars_reporter/Makefile

```makefile
TARGET= bin/lars_reporter
CXX=g++
CFLAGS=-g -O2 -Wall -Wno-deprecated

BASE=../base
BASE_H=$(BASE)/include

PROTO = $(BASE)/proto
PROTO_H = $(BASE)/proto

LARS_REACTOR=../lars_reactor
LARS_REACTOR_H =$(LARS_REACTOR)/include
LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor

MYSQL=$(BASE)/mysql-connector-c
MYSQL_H=$(MYSQL)/include
MYSQL_LIB=$(MYSQL)/lib/libmysqlclient.a

OTHER_LIB = -lpthread -ldl -lprotobuf
SRC= ./src
INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(MYSQL_H) -I$(PROTO_H)

LIB= $(MYSQL_LIB) -L$(LARS_REACTOR_LIB) $(OTHER_LIB)

OBJS = $(addsuffix .o, $(basename $(wildcard $(SRC)/*.cpp)))
OBJS += $(PROTO)/lars.pb.o

$(TARGET): $(OBJS)
	mkdir -p bin
	$(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)

%.o: %.cpp
	$(CXX) $(CFLAGS) -c -o $@ $< $(INC)

.PHONY: clean

clean:
	-rm -f src/*.o $(PROTO)/lars.pb.o $(TARGET)
```

### 4.2 完成Lars-Service Reporter接收处理业务

我们先完成对客户端或者agent发送过来的reporter上报数据请求的处理业务。

> lars_reporter/src/reporter_service.cpp

```cpp
#include "lars_reactor.h"
#include "lars.pb.h"
#include "store_report.h"
#include <stdint.h>
#include <stdio.h>
#include <string>

// Message handler registered for lars::ID_ReportStatusRequest.
//
// data/len  : serialized lars::ReportStatusRequest payload
// msgid     : id of the received message (unused here)
// conn      : connection the message arrived on (unused here)
// user_data : opaque pointer supplied at registration (unused here)
void get_report_status(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    lars::ReportStatusRequest req;

    // Drop packets that do not decode; storing a half-parsed report would
    // write meaningless rows.
    if (!req.ParseFromArray(data, len)) {
        fprintf(stderr, "parse ReportStatusRequest error, len = %u\n", len);
        return;
    }

    // Store the reported data in the database.
    // V0.1 opens one MySQL connection per request; the StoreReport destructor
    // closes it again when `sr` goes out of scope.
    StoreReport sr;
    sr.store(req);
}

int main(int argc, char **argv)
{
    event_loop loop;

    // Load the configuration file.
    config_file::setPath("./conf/lars_reporter.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    // Unsigned so that ports above 32767 do not turn negative.
    unsigned short port = config_file::instance()->GetNumber("reactor", "port", 7779);

    // Create the tcp server.
    tcp_server server(&loop, ip.c_str(), port);

    // Route report-status requests to the handler above.
    server.add_msg_router(lars::ID_ReportStatusRequest, get_report_status);

    // Start the event loop.
    loop.event_process();

    return 0;
}
```

> lars_reporter/include/store_report.h

```cpp
#pragma once

#include "mysql.h"
#include "lars.pb.h"

// Writes reported call results into the ServerCallStatus table.
// Each instance owns one MySQL connection: opened in the constructor,
// closed in the destructor.
class StoreReport
{
public:
    // Connects using the [mysql] section of the loaded configuration;
    // terminates the process if the connection cannot be established.
    StoreReport();

    // Closes the MySQL connection.
    ~StoreReport();

    // Inserts (or updates) one row per HostCallResult contained in req.
    void store(const lars::ReportStatusRequest &req);

private:
    // Non-copyable: a copy would share the MYSQL handle and close it twice.
    StoreReport(const StoreReport &);
    StoreReport &operator=(const StoreReport &);

    MYSQL _db_conn;
};
```

> lars_reporter/src/store_report.cpp

```cpp
#include "store_report.h"
#include "lars_reactor.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include <unistd.h>

StoreReport::StoreReport()
{
    // 1 Initialization
    // 1.1 mysql_library_init must be called first when MySQL is used from multiple threads.
    mysql_library_init(0, NULL, NULL);

    // 1.2 Initialize the connection handle and set the connect timeout.
    mysql_init(&_db_conn);
    // MYSQL_OPT_CONNECT_TIMEOUT expects a pointer to an unsigned int (seconds),
    // not a string.
    unsigned int connect_timeout = 30;
    mysql_options(&_db_conn, MYSQL_OPT_CONNECT_TIMEOUT, &connect_timeout);
    my_bool reconnect = 1;
    mysql_options(&_db_conn, MYSQL_OPT_RECONNECT, &reconnect);

    // 2 Load the configuration.
    std::string db_host = config_file::instance()->GetString("mysql", "db_host", "127.0.0.1");
    unsigned int db_port = config_file::instance()->GetNumber("mysql", "db_port", 3306);
    std::string db_user = config_file::instance()->GetString("mysql", "db_user", "root");
    std::string db_passwd = config_file::instance()->GetString("mysql", "db_passwd", "aceld");
    std::string db_name = config_file::instance()->GetString("mysql", "db_name", "lars_dns");

    // 3 Connect to the database.
    if (mysql_real_connect(&_db_conn, db_host.c_str(), db_user.c_str(), db_passwd.c_str(),
                           db_name.c_str(), db_port, NULL, 0) == NULL) {
        fprintf(stderr, "mysql real connect error\n");
        exit(1);
    }
}

StoreReport::~StoreReport()
{
    // Release the connection opened in the constructor.
    mysql_close(&_db_conn);
}

void StoreReport::store(const lars::ReportStatusRequest &req)
{
    for (int i = 0; i < req.results_size(); i++) {
        // One reported call record.
        const lars::HostCallResult &result = req.results(i);
        int overload = result.overload() ? 1 : 0;

        char sql[1024];
        snprintf(sql, sizeof(sql),
                 "INSERT INTO ServerCallStatus"
                 "(modid, cmdid, ip, port, caller, succ_cnt, err_cnt, ts, overload) "
                 "VALUES (%d, %d, %u, %u, %u, %u, %u, %u, %d) ON DUPLICATE KEY "
                 "UPDATE succ_cnt = %u, err_cnt = %u, ts = %u, overload = %d",
                 req.modid(), req.cmdid(), result.ip(), result.port(), req.caller(),
                 result.succ(), result.err(), req.ts(), overload,
                 result.succ(), result.err(), req.ts(), overload);

        // Ping first: if the link was dropped this triggers a reconnect
        // (MYSQL_OPT_RECONNECT is enabled in the constructor).
        mysql_ping(&_db_conn);

        if (mysql_real_query(&_db_conn, sql, strlen(sql)) != 0) {
            fprintf(stderr, "Fail to Insert into ServerCallStatus %s\n", mysql_error(&_db_conn));
        }
    }
}
```

这里面的业务很简单,就是如果有客户端发送`ID_ReportStatusRequest`的消息过来,进行处理,然后入库即可。

### 4.3 完成Lars-reporterV0.1版本测试

> lars_reporter/test/reportClient.cpp

```cpp
#include "lars_reactor.h"
#include "lars.pb.h"
#include <stdlib.h>
#include <time.h>
#include <string>

// Builds one test ReportStatusRequest with three host results and sends it
// to the reporter service over the given connection.
void report_status(net_connection *conn, void *user_data)
{
    tcp_client *client = (tcp_client*)conn;

    lars::ReportStatusRequest req;

    // Assemble the test message.
    req.set_modid(rand() % 3 + 1);
    req.set_cmdid(1);
    req.set_caller(123);
    req.set_ts(time(NULL));

    for (int i = 0; i < 3; i++) {
        // Fill the repeated element in place instead of copying a temporary.
        lars::HostCallResult *result = req.add_results();
        result->set_ip(i + 1);
        result->set_port((i + 1) * (i + 1));
        result->set_succ(100);
        result->set_err(3);
        result->set_overload(true);
    }

    std::string requestString;
    req.SerializeToString(&requestString);

    // Send to the reporter service.
    client->send_message(requestString.c_str(), requestString.size(), lars::ID_ReportStatusRequest);
}

// Connection-established hook: send the report as soon as we are connected.
void connection_build(net_connection *conn, void *args)
{
    report_status(conn, args);
}

int main(int argc, char **argv)
{
    event_loop loop;

    tcp_client client(&loop, "127.0.0.1", 7779, "reportClient");

    // Register the connection-established callback.
    client.set_conn_start(connection_build);

    loop.event_process();

    return 0;
}
```

> lars_reporter/test/Makefile

```makefile
TARGET= reportClient
CXX=g++
CFLAGS=-g -O2 -Wall -Wno-deprecated

BASE=../../base
BASE_H=$(BASE)/include

PROTO = $(BASE)/proto
PROTO_H = $(BASE)/proto

LARS_REACTOR=../../lars_reactor
LARS_REACTOR_H =$(LARS_REACTOR)/include
LARS_REACTOR_LIB=$(LARS_REACTOR)/lib -llreactor

OTHER_LIB = -lpthread -ldl -lprotobuf
SRC= ./src
INC= -I./include -I$(BASE_H) -I$(LARS_REACTOR_H) -I$(PROTO_H)

LIB= -L$(LARS_REACTOR_LIB) $(OTHER_LIB)

OBJS = reportClient.o
OBJS += $(PROTO)/lars.pb.o

$(TARGET): $(OBJS)
	$(CXX) $(CFLAGS) -o $(TARGET) $(OBJS) $(INC) $(LIB)

%.o: %.cpp
	$(CXX) $(CFLAGS) -c -o $@ $< $(INC)

.PHONY: clean

clean:
	-rm -f ./*.o $(TARGET)
```

这里我们简单写了一个针对lars_reporter的客户端测试程序,模拟发送一个包,测试一下reporter的基本功能是否正常。

编译并执行,我们发现数据库表`ServerCallStatus`中已经有了我们模拟封装的数据。

---

### 关于作者:

作者:`Aceld(刘丹冰)`

mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)

![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)

>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**