Skip to main content Link Menu Expand (external link) Document Search Copy Copied

created at 2023-04-17

1. Saga Choreography Architecture

v5 버전인 현재 구현진행/완료 중인 아키텍처입니다.

img

2. Saga Choreography 이벤트 성공값 클라이언트 반환 feat. SSE

저는 신규유저 추가 기능을 Saga의 Choreography 로 구현하였는데요. 여기서 기능적으로 힘들었던 부분이 있었습니다. 바로 이벤트들이 모두 처리되었을 때, 클라이언트에게 그 결과값을 반환하는 부분이였어요. 그래서 저는 위의 그림에서도 설명하였던 Server-Sent-EventWebFlux 로 이를 해결하였습니다. 먼저 결과부터 볼까요?

2.1 Saga Choreography 이벤트 성공값 클라이언트 반환 결과

img

userStatus/chatStatus/customerStatus 이 세 가지가 Kafka MQ 백본망을 통해 처리됩니다. 그리고 SSE 로 stream 형식으로 결과들이 하나씩 전송됩니다.

2.2 하나의 클라이언트 당 하나의 Sinks 생성

저는 Netty 로 웹서버를 구축했는데요. Netty는 Spring-WebFlux를 지원합니다. 그리고 WebFlux 는 멀티 스레드로부터 하나의 채널을 구축하도록 도와주는 Sinks 객체를 지원합니다.

우리가 해결해야 하는 문제는 여러 스레드로부터 하나의 채널을 통해 지속적으로 클라이언트가 결과값을 반환받도록 하는것이죠? 이 채널을 Sinks 객체로 설정한다면 해결할 수 있습니다!

2.2.1 Concurrent Sinks Map 생성

public static ConcurrentHashMap<String, Sinks.Many<Object>> sinkMap = new ConcurrentHashMap<>();

여러 스레드의 접근으로부터 lock 해주는 ConcurrentHashMap 을 사용합니다. Key 값으로는 userId를 사용할 것이며, Value 로는 하나의 채널 즉 Sinks 객체를 할당해줍니다. 이는 @Configuration 에다가 static 으로 설정하였습니다.

2.2.2 Kafka Listener Thread 에서 Sinks 꺼내서 Flux streaming

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageListener {
    private final UserCommandQueryService userService;
    private final KafkaTemplate<String, Object> kafkaProducerTemplate;

    // concurrency를 partition 개수에 맞추어 설정하는 것이 중요합니다!
    // 저는 partition 개수를 5로 설정해서 concurrency 파라미터에 "5"로 설정해주어야 합니다.
    @KafkaListener(topics = KafkaTopic.userRes, containerFactory = "userKafkaListenerContainerFactory", concurrency = KafkaTopicPartition.userRes)
    public void listenUser(UserResponseEvent req) {
        log.info("메세지 도착 = {}", req.getServiceName());

        // 이벤트 Status 를 Trancation 테이블에 업데이트 시켜줍니다.
        userService.updateStatus(req).exceptionally(e->{
            // 이벤트 응답들을 처리할 때 에러가 난다면, sinkMap 에서 Sinks 를 가져와서 tryEmitError()를 통해
            // 에러 emit + onComplete 를 클라이언트에게 전송합니다.
            AsyncConfig.sinkMap.get(req.getUserId()).tryEmitError(e);
            return null;
        });
    }

    private void sendToKafka(String topic,Object req) {
        kafkaProducerTemplate.send(topic, req).thenAccept((SendResult<String, Object> result)->{
            log.debug("메세지 전송 성공 topic={}, offset={}, partition={}",topic, result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
        }).exceptionally(e->{
            log.error("메세지 전송 실패={}", e.getMessage());
            return null;
        });
    }
}

2.2.3 Service Thread 에서 Sinks 꺼내서 Flux streaming

public class userServiceImpl implements userService{
    ...
    @Override
    @Async
    @Transactional
    public CompletableFuture<UserTransaction> updateStatus(UserResponseEvent event) {
        Optional<UserTransaction> tx = userTransactionRepository.findByEventId(event.getEventId());

        if (tx.isPresent()){
            UserTransaction ut = tx.get();
            // 서비스 이름을 통해 메세지 Status 구분
            switch (event.getServiceName()){
                case ServiceNames.chat -> {
                    ut.setChatStatus(event.getUserResponseStatus());
                    break;
                }
                case ServiceNames.customer -> {
                    ut.setCustomerStatus(event.getUserResponseStatus());
                    break;
                }
            }
            String chatStatus = ut.getChatStatus();
            String customerStatus = ut.getCustomerStatus();
            String userStatus = ut.getUserStatus();
            
            // 둘 다 SUCCESS 일 경우,
            if (chatStatus.equals(UserResponseStatus.USER_SUCCES.name())
                    && customerStatus.equals(UserResponseStatus.USER_SUCCES.name())){
                // 유저 Status 에 따라 완료/미완료
                if (userStatus.equals(UserStatus.USER_INSERT.name())){

                    // 유저 저장
                    Optional<User> findUser = userRepository.findById(ut.getUserId());
                    if (!findUser.isPresent()){
                        User user = new User(
                                ut.getUserId(),
                                ut.getUserPw(),
                                ut.getEmail(),
                                ut.getUserName(),
                                LocalDateTime.now(),
                                LocalDateTime.now(),
                                LocalDateTime.now(),
                                ut.getRole()
                        );
                        userRepository.save(user);
                    }else{
                        // 유저가 이미 존재할 때
                        ut.setUserStatus(UserStatus.USER_INSERT_FAIL.name());
                        // CompletableFuture Fail 처리
                        return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.CONFLICT, "동일한 사용자가 존재합니다"));
                    }
                    
                    // 이벤트 Transaction UPDATE
                    ut.setUserStatus(UserStatus.USER_INSERT_COMPLETE.name());

                    // 결과 SSE 클라이언트 반환
                    AsyncConfig.sinkMap.get(event.getUserId()).tryEmitNext(ut);
                    AsyncConfig.sinkMap.get(event.getUserId()).tryEmitComplete();

                } else if (userStatus.equals(UserStatus.USER_DELETE.name())) {
                    userRepository.deleteById(ut.getUserId());
                    
                    // 이벤트 Transaction UPDATE
                    ut.setUserStatus(UserStatus.USER_DELETE_COMPLETE.name());

                    // 결과 SSE 클라이언트 반환
                    AsyncConfig.sinkMap.get(event.getUserId()).tryEmitNext(ut);
                    AsyncConfig.sinkMap.get(event.getUserId()).tryEmitComplete();
                }
                
            } else if (chatStatus.equals(UserResponseStatus.USER_FAIL.name())
                    || customerStatus.equals(UserResponseStatus.USER_FAIL.name())) {
                // chatService/customerService 로 부터 응답받은 이벤트에 FAIL 이 있을 경우
                // Transaction FAIL 처리
                ut.setUserStatus(UserStatus.USER_INSERT_FAIL.name());
                return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.CONFLICT, "동일한 사용자가 존재합니다"));
                
            } else {
                // 중간 결과값들 계속 클라이언트에게 전송
                AsyncConfig.sinkMap.get(event.getUserId()).tryEmitNext(ut);
            }
        }else{
            // 이벤트 트랜젝션이 없을 때, 
            return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "없는 트랜젝션 입니다"));
        }
        return CompletableFuture.completedFuture(tx.get());
    }
    ...
}

2.2.4 Executor Threads(Netty) 에서 Sinks 꺼내서 Flux streaming

public class UserController {
    ...
    @PostMapping(value = "/user", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<?> saveUser(@RequestBody RequestUser req) throws InterruptedException {
        // saga choreograhpy tx 관리 id;
        UUID eventId = UUID.randomUUID();
        
        // SinkMap 에 클라이언트 Sinks 저장
        AsyncConfig.sinkMap.put(req.getUserId(), Sinks.many().multicast().onBackpressureBuffer());

        UserEvent userEvent = new UserEvent(
                eventId,
                UserStatus.USER_INSERT,
                req.getUserId()
        );
        
        // 이벤트 Publishing (만약 MQ가 닫혀있으면 exception)
        userCommandQueryService
                .newUserEvent(req, eventId, userEvent)
                .exceptionally(e -> {
                    if (e.getCause() instanceof CustomException) {
                        CustomException e2 = ((CustomException) e.getCause());
                        AsyncConfig.sinkMap.get(req.getUserId()).tryEmitError(new ResponseStatusException(e2.getErrorCode().getHttpStatus(), e2.getErrorCode().getDetail()));
                    } else {
                        AsyncConfig.sinkMap.get(req.getUserId()).tryEmitError(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
                    }
                    return null;
                });
        return AsyncConfig.sinkMap.get(req.getUserId()).asFlux().log();
    }
}

문제점이 몇 개 보이는데요. 1) 클라이언트의 매 요청마다 새롭게 Sinks 를 업데이트하게 된다는 문제가 하나 있을 것 같습니다. 그리고 해당 2) ConcurrentHashMap 의 사이즈를 고려해야 한다는 점 또한 문제가 될 것 같아요. 이 부분은 고민해봐야 할 것 같습니다.