企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] # 编解码类CharsetHelper ~~~ package com.nio; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.CharacterCodingException; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; public class CharsetHelper { private static final String UTF_8 = "UTF-8"; private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder(); private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder(); public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException { return encoder.encode(in); } public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{ return decoder.decode(in); } } ~~~ # 服务端类NioServer ~~~ package com.nio; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.*; import java.util.Iterator; public class NioServer { //和操作系统交互的缓存 private ByteBuffer readBuffer; //轮询器 private Selector selector; public static void main(String[] args) { NioServer server = new NioServer(); server.init(); System.out.println("server started --> 8383"); server.listen(); } private void init() { //分配个缓存 readBuffer = ByteBuffer.allocate(1024); ServerSocketChannel serverSocketChannel; try { //创建一个socket channel; channel是nio中对通信通道的抽象,不分入站出站方向 serverSocketChannel = ServerSocketChannel.open(); //设置通道为非阻塞的方式 serverSocketChannel.configureBlocking(false); //将通道绑定在服务器的ip地址和某个端口上 serverSocketChannel.socket().bind(new InetSocketAddress(8383)); //打开一个多路复用器 selector = Selector.open(); //将上面创建好的socket channel注册到selector多路复用器上 //对于复用端来说,一定要先注册一个OP_ACCEPT事件用来响应客户端的连接请求 //将上述的通道管理器和通道绑定,并为该通道注册OP_ACCEPT事件 //注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); }catch (Exception e) { e.printStackTrace(); } } private void listen() { while(true) { try { //去询问一次selector选择器 //这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个) selector.select(); //拿到事件的key //如果channel有数据了,将生成的key访入keys集合中,得到这个keys集合的迭代器 Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); //使用迭代器遍历集合 while (ite.hasNext()) { //遍历到一个事件key,得到集合中的一个key实例 SelectionKey key = ite.next(); //确保不重复处理 //拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错 ite.remove(); //处理事件 handlekey(key); } }catch (Exception e) { e.printStackTrace(); } } } //处理事件 private void handlekey(SelectionKey key) { SocketChannel channel = null; try { //这个key是可接受连接的吗 if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); //接收连接请求 channel = serverChannel.accept(); channel.configureBlocking(false); //对读的事件感兴趣 channel.register(selector,SelectionKey.OP_READ); //这个事件是可读的吗 }else if(key.isReadable()) { channel = (SocketChannel) key.channel(); //先清空一下buffer,因为要用,以防是老的数据 readBuffer.clear(); /** * 当客户端channel关闭后,会不断收到read事件,但没有消息,即read方法返回-1 * 所以这时服务器端也需要关闭channel,避免无限无效的处理 * 把消息read到readBuffer中 */ int count = channel.read(readBuffer); if (count > 0) { // 一定需要调用flip函数,否则读取错误数据 // 简单来说,flip操作就是让读写指针、limit指针复位到正确的位置 readBuffer.flip(); /* * 使用CharBuffer配合取出正确的数据; * String question = new String(readBuffer.array());可能会出错, * 因为前面readBuffer.clear();并未真正清理数据 只是重置缓冲区的position, * limit, mark, 而readBuffer.array()会返回整个缓冲区的内容。 * decode方法只取readBuffer的position到limit数据。 * 例如,上一次读取到缓冲区的是"where", clear后position为0,limit为 1024, * 再次读取“bye"到缓冲区后,position为3,limit不变, * flip后position为0,limit为3,前三个字符被覆盖了,但"re"还存在缓冲区中, 所以 new * String(readBuffer.array()) 返回 "byere", * 而decode(readBuffer)返回"bye"。 */ CharBuffer charBuffer = CharsetHelper.decode(readBuffer); String question = charBuffer.toString(); // 根据客户端的请求,调用相应的业务方法获取业务结果 String answer = getAnswer(question); channel.write(CharsetHelper.encode(CharBuffer.wrap(answer))); } else { // 这里关闭channel,因为客户端已经关闭channel或者异常了 channel.close(); } } }catch (Exception e) { e.printStackTrace(); } } private String getAnswer(String question) { String answer = null; if ("who".equals(question)) { answer = "我是凤姐\n"; } else if ("what".equals(question)) { answer = "我是来帮你解闷的\n"; } else if ("where".equals(question)) { answer = "我来自外太空\n"; } else if ("hi".equals(question)) { answer = "hello\n"; } else if ("bye".equals(question)) { answer = "88\n"; } else { answer = "请输入 who, 或者what, 或者where"; } return answer; } } ~~~ # 客户端NioClient ~~~ package com.nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; public class NioClient implements Runnable{ private BlockingQueue<String> words; private Random random; public static void main(String[] args) { // 多个线程发起Socket客户端连接请求 for(int i=0; i<10; i++){ NioClient c = new NioClient(); c.init(); new Thread(c).start(); } } @Override public void run() { SocketChannel channel = null; Selector selector = null; try { channel = SocketChannel.open(); channel.configureBlocking(false); // 主动请求连接 channel.connect(new InetSocketAddress("localhost", 8383)); selector = Selector.open(); channel.register(selector, SelectionKey.OP_CONNECT); boolean isOver = false; while(! isOver){ selector.select(); Iterator<SelectionKey> ite = selector.selectedKeys().iterator(); while(ite.hasNext()){ SelectionKey key = ite.next(); ite.remove(); if(key.isConnectable()){ if(channel.isConnectionPending()){ if(channel.finishConnect()){ //只有当连接成功后才能注册OP_READ事件 key.interestOps(SelectionKey.OP_READ); channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord()))); sleep(); } else{ key.cancel(); } } } else if(key.isReadable()){ ByteBuffer byteBuffer = ByteBuffer.allocate(128); channel.read(byteBuffer); byteBuffer.flip(); CharBuffer charBuffer = CharsetHelper.decode(byteBuffer); String answer = charBuffer.toString(); System.out.println(Thread.currentThread().getId() + "---" + answer); String word = getWord(); if(word != null){ channel.write(CharsetHelper.encode(CharBuffer.wrap(word))); } else{ isOver = true; } sleep(); } } } } catch (IOException e) { // TODO } finally{ if(channel != null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } } private void init() { words = new ArrayBlockingQueue<String>(5); try { words.put("hi"); words.put("who"); words.put("what"); words.put("where"); words.put("bye"); } catch (InterruptedException e) { e.printStackTrace(); } random = new Random(); } private String getWord(){ return words.poll(); } private void sleep() { try { TimeUnit.SECONDS.sleep(random.nextInt(3)); } catch (InterruptedException e) { e.printStackTrace(); } } private void sleep(long l) { try { TimeUnit.SECONDS.sleep(l); } catch (InterruptedException e) { e.printStackTrace(); } } } ~~~