## 6) Route订阅模式
### 6.1 订阅模块的设计与实现
订阅模式整体的设计.
> lars_dns/include/subscribe.h
```c
#pragma once
#include <vector>
#include <pthread.h>
#include <ext/hash_set>
#include <ext/hash_map>
#include "lars_reactor.h"
#include "lars.pb.h"
#include "dns_route.h"
using namespace __gnu_cxx;
//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
class SubscribeList {
public:
//设计单例
static void init() {
_instance = new SubscribeList();
}
static SubscribeList *instance() {
//保证init方法在这个进程执行中,只执行一次
pthread_once(&_once, init);
return _instance;
}
//订阅
void subscribe(uint64_t mod, int fd);
//取消订阅
void unsubscribe(uint64_t mod, int fd);
//发布
void publish(std::vector<uint64_t> &change_mods);
//根据在线用户fd得到需要发布的列表
void make_publish_map(listen_fd_set &online_fds,
publish_map &need_publish);
private:
//设计单例
SubscribeList();
SubscribeList(const SubscribeList &);
const SubscribeList& operator=(const SubscribeList);
static SubscribeList *_instance;
static pthread_once_t _once;
subscribe_map _book_list; //订阅清单
pthread_mutex_t _book_list_lock;
publish_map _push_list; //发布清单
pthread_mutex_t _push_list_lock;
};
```
首先`SubscribeList`采用单例设计。这里面定义了两种数据类型
```c
//定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符)
typedef hash_map<uint64_t, hash_set<int>> subscribe_map;
//定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids
typedef hash_map<int, hash_set<uint64_t>> publish_map;
```
`subscribe_map`是目前dns系统的总体订阅列表,记录了订阅的modid/cmdid都有哪些fds已经订阅了,其实一个fd就代表一个客户端。
`publish_map`是即将发布的表,其实这里面是subscribe_map的一个反表,key是订阅的客户端fd,而value是该客户端需要接收的订阅modid/cmdid数据。
**属性**:
`_book_list`:目前dns已经全部的订阅信息清单。
`_push_list`:目前dns即将发布的客户端及订阅信息清单。
**方法**
`void subscribe(uint64_t mod, int fd)`: 加入modid/cmdid 和订阅的客户端fd到_book_list中。
`void unsubscribe(uint64_t mod, int fd)`:取消一条订阅数据。
`void publish(std::vector<uint64_t> &change_mods)`: 发布订阅数据,其中change_mods是需要发布的那些modid/cmdid组合。
`void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)`: 根据目前在线的订阅用户,得到需要通信的发布订阅列表。
具体实现如下:
> lars_dns/src/subscribe.cpp
```c
#include "subscribe.h"
extern tcp_server *server;
//单例对象
SubscribeList *SubscribeList::_instance = NULL;
//用于保证创建单例的init方法只执行一次的锁
pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT;
SubscribeList::SubscribeList()
{
}
//订阅
void SubscribeList::subscribe(uint64_t mod, int fd)
{
//将mod->fd的关系加入到_book_list中
pthread_mutex_lock(&_book_list_lock);
_book_list[mod].insert(fd);
pthread_mutex_unlock(&_book_list_lock);
}
//取消订阅
void SubscribeList::unsubscribe(uint64_t mod, int fd)
{
//将mod->fd关系从_book_list中删除
pthread_mutex_lock(&_book_list_lock);
if (_book_list.find(mod) != _book_list.end()) {
_book_list[mod].erase(fd);
if (_book_list[mod].empty() == true) {
_book_list.erase(mod);
}
}
pthread_mutex_unlock(&_book_list_lock);
}
void push_change_task(event_loop *loop, void *args)
{
SubscribeList *subscribe = (SubscribeList*)args;
//1 获取全部的在线客户端fd
listen_fd_set online_fds;
loop->get_listen_fds(online_fds);
//2 从subscribe的_push_list中 找到与online_fds集合匹配,放在一个新的publish_map里
publish_map need_publish;
subscribe->make_publish_map(online_fds, need_publish);
//3 依次从need_publish取出数据 发送给对应客户端链接
publish_map::iterator it;
for (it = need_publish.begin(); it != need_publish.end(); it++) {
int fd = it->first; //fd
//遍历 fd对应的 modid/cmdid集合
hash_set<uint64_t>::iterator st;
for (st = it->second.begin(); st != it->second.end(); st++) {
//一个modid/cmdid
int modid = int((*st) >> 32);
int cmdid = int(*st);
//组装pb消息,发送给客户
lars::GetRouteResponse rsp;
rsp.set_modid(modid);
rsp.set_cmdid(cmdid);
//通过route查询对应的host ip/port信息 进行组装
host_set hosts = Route::instance()->get_hosts(modid, cmdid) ;
for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) {
uint64_t ip_port_pair = *hit;
lars::HostInfo host_info;
host_info.set_ip((uint32_t)(ip_port_pair >> 32));
host_info.set_port((int)ip_port_pair);
//添加到rsp中
rsp.add_host()->CopyFrom(host_info);
}
//给当前fd 发送一个更新消息
std::string responseString;
rsp.SerializeToString(&responseString);
//通过fd取出链接信息
net_connection *conn = tcp_server::conns[fd];
if (conn != NULL) {
conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse);
}
}
}
}
//根据在线用户fd得到需要发布的列表
void SubscribeList::make_publish_map(
listen_fd_set &online_fds,
publish_map &need_publish)
{
publish_map::iterator it;
pthread_mutex_lock(&_push_list_lock);
//遍历_push_list 找到 online_fds匹配的数据,放到need_publish中
for (it = _push_list.begin(); it != _push_list.end(); it++) {
//it->first 是 fd
//it->second 是 modid/cmdid
if (online_fds.find(it->first) != online_fds.end()) {
//匹配到
//当前的键值对移动到need_publish中
need_publish[it->first] = _push_list[it->first];
//当该组数据从_push_list中删除掉
_push_list.erase(it);
}
}
pthread_mutex_unlock(&_push_list_lock);
}
//发布
void SubscribeList::publish(std::vector<uint64_t> &change_mods)
{
//1 将change_mods已经修改的mod->fd
// 放到 发布清单_push_list中
pthread_mutex_lock(&_book_list_lock);
pthread_mutex_lock(&_push_list_lock);
std::vector<uint64_t>::iterator it;
for (it = change_mods.begin(); it != change_mods.end(); it++) {
uint64_t mod = *it;
if (_book_list.find(mod) != _book_list.end()) {
//将mod下面的fd set集合拷迁移到 _push_list中
hash_set<int>::iterator fds_it;
for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) {
int fd = *fds_it;
_push_list[fd].insert(mod);
}
}
}
pthread_mutex_unlock(&_push_list_lock);
pthread_mutex_unlock(&_book_list_lock);
//2 通知各个线程去执行推送任务
server->thread_poll()->send_task(push_change_task, this);
}
```
这里需要注意的是`publish()`里的server变量是全局变量,全局唯一的server句柄。
### 6.2 开启订阅
那么订阅功能实现了,该如何是调用触发订阅功能能,我们可以在一个客户端建立连接成功之后来调用.
> lars_dns/src/dns_service.cpp
```c
#include <ext/hash_set>
#include "lars_reactor.h"
#include "subscribe.h"
#include "dns_route.h"
#include "lars.pb.h"
tcp_server *server;
using __gnu_cxx::hash_set;
typedef hash_set<uint64_t> client_sub_mod_list;
// ...
//订阅route 的modid/cmdid
void create_subscribe(net_connection * conn, void *args)
{
conn->param = new client_sub_mod_list;
}
//退订route 的modid/cmdid
void clear_subscribe(net_connection * conn, void *args)
{
client_sub_mod_list::iterator it;
client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param;
for (it = sub_list->begin(); it != sub_list->end(); it++) {
uint64_t mod = *it;
SubscribeList::instance()->unsubscribe(mod, conn->get_fd());
}
delete sub_list;
conn->param = NULL;
}
int main(int argc, char **argv)
{
event_loop loop;
//加载配置文件
config_file::setPath("conf/lars_dns.conf");
std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
short port = config_file::instance()->GetNumber("reactor", "port", 7778);
//创建tcp服务器
server = new tcp_server(&loop, ip.c_str(), port);
//==========注册链接创建/销毁Hook函数============
server->set_conn_start(create_subscribe);
server->set_conn_close(clear_subscribe);
//============================================
//注册路由业务
server->add_msg_router(lars::ID_GetRouteRequest, get_route);
//开始事件监听
printf("lars dns service ....\n");
loop.event_process();
return 0;
}
```
这里注册了两个链接Hook。`create_subscribe()`和`clear_subscribe()`。
`client_sub_mod_list`为当前客户端链接所订阅的route信息列表。主要存放当前客户订阅的modid/cmdid的集合。因为不同的客户端订阅的信息不同,所以要将该列表与每个conn进行绑定。
---
### 关于作者:
作者:`Aceld(刘丹冰)`
mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)
![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)
>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**
- 一、Lars系统概述
- 第1章-概述
- 第2章-项目目录构建
- 二、Reactor模型服务器框架
- 第1章-项目结构与V0.1雏形
- 第2章-内存管理与Buffer封装
- 第3章-事件触发EventLoop
- 第4章-链接与消息封装
- 第5章-Client客户端模型
- 第6章-连接管理及限制
- 第7章-消息业务路由分发机制
- 第8章-链接创建/销毁Hook机制
- 第9章-消息任务队列与线程池
- 第10章-配置文件读写功能
- 第11章-udp服务与客户端
- 第12章-数据传输协议protocol buffer
- 第13章-QPS性能测试
- 第14章-异步消息任务机制
- 第15章-链接属性设置功能
- 三、Lars系统之DNSService
- 第1章-Lars-dns简介
- 第2章-数据库创建
- 第3章-项目目录结构及环境构建
- 第4章-Route结构的定义
- 第5章-获取Route信息
- 第6章-Route订阅模式
- 第7章-Backend Thread实时监控
- 四、Lars系统之Report Service
- 第1章-项目概述-数据表及proto3协议定义
- 第2章-获取report上报数据
- 第3章-存储线程池及消息队列
- 五、Lars系统之LoadBalance Agent
- 第1章-项目概述及构建
- 第2章-主模块业务结构搭建
- 第3章-Report与Dns Client设计与实现
- 第4章-负载均衡模块基础设计
- 第5章-负载均衡获取Host主机信息API
- 第6章-负载均衡上报Host主机信息API
- 第7章-过期窗口清理与过载超时(V0.5)
- 第8章-定期拉取最新路由信息(V0.6)
- 第9章-负载均衡获取Route信息API(0.7)
- 第10章-API初始化接口(V0.8)
- 第11章-Lars Agent性能测试工具
- 第12章- Lars启动工具脚本