Skip to main content Link Menu Expand (external link) Document Search Copy Copied

created at 2023-06-10

동기화 결과

  • 동기화 내용 : 인증서버에 유저 API 요청을 전달했을 떄, 인증서버와 채팅서버, 고객서버의 상태를 SSE로 받아옵니다.
  • 사용기술 스택
    • 개발 언어 : Java(BackEnd), Javascript/HTML(FrontEnd)
    • 베이스 플랫폼 : Spring
    • Async/Non-Blocking 처리 : Spring-Webflux, CompletableFuture
    • 메세지 큐 : Kafka, RebbitMQ
    • 모니터링 : ELK stack, Kafdrop
    • 실시간 처리 : SSE, STOMP
    • 환경 통합 : Docker, Spring-Cloud(Gateway, Config, Eureka)
  • 오렌지 : 대기중, 초록 : 성공, 빨강 : 실패(중복발견)

중복없는 유저 생성 시, Success

img img

중복인 유저 생성 시, Fail(어느 서비스에서 중복이 발생하면 해당 서비스만 빨간불)

img img

Youtube : https://www.youtube.com/watch?v=0g_3M711bwI


FrontEnd/BackEnd 의 수정된 회원가입 플로우

메인 플로우 순서

  1. [Front Server]
    • API Gateway 에 별도의 스레드로 회원가입 API 요청
    • 이 API 요청은 SSE로 응답을 받게 되는데, 실시간으로 수신받으면서 브라우저에 표시하기 위해 STOMP를 이용하여 따로 이벤트 채널을 만듬
  2. [API Gateway] : Eureka 에 회원가입 API 을 제공하는 동일 서비스 목록 요청
  3. [Eureka] : 서비스를 제공하는 서버 목록 반환
  4. [API Gateway] : 서버 목록을 RR 로드밸런싱하여 요청 전송
  5. [User Server]-(인증서버) : AWS-RDS 확인 후, Kafka(Topic:user.newUser.req) 에 이벤트 전송
  6. [Chat Server, Customer Server] : Kafka(Topic:user.newUser.req) 로부터 이벤트를 읽고, 자신들의 RDB 확인 후 Kafka(Topic:user.newUser.res) 에 회원가입 결과 전송
  7. [User Server]-(인증서버)
    • Kafka(Topic:user.newUser.res) 로부터 이벤트를 읽고, 결과의 Status 에 따라 Redis 캐시에 저장한 이벤트 트랜젝션 업데이트
    • Redis 캐시의 이벤트 트랜젝션이 업데이트될 때마다 SSE 로 API API Gateway 에 전송
  8. [API Gateway] : Front Server 에 응답 리다이렉트
  9. [Front Server] : SSE 를 수신받으면서 Stomp 채널에 실시간 표시

동시에 실행되고 있는 플로우

  • [Stomp Protocol] (메인플로우-1 ~ 메인플로우-9)
    • 메인플로우-1 에서 채널이 생성되며, 백엔드로부터 응답받는 SSE 를 실시간으로 브라우저에 표시함
  • [ELK Server] (별도의 서비스)
    • Logstash : Kafka의 회원가입 관련 토픽(user.newUser.req,user.newUser.res)들을 실시간으로 읽고 Elastic Search 의 인덱스에 삽입함
    • Kibana : Elastic Search 의 인덱스들을 그래프로 표현하며 실시간 트래픽 양을 보여줌
  • [Spring Cloud Config Server] (별도의 서비스)
    • 여러 서버에 회원가입 시 필요한 설정파일들을 통합하여 제공함
  • [RebbitMQ] (별도의 서비스)
    • 설정파일들을 Actuator 로 busrefresh 하게 되면, RebbitMQ를 통해 연결된 서버에 전부 배포됨

1. FE/BE 동기화

1.1 FE Async - Blocking 처리

  • 메인 스레드는 Flux 스레드로부터 onComplete()을 수신받을 때 까지 대기해야합니다. 그래서 synchronizenotify, wait를 통해 mutex lock 을 사용하였습니다. 결과로 메인 스레드는 block 되어 BackEnd에서 SSE를 모두 수신받을 때 까지 대기합니다.
public class UserController {
  @PostMapping
  public CompletableFuture<String> addUser(@Valid @ModelAttribute("userForm") UserForm form,
                                           BindingResult bindingResult,
                                           Model model) {

    // Form 에러 모델 전달
    if (bindingResult.hasErrors()) {
      return CompletableFuture.completedFuture("users/addUserForm");
    }
    
    RequestUser req = new RequestUser();
    req.setUserId(form.getUserId());
    req.setUserPw(form.getUserPw());
    req.setEmail(form.getEmail());
    req.setUserName(form.getUserName());
    req.setRole("ROLE_USER"); // 기본적으로 일반 롤 부여. 이것 이외로 ROLE_ADMIN 을 처리할 수 있습니다.

    // 동시성을 위한 별도 스레드 풀 사용.
    // 얘는 block 되지만, Nio 스레드를 쉬게 해주기 위해 별도의 스레드를 사용하게 됩니다.
    return CompletableFuture.supplyAsync(() -> {
      Flux<AddUserResponse> res = webClient.mutate()
              .build()
              .post()
              .uri("http://127.0.0.1:8000/user")
              .bodyValue(req)
              .retrieve()
              .onStatus(
                      HttpStatus::is4xxClientError,
                      r -> r.bodyToMono(ErrorResponse.class).map(CustomThrowableException::new))
              .onStatus(
                      HttpStatus::is5xxServerError,
                      r -> r.bodyToMono(ErrorResponse.class).map(CustomThrowableException::new))
              .bodyToFlux(AddUserResponse.class);

      // Mutex Lock 을 위한 객체
      final Object lock = new Object();

      // 백엔드에서 onComplete 이벤트 수신 시, lock 풀기
      res.doOnComplete(() -> {
        synchronized (lock) {
          lock.notify();
        }
      }).subscribe(response -> {
          // 서비스 별 결과 전송
        template.convertAndSend("/sub/user/" + req.getUserId(), response); 
      });

      // Mutex Lock
      synchronized (lock) {
        try {
          lock.wait();
        } catch (InterruptedException e) {
        }
      }
      
      // 로그인 페이지로 자동 리다이렉트
      return "redirect:/";
    }).exceptionally((e) -> {
      bindingResult.rejectValue("userId", null, e.getMessage());
      return "users/addUserForm";
    });
  }
}

1.2 BE 수정

1.2.1 이벤트 수정

회원가입을 하면 인증서버로 API가 전송됩니다. 이 때 인증서버는 채팅/고객 서버에 유저를 만들어주는 이벤트를 발행합니다. 이 이벤트를 수정하려고합니다.

  • 기존 이벤트
  • 채팅/고객 서버 중 하나라도 유저가 존재한다면, 인증서버에서 Rollback 이벤트를 전송하여 채팅/고객 서버의 유저를 삭제합니다.
  • 인증서버에 유저가 존재하면, 채팅/고객서버에 유저를 추가하지 않습니다.
  • 수정된 이벤트
  • 인증서버는 채팅/고객서버에 유저를 자동으로 동기화합니다.
    1. 인증서버에 유저 저장 및 유지
    2. 채팅/고객서버에 유저 추가 이벤트 전송(발신 토픽 : user.newUser.req, 수신 토픽 : user.newUser.res)
    3. 채팅/고객서버는 유저를 저장 및 유지하고, 중복유저 유무를 이벤트로 보냅니다(발신 토픽 : user.newUser.res)

1.2.2 SSE 수정

  • 유저저장 API 처리 시, 맨 초기에 대기상태로 저장 된 부분또한 WebFlux 통해 SSE 로 전송합니다.
  • 성능향상을 위한 .log() 제거