企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 服务端启动的demo **Server类** ~~~ import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AttributeKey; public class Server { public static void main(String[] args) { //服务端接收客户端连接的线程,accept NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); //客户端发送消息给服务端的线程 NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { //这个里面是空的,什么也不做 ServerBootstrap b = new ServerBootstrap(); //把这2个线程配置进去 b.group(bossGroup, workerGroup) //设置socketChannel .channel(NioServerSocketChannel.class) //设置客户端连接设置tcp的属性 .childOption(ChannelOption.TCP_NODELAY, true) //每次创建客户端连接的时候绑定些基本属性 .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue") //服务端启动过程中哪些逻辑,这边我写的只是打印出来 //这个handler主要是accept用户的连接 .handler(new ServerHandler()) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //配置数据流读写的处理逻辑.会配置很多 // ch.pipeline().addLast(); //... } }); //绑定端口 ChannelFuture f = b.bind(8888).sync(); f.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } } ~~~ **ServerHandler类** ~~~ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.TimeUnit; /** * 主要是accept用户的连接 */ public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("handlerAdded"); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); new Thread(new Runnable() { @Override public void run() { // 耗时的操作 String result = loadFromDB(); ctx.channel().writeAndFlush(result); ctx.executor().schedule(new Runnable() { @Override public void run() { // ... } }, 1, TimeUnit.SECONDS); } }).start(); } private String loadFromDB() { return "hello world!"; } } ~~~ # 服务端启动流程 * 创建服务端Channel 调用JDK底层API的jdk的channel,把他包装成自己的channel,同时创建一些基本组件绑定在自己的channel上面 * 初始化服务端Channel 初始化一些基本属性,添加逻辑处理器器 * 注册selector 将底层的Channel注册到底层的selector轮询器上面,把服务端channel作为一个daragram绑定在jdk底层的channel上面,事件轮询有数据来的时候直接拿daragram * 端口绑定 调用jdk底层的API,绑定端口 ## 创建服务端Channel ~~~ --bind()[用户代码入口] | |--initAndRegister()初始化并注册 | |--newChannel()[创建服务端channel] ~~~ bind()我们一直跟进去会发现调用doBind()方法 在doBind()方法里面发现有 initAndRegister()这个方法这个就是创建服务端的channel方法 到initAndRegister()这个方法里面这里就是创建服务端channel ~~~ //创建服务端channel channel = channelFactory.newChannel(); //初始化channel init(channel); ~~~ 这边是newChannel(),我们到ReflectiveChannelFactory这个类里面看 ~~~ public T newChannel() { try { return clazz.newInstance(); ~~~ 他是反射创建的,那么我们就要看channelFactory了 这个channelFactory在哪里初始化的,又传了哪些channelFactory? 我们回到业务代码那边的.channel(NioServerSocketChannel.class)点channel进去 ~~~ return channelFactory(new ReflectiveChannelFactory<C>(channelClass)); ~~~ channelFactory没做什么事情只是把他原路返回 参数channelClass就是传进来的NioServerSocketChannel 看到他会把这个传进来的进行反射创建channel 那么我们现在要看下NioServerSocketChannel做了什么事情 ## 反射创建服务端Channel ~~~ newSocket()[通过jdk来创建底层jdk channel] | | NioServerSocketChannelConfig()[tcp参数配置类] | | AbstractNioChannel() | |--configureBlocking(false)[阻塞模式] | |--AbstractChannel()[创建id,unsafe,pipeline] ~~~ 我们来看NioServerSocketChannel源码 ~~~ public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } ~~~ 然后点newSocket进去,发现 ~~~ return provider.openServerSocketChannel(); ~~~ 这就是调用jdk底层的创建ServerSocketChannel 这个NioServerSocketChannel构造方法创建完毕会调用这个 ~~~ public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); } ~~~ 他会把jdk创建好的ServerSocketChannel传递进去,就是这边的形参this 如果以后想对这个channel的tcp参数获取可以用NioServerSocketChannelConfig这个 然后我们看上面一行 ~~~ super(null, channel, SelectionKey.OP_ACCEPT); ~~~ super一层层点上去发现AbstractNioChannel里面有这行 ~~~ ch.configureBlocking(false); ~~~ 这行就是设置服务端的channel是非阻塞的 然后我们看 ~~~ AbstractChannel()[创建id,unsafe,pipeline] ~~~ AbstractChannel就是对channel的一个抽象, 服务端,客户端的channel都是继承这个的 这个id就是channel的唯一标识,unsafe是tcp相关的读写,pipeline是处理的逻辑链 ~~~ protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); ~~~ 点这个super到上一层,发现 ~~~ protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); } ~~~ 这些就是设置那几个参数 ## 初始化服务端channel ~~~ init()[初始化入口] | |--set ChannelOptions,ChannelAttrs | |--set ChildOptions, ChildAttrs | |--config handler[配置服务端pipeline] | |--add ServerBootstrapAcceptor[添加连接器] ~~~ 我们从业务层的bind跟进去到dobind方法 然后发现这行 ~~~ final ChannelFuture regFuture = initAndRegister(); ~~~ initAndRegister里面有个init方法,我们用command+option+b找到实现类 ~~~ void init(Channel channel) throws Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { channel.config().setOptions(options); } ~~~ 这就是把我们绑定的channel设置进去,下面也是一样 在这个方法里面在往下看 ~~~ synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size())); } synchronized (childAttrs) { currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size())); } ~~~ 这就是把ChildOptions, ChildAttrs 保存到currentChildOptions和currentChildAttrs这2个局部变量 这个方法往下看 ~~~ p.addLast ~~~ 拿到服务端的pipeline,配置服务端pipeline ~~~ config.handler(); ~~~ 把用户自定义的handler添加进去 handler就是把他保存为bootstrap的变量,然后在外面 ~~~ pipeline.addLast(handler); ~~~ 用这样的方法把用户的逻辑处理链添加进去 然后netty会把一个特殊的channelhandle(添加连接器),他会在你启动的时候添加一个特殊的处理器 还是那个方法往下看 ~~~ pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); ~~~ 在构造ServerBootstrapAcceptor的时候会把前面设置的currentChildOptions和currentChildAttrs和用户代码里面配置的childHandler对应currentChildHandler currentChildGroup就是用户代码里面的group # 服务端socket在哪里初始化? # 在哪里accept连接?