[toc]
## 一、链路保持
链路保持流程应遵循以下规定:
1. 下级平台登录成功后,在与上级平台之间如果有应用业务数据包往来的情况下,不需要发送链路保持数据包;否则,下级平台应每隔1min发送一个链路保持请求数据包到上级平台以保持链路连接;
2. 在没有应用数据包往来的情况下,上级平台连续3min未收到下级平台发送的链路保持请求数据包,则认为与下级平台的连接中断,将主动断开数据传输链路;
3. 在没有应用数据包往来的情况下,下级平台连续3min未收到上级平台发送的链路保持应答数据包,则认为与上级平台的连接中断,将主动断开数据传输从链路。
解析: 第一条和第三条都是关于客户端的实现。第二条是服务端的实现。
## 二、NIO长连接ChannelServer
长连接的实现方案:
- 基于Socket+多线程;
- Java NIO;
- Mina;
- Netty;
- QuickServer。
本案例,基于性能+简洁的考虑,我们基于Java的NIO实现长连接服务端。
### 代码结构
```
location-center
|-- src
|-- main
|-- java
|-- com.zihan.location.parser
|-- AuthHelper.java
|-- LocationParserHelper.java
|-- ByteMeta.java
|-- socket
|-- alive
|-- ChannelServer.java ## 长连接socketServer
|-- Protocol.java ## socket操作
|-- ProtocolImpl.java ## socket操作实现
```
### NIO服务端开发步骤
写代码之前,我们先总结一下使用Java NIO进行服务端开发的步骤:
1. 创建ServerSocketChannel,配置它为非阻塞模式;
2. 绑定监听,配置TCP参数,例如backlog大小;
3. 创建一个独立的I/O线程,用于轮询多路复用器Selector;
4. 创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听`SelectionKey.ACCEPT`;
5. 启动I/O线程,在循环体中执行Selector.select()方法,轮询就绪的Channel;
6. 当轮询到了处于就绪状态的Channel时,需要对其进行判断,如果是`OP_ACCEPT`状态,说明是新的客户端接入,则调用`ServerSocketChannelaccept()`方法接受新的客户端;
7. 设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数;
8. 将SocketChannel注册到Selector,监听`OP_READ`操作位;
9. 如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;
10. 如果轮询的Channel为`OP_WRITE`,说明还有数据没有发送完成,需要继续发送。
```java
@Component
@Slf4j
public class ChannelServer {
private static final int BUFFSIZE=1024;
private static final int TIMEOUT= 180000;
private static final int PORT = 8089;
@Resource
private Protocol protocol;////创建处理协议类型
public void sendMessagestartSocketServer() {
try {
Selector selector = Selector.open();//创建选择器
//打开监听信道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
//与端口绑定
serverChannel.socket().bind(new InetSocketAddress(PORT));
//设置非阻塞模式
serverChannel.configureBlocking(false);
//将选择器注册到监听信道,只有非阻塞信道才可以注册,并指出该信道可以Accept
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true){
//等待某信道就绪(或超时)
if(selector.select(TIMEOUT)==0){
log.info("无客户端连接,系统等待中");
continue;
}
//取得迭代器selectedKey()中包含了每个准备好某一操作信道的selectionKey
Iterator it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey key = (SelectionKey) it.next();
try{
if(key.isAcceptable()){
//有客户端请求时
//接受请求并处理 处理该key
protocol.handleAccept(key);
}
if(key.isValid() && key.isReadable()){
//从客户端取数据
protocol.handleRead(key);
}
// if(key.isValid() && key.isWritable()){
// //客户端可写时进行发送
// Thread.sleep(1000);
// if (response.length > 0){
// protocol.handlWrite(key, ByteBuffer.wrap(response));
// }
// }
}catch(Exception e){
it.remove();
continue;
}
it.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
```
### Protocol接口
Protocol接口,定义了socket服务端在读、写状态时,可以进行的几种操作。
- 客户端连接时,注册一个通道到选择器;
- 通道状态为可读时,读取缓冲区中的数据;
- 通道状态为可写时,向写缓冲区写入数据。
```java
public interface Protocol {
void handleAccept(SelectionKey skey) throws IOException;
void handleRead(SelectionKey skey) throws IOException;
void handlWrite(SelectionKey skey, ByteBuffer bf) throws IOException;
}
```
Protocol接口实现:
```
@Slf4j
@Service
public class ProtocolImpl implements Protocol {
@Resource
private ReportLogService reportLogService;
private int buffSize = 2048;
/**
* 接收一个socketChannel的处理
*/
@Override
public void handleAccept(SelectionKey skey) throws IOException {
// TODO Auto-generated method stub
SocketChannel clientChannel = ((ServerSocketChannel)skey.channel()).accept();
clientChannel.configureBlocking(false);
clientChannel.register(skey.selector(), SelectionKey.OP_READ,
ByteBuffer.allocate(buffSize));
log.info("clientChannel"+clientChannel);
}
/**
* 向一个socketChannel写入
*/
@Override
public void handlWrite(SelectionKey skey , ByteBuffer bf) throws IOException {
// TODO Auto-generated method stub
SocketChannel clientChannel = (SocketChannel) skey.channel();
clientChannel.write(bf);
skey.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
/**
* 接收一个socketChannel的处理
*/
@Override
public void handleRead(SelectionKey skey) throws IOException {
// 获得客户端通讯信道
SocketChannel clientChannel = (SocketChannel) skey.channel();
log.info("数据读取中:"+clientChannel);
// 得到清空缓冲区
ByteBuffer bf = (ByteBuffer) skey.attachment();
bf.clear();
long bytesRead;
try {
bytesRead = clientChannel.read(bf);
}catch (IOException e){
skey.cancel();
clientChannel.socket().close();
clientChannel.close();
return;
}
if (bytesRead == -1) {
clientChannel.close();
} else {
bf.flip();
byte[] bytes = bf.array();
log.info("服务器收到来自"+clientChannel.getRemoteAddress()+"的消息:"+ByteUtil.bytesToHex(bytes));
this.packageParse(bytes,clientChannel);
//根据bytes完成业务解析功能
skey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
}
}
```
## 三、粘包与拆包
客户端为了提高Socket通讯的效率,往往在开发过程中,不是有一条消息就发送一条请求。而是,缓冲区快满的时候再把数据发送出去。因此,我们会遇到粘包、拆包的一些情形。
**什么是粘包?**
简单来讲,就是发送端需要等缓冲区满才发送出去,这样发送数据包就是粘包。
因此,我们要对粘包按照协议规定的头、尾信息进行拆分。然后再通过遍历的方式,分别处理数据包中的业务请求。
```
private void packageParse(byte[] bytes,SocketChannel clientChannel) throws IOException {
//解决粘包问题
byte[] response = new byte[0];
//整包解析
List<byte[]> list = this.packageCut(bytes);
for (byte[] item : list){
try {
response = ArrayUtils.addAll(response,LocationParserHelper.buildResponse(item,clientChannel));
} catch (Exception e) {
log.error(e.getMessage());
e.printStackTrace();
response = ArrayUtils.addAll(response,ResponseHelper.error());
}
}
clientChannel.write(ByteBuffer.wrap(response));
//记录日志
this.saveLog(bytes,clientChannel,response);
}
/**
* 拆包
* @param bytes
* @return
*/
private List<byte[]> packageCut(byte[] bytes){
List<byte[]> list = new ArrayList<>();
int start =0 ,end = 0;
for (int i = 0; i<bytes.length ;i++){
byte item = bytes[i];
if (item == MsgConfig.getSTARTER()){
start = i;
i += MsgConfig.getHeaderLength();//提升解包效率
}else if (item == MsgConfig.getEND()){
end = i;
if (end > start) {
log.info("拆包成功:解析到1条数据,start=" + start + ",end=" + end + ",i=" + i);
byte[] sub = ByteUtil.subBytes(bytes, start, end - start + 1);
String s = ByteUtil.bytesToHex(sub);
if (!s.startsWith("5b")){
log.info("拆包得到异常包:" + s);
}else {
log.info("拆包得到数据为:" + ByteUtil.bytesToHex(sub));
list.add(sub);
}
start = end + 1;
}
}
}
log.info("拆包成功,得到" + list.size() + "条数据");
return list;
}
```
## 四、日志记录
另外,这个通讯框架中我们还可以加入日志的功能,把通讯过程记录作完整的记录。
### 数据库设计
```
CREATE TABLE `loc_report_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`ip` varchar(20) DEFAULT NULL COMMENT '客户端ip地址',
`msg_body` text COMMENT '消息体',
`msg_response` text COMMENT '消息回复',
`create_time` datetime DEFAULT NULL COMMENT '创建时间',
`update_time` datetime DEFAULT NULL COMMENT '更新时间',
`deleted` int(1) DEFAULT '0' COMMENT '是否删除:0-否;1-已删除',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=50577 DEFAULT CHARSET=utf8mb4
```
```
/**
* 记录同步日志信息
* TODO:改成AOP方式
* @param bytes
* @param clientChannel
* @param response
*/
private void saveLog (byte[] bytes,SocketChannel clientChannel,byte[] response){
ReportLog reportLog = new ReportLog();
try {
reportLog.setIp(clientChannel.getRemoteAddress().toString().replace("/",""));
reportLog.setMsgBody(ByteUtil.bytesToHex(bytes));
reportLog.setMsgResponse(ByteUtil.bytesToHex(response));
reportLogService.insert(reportLog);
}catch (Exception e){
e.printStackTrace();
log.warn("日志存储失败");
}
}
```
- 第一章 开篇寄语
- 1-1 技术选型要点
- 1-2 认识905.4王国的交流规范
- 1-3 联系作者
- 第二章 Socket编程的基础知识
- 2-1 Socket家族的基石
- 2-2 byte数组基础
- 2-3 缓冲区基础
- 2-4 NIO Socket通讯的工作原理
- 第三章 905.4规范解读
- 3-1 基于通道选择器的Socket长连接及消息读写框架
- 3-2 严格的信件收发员
- 3-3 负责消息处理的一家子
- 3-4 负责认证的大儿子(AuthWorker)
- 3-5 哑巴老二(PingWoker)
- 3-6 勤奋的定位汇报员老三(LocationReportWorker)
- 3-7 精明的老四(BusinessReportWorker)
- 3-8 数据检察官——CRC16-CCITT校验
- 3-11 数据的加密官
- 3-12 头尾标识转义
- 第四章 测试方法
- 4-1 测试数据样例
- 4-2 客户端链路保持功能实现
- 4-3 使用Socket短连接进行功能测试
- 4-4 NIO服务端性能分析
- 4-5 http测试方法(推荐)
- 第五章 从NIO到netty
- 5-1 编程进阶——Netty核心基础
- 5-2 Netty使用常见问题
- 5-3 使用Netty重写Server端
- 5-4 Netty之链路管理
- 5-5 netty堆外内存泄漏如何应对?
- 第六章 统计与监控
- 6-1 Grafana监控面板
- 第七章 售后服务
- 7-1 勘误与优化
- 7-2 获取源码