# Java NIO之选择器
## 1.简介
前面的文章说了缓冲区,说了通道,本文就来说说 NIO 中另一个重要的实现,即选择器 Selector。在更早的[文章](http://www.coolblog.xyz/2018/02/08/IO%E6%A8%A1%E5%9E%8B%E7%AE%80%E8%BF%B0/)中,我简述了几种 IO 模型。如果大家看过之前的文章,并动手写过代码的话。再看 Java 的选择器大概就会知道它是什么了,以及怎么用了。选择器是 Java 多路复用模型的一个实现,可以同时监控多个非阻塞套接字通道。示意图大致如下:
![](https://blog-pictures.oss-cn-shanghai.aliyuncs.com/15221629315617.jpg)
如果大家了解过多路复用模型,那应该也会知道几种复用模型的实现。比如 select,poll 以及 Linux 下的 epoll 和 BSD 下的 kqueue。Java 的选择器并非凭空创造,而是在底层操作系统提供的接口的基础上封装而来。相关的细节,我随后会进行分析。
关于 Java 选择器的简介这里先说到这,接下来进入正题。
## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#2基本操作及实现)2.基本操作及实现
本章我将对 Selector 的创建,通道的注册,Selector 的选择过程进行分析。内容篇幅较大,希望大家耐心看完。由于 Selector 相关类在不同操作系统下的实现是不同的,加之个人对 Linux epoll 更为熟悉,所以本文所分析的源码也是和 epoll 相关的。好了,进入正题吧。
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#21-创建选择器)2.1 创建选择器
选择器 Selector 是一个抽象类,所以不能直接创建。Selector 提供了一个 open 方法,通过 open 方法既可以创建选择器实例。示例代码如下:
1
Selector selector = Selector.open();
上面的代码比较简单,只有一行。不过不要被表象迷惑,这行代码仅是完整实现的冰山一角,更复杂的逻辑则隐藏在水面之下。
在简介一节,我已经说了 Java 选择器是对底层多路复用接口的一个包装,这里的 open 方法也不例外。假设我们的 Java 运行在 Linux 平台下,那么 open 最终所做的事情应该是调用操作系统的`epoll_create`函数,用于创建 epoll 实例。真实情况是不是如此呢?答案就在冰山深处,接下来就让我们一起去求索吧。下面我们将沿着 open 方法一路走下去,如下:
```
public abstract class Selector implements Closeable {
public static Selector open() throws IOException {
// 创建 SelectorProvider,再通过其 openSelector 方法创建 Selector
return SelectorProvider.provider().openSelector();
}
// 省略无关代码
}
public abstract class SelectorProvider {
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
// 创建默认的 SelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
}
public class DefaultSelectorProvider {
private DefaultSelectorProvider() { }
/**
* 根据系统名称创建相应的 SelectorProvider
*/
public static SelectorProvider create() {
String osname = AccessController
.doPrivileged(new GetPropertyAction("os.name"));
if (osname.equals("SunOS"))
return createProvider("sun.nio.ch.DevPollSelectorProvider");
if (osname.equals("Linux"))
return createProvider("sun.nio.ch.EPollSelectorProvider");
//
return new sun.nio.ch.PollSelectorProvider();
}
/**
* 加载 SelectorProvider 类,并创建实例
*/
@SuppressWarnings("unchecked")
private static SelectorProvider createProvider(String cn) {
Class<SelectorProvider> c;
try {
c = (Class<SelectorProvider>)Class.forName(cn);
} catch (ClassNotFoundException x) {
throw new AssertionError(x);
}
try {
return c.newInstance();
} catch (IllegalAccessException | InstantiationException x) {
throw new AssertionError(x);
}
}
}
/**
* 创建完 SelectorProvider,接下来要调用 openSelector 方法
* 创建 Selector 的继承类了。
*/
public class EPollSelectorProvider extends SelectorProviderImpl {
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
}
class EPollSelectorImpl extends SelectorImpl {
EPollSelectorImpl(SelectorProvider sp) throws IOException {
// 调用父类构造方法
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
// 创建 EPollArrayWrapper,EPollArrayWrapper 是一个重要的实现
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
}
public abstract class SelectorImpl extends AbstractSelector {
protected SelectorImpl(SelectorProvider sp) {
super(sp);
keys = new HashSet<SelectionKey>();
selectedKeys = new HashSet<SelectionKey>();
/* 初始化 publicKeys 和 publicSelectedKeys,
* publicKeys 即 selector.keys() 方法所返回的集合,
* publicSelectedKeys 则是 selector.selectedKeys() 方法返回的集合
*/
if (Util.atBugLevel("1.4")) {
publicKeys = keys;
publicSelectedKeys = selectedKeys;
} else {
publicKeys = Collections.unmodifiableSet(keys);
publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
}
}
/**
* EPollArrayWrapper 一个重要的实现,这一层再往下就是 C 代码了
*/
class EPollArrayWrapper {
EPollArrayWrapper() throws IOException {
// 调用 epollCreate 方法创建 epoll 文件描述符
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
// 初始化 pollArray,该对象用于存储就绪文件描述符和事件
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
// epollCreate 方法是 native 类型的
private native int epollCreate();
}
```
以上代码时 Java 层面的,Java 层调用栈最下面的类是 EPollArrayWrapper(源码路径可以在附录中查找)。EPollArrayWrapper 是一个重要的实现,起着承上启下的作用。上层是 Java 代码,下层是 C 代码。上层的代码看完了,接下来看看冰山深处的 C 代码:
```
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
// 调用 epoll_create 函数创建 epoll 实例,并返回文件描述符 epfd
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
```
上面的代码很简单,仅做了创建 epoll 实例这一件事。看到这里,答案就明了了。最后在附一张时序图帮助大家理清代码调用顺序,如下:
![Selector.open](https://blog-pictures.oss-cn-shanghai.aliyuncs.com/Selector.open.png)
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#22-选择键)2.2 选择键
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#221-几种事件)2.2.1 几种事件
选择键 SelectionKey 包含4种事件,分别是:
```
public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;
```
事件之间可以通过或运算进行组合,比如:
```
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
```
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#222-两种事件集合interestops-和-readyops)2.2.2 两种事件集合:interestOps 和 readyOps
interestOps 即感兴趣的事件集合,通道调用 register 方法注册时会设置此值,interestOps 可通过 SelectionKey interestOps() 方法获取。readyOps 是就绪事件集合,可通过 SelectionKey readyOps() 获取。
interestOps 和 readyOps 被声明在 SelectionKey 子类 SelectionKeyImpl 中,代码如下:
```
public class SelectionKeyImpl extends AbstractSelectionKey {
private volatile int interestOps;
private int readyOps;
}
```
接下来再来看看与 readyOps 事件集合相关的几个方法,如下:
```
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
```
以上方法从字面意思上就可以知道有什么用,这里就不解释了。接下来以 isReadable 方法为例,简单看一下这个方法是如何实现。
```
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}
```
上面说到可以通过或运算组合事件,这里则是通过与运算来测试某个事件是否在事件集合中。比如
```
readyOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE = 0101,
readyOps & OP_READ = 0101 & 0001 = 0001,
readyOps & OP_CONNECT = 0101 & 1000 = 0
```
`readyOps & OP_READ != 0`,所以 OP\_READ 在事件集合中。`readyOps & OP_CONNECT == 0`,所以 OP\_CONNECT 不在事件集合中。
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#223-attach-方法)2.2.3 attach 方法
attach 是一个好用的方法,通过这个方法,可以将对象暂存在 SelectionKey 中,待需要的时候直接取出来即可。比如本文对应的练习代码实现了一个简单的 HTTP 服务器,在读取用户请求数据后(即 selectionKey.isReadable() 为 true),会去解析请求头,然后将请求头信息通过 attach 方法放入 selectionKey 中。待通道可写后,再从 selectionKey 中取出请求头,并根据请求头回复客户端不同的消息。当然,这只是一个应用场景,attach 可能还有其他的应用场景,比如标识通道。不过其他的场景我没使用过,就不说了。attach 使用方式如下:
```
selectionKey.attach(obj);
Object attachedObj = selectionKey.attachment();
```
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#23-通道注册)2.3 通道注册
通道注册即将感兴趣的事件告知 Selector,待事件发生时,Selector 即可返回就绪事件,我们就可以去做后续的事情了。比如 ServerSocketChannel 通道通常对 OP\_ACCEPT 事件感兴趣,那么我们就可以把这个事件注册给 Selector。待事件发生,即服务端接受客户端连接后,我们即可获取这个就绪的事件并做相应的操作。通道注册的示例代码如下:
```
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
```
起初我以为通道注册操作会调用操作系统的 epoll\_ctl 函数,但最终通过看源码,发现自己的理解是错的。既然通道注册阶段不调用 epoll\_ctl 函数。那么,epoll\_ctl 什么时候才会被调用呢?如果不调用 epoll\_ctl,那么注册过程都干了什么事情呢?关于第一个问题,本节还无法解答,不过第二个问题则可以说说。接下来让我们深入通道类 register 方法的调用栈中去探寻答案吧。
```
public abstract class SelectableChannel extends AbstractInterruptibleChannel implements Channel {
public final SelectionKey register(Selector sel, int ops) throws ClosedChannelException {
return register(sel, ops, null);
}
public abstract SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException;
}
public abstract class AbstractSelectableChannel extends SelectableChannel {
private SelectionKey[] keys = null;
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException {
synchronized (regLock) {
// 省去一些校验代码
// 从 keys 数组中查找,查找条件为 k.selector() == sel
SelectionKey k = findKey(sel);
// 如果 k 不为空,则修改 k 所感兴趣的事件
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
// k 为空,则创建一个 SelectionKey,并存储到 keys 数组中
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
}
public abstract class AbstractSelector extends Selector {
protected abstract SelectionKey register(AbstractSelectableChannel ch,
int ops, Object att);
}
public abstract class SelectorImpl extends AbstractSelector {
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
// 创建 SelectionKeyImpl 实例
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
}
class EPollSelectorImpl extends SelectorImpl {
protected void implRegister(SelectionKeyImpl ski) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
int fd = Integer.valueOf(ch.getFDVal());
// 存储 fd 和 SelectionKeyImpl 的映射关系
fdToKey.put(fd, ski);
pollWrapper.add(fd);
// 将 SelectionKeyImpl 实例存储到 keys 中(这里的 keys 声明在 SelectorImpl 类中),keys 集合可由 selector.keys() 方法获取
keys.add(ski);
}
}
public class SelectionKeyImpl extends AbstractSelectionKey {
public SelectionKey interestOps(int ops) {
ensureValid();
return nioInterestOps(ops);
}
public SelectionKey nioInterestOps(int ops) {
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
// 转换并设置感兴趣的事件
channel.translateAndSetInterestOps(ops, this);
// 设置 interestOps 变量
interestOps = ops;
return this;
}
}
class SocketChannelImpl extends SocketChannel implements SelChImpl {
public void translateAndSetInterestOps(int ops, SelectionKeyImpl sk) {
int newOps = 0;
// 转换事件
if ((ops & SelectionKey.OP_READ) != 0)
newOps |= PollArrayWrapper.POLLIN;
if ((ops & SelectionKey.OP_WRITE) != 0)
newOps |= PollArrayWrapper.POLLOUT;
if ((ops & SelectionKey.OP_CONNECT) != 0)
newOps |= PollArrayWrapper.POLLCONN;
// 设置事件
sk.selector.putEventOps(sk, newOps);
}
}
class class EPollSelectorImpl extends SelectorImpl {
public void putEventOps(SelectionKeyImpl ski, int ops) {
if (closed)
throw new ClosedSelectorException();
SelChImpl ch = ski.channel;
// 设置感兴趣的事件
pollWrapper.setInterest(ch.getFDVal(), ops);
}
}
class EPollArrayWrapper {
void setInterest(int fd, int mask) {
synchronized (updateLock) {
// 扩容 updateDescriptors 数组,并存储文件描述符 fd
int oldCapacity = updateDescriptors.length;
if (updateCount == oldCapacity) {
int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
int[] newDescriptors = new int[newCapacity];
System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
updateDescriptors = newDescriptors;
}
updateDescriptors[updateCount++] = fd;
// events are stored as bytes for efficiency reasons
byte b = (byte)mask;
assert (b == mask) && (b != KILLED);
// 存储事件
setUpdateEvents(fd, b, false);
}
}
private void setUpdateEvents(int fd, byte events, boolean force) {
if (fd < MAX_UPDATE_ARRAY_SIZE) {
if ((eventsLow[fd] != KILLED) || force) {
eventsLow[fd] = events;
}
} else {
Integer key = Integer.valueOf(fd);
if (!isEventsHighKilled(key) || force) {
eventsHigh.put(key, Byte.valueOf(events));
}
}
}
}
```
到 setUpdateEvents 这个方法,整个调用栈就结束了。但是我们并未在调用栈中看到调用 epoll\_ctl 函数的地方,也就是说,通道注册时,并不会立即调用 epoll\_ctl,而是先将事件集合 events 存放在 eventsLow。至于 epoll\_ctl 函数何时调用的,需要大家继续往下看了。
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#24-选择过程)2.4 选择过程
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#241-选择方法)2.4.1 选择方法
Selector 包含3种不同功能的选择方法,分别如下:
* int select()
* int select(long timeout)
* int selectNow()
select() 是一个阻塞方法,仅在至少一个通道处于就绪状态时才返回。
select(long timeout) 同样也是阻塞方法,不过可对该方法设置超时时间(timeout > 0),使得线程不会被一直阻塞。如果 timeout = 0,会一直阻塞线程。
selectNow() 为非阻塞方法,调用后立即返回。
以上3个方法均返回 int 类型值,表示每次调用 select 或 selectNow 方法后,新就绪通道的数量。如果某个通道在上一次调用 select 方法时就已经处于就绪状态,但并未将该通道对应的 SelectionKey 对象从 selectedKeys 集合中移除。假设另一个的通道在本次调用 select 期间处于就绪状态,此时,select 返回1,而不是2。
#### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#242-选择过程)2.4.2 选择过程
选择方法用起来虽然简单,但方法之下隐藏的逻辑还是比较复杂的。大致分为下面几个步骤:
1. 检查已取消键集合 cancelledKeys 是否为空,不为空则将 cancelledKeys 的键从 keys 和 selectedKeys 中移除,并将键和通道注销。
2. 调用操作系统的 epoll\_ctl 函数将通道感兴趣的事件注册到 epoll 实例中
3. 调用操作系统的 epoll\_wait 函数监听事件
4. 再次执行步骤1
5. 更新 selectedKeys 集合,并返回就绪通道数量
上面五个步骤对应于 EPollSelectorImpl 类中 doSelect 方法的逻辑,如下:
```
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
// 处理已取消键集合,对应步骤1
processDeregisterQueue();
try {
begin();
// select 方法的核心,对应步骤2和3
pollWrapper.poll(timeout);
} finally {
end();
}
// 处理已取消键集合,对应步骤4
processDeregisterQueue();
// 更新 selectedKeys 集合,并返回就绪通道数量,对应步骤5
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
```
接下来,我们按照上面的步骤顺序去分析代码实现。先来看看步骤1对应的代码:
+----SelectorImpl.java
```
void processDeregisterQueue() throws IOException {
// Precondition: Synchronized on this, keys, and selectedKeys
Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
// 遍历 cancelledKeys,执行注销操作
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
try {
// 执行注销逻辑
implDereg(ski);
} catch (SocketException se) {
throw new IOException("Error deregistering key", se);
} finally {
i.remove();
}
}
}
}
}
```
+----EPollSelectorImpl.java
```
protected void implDereg(SelectionKeyImpl ski) throws IOException {
assert (ski.getIndex() >= 0);
SelChImpl ch = ski.channel;
int fd = ch.getFDVal();
// 移除 fd 和选择键键的映射关系
fdToKey.remove(Integer.valueOf(fd));
// 从 epoll 实例中删除事件
pollWrapper.remove(fd);
ski.setIndex(-1);
// 从 keys 和 selectedKeys 中移除选择键
keys.remove(ski);
selectedKeys.remove(ski);
// 注销选择键
deregister((AbstractSelectionKey)ski);
// 注销通道
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
}
```
上面的代码代码逻辑不是很复杂,首先是获取 cancelledKeys 集合,然后遍历集合,并对每个选择键及其对应的通道执行注销操作。接下来再来看看步骤2和3对应的代码,如下:
+----EPollArrayWrapper.java
```
int poll(long timeout) throws IOException {
// 调用 epoll_ctl 函数注册事件,对应步骤3
updateRegistrations();
// 调用 epoll_wait 函数等待事件发生,对应步骤4
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
/**
* Update the pending registrations.
*/
private void updateRegistrations() {
synchronized (updateLock) {
int j = 0;
while (j < updateCount) {
// 获取 fd 和 events,这两个值在调用 register 方法时被存储到数组中
int fd = updateDescriptors[j];
short events = getUpdateEvents(fd);
boolean isRegistered = registered.get(fd);
int opcode = 0;
if (events != KILLED) {
// 确定 opcode 的值
if (isRegistered) {
opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
} else {
opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
}
if (opcode != 0) {
// 注册事件
epollCtl(epfd, opcode, fd, events);
// 设置 fd 的注册状态
if (opcode == EPOLL_CTL_ADD) {
registered.set(fd);
} else if (opcode == EPOLL_CTL_DEL) {
registered.clear(fd);
}
}
}
j++;
}
updateCount = 0;
}
// 下面两个均是 native 方法
private native void epollCtl(int epfd, int opcode, int fd, int events);
private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
}
```
看到 updateRegistrations 方法的实现,大家现在知道 epoll\_ctl 这个函数是在哪里调用的了。在 3.2 节通道注册的结尾给大家埋了一个疑问,这里就是答案了。注册通道实际上只是先将事件收集起来,等调用 select 方法时,在一起通过 epoll\_ctl 函数将事件注册到 epoll 实例中。
上面 epollCtl 和 epollWait 方法是 native 类型的,接下来我们再来看看这两个方法是如何实现的。如下:
+----EPollArrayWrapper.c
```
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) {
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// 调用 epoll_ctl 注册事件
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) {
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
// 调用 epoll_wait 等待事件
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
```
上面的C代码没什么复杂的逻辑,这里就不多说了。如果大家对 epoll\_ctl 和 epoll\_wait 函数不了解,可以参考 Linux man-page。关于 epoll 的示例,也可以参考我的另一篇文章[“基于epoll实现简单的web服务器”](http://www.coolblog.xyz/2018/03/02/%E5%9F%BA%E4%BA%8Eepoll%E5%AE%9E%E7%8E%B0%E7%AE%80%E5%8D%95%E7%9A%84web%E6%9C%8D%E5%8A%A1%E5%99%A8/)。
说完步骤2和3对应的代码,接下来再来说说步骤4和5。由于步骤4和步骤1是一样的,这里不再赘述。最后再来说说步骤5的逻辑。代码如下:
+----EPollSelectorImpl.java
```
private int updateSelectedKeys() {
int entries = pollWrapper.updated;
int numKeysUpdated = 0;
for (int i=0; i<entries; i++) {
/* 从 pollWrapper 成员变量的 pollArray 中获取文件描述符,
* pollArray 中的数据由 epoll_wait 设置
*/
int nextFD = pollWrapper.getDescriptor(i);
SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
// ski is null in the case of an interrupt
if (ski != null) {
// 从 pollArray 中获取就绪事件集合
int rOps = pollWrapper.getEventOps(i);
/* 如果 selectedKeys 已包含选择键,则选择键必须由新的事件发生时,
* 才会将 numKeysUpdated + 1
*/
if (selectedKeys.contains(ski)) {
if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
numKeysUpdated++;
}
} else {
// 转换并设置就绪事件集合
ski.channel.translateAndSetReadyOps(rOps, ski);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
// 更新 selectedKeys 集合,并将 numKeysUpdated + 1
selectedKeys.add(ski);
numKeysUpdated++;
}
}
}
}
// 返回 numKeysUpdated
return numKeysUpdated;
}
```
+----SocketChannelImpl.java
```
public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl sk) {
int intOps = sk.nioInterestOps(); // Do this just once, it synchronizes
int oldOps = sk.nioReadyOps();
int newOps = initialOps;
if ((ops & PollArrayWrapper.POLLNVAL) != 0) {
return false;
}
if ((ops & (PollArrayWrapper.POLLERR
| PollArrayWrapper.POLLHUP)) != 0) {
newOps = intOps;
sk.nioReadyOps(newOps);
// No need to poll again in checkConnect,
// the error will be detected there
readyToConnect = true;
return (newOps & ~oldOps) != 0;
}
/*
* 转换事件
*/
if (((ops & PollArrayWrapper.POLLIN) != 0) &&
((intOps & SelectionKey.OP_READ) != 0) &&
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_READ;
if (((ops & PollArrayWrapper.POLLCONN) != 0) &&
((intOps & SelectionKey.OP_CONNECT) != 0) &&
((state == ST_UNCONNECTED) || (state == ST_PENDING))) {
newOps |= SelectionKey.OP_CONNECT;
readyToConnect = true;
}
if (((ops & PollArrayWrapper.POLLOUT) != 0) &&
((intOps & SelectionKey.OP_WRITE) != 0) &&
(state == ST_CONNECTED))
newOps |= SelectionKey.OP_WRITE;
// 设置事件
sk.nioReadyOps(newOps);
// 如果新的就绪事件和老的就绪事件不相同,则返回true,否则返回 false
return (newOps & ~oldOps) != 0;
}
```
上面就是步骤5的逻辑了,简单总结一下。首先是获取就绪通道数量,然后再获取这些就绪通道对应的文件描述符 fd,以及就绪事件集合 rOps。之后调用 translateAndSetReadyOps 转换并设置就绪事件集合。最后,将选择键添加到 selectedKeys 集合中,并累加 numKeysUpdated 值,之后返回该值。
以上就是选择过程的代码讲解,贴了不少代码,可能不太好理解。Java NIO 和操作系统接口关联比较大,所以在学习 NIO 相关原理时,也应该去了解诸如 epoll 等系统调用的知识。没有这些背景知识,很多东西看起来不太好懂。好了,本节到此结束。
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#25-模板代码)2.5 模板代码
使用 NIO 选择器编程时,主干代码的结构一般比较固定。所以把主干代码写好后,就可以往里填业务代码了。下面贴一个服务端的模板代码,如下:
```
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8080));
ssc.configureBlocking(false);
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
int readyNum = selector.select();
if (readyNum == 0) {
continue;
}
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while(it.hasNext()) {
SelectionKey key = it.next();
if(key.isAcceptable()) {
// 接受连接
} else if (key.isReadable()) {
// 通道可读
} else if (key.isWritable()) {
// 通道可写
}
it.remove();
}
}
```
### [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#26-实例演示)2.6 实例演示
原本打算将示例演示的代码放在本节中展示,奈何文章篇幅已经很大了,所以决定把本节的内容独立成文。在[下一篇文章](http://www.coolblog.xyz/2018/04/04/%E5%9F%BA%E4%BA%8E-Java-NIO-%E5%AE%9E%E7%8E%B0%E7%AE%80%E5%8D%95%E7%9A%84-HTTP-%E6%9C%8D%E5%8A%A1%E5%99%A8/)中,我将会演示使用 Java NIO 完成一个简单的 HTTP 服务器。这里先贴张效果图,如下:
![tinyhttpd_w](https://blog-pictures.oss-cn-shanghai.aliyuncs.com/tinyhttpd1_wm.gif)
## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#3总结)3.总结
到这里,本文差不多就要结束了。原本只是打算简单说说 Selector 的用法,然后再写一份实例代码。但是后来发现这样写显得比较空洞,没什么深度。所以后来翻了一下 Selector 的源码,大致理解了 Selector 的逻辑,然后就有了上面的分析。不过 Selector 的逻辑并不止我上面所说的那些,还有一些内容我现在还没看,所以就没有讲。对于已写出来的分析,由于我个人水平有限,难免会有错误。如果有错误,也欢迎大家指出来,共同进步!
好了,本文到此结束,感谢大家的阅读。
## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#参考)参考
* [Java NIO Selector - jenkov.com](http://tutorials.jenkov.com/java-nio/selectors.html#pageToc)
* [Java NIO(6): Selector - 知乎](https://zhuanlan.zhihu.com/p/27434028)
## [](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/#附录)附录
文中贴的一些代码是没有包含在 JDK src.zip 包里的,这里单独列举出来,方便大家查找。
| 文件名 | 路径 |
| --- | --- |
| DefaultSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/DefaultSelectorProvider.java |
| EPollSelectorProvider.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorProvider.java |
| SelectorImpl.java | jdk/src/share/classes/sun/nio/ch/SelectorImpl.java |
| EPollSelectorImpl.java | jdk/src/solaris/classes/sun/nio/ch/EPollSelectorImpl.java |
| EPollArrayWrapper.java | jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java |
| SelectionKeyImpl.java | jdk/src/share/classes/sun/nio/ch/SelectionKeyImpl.java |
| SocketChannelImpl.java | jdk/src/share/classes/sun/nio/ch/SocketChannelImpl.java |
| EPollArrayWrapper.c | jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c |
转载自 :[http://www.tianxiaobo.com/2018/04/03/Java-NIO之选择器/](http://www.tianxiaobo.com/2018/04/03/Java-NIO%E4%B9%8B%E9%80%89%E6%8B%A9%E5%99%A8/)
- 一.JVM
- 1.1 java代码是怎么运行的
- 1.2 JVM的内存区域
- 1.3 JVM运行时内存
- 1.4 JVM内存分配策略
- 1.5 JVM类加载机制与对象的生命周期
- 1.6 常用的垃圾回收算法
- 1.7 JVM垃圾收集器
- 1.8 CMS垃圾收集器
- 1.9 G1垃圾收集器
- 2.面试相关文章
- 2.1 可能是把Java内存区域讲得最清楚的一篇文章
- 2.0 GC调优参数
- 2.1GC排查系列
- 2.2 内存泄漏和内存溢出
- 2.2.3 深入理解JVM-hotspot虚拟机对象探秘
- 1.10 并发的可达性分析相关问题
- 二.Java集合架构
- 1.ArrayList深入源码分析
- 2.Vector深入源码分析
- 3.LinkedList深入源码分析
- 4.HashMap深入源码分析
- 5.ConcurrentHashMap深入源码分析
- 6.HashSet,LinkedHashSet 和 LinkedHashMap
- 7.容器中的设计模式
- 8.集合架构之面试指南
- 9.TreeSet和TreeMap
- 三.Java基础
- 1.基础概念
- 1.1 Java程序初始化的顺序是怎么样的
- 1.2 Java和C++的区别
- 1.3 反射
- 1.4 注解
- 1.5 泛型
- 1.6 字节与字符的区别以及访问修饰符
- 1.7 深拷贝与浅拷贝
- 1.8 字符串常量池
- 2.面向对象
- 3.关键字
- 4.基本数据类型与运算
- 5.字符串与数组
- 6.异常处理
- 7.Object 通用方法
- 8.Java8
- 8.1 Java 8 Tutorial
- 8.2 Java 8 数据流(Stream)
- 8.3 Java 8 并发教程:线程和执行器
- 8.4 Java 8 并发教程:同步和锁
- 8.5 Java 8 并发教程:原子变量和 ConcurrentMap
- 8.6 Java 8 API 示例:字符串、数值、算术和文件
- 8.7 在 Java 8 中避免 Null 检查
- 8.8 使用 Intellij IDEA 解决 Java 8 的数据流问题
- 四.Java 并发编程
- 1.线程的实现/创建
- 2.线程生命周期/状态转换
- 3.线程池
- 4.线程中的协作、中断
- 5.Java锁
- 5.1 乐观锁、悲观锁和自旋锁
- 5.2 Synchronized
- 5.3 ReentrantLock
- 5.4 公平锁和非公平锁
- 5.3.1 说说ReentrantLock的实现原理,以及ReentrantLock的核心源码是如何实现的?
- 5.5 锁优化和升级
- 6.多线程的上下文切换
- 7.死锁的产生和解决
- 8.J.U.C(java.util.concurrent)
- 0.简化版(快速复习用)
- 9.锁优化
- 10.Java 内存模型(JMM)
- 11.ThreadLocal详解
- 12 CAS
- 13.AQS
- 0.ArrayBlockingQueue和LinkedBlockingQueue的实现原理
- 1.DelayQueue的实现原理
- 14.Thread.join()实现原理
- 15.PriorityQueue 的特性和原理
- 16.CyclicBarrier的实际使用场景
- 五.Java I/O NIO
- 1.I/O模型简述
- 2.Java NIO之缓冲区
- 3.JAVA NIO之文件通道
- 4.Java NIO之套接字通道
- 5.Java NIO之选择器
- 6.基于 Java NIO 实现简单的 HTTP 服务器
- 7.BIO-NIO-AIO
- 8.netty(一)
- 9.NIO面试题
- 六.Java设计模式
- 1.单例模式
- 2.策略模式
- 3.模板方法
- 4.适配器模式
- 5.简单工厂
- 6.门面模式
- 7.代理模式
- 七.数据结构和算法
- 1.什么是红黑树
- 2.二叉树
- 2.1 二叉树的前序、中序、后序遍历
- 3.排序算法汇总
- 4.java实现链表及链表的重用操作
- 4.1算法题-链表反转
- 5.图的概述
- 6.常见的几道字符串算法题
- 7.几道常见的链表算法题
- 8.leetcode常见算法题1
- 9.LRU缓存策略
- 10.二进制及位运算
- 10.1.二进制和十进制转换
- 10.2.位运算
- 11.常见链表算法题
- 12.算法好文推荐
- 13.跳表
- 八.Spring 全家桶
- 1.Spring IOC
- 2.Spring AOP
- 3.Spring 事务管理
- 4.SpringMVC 运行流程和手动实现
- 0.Spring 核心技术
- 5.spring如何解决循环依赖问题
- 6.springboot自动装配原理
- 7.Spring中的循环依赖解决机制中,为什么要三级缓存,用二级缓存不够吗
- 8.beanFactory和factoryBean有什么区别
- 九.数据库
- 1.mybatis
- 1.1 MyBatis-# 与 $ 区别以及 sql 预编译
- Mybatis系列1-Configuration
- Mybatis系列2-SQL执行过程
- Mybatis系列3-之SqlSession
- Mybatis系列4-之Executor
- Mybatis系列5-StatementHandler
- Mybatis系列6-MappedStatement
- Mybatis系列7-参数设置揭秘(ParameterHandler)
- Mybatis系列8-缓存机制
- 2.浅谈聚簇索引和非聚簇索引的区别
- 3.mysql 证明为什么用limit时,offset很大会影响性能
- 4.MySQL中的索引
- 5.数据库索引2
- 6.面试题收集
- 7.MySQL行锁、表锁、间隙锁详解
- 8.数据库MVCC详解
- 9.一条SQL查询语句是如何执行的
- 10.MySQL 的 crash-safe 原理解析
- 11.MySQL 性能优化神器 Explain 使用分析
- 12.mysql中,一条update语句执行的过程是怎么样的?期间用到了mysql的哪些log,分别有什么作用
- 十.Redis
- 0.快速复习回顾Redis
- 1.通俗易懂的Redis数据结构基础教程
- 2.分布式锁(一)
- 3.分布式锁(二)
- 4.延时队列
- 5.位图Bitmaps
- 6.Bitmaps(位图)的使用
- 7.Scan
- 8.redis缓存雪崩、缓存击穿、缓存穿透
- 9.Redis为什么是单线程、及高并发快的3大原因详解
- 10.布隆过滤器你值得拥有的开发利器
- 11.Redis哨兵、复制、集群的设计原理与区别
- 12.redis的IO多路复用
- 13.相关redis面试题
- 14.redis集群
- 十一.中间件
- 1.RabbitMQ
- 1.1 RabbitMQ实战,hello world
- 1.2 RabbitMQ 实战,工作队列
- 1.3 RabbitMQ 实战, 发布订阅
- 1.4 RabbitMQ 实战,路由
- 1.5 RabbitMQ 实战,主题
- 1.6 Spring AMQP 的 AMQP 抽象
- 1.7 Spring AMQP 实战 – 整合 RabbitMQ 发送邮件
- 1.8 RabbitMQ 的消息持久化与 Spring AMQP 的实现剖析
- 1.9 RabbitMQ必备核心知识
- 2.RocketMQ 的几个简单问题与答案
- 2.Kafka
- 2.1 kafka 基础概念和术语
- 2.2 Kafka的重平衡(Rebalance)
- 2.3.kafka日志机制
- 2.4 kafka是pull还是push的方式传递消息的?
- 2.5 Kafka的数据处理流程
- 2.6 Kafka的脑裂预防和处理机制
- 2.7 Kafka中partition副本的Leader选举机制
- 2.8 如果Leader挂了的时候,follower没来得及同步,是否会出现数据不一致
- 2.9 kafka的partition副本是否会出现脑裂情况
- 十二.Zookeeper
- 0.什么是Zookeeper(漫画)
- 1.使用docker安装Zookeeper伪集群
- 3.ZooKeeper-Plus
- 4.zk实现分布式锁
- 5.ZooKeeper之Watcher机制
- 6.Zookeeper之选举及数据一致性
- 十三.计算机网络
- 1.进制转换:二进制、八进制、十六进制、十进制之间的转换
- 2.位运算
- 3.计算机网络面试题汇总1
- 十四.Docker
- 100.面试题收集合集
- 1.美团面试常见问题总结
- 2.b站部分面试题
- 3.比心面试题
- 4.腾讯面试题
- 5.哈罗部分面试
- 6.笔记
- 十五.Storm
- 1.Storm和流处理简介
- 2.Storm 核心概念详解
- 3.Storm 单机版本环境搭建
- 4.Storm 集群环境搭建
- 5.Storm 编程模型详解
- 6.Storm 项目三种打包方式对比分析
- 7.Storm 集成 Redis 详解
- 8.Storm 集成 HDFS 和 HBase
- 9.Storm 集成 Kafka
- 十六.Elasticsearch
- 1.初识ElasticSearch
- 2.文档基本CRUD、集群健康检查
- 3.shard&replica
- 4.document核心元数据解析及ES的并发控制
- 5.document的批量操作及数据路由原理
- 6.倒排索引
- 十七.分布式相关
- 1.分布式事务解决方案一网打尽
- 2.关于xxx怎么保证高可用的问题
- 3.一致性hash原理与实现
- 4.微服务注册中心 Nacos 比 Eureka的优势
- 5.Raft 协议算法
- 6.为什么微服务架构中需要网关
- 0.CAP与BASE理论
- 十八.Dubbo
- 1.快速掌握Dubbo常规应用
- 2.Dubbo应用进阶
- 3.Dubbo调用模块详解
- 4.Dubbo调用模块源码分析
- 6.Dubbo协议模块