티스토리 뷰

JAVA/Reactor

Reactor reference 문서

소농배 2021. 4. 11. 20:29

1. About the Documentation

이 문서는 리액터 공식 문서를 번역한 것이다.

출처 : projectreactor.io/docs/core/release/reference/index.html

2. Getting Started

2.1 Reactor 소개

 Reactor 는 JVM 기반에 backpressure 라고 불리는 demand management 효율적으로 관리하는 완전한 Non-blocking 리액티브 프로그래밍을 제공한다. Reactor 는 Java 8 의 함수지향 API 들과 연동하며 특히 CompletableFuture, Stream, Duration 을 사용한다. Reactor 는 비동기 Sequence 인 FluxMono 를 제공하며 Reactive Streams 의 특징을 널리 구현한다.

 

 Reactor 는 또한 reactor-netty 와 함께 non-blocking IPC ( inter process communction - 프로세스간 통신 ) 을 지원한다. MSA 에 적합한 Reactor Netty 는 HTTP, TCP, UDP 를 위한 backpressure-ready 네트워크 엔진을 제공한다. 리액티브 인코딩과 디코딩을 완전히 지원한다.

 

2.2 필수 조건

 Reactor core 는 Java8 이상부터 동작하며 org.reactivestreams:reactive-streams:1.0.3 의존성을 갖는다.

 

2.3 BOM 과 버전 정책 

 Reactor 는 BOM (Bill of Materials) 모델을 사용한다. 이 큐레이팅 된 목록은 함께 잘 작동하는 아티팩트들을 그룹화하여 잠재적으로 다른 버전 지정 체계에도 관련 버전을 제공한다.

 

 3.3.x 와 3.4.x 사이에 버전 정책이 변경되었다 (Dysprosium and Europium)

 

2.4 Reactor 받기

 Rector 를 사용하는 가장 쉬운 방법은 BOM 을 사용하여 관련 의존성을 프로젝트에 작성하는 것이다. 이런 의존성을 추가할때 BOM 으로부터 선택되는 버전을 사용해야 하므로 의존성에 버전을 포함해서는 안된다.

 

 그러나 특정 버전의 아티팩트를 사용하고자 한다면 의존성을 명시할때 버전 정보도 추가해주면 된다. 

 

2.4.1 Maven 설치

 메이븐은 BOM 개념을 지원한다. 아래 pom.xml 을 추가함으로서 BOM 을 추가할 수 있다.

<dependencyManagement> 
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.5</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

만약 dependencyManagement 가 이미 pom.xml 에 포함되어있다면 제외 해당 부분은 제외하면 된다.

 

다음으로 버전 정보를 제외한 의존성을 추가해준다.

<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId> 
        
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId> 
        <scope>test</scope>
    </dependency>
</dependencies>

 

 2.4.2 Gradle 설치

 Gradle 5.0 이전 버전은 메이븐 BOM 을 지원하지 않는다. 그러나 Spring gradle-dependency-management 플러그인을 사용할 수 있다.

 

첫번째로 플러그인을 적용한다.

plugins {
    id "io.spring.dependency-management" version "1.0.7.RELEASE" 
}

 두번째로 BOM 을 추가한다.

dependencyManagement {
     imports {
          mavenBom "io.projectreactor:reactor-bom:2020.0.5"
     }
}

 

마지막으로 버전 정보 없이 의존성을 추가해준다.

dependencies {
     implementation 'io.projectreactor:reactor-core' 
}

Gradle 5.0 버전 부터는 BOM 을 제공하는 네이티브 Gradle 을 사용한다.

dependencies {
     implementation platform('io.projectreactor:reactor-bom:2020.0.5')
     implementation 'io.projectreactor:reactor-core' 
}

 

 

3. 리액티브 프로그래밍 소개

 Rector 는 리액티브 프로그래밍 페러다임을 구현한 것이다. 

리액티브 프로그래밍은 데이터 스트림과 변경의 전파와 관련된 비동기 프로그밍 페러다임이다. 리액티브 프로그래밍은 고정된 또는 변경되는 데이터 스트림을 프로그래밍 언어를 통하여 쉽게 전달할 수 있도록 한다.

 리액티브 프로그래밍의 첫번째 단계로 마이크로소프트는 Reactive Extensions (RX) 라이브러리를 .NET 생태계에서 만들었다. 그 후에 JVM 을 위한 RxJava 를 구현하였다. 시간이 지나면서 Reactive Stream 을 위한 노력이 있었고 JVM 을 위한 Reactive 라이브러리에 interface 와 규칙들이 Java9 에 Flox 클래스에 정의되었다.

 

 리액티브 프로그래밍 패러다임은 종종 객체지향 프로그래밍에서 옵저버 디자인 패턴을 확장한 것이라고 표현된다. 리액티브 스트림 패턴은 우리에게 친숙한 iterator 디자인 패턴과 비교될 수 있다. Iterator 패턴과 리액티브 스트림 패턴의 가장 큰 차이는 Iterator 패턴은 풀 기반이라면 reative stream 은 푸쉬 기반이다.

 

비록 value 에 접근하는 함수는 오로지 Iterable 의 책임이라고 하더라도 Iterator 를 사용하는 것은 반드시 필요한 프로그래밍 패턴이다. 사실 Iterator 시퀀스에서 next() 값에 접근하도록 선택하는 것은 개발자의 몫이다. reactive stream 에서 위와 일맥상통하는 것은 Publisher-Subscriber 이다. 그러나 Subscriber 에게 새로운 값이 도착하여 사용가능하다고 알리는 것은 Publisher 이다.  이 행위가 REACTIVE 하게 동작할 수 있는 키이다. 또한 푸쉬된 값에 적용되는 연산은 명령형이 아닌 선언적으로 표현된다. 개발자가 데이터 흐름을 제어하는것이 아니라 계산 로직을 전달한다. 

 

 값을 푸쉬하는것 외에도 에러를 다루는것과 처리를 완료하는 것은 구현된 기능으로 해결할 수 있다. Publisher 는 새로운 값을 onNext()

를 호출함으로서 Subscriber 에게 푸쉬할 수 있다. 그러나 onError() 를 호출하여 에러가 발생했다는 것의 신호가 될 수 있으며 onCmoplete() 를 호출함으로서 완료되었다는 신호가 될 수도 있다. 에러와 완료 둘다 시퀀스를 종료한다. 

onNext x 0..N [onError | onComplete]

이 접근은 매우 유연하다. 이 패턴은 값이 없는 것, 하나의 값, 여러 값들의 경우들을 지원한다. ( 시계의 초침이 움직이는 것 처럼 무한한 시퀀스 또한 처리할 수 있다. )

 

 그렇다면 비동기 리액티브 라이브러리가 필요한 이유가 무엇일까?

 

3.1. 블로킹(blocking)은 낭비일 수 있다.

 현대의 어플리케이션들은 동시에 굉장히 많은 유저를 소화해야할 수 있으며 비록 현대의 하드웨어가 계속해서 발전하고 있지만 현대 소프투웨어의 성능은 여전히 중요한 개념이다. 

 

 프로그램의 성능을 향상시킬 수 있는 방법이 두 가지 있다.

  • 더 많은 쓰레드와 하드웨어 자원을 사용하기 위한 병렬성
  • 현재 사용하는 자원을 더 잘 사용하기 위한 효율성

 보통 자바 개발자들은 블로킹(blocking) 코드를 사용함으로서 프로그램을 개발한다. 이 관습은 성능 병목 현상이 발생하기 전까지는 괜찮다. 그 후에 앞서 개발된 블로킹 코드와 비슷한 동작을 하는 추가적인 쓰레드를 도입한다고 가정해보자. 그러나 이렇게 자원을 활용한 것은 경합과 동시성 문제를 야기할 수 있다. 

 

여전히 블로킹은 자원을 낭비한다. 자세히 들여다보면 DB 요청이나 네트워크 호출과 같은 I/O 를 실행하는 프로그램은 데이터를 기다리느라 이러한 쓰레드들로 인해 자원이 낭비되고 있다.

 

 그렇기 때문에 병렬성은 묘책이 아니다. 하드웨어의 최대 성능을 사용하도록 하지만 추론하기 힘들며 리소스 낭비에 취약하다.

 

3.2 비동기의 구제

 앞서 언급한 두번째 접근법인 효율성은 자원 낭비의 해결책이 될 수 있다. 논블로킹 비동기 코드를 작성함으로서 현재 사용하고 있는 리소스가 다른 활성화된 테스크를 저리하도록 했다가 비동기 처리가 끝난 후에 현재 프로새스로 돌아오도록 할 수 있다.

 

 그러나 JVM 에서 비동기 코드를 어떻게 작성할 수 있을까? 자바는 비동기 프로그래밍에 두가지 모델을 제공한다.

  • Callbacks : 비동기 함수는 반환값이 없지만 callback 파라미터를 전달 받는다. 이 callback 파라미터는 결과가 사용가능할때 호출된다. 잘 알려진 예제로는 Swing 의 EventListerner 가 있다.
  • Futures : 비동기 함수는 즉시 Future<T> 를 반환한다. 비동기 프로세스는 T 값을 계산하지만 Future 객체가 이것에 접근하는 것을 감싸고 있다. 해당 값은 즉시 사용가능하지 않으며 Future 는 값이 유효할때까지 폴링될 수 있다. 예를들어 ExecutorService 는 Callabe<T> 테스크를 Future 객체를 사용하여 실행한다.

 위 두가지 기술로 충분할까? 몇가지 경우에 위 두가지 방법은 한계가 있다.

 

 Callback 은 콜백지옥이라 불리는 유지보수와 가독성에 큰 문제가 있다.

 

 아래 예제는 사용자에게 5가지 Favorite 을 노출한다. 만약 Favorite 없다면 추천한다. 이 프로그램은 세가지 서비스를 제공한다. 

Favorite 아이디를 제공, Favorite 상세 정보를 불러온다. 추천 항목을 상세 정보와 함께 제공

userService.getFavorites(userId, new Callback<List<String>>() { -- 1
  public void onSuccess(List<String> list) { -- 2
    if (list.isEmpty()) { -- 3
      suggestionService.getSuggestions(new Callback<List<Favorite>>() {
        public void onSuccess(List<Favorite> list) { -- 4
          UiUtils.submitOnUiThread(() -> { -- 5
            list.stream()
                .limit(5)
                .forEach(uiList::show); -- 6
            });
        }

        public void onError(Throwable error) { -- 7
          UiUtils.errorPopup(error);
        }
      });
    } else {
      list.stream() -- 8
          .limit(5)
          .forEach(favId -> favoriteService.getDetails(favId,  -- 9
            new Callback<Favorite>() {
              public void onSuccess(Favorite details) {
                UiUtils.submitOnUiThread(() -> uiList.show(details));
              }

              public void onError(Throwable error) {
                UiUtils.errorPopup(error);
              }
            }
          ));
    }
  }

  public void onError(Throwable error) {
    UiUtils.errorPopup(error);
  }
});
  1. 콜백 기반의 서비스이다. 비동기 프로세스가 성공하거나 실패할 경우에 실행된다.
  2. 선호하는 ID 를 가져오는데 성공했을때 실행된다.
  3. 만약 ID 가 비어있다면 suggestionService 가 호출되어야 한다.
  4. suggestionService 가 두번째 콜백에게 선호 ID 리스트 전달.
  5. UI 를 다루어야 하기 때문에 consuming 코드가 UI thread 에서 실행되고 있는지 확신해야한다. 
  6. 5개 까지만 노출하기 위하여 Java8 의 stream 을 사용하여 화면에 노출
  7. 각각의 단계에 모두 동일한 에러 핸들링 코드가 적용되었다.
  8. ID 레벨의 단게로 돌아가서 서비스가 전체 리스트를 반환하였다면 favoriteService 를 호출하여 Favorite 상세 객체들을 불러온다. 5개까지만 필요하므로 5개로 제한한다.
  9. UI thread 를 통하여 UI 로 데이터를 전달하기 위한 반복되는 코드이다.

 코드양이 너무 많고 이해하기 힘들며 반복되는 코드가 너무 많다. Rector 로 동일한 기능을 하는 코드를 살펴보자

userService.getFavorites(userId) -- 1
           .flatMap(favoriteService::getDetails) -- 2
           .switchIfEmpty(suggestionService.getSuggestions()) -- 3
           .take(5) -- 4
           .publishOn(UiUtils.uiThreadScheduler()) -- 5
           .subscribe(uiList::show, UiUtils::errorPopup); -- 6
  1. favorite ID 를 가져오는 것부터 시작한다
  2. 비동기적으로 ID 를 Favorite 객체로 변환한다. 이 과정을 통해 Favorite 의 flow 가 생성된다.
  3. 만약 Favorite 이 비어있다면 suggestionService 를 호출하도록 한다.
  4. 결과의 5개 요소만 필요하므로 5개만 갖도록 한다.
  5. 마지막으로 UI 를 처리하는 Thread 에서 실행되기를 원하므로 데이터를 UI Thread 로 전달한다.
  6. 최종 형태의 데이터로 어떤 작업을 하고자 하는지 혹은 에러는 어떻케 처리할지 명시함으로서 실행한다.

 Favorite ID 를 얻어오기까지 800ms 이내로 수행될 것을 보장하고 싶거나 너무 오래걸린다면 cache 에서 불러오도록 하고자 한다면 어떻게 할 것인가? 콜백 기반 코드에서는 매우 복잡한 작업이 될 것이다. Reactor 에서는 timeout 을 적용함으로서 쉽게 처리할 수 있다.

userService.getFavorites(userId)
           .timeout(Duration.ofMillis(800)) --- 1
           .onErrorResume(cacheService.cachedFavoritesFor(userId)) --- 2
           .flatMap(favoriteService::getDetails) --- 3
           .switchIfEmpty(suggestionService.getSuggestions())
           .take(5)
           .publishOn(UiUtils.uiThreadScheduler())
           .subscribe(uiList::show, UiUtils::errorPopup);
  1. 800ms 이상 아무것도 리턴되지 않으면 error 를 전달한다.
  2. error 가 발생한 경우에 cacheService 를 호출한다.
  3. 이후는 위 예제와 같다.

 Future 는 콜백보다는 조금은 나을 수 있다. 하지만 Java 8 에서의 CompleatableFuture 개선이 있음에도 불구하고 코드 작성에 마냥 좋지만은 않다. 여러개의 Future 객체를 조합하는 것은 쉽지 않다. 또한 Future 는 다른 문제들이 있다.

  • Future 객체의 get() 함수를 호출함으로서 블로킹으로 만들기 쉽다.
  • 지연 실행을 지원하지 않는다.
  • 다수의 값과 에러 핸들링에 대한 지원이 부족하다.

 또 다른 예제를 살펴보자 : 이름과 통계와 ID 를 짝으로 만들고자 하는 리스트를 받았다. 이 작업들은 모두 비동기로 처리되어야 한다. Completable Future 를 이용한 구현이다.

CompletableFuture<List<String>> ids = ifhIds(); --- 1

CompletableFuture<List<String>> result = ids.thenComposeAsync(l -> { --- 2
	Stream<CompletableFuture<String>> zip =
			l.stream().map(i -> { --- 3
				CompletableFuture<String> nameTask = ifhName(i); --- 4
				CompletableFuture<Integer> statTask = ifhStat(i); --- 5

				return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); --- 6
			});
	List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); --- 7
	CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

	CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); --- 8
	return allDone.thenApply(v -> combinationList.stream()
			.map(CompletableFuture::join) --- 9
			.collect(Collectors.toList()));
});

List<String> results = result.join(); --- 10
assertThat(results).contains(
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121");
  1. ID 값을 리턴하는 Future 로 시작한다.
  2. ID 가 리턴되었을때 실행시키고자 하는 비동기 프로세스이다.
  3. 각 ID 들에 대해서
  4. 이름을 추출한다.
  5. 통계를 추출한다.
  6. 각각의 결과를 조합한다.
  7. 모든 조합 작업이 완료된 Future 리스트가 생겼다. 이 것을 실행하기 위하여 List 에서 Array 로 형변환이 필요하다.
  8. CompletableFuture.allOf 로 배열을 전달한다. CompletableFuture.allOf 의 리턴값인 Future 는 모든 작업이 완료된 Future 이다.
  9. join() 을 호출하여 각 Future 의 결과물을 모은다.
  10. 모든 비동기 파이프라인이 트리거되었고 모든 프로세스가 끝나기를 기다린다.

 Reactor 는 더 연산자의 조합이 존재하므로 코드를 간단하게 만들 수 있다.

Flux<String> ids = ifhrIds(); --- 1

Flux<String> combinations =
		ids.flatMap(id -> { --- 2 
			Mono<String> nameTask = ifhrName(id); --- 3 
			Mono<Integer> statTask = ifhrStat(id); --- 4 

			return nameTask.zipWith(statTask, --- 5
					(name, stat) -> "Name " + name + " has stats " + stat);
		});

Mono<List<String>> result = combinations.collectList(); --- 6

List<String> results = result.block(); --- 7
assertThat(results).containsExactly( --- 8
		"Name NameJoe has stats 103",
		"Name NameBart has stats 104",
		"Name NameHenry has stats 105",
		"Name NameNicole has stats 106",
		"Name NameABSLAJNFOAJNFOANFANSF has stats 121"
);
  1. ID 를 가져오는 것으로 비동기 시퀀스가 생성된다.
  2. 시퀀스의 각 요소를 위한 처리 비동기 프로세스이다.
  3. 이름을 추출한다.
  4. 통계를 추출한다.
  5. 두 값을 비동기로 조합한다.
  6. Flux 처리가 완료되면 각 값들을 List 로 합한다.
  7. 운영환경에서는 Flux 가 비동기적으로 작업을 계속하도록 해야 하므로 Mono 를 리턴하도록 했을 것이다. 그러나 테스트를 해야하므로 block() 을 호출하여 결과를 기다린다.
  8. 결과를 확인

 콜백과 Future 를 사용했을때 문제점은 비슷하다 그리고 리액티브 프로그래밍이 Publisher-Subscriber 를 사용하는 이유다.

 

3.3 명령형에서 반응형 프로그래밍으로

 Reactor 와 같은 리액티브 라이브러리는 아래 몇가지 관점에 집중하여 JVM 위에서 동작하는 고전적인 비동기 접근 방식을 돕는다.

  • 결합성과 가독성
  • 다양한 언어 연산자를 통한 데이터를 Flow 로서 처리
  • subscribe() 하기 전에는 어떤 동작도 하지 않는다.
  • Backpressure 또는 Consumer 가 Producer 에게 전달하는 데이터가 너무 많은 경우 Signal 을 전달할 수 있는 장치
  • 동시성에 구애받지 않는 높은 수준의 추상화

3.3.1. 결합성과 가독성

 이전의 비동기 테스크의 결과를 다른 비동기 테스크에 사용하는 것처럼 여러개의 비동기 테스크를 조합하여 사용하는 것을 결합성(Composability)이라고 한다. 그렇지 않으면 여러개의 테스크를 fork-join 스타일을 사용하여 실행할 수 있다. 게다가 비동기 테스크를 하이 레벨 시스템에 있는 별개의 컴포넌트로서 재사용할 수 있다.

 

 테스크들을 조합할 수 있는 것은 가독성과 코드를 유지보수하는것에 매우 관련이 있다. 비동기 프로세스가 코드의 양과 복잡성을 늘리기 때문에 코드를 읽고 작성하는 것을 어렵게 만든다. 위에서 보았듯이, 콜백 모델은 간단하지만, 콜백의 주된 문제점중의 하나는 콜백안에서 콜백을 실행해야하는 경우이다. 이 문제는 콜백 지옥(Callback Hell) 이라고 불린다. 예상할 수 있듯이 이러한 코드는 이전으로 되돌리기 힘들다. 

 

 Reactor 는 모든 테스크가 동일한 레벨에서 수행도리 수 있도록 다양한 구성의 옵션들을 제공한다. 

 

3.3.2. 조립 라인에 비유

 Reactor  의 데이터 처리는 조립 라인과 비유해서 생각할 수 있다. Reactor 는 컨베이너벨트이면서 워크스테이션이다. 가공 전의 물질이 원천 소스로 부터 부어진다. (Publisher) 소비자에게 전달 될 준비가 완료된 완제품으로 전달된다. (Consumer).

 

 원친 물질은 다양한 변환 과정과 중개 단계 또는 중간 단계의 물질들을 조합하는 큰 조립 라인을 통과하게 될 수 있다. 만약 작은 문제가 발생하거나 하나의 과정이 막히게 되면 문제가 발생하는 워크스테이션은 상위 단계로 원천 물질의 Flow 를 제한하도록 신호를 보낼 수 있다.

 

3.3.3. 연산자

 Reactor 에서 연산자는 조립 라인의 워크스테이션이다. 각각의 연산자는 Publisher 에게 행동들을 추가하고 이전 단계의 Publisher 를 새로운 인스턴스로 감싼다. 그래서 모든 과정은 연결되어있고, 첫번째 Publisher 로부터 기원된 데이터는 이 체인들을 거치면서 변경되어 전달된다. 결국 Subscriber 에 의해서 이 과정들이 종료된다. Subscriber 에 의해서 subscribe() 되기 전까지는 어떠한 일도 발생하지 않는다.

 

 Reactor Stream 은 어떠한 구현된 연산자를 제공해주지 않지만 Reactor 와 같은 리액티브 라이브러리는 다양한 언어 연산자들을 제공한다. 이 연산자들은 간단한 변환부터 복잡한 조합 그리고 에러 핸들링까지 제공한다.

 

3.3.3. subscriber() 하기 전까지는 어떠한 일도 발생하지 않는다.

 Reactor 에서 Publisher 체인을 작성하면 기본적으로는 데이터 펌프 작용이 시작하지 않는다. 대신에 당신이 작성한 비동기 프로세스에 대한 추상화된 설명을 생성할 수 있다.

 

 구동하는 행위에 의하여 subscriber 와 publisher 를 묶을 수 있다. 이 행위가 전체 체인의 데이터 흐름을 촉발시킨다. data flow 가 촉진된 것은 Subscriber 의 request 신호가 Upstream 으로 전파되는 것에 성공했다는 것이다.

 

3.3.5. Backpressure

 Upstream 으로의 신호 전파는 또한 backpressure 를 구현하곤 한다. backpressure 는 조립 라인에서 워크스테이션이 upstream 워크스테이션에게 라인을 조금 천천히 보내달라고 피드백을 보내는 것과 같다.

 

 Reactive Stream 이 정의하고 있는 Backpressure 의 실제 메커니즘은 다음 비유와 조금 더 비슷하다. Subscriber 는 unbounded 모드로 실행될 수 있다. 그리고 publisher 에게 보낼 수 있는 데이터는 최대한 빨리 보내달라고 요청할 수 있다. 또는 최대 N 개의 요소 까지 처리할 수 있는 양 만큼만 Publisher 에게 요청하는 Request 메커니즘을 사용할 수 있다.

 

 중간의 연산자 또한 통과하는 request 를 변경할 수 있다. 10개의 값으로 그룹핑되는 buffer 연산자를 생각해보자. 만약 Subscriber 가 한개의 버퍼를 요청 하면 Publisher 에게 10개의 값이 들어있는 한개의 buffer 를 발행할 수 있게 된다. 또한 어떤 연산자는 prefetching 전략을 구현하여, request(1) 왕복을 피하고 요청되기 전에 덜 비용이 드는 유익한 원소를 먼저 발행한다.

 

 이 방법은 push 모델에서 push-pull 하이브리드 보델로 변경하여 downstream 이 수용가능한 원소 갯수인 N 개 만큼만 pull 할 수 있게 해준다. 그러나 요소가 준비되지 않은 경우 생성될때마다 업스트림에 의해 푸쉬된다.

 

3.3.6. Hot vs Cold

 리액트 라이브러리인 Rx 는 두개의 카테고리로 구분될 수 있다 : hot 그리고 cold

이 구분은 subscriber 가 리액트 스트림에 어떻게 반응하는지에 따라서 구분된다.

  •  Cold 시퀀스는 원천 데이터 소스를 포함하여 각각의 Subscriber 의 의해서 시작된다. 예를들어 원천 데이터 소스가 HTTP call 에 의한 것이라면 새로운 HTTP 요청이 각 Subscription 에 의해서 새롭게 만들어진다.
  • Hot 시퀀스는 각각의 Subscriber 에 의해서 시작되지 않는다. 오히려 늦은 Subscriber 가 subscribed 한 후에 방출된 신호를 받는다. 그러나 기억해야한다. 어떤 hot reactive stream 은 전체 혹은 일부가 방출 이력에 의해서 캐시 혹은 다시 실행될 수 있다. 일반적인 관점에서 hot 시퀀스는 어떠한 subscriber 가 기다리고 있지 않다고 하더라도 방출할 수 있다. (subscribe 하기 전에 어떠한 것도 발생하지 않는다의 예외)

4. Reactor core 기능

리액터 프로젝트의 주된 결과물은 reactor-core 이다. 리액티브 라이브러리는 Java8 을 타겟으로 Reactive Stream 에 집중하였다.

 

리액터는 Publisher 를 구현한 반응형 자료형을 소개한다. 그러나 또한 다앙한 연산자를 제공한다. : Flux, Mono

Flux 객체는 0..N 개의 반응형 시퀀스를 나타내는 반면 Mono 객체는 단일 혹은 0개의 결과를 나타낸다

 

이 구분은 자료형의 의미 정보를 전달하여 비동기 처리의 대략적인 갯수를 나타낸다. 예를 들어 HTTP 요청은 하나의 응답만 발행한다. 따라서 count 연산을 통해서 얻을 유의미한 정보는 없다. HTTP 호출의 결과를 Flux<HttpRespons> 가 아닌 Mono<HttpResponse> 로 표현한다는 것은 0개 또는 1개 항목의 컨텍스트와 관련된 연산자만 제공하기 때문입니다.

 

연산자는 최대 원소 갯수를 변경할 수 있습니다. 따라서 Flux 의 count 연산자는 Mono<Long> 을 반환한다.

 

4.1 Flux, 0-N 아이템을 위한 비동기 시퀀스

아래 그림은 Flux 의 아이템 변환 과정을 보여준다.

Flux<T> 는 Publisher<T> 의 규격에 맞춘 구현체이다. Flux 는 0..N 개의 아이템을 비동기 시퀀스로 내보낼 수 있으며 선택적으로 완료 신호 혹은 에러에 의하여 종료할 수 있다. Reactive Stream 규격에 따르면 이 세가지 종류의 시그널은 downstream Subscriber 의 onNext, onComplete, onError 를 호출함으로 변역될 수 있다.

 

 가능한 신호의 넓은 범위와 함께 Flux 는 범용적인 반응형 타입이다. 모든 이벤트, 제거하는 것 조차, 선택적이다. onNext 이벤트에는 없지만 onComplete 이벤트는 빈 유하한 시퀀스를 나타낸다. 그러나 onComplete 을 제거하면 무한한 빈 시퀀스를 만들 수 있다. 비슷하게 무한한 시퀀스는 빈 필요하지 않다. 예를들어 Flux.interval(Duration) 은 시간이 흐름에 따라 Flux<Long> 을 무한하게 발행한다.

 

4.2. Mono

아래 이미지는 Mono 의 아이템 변환 과정을 보여준다.

Mono<T> 는 onNext 시그널을 통해 하나의 아이템을 보내고나서 omComplete 신호를 통해 종료 또는 onError 신호를 전달할 수 있는 특별한 Publisher<T> 구현체이다.

 

대부분의 Mono 구현체는 Subscriber onNext() 를 호출한 후에 가 onComplete 을 즉시 호출할 것을 기대한다. Mono.n ever() 는 논외다. 이것은 어떠한 신호도 내보내지 않는다. 이 행위는 기술적으로 금지되었다. 반면에 onNext 와 onError 는 명시적으로 혼용이 금지되었다.

 

Mono 는 Flux 에서 사용 가능한 연산자의 일부들을 제공한다. 그리도 어떤 연산자는 Mono 를 Flux 로 변환한다. ( 이 연산자는 Mono 를 또 다른 Publisher 로 결합한다) 예를들어 Mono.concatWith(Publisher) 는 Flux 를 리턴한다. 반면에 Mono.then(Mono) 는 또 다른 반환한다.

 

Mono 는 값이 없는 완료의 개념을 갖는 비동기 프로세스를 표현하는데 사용할 수 있다. 이것을 만들기 위하여 Mono<Void> 를 사용할 수 있다.

 

4.3. Flux 와 Mono 를 생성하는 간단한 방법과 이것을 구동하는 방법

Flux 와 Mono 를 시작하는 가장 쉬운 방법은 많은 팩터리 멕서드를 사용하는 방법이다.

 

예를들어 String 시퀀스를 만들기 위하여 만들고자 하는 String 을 나열하거나 Collection 에 String 들을 추가한 후에  이 것들로부터 Flux 를 만들 수 있다.

Flux<String> seq1 = Flux.just("foo", "bar", "foobar");

List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);

subscribing 할때가 되면, Flux 와 Mono 는 Java 8 의 람다를 사용한다. subscribe() 를 호출하기위한 서로 다른 조합의 Callback 선택지가 존재한다. 

subscribe(); --- 1

subscribe(Consumer<? super T> consumer); --- 2

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer); --- 3

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer); --- 4

subscribe(Consumer<? super T> consumer,
          Consumer<? super Throwable> errorConsumer,
          Runnable completeConsumer,
          Consumer<? super Subscription> subscriptionConsumer); --- 5
  1. 구독하고 sequence 를 트리거한다.
  2. 발행된 데이터로 어떠한 작업을 한다
  3. 어떠한 작업을 하지만 에러에 대한 핸들링이 추가되었다
  4. 어떠한 작업과 에러를 핸들링하지만 성공적으로 시퀀스가 마무리 되었을때 실행하고자 하는 코드도 함께 전달한다.
  5. 4번의 내용을 모두 포함하면서 subscribe 호출에 의해 발행되는 Subscripition 으로 어떠한 작업을 한다.

4.3.1. subscribe 함수 예제

 이 섹션에서는 subscribe 함수의 5가지 시그니쳐에 대한 예제를 보여준다. 아래 코드는 인자가 없는 함수를 표현한다.

Flux<Integer> ints = Flux.range(1, 3); --- 1
ints.subscribe(); --- 2
  1. subscriber 가 붙었을때 세개의 Integer 를 발행하는 Flux 를 생성
  2. 가장 간단한 방법으로 구독

 

위 코드는 어떠한 결과값도 화면에 노출하지 않는다. 그러나 이것은 잘 동작한 것이다. Flux 는 세개의 값을 발행하였다. 만약 Lambda 를 제공하였다면 화면에 값이 보였을 것이다. 다음 예제는 subscribe 함수는 값이 화면에 보여지도록 한다.

Flux<Integer> ints = Flux.range(1, 3); --- 1
ints.subscribe(i -> System.out.println(i)); --- 2

결과는 아래와 같다.

1
2
3

 

다음 예제로 넘어가기 위해서 에러를 발생시켜야 한다.

Flux<Integer> ints = Flux.range(1, 4) --- 1
      .map(i -> { --- 2
        if (i <= 3) return i; --- 3
        throw new RuntimeException("Got to 4"); --- 4
      });
ints.subscribe(i -> System.out.println(i), --- 5
      error -> System.err.println("Error: " + error));
  1. 구독되면 4개의 값을 발행하는 Flux 를 생성
  2. 어떤 값에 대하여 다른 처리를 하기 위하여 map 이 필요하다
  3. 대부분의 값은 그대로 반환
  4. 4에 대해서는 예외 발생
  5. 에러 핸들링을 포함한 구독

2개의 람다가 존재하는 하나는 전달받은 값을 노출하는 것이고 하나는 에러를 핸들링 하는 것이다.

1
2
3
Error: java.lang.RuntimeException: Got to 4

 

다음 시그니쳐는 두개의 람다 표현식을 포함하고 완료 이벤트가 발생했을때 처리를 위한 파라미터도 존재한다.

Flux<Integer> ints = Flux.range(1, 4); --- 1
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done")); --- 2
  1. 구독이 시작되면 integer 를 발행하는 Flux 생성
  2. 완료 이벤트가 발생할때를 위한 핸드러를 포함해서 구독

에러 시그널과 완료 시그널은 모두 이벤트를 제거한다 그리고 둘중에 하나만 발생하게 된다. consumer 작업을 완료하도록 하기 위하여 error 가 트리거 되지 않도록 주의해야한다.

1
2
3
4
Done

subscribe 함수의 마지막 시그니쳐는 Consumer<Subscription> 을 포함한다.

 

Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(i -> System.out.println(i),
    error -> System.err.println("Error " + error),
    () -> System.out.println("Done"),
    sub -> sub.request(10)); --- 1
  1. 구독을 하기 시작할때 Subscription 을 받게된다. 10개의 값을 더 전달해달라는 신호를 전달한다.

4.3.2. Disposable 을 이용한 subscribe() 취소

모든 subcribe() 의 시그니쳐들은 Disposable 을 리턴 타입으로 가지고 있다. 이 경우에 Disposable 인터페이스는 구독이 취소되었다는 것을 의미한다.

 

Flux 또는 Mono 에서 취소는 Element 를 발행하는 것을 멈춘다. 그러나 즉시 실행되는 것을 보장하지 않는다. 취소 시그널을 받기 전에 너무 빨리 발행하게 되는 경우가 있을 수 있다. 

 

Disposable 과 관련된 유틸들은 Disposables 클래스에서 사용할 수 있다. Disposables.swap() 은 Disposable Wrapper 를 생성하여 Atomic 하게 취소하고 Disposable 을 대체할 수 있다. 예를들어 이것은 UI 시나리오에서 유저가 버튼을 눌렀을때 요청을 취소하고 새로운 요청을 만들때 유용할 수 있다. Wrapper 자체를 닫으면 Sequence 자체가 닫히게 된다. 

 

또 다른 흥미로운 유틸은 Disposables.composite(...) 이다. 이 합성은 여러개의 Disposable 을 모을 수 있게 해준다. 예를들어 처리중인 모든 서비스콜을 한번에 모아서 처리하는데 사용할 수 있다. 합성된 dispose() 가 한번 호출되면 또 다른 Disposable 을 추가하는 요청은 제거된다.

 

4.3.3. 람다의 대안 : BaseSubscriber

subscribe() 에는 람다를 조합해서 사용할때보다 조금 더 범용적이고 Subscriber 의 특생을 갖춘 함수가 있다. 이러한 Subscriber 코드 작성을 돕기 위하여 BaseSubcriber 라고 불리는 상속가능한 클래스가 있다.

 

BaseSubscriber 를 상속한 SampleSubscriber 를 만들었다. 아래는 Flux 에 SampleSubscriber 를 붙이는 예제이다.

SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);

아래 예제는 SampleSubscriber 가 BaseSubscriber 의 최소구현으로 이루어진 코드이다.

package io.projectreactor.samples;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;

public class SampleSubscriber<T> extends BaseSubscriber<T> {

	public void hookOnSubscribe(Subscription subscription) {
		System.out.println("Subscribed");
		request(1);
	}

	public void hookOnNext(T value) {
		System.out.println(value);
		request(1);
	}
}

SampleSubscriber 는 리액터에서 사용자 정의 Subscribers 를 위한 추상 클래스인 BaseSubscriber 를 상속받았다. 이 클래스는 subscriber 의 행위를 재정의할 수 있다. 기본적으로, 이것은 무한한 요청을 촉진하고 subscribe() 와 같이 동작한다. 그러나 BaseSubscriber 를 상속받는것은 커스텀한 요청 양을 원할때 유용하다.

 

커스텀한 요청양을 위한 최소한의 구현은 hookOnSubscribe(Subscription subsctiption) 그리고 hookOnNext(T value) 이다. 이 경우에 hookOnSubscribe() 함수에 의해서 첫번째 requset 가 만들어졌을때 "Subscribed" 가 화면에 출력된다. 그 후에 hookOnNext 함수가 추가적인 요청들을 수행하여 value 를 출력한다.

Subscribed
1
2
3
4

BaseSubscriber 는 또한 requestUnbounded() 함수를 제공한다 이 함수는 request(Long.MAX_VALUE) 와 동일하게 무한 요청 모드로 동작하게된다. cancel() 함수가 호출될때까지.

 

BaseSubscriber 에는 몇가지 hook 들이 더 존재한다. : hookOnComplete, hookOnError, hookOnCancel, hookFinally

 

4.3.4. Backpressure 와 요청을 개조하는 방법

Reactor 에서 backpressure 를 구현할때, Consumer 가 pressure 하는 방법은 upstream operator 에게 request 를 다시 보냄으로서 전파한다. 현재 요청의 합은 때때로 현재 "Demand" 또는 "Pending Request" 로 참고될 수 있다. "Demand" 는 publisher 가 할 수 있는한 최대한 빨리 모든 요청을 발행해달라는 무한 모드와 같은 Lone.MAX_VALUE 로 제한된다. 이는 backpressure 를 비활성화 한 것과 같다.

 

첫번째 요청이 마지막 subscriber 에게 전달되어 구독될때, 대부분의 subscribing 의 방법은 즉시 Long.MAX_VALUE 로 무한 모드로 동작하게 된다.

  • subscribe() 그리고 대부분의 람다 
  • block(), blockFirst(), blockLast()
  • toIterable(), toStream()

Request 를 커스터마이징하는 가장 간단한 방법은 BaseSubscriber 의 hookOnSubscribe 함수를 재정의하는 것이다.

Flux.range(1, 10)
    .doOnRequest(r -> System.out.println("request of " + r))
    .subscribe(new BaseSubscriber<Integer>() {

      @Override
      public void hookOnSubscribe(Subscription subscription) {
        request(1);
      }

      @Override
      public void hookOnNext(Integer integer) {
        System.out.println("Cancelling after having received " + integer);
        cancel();
      }
    });

결과는 아래와 같다.

request of 1
Cancelling after having received 1

 

4.3.5. Downstream 으로부터 Demand 를 변경하는 연산자

 명심해야할 것은 Subscribe 단계의 demand 표현은 upstream 체인에 의하여 변경될 수 있다. 대표적인 경우가 buffer(N) 이다. 만약 request(2) 를 받게 된다면 2개의 가득찬 buffer 를 요구하는 것으로 해석된다. 결과적으로 buffer 가 N 개 만큼이 있어야 가득차기 때문에 buffer 요청은 2 * N 으로 요청을 변경한다.

 

int 타입을 인풋 파라미터로 받는 prefetch 연산자에 대해서고 알고 있어야 한다. prefetch 는 downstream 요청을 변경하는 것과는 조금 다른 범주이다. prefetch 는 보통 flatMap 과 같은 Publisher 로 부터 들어오는 값들을 얻어내기 위해 사용된다.

 

prefetch 는 내부의 시퀀스로 인해서 만들어진 최최의 요청들을 조율하기 위한 방법이다. 만약 명시되지 않았따면 이 연산자는 32 를 요구하도록 되어있다.

 

이 연산자는 보통 replenishing optimization 을 구현한다. prefetch 요청의 75%가 수행된 것을 확인하면 upstream 에게 75% 를 다시 요청한다. 이는 연산자가 앞으로 들어올 요청을 사전에 예측할 수 있도록 만들어진 체험적인 최적화이다.

 

Request 를 조절할 수 있는 연산자는 2개가 남았다 : limitRate, limitRequest

 

limitRate(N) 은 downstream 의 요청을 분할하여 더 작은 배치로 업스트림으로 전파되도록 한다. 예를들어 100 request 은 10개의 request 로 나뉘어서 upstream 으로 전달된다. limitRate 는 앞서 언급되었던 replenishing optimization 을 구현하였다.

 

 

 

 

 

 

 

 

출처 : projectreactor.io/docs/core/release/reference/index.html

'JAVA > Reactor' 카테고리의 다른 글

Reactor Context 예제  (0) 2021.04.26
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/12   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31
글 보관함