💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
# Netty入门 ## 一、Netty概述 > Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。《官网》 使用Netty的框架有: * RocketMQ - 阿里巴巴开源的消息队列。 * gRPC - rpc 框架。 * Dubbo - rpc 框架。 * Zookeeper - 分布式协调框架。 * Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端。 Netty框架与Java的nio相比: * NIO工作量大,bug多。 * NIO需要自己构建协议。 * NIO需要自己解决TCP的传输问题。 * NIO的epoll空轮询可能会导致CPU100%的问题。 * Netty对NIO的API进行了增强,例如将ThreadLocal增强成FastThreadLocal,将ByteBuffer增强成ByteBuf。 &nbsp; ## 二、使用 1. 引入依赖 ~~~ <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency> ~~~ 2. 服务端代码 ~~~ new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) { System.out.println(msg); } }); } }) .bind(8080); ~~~ - ServerBootstrap 服务端的启动类。 - NioEventLoopGroup 类似于线程池+Selector。 - childHandler 用于处理SocketChannel的内容,可以理解成Server和Client是父子关系。 - pipeline 管道,可以理解成每个Handler都是一个工序,管道由这些工序组成。 3. 客户端代码 ~~~ new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) .connect("127.0.0.1", 8080) .sync() .channel() .writeAndFlush(new Date() + ": hello world!"); ~~~ - Bootstrap 客户端启动类。 - sync 表示同步连接建立后才能调用writeAndFlush往服务端输出内容。 **注意:** 在客户端和服务端中接受数据和发送数据都会去调用Handler处理器的方法进行处理。可以将channel理解为数据的通道,pipeline理解为处理数据的流水线,而Handler就是该流水线中的每道工序。 而eventLoop可以理解为处理数据的工人,由单线程的线程池组成。 &nbsp; ## 三、组件 ### 3.1 EventLoop EventLoop表示事件循环对象,对应着一个线程,其本质是一个单线程的线程池,同时里面维护了一个Selector。通过里面的run方法源源不断的处理Channel上的IO事件。 EventLoop继承自OrderedEventExecutor和EventLoopGroup,里面有个parent方法用来判断属于哪个EventLoopGroup。 &nbsp; ### 3.2 EventLoopGroup EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(**这样做可以保证该channel上 io 事件处理时的线程安全**)。 :-: ![](https://img.kancloud.cn/da/4f/da4fc6b42f35878ba0ac24187594180b_945x159.png) EventLoopGroup有多个实现类,常用的有: ~~~  MultithreadEventLoopGroup  NioEventLoopGroup  DefaultEventLoopGroup 只能执行普通任务和定时任务不能执行IO任务 ~~~ 实现类需要实现如下方法: * 实现了 Iterable 接口提供遍历 EventLoop。 * 实现 next 方法获取集合中下一个 EventLoop。 NioEventLoopGroup的默认线程数由*DEFAULT\_EVENT\_LOOP\_THREADS*这个参数配置: :-: ![](https://img.kancloud.cn/f9/f4/f9f4e9fa9f22228dbdc00a2112cbe4aa_1014x200.png) 默认为CPU核心数的两倍。 NioEventLoopGroup可以设置一个Boss EventLoop用于处理accept事件,多个Worker EventLoop用于处理SocketChannel IO 读写事件。两个Worker会轮流处理事件,同时是绑定channel进行处理。 channel交由特点的channel处理的关键代码如下: ~~~ static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); // 下一个 handler 的事件循环是否与当前的事件循环是同一个线程 EventExecutor executor = next.executor(); // 是,直接调用 if (executor.inEventLoop()) { // 判断当前handler中的线程是否和executor是同一个线程 next.invokeChannelRead(m); } // 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人) else { executor.execute(new Runnable() { // 下一个handler的线程 @Override public void run() { next.invokeChannelRead(m); } }); } } ~~~ 如果两个Handler绑定的是同个线程,则直接执行,否则的话交由下一个eventLoop判断执行。 NioEventLoop也可以用于执行普通任务和定时任务: ~~~ NioEventLoopGroup nioWorkers = new NioEventLoopGroup(2); log.debug("server start..."); // 让线程启动起来 Thread.sleep(2000); nioWorkers.execute(()->{ log.debug("normal task..."); }); nioWorkers.scheduleAtFixedRate(() -> { log.debug("running..."); }, 0, 1, TimeUnit.SECONDS); ~~~ &nbsp; **关闭EventLoopGroup** 优雅关闭 `shutdownGracefully` 方法。该方法会首先切换 `EventLoopGroup` 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行。从而确保整体应用是在正常有序的状态下退出的。 默认是2秒后关闭,15秒后超时关闭,由如下两个参数限定: :-: ![](https://img.kancloud.cn/a1/48/a148dac8151e56e7b496b5e7ba1ff13b_535x68.png) shutdownGracefully方法在关闭的时候会进行空循环关闭 :-: ![](https://img.kancloud.cn/81/a9/81a913e467d842c94b4e40b9b53349c6_482x360.png) &nbsp; ### 3.3 Channel 可以理解为数据流通道,其主要的API有: 1. close():关闭Channel。 2. closeFuture():处理Channel的关闭。 3. pipeline():往流水线中添加处理器。 4. write():将输入写入缓冲区中,不一定会立刻刷出。 5. writeAndFlush():写入并立刻将数据刷出。 &nbsp; **ChannelFuture** 注意带有**Future、promise**名称的都是和异步方法一起使用的。 ~~~ ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { ch.pipeline().addLast(new StringEncoder()); } }) // 异步非阻塞,由main线程发起调用,真正执行 connect 的是nio线程 .connect("127.0.0.1", 8080); // 1 channelFuture.sync().channel().writeAndFlush(new Date() + ": hello world!"); ~~~ 由于ChannelFuture本身是异步非阻塞的,所以才需要调用sync方法等待连接后获取channel对象。也可以使用回调的方法获取连接: ~~~ // addListener(回调对象)也可以异步处理结果,nio线程连接建立之后,就会调用operationComplete方法,处理channel的是nio线程 channelFuture.addListener((ChannelFutureListener) future -> { System.out.println(future.channel()); // 2 }); ~~~ &nbsp; **CloseFuture** 调用closeFuture方法可以获取CloseFuture对象,可用于同步或者异步处理关闭。 ~~~ // 获取 CloseFuture 对象, 1) 同步处理关闭, 2) 异步处理关闭,回调函出,nio线程处理 ChannelFuture closeFuture = channel.closeFuture(); /*log.debug("waiting close..."); closeFuture.sync(); log.debug("处理关闭之后的操作");*/ closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { log.debug("处理关闭之后的操作"); // gracefully 优雅的 group.shutdownGracefully(); } }); ~~~ &nbsp; 总的来说,Netty使用异步操作,用不同的线程来发起连接建立,另外一个线程来真正的建立连接,这样可以提高效率。原因如下: * 单线程没法异步提高效率,必须配合多线程、多核 cpu 才能发挥异步的优势,类似于cpu的**流水线指令**。 * 异步**并没有缩短响应时间**,反而有所增加,提高的是吞吐量,即单位时间的处理量。 * 合理进行任务拆分,也是利用异步的关键。 &nbsp; ### 3.4 Future & Promise Netty的Future继承自JDK的Future,而Promise又继承自Netty的Future,三者有如下的区别: * jdk Future **只能**同步等待任务结束(或成功、或失败)才能得到结果,通过get方法来进行同步的阻塞。 * netty Future 可以同步等待任务结束得到结果(get方法同步阻塞),也可以异步方式(addListener)得到结果,但都是要等任务结束才能得到结果。 * netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。 三者的使用方式如下: ~~~ public class TestFuture { // 测试JDK Future public void testJKDFuture() { ExecutorService service = Executors.newFixedThreadPool(2); Future<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("执行计算"); Thread.sleep(1000); return 50; } }); // 主线程通过future来获取结果 log.debug("等待结果"); // get方法会阻塞获取 log.debug("结果是 {}", future.get()); } // 测试 netty Future public void testNettyFuture() { NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("执行计算"); Thread.sleep(1000); return 70; } }); // 同步等待 log.debug("等待结果..."); log.debug("结果是{}", future.get()); // 异步等待 future.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.debug("接收结果{}", future.getNow()); } }) } // promise public void testNettyPromise() { DefaultEventLoop eventExecutors = new DefaultEventLoop(); DefaultPromise<Integer> promise = new DefaultPromise<>(eventExecutors); eventExecutors.execute(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("set success, {}",10); // 手动装入结果 promise.setSuccess(10); }); log.debug("start..."); log.debug("{}",promise.getNow()); // 还没有结果 // get方法是同步阻塞的 log.debug("{}",promise.get()); } } ~~~ Future和Promise都可以看成一个容器,用于不同线程之间交互数据,例如在线程池中执行完后的结果在主线程中被获取。而使用Promise则可以更加灵活的处理数据,不仅仅是通过返回值来获取。 &nbsp; ### 3.5 Pipeline & Handler ChannelHandler 用来处理 Channel 上的各种事件,分为入站、出站两种。所有 ChannelHandler 被连成一串,就是 Pipeline、 * 入站处理器通常是 ChannelInboundHandlerAdapter 的子类,主要**用来读取客户端数据,写回结果**。 * 出站处理器通常是 ChannelOutboundHandlerAdapter 的子类,主要对**写回结果进行加工**。 打个比喻,每个 Channel 是一个产品的加工车间,Pipeline 是车间中的流水线,ChannelHandler 就是流水线上的各道工序,而 ByteBuf 是原材料,经过很多工序的加工:先经过一道道入站工序,再经过一道道出站工序最终变成产品。 Netty会对每个pipeline添加上header handler和tail handler,形成一个双向链表。出入站工序处理顺序的举例: ~~~java new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { protected void initChannel(NioSocketChannel ch) { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(1); ctx.fireChannelRead(msg); // 1 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(2); ctx.fireChannelRead(msg); // 2 } }); ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { System.out.println(3); ctx.channel().write(msg); // 3 } }); // 出站处理器需要有写入操作才会触发,出站是按照加入pipeline的顺序相反。 ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(4); ctx.write(msg, promise); // 4 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(5); ctx.write(msg, promise); // 5 } }); ch.pipeline().addLast(new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { System.out.println(6); ctx.write(msg, promise); // 6 } }); } }) .bind(8080); ~~~ 总共添加了三道入站处理器和三道出站处理器工序,对于每个入站处理器,需要在处理完数据最后调用**ChannelHandlerContext::fireChannelRead(msg)**,来将msg交由给下一道入站处理器处理。对于出站处理器,需要由写出数据的时候才会执行,同时是按照添加的顺序逆序执行的,调用ChannelHandlerContext::channel()::write(msg)会将处理工序指向到pipeline的tail节点,之后向前处理出站处理器。调用ChannelHandlerContext::write()会将工序指向当前节点的上一个节点。完整的结构如图所示: :-: ![](https://img.kancloud.cn/8b/a7/8ba73726c2999f0a0e4bae576ff8de3d_1005x255.png) 因此输出结果为: ~~~ 1 2 3 6 5 4 ~~~ 可以在pipeline工序中通过msg传递上一个工序处理的msg结果, 跟生活中的流水线一样。为了让数据能够传递,在每个handler中必须调用super.channelRead(ctx, msg)或者是ctx.fireChannelRead(msg); 对于写出数据的方法有如下两个类似的方式 ~~~  ctx.writeAndFlush(ctx.alloc().bytebuff()); // 从当前handler往前找出站处理器  ch.writeAndFlush(ctx.alloc().bytebuff()); // 从tail往前找出站处理器 ~~~ **备注:使用了责任链模式。** &nbsp; ### 3.6 ByteBuf 与nio包的ByteBuffer类似,都是对字节数据的封装,可以看成是ByteBuffer的增强。 #### 3.6.1. 创建方式 ~~~ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(10); ~~~ 从直接内存中一个大小为10字节的ByteBuf,可以自动扩容(ByteBuffer是不能自动扩容的),默认为256字节。 展示ByteBuf的内容,使用自定义的log方法: ~~~ private static void log(ByteBuf buffer) { int length = buffer.readableBytes(); int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4; StringBuilder buf = new StringBuilder(rows * 80 * 2) .append("read index:").append(buffer.readerIndex()) .append(" write index:").append(buffer.writerIndex()) .append(" capacity:").append(buffer.capacity()) .append(NEWLINE); appendPrettyHexDump(buf, buffer); System.out.println(buf.toString()); } ~~~ 输出如下: ~~~ read index:0 write index:0 capacity:10 ~~~ 这种创建方式在windows系统会默认开启池化和分配直接内存。 **池化技术**:类似于线程池可以重用线程,Netty开启池化技术也能重用ByteBuf。其优点有: * 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力。 * 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率。 * 高并发时,池化功能更节约内存,减少内存溢出的可能。 池化可以通过如下参数开启: ~~~ -Dio.netty.allocator.type={unpooled|pooled} # unpooled没有开启,pooled表示开启 ~~~ 4.1 以后,非 Android 平台**默认启用池化实现**,Android 平台启用非池化实现。 &nbsp; **其他分配方式** ~~~ ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(10); // 分配JVM堆内存 ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(10); // 分配直接内存 ~~~ * 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制,系统内存和JDK的内存会进行映射),适合配合池化功能一起用。 * 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放。 :-: ![](https://img.kancloud.cn/ec/7b/ec7ba84c484a9860816e84410a509c48_1192x620.png) #### 3.6.2.组成与API ByteBuf包含四个部分,由读写指针来维护读写操作。 :-: ![](https://img.kancloud.cn/68/e9/68e9afa3f7014e3419b5c60fc98fc001_626x195.png) &nbsp; **写入操作** ByteBuf的写入操作的API有write和set开头的组成,其中write开头的会修改写指针,set开头的不会改变写指针的位置。 &nbsp; **扩容规则** 当容量不够用的时候ByteBuf会自动扩容,扩容规则如下: * 如何写入后数据大小未超过 512字节,则选择下一个 16 的整数倍,例如写入后大小为 12 ,则扩容后 capacity 是 16。 * 如果写入后数据大小超过 512字节,则选择下一个 2^n,例如写入后大小为 513,则扩容后 capacity 是 2^10=1024(2^9=512 已经不够了)。 * 扩容不能超过 max capacity (整数的最大值)会报错。 &nbsp; **读取** read开头的方法,指针会往前移动,可以通过mark或者reset设置重复读取。另外一系列get开头的方法不会移动读指针。 &nbsp; **释放内存** ByteBuf尽量自己手动释放,ByteBuf采用的是“引用计数”(继承ReferencedCounted接口)的方式,当计数为0的时候就会被清理掉。其中调用**release**方法可以让引用计数+1,调用retain可以让引用计数-1。不同的ByteBuf的释放机制不尽相同,在使用的过程中需要让最后处理的Handler进行手动释放ByteBuf。 同时head handler和tail handler也会保证流到这里的ByteBuf会被释放掉。 **复制** 调用copy会进行深度复制操作,对新ByteBuf的操作不会影响原来的ByteBuf。 &nbsp; ByteBuf和ByteBuffer相比的优点: 1. 使用池化技术,重用ByteBuf,提高内存的使用。 2. ByteBuf使用读写指针,ByteBuffer只使用一个指针,需要通过filp,clear,compact不断调整指针的状态。 3. 会自动扩容。 4. 支持链式调用,使用更流畅。 5. 很多地方体现零拷贝,例如 slice、duplicate、CompositeByteBuf,减少内存复制,提高性能。