Skip to main content Link Menu Expand (external link) Document Search Copy Copied

1. WebFlux 한줄 정의

WebFlux는 Reactive Programming 을 편하게 사용가능하게하는 Spring 제공 객체로, Pub/Sub 방식으로 이벤트 메세지를 주고받습니다.

Reactive Programming : 데이터 스트림과 변경 사항 전파를 중심으로하는 비동기 프로그래밍 패러다임입니다.

2. WebFlux + Future 예시

2-1. CODE

void multiThreadSink(){
    // Sinks 객체를 통해 Flux를 여러 스레드에서 접근가능합니다.
    // Sinks.many().replay().all() = sink의 flux에 신규 구독자 추가되면, 이전 메세지들 전부 날려줍니다.
    Sinks.Many<Object> sinks = Sinks.many().replay().all();
    ThreadPoolTaskExecutor t = getThreadPoolTaskExecutor();
    
    // 퓨처를 하나 만들어서 새로운 스레드로 sinks의 통합 flux를 구독합니다.
    CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
        sinks.asFlux().log().subscribe(c -> {
            logger.debug("SUBSCRIBE-task-2: "+c);
            // 결과(본 스레드가 이전 메세지 정보들을 불러옵니다)
            // SUBSCRIBE-task-2: First <- task-1 스레드
            // SUBSCRIBE-task-2: Thrid <- task-1 스레드
            // 별도의 비동기 스레드이기 때문에, First/Thrid 둘 다 없을 수도 있어요
        });
        return "Completed";
    },t);
    logger.info("emit 1 start");
    sinks.tryEmitNext("First");
    logger.info("emit 1 end");
    

    // 퓨처를 하나 만들어서 새로운 스레드로 sinks의 통합 flux를 구독합니다.
    CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(() -> {
        Flux<Object> flux = sinks.asFlux();
        flux.log().subscribe(c -> {
            logger.debug("SUBSCRIBE-task-3: "+c);
            // 결과(본 스레드가 이전 메세지 정보들을 불러옵니다)
            // SUBSCRIBE-task-3: First <- task-2 스레드
            // SUBSCRIBE-task-3: Thrid <- task-2 스레드
            // 별도의 비동기 스레드이기 때문에, First/Thrid 둘 다 없을 수도 있어요
        });
        
        // 이 때, 추가적으로 스레드 내부에서 sinks에 "Second" 메세지를 Publish 해볼게요
        logger.info("emit 2 start");
        sinks.tryEmitNext("Second"); 
        logger.info("emit 2 end");
        // 결과(현재 sink의 flux 를 subscribe하는 구독자들에게 직접 본 스레드가 publishing)
        // SUBSCRIBE-task-1: Second <- task-2 스레드
        // SUBSCRIBE-task-2: Second <- task-2 스레드
        // SUBSCRIBE-task-3: Second <- task-2 스레드
        return "Completed";
    },t);
    logger.info("emit 3 start");
    sinks.tryEmitNext("Third");
    logger.info("emit 3 end");
    sinks.asFlux().subscribe(str->{
        logger.info("SUBSCRIBE-task-1: "+(String) str);
    });

    try {
        String s1 = cf1.get();
        String s2 = cf2.get();
        logger.info("emit 4 start");
        sinks.tryEmitNext("Forth");
        logger.info("emit 4 end");
        // 결과(현재 sink의 flux 를 subscribe하는 구독자들에게 직접 본 스레드가 publishing)
        // SUBSCRIBE-task-1: Forth <- main 스레드
        // SUBSCRIBE-task-2: Forth <- main 스레드
        // SUBSCRIBE-task-3: Forth <- main 스레드
    } catch (InterruptedException e) {throw new RuntimeException(e);} 
    catch (ExecutionException e) {throw new RuntimeException(e);}
}


// 스레드 풀 설정
@NotNull
private static ThreadPoolTaskExecutor getThreadPoolTaskExecutor() {
    ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
    t.setCorePoolSize(5);
    t.setMaxPoolSize(10);
    t.setQueueCapacity(10);
    t.setThreadNamePrefix("task-");
    t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    t.setWaitForTasksToCompleteOnShutdown(true);
    t.initialize();
    return t;
}

2-2. RESULTS

17:41:21.723 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 1 start
17:41:21.724 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 1 end
17:41:21.725 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 3 start
17:41:21.725 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 3 end
17:41:21.734 [Test worker] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: First
17:41:21.734 [Test worker] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: Third

# task-1, task-2 스레드 Sinks의 Flux 구독시작
# 이전 Sinks에 설정한 replay().all() 로 인해 task-1,task-2는 이전 메세지를 전부 가져옵니다
# replay().all(int number_of_gets)로 바꿔주면, 복사하는 메세지의 개수를 정할 수 있어요
17:41:21.735 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
17:41:21.735 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onSubscribe([Fuseable] SinkManyReplayProcessor.ReplayInner)
17:41:21.739 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | request(unbounded)
17:41:21.739 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | request(unbounded)
17:41:21.740 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(First)
17:41:21.740 [task-1] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: First
17:41:21.740 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(First)
17:41:21.740 [task-1] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Third)
17:41:21.740 [task-1] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: Third
17:41:21.740 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: First
17:41:21.741 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(Third)
Completed
17:41:21.741 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: Third

# task-2 스레드에서 sinks에 "Second" 메세지를 전송하고, Subscribers에게 배달합니다.
17:41:21.741 [task-2] INFO com.example.shopuserservice.FluxTest -- emit 2 start
17:41:21.741 [task-2] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: Second
17:41:21.741 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Second)
17:41:21.741 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: Second
17:41:21.741 [task-2] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(Second)
17:41:21.741 [task-2] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: Second
17:41:21.741 [task-2] INFO com.example.shopuserservice.FluxTest -- emit 2 end
Completed

# main 스레드에서 sinks에게 "Forth" 메세지를 전송하고, SubScribers에게 배달합니다.
17:41:21.742 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 4 start
17:41:21.742 [Test worker] INFO com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-1: Forth
17:41:21.742 [Test worker] INFO reactor.Flux.SinkManyReplayProcessor.1 -- | onNext(Forth)
17:41:21.742 [Test worker] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-2: Forth
17:41:21.742 [Test worker] INFO reactor.Flux.SinkManyReplayProcessor.2 -- | onNext(Forth)
17:41:21.742 [Test worker] DEBUG com.example.shopuserservice.FluxTest -- SUBSCRIBE-task-3: Forth
17:41:21.743 [Test worker] INFO com.example.shopuserservice.FluxTest -- emit 4 end

3. Sinks 메소드

  • many().multicast(): a sink that will transmit only newly pushed data to its subscribers, honoring their backpressure (newly pushed as in “after the subscriber’s subscription”).
  • many().unicast(): same as above, with the twist that data pushed before the first subscriber registers is buffered.
  • many().replay(): a sink that will replay a specified history size of pushed data to new subscribers then continue pushing new data live.

4. Bifunction Webflux

4-1. CODE

void bifunction(){
    Flux<String> flux = Flux.generate(
            () -> 0, // 초기 객체 세팅
            (state, sink) -> {
                sink.next("3 x " + state + " = " + 3*state); // sinks.onNext() 호출
                if (state == 10) sink.complete(); // sinks.onComplete() 호출
                return state + 1;
            });
            
    CompletableFuture cf = CompletableFuture.runAsync(() -> {
        flux.log().subscribe(str -> {
            System.out.println(str);
        });
    });
    flux.log().subscribe(s->{
        System.out.println(s);
    });
}
}

4-2. RESULTS

18:36:06.283 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
18:36:06.283 [Test worker] INFO reactor.Flux.Generate.1 -- | onSubscribe([Fuseable] FluxGenerate.GenerateSubscription)
18:36:06.286 [Test worker] INFO reactor.Flux.Generate.1 -- | request(unbounded)
18:36:06.286 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | request(unbounded)
18:36:06.292 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 0 = 0)
18:36:06.292 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 0 = 0)
3 x 0 = 0
3 x 0 = 0
18:36:06.293 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 1 = 3)
3 x 1 = 3
18:36:06.293 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 1 = 3)
3 x 1 = 3
18:36:06.293 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 2 = 6)
3 x 2 = 6
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 2 = 6)
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 3 = 9)
3 x 2 = 6
3 x 3 = 9
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 3 = 9)
3 x 3 = 9
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 4 = 12)
3 x 4 = 12
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 4 = 12)
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 5 = 15)
3 x 4 = 12
3 x 5 = 15
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 5 = 15)
18:36:06.294 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 6 = 18)
3 x 5 = 15
3 x 6 = 18
18:36:06.294 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 6 = 18)
3 x 6 = 18
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 7 = 21)
3 x 7 = 21
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 7 = 21)
3 x 7 = 21
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 8 = 24)
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 8 = 24)
3 x 8 = 24
3 x 8 = 24
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 9 = 27)
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 9 = 27)
3 x 9 = 27
3 x 9 = 27
18:36:06.295 [Test worker] INFO reactor.Flux.Generate.1 -- | onNext(3 x 10 = 30)
3 x 10 = 30
18:36:06.295 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onNext(3 x 10 = 30)
3 x 10 = 30
18:36:06.296 [Test worker] INFO reactor.Flux.Generate.1 -- | onComplete()
18:36:06.296 [ForkJoinPool.commonPool-worker-1] INFO reactor.Flux.Generate.2 -- | onComplete()