다들 동기/비동기와 Blocking/Non-blocking 을 한번씩 들어보셨을거에요.
하지만 Spring-Java 에서 이를 섞은 조합에 대한 실제 구현 방식을 설명하는 글이 별로 없더라구요.
그래서! 오늘의 포스팅은 해당 구현 방식에 대해 자세히 설명하려고 합니다.
1. 비동기/동기와 Blocking/Non-blocking 의 차이점
1.1 Sync vs Async
호출되는 함수의 작업 완료 여부를 누가 신경쓰냐가 관심사!
A 스레드가 B 스레드에게 요청했을 때,
- Sync : A 스레드는 직접적으로 B 스레드에게 결과요청 메세지를 전송함. 즉, 메인 스레드가 작업 완료 여부를 체크함.
- Async : B 스레드가 A 스레드에게 결과를 전달함으로써, 결과를 간접적으로 받음. 즉, 서브 스레드가 작업 완료 여부를 메인에 전송함.
1.2 Blocking vs Non-blocking
호출되는 함수가 바로 리턴하느냐 마느냐가 관심사!
A 스레드가 B 스레드에게 요청했을 때,
- Blocking : A 스레드는 리턴을 받기 전까지 다음의 명령을 실행할 수 없음
- Non-blocking : 그렇지 않고 요청과 동시에 다음의 명령을 실행할 수 있음
2. 조합에 따른 4 경우의 구현차이점 (feat. CompletableFuture + WebFlux)
기본적으로 CompletableFuture 를 사용하게 되면, 결과는
.get()
이나.join()
메소드를 통해 메인스레드에서 수신해야해요. 그리고 이 메소드들은 Blocking 메소드입니다. 즉, CompletableFuture 를 사용하게 되면, 실질적인 Non-Blocking 구현이 힘들어요. 왜냐하면 어쨋든 결과를 Blocking 으로 받기 때문이죠!.그래서 저는 Flux 와 Sink 객체를 통해 Non-blocking 구현을 하였어요. WebFlux 포스팅에서 다양한 예시를 확인할 수 있습니다.
Reference : https://www.inflearn.com/news/72620
2.1 Async + Blocking( CompletableFuture )
- CompletableFuture 를 활용한 코드
void async_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - 작업 시작할게요");
// Async
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
return "Thread 2의 결과물";
},t);
// Blocking
String result = completableFuture.get();
System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+result+"\", 이제 다음 작업 수행할게요");
System.out.println("[Thread 1] 다음 작업 수행 중...");
t.destroy();
}
- 결과
[Thread 1] - 작업 시작할게요
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
[Thread 1] 다음 작업 수행 중...
2.2 Async + Non-blocking( CompletableFuture + WebFlux )
- CompletableFuture 에 WebFlux 를 추가한 코드
void async_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
Sinks.Many<Object> sinks = Sinks.many().replay().all();
System.out.println("[Thread 1] - 작업 시작할게요");
// Async
CompletableFuture.runAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
sinks.tryEmitNext("Thread 2의 결과물");
},t);
// Non-Blocking
sinks.asFlux().log().subscribe(result->{
System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+result+"\", 이제 다음 작업 수행할게요");
});
System.out.println("[Thread 1] 다음 작업 수행 중...");
try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
t.destroy();
}
- 결과
[Thread 1] - 작업 시작할게요
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | request(unbounded)
[Thread 1] 다음 작업 수행 중...
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 2] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Thread 2의 결과물)
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
2.3 Sync + Blocking( IORead )
이 부분은 file.read()
, file.write()
와 같이 간단한 예시이기때문에 생략하겠습니다.
2.4 Sync + Non-blocking( CompletableFuture )
이 부분은 간략한 예시를 들기 위해 CompletableFuture 을 사용했어요. 하지만! 이 예시는 완벽한 Sync + Non-blocking 예시가 아니라는 점을 알아주세요. 왜냐하면 아래 코드의 completableFuture.get()
는 결국 Blocking 이거든요. 다만 이전에 completableFuture.isDone()
으로 Thread 1
이 직접적으로 Thread 2
에게 작업완료여부를 물어보기때문에 아래와 같은 예시를 들었습니다. 실제 Sync + Non-blocking 은 asyncFileChannel.read
와 같은 예시입니다.
void sync_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - 작업 시작할게요");
// Async 이지만, 아래의 completableFuture.isDone()을 통한 callBack 무시로 결론은 Sync
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
return "Thread 2의 결과물";
},t);
// Non-Blocking
while(!completableFuture.isDone()){
Thread.sleep(200);
System.out.println("[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요");
System.out.println("[Thread 1] - 다른 일 중...");
}
// 다음 작업
System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+completableFuture.get()+"\", 이제 다음 작업 수행할게요");
System.out.println("[Thread 1] 다음 작업 수행 중...");
t.destroy();
}
- 결과
[Thread 1] - 작업 시작할게요
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
[Thread 1] 다음 작업 수행 중...