티스토리 뷰

Netty 의 NioEventLoop.java 코드 중 핵심이 되는 run() 메서드 분석

MacOS 를 사용중이기 때문에 KqueueSelectorImpl.java 가 사용되었다.

EventLoop.java 는 두가지를 처리한다.

  1. I/O : Channel 로 부터 발생한 Event 를 받아 I/O 
  2. Non I/O : Task Queue 에 들어있는 Task 처리.

NioEventLoop 생성 

   static {
        DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
                "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));

        if (logger.isDebugEnabled()) {
            logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
        }
    }
    
    ....
    
    protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
        super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
    }
    
    
     protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                            EventExecutorChooserFactory chooserFactory, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
        }

        if (executor == null) {
            executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
        }

        children = new EventExecutor[nThreads];

        for (int i = 0; i < nThreads; i ++) {
            boolean success = false;
            try {
                children[i] = newChild(executor, args);

Netty 의 EventLoop 는 EventLoopGroup 에 여러개가 포함되어있는 구조이다. 따라서 NioEventLoopGroup 에서 NioEventLoop 를 지정한 갯수만큼 생성하고 있다.

 

public final class NioEventLoop extends SingleThreadEventLoop {

NioEventLoop.java 는 SingleThreadEventLoop.java 를 상속받고 있다. 따라서 SingleThreadEventExecutor.java 의 execute() 함수에 의해서 Single Thread 로 run() 이 실행된다.

 

    @Override
    protected void run() {
        int selectCnt = 0;
        for (;;) {

run() 이 실행되면 shutdown 시그널을 받기 전까지 for(;;) 로 무한 loop 를 돌게 된다.

 

           try {
                int strategy;
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());

strategy 는 다음 단계에서 어떤 프로세스로 실행이 되어야 하는지를 결정한다. 

strategy 는 아래 세가지 종류가 있다.

  • SELECT : Blocking IO 가 실행되어야 할때. 즉 실행해야할 Task 가 없어서 대기 상태로 들어가야 한다.
  • CONTINUE : IO loop 가 Blokcing IO 를 실행하지 않고 다시 실행되어야 할때
  • BUSY_WAIT : Blocking 없이 새로운 Event 를 polling

strategy 가 0 이상이면 처리해야할 Task 가 있음을 의미한다.

처리해야할 task 가 존재하여 strategy 가 1 이 리턴

    @Override
    public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
        return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
    }

calculateStrategy() 내부 구현을 보면 hasTasks 가 true 인 경우에는 전달받은 selectSupplier (KqueueEventLoop 의 Supplier) 를 호출해서 event count 를 얻어오게 된다. 

hasTasks 가 false 인 경우는 처리할 Task 가 없으므로 Blocking 상태로 event 를 기다린다.

 

                selectCnt++;
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                boolean ranTasks;

ioRatio 는 EventLoop Thread 가 I/O 처리에 소비할 시간과 non I/O 처리에 소비할 시간의 비율을 정한다.

default 는 50 이며 0 부터 100 까지의 숫자를 세팅할 수 있다. 100 으로 설정된 경우에 이 기능이 disable 된다.

 

                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();
                        }
                    } finally {
                        // Ensure we always run tasks.
                        ranTasks = runAllTasks();
                    }

따라서 ioRatio == 100 인 경우에 별도의 처리 없이 바로 task 를 처리한다.

 

strategy = 1 이고 ioRatio = 50 으로 세팅되어있으므로 아래 로직을 타게 된다.

                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {

IO 가 실행되므로 ioStartTime 을 기록하고 시작한다.

 

processSelectedKeys() 가 호출되며 Select 된 Channel 이 실행된다.

    private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC'ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC'ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        }
    }

select 된 key 사이즈만큼 loop 를 돌면서 모든 Channel 을 처리하게 된다.

 

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            ...
            return;
        }

        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }

Select 된 key 내부에는 Channel 객체가 존재한다.

 

해당 key 의 ReadyOps 가 실행되어야할 operation 을 의미한다.

ReadyOPs 에는 아래와 같이 4가지 종류가 존재한다.

  • OP_READ : read 명령
  • OP_WRITE : write 명령
  • OP_CONNECT : sokcet connect 명령
  • OP_ACCEPT : socket accept 명령

curl 로 netty 서버로 요청을 보냈으므로 server 입장에서는 OP_READ 명령을 받음

각 명령에 해당하는 액션을 Channel 을 통해서 처리하게 된다. 

read() 의 경우에는 readBuf 에서 데이터를 읽어온다.

 

                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }

read 의 경우에 Channel 에 pipeline.fireChannelRead() 를 호출하여 pipeLine 에서 실행되어야 하는 각 이벤트별 핸들러를 호출해준다.

 

이렇게 처리해야하는 Channel 를 모두 처리하게 되면 다시 아래 코드로 돌아와서 ioRatio 에 따라 IO 를 더 처리할지 말지 를 결정하게 된다.

                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }

아까 처음에 IO 를 시작할때 저장해두었던 ioStartTime 을 기준으로 모든 Channel 을 처리하는데 걸린 시간을 ioTime 에 저장한다.

(걸린시간 * non IO 처리 비율) / IO 처리 비율 로 Non I/O 처리 timeout 시간을 구한다.

 

    protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

ioRatio 에 의해 결정된 Non I/O 시간을 timedout 파라미터로 받아 해당 시간 만큼 Non I/O 테스크를 진행한다.

 

                        try {
                            if (!hasTasks()) {
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {

taskQueue 에서 정해진 daedline 만큼 task 를 처리한 후에 NinEventLoop.java 의 run() 함수의 반복문을 다시 돌게된다.

이때 수행할 task 가 없으면 strategy == -1 이 리턴되어 정해진 시간만큼 Channel 의 Event 를 기다리는 kevent() 에서 Channel 의 Event 를 기다린다.

 

 

번외) taskQueue 에는 어떤 Task 가 들어가는지.

Channel 에 read, write, connection 와 같은 operation 들이 taskQueue 로 들어가게된다.

하지만 하당 Task 가 EventLoop Thread 에 서 발생한다면 taskQueue 로 들어가지 않고 즉시 실행된다.

        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
            if (!safeExecute(executor, task, promise, m, !flush)) {
                // We failed to submit the WriteTask. We need to cancel it so we decrement the pending bytes
                // and put it back in the Recycler for re-use later.
                //
                // See https://github.com/netty/netty/issues/8343.
                task.cancel();
            }
        }

하지만 아래와 같이 새로운 Thread 가 Netty operation 을 수행한다면 TaskQueue 로 들어가 EventLoop Thread 가 Non I/O 를 처리하는 시점 runAllTasks() 에 처리된다.

	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		Executors.newFixedThreadPool(3).execute(() -> {
			ctx.writeAndFlush(Unpooled.copiedBuffer("Woongs netty client!", CharsetUtil.UTF_8));
		});
	}

'JAVA > Netty' 카테고리의 다른 글

Netty 간단한 Client 코드 작성  (1) 2021.07.11
Netty 간단한 서버 구축  (0) 2021.07.11
Netty 개념과 아키텍쳐  (0) 2021.07.11
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함