企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# Netty自定义协议 使用Netty可以帮助我们很快的自定义属于自己的协议。说明如何自定义协议之前先来看看Netty是如何解决TCP的粘包和半包问题的。 &nbsp; ## 一、粘包和半包 - 粘包:客户端发送的多次数据被服务端一次性接受,例如客户端分别发送“abc”、“efd”、“hijk”;可能被服务器接收的时候一次性接收成“abcefdhijk”。 - 半包:客户端发送的数据可能会被服务端分多次接收,例如客户端发送“abcefdhijk”,但是服务端却接收成“abc”、“efd”、“hijk”。 &nbsp;&nbsp;&nbsp;&nbsp;粘包和半包的问题的本质是TCP协议采用流式传送,其传送是以字节为单位的,消息没有边界,TCP协议每次发送的大小和接收的大小都会送**滑动窗口**的限制;同时对于应用层读取传输层的数据,并不是每次传输层有数据了就一次性读取,而是有个缓冲区在起作用。 &nbsp;&nbsp;&nbsp;&nbsp;除此之外,粘包现象可能由Nagle算法造成,在发送端中,如果每次发送的字节数目太小,则会根据Nagle算法将字节缓存在缓冲区中,等待后面一次性发送。而半包现象可能是要发送的数据包多大,超过了数据链路层的MTU的限制,导致数据链路层不得不将发送的数据分成多个数据帧发送出去。 &nbsp; **Nagle 算法** 使用tcp协议,即使发送一个字节,也需要加入 tcp 头和 ip 头,也就是总字节数会使用 41 bytes,非常不经济。因此为了提高网络利用率,tcp 希望尽可能发送足够大的数据,这就是 Nagle 算法产生的缘由。该算法是指发送端即使还有应该发送的数据,但如果这部分数据很少的话,则进行延迟发送,具体情况如下: - 如果 SO\_SNDBUF 的数据达到 MSS(MTU - 头部),则需要发送。 - 如果 SO\_SNDBUF 中含有 FIN(表示需要连接关闭)这时将剩余数据发送,再关闭。 - 如果 TCP\_NODELAY = true,则需要发送。 - 已发送的数据都收到 ack 时,则需要发送。 - 上述条件不满足,但发生超时(一般为 200ms)则需要发送。 - 除上述情况,延迟发送。 &nbsp; ### 解决粘包和半包问题的方案 1. 短链接:即发送一个数据包则创建一条tcp链接,这种方式效率很低。 &nbsp; 2. 固定长度:每一条消息采用固定的长度,接收端根据协商好的固定长度一次性读取。 Netty中提供*FixedLengthFrameDecoder*类就是采用固定长度编解码器来解决粘包和半包的问题。 使用: ~~~ ch.pipeline.addLast(new FixedLengthFrameDecoder(8)); # 固定长度为8字节 ~~~ 缺点: - 数据包的长度大小不好把握,长度固定太长则会造成浪费,长度固定太小则会对某些数据包不够使用。 &nbsp; 3. 固定分割符:对每一条消息最后添加分割符、例如‘\n’,‘\r\n’。 Netty中提供了*LineBasedFrameDecoder*来使用‘\n’,或者‘\r\n’来作为分割符。如果要使用自定义的字符的话可以使用*DelimiterBasedFrameDecoder*。 使用: ~~~ ch.pipeline().addLast(new LineBasedFrameDecoder(1024)); ~~~ 服务端加入,默认以 \\n 或 \\r\\n 作为分隔符,如果超出指定长度仍未出现分隔符,则抛出异常。 缺点:处理字符数据比较合适,但如果内容本身包含了分隔符(字节数据常常会有此情况),那么就会解析错误。 &nbsp; 4. 预设长度:每一条消息分为 head 和 body,head 中包含 body 的长度,**常用!** Netty提供*LengthFieldBasedFrameDecoder*来支持预设长度的方式解决粘包和半包问题。 构造函数: ~~~ public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {...} ~~~ - maxFrameLength:数据包的最大长度。 - lengthFieldOffset:长度字段的偏移值,即要从接收的数据包的哪个字节开始才算长度字段,一般都设置为0。 - lengthFieldLength:长度字段占多少个字节。 - lengthAdjustment:长度字段为基准,还有几个字节是内容。 - initialBytesToStrip:从头开始剥离多少个字节是内容。 例如: ~~~ // 表示最大长度为1024个字节,长度字段为第一个节点,并且占1个字节,内容从第一个字节后开始。 ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 1, 0, 1)); ~~~ 客户端: ~~~ public class HelloWorldClient { static final Logger log = LoggerFactory.getLogger(HelloWorldClient.class); public static void main(String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { log.debug("connetted..."); ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("sending..."); Random r = new Random(); char c = 'a'; ByteBuf buffer = ctx.alloc().buffer(); for (int i = 0; i < 10; i++) { byte length = (byte) (r.nextInt(16) + 1); // 先写入长度 头部信息 buffer.writeByte(length); // 再写入数据 body信息 for (int j = 1; j <= length; j++) { buffer.writeByte((byte) c); } c++; } // 所以发送的数据为 length+content,这个length要和服务端的协商好 ctx.writeAndFlush(buffer); } }); } }); ChannelFuture channelFuture = bootstrap.connect("192.168.0.103", 9090).sync(); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } } ~~~ &nbsp; ## 二、协议的设计与解析 其实所谓粘包和半包的问题的本质是如何基于TCP协议设计一个不定长的应用层协议。因为TCP中使基于流的方式传输的,消息是没有边界的。**所以协议的目的就是划定消息的边界,制定通信双方要遵守的通信规则。** 只要是按照协议的,双方就能够通信,例如redis中set方法的协议格式如下: ~~~ set name zhangsan # 其协议的格式如下: *3 \n $3 \n set \n $4 \n name \n $8 \n zhangsan \n ~~~ 则在Netty中可以进行模拟: ~~~ private void set(ChannelHandlerContext ctx) { ByteBuf buf = ctx.alloc().buffer(); buf.writeBytes("*3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("set".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("aaa".getBytes()); buf.writeBytes(LINE); buf.writeBytes("$3".getBytes()); buf.writeBytes(LINE); buf.writeBytes("bbb".getBytes()); buf.writeBytes(LINE); ctx.writeAndFlush(buf); } ~~~ 连接到对应的redis即可进行通信。 &nbsp; ### 2.1 自定义协议要素 **头部head**: 1. 魔数:一般在数据包的前面几个字节,用来第一时间判断是否是无效数据包。 2. 版本号:可以支持协议的升级。 3. 序列化算法:消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk。 4. 指令类型:根据具体业务区分指令类型。 5. 请求序号:为了双工通信双方提供异步通信的能力。 6. 正文长度 **正文body** 消息正文,json、xml、对象流等。