Reactor Context 예제
Reactor 는 이벤트를 처리하는 Thread 가 publishOn 혹은 subscribeOn 과 같은 함수로 인해서 계속 바뀔 수 있다.
이로 인해서 Transaction, Logback 과 같이 ThreadLocal 을 이용하여 데이터를 전달하는 기능들에 한계가 발생하였다.
이것을 해결하기위하여 Reactive Sequence 상에서 공유되는 데이터를 만들 수 있는데 이 기능이 Context 이다.
Context 를 사용하기 전에 확인해보고 싶은 두가지 궁금한점이 있었다.
- Reactive Stream 을 처리하는 쓰레드가 변경되어도 Context 는 변하지 않는지.
- Reactive Stream 을 처리하는 과정에서 Context 의 데이터가 변경되면 하위 Stream 에 영향이 있는지.
Mono.just(0)
.publishOn(Schedulers.newSingle("PUBLISHED_THREAD")) --- 1
.flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.get(0))) --- 2
.doOnNext((result) -> System.out.println(Thread.currentThread().getName() + " : " + result)) --- 3
.publishOn(Schedulers.newSingle("PUBLISHED_THREAD")) --- 4
.flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.get(0))) --- 5
.doOnNext((result) -> System.out.println(Thread.currentThread().getName() + " : " + result)) --- 6
.flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.put(0, "WOONGS"))) --- 7
.flatMap((t) -> Mono.subscriberContext().map(ctx -> ctx.get(0))) --- 8
.doOnNext((result) -> System.out.println(Thread.currentThread().getName() + " : " + result)) --- 9
.subscriberContext(ctx -> ctx.put(0, "DAMIAN")) --- 0
.subscribe();
호출되는 순서대로 숫자를 붙였다.
0. Reactive Stream 이 구독되기 전에 Context 를 만들었고 Key : 0 , Value : DAMIAN 값을 저장.
1. Event 가 발행되면서 PUBLISHED_THREAD-1 라는 이름을 가진 쓰레드를 생성하여 하위 Stream 은 PUBLISHED_THREAD-1 에서 처리.
2. Context 에서 Key : 0 인 데이터의 Value 를 하위 Stream 으로 전달
3. 현재 실행되는 쓰레드의 이름과 상위 Stream 에서 전달받은 Value 를 출력
4. PUBLISHED_THREAD-2 라는 이름을 가지는 쓰레드를 새로 생성하여 하위 Stream 은 PUBLISEHD_THREAD-2 에서 처리.
5. Context 에서 Key : 0 인 데이터의 Value 를 하위 Stream 으로 전달
6. 현재 실행되는 쓰레드의 이름과 상위 Stream 에서 전달받은 Value 를 출력
7. Context 의 Key : 0 에 Value 를 WOONGS 로 변경
8. Context 에서 Key : 0 인 데이터의 Value 를 하위 Stream 으로 전달
9. 현재 실행되는 쓰레드의 이름과 상위 Stream 에서 전달받은 Value 를 출력
결과
PUBLISHED_THREAD-1 : DAMIAN --- 3
PUBLISHED_THREAD-2 : DAMIAN --- 6
PUBLISHED_THREAD-2 : DAMIAN --- 9
위 코드는 Main 쓰레드에서 실행되었고 subScribeOn 이 호출되지 않았기 때문에 subscriberContext 는 Main 쓰레드에서 실행되었을 것이다. 후에 publishOn 이 호출되면서 PUBLISHED_THREAD-1 에서 Reactive Stream 처리가 되기 시작하였다. 3번의 출력 결과를 보면 Main 쓰레드에서 생성한 Context 의 결과가 출력이 되었으므로 Reactive Stream 을 처리하는 쓰레드가 변경되어도 Context 는 전달 되는 것을 확인할 수 있었다.
6번 결과를 보면 한번 더 쓰레드가 변경되었지만 Context 에 데이터가 잘 전달 되었다.
9번 결과는 8번 라인에서 Context 의 데이터를 변경하였으나 하위 Stream 에서는 변경된 데이터가 아닌 0번 라인에서 저장된 데이터가 전달되었다. 이 결과로 보아 subscribe() 이전에 생성된 subscriberContext 에 대해서 subscribe() 가 호출된 이후에 데이터를 변경하더라도 하위 Stream 에서는 영향을 받지 않는것을 알 수 있다.