企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC]

# 发送字符串代码

## service

**EchoServer**

~~~
package com.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Minimal Netty server that binds to 127.0.0.1 on the given port and installs
 * two inbound and two outbound demo handlers on every accepted connection.
 */
public class EchoServer {

    private final int port;

    /**
     * @param port TCP port the server binds to on 127.0.0.1
     */
    public EchoServer(int port) {
        this.port = port;
    }

    /**
     * Binds the server, then blocks until the server channel is closed.
     * The event loop group is always shut down before this method returns.
     *
     * @throws InterruptedException if interrupted while waiting for the
     *                              event loop group to shut down
     */
    public void start() throws InterruptedException {
        // Created before the try block so the finally clause can never see null.
        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        try {
            // Server-side bootstrap helper.
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            // NioServerSocketChannel selects the NIO transport; the local address is
            // where the server listens; the child handler configures each accepted connection.
            serverBootstrap.group(eventLoopGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress("127.0.0.1", port)
                    .childHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel channel) throws Exception {
                            // Inbound events run handlers in registration order:
                            //   EchoInHandler1, then EchoInHandler2.
                            // Outbound operations run handlers in reverse registration order:
                            //   EchoOutHandler2, then EchoOutHandler1.
                            // Both outbound handlers are registered before EchoInHandler2
                            // because a ctx.write() issued there only travels toward the
                            // head of the pipeline.
                            channel.pipeline().addLast(new EchoInHandler1());
                            channel.pipeline().addLast(new EchoOutHandler1());
                            channel.pipeline().addLast(new EchoOutHandler2());
                            channel.pipeline().addLast(new EchoInHandler2());
                        }
                    });
            // sync() blocks until the bind has completed.
            ChannelFuture channelFuture = serverBootstrap.bind().sync();
            System.out.println("开始监听,端口为: " + channelFuture.channel().localAddress());
            // Block until the server channel is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Block until the event loop group has shut down.
            eventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        new EchoServer(20000).start();
    }
}
~~~

**EchoInHandler1**

~~~
package com.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * First inbound handler: logs "in1" and passes every event on to the next
 * inbound handler.
 */
public class EchoInHandler1 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("in1");
        // Hand the message to the next inbound handler.
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Data is only written to the socket once it has been flushed.
        ctx.flush();
        // Propagate the event; otherwise later inbound handlers never see channelReadComplete.
        ctx.fireChannelReadComplete();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the failure and close the connection.
        cause.printStackTrace();
        ctx.close();
    }
}
~~~

**EchoInHandler2**

~~~
package com.netty;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.nio.charset.StandardCharsets;
import java.util.Date;

/**
 * Second inbound handler: decodes the request as UTF-8 text, prints it, and
 * writes the current server time back to the client.
 */
public class EchoInHandler2 extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("in2");
        try {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            // Decode the request body.
            String body = new String(req, StandardCharsets.UTF_8);
            System.out.println("接收客户端数据:" + body);
        } finally {
            // This handler consumes the message, so it must release it;
            // ChannelInboundHandlerAdapter does not release messages automatically.
            ReferenceCountUtil.release(msg);
        }

        // Reply to the client.
        System.out.println("server向client发送数据");
        String currentTime = new Date(System.currentTimeMillis()).toString();
        // Wrap the reply in a ByteBuf.
        ByteBuf resp = Unpooled.copiedBuffer(currentTime, StandardCharsets.UTF_8);
        // Start the outbound write; it passes through the outbound handlers.
        ctx.write(resp);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        // Data is only written to the socket once it has been flushed.
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // Report the failure and close the connection.
        cause.printStackTrace();
        ctx.close();
    }
}
~~~

**EchoOutHandler1**

~~~
package com.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * Outbound handler that logs "out1" and the message, then forwards the write
 * and flushes it.
 */
public class EchoOutHandler1 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("out1");
        // msg is what the inbound handler wrote; it could be transformed here.
        System.out.println(msg);
        // Forward the promise too, so the caller's write future gets completed.
        ctx.write(msg, promise);
        ctx.flush();
    }
}
~~~

**EchoOutHandler2**

~~~
package com.netty;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

/**
 * Outbound handler that logs "out2" and forwards the write unchanged.
 */
public class EchoOutHandler2 extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("out2");
        // Forward the promise too, so the caller's write future gets completed.
        ctx.write(msg, promise);
    }
}
~~~

## client

**EchoClient**

~~~
package com.nettyClient;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

/**
 * Minimal Netty client that connects to the given host and port and installs
 * an {@link EchoClientHandler} on the connection.
 */
public class EchoClient {

    private final String host;
    private final int port;

    /**
     * @param host server host name or address
     * @param port server TCP port
     */
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server, then blocks until the connection is closed.
     * The event loop group is always shut down before this method returns.
     *
     * @throws Exception if interrupted while waiting for the event loop group
     *                   to shut down
     */
    public void start() throws Exception {
        // The event loop group handles connecting, reading and writing.
        // Created before the try block so the finally clause can never see null.
        EventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
        try {
            // Client-side bootstrap helper.
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(nioEventLoopGroup)
                    // NioSocketChannel selects the NIO client transport.
                    .channel(NioSocketChannel.class)
                    // Address of the server to connect to.
                    .remoteAddress(new InetSocketAddress(host, port))
                    // Configures the pipeline of the new connection.
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            // Connect to the server; sync() blocks until the connect has completed.
            ChannelFuture channelFuture = bootstrap.connect().sync();
            // Block until the connection is closed.
            channelFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            nioEventLoopGroup.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        new EchoClient("127.0.0.1", 20000).start();
    }
}
~~~

**EchoClientHandler**

~~~
package com.nettyClient;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;

import java.nio.charset.StandardCharsets;

/**
 * Client handler: sends a fixed request once the connection is active and
 * prints whatever the server sends back.
 */
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

    /** Called once the client has connected to the server. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端连接服务器,开始发送数据……");
        // Request payload.
        byte[] req = "QUERY TIME ORDER".getBytes(StandardCharsets.UTF_8);
        // Buffer that holds the bytes to be sent.
        ByteBuf firstMessage = Unpooled.buffer(req.length);
        // Copy the payload into the buffer.
        firstMessage.writeBytes(req);
        // Write the buffer and flush it to the socket.
        ctx.writeAndFlush(firstMessage);
    }

    /** Called when data has been received from the server. */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
        System.out.println("client 读取server数据..");
        // Decode the server's reply.
        byte[] req = new byte[msg.readableBytes()];
        msg.readBytes(req);
        String body = new String(req, StandardCharsets.UTF_8);
        System.out.println("服务端返回的数据为 :" + body);
    }

    /** Called when an exception is raised in the pipeline. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("client exceptionCaught..");
        // Report the cause instead of discarding it.
        cause.printStackTrace();
        // Close the connection to release its resources.
        ctx.close();
    }
}
~~~