合规国际互联网加速 OSASE为企业客户提供高速稳定SD-WAN国际加速解决方案。 广告
## 12) udp服务与客户端

接下来为了让Reactor框架功能更加丰富,结合之前的功能,再加上udpserver的服务接口。udp我们暂时不考虑加线程池实现,只是单线程的处理方式。

### 12.1 udp_server服务端功能实现

> lars_reactor/include/udp_server.h

```cpp
#pragma once

#include <netinet/in.h>
#include "event_loop.h"
#include "net_connection.h"
#include "message.h"

// UDP server bound to one non-blocking socket and driven by an event_loop.
// Each received datagram is validated and dispatched by msgid through the
// message router; send_message() replies to the sender of the most recently
// received datagram.
class udp_server :public net_connection
{
public:
    udp_server(event_loop *loop, const char *ip, uint16_t port);

    // Sends one message (head + payload) to the peer stored in _client_addr.
    // Returns the number of bytes sent, or -1 on error.
    virtual int send_message(const char *data, int msglen, int msgid);

    // Registers the callback to run for messages carrying `msgid`
    void add_msg_router(int msgid, msg_callback* cb, void *user_data = NULL);

    ~udp_server();

    // Reads and dispatches every datagram currently queued on the socket
    void do_read();

private:
    int _sockfd;

    char _read_buf[MESSAGE_LENGTH_LIMIT];
    char _write_buf[MESSAGE_LENGTH_LIMIT];

    // Event loop this server is registered with
    event_loop* _loop;

    // Address of the peer whose datagram was received last (filled by recvfrom,
    // used as the destination in send_message)
    struct sockaddr_in _client_addr;
    socklen_t _client_addrlen;

    // Message routing / dispatch table
    msg_router _router;
};
```

对应的方法实现方式如下:

> lars_reactor/src/udp_server.cpp

```cpp
#include <signal.h>
#include <unistd.h>
#include <strings.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "udp_server.h"

// Read-event callback handed to the event loop; `args` is the udp_server that
// registered itself in its constructor. Kept file-local (static) because
// udp_client.cpp defines a callback with the same name and signature.
static void read_callback(event_loop *loop, int fd, void *args)
{
    udp_server *server = (udp_server*)args;

    // Hand over to the server's datagram handling
    server->do_read();
}

// Drains the non-blocking socket: loops until recvfrom reports EAGAIN (nothing
// left to read) or a hard error, dispatching every well-formed datagram.
void udp_server::do_read()
{
    while (true) {
        // addrlen is a value-result argument, so restore the buffer size before every call
        _client_addrlen = sizeof(_client_addr);

        int pkg_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, (struct sockaddr *)&_client_addr, &_client_addrlen);
        if (pkg_len == -1) {
            if (errno == EINTR) {
                // Interrupted by a signal: retry
                continue;
            }
            else if (errno == EAGAIN) {
                // No more datagrams queued
                break;
            }
            else {
                perror("recvfrom");
                break;
            }
        }

        // A datagram shorter than the head would leave stale bytes from an
        // earlier packet in the parsed head, so reject it before parsing.
        if (pkg_len < (int)MESSAGE_HEAD_LEN) {
            fprintf(stderr, "do_read, datagram shorter than message head, pkg_len = %d\n", pkg_len);
            continue;
        }

        // Parse the message head
        msg_head head;
        memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
        if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkg_len) {
            // Malformed datagram: the declared length does not match what was received
            fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkg_len = %d\n", head.msgid, head.msglen, pkg_len);
            continue;
        }

        // Dispatch to the registered route; a handler that calls send_message()
        // on `this` replies to the peer recvfrom just stored in _client_addr.
        _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
    }
}

// Creates the UDP socket, binds it to ip:port and registers it for read events
// on `loop`. Any setup failure terminates the process with exit(1).
udp_server::udp_server(event_loop *loop, const char *ip, uint16_t port)
{
    //1 Ignore SIGHUP and SIGPIPE
    if (signal(SIGHUP, SIG_IGN) == SIG_ERR) {
        perror("signal ignore SIGHUP");
        exit(1);
    }
    if (signal(SIGPIPE, SIG_IGN) == SIG_ERR) {
        perror("signal ignore SIGPIPE");
        exit(1);
    }

    //2 Create the socket
    // SOCK_CLOEXEC: the descriptor is closed on exec*() but still inherited across fork()
    _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
    if (_sockfd == -1) {
        perror("create udp socket");
        exit(1);
    }

    //3 Fill in the service ip + port
    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    if (inet_aton(ip, &servaddr.sin_addr) == 0) {
        // Without this check an unparsable ip leaves sin_addr zeroed, i.e. binds to 0.0.0.0
        fprintf(stderr, "invalid ip address: %s\n", ip);
        exit(1);
    }
    servaddr.sin_port = htons(port);

    //4 Bind
    if (bind(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr)) == -1) {
        perror("bind");
        exit(1);
    }

    //5 Register the read event
    _loop = loop;

    bzero(&_client_addr, sizeof(_client_addr));
    _client_addrlen = sizeof(_client_addr);

    printf("server on %s:%u is running...\n", ip, port);

    _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
}

// Packs head + payload into _write_buf and sends it as one datagram to the
// peer in _client_addr. Returns the byte count reported by sendto, or -1.
int udp_server::send_message(const char *data, int msglen, int msgid)
{
    if (msglen < 0) {
        fprintf(stderr, "invalid message length %d\n", msglen);
        return -1;
    }
    // _write_buf holds MESSAGE_LENGTH_LIMIT bytes and must fit head + payload
    if (msglen > MESSAGE_LENGTH_LIMIT - MESSAGE_HEAD_LEN) {
        fprintf(stderr, "too large message to send\n");
        return -1;
    }

    msg_head head;
    head.msglen = msglen;
    head.msgid = msgid;

    memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);
    memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);

    int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, (struct sockaddr*)&_client_addr, _client_addrlen);
    if (ret == -1) {
        perror("sendto()..");
        return -1;
    }

    return ret;
}

// Registers the callback to run for messages carrying `msgid`
void udp_server::add_msg_router(int msgid, msg_callback* cb, void *user_data)
{
    _router.register_msg_router(msgid, cb, user_data);
}

// Unregisters the socket from the event loop, then closes it
udp_server::~udp_server()
{
    _loop->del_io_event(_sockfd);
    close(_sockfd);
}
```

这里面实现的方式和tcp_server的实现方式几乎一样,需要注意的是,udp的socket编程是不需要listen的,而且也不需要accept。所以recvfrom就能够得知每个包的对应客户端是谁,然后回执消息给对应的客户端就可以。因为没有连接,所以都是以包为单位来处理的,一个包一个包处理。可能相邻的两个包来自不同的客户端。

### 12.2 udp_client客户端功能实现

> lars_reactor/include/udp_client.h

```cpp
#pragma once

#include "net_connection.h"
#include "message.h"
#include "event_loop.h"

// UDP client: one non-blocking socket connected to a single server address,
// registered for read events on an event_loop; incoming messages are
// dispatched by msgid through the message router.
class udp_client: public net_connection
{
public:
    udp_client(event_loop *loop, const char *ip, uint16_t port);
    ~udp_client();

    // Registers the callback to run for messages carrying `msgid`
    void add_msg_router(int msgid, msg_callback *cb, void *user_data = NULL);

    // Sends one message (head + payload) to the connected server.
    // Returns the number of bytes sent, or -1 on error.
    virtual int send_message(const char *data, int msglen, int msgid);

    // Reads and dispatches every datagram currently queued on the socket
    void do_read();

private:
    int _sockfd;

    char _read_buf[MESSAGE_LENGTH_LIMIT];
    char _write_buf[MESSAGE_LENGTH_LIMIT];

    // Event loop this client is registered with
    event_loop *_loop;

    // Message routing / dispatch table
    msg_router _router;
};
```

> lars_reactor/src/udp_client.cpp

```cpp
#include "udp_client.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <strings.h>
#include <string.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>

// Read-event callback handed to the event loop; `args` is the udp_client that
// registered itself in its constructor. Kept file-local (static) because
// udp_server.cpp defines a callback with the same name and signature.
static void read_callback(event_loop *loop, int fd, void *args)
{
    udp_client *client = (udp_client*)args;
    client->do_read();
}

// Creates the UDP socket, connects it to ip:port and registers it for read
// events on `loop`. Any setup failure terminates the process with exit(1).
udp_client::udp_client(event_loop *loop, const char *ip, uint16_t port)
{
    //1 Create the socket
    _sockfd = socket(AF_INET, SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_UDP);
    if (_sockfd == -1) {
        perror("create socket error");
        exit(1);
    }

    struct sockaddr_in servaddr;
    bzero(&servaddr, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    if (inet_aton(ip, &servaddr.sin_addr) == 0) {
        // Without this check an unparsable ip leaves sin_addr zeroed (0.0.0.0)
        fprintf(stderr, "invalid ip address: %s\n", ip);
        exit(1);
    }
    servaddr.sin_port = htons(port);

    //2 Connect: on a UDP socket this only fixes the default peer, which is why
    //  send_message() and do_read() can pass NULL addresses
    int ret = connect(_sockfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
    if (ret == -1) {
        perror("connect");
        exit(1);
    }

    //3 Register the read event
    _loop = loop;
    _loop->add_io_event(_sockfd, read_callback, EPOLLIN, this);
}

// Unregisters the socket from the event loop, then closes it
udp_client::~udp_client()
{
    _loop->del_io_event(_sockfd);
    close(_sockfd);
}

// Drains the non-blocking socket: loops until recvfrom reports EAGAIN (nothing
// left to read) or a hard error, dispatching every well-formed datagram.
void udp_client::do_read()
{
    while (true) {
        int pkt_len = recvfrom(_sockfd, _read_buf, sizeof(_read_buf), 0, NULL, NULL);
        if (pkt_len == -1) {
            if (errno == EINTR) {
                // Interrupted by a signal: retry
                continue;
            }
            else if (errno == EAGAIN) {
                // No more datagrams queued
                break;
            }
            else {
                perror("recvfrom()");
                break;
            }
        }

        // A datagram shorter than the head would leave stale bytes from an
        // earlier packet in the parsed head, so reject it before parsing.
        if (pkt_len < (int)MESSAGE_HEAD_LEN) {
            fprintf(stderr, "do_read, datagram shorter than message head, pkt_len = %d\n", pkt_len);
            continue;
        }

        // Parse the message head
        msg_head head;
        memcpy(&head, _read_buf, MESSAGE_HEAD_LEN);
        if (head.msglen > MESSAGE_LENGTH_LIMIT || head.msglen < 0 || head.msglen + MESSAGE_HEAD_LEN != pkt_len) {
            // Malformed datagram: the declared length does not match what was received
            fprintf(stderr, "do_read, data error, msgid = %d, msglen = %d, pkt_len = %d\n", head.msgid, head.msglen, pkt_len);
            continue;
        }

        // Dispatch to the registered route
        _router.call(head.msgid, head.msglen, _read_buf+MESSAGE_HEAD_LEN, this);
    }
}

// Registers the callback to run for messages carrying `msgid`
void udp_client::add_msg_router(int msgid, msg_callback *cb, void *user_data)
{
    _router.register_msg_router(msgid, cb, user_data);
}

// Packs head + payload into _write_buf and sends it as one datagram to the
// connected server. Returns the byte count reported by sendto, or -1.
int udp_client::send_message(const char *data, int msglen, int msgid)
{
    if (msglen < 0) {
        fprintf(stderr, "invalid message length %d\n", msglen);
        return -1;
    }
    // _write_buf holds MESSAGE_LENGTH_LIMIT bytes and must fit head + payload
    if (msglen > MESSAGE_LENGTH_LIMIT - MESSAGE_HEAD_LEN) {
        fprintf(stderr, "too large message to send\n");
        return -1;
    }

    msg_head head;
    head.msglen = msglen;
    head.msgid = msgid;

    memcpy(_write_buf, &head, MESSAGE_HEAD_LEN);
    memcpy(_write_buf + MESSAGE_HEAD_LEN, data, msglen);

    // Destination is NULL because the socket was connect()ed in the constructor
    int ret = sendto(_sockfd, _write_buf, msglen + MESSAGE_HEAD_LEN, 0, NULL, 0);
    if (ret == -1) {
        perror("sendto()..");
        return -1;
    }

    return ret;
}
```

客户端和服务端代码除了构造函数不同,其他基本差不多。接下来我们可以测试一下udp的通信功能

### 12.3 完成Lars Reactor V0.10开发

服务端

> server.cpp

```cpp
#include <stdio.h>
#include <string>
#include <string.h>
#include "config_file.h"
#include "udp_server.h"

// Echo handler: sends the received payload straight back with the same msgid
void callback_busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    printf("callback_busi ...\n");
    conn->send_message(data, len, msgid);
}

int main()
{
    event_loop loop;

    // Load the configuration file
    config_file::setPath("./serv.conf");
    std::string ip = config_file::instance()->GetString("reactor", "ip", "0.0.0.0");
    // uint16_t (the type udp_server takes) instead of short, so ports above
    // 32767 are not printed as negative numbers
    uint16_t port = (uint16_t)config_file::instance()->GetNumber("reactor", "port", 8888);

    printf("ip = %s, port = %d\n", ip.c_str(), port);

    udp_server server(&loop, ip.c_str(), port);

    // Register the message route
    server.add_msg_router(1, callback_busi);

    loop.event_process();

    return 0;
}
```

客户端

> client.cpp

```cpp
#include <stdio.h>
#include <string.h>
#include <string>
#include "udp_client.h"

// Client handler: prints the payload echoed back by the server
void busi(const char *data, uint32_t len, int msgid, net_connection *conn, void *user_data)
{
    // Copy into a std::string: it guarantees NUL termination for %s and
    // releases the copy automatically when the handler returns
    std::string str(data, len);

    printf("recv server: [%s]\n", str.c_str());
    printf("msgid: [%d]\n", msgid);
    printf("len: [%u]\n", len);
}

int main()
{
    event_loop loop;

    // Create the udp client
    udp_client client(&loop, "127.0.0.1", 7777);

    // Register the message route
    client.add_msg_router(1, busi);

    // Send one message
    int msgid = 1;
    const char *msg = "Hello Lars!";

    client.send_message(msg, strlen(msg), msgid);

    // Start the event loop
    loop.event_process();

    return 0;
}
```

启动服务端和客户端并运行,结果如下:

server

```bash
$ ./server
ip = 127.0.0.1, port = 7777
msg_router init...
server on 127.0.0.1:7777 is running...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
callback_busi ...
=======
```

client

```bash
$ ./client
msg_router init...
add msg cb msgid = 1
call msgid = 1
call data = Hello Lars!
call msglen = 11
recv server: [Hello Lars!]
msgid: [1]
len: [11]
=======
```

---

### 关于作者:

作者:`Aceld(刘丹冰)`

mail: [danbing.at@gmail.com](mailto:danbing.at@gmail.com)
github: [https://github.com/aceld](https://github.com/aceld)
原创书籍: [https://www.kancloud.cn/@aceld](https://www.kancloud.cn/@aceld)

![](https://img.kancloud.cn/b0/d1/b0d11a21ba62e96aef1c11d5bfff2cf8_227x227.jpg)

>**原创声明:未经作者允许请勿转载, 如果转载请注明出处**