[toc] ## 一、链路保持 链路保持流程应遵循以下规定: 1. 下级平台登录成功后,在与上级平台之间如果有应用业务数据包往来的情况下,不需要发送链路保持数据包;否则,下级平台应每隔1min发送一个链路保持请求数据包到上级平台以保持链路连接; 2. 在没有应用数据包往来的情况下,上级平台连续3min未收到下级平台发送的链路保持请求数据包,则认为与下级平台的连接中断,将主动断开数据传输链路; 3. 在没有应用数据包往来的情况下,下级平台连续3min未收到上级平台发送的链路保持应答数据包,则认为与上级平台的连接中断,将主动断开数据传输从链路。 解析: 第一条和第三条都是关于客户端的实现。第二条是服务端的实现。 ## 二、NIO长连接ChannelServer 长连接的实现方案: - 基于Socket+多线程; - Java NIO; - Mina; - Netty; - QuickServer。 本案例,基于性能+简洁的考虑,我们基于Java的NIO实现长连接服务端。 ### 代码结构 ``` location-center |-- src |-- main |-- java |-- com.zihan.location.parser |-- AuthHelper.java |-- LocationParserHelper.java |-- ByteMeta.java |-- socket |-- alive |-- ChannelServer.java ## 长连接socketServer |-- Protocol.java ## socket操作 |-- ProtocolImpl.java ## socket操作实现 ``` ### NIO服务端开发步骤 写代码之前,我们先总结一下使用Java NIO进行服务端开发的步骤: 1. 创建ServerSocketChannel,配置它为非阻塞模式; 2. 绑定监听,配置TCP参数,例如backlog大小; 3. 创建一个独立的I/O线程,用于轮询多路复用器Selector; 4. 创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听`SelectionKey.ACCEPT`; 5. 启动I/O线程,在循环体中执行Selector.select()方法,轮询就绪的Channel; 6. 当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是`OP_ACCEPT`状态,说明是新的客户端接入,则调用`ServerSocketChannelaccept()`方法接受新的客户端; 7. 设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数; 8. 将SocketChannel注册到Selector,监听`OP_READ`操作位; 9. 如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包; 10. 如果轮询的Channel为`OP_WRITE`,说明还有数据没有发送完成,需要继续发送。 ```java @Component @Slf4j public class ChannelServer { private static final int BUFFSIZE=1024; private static final int TIMEOUT= 180000; private static final int PORT = 8089; @Resource private Protocol protocol;////创建处理协议类型 public void sendMessagestartSocketServer() { try { Selector selector = Selector.open();//创建选择器 //打开监听信道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); //与端口绑定 serverChannel.socket().bind(new InetSocketAddress(PORT)); //设置非阻塞模式 serverChannel.configureBlocking(false); //将选择器注册到监听信道,只有非阻塞信道才可以注册,并指出该信道可以Accept serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ //等待某信道就绪(或超时) if(selector.select(TIMEOUT)==0){ log.info("无客户端连接,系统等待中"); continue; } //取得迭代器selectedKey()中包含了每个准备好某一操作信道的selectionKey Iterator it = selector.selectedKeys().iterator(); while(it.hasNext()){ SelectionKey key = (SelectionKey) it.next(); try{ if(key.isAcceptable()){ //有客户端请求时 //接受请求并处理 处理该key protocol.handleAccept(key); } if(key.isValid() && key.isReadable()){ //从客户端取数据 protocol.handleRead(key); } // if(key.isValid() && key.isWritable()){ // //客户端可写时进行发送 // Thread.sleep(1000); // if (response.length > 0){ // protocol.handlWrite(key, ByteBuffer.wrap(response)); // } // } }catch(Exception e){ it.remove(); continue; } it.remove(); } } } catch (Exception e) { e.printStackTrace(); } } } ``` ### Protocol接口 Protocol接口,定义了socket服务端在读、写状态时,可以进行的几种操作。 - 客户端连接时,注册一个通道到选择器; - 通道状态为可读时,读取缓冲区中的数据; - 通道状态为可写时,向写缓冲区写入数据。 ```java public interface Protocol { void handleAccept(SelectionKey skey) throws IOException; void handleRead(SelectionKey skey) throws IOException; void handlWrite(SelectionKey skey, ByteBuffer bf) throws IOException; } ``` Protocol接口实现: ``` @Slf4j @Service public class ProtocolImpl implements Protocol { @Resource private ReportLogService reportLogService; private int buffSize = 2048; /** * 接收一个socketChannel的处理 */ @Override public void handleAccept(SelectionKey skey) throws IOException { // TODO Auto-generated method stub SocketChannel clientChannel = ((ServerSocketChannel)skey.channel()).accept(); clientChannel.configureBlocking(false); clientChannel.register(skey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(buffSize)); log.info("clientChannel"+clientChannel); } /** * 向一个socketChannel写入 */ @Override public void handlWrite(SelectionKey skey , ByteBuffer bf) throws IOException { // TODO Auto-generated method stub SocketChannel clientChannel = (SocketChannel) skey.channel(); clientChannel.write(bf); skey.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE); } /** * 接收一个socketChannel的处理 */ @Override public void handleRead(SelectionKey skey) throws IOException { // 获得客户端通讯信道 SocketChannel clientChannel = (SocketChannel) skey.channel(); log.info("数据读取中:"+clientChannel); // 得到清空缓冲区 ByteBuffer bf = (ByteBuffer) skey.attachment(); bf.clear(); long bytesRead; try { bytesRead = clientChannel.read(bf); }catch (IOException e){ skey.cancel(); clientChannel.socket().close(); clientChannel.close(); return; } if (bytesRead == -1) { clientChannel.close(); } else { bf.flip(); byte[] bytes = bf.array(); log.info("服务器收到来自"+clientChannel.getRemoteAddress()+"的消息:"+ByteUtil.bytesToHex(bytes)); this.packageParse(bytes,clientChannel); //根据bytes完成业务解析功能 skey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } } ``` ## 三、粘包与拆包 客户端为了提高Socket通讯的效率,往往在开发过程中,不是有一条消息就发送一条请求。而是,缓冲区快满的时候再把数据发送出去。因此,我们会遇到粘包、拆包的一些情形。 **什么是粘包?** 简单来讲,就是发送端需要等缓冲区满才发送出去,这样发送数据包就是粘包。 因此,我们要对粘包按照协议规定的头、尾信息进行拆分。然后再通过遍历的方式,分别处理数据包中的业务请求。 ``` private void packageParse(byte[] bytes,SocketChannel clientChannel) throws IOException { //解决粘包问题 byte[] response = new byte[0]; //整包解析 List<byte[]> list = this.packageCut(bytes); for (byte[] item : list){ try { response = ArrayUtils.addAll(response,LocationParserHelper.buildResponse(item,clientChannel)); } catch (Exception e) { log.error(e.getMessage()); e.printStackTrace(); response = ArrayUtils.addAll(response,ResponseHelper.error()); } } clientChannel.write(ByteBuffer.wrap(response)); //记录日志 this.saveLog(bytes,clientChannel,response); } /** * 拆包 * @param bytes * @return */ private List<byte[]> packageCut(byte[] bytes){ List<byte[]> list = new ArrayList<>(); int start =0 ,end = 0; for (int i = 0; i<bytes.length ;i++){ byte item = bytes[i]; if (item == MsgConfig.getSTARTER()){ start = i; i += MsgConfig.getHeaderLength();//提升解包效率 }else if (item == MsgConfig.getEND()){ end = i; if (end > start) { log.info("拆包成功:解析到1条数据,start=" + start + ",end=" + end + ",i=" + i); byte[] sub = ByteUtil.subBytes(bytes, start, end - start + 1); String s = ByteUtil.bytesToHex(sub); if (!s.startsWith("5b")){ log.info("拆包得到异常包:" + s); }else { log.info("拆包得到数据为:" + ByteUtil.bytesToHex(sub)); list.add(sub); } start = end + 1; } } } log.info("拆包成功,得到" + list.size() + "条数据"); return list; } ``` ## 四、日志记录 另外,这个通讯框架中我们还可以加入日志的功能,把通讯过程记录作完整的记录。 ### 数据库设计 ``` CREATE TABLE `loc_report_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID', `ip` varchar(20) DEFAULT NULL COMMENT '客户端ip地址', `msg_body` text COMMENT '消息体', `msg_response` text COMMENT '消息回复', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_time` datetime DEFAULT NULL COMMENT '更新时间', `deleted` int(1) DEFAULT '0' COMMENT '是否删除:0-否;1-已删除', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=50577 DEFAULT CHARSET=utf8mb4 ``` ``` /** * 记录同步日志信息 * TODO:改成AOP方式 * @param bytes * @param clientChannel * @param response */ private void saveLog (byte[] bytes,SocketChannel clientChannel,byte[] response){ ReportLog reportLog = new ReportLog(); try { reportLog.setIp(clientChannel.getRemoteAddress().toString().replace("/","")); reportLog.setMsgBody(ByteUtil.bytesToHex(bytes)); reportLog.setMsgResponse(ByteUtil.bytesToHex(response)); reportLogService.insert(reportLog); }catch (Exception e){ e.printStackTrace(); log.warn("日志存储失败"); } } ```