💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
在学习netty源码之前,应该对netty的基本用法有所了解,由于netty大多数时候用于开发服务器端程序,因此下面以一个时间服务器为例,演示Netty的基本使用,并对主要概念进行介绍。 ## 2.1 服务器启动程序 时间服务器很简单,每次收到`QUERY TIME ORDER`请求后返回当前时间。 1. main方法中通过ServerBootstrap启动netty服务器 ``` //创建两个线程组,专门用于网络事件的处理,Reactor线程组 //一个用来接收客户端的连接, //一个用来进行SocketChannel的网络读写 EventLoopGroup bossGroup=new NioEventLoopGroup(); EventLoopGroup workGroup=new NioEventLoopGroup(); try{ //辅助启动类 ServerBootstrap b=new ServerBootstrap(); b.group(bossGroup,workGroup) // 注册两个线程组 .channel(NioServerSocketChannel.class)//创建的channel为NioServerSocketChannel【nio-ServerSocketChannel】 .option(ChannelOption.SO_BACKLOG, 1024) // 设置TCP属性 .childOption(ChannelOption.SO_KEEPALIVE, true) //配置accepted的channel属性 .childHandler(new ChildChannelHandler());//处理IO事件的处理类,处理网络事件 ChannelFuture f=b.bind(port).sync();//绑定端口后同步等待 f.channel().closeFuture().sync();//阻塞 }catch(Exception e){ e.printStackTrace(); }finally{ bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } ``` 2. 定义ChannelInitializer,会在ServerChannel注册到事件循环后触发initChannel事件。 ``` // ChannelHandler初始化处理器 class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeServerHandler()); } } ``` 3. TimeServerHandler 里面负责处理业务逻辑,发送当前时间 ``` public class TimeServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf=(ByteBuf) msg;//将msg转换成Netty的ByteBuf对象 byte[] req=new byte[buf.readableBytes()]; buf.readBytes(req); String body=new String(req,"GBK"); System.out.println("The time server receive order : "+body); String currentTime="QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(System.currentTimeMillis()).toString():"BAD ORDER"; ByteBuf resp=Unpooled.copiedBuffer(currentTime.getBytes()); ctx.write(resp);//只是写入缓冲区 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush();//通过网络发送 } } ``` ## 2.2 过程解析 1. 创建辅助启动类ServerBootstrap,并设置相关配置: * group() 设置处理Accept事件和读写操作的事件循环组 * channel() 设置通道类型为NioServerSocketChannel,这是netty自己定义的Channel,指的是服务器通道,内部包含java中的ServerSocketChannel,相应的还有客户端通道NioSocketChannel。 * option()/childOption() 设置服务器通道的选项和建立连接后的客户端通道的选项 * childHandler() 设置子处理器,内部需要将户自定义的处理器加入到netty中,这涉及到Channel,ChannelHandler和Pipeline,后续会有讲解 2.调用bind()方法绑定端口,sync()会阻塞等待处理请求。这是因为bind()方法是一个异步过程,会立即返回一个ChannelFuture对象,调用sync()会等待执行完成。 3.获得Channel的closeFuture阻塞等待关闭,服务器Channel关闭时closeFuture会完成。 > Future的使用参见【第四章 Future和Promise】 ## 2.3 相关概念 在学习Netty的源码之前,需要对Netty的主要概念进行了解,主要是初步明白每个类负责的任务是什么,能够完成哪些工作。当然,每个概念的具体实现会在后续章节中进行介绍。 **Channel** 这里的Channel与Java的Channel不是同一个,是netty自己定义的通道;Netty的Channel是对网络连接处理的抽象,负责与网络进行通讯,支持NIO和OIO两种方式;内部与网络socket连接,通过channel能够进行I/O操作,如读、写、连接和绑定。 通过Channel可以执行具体的I/O操作,如read, write, connect, 和bind。在Netty中,所有I/O操作都是异步的;Netty的服务器端处理客户端连接的Channel创建时可以设置父Channel。例如:ServerSocketChannel接收到请求创建SocketChannel,SocketChannel的父为ServerSocketChannel。 **ChannelHandler与ChannelPipeline** ChannelHandler是通道处理器,用来处理I/O事件或拦截I/O操作,ChannelPipeline字如其名,是一个双向流水线,内部维护了多个ChannelHandler,服务器端收到I/O事件后,每次顺着ChannelPipeline依次调用ChannelHandler的相关方法。 ChannelHandler是个接口,通常我们在Netty中需要使用下面的子类: * ChannelInboundHandler 用来处理输入的I/O事件 * ChannelOutboundHandler 用来处理输出的I/O事件 另外,下面的adapter类提供了 * ChannelInboundHandlerAdapter 用来处理输入的I/O事件 * ChannelOutboundHandlerAdapter 用来处理输出的I/O事件 * ChannelDuplexHandler 可以用来处理输入和输出的I/O事件 Netty的ChannelPipeline和ChannelHandler机制类似于Servlet和Filter过滤器/拦截器,每次收到请求会依次调用配置好的拦截器链。Netty服务器收到消息后,将消息在ChannelPipeline中流动和传递,途经的ChannelHandler会对消息进行处理,ChannelHandler分为两种inbound和outbound,服务器read过程中只会调用inbound的方法,write时只寻找链中的outbound的Handler。 ChannelPipeline内部维护了一个双向链表,Head和Tail分别代表表头和表尾,Head作为总入口和总出口,负责底层的网络读写操作;用户自己定义的ChannelHandler会被添加到链表中,这样就可以对I/O事件进行拦截和处理;这样的好处在于用户可以方便的通过新增和删除链表中的ChannelHandler来实现不同的业务逻辑,不需要对已有的ChannelHandler进行修改。 ![NettyPipeline](http://web.uxiaowo.com/netty/Future/Pipeline.png) 如图所示,在服务器初始化后,ServerSocketChannel的会创建一个Pipeline,内部维护了ChannelHanlder的双向链表,读取数据时,会依次调用ChannelInboundHandler子类的channelRead()方法,例如:读取到客户端数据后,依次调用解码-业务逻辑-直到Tail。 而写入数据时,会从用户自定义的ChannelHandler出发查找ChannelOutboundHandler的子类,调用channelWrite(),最终由Head的write()向socket写入数据。例如:写入数据会通过业务逻辑的组装--编码--写入socket(Head的write)。 **EventLoop与EventLoopGroup** EventLoop是事件循环,EventLoopGroup是运行在线程池中的事件循环组,Netty使用了Reactor模型,服务器的连接和读写放在线程池之上的事件循环中执行,这是Netty获得高性能的原因之一。事件循环内部会打开selector,并将Channel注册到事件循环中,事件循环不断的进行select()查找准备就绪的描述符;此外,某些系统任务也会被提交到事件循环组中运行。 **ServerBootstrap** ServerBootstrap是辅助启动类,用于服务端的启动,内部维护了很多用于启动和建立连接的属性。包括: * EventLoopGroup group 线程池组 * channel是通道 * channelFactory 通道工厂,用于创建channel * localAddress 本地地址 * options 通道的选项,主要是TCP连接的属性 * attrs 用来设置channel的属性, * handler 通道处理器 ## 2.4 启动过程 了解了上面的概念后,我们再来根据程序说明一下服务器的启动过程,主要分为四个阶段: * 配置config:设置启动器/服务器通道/客户端通道等相关配置; * 初始化init:主要功能是打开java的serversocketchannel,内部会初始化Netty的Channel及其ChannelPipeline; * 注册register:将初始化后的Netty-Channel注册到事件循环的selector上面。具体过程:将打开netty的Channel注册到线程池组的selector上;触发Pipeline上面ChannelHandler的channelRegistered,至此注册完毕; * 绑定bind:将java的ServerSocketChannel绑定到本地的端口上面,结束后使用fireChannelActive通知Pipeline里的ChannelHandle,执行其channelActive方法; 由于注册阶段是异步的,绑定阶段会与之同时进行,因此注册阶段完毕后会判断绑定阶段是否结束从而触发channelActive。在启动完毕后,会建立下图中的连接结构: ![NettyPipeline](http://web.uxiaowo.com/netty/Future/Channel.png) Netty的Channel一端与java的Channel相连接,可以进行网络I/O操作;另一端与Pipeline连接,用来执行业务逻辑。一旦事件循环组中的EventLoop在循环中select()到准备就绪的I/O描述符后,就会交给NettyChannel处理,NettyChannel交给Pipeline的链表进行业务逻辑处理。