企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
Backend Thread的后台总业务流程如下: ![](https://img.kancloud.cn/9c/da/9cdaea769064e2aecced69365f720928_839x796.png) ### 7.1 数据库表相关查询方法实现 ​ 我们先实现一些基本的数据表达查询方法: > lars_dns/src/dns_route.cpp ```c /* * return 0, 表示 加载成功,version没有改变 * 1, 表示 加载成功,version有改变 * -1 表示 加载失败 * */ int Route::load_version() { //这里面只会有一条数据 snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;"); int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret) { fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn)); return -1; } MYSQL_RES *result = mysql_store_result(&_db_conn); if (!result) { fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn)); return -1; } long line_num = mysql_num_rows(result); if (line_num == 0) { fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn)); return -1; } MYSQL_ROW row = mysql_fetch_row(result); //得到version long new_version = atol(row[0]); if (new_version == this->_version) { //加载成功但是没有修改 return 0; } this->_version = new_version; printf("now route version is %ld\n", this->_version); mysql_free_result(result); return 1; } //加载RouteData到_temp_pointer int Route::load_route_data() { _temp_pointer->clear(); snprintf(_sql, 100, "SELECT * FROM RouteData;"); int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret) { fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn)); return -1; } MYSQL_RES *result = mysql_store_result(&_db_conn); if (!result) { fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn)); return -1; } long line_num = mysql_num_rows(result); MYSQL_ROW row; for (long i = 0;i < line_num; ++i) { row = mysql_fetch_row(result); int modid = atoi(row[1]); int cmdid = atoi(row[2]); unsigned ip = atoi(row[3]); int port = atoi(row[4]); uint64_t key = ((uint64_t)modid << 32) + cmdid; uint64_t value = ((uint64_t)ip << 32) + port; (*_temp_pointer)[key].insert(value); } printf("load data to tmep succ! size is %lu\n", _temp_pointer->size()); mysql_free_result(result); return 0; } //将temp_pointer的数据更新到data_pointer void Route::swap() { pthread_rwlock_wrlock(&_map_lock); route_map *temp = _data_pointer; _data_pointer = _temp_pointer; _temp_pointer = temp; pthread_rwlock_unlock(&_map_lock); } //加载RouteChange得到修改的modid/cmdid //将结果放在vector中 void Route::load_changes(std::vector<uint64_t> &change_list) { //读取当前版本之前的全部修改 snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version); int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret) { fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn)); return ; } MYSQL_RES *result = mysql_store_result(&_db_conn); if (!result) { fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn)); return ; } long lineNum = mysql_num_rows(result); if (lineNum == 0) { fprintf(stderr, "No version in table ChangeLog: %s\n", mysql_error(&_db_conn)); return ; } MYSQL_ROW row; for (long i = 0;i < lineNum; ++i) { row = mysql_fetch_row(result); int modid = atoi(row[0]); int cmdid = atoi(row[1]); uint64_t key = (((uint64_t)modid) << 32) + cmdid; change_list.push_back(key); } mysql_free_result(result); } //将RouteChange //删除RouteChange的全部修改记录数据,remove_all为全部删除 //否则默认删除当前版本之前的全部修改 void Route::remove_changes(bool remove_all) { if (remove_all == false) { snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version); } else { snprintf(_sql, 1000, "DELETE FROM RouteChange;"); } int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql)); if (ret != 0) { fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn)); return ; } return; } ``` 这里面提供了基本的对一些表的加载和删除操作: `load_version()`:加载当前route信息版本号。 `load_route_data()`:加载`RouteData`信息表,到_temp_pointer中。 `swap()`:将__temp_pointer的表数据同步到_data_temp表中. `load_changes()`:加载RouteChange得到修改的modid/cmdid,将结果放在vector中 `remove_changes()`:清空之前的修改记录。 ### 7.2 Backend Thread业务流程实现 > lars_dns/src/dns_route.cpp ```c //周期性后端检查db的route信息的更新变化业务 //backend thread完成 void *check_route_changes(void *args) { int wait_time = 10;//10s自动修改一次,也可以从配置文件读取 long last_load_time = time(NULL); //清空全部的RouteChange Route::instance()->remove_changes(true); //1 判断是否有修改 while (true) { sleep(1); long current_time = time(NULL); //1.1 加载RouteVersion得到当前版本号 int ret = Route::instance()->load_version(); if (ret == 1) { //version改版 有modid/cmdid修改 //2 如果有修改 //2.1 将最新的RouteData加载到_temp_pointer中 if (Route::instance()->load_route_data() == 0) { //2.2 更新_temp_pointer数据到_data_pointer map中 Route::instance()->swap(); last_load_time = current_time;//更新最后加载时间 } //2.3 获取被修改的modid/cmdid对应的订阅客户端,进行推送 std::vector<uint64_t> changes; Route::instance()->load_changes(changes); //推送 SubscribeList::instance()->publish(changes); //2.4 删除当前版本之前的修改记录 Route::instance()->remove_changes(); } else { //3 如果没有修改 if (current_time - last_load_time >= wait_time) { //3.1 超时,加载最新的temp_pointer if (Route::instance()->load_route_data() == 0) { //3.2 _temp_pointer数据更新到_data_pointer map中 Route::instance()->swap(); last_load_time = current_time; } } } } return NULL; } ``` ​ 该实现与上面流程图描述的过程一样。那么`check_route_changes()`我们可以让一个后台线程进行承载。 > lars_dns/src/dns_service.cpp ```c int main(int argc, char **argv) { event_loop loop; //加载配置文件 config_file::setPath("conf/lars_dns.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 7778); //创建tcp服务器 server = new tcp_server(&loop, ip.c_str(), port); //注册链接创建/销毁Hook函数 server->set_conn_start(create_subscribe); server->set_conn_close(clear_subscribe); //注册路由业务 server->add_msg_router(lars::ID_GetRouteRequest, get_route); // ================================================= //开辟backend thread 周期性检查db数据库route信息的更新状态 pthread_t tid; int ret = pthread_create(&tid, NULL, check_route_changes, NULL); if (ret == -1) { perror("pthread_create backendThread"); exit(1); } //设置分离模式 pthread_detach(tid); // ================================================= //开始事件监听 printf("lars dns service ....\n"); loop.event_process(); return 0; } ``` ### 7.3 完成dns模块的订阅功能测试V0.3 ​ 我们提供一个修改一个modid/cmdid的sql语句来触发订阅条件,并且让dns service服务器主动给订阅的客户端发送该订阅消息。 > lars_dns/test/test_insert_dns_route.sql ```sql USE lars_dns; SET @time = UNIX_TIMESTAMP(NOW()); INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999); UPDATE RouteVersion SET version = @time WHERE id = 1; INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time); ``` 客户端代码: > lars_dns/test/lars_dns_test1.cpp ```c #include <string.h> #include <unistd.h> #include <string> #include "lars_reactor.h" #include "lars.pb.h" //命令行参数 struct Option { Option():ip(NULL),port(0) {} char *ip; short port; }; Option option; void Usage() { printf("Usage: ./lars_dns_test -h ip -p port\n"); } //解析命令行 void parse_option(int argc, char **argv) { for (int i = 0; i < argc; i++) { if (strcmp(argv[i], "-h") == 0) { option.ip = argv[i + 1]; } else if (strcmp(argv[i], "-p") == 0) { option.port = atoi(argv[i + 1]); } } if ( !option.ip || !option.port ) { Usage(); exit(1); } } //typedef void (*conn_callback)(net_connection *conn, void *args); void on_connection(net_connection *conn, void *args) { //发送Route信息请求 lars::GetRouteRequest req; req.set_modid(1); req.set_cmdid(1); std::string requestString; req.SerializeToString(&requestString); conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest); } void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data) { //解包得到数据 lars::GetRouteResponse rsp; rsp.ParseFromArray(data, len); //打印数据 printf("modid = %d\n", rsp.modid()); printf("cmdid = %d\n", rsp.cmdid()); printf("host_size = %d\n", rsp.host_size()); for (int i = 0; i < rsp.host_size(); i++) { printf("-->ip = %u\n", rsp.host(i).ip()); printf("-->port = %d\n", rsp.host(i).port()); } } int main(int argc, char **argv) { parse_option(argc, argv); event_loop loop; tcp_client *client; //创建客户端 client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test"); if (client == NULL) { fprintf(stderr, "client == NULL\n"); exit(1); } //客户端成功建立连接,首先发送请求包 client->set_conn_start(on_connection); //设置服务端回应包处理业务 client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route); loop.event_process(); return 0; } ``` 启动dns_server: ```bash $ ./bin/lars_dns msg_router init... create 0 thread create 1 thread create 2 thread create 3 thread create 4 thread add msg cb msgid = 1 lars dns service .... now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999 ``` 启动客户端: ```bash $ ./lars_dns_test1 -h 127.0.0.1 -p 7778 msg_router init... do_connect EINPROGRESS add msg cb msgid = 2 connect 127.0.0.1:7778 succ! modid = 1 cmdid = 1 host_size = 1 -->ip = 3232235953 -->port = 9999 ``` 我们知道,第一请求modid/cmdid就会订阅该Route模块。 然后我们通过外界修改modid=1,cmdid=1的模块,新开一个终端,执行test_insert_dns_route.sql ```bash Type 'help;' or '\h' for help. Type '\c' to clear the current input statement. mysql> use lars_dns; Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Database changed mysql> \. test_insert_dns_route.sql Database changed Query OK, 0 rows affected (0.00 sec) Query OK, 1 row affected (0.01 sec) Query OK, 1 row affected (0.01 sec) Rows matched: 1 Changed: 1 Warnings: 0 Query OK, 1 row affected (0.02 sec) mysql> ``` 然后我会会发现客户端已经得到一个新的消息,就是最新的route数据过来。是由dns_service主动推送过来的订阅消息. 客户端: ```bash $ ./lars_dns_test1 -h 127.0.0.1 -p 7778 msg_router init... do_connect EINPROGRESS add msg cb msgid = 2 connect 127.0.0.1:7778 succ! modid = 1 cmdid = 1 host_size = 1 -->ip = 3232235953 -->port = 9999 modid = 1 cmdid = 1 host_size = 1 -->ip = 3232235953 -->port = 9999 ``` ​ 这样我们的订阅功能就完成了,整体的lars_dns模块的工作到此的基本需求全部也已经满足。 --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**