티스토리 뷰

JAVA/Reactor

Reactor Context 예제

소농배 2021. 4. 26. 15:19

Reactor 는 이벤트를 처리하는 Thread 가 publishOn 혹은 subscribeOn 과 같은 함수로 인해서 계속 바뀔 수 있다.

이로 인해서 Transaction, Logback 과 같이 ThreadLocal 을 이용하여 데이터를 전달하는 기능들에 한계가 발생하였다.

 

이것을 해결하기위하여 Reactive Sequence 상에서 공유되는 데이터를 만들 수 있는데 이 기능이 Context 이다.

 

Context 를 사용하기 전에 확인해보고 싶은 두가지 궁금한점이 있었다.

  1. Reactive Stream 을 처리하는 쓰레드가 변경되어도 Context 는 변하지 않는지.
  2. Reactive Stream 을 처리하는 과정에서 Context 의 데이터가 변경되면 하위 Stream 에 영향이 있는지.
Mono.just(0)
	.publishOn(Schedulers.newSingle("PUBLISHED_THREAD")) --- 1
    .flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.get(0))) --- 2
    .doOnNext((result) -> System.out.println(Thread.currentThread().getName() + " : " + result)) --- 3
    .publishOn(Schedulers.newSingle("PUBLISHED_THREAD")) --- 4
    .flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.get(0))) --- 5
    .doOnNext((result) -> System.out.println(Thread.currentThread().getName() + " : " + result)) --- 6
    .flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.put(0, "WOONGS"))) --- 7
    .flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.get(0))) --- 8
    .doOnNext((result) -> System.out.println(Thread.currentThread().getName() + " : " + result)) --- 9
    .subscriberContext(ctx -> ctx.put(0, "DAMIAN")) --- 0
    .subscribe();

호출되는 순서대로 숫자를 붙였다.

 

0. Reactive Stream 이 구독되기 전에 Context 를 만들었고 Key : 0 , Value : DAMIAN 값을 저장.

1. Event 가 발행되면서 PUBLISHED_THREAD-1 라는 이름을 가진 쓰레드를 생성하여 하위 Stream 은 PUBLISHED_THREAD-1 에서 처리.

2. Context 에서 Key : 0 인 데이터의 Value 를 하위 Stream 으로 전달

3. 현재 실행되는 쓰레드의 이름과 상위 Stream 에서 전달받은 Value 를 출력

4. PUBLISHED_THREAD-2 라는 이름을 가지는 쓰레드를 새로 생성하여 하위 Stream 은 PUBLISEHD_THREAD-2 에서 처리.

5. Context 에서 Key : 0 인 데이터의 Value 를 하위 Stream 으로 전달

6. 현재 실행되는 쓰레드의 이름과 상위 Stream 에서 전달받은 Value 를 출력

7. Context 의 Key : 0 에 Value 를 WOONGS 로 변경

8. Context 에서 Key : 0 인 데이터의 Value 를 하위 Stream 으로 전달

9. 현재 실행되는 쓰레드의 이름과 상위 Stream 에서 전달받은 Value 를 출력

 

결과

PUBLISHED_THREAD-1 : DAMIAN --- 3
PUBLISHED_THREAD-2 : DAMIAN --- 6
PUBLISHED_THREAD-2 : DAMIAN --- 9

 

위 코드는 Main 쓰레드에서 실행되었고 subScribeOn 이 호출되지 않았기 때문에 subscriberContext 는 Main 쓰레드에서 실행되었을 것이다. 후에 publishOn 이 호출되면서 PUBLISHED_THREAD-1 에서 Reactive Stream 처리가 되기 시작하였다. 3번의 출력 결과를 보면 Main 쓰레드에서 생성한 Context 의 결과가 출력이 되었으므로 Reactive Stream 을 처리하는 쓰레드가 변경되어도 Context 는 전달 되는 것을 확인할 수 있었다.

 

6번 결과를 보면 한번 더 쓰레드가 변경되었지만 Context 에 데이터가 잘 전달 되었다.

 

9번 결과는 8번 라인에서 Context 의 데이터를 변경하였으나 하위 Stream 에서는 변경된 데이터가 아닌 0번 라인에서 저장된 데이터가 전달되었다. 이 결과로 보아 subscribe() 이전에 생성된 subscriberContext 에 대해서 subscribe() 가 호출된 이후에 데이터를 변경하더라도 하위 Stream 에서는 영향을 받지 않는것을 알 수 있다.

'JAVA > Reactor' 카테고리의 다른 글

Reactor reference 문서  (0) 2021.04.11
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함