[TOC]
# 简介
**nio优势不在于数据传送的速度**
nio是new io的简称,在jdk1.4里面提供的新api
为所有的原始类型提供(Buffer)缓存支持
字符集编码解码解决方案
channel:一个新的原始i/o抽象
支持锁和内存映射文件的文件访问接口
提供多路(non-bloking)非阻塞式的高伸缩性网络I/O
# socket /nio原理
1. 阻塞和非阻塞
阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式。
当数据没有准备好的时候,
阻塞:往往需要等待缓冲区中的数据准备好之后才处理,否则一直等待。
非阻塞:当我们的进程访问我们的数据缓冲区的时候,数据没有准备好的时候,直接返回,不需要等待。有数据的时候,也直接返回
2. 同步和异步
同步和异步都是基于应用程序和操作系统处理IO事件所采用的方式:
同步:应用程序要直接参与IO事件的操作;
异步:所有的IO读写事件交给操作系统去处理;
同步的方式在处理IO事件的时候,必须阻塞在某个方法上面等待我们的IO事件完成(阻塞在IO事件或者通过轮询IO事件的方式);对于异步来说,所有的IO读写都交给了操作系统,这个时候,我们可以去做其他的事情,并不需要去完成真正的IO操作。当操作系统完成IO之后,给我们的应用程序一个通知就可以了。
同步有两种实现模式:
1. 阻塞到IO事件 阻塞到read 或者 write 方法上,这个时候我们就完全不能做自己的事情。(在这种情况下,我们只能把读写方法放置到线程中,然后阻塞线程的方式来实现并发服务,对线程的性能开销比较大)
2. IO事件的轮询 --在linux c语言编程中叫做多路复用技术(select模式)
读写事件交给一个专门的线程来处理,这个线程完成IO事件的注册功能,还有就是不断地去轮询我们的读写缓冲区(操作系统),看是否有数据准备好,然后通知我们的相应的业务处理线程。这样的话,我们的业务处理线程就可以做其他的事情。在这种模式下,阻塞的不是所有的IO线程,而是阻塞的只是select线程
比喻说明:
Client
Selector 管家
BOSS
当客人来的时候,就给管家说,我来了(注册),管家得到这个注册信息后,就给BOSS说,我这里有一个或者多个客人。BOSS就说你去给某人A这件东西(IO数据),给另外一个人B另一件东西。这个时候,客人是可以去做自己的事情(比如看看花园等等),当管家知道BOSS给他任务后,他就会去找对应的某人(根据客人的注册信息),告诉他BOSS给他了某样东西。
# JAVA IO模型
基于以上4中IO模型,JAVA对应的实现有:
BIO--同步阻塞: JDK1.4以前我们使用的都是BIO
阻塞到我们的读写方法,阻塞到线程来提高并发性能,但是效果不是很好
NIO--同步非阻塞:JDK1.4 linux多路复用技术(select模式) 实现IO事件的轮询方式:同步非阻塞的模式,这种方式目前是主流的网络通信模式
mina netty ——网络通信框架,比自己写NIO要容易些,并且代码可读性更好
AIO:JDK1.7(NIO2)真正的异步非阻塞IO(基于linux的epoll模式)
小结:
1)BIO阻塞的IO
2)NIO select多路复用+非阻塞 同步非阻塞
3)AIO异步非阻塞IO
# NIO详解
New IO成功的解决了上述问题,它是怎样解决的呢?
IO处理客户端请求的最小单位是线程
而NIO使用了比线程还小一级的单位:通道(Channel)
可以说,NIO中只需要一个线程就能完成所有接收,读,写等操作
要学习NIO,首先要理解它的三大核心
Selector,选择器
Buffer,缓冲区
Channel,通道
**Buffer**
首先要知道什么是Buffer
在NIO中数据交互不再像IO机制那样使用流
而是使用Buffer(缓冲区)
可以看出Buffer在整个工作流程中的位置
buffer实际上是一个容器,一个连续数组,它通过几个变量来保存这个数据的当前位置状态:
1. capacity:容量,缓冲区能容纳元素的数量
2. position:当前位置,是缓冲区中下一次发生读取和写入操作的索引,当前位置通过大多数读写操作向前推进
3. limit:界限,是缓冲区中最后一个有效位置之后下一个位置的索引
如图:
![](https://box.kancloud.cn/28959b3f44c5a9aa5bf630ca7dc817c8_1442x492.png)
几个常用方法:
~~~
.flip() //将limit设置为position,然后position重置为0,返回对缓冲区的引用
.clear() //清空调用缓冲区并返回对缓冲区的引用
~~~
来点实际点的,上面图中的具体代码如下:
1. 首先给Buffer分配空间,以字节为单位
~~~
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
~~~
创建一个ByteBuffer对象并且指定内存大小
2. 向Buffer中写入数据:
~~~
1).数据从Channel到Buffer:channel.read(byteBuffer);
2).数据从Client到Buffer:byteBuffer.put(...);
~~~
3. 从Buffer中读取数据:
~~~
1).数据从Buffer到Channel:channel.write(byteBuffer);
2).数据从Buffer到Server:byteBuffer.get(...);
~~~
**Selector**
选择器是NIO的核心,它是channel的管理者
通过执行select()阻塞方法,监听是否有channel准备好
一旦有数据可读,此方法的返回值是SelectionKey的数量
所以服务端通常会死循环执行select()方法,直到有channl准备就绪,然后开始工作
每个channel都会和Selector绑定一个事件,然后生成一个SelectionKey的对象
需要注意的是:
channel和Selector绑定时,channel必须是非阻塞模式
而FileChannel不能切换到非阻塞模式,因为它不是套接字通道,所以FileChannel不能和Selector绑定事件
在NIO中一共有四种事件:
1. SelectionKey.OP_CONNECT:连接事件
2. SelectionKey.OP_ACCEPT:接收事件
3. SelectionKey.OP_READ:读事件
4. SelectionKey.OP_WRITE:写事件
**Channel**
共有四种通道:
FileChannel:作用于IO文件流
DatagramChannel:作用于UDP协议
SocketChannel:作用于TCP协议
ServerSocketChannel:作用于TCP协议
本篇文章通过常用的TCP协议来讲解NIO
我们以ServerSocketChannel为例:
打开一个ServerSocketChannel通道
~~~
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
~~~
关闭ServerSocketChannel通道:
~~~
serverSocketChannel.close();
~~~
循环监听SocketChannel:
~~~
while(true){
SocketChannel socketChannel = serverSocketChannel.accept();
clientChannel.configureBlocking(false);
}
~~~
`clientChannel.configureBlocking(false);`语句是将此通道设置为非阻塞,也就是异步
自由控制阻塞或非阻塞便是NIO的特性之一
**SelectionKey**
SelectionKey是通道和选择器交互的核心组件
比如在SocketChannel上绑定一个Selector,并注册为连接事件:
~~~
SocketChannel clientChannel = SocketChannel.open();
clientChannel.configureBlocking(false);
clientChannel.connect(new InetSocketAddress(port));
clientChannel.register(selector, SelectionKey.OP_CONNECT);
~~~
核心在register()方法,它返回一个SelectionKey对象
来检测channel事件是那种事件可以使用以下方法:
~~~
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
~~~
服务端便是通过这些方法 在轮询中执行相对应操作
当然通过Channel与Selector绑定的key也可以反过来拿到他们
~~~
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
~~~
在Channel上注册事件时,我们也可以顺带绑定一个Buffer:
~~~
clientChannel.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(1024));
~~~
或者绑定一个Object:
~~~
selectionKey.attach(Object);
Object anthorObj = selectionKey.attachment();
~~~
# NIO代码
**编解码类CharsetHelper**
~~~
package com.nio;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
public class CharsetHelper {
private static final String UTF_8 = "UTF-8";
private static CharsetEncoder encoder = Charset.forName(UTF_8).newEncoder();
private static CharsetDecoder decoder = Charset.forName(UTF_8).newDecoder();
public static ByteBuffer encode(CharBuffer in) throws CharacterCodingException {
return encoder.encode(in);
}
public static CharBuffer decode(ByteBuffer in) throws CharacterCodingException{
return decoder.decode(in);
}
}
~~~
**服务端类NioServer**
~~~
package com.nio;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class NioServer {
//和操作系统交互的缓存
private ByteBuffer readBuffer;
//轮询器
private Selector selector;
public static void main(String[] args) {
NioServer server = new NioServer();
server.init();
System.out.println("server started --> 8383");
server.listen();
}
private void init() {
//分配个缓存
readBuffer = ByteBuffer.allocate(1024);
ServerSocketChannel serverSocketChannel;
try {
//创建一个socket channel; channel是nio中对通信通道的抽象,不分入站出站方向
serverSocketChannel = ServerSocketChannel.open();
//设置通道为非阻塞的方式
serverSocketChannel.configureBlocking(false);
//将通道绑定在服务器的ip地址和某个端口上
serverSocketChannel.socket().bind(new InetSocketAddress(8383));
//打开一个多路复用器
selector = Selector.open();
//将上面创建好的socket channel注册到selector多路复用器上
//对于复用端来说,一定要先注册一个OP_ACCEPT事件用来响应客户端的连接请求
//将上述的通道管理器和通道绑定,并为该通道注册OP_ACCEPT事件
//注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}catch (Exception e) {
e.printStackTrace();
}
}
private void listen() {
while(true) {
try {
//去询问一次selector选择器
//这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个)
selector.select();
//拿到事件的key
//如果channel有数据了,将生成的key访入keys集合中,得到这个keys集合的迭代器
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
//使用迭代器遍历集合
while (ite.hasNext()) {
//遍历到一个事件key,得到集合中的一个key实例
SelectionKey key = ite.next();
//确保不重复处理
//拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错
ite.remove();
//处理事件
handlekey(key);
}
}catch (Exception e) {
e.printStackTrace();
}
}
}
//处理事件
private void handlekey(SelectionKey key) {
SocketChannel channel = null;
try {
//这个key是可接受连接的吗
if (key.isAcceptable()) {
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
//接收连接请求
channel = serverChannel.accept();
channel.configureBlocking(false);
//对读的事件感兴趣
channel.register(selector,SelectionKey.OP_READ);
//这个事件是可读的吗
}else if(key.isReadable()) {
channel = (SocketChannel) key.channel();
//先清空一下buffer,因为要用,以防是老的数据
readBuffer.clear();
/**
* 当客户端channel关闭后,会不断收到read事件,但没有消息,即read方法返回-1
* 所以这时服务器端也需要关闭channel,避免无限无效的处理
* 把消息read到readBuffer中
*/
int count = channel.read(readBuffer);
if (count > 0) {
// 一定需要调用flip函数,否则读取错误数据
// 简单来说,flip操作就是让读写指针、limit指针复位到正确的位置
readBuffer.flip();
/*
* 使用CharBuffer配合取出正确的数据;
* String question = new String(readBuffer.array());可能会出错,
* 因为前面readBuffer.clear();并未真正清理数据 只是重置缓冲区的position,
* limit, mark, 而readBuffer.array()会返回整个缓冲区的内容。
* decode方法只取readBuffer的position到limit数据。
* 例如,上一次读取到缓冲区的是"where", clear后position为0,limit为 1024,
* 再次读取“bye"到缓冲区后,position为3,limit不变,
* flip后position为0,limit为3,前三个字符被覆盖了,但"re"还存在缓冲区中, 所以 new
* String(readBuffer.array()) 返回 "byere",
* 而decode(readBuffer)返回"bye"。
*/
CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
String question = charBuffer.toString();
// 根据客户端的请求,调用相应的业务方法获取业务结果
String answer = getAnswer(question);
channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
} else {
// 这里关闭channel,因为客户端已经关闭channel或者异常了
channel.close();
}
}
}catch (Exception e) {
e.printStackTrace();
}
}
private String getAnswer(String question) {
String answer = null;
if ("who".equals(question)) {
answer = "我是凤姐\n";
} else if ("what".equals(question)) {
answer = "我是来帮你解闷的\n";
} else if ("where".equals(question)) {
answer = "我来自外太空\n";
} else if ("hi".equals(question)) {
answer = "hello\n";
} else if ("bye".equals(question)) {
answer = "88\n";
} else {
answer = "请输入 who, 或者what, 或者where";
}
return answer;
}
}
~~~
**客户端NioClient**
~~~
package com.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class NioClient implements Runnable{
private BlockingQueue<String> words;
private Random random;
public static void main(String[] args) {
// 多个线程发起Socket客户端连接请求
for(int i=0; i<10; i++){
NioClient c = new NioClient();
c.init();
new Thread(c).start();
}
}
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
// 主动请求连接
channel.connect(new InetSocketAddress("localhost", 8383));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
boolean isOver = false;
while(! isOver){
selector.select();
Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = ite.next();
ite.remove();
if(key.isConnectable()){
if(channel.isConnectionPending()){
if(channel.finishConnect()){
//只有当连接成功后才能注册OP_READ事件
key.interestOps(SelectionKey.OP_READ);
channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
sleep();
}
else{
key.cancel();
}
}
}
else if(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
channel.read(byteBuffer);
byteBuffer.flip();
CharBuffer charBuffer = CharsetHelper.decode(byteBuffer);
String answer = charBuffer.toString();
System.out.println(Thread.currentThread().getId() + "---" + answer);
String word = getWord();
if(word != null){
channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
}
else{
isOver = true;
}
sleep();
}
}
}
} catch (IOException e) {
// TODO
}
finally{
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void init() {
words = new ArrayBlockingQueue<String>(5);
try {
words.put("hi");
words.put("who");
words.put("what");
words.put("where");
words.put("bye");
} catch (InterruptedException e) {
e.printStackTrace();
}
random = new Random();
}
private String getWord(){
return words.poll();
}
private void sleep() {
try {
TimeUnit.SECONDS.sleep(random.nextInt(3));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sleep(long l) {
try {
TimeUnit.SECONDS.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
~~~
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介