๋ค๋ค ๋๊ธฐ/๋น๋๊ธฐ์ Blocking/Non-blocking ์ ํ๋ฒ์ฉ ๋ค์ด๋ณด์ จ์๊ฑฐ์์.
ํ์ง๋ง Spring-Java ์์ ์ด๋ฅผ ์์ ์กฐํฉ์ ๋ํ ์ค์ ๊ตฌํ ๋ฐฉ์์ ์ค๋ช ํ๋ ๊ธ์ด ๋ณ๋ก ์๋๋ผ๊ตฌ์.
๊ทธ๋์! ์ค๋์ ํฌ์คํ ์ ํด๋น ๊ตฌํ ๋ฐฉ์์ ๋ํด ์์ธํ ์ค๋ช ํ๋ ค๊ณ ํฉ๋๋ค.
1. ๋น๋๊ธฐ/๋๊ธฐ์ Blocking/Non-blocking ์ ์ฐจ์ด์
1.1 Sync vs Async
ํธ์ถ๋๋ ํจ์์ ์์ ์๋ฃ ์ฌ๋ถ๋ฅผ ๋๊ฐ ์ ๊ฒฝ์ฐ๋๊ฐ ๊ด์ฌ์ฌ!
A ์ค๋ ๋๊ฐ B ์ค๋ ๋์๊ฒ ์์ฒญํ์ ๋,
- Sync : A ์ค๋ ๋๋ ์ง์ ์ ์ผ๋ก B ์ค๋ ๋์๊ฒ ๊ฒฐ๊ณผ์์ฒญ ๋ฉ์ธ์ง๋ฅผ ์ ์กํจ. ์ฆ, ๋ฉ์ธ ์ค๋ ๋๊ฐ ์์ ์๋ฃ ์ฌ๋ถ๋ฅผ ์ฒดํฌํจ.
- Async : B ์ค๋ ๋๊ฐ A ์ค๋ ๋์๊ฒ ๊ฒฐ๊ณผ๋ฅผ ์ ๋ฌํจ์ผ๋ก์จ, ๊ฒฐ๊ณผ๋ฅผ ๊ฐ์ ์ ์ผ๋ก ๋ฐ์. ์ฆ, ์๋ธ ์ค๋ ๋๊ฐ ์์ ์๋ฃ ์ฌ๋ถ๋ฅผ ๋ฉ์ธ์ ์ ์กํจ.
1.2 Blocking vs Non-blocking
ํธ์ถ๋๋ ํจ์๊ฐ ๋ฐ๋ก ๋ฆฌํดํ๋๋ ๋ง๋๋๊ฐ ๊ด์ฌ์ฌ!
A ์ค๋ ๋๊ฐ B ์ค๋ ๋์๊ฒ ์์ฒญํ์ ๋,
- Blocking : A ์ค๋ ๋๋ ๋ฆฌํด์ ๋ฐ๊ธฐ ์ ๊น์ง ๋ค์์ ๋ช ๋ น์ ์คํํ ์ ์์
- Non-blocking : ๊ทธ๋ ์ง ์๊ณ ์์ฒญ๊ณผ ๋์์ ๋ค์์ ๋ช ๋ น์ ์คํํ ์ ์์
2. ์กฐํฉ์ ๋ฐ๋ฅธ 4 ๊ฒฝ์ฐ์ ๊ตฌํ์ฐจ์ด์ (feat. CompletableFuture + WebFlux)
๊ธฐ๋ณธ์ ์ผ๋ก CompletableFuture ๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ฉด, ๊ฒฐ๊ณผ๋
.get()
์ด๋.join()
๋ฉ์๋๋ฅผ ํตํด ๋ฉ์ธ์ค๋ ๋์์ ์์ ํด์ผํด์. ๊ทธ๋ฆฌ๊ณ ์ด ๋ฉ์๋๋ค์ Blocking ๋ฉ์๋์ ๋๋ค. ์ฆ, CompletableFuture ๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ฉด, ์ค์ง์ ์ธ Non-Blocking ๊ตฌํ์ด ํ๋ค์ด์. ์๋ํ๋ฉด ์ด์จ๋ ๊ฒฐ๊ณผ๋ฅผ Blocking ์ผ๋ก ๋ฐ๊ธฐ ๋๋ฌธ์ด์ฃ !.๊ทธ๋์ ์ ๋ Flux ์ Sink ๊ฐ์ฒด๋ฅผ ํตํด Non-blocking ๊ตฌํ์ ํ์์ด์. WebFlux ํฌ์คํ ์์ ๋ค์ํ ์์๋ฅผ ํ์ธํ ์ ์์ต๋๋ค.
Reference : https://www.inflearn.com/news/72620
2.1 Async + Blocking( CompletableFuture )
- CompletableFuture ๋ฅผ ํ์ฉํ ์ฝ๋
void async_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - ์์
์์ํ ๊ฒ์");
// Async
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - [Thread 1]์ผ๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ ์์
์ฒ๋ฆฌํ ๊ฒ์");
return "Thread 2์ ๊ฒฐ๊ณผ๋ฌผ";
},t);
// Blocking
String result = completableFuture.get();
System.out.println("[Thread 1] - ๋๋ฌ๊ตฐ์! ๊ฒฐ๊ณผ๋ฌผ์ : \""+result+"\", ์ด์ ๋ค์ ์์
์ํํ ๊ฒ์");
System.out.println("[Thread 1] ๋ค์ ์์
์ํ ์ค...");
t.destroy();
}
- ๊ฒฐ๊ณผ
[Thread 1] - ์์
์์ํ ๊ฒ์
[Thread 2] - [Thread 1]์ผ๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ ์์
์ฒ๋ฆฌํ ๊ฒ์
[Thread 1] - ๋๋ฌ๊ตฐ์! ๊ฒฐ๊ณผ๋ฌผ์ : "Thread 2์ ๊ฒฐ๊ณผ๋ฌผ", ์ด์ ๋ค์ ์์
์ํํ ๊ฒ์
[Thread 1] ๋ค์ ์์
์ํ ์ค...
2.2 Async + Non-blocking( CompletableFuture + WebFlux )
- CompletableFuture ์ WebFlux ๋ฅผ ์ถ๊ฐํ ์ฝ๋
void async_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
Sinks.Many<Object> sinks = Sinks.many().replay().all();
System.out.println("[Thread 1] - ์์
์์ํ ๊ฒ์");
// Async
CompletableFuture.runAsync(() -> {
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
System.out.println("[Thread 2] - [Thread 1]์ผ๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ ์์
์ฒ๋ฆฌํ ๊ฒ์");
sinks.tryEmitNext("Thread 2์ ๊ฒฐ๊ณผ๋ฌผ");
},t);
// Non-Blocking
sinks.asFlux().log().subscribe(result->{
System.out.println("[Thread 1] - ๋๋ฌ๊ตฐ์! ๊ฒฐ๊ณผ๋ฌผ์ : \""+result+"\", ์ด์ ๋ค์ ์์
์ํํ ๊ฒ์");
});
System.out.println("[Thread 1] ๋ค์ ์์
์ํ ์ค...");
try {Thread.sleep(1000);} catch (InterruptedException e) {throw new RuntimeException(e);}
t.destroy();
}
- ๊ฒฐ๊ณผ
[Thread 1] - ์์
์์ํ ๊ฒ์
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
[Thread 1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | request(unbounded)
[Thread 1] ๋ค์ ์์
์ํ ์ค...
[Thread 2] - [Thread 1]์ผ๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ ์์
์ฒ๋ฆฌํ ๊ฒ์
[Thread 2] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Thread 2์ ๊ฒฐ๊ณผ๋ฌผ)
[Thread 1] - ๋๋ฌ๊ตฐ์! ๊ฒฐ๊ณผ๋ฌผ์ : "Thread 2์ ๊ฒฐ๊ณผ๋ฌผ", ์ด์ ๋ค์ ์์
์ํํ ๊ฒ์
2.3 Sync + Blocking( IORead )
์ด ๋ถ๋ถ์ file.read()
, file.write()
์ ๊ฐ์ด ๊ฐ๋จํ ์์์ด๊ธฐ๋๋ฌธ์ ์๋ตํ๊ฒ ์ต๋๋ค.
2.4 Sync + Non-blocking( CompletableFuture )
์ด ๋ถ๋ถ์ ๊ฐ๋ตํ ์์๋ฅผ ๋ค๊ธฐ ์ํด CompletableFuture ์ ์ฌ์ฉํ์ด์. ํ์ง๋ง! ์ด ์์๋ ์๋ฒฝํ Sync + Non-blocking ์์๊ฐ ์๋๋ผ๋ ์ ์ ์์์ฃผ์ธ์. ์๋ํ๋ฉด ์๋ ์ฝ๋์ completableFuture.get()
๋ ๊ฒฐ๊ตญ Blocking ์ด๊ฑฐ๋ ์. ๋ค๋ง ์ด์ ์ completableFuture.isDone()
์ผ๋ก Thread 1
์ด ์ง์ ์ ์ผ๋ก Thread 2
์๊ฒ ์์
์๋ฃ์ฌ๋ถ๋ฅผ ๋ฌผ์ด๋ณด๊ธฐ๋๋ฌธ์ ์๋์ ๊ฐ์ ์์๋ฅผ ๋ค์์ต๋๋ค. ์ค์ Sync + Non-blocking ์ asyncFileChannel.read
์ ๊ฐ์ ์์์
๋๋ค.
void sync_non_blocking() throws ExecutionException, InterruptedException {
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
System.out.println("[Thread 1] - ์์
์์ํ ๊ฒ์");
// Async ์ด์ง๋ง, ์๋์ completableFuture.isDone()์ ํตํ callBack ๋ฌด์๋ก ๊ฒฐ๋ก ์ Sync
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("[Thread 2] - [Thread 1]์ผ๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ ์์
์ฒ๋ฆฌํ ๊ฒ์");
try {Thread.sleep(500);} catch (InterruptedException e) {throw new RuntimeException(e);}
return "Thread 2์ ๊ฒฐ๊ณผ๋ฌผ";
},t);
// Non-Blocking
while(!completableFuture.isDone()){
Thread.sleep(200);
System.out.println("[Thread 1] - Thread 2๋ ์์
์ด ๋๋ฌ๋์? ๊ทธ๋์ ์ ๋ ๋ค๋ฅธ์ผ ์ข ํ ๊ฒ์");
System.out.println("[Thread 1] - ๋ค๋ฅธ ์ผ ์ค...");
}
// ๋ค์ ์์
System.out.println("[Thread 1] - ๋๋ฌ๊ตฐ์! ๊ฒฐ๊ณผ๋ฌผ์ : \""+completableFuture.get()+"\", ์ด์ ๋ค์ ์์
์ํํ ๊ฒ์");
System.out.println("[Thread 1] ๋ค์ ์์
์ํ ์ค...");
t.destroy();
}
- ๊ฒฐ๊ณผ
[Thread 1] - ์์
์์ํ ๊ฒ์
[Thread 2] - [Thread 1]์ผ๋ก๋ถํฐ ์ ๋ฌ๋ฐ์ ์์
์ฒ๋ฆฌํ ๊ฒ์
[Thread 1] - Thread 2๋ ์์
์ด ๋๋ฌ๋์? ๊ทธ๋์ ์ ๋ ๋ค๋ฅธ์ผ ์ข ํ ๊ฒ์
[Thread 1] - ๋ค๋ฅธ ์ผ ์ค...
[Thread 1] - Thread 2๋ ์์
์ด ๋๋ฌ๋์? ๊ทธ๋์ ์ ๋ ๋ค๋ฅธ์ผ ์ข ํ ๊ฒ์
[Thread 1] - ๋ค๋ฅธ ์ผ ์ค...
[Thread 1] - Thread 2๋ ์์
์ด ๋๋ฌ๋์? ๊ทธ๋์ ์ ๋ ๋ค๋ฅธ์ผ ์ข ํ ๊ฒ์
[Thread 1] - ๋ค๋ฅธ ์ผ ์ค...
[Thread 1] - ๋๋ฌ๊ตฐ์! ๊ฒฐ๊ณผ๋ฌผ์ : "Thread 2์ ๊ฒฐ๊ณผ๋ฌผ", ์ด์ ๋ค์ ์์
์ํํ ๊ฒ์
[Thread 1] ๋ค์ ์์
์ํ ์ค...