티스토리 뷰
MacOS 를 사용하여 Netty 의 NioEventLoop.java 가 사용되었습니다.
WebClient -> Netty 로 전달되는 과정에 중점
WebClient 를 호출한 MainThread 에서 실행되는 영역
static {
this.webClient = WebClient.builder().baseUrl("http://127.0.0.1:8090")
.clientConnector(new ReactorClientHttpConnector(HttpClient.create().compress(true)))
.build();
}
@PostConstruct
public void webClientTest() throws InterruptedException {
while(true) {
Thread.sleep(30000);
webClient.post()
.uri("/hello")
.bodyValue("Woongs Test")
.exchange()
.flatMap(clientResponse -> {
return clientResponse.bodyToMono(String.class);
})
.doOnNext(System.out::println)
.subscribe();
}
}
@PostConstruct 를 사용하여 주기적으로 WebClient 가 호출되도록 테스트 코드 작성.
localhost 의 8090 포트로 post 요청을 30초마다 보내게된다.
PooledConnectionProvider.java
@Override
public void onNext(PooledRef<PooledConnection> value) {
pooledRef = value;
PooledConnection pooledConnection = value.poolable();
pooledConnection.pooledRef = pooledRef;
Channel c = pooledConnection.channel;
if (c.eventLoop().inEventLoop()) {
run();
}
else {
c.eventLoop()
.execute(this);
}
}
Channel 을 관리하는 PooledConnectionProvier 가 event 를 전달받어 onNext() 가 호출됨.
value 는 현재 처리할 PooledConnection 이 담겨있으며 value.channel 에 connection 맺은 Channel 이 들어있음
@PostConsturct 에서 WebClient 를 실행하였으므로 WebClient 를 호출한것은 Main 쓰레드이다.
따라서 Else 문으로 전달된다.
Channel 에는 해당 Channel 의 Netty NioEventLoop 를 변수로 가지고 있어 NioEventLoop 로 Post 요청을 Execute() 한다.
SingleThreadEventExecutor.java (NioEventLoop.java)
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
...
}
execute() 내부에는 addTask() 를 호출하고 있다.
protected void addTask(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
addTask() 함수를 호출하여 실행되어야 하는 POST 요청을 NioEventLoop 의 taskQueue 에 저장한다.
여기까지 실행되면 WebClient 를 호출한 MainThread 의 역할은 종료되는것을 알 수 있다.
그렇다면 add 된 Task 는 어디에서 처리가 되는것인지 확인해야 한다.
NioEventLoop 영역
아래 분석에서 알 수 있듯이 NioEventLoop 는 Channel 에서 발생한 IO Event 뿐만 아니라 Task Queue 의 Task 도 처리하는 역할을 가지고 있다.
https://woooongs.tistory.com/73
NioEventLoop.java
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
NioEventLoop 의 run() 메서드는 무한 루프를 돌면서 IO Event 와 Task Queue 의 테스크들을 처리한다.
runAllTasks() 의 Break Point 를 잡은 후에 MainThread 에서 WebClient 를 호출하면 addTask() 이후에 NioEventLoop 의 runAllTasks() 에 잡히는 것을 확인할 수 있다.
이때 Task 를 처리하는 것은 MainThread 가 아니라 NioEventLoop 쓰레드이다.
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
safeExecute() 함수에 의해서 Task 의 run() 메서드가 실행된다.
PooledConnectionProvider.java
ChannelOperations<?, ?> ops = opsFactory.create(pooledConnection, pooledConnection, null);
if (ops != null) {
ops.bind();
// First send a notification that the connection is ready and then change the state
// In case a cancellation was received, ChannelOperations will be disposed
// and there will be no subscription to the I/O handler at all.
// https://github.com/reactor/reactor-netty/issues/1165
sink.success(ops);
obs.onStateChange(ops, State.CONFIGURED);
}
run() 메서드에서 obs.onStateChange() 함수가 호출되면 실제 ByteBuf 에 Write 하기 위한 코드들이 실행된다.
Observer 를 통해서 상태 변경을 감지한 HttpIOHandlerObserver.java 가 상태 변경을 전달 받아 reactor component 들에게 전달한다.
HttpClientOperation.java 가 mapper 로 사용되어 HttpOpertions.send() 가 호출되게 된다.
@Override
@SuppressWarnings("unchecked")
public NettyOutbound send(Publisher<? extends ByteBuf> source) {
if (!channel().isActive()) {
return then(Mono.error(AbortedException.beforeSend()));
}
if (source instanceof Mono) {
return new PostHeadersNettyOutbound(((Mono<ByteBuf>)source)
.flatMap(msg -> {
if (markSentHeaderAndBody(msg)) {
try {
afterMarkSentHeaders();
}
catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
return Mono.error(e);
}
if (HttpUtil.getContentLength(outboundHttpMessage(), -1) == 0) {
log.debug(format(channel(), "Dropped HTTP content, " +
"since response has Content-Length: 0 {}"), toPrettyHexDump(msg));
msg.release();
return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(Unpooled.EMPTY_BUFFER)));
}
return FutureMono.from(channel().writeAndFlush(newFullBodyMessage(msg)));
}
return FutureMono.from(channel().writeAndFlush(msg));
})
.doOnDiscard(ByteBuf.class, ByteBuf::release), this, null);
}
return super.send(source);
}
channel().writeAndFlust() 함수에 의해서 해당 Channel 에 write 가 실행되게 된다.
HttpClientOperation 의 send 함수 호출 부분은 EncoderHttpMessageWriter.java 에서 MessageWriter 가 만들어질때 이미 결정되어있고 subscribe() 가 호출되고 onNext() 가 호출되었을때 그제서야 실제 콜백인 send() 가 호출되는 것이다.
NioSocketChannel.java
@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
...
switch (nioBufferCnt) {
...
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
...
}
ch.write() 가 호출되어 실제 write 가 실행되며 이 이후에 해당 요청을 받는 서버쪽에서 요청을 수신하게 된다.
SocketChannelImpl.java
var3 = IOUtil.write(this.fd, var1, -1L, nd);
native 메서드를 호출하여 write.
만약 WebClient 를 호출한 쓰레드가 MainThread 가 아니라 NioEventLoop 쓰레드라면?
@Override
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound(MASK_FLUSH);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Tasks tasks = next.invokeTasks;
if (tasks == null) {
next.invokeTasks = tasks = new Tasks(next);
}
safeExecute(executor, tasks.invokeFlushTask, channel().voidPromise(), null, false);
}
return this;
}
AbstractChannelHandlerContext.java 에서 현재 Thread 가 EventLoop 쓰레드 이므로 TaskQueue 에 넣지 않고 즉시 실행한다.
- Total
- Today
- Yesterday
- AbstractMethodError
- dynamodb
- ResultSet
- custom config data convertion
- rate limit
- wait()
- router
- MariaDB
- RouteDefinition
- reative
- DyanomoDB
- reactor
- N+1
- mariadb-connector-j
- ConcurrentHashMap
- notifyAll()
- referencedColumnName
- getBoolean
- spring cloud gateway
- RoutePredication
- Seperate Chaining
- aurora
- msyql-connector-java
- circurit breaker
- GlobalFilter
- Flux
- mariada-connector
- Lazy
- HashMap
- notify()
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |