protectedvoidrun(){ for (;;) { try { try { //通过hasTasks方法判断队列中是否还有未处理的方法 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO //没有任务则执行,select()执行网络IO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { //如果本轮Selector的轮询结果为null,那么可能触发了jdk epoll的bug //该bug会导致IO线程处于100%的状态,需要重建Selector来解决 rebuildSelector0(); handleLoopException(e); continue; }
privatevoidrebuildSelector0(){ final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;
if (oldSelector == null) { return; }
try { //创建一个新的Selector newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { //将原Selector上注册的所有SelectionKey转移到新的Selector Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; }
int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } }
try { // time to close the old selector as everything else is registered to the new one //关闭旧的Selector oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }
if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch){ final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略代码 ......
try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }
protectedvoidrun(){ for (;;) { try { try { //通过hasTasks方法判断队列中是否还有未处理的方法 switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO //没有任务则执行,select()执行网络IO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { //如果本轮Selector的轮询结果为null,那么可能触发了jdk epoll的bug //该bug会导致IO线程处于100%的状态,需要重建Selector来解决 rebuildSelector0(); handleLoopException(e); continue; }
privatevoidrebuildSelector0(){ final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;
if (oldSelector == null) { return; }
try { //创建一个新的Selector newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { //将原Selector上注册的所有SelectionKey转移到新的Selector Object a = key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; }
int interestOps = key.interestOps(); key.cancel(); SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } }
try { // time to close the old selector as everything else is registered to the new one //关闭旧的Selector oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }
if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch){ final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略代码 ......
try { int readyOps = k.readyOps(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
unsafe.finishConnect(); }
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }