文章导航

Reactor线程模型

Netty的线程模型实际上就是Reactor模型的一种实现。

Reactor模型是基于事件驱动开发的,核心组成部分是一个Reactor和一个线程池,其中Reactor负责监听和分配事件,线程池负责处理事件。根据Reactor的数量有线程池的数量,又可以将Reactor分为三种模型:

  • 单线程模型(单Reactor,单线程)
  • 多线程模型(单Reactor,多线程)
  • 主从多线程模型(多Reactor,多线程)

单线程模型

YcVCsH.png

  • Reactor内部通过selector轮询连接,收到事件后,通过dispatch进行分发。
  • 如果是连接事件,则分发给Acceptor处理,Accepter通过accept接受连接,并创建一个Headler来处理连接后的各种事件。
  • 如果是读写事件,那么直接交由对应的Headelr进行处理。

多线程模型

YcZNNt.png

  • 主线程中,Reactor对象通过selector监控连接事件,收到事件后通过dispatch进行分发。
  • 如果是建立连接的事件,则Accepter负责处理,它会通过accept接受请求,并创建一个Headler来处理后序事件,Headler只负责相应事件,不进行业务操作,也就是只进行read读取数据和write写出数据,业务处理是交给线程池进行处理
  • 线程池分配一个线程来进行业务的处理,处理结果交由对应的Handler进行转发。

主从多线程模型

YcZgNq.png

  • 存在多个Reactor,每个Reactor都有自己的selector选择器,线程和dispatch
  • 主线程中的mainReactor通过自己的selector监控连接建立事件,收到事件后通过Accepter接受,将任务分配给某个子线程。
  • 子线程中的subReactormainReactor分配的连接加入连接队列中通过自己的selector进行监听,并创建一个Handler用于处理后序事件。
  • Handler完成read->业务处理->send的完整业务流程。

Netty中的线程模型与Reactor的联系

在Netty中主要是通过NioEventLoopGroup线程池来实现具体的线程模型的。

单线程模型

单线程模型就是指定一个线程执行客户端连接和读写操作,也就是在一个Reactor中完成。对应的实现方式就是将NioEventLoopGroup线程数设置为1.

Netty中是这样构造单线程模型的:

1
2
3
4
5
6
7
8
NioEventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(group)
.channel(NioServerSocketChannel.class)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());

多线程模型

多线程模型就是当Reactor进行客户端的连接处理,然后业务处理交由线程池来执行。

Netty中是这样构造多线程模型的:

1
2
3
4
5
6
7
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(eventGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());

主从多线程模型(最常使用)

主从多线程模型是有多个Reactor,也就是有多个selector,所以我们定义一个bossGroup和一个workGroup

在Netty中是这样构建主从多线程模型的:

1
2
3
4
5
6
7
8
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ServerHandlerInitializer());

相较于多线程模型,主从多线程模型不会遇到处理连接的瓶颈问题。在多线程模型下,因为只有一个NIO的Acceptor来处理连接请求,所以会出现性能瓶颈。

NioEventLoop源码分析

在Netty线程模型中,NioEventLoop是比较关键的类。下面我们对它的实现进行分析。

Yc7gje.png

它的继承关系图如下:

YcoFud.png

NioEventLoop需要处理网络IO请求,因此有一个多路复用器Selector:

1
2
3
4
5
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;

private final SelectorProvider provider;

并且在构造方法中完成了初始化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}

NioEventLooprun()方法比较的关键:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
protected void run() {
for (;;) {
try {
try {
//通过hasTasks方法判断队列中是否还有未处理的方法
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
//没有任务则执行,select()执行网络IO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
//如果本轮Selector的轮询结果为null,那么可能触发了jdk epoll的bug
//该bug会导致IO线程处于100%的状态,需要重建Selector来解决
rebuildSelector0();
handleLoopException(e);
continue;
}

cancelledKeys = 0;
needsToSelectAgain = false;
//处理IO事件所需的事件和花费在处理task的时间的比例,默认为50%
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
//如果比例为100.则表示每次处理完IO后,才开始处理task
processSelectedKeys();
} finally {

// 执行task任务
runAllTasks();
}
} else {
//记录处理IO的开始时间
final long ioStartTime = System.nanoTime();
try {
//处理IO请求
processSelectedKeys();
} finally {
//计算IO请求的耗时
final long ioTime = System.nanoTime() - ioStartTime;
//执行task。判断执行task任务时间是否超过配置的比例,如果超过则停止执行task
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}

重建Selector的方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;

if (oldSelector == null) {
return;
}

try {
//创建一个新的Selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}

// Register all channels to the new Selector.
int nChannels = 0;

for (SelectionKey key: oldSelector.keys()) {
//将原Selector上注册的所有SelectionKey转移到新的Selector
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}

int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}

//用新的Selector替换旧的
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;

try {
// time to close the old selector as everything else is registered to the new one
//关闭旧的Selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}

if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}

处理IO请求的是由processSelectedKey完成的,它的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

// 省略代码 ......

try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);

unsafe.finishConnect();
}


if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}

if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
  1. 首先获取 Channel 的 NioUnsafe,所有的读写等操作都在 Channel 的 unsafe 类中操作。
  2. 获取 SelectionKey 就绪事件,如果是 OP_CONNECT,则说明已经连接成功,并把注册的 OP_CONNECT 事件取消。
  3. 如果是 OP_WRITE 事件,说明可以继续向 Channel 中写入数据,当写完数据后用户自己吧 OP_WRITE 事件取消掉。
  4. 如果是 OP_READ 或 OP_ACCEPT 事件,则调用 unsafe.read() 进行读取数据。unsafe.read() 中会调用到 ChannelPipeline 进行读取数据。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private final class NioMessageUnsafe extends AbstractNioUnsafe {

@Override
public void read() {
// 省略代码 ......
// 获取 Channel 对应的 ChannelPipeline
final ChannelPipeline pipeline = pipeline();

boolean closed = false;
Throwable exception = null;
try {
// 省略代码 ......
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 委托给 pipeline 中的 Handler 进行读取数据
pipeline.fireChannelRead(readBuf.get(i));
}

当 NioEventLoop 读取数据的时候会委托给 Channel 中的 unsafe 对象进行读取数据。
Unsafe中真正读取数据是交由 ChannelPipeline 来处理。
ChannelPipeline 中是注册的我们自定义的 Handler,然后由 ChannelPipeline中的 Handler 一个接一个的处理请求的数据。

作者:jijs
链接:https://www.jianshu.com/p/9e5e45a23309
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。