Java进阶之高并发核心Selector详解

网友投稿 252 2023-01-15

Java进阶之高并发核心Selector详解

一、Selector设计

笔者下载得是openjdk8的源码, 画出类图

linux2.6以后才有的EpollSelectorImpl

Windows平台是WindowsSelectorImpl

MacOSX平台是KQueueSelectorImpl

从名字也可以猜到,openjdk肯定在底层还是用epoll,kqueue,iocp这些技术来实现的I/O多路复用。

二、获取Selector

众所周知,Selector.open()可以得到一个Selector实例,怎么实现的呢?

// Selector.java

public static Selector open() throws IOException {

// 首先找到provider,然后再打开Selector

return SelectorProvider.provider().openSelector();

}

// java.nio.channels.spi.SelectorProvider

public static SelectorProvider provider() {

synchronized (lock) {

if (provider != null)

return provider;

return AccessController.doPrivileged(

new PrivilegedAction() {

public SelectorProvider run() {

if (loadProviderFromProperty())

return provider;

if (loadProviderAsService())

return provider;

// 这里就是打开Selector的真正方法

provider = sun.nio.ch.DefaultSelectorProvider.create();

return provider;

}

});

}

}

在openjdk中,每个操作系统都有一个sun.nio.ch.DefaultSelectorProvider实现,以solaris为例:

/**

* Returns the default SelectorProvider.

*/

public static SelectorProvider create() {

// 获取OS名称

String osname = AccessController

.doPrivileged(new GetPropertyAction("os.name"));

// 根据名称来创建不同的Selctor

if (osname.equals("SunOS"))

return createProvider("sun.nio.ch.DevPollSelectorProvider");

if (http://osname.equals("Linux"))

return createProvider("sun.nio.ch.EPollSelectorProvider");

return new sun.nio.ch.PollSelectorProvider();

}

如果系统名称是Linux的话,真正创建的是sun.nio.ch.EPollSelectorProvider。如果不是SunOS也不是Linux,就使用sun.nio.ch.PollSelectorProvider, 关于PollSelector有兴趣的读者自行了解下, 本文仅以实际常用的EpollSelector为例探讨。

打开sun.nio.ch.EPollSelectorProvider查看openSelector方法

public AbstractSelector openSelector() throws IOException {

return new EPollSelectorImpl(this);

}

很直观,这样我们在Linux平台就得到了最终的Selector实现:sun.nio.ch.EPollSelectorImpl

三、EPollSelector如何进行select

epoll系统调用主要分为3个函数

epoll_create: 创建一个epollfd,并开辟epoll自己的内核高速cache区,建立红黑树,分配好想要的size的内存对象,建立一个list链表,用于存储准备就绪的事件。epoll_wait: 等待内核返回IO事件epoll_ctl: 对新旧事件进行新增修改或者删除

3.1 Epoll fd的创建

EPollSelectorImpl的构造器代码如下:

EPollSelectorImpl(SelectorProvider sp) throws IOException {

super(sp);

// makePipe返回管道的2个文件描述符,编码在一个long类型的变量中

// 高32位代表读 低32位代表写

// 使用pipe为了实现Selector的wakeup逻辑

long pipeFds = IOUtil.makePipe(false);

fd0 = (int) (pipeFds >>> 32);

fd1 = (int) pipeFds;

// 新建一个EPollArrayWrapper

pollWrapper = new EPollArrayWrapper();

pollWrapper.initInterrupt(fd0, fd1);

fdToKey = new HashMap<>();

}

再看EPollArrayWrapper的初始化过程

EPollArrayWrapper() throws IOException {

// creates the epoll file descriptor

// 创建epoll fd

epfd = epollCreate();

// the epoll_event array passed to epoll_wait

int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;

pollArray = new AllocatedNativeObject(allocationSize, true);

pollArrayAddress = pollArray.address();

// eventHigh needed when using file descriptors > 64k

if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)

eventsHigh = new HashMap<>();

}

private native int epollCreate();

在初始化过程中调用了epollCreate方法,这是个native方法。

打开

jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

EPollArrayWrapper() throws IOException {

// creates the epoll file descriptor

// 创建epoll fd

epfd = epollCreate();

// the epoll_event array passed to epoll_wait

int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;

pollArray = new AllocatedNativeObject(allocationSize, true);

pollArrayAddress = pollArray.address();

// eventHigh needed when using file descriptors > 64k

if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)

eventsHigh = new HashMap<>();

}

private native int epollCreate();

可以看到最后还是使用了操作系统的api: epoll_create函数

3.2 Epoll wait等待内核IO事件

调用Selector.select(),最后会委托给各个实现的doSelect方法,限于篇幅不贴出太详细的,这里看下EpollSelectorImpl的doSelect方法

protected int doSelect(long timeout) throws IOException {

if (closed)

throw new ClosedSelectorException();

processDeregisterQueue();

try {

begin();

// 真正的实现是这行

pollWrapper.poll(timeout);

} finally {

end();

}

processDeregisterQueue();

int numKeysUpdated = updateSelectedKeys();

// 以下基本都是异常处理

if (pollWrapper.interrupted()) {

// Clear the wakeup pipe

pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);

synchronized (interruptLock) {

pollWrapper.clearInterrupted();

IOUtil.drain(fd0);

interruptTriggered = false;

}

}

return numKeysUpdated;

}

然后我们去看pollWrapper.poll, 打开jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java:

int poll(long timeout) throws IOException {

updateRegistrations();

// 这个epollWait是不是有点熟悉呢?

updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);

for (int i=0; i

if (getDescriptor(i) == incomingInterruptFD) {

interruptedIndex = i;

interrupted = true;

break;

}

}

return updated;

}

private native int epollWait(long pollAddress, int numfds, long timeout,

int epfd) throws IOException;

epollWait也是个native方法,打开c代码一看:

JNIEXPORT jint JNICALL

Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,

jlong address, jint numfds,

jlong timeout, jint epfd)

{

struct epoll_event *events = jlong_to_ptr(address);

int res;

if (timeout <= 0) { /* Indefinite or no wait */

// 发起epoll_wait系统调用等待内核事件

RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);

} else { /* Bounded wait; bounded restarts */

res = iepoll(epfd, events, numfds, timeout);

}

if (res < 0) {

JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");

}

return res;

}

=

可以看到,最后还是发起的epoll_wait系统调用.

3.3 epoll control以及openjdk对事件管理的封装

JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept.

调用Selector.register()时均会将事件存储到EpollArrayWrapper的成员变量eventsLow和eventsHigh中

// events for file descriptors with registration changes pending, indexed

// by file descriptor and stored as bytes for efficiency reasons. For

// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at

// least) then the update is stored in a map.

// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024

private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];

// 超过数组长度的事件会缓存到这个map中,等待下次处理

private Map eventsHigh;

/**

* Sets the pending update events for the given file descriptor. This

* method has no effect if the update events is already set to KILLED,

* unless {@code force} is {@code true}.

*/

private void setUpdateEvents(int fd, byte events, boolean force) {

// 判断fd和数组长度

if (fd < MAX_UPDATE_ARRAY_SIZE) {

if ((eventsLow[fd] != KILLED) || force) {

eventsLow[fd] = events;

}

} else {

Integer key = Integer.valueOf(fd);

if (!isEventsHighKilled(key) || force) {

eventsHigh.put(key, Byte.valueOf(events));

}

}

}

上面看到EpollArrayWrapper.poll()的时候, 首先会调用updateRegistrations

/**

* Returns the pending update events for the given file descriptor.

*/

private byte getUpdateEvents(int fd) {

if (fd < MAX_UPDATE_ARRAY_SIZE) {

return eventsLow[fd];

} else {

Byte result = eventsHigh.get(Integer.valueOf(fd));

// result should never be null

return result.byteValue();

}

}

/**

* Update the pending registrations.

*/

private void updateRegistrations() {

synchronized (updateLock) {

int j = 0;

while (j < updateCount) {

int fd = updateDescriptors[j];

// 从保存的eventsLow和eventsHigh里取出事件

short events = getUpdateEvents(fd);

boolean isRegistered = registered.get(fd);

int opcode = 0;

if (events != KILLED) {

// 判断操作类型以传给epoll_ctl

// 没有指定EPOLLET事件类型

if (isRegistered) {

opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;

} else {

opcode = (events != 0) ? EPOLL_CTL_ADD : 0;

}

if (opcode != 0) {

// 熟悉的epoll_ctl

epollCtl(epfd, opcode, fd, events);

if (opcode == EPOLL_CTL_ADD) {

registered.set(fd);

} else if (opcode == EPOLL_CTL_DEL) {

registered.clear(fd);

}

}

}

j++;

}

updateCount = 0;

}

}

private native void epollCtl(int epfd, int opcode, int fd, int events);

在获取到事件之后将操作委托给了epollCtl,这又是个native方法,打开相应的c代码一看:

JNIEXPORT void JNICALL

Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,

jint opcode, jint fd, jint events)

{

struct epoll_event event;

int res;

event.events = events;

event.data.fd = fd;

// 发起epoll_ctl调用来进行IO事件的管理

RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

/*

* A channel may be registered with several Selectors. When each Selector

* is polled a EPOLL_CTL_DEL op will be inserted into its pending update

* list to remove the file descriptor from epoll. The "last" Selector will

* close the file descriptor which automatically unregisters it from each

* epoll descriptor. To avoid costly synchronization between Selectors we

* allow pending updates to be processed, ignoring errors. The errors are

* harmless as the last update for the file descriptor is guaranteed to

* be EPOLL_CTL_DEL.

*/

if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {

JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");

}

}

原来还是我们的老朋友epoll_ctl.

有个小细节是jdk没有指定ET(边缘触发)还是LT(水平触发),所以默认会用LT:)

在AbstractSelectorImpl中有3个set保存事件

// Public views of the key sets

// 注册的所有事件

private Set publicKeys; // Immutable

// 内核返回的IO事件封装,表示哪些fd有数据可读可写

private Set publicSelectedKeys; // Removal allowed, but not addition

// 取消的事件

private final Set cancelledKeys = new HashSet();

在EpollArrayWrapper.poll调用完成之后, 会调用updateSelectedKeys来更新上面的仨set

private int updateSelectedKeys() {

int entries = pollWrapper.updated;

int numKeysUpdated = 0;

for (int i=0; i

int nextFD = pollWrapper.getDescriptor(i);

SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));

// ski is null in the case of an interrupt

if (ski != null) {

int rOps = pollWrapper.getEventOps(i);

if (selectedKeys.contains(ski)) {

if (ski.channel.translateAndSetReadyOps(rOps, ski)) {

numKeysUpdated++;

}

} else {

ski.channel.translateAndSetReadyOps(rOps, ski);

if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {

selectedKeys.add(ski);

numKeysUpdated++;

}

}

}

}

return numKeysUpdated;

代码很直白,拿出事件对set比对操作。

四、Selector类的相关方法

重点注意四个方法

select(): 这是一个阻塞方法,调用该方法,会阻塞,直到返回一个有事件发生的selectionKey集合

selectNow() :非阻塞方法,获取不到有事件发生的selectionKey集合,也会立即返回

select(long):阻塞方法,如果没有获取到有事件发生的selectionKey集合,阻塞指定的long时间

selectedKeys(): 返回全部selectionKey集合,不管是否有事件发生

可以理解:selector一直在监听select()

五、Selector、SelectionKey、ServerScoketChannel、ScoketChannel的关系

Server代码:

public class NIOServer {

public static void main(String[] args) throws Exception{

//创建ServerSocketChannel -> ServerSocket

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//得到一个Selecor对象

Selector selector = Selector.open();

//绑定一个端口6666, 在服务器端监听

serverSocketChannel.socket().bind(new InetSocketAddress(6666));

//设置为非阻塞

serverSocketChannel.configureBlocking(false);

//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1

//循环等待客户端连接

while (true) {

//这里我们等待1秒,如果没有事件发生, 返回

if(selector.select(1000) == 0) { //没有事件发生

System.out.println("服务器等待了1秒,无连接");

continue;

}

//如果返回的>0, 就获取到相关的 selectionKey集合

//1.如果返回的>0, 表示已经获取到关注的事件

//2. selector.selectedKeys() 返回关注事件的集合

// 通过 selectionKeys 反向获取通道

Set selectionKeys = selector.selectedKeys();

System.out.println("selectionKeys 数量 = " + selectionKeys.size());

//遍历 Set, 使用迭代器遍历

Iterator keyIterator = selectionKeys.iterator();

while (keyIterator.hasNext()) {

//获取到SelectionKey

SelectionKey key = keyIterator.next();

//根据key 对应的通道发生的事件做相应处理

if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接

//该该客户端生成一个 SocketChannel

SocketChannel socketChannel = serverSocketChannel.accept();

System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());

//将 SocketChannel 设置为非阻塞

socketChannel.configureBlocking(false);

//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel

//关联一个Buffer

socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..

}

if(key.isReadable()) { //发生 OP_READ

//通过key 反向获取到对应channel

SocketChannel channel = (SocketChannel)key.channel();

//获取到该channel关联的buffer

ByteBuffer buffer = (ByteBuffer)key.attachment();

channel.read(buffer);

System.out.println("form 客户端 " + new String(buffer.array()));

}

//手动从集合中移动当前的selectionKey, 防止重复操作

keyIterator.remove();

}

}

}

}

Client代码

public class NIOClient {

public static void main(String[] args) throws Exception{

//得到一个网络通道

SocketChannel socketChannel = SocketChannel.open();

//设置非阻塞

socketChannel.configureBlocking(false);

//提供服务器端的ip 和 端口

InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

//连接服务器

if (!socketChannel.connect(inetSocketAddress)) {

while (!socketChannel.finishConnect()) {

System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");

}

}

//...如果连接成功,就发送数据

Strinhttp://g str = "hello, 尚硅谷~";

//Wraps a byte array into a buffer

ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());

//发送数据,将 buffer 数据写入 channel

socketChannel.write(buffer);

System.in.read();

}

}

六、总结

if (getDescriptor(i) == incomingInterruptFD) {

interruptedIndex = i;

interrupted = true;

break;

}

}

return updated;

}

private native int epollWait(long pollAddress, int numfds, long timeout,

int epfd) throws IOException;

epollWait也是个native方法,打开c代码一看:

JNIEXPORT jint JNICALL

Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,

jlong address, jint numfds,

jlong timeout, jint epfd)

{

struct epoll_event *events = jlong_to_ptr(address);

int res;

if (timeout <= 0) { /* Indefinite or no wait */

// 发起epoll_wait系统调用等待内核事件

RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);

} else { /* Bounded wait; bounded restarts */

res = iepoll(epfd, events, numfds, timeout);

}

if (res < 0) {

JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");

}

return res;

}

=

可以看到,最后还是发起的epoll_wait系统调用.

3.3 epoll control以及openjdk对事件管理的封装

JDK中对于注册到Selector上的IO事件关系是使用SelectionKey来表示,代表了Channel感兴趣的事件,如Read,Write,Connect,Accept.

调用Selector.register()时均会将事件存储到EpollArrayWrapper的成员变量eventsLow和eventsHigh中

// events for file descriptors with registration changes pending, indexed

// by file descriptor and stored as bytes for efficiency reasons. For

// file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at

// least) then the update is stored in a map.

// 使用数组保存事件变更, 数组的最大长度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024

private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];

// 超过数组长度的事件会缓存到这个map中,等待下次处理

private Map eventsHigh;

/**

* Sets the pending update events for the given file descriptor. This

* method has no effect if the update events is already set to KILLED,

* unless {@code force} is {@code true}.

*/

private void setUpdateEvents(int fd, byte events, boolean force) {

// 判断fd和数组长度

if (fd < MAX_UPDATE_ARRAY_SIZE) {

if ((eventsLow[fd] != KILLED) || force) {

eventsLow[fd] = events;

}

} else {

Integer key = Integer.valueOf(fd);

if (!isEventsHighKilled(key) || force) {

eventsHigh.put(key, Byte.valueOf(events));

}

}

}

上面看到EpollArrayWrapper.poll()的时候, 首先会调用updateRegistrations

/**

* Returns the pending update events for the given file descriptor.

*/

private byte getUpdateEvents(int fd) {

if (fd < MAX_UPDATE_ARRAY_SIZE) {

return eventsLow[fd];

} else {

Byte result = eventsHigh.get(Integer.valueOf(fd));

// result should never be null

return result.byteValue();

}

}

/**

* Update the pending registrations.

*/

private void updateRegistrations() {

synchronized (updateLock) {

int j = 0;

while (j < updateCount) {

int fd = updateDescriptors[j];

// 从保存的eventsLow和eventsHigh里取出事件

short events = getUpdateEvents(fd);

boolean isRegistered = registered.get(fd);

int opcode = 0;

if (events != KILLED) {

// 判断操作类型以传给epoll_ctl

// 没有指定EPOLLET事件类型

if (isRegistered) {

opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;

} else {

opcode = (events != 0) ? EPOLL_CTL_ADD : 0;

}

if (opcode != 0) {

// 熟悉的epoll_ctl

epollCtl(epfd, opcode, fd, events);

if (opcode == EPOLL_CTL_ADD) {

registered.set(fd);

} else if (opcode == EPOLL_CTL_DEL) {

registered.clear(fd);

}

}

}

j++;

}

updateCount = 0;

}

}

private native void epollCtl(int epfd, int opcode, int fd, int events);

在获取到事件之后将操作委托给了epollCtl,这又是个native方法,打开相应的c代码一看:

JNIEXPORT void JNICALL

Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,

jint opcode, jint fd, jint events)

{

struct epoll_event event;

int res;

event.events = events;

event.data.fd = fd;

// 发起epoll_ctl调用来进行IO事件的管理

RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

/*

* A channel may be registered with several Selectors. When each Selector

* is polled a EPOLL_CTL_DEL op will be inserted into its pending update

* list to remove the file descriptor from epoll. The "last" Selector will

* close the file descriptor which automatically unregisters it from each

* epoll descriptor. To avoid costly synchronization between Selectors we

* allow pending updates to be processed, ignoring errors. The errors are

* harmless as the last update for the file descriptor is guaranteed to

* be EPOLL_CTL_DEL.

*/

if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {

JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");

}

}

原来还是我们的老朋友epoll_ctl.

有个小细节是jdk没有指定ET(边缘触发)还是LT(水平触发),所以默认会用LT:)

在AbstractSelectorImpl中有3个set保存事件

// Public views of the key sets

// 注册的所有事件

private Set publicKeys; // Immutable

// 内核返回的IO事件封装,表示哪些fd有数据可读可写

private Set publicSelectedKeys; // Removal allowed, but not addition

// 取消的事件

private final Set cancelledKeys = new HashSet();

在EpollArrayWrapper.poll调用完成之后, 会调用updateSelectedKeys来更新上面的仨set

private int updateSelectedKeys() {

int entries = pollWrapper.updated;

int numKeysUpdated = 0;

for (int i=0; i

int nextFD = pollWrapper.getDescriptor(i);

SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));

// ski is null in the case of an interrupt

if (ski != null) {

int rOps = pollWrapper.getEventOps(i);

if (selectedKeys.contains(ski)) {

if (ski.channel.translateAndSetReadyOps(rOps, ski)) {

numKeysUpdated++;

}

} else {

ski.channel.translateAndSetReadyOps(rOps, ski);

if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {

selectedKeys.add(ski);

numKeysUpdated++;

}

}

}

}

return numKeysUpdated;

代码很直白,拿出事件对set比对操作。

四、Selector类的相关方法

重点注意四个方法

select(): 这是一个阻塞方法,调用该方法,会阻塞,直到返回一个有事件发生的selectionKey集合

selectNow() :非阻塞方法,获取不到有事件发生的selectionKey集合,也会立即返回

select(long):阻塞方法,如果没有获取到有事件发生的selectionKey集合,阻塞指定的long时间

selectedKeys(): 返回全部selectionKey集合,不管是否有事件发生

可以理解:selector一直在监听select()

五、Selector、SelectionKey、ServerScoketChannel、ScoketChannel的关系

Server代码:

public class NIOServer {

public static void main(String[] args) throws Exception{

//创建ServerSocketChannel -> ServerSocket

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//得到一个Selecor对象

Selector selector = Selector.open();

//绑定一个端口6666, 在服务器端监听

serverSocketChannel.socket().bind(new InetSocketAddress(6666));

//设置为非阻塞

serverSocketChannel.configureBlocking(false);

//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1

//循环等待客户端连接

while (true) {

//这里我们等待1秒,如果没有事件发生, 返回

if(selector.select(1000) == 0) { //没有事件发生

System.out.println("服务器等待了1秒,无连接");

continue;

}

//如果返回的>0, 就获取到相关的 selectionKey集合

//1.如果返回的>0, 表示已经获取到关注的事件

//2. selector.selectedKeys() 返回关注事件的集合

// 通过 selectionKeys 反向获取通道

Set selectionKeys = selector.selectedKeys();

System.out.println("selectionKeys 数量 = " + selectionKeys.size());

//遍历 Set, 使用迭代器遍历

Iterator keyIterator = selectionKeys.iterator();

while (keyIterator.hasNext()) {

//获取到SelectionKey

SelectionKey key = keyIterator.next();

//根据key 对应的通道发生的事件做相应处理

if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接

//该该客户端生成一个 SocketChannel

SocketChannel socketChannel = serverSocketChannel.accept();

System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());

//将 SocketChannel 设置为非阻塞

socketChannel.configureBlocking(false);

//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel

//关联一个Buffer

socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..

}

if(key.isReadable()) { //发生 OP_READ

//通过key 反向获取到对应channel

SocketChannel channel = (SocketChannel)key.channel();

//获取到该channel关联的buffer

ByteBuffer buffer = (ByteBuffer)key.attachment();

channel.read(buffer);

System.out.println("form 客户端 " + new String(buffer.array()));

}

//手动从集合中移动当前的selectionKey, 防止重复操作

keyIterator.remove();

}

}

}

}

Client代码

public class NIOClient {

public static void main(String[] args) throws Exception{

//得到一个网络通道

SocketChannel socketChannel = SocketChannel.open();

//设置非阻塞

socketChannel.configureBlocking(false);

//提供服务器端的ip 和 端口

InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

//连接服务器

if (!socketChannel.connect(inetSocketAddress)) {

while (!socketChannel.finishConnect()) {

System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");

}

}

//...如果连接成功,就发送数据

Strinhttp://g str = "hello, 尚硅谷~";

//Wraps a byte array into a buffer

ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());

//发送数据,将 buffer 数据写入 channel

socketChannel.write(buffer);

System.in.read();

}

}

六、总结

int nextFD = pollWrapper.getDescriptor(i);

SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));

// ski is null in the case of an interrupt

if (ski != null) {

int rOps = pollWrapper.getEventOps(i);

if (selectedKeys.contains(ski)) {

if (ski.channel.translateAndSetReadyOps(rOps, ski)) {

numKeysUpdated++;

}

} else {

ski.channel.translateAndSetReadyOps(rOps, ski);

if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {

selectedKeys.add(ski);

numKeysUpdated++;

}

}

}

}

return numKeysUpdated;

代码很直白,拿出事件对set比对操作。

四、Selector类的相关方法

重点注意四个方法

select(): 这是一个阻塞方法,调用该方法,会阻塞,直到返回一个有事件发生的selectionKey集合

selectNow() :非阻塞方法,获取不到有事件发生的selectionKey集合,也会立即返回

select(long):阻塞方法,如果没有获取到有事件发生的selectionKey集合,阻塞指定的long时间

selectedKeys(): 返回全部selectionKey集合,不管是否有事件发生

可以理解:selector一直在监听select()

五、Selector、SelectionKey、ServerScoketChannel、ScoketChannel的关系

Server代码:

public class NIOServer {

public static void main(String[] args) throws Exception{

//创建ServerSocketChannel -> ServerSocket

ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

//得到一个Selecor对象

Selector selector = Selector.open();

//绑定一个端口6666, 在服务器端监听

serverSocketChannel.socket().bind(new InetSocketAddress(6666));

//设置为非阻塞

serverSocketChannel.configureBlocking(false);

//把 serverSocketChannel 注册到 selector 关心 事件为 OP_ACCEPT

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

System.out.println("注册后的selectionkey 数量=" + selector.keys().size()); // 1

//循环等待客户端连接

while (true) {

//这里我们等待1秒,如果没有事件发生, 返回

if(selector.select(1000) == 0) { //没有事件发生

System.out.println("服务器等待了1秒,无连接");

continue;

}

//如果返回的>0, 就获取到相关的 selectionKey集合

//1.如果返回的>0, 表示已经获取到关注的事件

//2. selector.selectedKeys() 返回关注事件的集合

// 通过 selectionKeys 反向获取通道

Set selectionKeys = selector.selectedKeys();

System.out.println("selectionKeys 数量 = " + selectionKeys.size());

//遍历 Set, 使用迭代器遍历

Iterator keyIterator = selectionKeys.iterator();

while (keyIterator.hasNext()) {

//获取到SelectionKey

SelectionKey key = keyIterator.next();

//根据key 对应的通道发生的事件做相应处理

if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客户端连接

//该该客户端生成一个 SocketChannel

SocketChannel socketChannel = serverSocketChannel.accept();

System.out.println("客户端连接成功 生成了一个 socketChannel " + socketChannel.hashCode());

//将 SocketChannel 设置为非阻塞

socketChannel.configureBlocking(false);

//将socketChannel 注册到selector, 关注事件为 OP_READ, 同时给socketChannel

//关联一个Buffer

socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));

System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); //2,3,4..

}

if(key.isReadable()) { //发生 OP_READ

//通过key 反向获取到对应channel

SocketChannel channel = (SocketChannel)key.channel();

//获取到该channel关联的buffer

ByteBuffer buffer = (ByteBuffer)key.attachment();

channel.read(buffer);

System.out.println("form 客户端 " + new String(buffer.array()));

}

//手动从集合中移动当前的selectionKey, 防止重复操作

keyIterator.remove();

}

}

}

}

Client代码

public class NIOClient {

public static void main(String[] args) throws Exception{

//得到一个网络通道

SocketChannel socketChannel = SocketChannel.open();

//设置非阻塞

socketChannel.configureBlocking(false);

//提供服务器端的ip 和 端口

InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);

//连接服务器

if (!socketChannel.connect(inetSocketAddress)) {

while (!socketChannel.finishConnect()) {

System.out.println("因为连接需要时间,客户端不会阻塞,可以做其它工作..");

}

}

//...如果连接成功,就发送数据

Strinhttp://g str = "hello, 尚硅谷~";

//Wraps a byte array into a buffer

ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());

//发送数据,将 buffer 数据写入 channel

socketChannel.write(buffer);

System.in.read();

}

}

六、总结

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Java ThreadLocal的使用场景总结
下一篇:天气位置信息免费api(定位实时天气预报下载)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~