Skip to main content Link Menu Expand (external link) Document Search Copy Copied

다들 동기/비동기와 Blocking/Non-blocking 을 한번씩 들어보셨을거에요.

하지만 Spring-Java 에서 이를 섞은 조합에 대한 실제 구현 방식을 설명하는 글이 별로 없더라구요.

그래서! 오늘의 포스팅은 해당 구현 방식에 대해 자세히 설명하려고 합니다.

1. 비동기/동기와 Blocking/Non-blocking 의 차이점

1.1 Sync vs Async

호출되는 함수의 작업 완료 여부를 누가 신경쓰냐가 관심사!

A 스레드가 B 스레드에게 요청했을 때,

  • Sync : A 스레드는 직접적으로 B 스레드에게 결과요청 메세지를 전송함. 즉, 메인 스레드가 작업 완료 여부를 체크함.
  • Async : B 스레드가 A 스레드에게 결과를 전달함으로써, 결과를 간접적으로 받음. 즉, 서브 스레드가 작업 완료 여부를 메인에 전송함.

1.2 Blocking vs Non-blocking

호출되는 함수가 바로 리턴하느냐 마느냐가 관심사!

A 스레드가 B 스레드에게 요청했을 때,

  • Blocking : A 스레드는 리턴을 받기 전까지 다음의 명령을 실행할 수 없음
  • Non-blocking : 그렇지 않고 요청과 동시에 다음의 명령을 실행할 수 있음

2. 조합에 따른 4 경우의 구현차이점 (feat. CompletableFuture + WebFlux)

기본적으로 CompletableFuture 를 사용하게 되면, 결과는 .get()이나 .join()메소드를 통해 메인스레드에서 수신해야해요. 그리고 이 메소드들은 Blocking 메소드입니다. 즉, CompletableFuture 를 사용하게 되면, 실질적인 Non-Blocking 구현이 힘들어요. 왜냐하면 어쨋든 결과를 Blocking 으로 받기 때문이죠!.

그래서 저는 FluxSink 객체를 통해 Non-blocking 구현을 하였어요. WebFlux 포스팅에서 다양한 예시를 확인할 수 있습니다.

img Reference : https://www.inflearn.com/news/72620

2.1 Async + Blocking( CompletableFuture )

  • CompletableFuture 를 활용한 코드
void async_blocking() throws ExecutionException, InterruptedException {
    ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
    System.out.println("[Thread 1] - 작업 시작할게요");

    // Async
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
        System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
        return "Thread 2의 결과물";
    },t);

    // Blocking
    String result = completableFuture.get();

    System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+result+"\", 이제 다음 작업 수행할게요");
    System.out.println("[Thread 1] 다음 작업 수행 중...");

    t.destroy();
}
  • 결과
[Thread 1] - 작업 시작할게요
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
[Thread 1] 다음 작업 수행 중...

2.2 Async + Non-blocking( CompletableFuture + WebFlux )

  • CompletableFuture 에 WebFlux 를 추가한 코드
void async_non_blocking() throws ExecutionException, InterruptedException {
    ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
    Sinks.Many<Object> sinks = Sinks.many().replay().all();

    System.out.println("[Thread 1] - 작업 시작할게요");

    // Async
    CompletableFuture.runAsync(() -> {
        try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
        System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
        sinks.tryEmitNext("Thread 2의 결과물");
    },t);

    // Non-Blocking
    sinks.asFlux().log().subscribe(result->{
        System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+result+"\", 이제 다음 작업 수행할게요");
    });

    System.out.println("[Thread 1] 다음 작업 수행 중...");

    try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
    t.destroy();
}
  • 결과
[Thread 1] - 작업 시작할게요
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | request(unbounded)
[Thread 1] 다음 작업 수행 중...
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 2] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Thread 2의 결과물)
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요

2.3 Sync + Blocking( IORead )

이 부분은 file.read(), file.write() 와 같이 간단한 예시이기때문에 생략하겠습니다.

2.4 Sync + Non-blocking( CompletableFuture )

이 부분은 간략한 예시를 들기 위해 CompletableFuture 을 사용했어요. 하지만! 이 예시는 완벽한 Sync + Non-blocking 예시가 아니라는 점을 알아주세요. 왜냐하면 아래 코드의 completableFuture.get() 는 결국 Blocking 이거든요. 다만 이전에 completableFuture.isDone() 으로 Thread 1 이 직접적으로 Thread 2 에게 작업완료여부를 물어보기때문에 아래와 같은 예시를 들었습니다. 실제 Sync + Non-blocking 은 asyncFileChannel.read 와 같은 예시입니다.

void sync_non_blocking() throws ExecutionException, InterruptedException {
    ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
    System.out.println("[Thread 1] - 작업 시작할게요");
    
    // Async 이지만, 아래의 completableFuture.isDone()을 통한 callBack 무시로 결론은 Sync
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
        System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
        try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
        return "Thread 2의 결과물";
    },t);

    // Non-Blocking
    while(!completableFuture.isDone()){
        Thread.sleep(200);
        System.out.println("[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요");
        System.out.println("[Thread 1] - 다른 일 중...");
    }
    // 다음 작업
    System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+completableFuture.get()+"\", 이제 다음 작업 수행할게요");
    System.out.println("[Thread 1] 다음 작업 수행 중...");

    t.destroy();
}
  • 결과
[Thread 1] - 작업 시작할게요
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
[Thread 1] 다음 작업 수행 중...