Netty NioEventLoop 코드 레벨 분석
Netty 의 NioEventLoop.java 코드 중 핵심이 되는 run() 메서드 분석
MacOS 를 사용중이기 때문에 KqueueSelectorImpl.java 가 사용되었다.
EventLoop.java 는 두가지를 처리한다.
- I/O : Channel 로 부터 발생한 Event 를 받아 I/O
- Non I/O : Task Queue 에 들어있는 Task 처리.
NioEventLoop 생성
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
....
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
children[i] = newChild(executor, args);
Netty 의 EventLoop 는 EventLoopGroup 에 여러개가 포함되어있는 구조이다. 따라서 NioEventLoopGroup 에서 NioEventLoop 를 지정한 갯수만큼 생성하고 있다.
public final class NioEventLoop extends SingleThreadEventLoop {
NioEventLoop.java 는 SingleThreadEventLoop.java 를 상속받고 있다. 따라서 SingleThreadEventExecutor.java 의 execute() 함수에 의해서 Single Thread 로 run() 이 실행된다.
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
run() 이 실행되면 shutdown 시그널을 받기 전까지 for(;;) 로 무한 loop 를 돌게 된다.
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
strategy 는 다음 단계에서 어떤 프로세스로 실행이 되어야 하는지를 결정한다.
strategy 는 아래 세가지 종류가 있다.
- SELECT : Blocking IO 가 실행되어야 할때. 즉 실행해야할 Task 가 없어서 대기 상태로 들어가야 한다.
- CONTINUE : IO loop 가 Blokcing IO 를 실행하지 않고 다시 실행되어야 할때
- BUSY_WAIT : Blocking 없이 새로운 Event 를 polling
strategy 가 0 이상이면 처리해야할 Task 가 있음을 의미한다.
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
calculateStrategy() 내부 구현을 보면 hasTasks 가 true 인 경우에는 전달받은 selectSupplier (KqueueEventLoop 의 Supplier) 를 호출해서 event count 를 얻어오게 된다.
hasTasks 가 false 인 경우는 처리할 Task 가 없으므로 Blocking 상태로 event 를 기다린다.
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
ioRatio 는 EventLoop Thread 가 I/O 처리에 소비할 시간과 non I/O 처리에 소비할 시간의 비율을 정한다.
default 는 50 이며 0 부터 100 까지의 숫자를 세팅할 수 있다. 100 으로 설정된 경우에 이 기능이 disable 된다.
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
따라서 ioRatio == 100 인 경우에 별도의 처리 없이 바로 task 를 처리한다.
strategy = 1 이고 ioRatio = 50 으로 세팅되어있으므로 아래 로직을 타게 된다.
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
IO 가 실행되므로 ioStartTime 을 기록하고 시작한다.
processSelectedKeys() 가 호출되며 Select 된 Channel 이 실행된다.
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
select 된 key 사이즈만큼 loop 를 돌면서 모든 Channel 을 처리하게 된다.
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
...
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
Select 된 key 내부에는 Channel 객체가 존재한다.
해당 key 의 ReadyOps 가 실행되어야할 operation 을 의미한다.
ReadyOPs 에는 아래와 같이 4가지 종류가 존재한다.
- OP_READ : read 명령
- OP_WRITE : write 명령
- OP_CONNECT : sokcet connect 명령
- OP_ACCEPT : socket accept 명령
각 명령에 해당하는 액션을 Channel 을 통해서 처리하게 된다.
read() 의 경우에는 readBuf 에서 데이터를 읽어온다.
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
read 의 경우에 Channel 에 pipeline.fireChannelRead() 를 호출하여 pipeLine 에서 실행되어야 하는 각 이벤트별 핸들러를 호출해준다.
이렇게 처리해야하는 Channel 를 모두 처리하게 되면 다시 아래 코드로 돌아와서 ioRatio 에 따라 IO 를 더 처리할지 말지 를 결정하게 된다.
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
아까 처음에 IO 를 시작할때 저장해두었던 ioStartTime 을 기준으로 모든 Channel 을 처리하는데 걸린 시간을 ioTime 에 저장한다.
(걸린시간 * non IO 처리 비율) / IO 처리 비율 로 Non I/O 처리 timeout 시간을 구한다.
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
ioRatio 에 의해 결정된 Non I/O 시간을 timedout 파라미터로 받아 해당 시간 만큼 Non I/O 테스크를 진행한다.
try {
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
taskQueue 에서 정해진 daedline 만큼 task 를 처리한 후에 NinEventLoop.java 의 run() 함수의 반복문을 다시 돌게된다.
이때 수행할 task 가 없으면 strategy == -1 이 리턴되어 정해진 시간만큼 Channel 의 Event 를 기다리는 kevent() 에서 Channel 의 Event 를 기다린다.
번외) taskQueue 에는 어떤 Task 가 들어가는지.
Channel 에 read, write, connection 와 같은 operation 들이 taskQueue 로 들어가게된다.
하지만 하당 Task 가 EventLoop Thread 에 서 발생한다면 taskQueue 로 들어가지 않고 즉시 실행된다.
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
if (!safeExecute(executor, task, promise, m, !flush)) {
// We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
하지만 아래와 같이 새로운 Thread 가 Netty operation 을 수행한다면 TaskQueue 로 들어가 EventLoop Thread 가 Non I/O 를 처리하는 시점 runAllTasks() 에 처리된다.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Executors.newFixedThreadPool(3).execute(() -> {
ctx.writeAndFlush(Unpooled.copiedBuffer("Woongs netty client!", CharsetUtil.UTF_8));
});
}