다들 동기/비동기와 Blocking/Non-blocking 을 한번씩 들어보셨을거에요.
하지만 Spring-Java 에서 이를 섞은 조합에 대한 실제 구현 방식을 설명하는 글이 별로 없더라구요.
그래서! 오늘의 포스팅은 해당 구현 방식에 대해 자세히 설명하려고 합니다.
1. 비동기/동기와 Blocking/Non-blocking 의 차이점
1.1 Sync vs Async
호출되는 함수의 작업 완료 여부를 누가 신경쓰냐가 관심사!
A 스레드가 B 스레드에게 요청했을 때,
- Sync : A 스레드는 직접적으로 B 스레드에게 결과요청 메세지를 전송함. 즉, 메인 스레드가 작업 완료 여부를 체크함.
- Async : B 스레드가 A 스레드에게 결과를 전달함으로써, 결과를 간접적으로 받음. 즉, 서브 스레드가 작업 완료 여부를 메인에 전송함.
1.2 Blocking vs Non-blocking
호출되는 함수가 바로 리턴하느냐 마느냐가 관심사!
A 스레드가 B 스레드에게 요청했을 때,
- Blocking : A 스레드는 리턴을 받기 전까지 다음의 명령을 실행할 수 없음
- Non-blocking : 그렇지 않고 요청과 동시에 다음의 명령을 실행할 수 있음
2. 조합에 따른 4 경우의 구현차이점 (feat. CompletableFuture + WebFlux)
기본적으로 CompletableFuture 를 사용하게 되면, 결과는
.get()이나.join()메소드를 통해 메인스레드에서 수신해야해요. 그리고 이 메소드들은 Blocking 메소드입니다. 즉, CompletableFuture 를 사용하게 되면, 실질적인 Non-Blocking 구현이 힘들어요. 왜냐하면 어쨋든 결과를 Blocking 으로 받기 때문이죠!.그래서 저는 Flux 와 Sink 객체를 통해 Non-blocking 구현을 하였어요. WebFlux 포스팅에서 다양한 예시를 확인할 수 있습니다.
Reference : https://www.inflearn.com/news/72620
2.1 Async + Blocking( CompletableFuture )
- CompletableFuture 를 활용한 코드
void async_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - 작업 시작할게요");
// Async
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
return "Thread 2의 결과물";
},t);
// Blocking
String result = completableFuture.get();
System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+result+"\", 이제 다음 작업 수행할게요");
System.out.println("[Thread 1] 다음 작업 수행 중...");
t.destroy();
}
- 결과
[Thread 1] - 작업 시작할게요
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
[Thread 1] 다음 작업 수행 중...
2.2 Async + Non-blocking( CompletableFuture + WebFlux )
- CompletableFuture 에 WebFlux 를 추가한 코드
void async_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
Sinks.Many<Object> sinks = Sinks.many().replay().all();
System.out.println("[Thread 1] - 작업 시작할게요");
// Async
CompletableFuture.runAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
sinks.tryEmitNext("Thread 2의 결과물");
},t);
// Non-Blocking
sinks.asFlux().log().subscribe(result->{
System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+result+"\", 이제 다음 작업 수행할게요");
});
System.out.println("[Thread 1] 다음 작업 수행 중...");
try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
t.destroy();
}
- 결과
[Thread 1] - 작업 시작할게요
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | request(unbounded)
[Thread 1] 다음 작업 수행 중...
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 2] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Thread 2의 결과물)
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
2.3 Sync + Blocking( IORead )
이 부분은 file.read(), file.write() 와 같이 간단한 예시이기때문에 생략하겠습니다.
2.4 Sync + Non-blocking( CompletableFuture )
이 부분은 간략한 예시를 들기 위해 CompletableFuture 을 사용했어요. 하지만! 이 예시는 완벽한 Sync + Non-blocking 예시가 아니라는 점을 알아주세요. 왜냐하면 아래 코드의 completableFuture.get() 는 결국 Blocking 이거든요. 다만 이전에 completableFuture.isDone() 으로 Thread 1 이 직접적으로 Thread 2 에게 작업완료여부를 물어보기때문에 아래와 같은 예시를 들었습니다. 실제 Sync + Non-blocking 은 asyncFileChannel.read 와 같은 예시입니다.
void sync_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - 작업 시작할게요");
// Async 이지만, 아래의 completableFuture.isDone()을 통한 callBack 무시로 결론은 Sync
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요");
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
return "Thread 2의 결과물";
},t);
// Non-Blocking
while(!completableFuture.isDone()){
Thread.sleep(200);
System.out.println("[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요");
System.out.println("[Thread 1] - 다른 일 중...");
}
// 다음 작업
System.out.println("[Thread 1] - 끝났군요! 결과물은 : \""+completableFuture.get()+"\", 이제 다음 작업 수행할게요");
System.out.println("[Thread 1] 다음 작업 수행 중...");
t.destroy();
}
- 결과
[Thread 1] - 작업 시작할게요
[Thread 2] - [Thread 1]으로부터 전달받은 작업 처리할게요
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - Thread 2님 작업이 끝났나요? 그동안 저는 다른일 좀 할게요
[Thread 1] - 다른 일 중...
[Thread 1] - 끝났군요! 결과물은 : "Thread 2의 결과물", 이제 다음 작업 수행할게요
[Thread 1] 다음 작업 수행 중...
Everyone has probably heard of synchronous/asynchronous and blocking/non-blocking at least once.
But there were not many posts explaining how to actually implement combinations of these in Spring-Java.
So this post explains the implementation method in detail.
1. Difference between async/sync and blocking/non-blocking
1.1 Sync vs Async
The concern is who cares whether the called function’s work is completed.
When thread A requests work from thread B:
- Sync: thread A directly sends a result request message to thread B. In other words, the main thread checks whether the work is complete.
- Async: thread B delivers the result to thread A, so the result is received indirectly. In other words, the sub thread sends the work completion result to the main thread.
1.2 Blocking vs Non-blocking
The concern is whether the called function returns immediately or not.
When thread A requests work from thread B:
- Blocking: thread A cannot execute the next command until it receives a return.
- Non-blocking: thread A can execute the next command immediately after making the request.
2. Implementation differences for the four combinations, with CompletableFuture + WebFlux
Basically, when using CompletableFuture, the result must be received from the main thread through
.get()or.join(). These methods are blocking methods. So if you use CompletableFuture alone, implementing actual non-blocking behavior is difficult. That is because the result is eventually received in a blocking way.So I implemented non-blocking behavior through Flux and Sink objects. You can find various examples in the WebFlux post.
Reference: https://www.inflearn.com/news/72620
2.1 Async + Blocking, CompletableFuture
- Code using CompletableFuture
void async_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - I will start the work");
// Async
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - I will process the work received from [Thread 1]");
return "Result from Thread 2";
},t);
// Blocking
String result = completableFuture.get();
System.out.println("[Thread 1] - It is done. Result: \""+result+"\". I will continue the next work");
System.out.println("[Thread 1] doing next work...");
t.destroy();
}
- Result
[Thread 1] - I will start the work
[Thread 2] - I will process the work received from [Thread 1]
[Thread 1] - It is done. Result: "Result from Thread 2". I will continue the next work
[Thread 1] doing next work...
2.2 Async + Non-blocking, CompletableFuture + WebFlux
- Code adding WebFlux to CompletableFuture
void async_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
Sinks.Many<Object> sinks = Sinks.many().replay().all();
System.out.println("[Thread 1] - I will start the work");
// Async
CompletableFuture.runAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - I will process the work received from [Thread 1]");
sinks.tryEmitNext("Result from Thread 2");
},t);
// Non-Blocking
sinks.asFlux().log().subscribe(result->{
System.out.println("[Thread 1] - It is done. Result: \""+result+"\". I will continue the next work");
});
System.out.println("[Thread 1] doing next work...");
try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
t.destroy();
}
- Result
[Thread 1] - I will start the work
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | request(unbounded)
[Thread 1] doing next work...
[Thread 2] - I will process the work received from [Thread 1]
[Thread 2] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Result from Thread 2)
[Thread 1] - It is done. Result: "Result from Thread 2". I will continue the next work
2.3 Sync + Blocking, IORead
This part is omitted because it is a simple example like file.read() and file.write().
2.4 Sync + Non-blocking, CompletableFuture
I used CompletableFuture here to give a simple example. But please note that this is not a perfect Sync + Non-blocking example. That is because completableFuture.get() in the code below is eventually blocking. However, before that, Thread 1 directly asks Thread 2 whether the work is done through completableFuture.isDone(), ignoring a callback. That is why I used it as the example below. A real Sync + Non-blocking example would be something like asyncFileChannel.read.
void sync_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - I will start the work");
// Async, but because callbacks are ignored and completableFuture.isDone() is used below, the conclusion is Sync
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("[Thread 2] - I will process the work received from [Thread 1]");
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
return "Result from Thread 2";
},t);
// Non-Blocking
while(!completableFuture.isDone()){
Thread.sleep(200);
System.out.println("[Thread 1] - Thread 2, are you done? I will do other work meanwhile");
System.out.println("[Thread 1] - doing other work...");
}
// next work
System.out.println("[Thread 1] - It is done. Result: \""+completableFuture.get()+"\". I will continue the next work");
System.out.println("[Thread 1] doing next work...");
t.destroy();
}
- Result
[Thread 1] - I will start the work
[Thread 2] - I will process the work received from [Thread 1]
[Thread 1] - Thread 2, are you done? I will do other work meanwhile
[Thread 1] - doing other work...
[Thread 1] - Thread 2, are you done? I will do other work meanwhile
[Thread 1] - doing other work...
[Thread 1] - Thread 2, are you done? I will do other work meanwhile
[Thread 1] - doing other work...
[Thread 1] - It is done. Result: "Result from Thread 2". I will continue the next work
[Thread 1] doing next work...