Backend Thread的后台总业务流程如下:
![](https://img.kancloud.cn/9c/da/9cdaea769064e2aecced69365f720928_839x796.png)
### 7.1 数据库表相关查询方法实现
我们先实现一些基本的数据表达查询方法:
> lars_dns/src/dns_route.cpp
```c
/*
* return 0, 表示 加载成功,version没有改变
* 1, 表示 加载成功,version有改变
* -1 表示 加载失败
* */
int Route::load_version()
{
//这里面只会有一条数据
snprintf(_sql, 1000, "SELECT version from RouteVersion WHERE id = 1;");
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
return -1;
}
long line_num = mysql_num_rows(result);
if (line_num == 0)
{
fprintf(stderr, "No version in table RouteVersion: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_ROW row = mysql_fetch_row(result);
//得到version
long new_version = atol(row[0]);
if (new_version == this->_version)
{
//加载成功但是没有修改
return 0;
}
this->_version = new_version;
printf("now route version is %ld\n", this->_version);
mysql_free_result(result);
return 1;
}
//加载RouteData到_temp_pointer
int Route::load_route_data()
{
_temp_pointer->clear();
snprintf(_sql, 100, "SELECT * FROM RouteData;");
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "load version error: %s\n", mysql_error(&_db_conn));
return -1;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql store result: %s\n", mysql_error(&_db_conn));
return -1;
}
long line_num = mysql_num_rows(result);
MYSQL_ROW row;
for (long i = 0;i < line_num; ++i)
{
row = mysql_fetch_row(result);
int modid = atoi(row[1]);
int cmdid = atoi(row[2]);
unsigned ip = atoi(row[3]);
int port = atoi(row[4]);
uint64_t key = ((uint64_t)modid << 32) + cmdid;
uint64_t value = ((uint64_t)ip << 32) + port;
(*_temp_pointer)[key].insert(value);
}
printf("load data to tmep succ! size is %lu\n", _temp_pointer->size());
mysql_free_result(result);
return 0;
}
//将temp_pointer的数据更新到data_pointer
void Route::swap()
{
pthread_rwlock_wrlock(&_map_lock);
route_map *temp = _data_pointer;
_data_pointer = _temp_pointer;
_temp_pointer = temp;
pthread_rwlock_unlock(&_map_lock);
}
//加载RouteChange得到修改的modid/cmdid
//将结果放在vector中
void Route::load_changes(std::vector<uint64_t> &change_list)
{
//读取当前版本之前的全部修改
snprintf(_sql, 1000, "SELECT modid,cmdid FROM RouteChange WHERE version <= %ld;", _version);
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret)
{
fprintf(stderr, "mysql_real_query: %s\n", mysql_error(&_db_conn));
return ;
}
MYSQL_RES *result = mysql_store_result(&_db_conn);
if (!result)
{
fprintf(stderr, "mysql_store_result %s\n", mysql_error(&_db_conn));
return ;
}
long lineNum = mysql_num_rows(result);
if (lineNum == 0)
{
fprintf(stderr, "No version in table ChangeLog: %s\n", mysql_error(&_db_conn));
return ;
}
MYSQL_ROW row;
for (long i = 0;i < lineNum; ++i)
{
row = mysql_fetch_row(result);
int modid = atoi(row[0]);
int cmdid = atoi(row[1]);
uint64_t key = (((uint64_t)modid) << 32) + cmdid;
change_list.push_back(key);
}
mysql_free_result(result);
}
//将RouteChange
//删除RouteChange的全部修改记录数据,remove_all为全部删除
//否则默认删除当前版本之前的全部修改
void Route::remove_changes(bool remove_all)
{
if (remove_all == false)
{
snprintf(_sql, 1000, "DELETE FROM RouteChange WHERE version <= %ld;", _version);
}
else
{
snprintf(_sql, 1000, "DELETE FROM RouteChange;");
}
int ret = mysql_real_query(&_db_conn, _sql, strlen(_sql));
if (ret != 0)
{
fprintf(stderr, "delete RouteChange: %s\n", mysql_error(&_db_conn));
return ;
}
return;
}
```
这里面提供了基本的对一些表的加载和删除操作:
`load_version()`:加载当前route信息版本号。
`load_route_data()`:加载`RouteData`信息表,到_temp_pointer中。
`swap()`:将__temp_pointer的表数据同步到_data_temp表中.
`load_changes()`:加载RouteChange得到修改的modid/cmdid,将结果放在vector中
`remove_changes()`:清空之前的修改记录。
### 7.2 Backend Thread业务流程实现
> lars_dns/src/dns_route.cpp
```c
//周期性后端检查db的route信息的更新变化业务
//backend thread完成
void *check_route_changes(void *args)
{
int wait_time = 10;//10s自动修改一次,也可以从配置文件读取
long last_load_time = time(NULL);
//清空全部的RouteChange
Route::instance()->remove_changes(true);
//1 判断是否有修改
while (true) {
sleep(1);
long current_time = time(NULL);
//1.1 加载RouteVersion得到当前版本号
int ret = Route::instance()->load_version();
if (ret == 1) {
//version改版 有modid/cmdid修改
//2 如果有修改
//2.1 将最新的RouteData加载到_temp_pointer中
if (Route::instance()->load_route_data() == 0) {
//2.2 更新_temp_pointer数据到_data_pointer map中
Route::instance()->swap();
last_load_time = current_time;//更新最后加载时间
}
//2.3 获取被修改的modid/cmdid对应的订阅客户端,进行推送
std::vector<uint64_t> changes;
Route::instance()->load_changes(changes);
//推送
SubscribeList::instance()->publish(changes);
//2.4 删除当前版本之前的修改记录
Route::instance()->remove_changes();
}
else {
//3 如果没有修改
if (current_time - last_load_time >= wait_time) {
//3.1 超时,加载最新的temp_pointer
if (Route::instance()->load_route_data() == 0) {
//3.2 _temp_pointer数据更新到_data_pointer map中
Route::instance()->swap();
last_load_time = current_time;
}
}
}
}
return NULL;
}
```
该实现与上面流程图描述的过程一样。那么`check_route_changes()`我们可以让一个后台线程进行承载。
> lars_dns/src/dns_service.cpp
```c
int main(int argc, char **argv)
{
event_loop loop;
//加载配置文件
config_file::setPath("conf/lars_dns.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7778);
//创建tcp服务器
server = new tcp_server(&loop, ip.c_str(), port);
//注册链接创建/销毁Hook函数
server->set_conn_start(create_subscribe);
server->set_conn_close(clear_subscribe);
//注册路由业务
server->add_msg_router(lars::ID_GetRouteRequest, get_route);
// =================================================
//开辟backend thread 周期性检查db数据库route信息的更新状态
pthread_t tid;
int ret = pthread_create(&tid, NULL, check_route_changes, NULL);
if (ret == -1) {
perror("pthread_create backendThread");
exit(1);
}
//设置分离模式
pthread_detach(tid);
// =================================================
//开始事件监听
printf("lars dns service ....\n");
loop.event_process();
return 0;
}
```
### 7.3 完成dns模块的订阅功能测试V0.3
我们提供一个修改一个modid/cmdid的sql语句来触发订阅条件,并且让dns service服务器主动给订阅的客户端发送该订阅消息。
> lars_dns/test/test_insert_dns_route.sql
```sql
USE lars_dns;
SET @time = UNIX_TIMESTAMP(NOW());
INSERT INTO RouteData(modid, cmdid, serverip, serverport) VALUES(1, 1, 3232235953, 9999);
UPDATE RouteVersion SET version = @time WHERE id = 1;
INSERT INTO RouteChange(modid, cmdid, version) VALUES(1, 1, @time);
```
客户端代码:
> lars_dns/test/lars_dns_test1.cpp
```c
#include <string.h>
#include <unistd.h>
#include <string>
#include "lars_reactor.h"
#include "lars.pb.h"
//命令行参数
struct Option
{
Option():ip(NULL),port(0) {}
char *ip;
short port;
};
Option option;
void Usage() {
printf("Usage: ./lars_dns_test -h ip -p port\n");
}
//解析命令行
void parse_option(int argc, char **argv)
{
for (int i = 0; i < argc; i++) {
if (strcmp(argv[i], "-h") == 0) {
option.ip = argv[i + 1];
}
else if (strcmp(argv[i], "-p") == 0) {
option.port = atoi(argv[i + 1]);
}
}
if ( !option.ip || !option.port ) {
Usage();
exit(1);
}
}
//typedef void (*conn_callback)(net_connection *conn, void *args);
void on_connection(net_connection *conn, void *args)
{
//发送Route信息请求
lars::GetRouteRequest req;
req.set_modid(1);
req.set_cmdid(1);
std::string requestString;
req.SerializeToString(&requestString);
conn->send_message(requestString.c_str(), requestString.size(), lars::ID_GetRouteRequest);
}
void deal_get_route(const char *data, uint32_t len, int msgid, net_connection *net_conn, void *user_data)
{
//解包得到数据
lars::GetRouteResponse rsp;
rsp.ParseFromArray(data, len);
//打印数据
printf("modid = %d\n", rsp.modid());
printf("cmdid = %d\n", rsp.cmdid());
printf("host_size = %d\n", rsp.host_size());
for (int i = 0; i < rsp.host_size(); i++) {
printf("-->ip = %u\n", rsp.host(i).ip());
printf("-->port = %d\n", rsp.host(i).port());
}
}
int main(int argc, char **argv)
{
parse_option(argc, argv);
event_loop loop;
tcp_client *client;
//创建客户端
client = new tcp_client(&loop, option.ip, option.port, "lars_dns_test");
if (client == NULL) {
fprintf(stderr, "client == NULL\n");
exit(1);
}
//客户端成功建立连接,首先发送请求包
client->set_conn_start(on_connection);
//设置服务端回应包处理业务
client->add_msg_router(lars::ID_GetRouteResponse, deal_get_route);
loop.event_process();
return 0;
}
```
启动dns_server:
```bash
$ ./bin/lars_dns
msg_router init...
create 0 thread
create 1 thread
create 2 thread
create 3 thread
create 4 thread
add msg cb msgid = 1
lars dns service ....
now route version is 1571058286 modID = 1, cmdID = 1, ip = 3232235953, port = 9999
```
启动客户端:
```bash
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
```
我们知道,第一请求modid/cmdid就会订阅该Route模块。
然后我们通过外界修改modid=1,cmdid=1的模块,新开一个终端,执行test_insert_dns_route.sql
```bash
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql> use lars_dns;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed
mysql> \. test_insert_dns_route.sql
Database changed
Query OK, 0 rows affected (0.00 sec)
Query OK, 1 row affected (0.01 sec)
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
Query OK, 1 row affected (0.02 sec)
mysql>
```
然后我会会发现客户端已经得到一个新的消息,就是最新的route数据过来。是由dns_service主动推送过来的订阅消息.
客户端:
```bash
$ ./lars_dns_test1 -h 127.0.0.1 -p 7778
msg_router init...
do_connect EINPROGRESS
add msg cb msgid = 2
connect 127.0.0.1:7778 succ!
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
modid = 1
cmdid = 1
host_size = 1
-->ip = 3232235953
-->port = 9999
```
这样我们的订阅功能就完成了,整体的lars_dns模块的工作到此的基本需求全部也已经满足。
---
### 关于作者:
作者:`Aceld(刘丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)
![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)
>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**
- 一、Lars系统概述
- 第1章-概述
- 第2章-项目目录构建
- 二、Reactor模型服务器框架
- 第1章-项目结构与V0.1雏形
- 第2章-内存管理与Buffer封装
- 第3章-事件触发EventLoop
- 第4章-链接与消息封装
- 第5章-Client客户端模型
- 第6章-连接管理及限制
- 第7章-消息业务路由分发机制
- 第8章-链接创建/销毁Hook机制
- 第9章-消息任务队列与线程池
- 第10章-配置文件读写功能
- 第11章-udp服务与客户端
- 第12章-数据传输协议protocol buffer
- 第13章-QPS性能测试
- 第14章-异步消息任务机制
- 第15章-链接属性设置功能
- 三、Lars系统之DNSService
- 第1章-Lars-dns简介
- 第2章-数据库创建
- 第3章-项目目录结构及环境构建
- 第4章-Route结构的定义
- 第5章-获取Route信息
- 第6章-Route订阅模式
- 第7章-Backend Thread实时监控
- 四、Lars系统之Report Service
- 第1章-项目概述-数据表及proto3协议定义
- 第2章-获取report上报数据
- 第3章-存储线程池及消息队列
- 五、Lars系统之LoadBalance Agent
- 第1章-项目概述及构建
- 第2章-主模块业务结构搭建
- 第3章-Report与Dns Client设计与实现
- 第4章-负载均衡模块基础设计
- 第5章-负载均衡获取Host主机信息API
- 第6章-负载均衡上报Host主机信息API
- 第7章-过期窗口清理与过载超时(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-负载均衡获取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能测试工具
- 第12章- Lars启动工具脚本