JAVA/Concurrency
Java Stream 과 CompletableFuture 를 함께 사용할때 주의할점
소농배
2023. 9. 13. 11:19
ExecutorService executorService = Executors.newFixedThreadPool(3);
Stream.of(3, 2, 1)
.map((i) -> CompletableFuture.supplyAsync(() -> {
try {
System.out.println(Thread.currentThread().getName() + " : start");
Thread.sleep(i * 1000L);
System.out.println(Thread.currentThread().getName() + " : end");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return i;
}, executorService))
.map(CompletableFuture::join)
.forEach(System.out::println);
CompletableFuture 를 활용하여 각 Stream 을 병렬로 처리되기를 기대한 코드이다.
3초, 2초, 1초 만큼 Thread sleep 을 걸어놓았고 각 쓰레드가 task 를 끝나기를 대기했다고 출력을 하게된다.
pool-1-thread-1 : start
pool-1-thread-1 : end
3
pool-1-thread-2 : start
pool-1-thread-2 : end
2
pool-1-thread-3 : start
pool-1-thread-3 : end
1
결과를 확인하면 1번 쓰레드가 다 끝난 다음에서야 2번 쓰레드가 실행된것을 확인할 수 있다. 즉, 병렬로 실행되지 않았다
Stream 을 Futures 와 함께 사용하게되면 다음 CompletableFuture 는 첫번째 CompletableFuture 가 완료된 다음에서야 생성된다는 것을 알 수 있다.
즉, 위에 예제 코드는 CompletableFuture.join() 을 순차적으로 실행하여 Sequential 하게 실행된 것이다.
마치 아래와같이 Futures 를 만들고 바로 get() 한 것과 같은 상황이다.
for(;;) {
Future f = new Futures()
f.get()
}
위 설명에 나온대로 코드를 수정해보면 아래와 같이 수정할 수 있다.
List<CompletableFuture<Integer>> futures = Stream.of(3, 2, 1)
.map((i) -> CompletableFuture.supplyAsync(() -> {
System.out.println(Thread.currentThread().getName() + " is start");
try {
Thread.sleep(i * 1000L);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(Thread.currentThread().getName() + " is end");
return i;
}, executorService)).collect(Collectors.toList())
futures.stream().map(CompletableFuture::join).forEach(System.out::println);
CompletableFutures 를 먼저 만든후에 stream 을 한번 끊고 각 CompletableFutures 의 join 을 걸고 결과를 출력하기위한 stream 을 다시 생성
pool-1-thread-2 is start
pool-1-thread-1 is start
pool-1-thread-3 is start
pool-1-thread-3 is end
pool-1-thread-2 is end
pool-1-thread-1 is end
3
2
1
수정한 코드에서는 위와같이 각 쓰레드가 함께 시작하여 병렬처리된 것을 확인할 수 있다.