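1. WebFlux in one line
WebFlux is the reactive web framework provided by Spring. It builds on Project Reactor to make Reactive Programming easier to use, and it exchanges event messages in a Pub/Sub style.
Reactive Programming: an asynchronous programming paradigm centered on data streams and the propagation of change.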
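To make the Pub/Sub idea concrete without any extra dependency, here is a minimal sketch that uses the JDK's own `java.util.concurrent.Flow` interfaces, which mirror the Reactive Streams contract that Reactor implements. The class name `PubSubSketch` and the helper `publishAndCollect` are made up for illustration; this is not how WebFlux itself is wired.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class PubSubSketch {

    // Publishes the given messages and returns what a single subscriber received.
    static List<String> publishAndCollect(List<String> messages) throws InterruptedException {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                @Override public void onNext(String item) { received.add(item); }
                @Override public void onError(Throwable t) { done.countDown(); }
                @Override public void onComplete() { done.countDown(); }
            });
            // Each submit() is a change that the publisher propagates to its subscribers.
            messages.forEach(publisher::submit);
        } // close() signals onComplete once the pending items have been delivered

        done.await(5, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(publishAndCollect(List.of("First", "Second", "Third")));
    }
}
```

The subscriber never polls for data: it declares demand once with `request(...)` and then simply reacts to `onNext` calls, which is the same shape the Reactor examples below follow.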
2. WebFlux + Future example
2-1. CODE
void multiThreadSink(){
// Through a Sinks object, the same Flux can be accessed from multiple threads.
// Sinks.many().replay().all() = when a new subscriber joins the sink's Flux, every earlier message is replayed to it.
Sinks.Many<Object> sinks = Sinks.many().replay().all();
ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
// Create a future that subscribes to the sink's shared Flux on a new thread.
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
sinks.asFlux().log().subscribe(c -> {
logger.debug("SUBSCRIBE-task-2: "+c);
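// Result (this thread replays the earlier messages)
// SUBSCRIBE-task-2: First <- task-1 thread
// SUBSCRIBE-task-2: Third <- task-1 thread
// This runs on a separate asynchronous thread, so First/Third may not yet be in the replay buffer when the subscription starts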
});
return "Completed";
},t);
logger.info("emit 1 start");
sinks.tryEmitNext("First");
logger.info("emit 1 end");
// Create another future that subscribes to the sink's shared Flux on a new thread.
CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
Flux<Object> flux = sinks.asFlux();
flux.log().subscribe(c -> {
logger.debug("SUBSCRIBE-task-3: "+c);
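// Result (this thread replays the earlier messages)
// SUBSCRIBE-task-3: First <- task-2 thread
// SUBSCRIBE-task-3: Third <- task-2 thread
// This runs on a separate asynchronous thread, so First/Third may not yet be in the replay buffer when the subscription starts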
});
// Additionally, publish a "Second" message to the sink from inside this thread.
logger.info("emit 2 start");
sinks.tryEmitNext("Second");
logger.info("emit 2 end");
// Result (this thread publishes directly to every subscriber currently subscribed to the sink's Flux)
// SUBSCRIBE-task-1: Second <- task-2 thread
// SUBSCRIBE-task-2: Second <- task-2 thread
// SUBSCRIBE-task-3: Second <- task-2 thread
return "Completed";
},t);
logger.info("emit 3 start");
sinks.tryEmitNext("Third");
logger.info("emit 3 end");
sinks.asFlux().subscribe(str->{
logger.info("SUBSCRIBE-task-1: "+(String) str);
});
try {
String s1 = cf1.get();
String s2 = cf2.get();
logger.info("emit 4 start");
sinks.tryEmitNext("Forth");
logger.info("emit 4 end");
// Result (this thread publishes directly to every subscriber currently subscribed to the sink's Flux)
// SUBSCRIBE-task-1: Forth <- main thread
// SUBSCRIBE-task-2: Forth <- main thread
// SUBSCRIBE-task-3: Forth <- main thread
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}
// Thread pool configuration
@NotNull
private static ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
t.setCorePoolSize(5);
t.setMaxPoolSize(10);
t.setQueueCapacity(10);
t.setThreadNamePrefix("task-");
t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
t.setWaitForTasksToCompleteOnShutdown(true);
t.initialize();
return t;
}
2-2. RESULTS
17:41:21.723 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 1 start
17:41:21.724 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 1 end
17:41:21.725 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 3 start
17:41:21.725 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 3 end
17:41:21.734 [Test worker] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: First
17:41:21.734 [Test worker] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: Third
# The task-1 and task-2 threads start subscribing to the sink's Flux.
# Because the sink was created with replay().all(), task-1 and task-2 receive every earlier message.
# Switching to replay().limit(int historySize) caps how many earlier messages are replayed.
17:41:21.735 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
17:41:21.735 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
17:41:21.739 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | request(unbounded)
17:41:21.739 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | request(unbounded)
17:41:21.740 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(First)
17:41:21.740 [task-1] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: First
17:41:21.740 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(First)
17:41:21.740 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Third)
17:41:21.740 [task-1] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: Third
17:41:21.740 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: First
17:41:21.741 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(Third)
Completed
17:41:21.741 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: Third
# The task-2 thread emits the "Second" message to the sink and delivers it to the subscribers.
17:41:21.741 [task-2] INFO com.example.shopuserservice.FluxTest -- emit 2 start
17:41:21.741 [task-2] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: Second
17:41:21.741 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Second)
17:41:21.741 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: Second
17:41:21.741 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(Second)
17:41:21.741 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: Second
17:41:21.741 [task-2] INFO com.example.shopuserservice.FluxTest -- emit 2 end
Completed
# The main thread (Test worker) emits the "Forth" message to the sink and delivers it to the subscribers.
17:41:21.742 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 4 start
17:41:21.742 [Test worker] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: Forth
17:41:21.742 [Test worker] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Forth)
17:41:21.742 [Test worker] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: Forth
17:41:21.742 [Test worker] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(Forth)
17:41:21.742 [Test worker] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: Forth
17:41:21.743 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 4 end
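The logs above suggest that, with no `publishOn` in the chain, subscriber callbacks run on whichever thread calls `tryEmitNext`. Here is a minimal sketch that isolates just that behavior, assuming `reactor-core` is on the classpath; the class name `EmitThreadSketch` and the helper `deliveryThreads` are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Sinks;

public class EmitThreadSketch {

    // Emits one item from a named thread and returns the name of the thread
    // that ran each subscriber callback.
    static List<String> deliveryThreads(String emitterThreadName) throws InterruptedException {
        Sinks.Many<String> sink = Sinks.many().replay().all();
        List<String> threads = new CopyOnWriteArrayList<>();

        // Two subscribers, both registered from the calling thread.
        sink.asFlux().subscribe(v -> threads.add(Thread.currentThread().getName()));
        sink.asFlux().subscribe(v -> threads.add(Thread.currentThread().getName()));

        // The emit happens on a separate, explicitly named thread.
        Thread emitter = new Thread(() -> sink.tryEmitNext("Second"), emitterThreadName);
        emitter.start();
        emitter.join();
        return threads;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(deliveryThreads("task-2")); // prints [task-2, task-2]
    }
}
```

Both callbacks report the emitter's name even though the subscriptions were made elsewhere, which matches the `[task-2]` and `[Test worker]` prefixes in the log lines for "Second" and "Forth".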
3. Sinks methods
- many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure (newly pushed as in “after the subscriber’s subscription”).
- many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.
- many().replay(): a sink that will replay a specified history size of pushed data to new subscribers then continue pushing new data live.
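The difference between these variants shows up most clearly for a subscriber that arrives late. Here is a rough sketch, assuming `reactor-core` is on the classpath; the class `SinkVariantsSketch` and the helper `lateSubscriberSees` are made up for this comparison. The multicast case uses `directBestEffort()`, a flavor that does no warm-up buffering.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Sinks;

public class SinkVariantsSketch {

    // Emits "A" and "B" before anyone subscribes, then subscribes, then emits "C".
    // Returns what the late subscriber ended up receiving.
    static List<String> lateSubscriberSees(Sinks.Many<String> sink) {
        List<String> seen = new ArrayList<>();
        sink.tryEmitNext("A");
        sink.tryEmitNext("B");
        sink.asFlux().subscribe(seen::add);
        sink.tryEmitNext("C");
        return seen;
    }

    public static void main(String[] args) {
        // multicast (direct): only data pushed after the subscription -> [C]
        System.out.println(lateSubscriberSees(Sinks.many().multicast().directBestEffort()));
        // unicast: data pushed before the first subscriber is buffered -> [A, B, C]
        System.out.println(lateSubscriberSees(Sinks.many().unicast().onBackpressureBuffer()));
        // replay().all(): the full history is replayed -> [A, B, C]
        System.out.println(lateSubscriberSees(Sinks.many().replay().all()));
        // replay().limit(1): only the latest item is replayed, then live data -> [B, C]
        System.out.println(lateSubscriberSees(Sinks.many().replay().limit(1)));
    }
}
```

Note that `tryEmitNext` returns an `EmitResult` rather than throwing; in the multicast case the first two emits are reported as failed because there is no subscriber yet, and the sketch simply ignores that result.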
4. Flux.generate with a BiFunction
4-1. CODE
void bifunction(){
Flux<String> flux = Flux.generate(
() -> 0, // initial state
(state, sink) -> {
sink.next("3 x " + state + " = " + 3*state); // signals onNext to the subscriber
if (state == 10) sink.complete(); // signals onComplete to the subscriber
return state + 1; // state handed to the next invocation
});
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> {
flux.log().subscribe(str -> {
System.out.println(str);
});
});
flux.log().subscribe(s->{
System.out.println(s);
});
}
}
4-2. RESULTS
18:36:06.283 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
18:36:06.283 [Test worker] INFO reactor.Flux.Generate.1 -- | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
18:36:06.286 [Test worker] INFO reactor.Flux.Generate.1 -- | request(unbounded)
18:36:06.286 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | request(unbounded)
18:36:06.292 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 0 = 0)
18:36:06.292 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 0 = 0)
3 x 0 = 0
3 x 0 = 0
18:36:06.293 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 1 = 3)
3 x 1 = 3
18:36:06.293 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 1 = 3)
3 x 1 = 3
18:36:06.293 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 2 = 6)
3 x 2 = 6
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 2 = 6)
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 3 = 9)
3 x 2 = 6
3 x 3 = 9
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 3 = 9)
3 x 3 = 9
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 4 = 12)
3 x 4 = 12
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 4 = 12)
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 5 = 15)
3 x 4 = 12
3 x 5 = 15
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 5 = 15)
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 6 = 18)
3 x 5 = 15
3 x 6 = 18
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 6 = 18)
3 x 6 = 18
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 7 = 21)
3 x 7 = 21
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 7 = 21)
3 x 7 = 21
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 8 = 24)
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 8 = 24)
3 x 8 = 24
3 x 8 = 24
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 9 = 27)
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 9 = 27)
3 x 9 = 27
3 x 9 = 27
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 10 = 30)
3 x 10 = 30
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 10 = 30)
3 x 10 = 30
18:36:06.296 [Test worker] INFO reactor.Flux.Generate.1 -- | onComplete()
18:36:06.296 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onComplete()
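In the output above, both subscribers receive the full table from `3 x 0` through `3 x 10`, which fits the fact that a `Flux.generate` source is cold: the state supplier runs again for every subscription. Here is a small sketch of that property, assuming `reactor-core` is on the classpath; the class `GenerateSketch` and the helper `timesTable` are hypothetical, and the table is cut short to keep the output readable.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class GenerateSketch {

    // Builds a cold Flux that emits "factor x n = ..." for n = 0..last.
    static Flux<String> timesTable(int factor, int last) {
        return Flux.generate(
                () -> 0,                                   // fresh state per subscription
                (state, sink) -> {
                    sink.next(factor + " x " + state + " = " + factor * state);
                    if (state == last) sink.complete();
                    return state + 1;
                });
    }

    public static void main(String[] args) {
        Flux<String> flux = timesTable(3, 2);
        // Each collectList().block() is a separate subscription.
        List<String> first = flux.collectList().block();
        List<String> second = flux.collectList().block();
        System.out.println(first);  // prints [3 x 0 = 0, 3 x 1 = 3, 3 x 2 = 6]
        System.out.println(second); // prints the same list again
    }
}
```

Because nothing is shared between subscriptions, the interleaving seen in the logs is only a matter of two threads each running their own copy of the sequence at the same time.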