企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
## 6) Route订阅模式 ### 6.1 订阅模块的设计与实现 ​ 订阅模式整体的设计. > lars_dns/include/subscribe.h ```c #pragma once #include <vector> #include <pthread.h> #include <ext/hash_set> #include <ext/hash_map> #include "lars_reactor.h" #include "lars.pb.h" #include "dns_route.h" using namespace __gnu_cxx; //定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符) typedef hash_map<uint64_t, hash_set<int>> subscribe_map; //定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids typedef hash_map<int, hash_set<uint64_t>> publish_map; class SubscribeList { public: //设计单例 static void init() { _instance = new SubscribeList(); } static SubscribeList *instance() { //保证init方法在这个进程执行中,只执行一次 pthread_once(&_once, init); return _instance; } //订阅 void subscribe(uint64_t mod, int fd); //取消订阅 void unsubscribe(uint64_t mod, int fd); //发布 void publish(std::vector<uint64_t> &change_mods); //根据在线用户fd得到需要发布的列表 void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish); private: //设计单例 SubscribeList(); SubscribeList(const SubscribeList &); const SubscribeList& operator=(const SubscribeList); static SubscribeList *_instance; static pthread_once_t _once; subscribe_map _book_list; //订阅清单 pthread_mutex_t _book_list_lock; publish_map _push_list; //发布清单 pthread_mutex_t _push_list_lock; }; ``` ​ 首先`SubscribeList`采用单例设计。这里面定义了两种数据类型 ```c //定义订阅列表数据关系类型,key->modid/cmdid, value->fds(订阅的客户端文件描述符) typedef hash_map<uint64_t, hash_set<int>> subscribe_map; //定义发布列表的数据关系类型, key->fd(订阅客户端的文件描述符), value->modids typedef hash_map<int, hash_set<uint64_t>> publish_map; ``` ​ `subscribe_map`是目前dns系统的总体订阅列表,记录了订阅的modid/cmdid都有哪些fds已经订阅了,其实一个fd就代表一个客户端。 ​ `publish_map`是即将发布的表,其实这里面是subscribe_map的一个反表,key是订阅的客户端fd,而value是该客户端需要接收的订阅modid/cmdid数据。 **属性**: `_book_list`:目前dns已经全部的订阅信息清单。 `_push_list`:目前dns即将发布的客户端及订阅信息清单。 **方法** `void subscribe(uint64_t mod, int fd)`: 加入modid/cmdid 和订阅的客户端fd到_book_list中。 `void unsubscribe(uint64_t mod, int fd)`:取消一条订阅数据。 `void publish(std::vector<uint64_t> &change_mods)`: 发布订阅数据,其中change_mods是需要发布的那些modid/cmdid组合。 `void make_publish_map(listen_fd_set &online_fds, publish_map &need_publish)`: 根据目前在线的订阅用户,得到需要通信的发布订阅列表。 具体实现如下: > lars_dns/src/subscribe.cpp ```c #include "subscribe.h" extern tcp_server *server; //单例对象 SubscribeList *SubscribeList::_instance = NULL; //用于保证创建单例的init方法只执行一次的锁 pthread_once_t SubscribeList::_once = PTHREAD_ONCE_INIT; SubscribeList::SubscribeList() { } //订阅 void SubscribeList::subscribe(uint64_t mod, int fd) { //将mod->fd的关系加入到_book_list中 pthread_mutex_lock(&_book_list_lock); _book_list[mod].insert(fd); pthread_mutex_unlock(&_book_list_lock); } //取消订阅 void SubscribeList::unsubscribe(uint64_t mod, int fd) { //将mod->fd关系从_book_list中删除 pthread_mutex_lock(&_book_list_lock); if (_book_list.find(mod) != _book_list.end()) { _book_list[mod].erase(fd); if (_book_list[mod].empty() == true) { _book_list.erase(mod); } } pthread_mutex_unlock(&_book_list_lock); } void push_change_task(event_loop *loop, void *args) { SubscribeList *subscribe = (SubscribeList*)args; //1 获取全部的在线客户端fd listen_fd_set online_fds; loop->get_listen_fds(online_fds); //2 从subscribe的_push_list中 找到与online_fds集合匹配,放在一个新的publish_map里 publish_map need_publish; subscribe->make_publish_map(online_fds, need_publish); //3 依次从need_publish取出数据 发送给对应客户端链接 publish_map::iterator it; for (it = need_publish.begin(); it != need_publish.end(); it++) { int fd = it->first; //fd //遍历 fd对应的 modid/cmdid集合 hash_set<uint64_t>::iterator st; for (st = it->second.begin(); st != it->second.end(); st++) { //一个modid/cmdid int modid = int((*st) >> 32); int cmdid = int(*st); //组装pb消息,发送给客户 lars::GetRouteResponse rsp; rsp.set_modid(modid); rsp.set_cmdid(cmdid); //通过route查询对应的host ip/port信息 进行组装 host_set hosts = Route::instance()->get_hosts(modid, cmdid) ; for (host_set_it hit = hosts.begin(); hit != hosts.end(); hit++) { uint64_t ip_port_pair = *hit; lars::HostInfo host_info; host_info.set_ip((uint32_t)(ip_port_pair >> 32)); host_info.set_port((int)ip_port_pair); //添加到rsp中 rsp.add_host()->CopyFrom(host_info); } //给当前fd 发送一个更新消息 std::string responseString; rsp.SerializeToString(&responseString); //通过fd取出链接信息 net_connection *conn = tcp_server::conns[fd]; if (conn != NULL) { conn->send_message(responseString.c_str(), responseString.size(), lars::ID_GetRouteResponse); } } } } //根据在线用户fd得到需要发布的列表 void SubscribeList::make_publish_map( listen_fd_set &online_fds, publish_map &need_publish) { publish_map::iterator it; pthread_mutex_lock(&_push_list_lock); //遍历_push_list 找到 online_fds匹配的数据,放到need_publish中 for (it = _push_list.begin(); it != _push_list.end(); it++) { //it->first 是 fd //it->second 是 modid/cmdid if (online_fds.find(it->first) != online_fds.end()) { //匹配到 //当前的键值对移动到need_publish中 need_publish[it->first] = _push_list[it->first]; //当该组数据从_push_list中删除掉 _push_list.erase(it); } } pthread_mutex_unlock(&_push_list_lock); } //发布 void SubscribeList::publish(std::vector<uint64_t> &change_mods) { //1 将change_mods已经修改的mod->fd // 放到 发布清单_push_list中 pthread_mutex_lock(&_book_list_lock); pthread_mutex_lock(&_push_list_lock); std::vector<uint64_t>::iterator it; for (it = change_mods.begin(); it != change_mods.end(); it++) { uint64_t mod = *it; if (_book_list.find(mod) != _book_list.end()) { //将mod下面的fd set集合拷迁移到 _push_list中 hash_set<int>::iterator fds_it; for (fds_it = _book_list[mod].begin(); fds_it != _book_list[mod].end(); fds_it++) { int fd = *fds_it; _push_list[fd].insert(mod); } } } pthread_mutex_unlock(&_push_list_lock); pthread_mutex_unlock(&_book_list_lock); //2 通知各个线程去执行推送任务 server->thread_poll()->send_task(push_change_task, this); } ``` ​ 这里需要注意的是`publish()`里的server变量是全局变量,全局唯一的server句柄。 ### 6.2 开启订阅 ​ 那么订阅功能实现了,该如何是调用触发订阅功能能,我们可以在一个客户端建立连接成功之后来调用. > lars_dns/src/dns_service.cpp ```c #include <ext/hash_set> #include "lars_reactor.h" #include "subscribe.h" #include "dns_route.h" #include "lars.pb.h" tcp_server *server; using __gnu_cxx::hash_set; typedef hash_set<uint64_t> client_sub_mod_list; // ... //订阅route 的modid/cmdid void create_subscribe(net_connection * conn, void *args) { conn->param = new client_sub_mod_list; } //退订route 的modid/cmdid void clear_subscribe(net_connection * conn, void *args) { client_sub_mod_list::iterator it; client_sub_mod_list *sub_list = (client_sub_mod_list*)conn->param; for (it = sub_list->begin(); it != sub_list->end(); it++) { uint64_t mod = *it; SubscribeList::instance()->unsubscribe(mod, conn->get_fd()); } delete sub_list; conn->param = NULL; } int main(int argc, char **argv) { event_loop loop; //加载配置文件 config_file::setPath("conf/lars_dns.conf"); std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0"); short port = config_file::instance()->GetNumber("reactor", "port", 7778); //创建tcp服务器 server = new tcp_server(&loop, ip.c_str(), port); //==========注册链接创建/销毁Hook函数============ server->set_conn_start(create_subscribe); server->set_conn_close(clear_subscribe); //============================================ //注册路由业务 server->add_msg_router(lars::ID_GetRouteRequest, get_route); //开始事件监听 printf("lars dns service ....\n"); loop.event_process(); return 0; } ``` ​ 这里注册了两个链接Hook。`create_subscribe()`和`clear_subscribe()`。 `client_sub_mod_list`为当前客户端链接所订阅的route信息列表。主要存放当前客户订阅的modid/cmdid的集合。因为不同的客户端订阅的信息不同,所以要将该列表与每个conn进行绑定。 --- ### 关于作者: 作者:`Aceld(刘丹冰)` mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com) github: [https://github.com/aceld](https://github.com/aceld) 原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld) ![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg) >**原创声明:未经作者允许请勿转载, 如果转载请注明出处**