【从入门到放弃-Java】并发编程-NIO-Selector

前言

前两篇【从入门到放弃-Java】并发编程-NIO-Channel【从入门到放弃-Java】并发编程-NIO-Buffer中我们学习了NIO中两个重要的概念Channel和Buffer。
今天我们来看下另一个重要的内容 Selector

简介

Selector是多路复用器,会不断轮询已经注册了的Channel。当有注册的channel产生连接、读、写等事件时,就会被Selector发现,从而可以进行相关后续操作。

Selector的好处是,可以通过一个线程来管理多个通道,减少了创建线程的资源占用及线程切换带来的消耗

Selector

SelectableChannel可以通过SelectionKey(记录channel和selector的注册关系)注册到Selector上。Selector维护了三个SelectionKey集合:

  • key set:存放了Selector上已经注册了的Channel的key。可以通过keys()方法获取。
  • selected-key set:当之前注册感兴趣的事件到达时,set中的keys会被更新或添加,set中维护了当前至少有一个可以操作的事件的channel key的集合。是key set的子集。可以使用selectedKeys()获取。
  • cancelled-key:存放已经调用cancel方法取消,等待下次操作时会调用deregister取消注册的channel,调用deregister后,所有的set中都没有这个channel的key了。

open

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Opens a selector.
*
* <p> The new selector is created by invoking the {@link
* java.nio.channels.spi.SelectorProvider#openSelector openSelector} method
* of the system-wide default {@link
* java.nio.channels.spi.SelectorProvider} object. </p>
*
* @return A new selector
*
* @throws IOException
* If an I/O error occurs
*/
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}

开启selector,具体的实现会根据操作系统类型创建不同的实现类,如macOS下实际上是new了一个KQueueSelectorProvider实例,低层基于操作系统的kqueue实现。

register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
//新建一个SelectionKey,记录channel与selector之间的注册关系
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);

//前置操作,这里主要是判断下selector是否还处于open状态
// register (if needed) before adding to key set
implRegister(k);

// 添加selectionKey至key set
// add to the selector's key set, removing it immediately if the selector
// is closed. The key is not in the channel's key set at this point but
// it may be observed by a thread iterating over the selector's key set.
keys.add(k);
try {
// 更新注册的事件码
k.interestOps(ops);
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}

注册selector和channel之间的事件关系。

select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// timeout超时
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}

@Override
public final int select() throws IOException {
return lockAndDoSelect(null, -1);
}

// 不阻塞
@Override
public final int selectNow() throws IOException {
return lockAndDoSelect(null, 0);
}

private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}

protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);

// 如果timeout = 0时,不阻塞
long to = Math.min(timeout, Integer.MAX_VALUE); // max kqueue timeout
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);

int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
// 设置interrupt 可以处理中断信号 防止线程一直阻塞
begin(blocking);

// 轮询的监听,直到有注册的事件发生或超时。
do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);

} finally {
end(blocking);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}

selectedKeys

1
2
3
4
public final Set<SelectionKey> selectedKeys() {
ensureOpen();
return publicSelectedKeys;
}

获取被事件唤醒的key
注意:当被遍历处理selectedKeys时,key被处理完需要手动remove掉,防止下次被重复消费,selectedKeys不会帮你删除已处理过的key。

close

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}


public final void implCloseSelector() throws IOException {
//通知处于阻塞的select方法立即返回
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// 遍历所有的SelectionKey,取消注册
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}

SelectionKey

SelectionKey在channel register时创建。用来记录channel和selector之间的注册事件关系。
事件主要有:

  • OP_READ
  • OP_WRITE
  • OP_CONNECT
  • OP_ACCEPT

每个SelectionKey有两个由整数表示的操作集合,用来标识channel支持的操作类型。

interest set:是在创建SelectionKey时定义的,当集合中的操作发生时,将会把channel置为ready状态
ready set:检测到selector中已经就绪的操作类型集合

channel

1
2
3
public SelectableChannel channel() {
return (SelectableChannel)channel;
}

获取SelectionKey中的channel

selector

1
2
3
public Selector selector() {
return selector;
}

获取SelectionKey中的selector

isReadable

1
2
3
public final boolean isReadable() {
return (readyOps() & OP_READ) != 0;
}

根据readyOps(readySet)判断channel是否是可读状态

isWritable

1
2
3
public final boolean isWritable() {
return (readyOps() & OP_WRITE) != 0;
}

根据readyOps(readySet)判断channel是否是可写状态

isConnectable

1
2
3
public final boolean isConnectable() {
return (readyOps() & OP_CONNECT) != 0;
}

根据readyOps(readySet)判断channel是否是connect状态,通常是客户端使用,判断连接是否建立

isReadable

1
2
3
public final boolean isAcceptable() {
return (readyOps() & OP_ACCEPT) != 0;
}

根据readyOps(readySet)判断channel是否是accept状态,通常是服务端使用,判断是否有客户端请求建立连接

总结

通过使用selector,可以使用一个线程来管理多个连接。需要注意的一点是,通常读、写操作都是比较耗时的,为了提高服务端的性能应该把Selector::select和read、write的具体处理逻辑在不同的线程中处理。
即:使用一个线程来进行select,只做分发。在获取到就绪的SelectionKey后,通过线程池在不同的线程中处理读写操作。

通过学习完NIO相关的知识,我们可以很清楚的回答下面这个问题

  • 问:基于BIO实现的server端,当建立100个连接时,需要多少个线程?基于NIO实现的呢?

  • 答:基于BIO实现的server端,通常需要由一个线程accept,并为每个新建立的连接创建一个线程去处理IO操作,因此需要 1个accept线程+100个IO线程
    基于NIO实现的server端,使用Selector多路复用机制,由一个线程进行select,为了提高并发可以使用线程池来处理IO操作,通常为了发挥CPU的性能会创建(cpu核数 x 2)个线程来处理IO操作。因此需要 1个select线程 + cpu核数 x 2 个IO线程