Skip to main content Link Menu Expand (external link) Document Search Copy Copied

created at 2022-12-11

1. 현재의 단계

현재까지 version1 실시간 채팅방 구현을 위한 STOMP/Spring 연동이 완료되었다. 상세한 내용과 코드는 다음의 github 링크에서 확인할 수 있다.

2. 프로젝트의 문제점

  • 부족한 장애대응

    Kafka를 사용하기 이전에는 아래와 같이 데이터를 주고받는것을 목표로 하였었다.

    Kafka 이전에 고려했던 점

    • 프론트에서 WebClient를 사용해서 RestApi호출할 때, Spring의 WebClient를 비동기 + Non-Blocking으로 설계할 것. 이렇게 되면 순서는 다음과 같음.
      1. 프론트에서 @Async어노테이션으로 비동기 private 함수 호출(이때, websocket으로 진행중표시)
      2. 비동기 함수에서 webClient를 .subscribe(result-> hashset… )로 non-blocking설정하고 restapi들(하나의 api를 호출하는 것은 non-blocking의미가 없다. 호출이 여러개여야지 한방에 다 보내고 wait으로 기다릴 떄, 성능향상을 기대할 수 있기때문)을 호출하고 wait
      3. 비동기함수 내 webClient의 response가 도착하면 websocket으로 데이터 표시

    하지만 이렇게 설계한다면, 백엔드 서버가 죽었을 때 다른 서버로 누가 대신 요청을 옮겨줄 수가 없다. 즉, 장애대응이 불가능하다.

  • 로그기록 저장 부하 발생(비동기 문제점)

    로그기록은 바로바로 처리하지 않아도 된다. 그러니까 Eventually Consistance 만 된다면 실제로 로그를 저장하는 시점을 보장하지 않아도 된다. 하지만, 기존의 구조에서는 바로바로 처리하는 구조이기에 부하가 발생했다. Future 객체나 Flux 객체로 비동기 처리가 가능하지만 결국 최종적으로 .get() 이나 .subscribe() 를 통해서 결과를 수신해야만 한다. 즉, 완전한 비동기는 아니라는 뜻이다.

3. 장애대응과 비동기는 어떻게 해결할 수 있을까? = Kafka 이벤트 스트림 사용

  1. 장애 대응 용이

    Kafka는 장애대응이 용이하다. 특정 서버가 죽었을 때, 다른 서버가 데이터를 받아서 처리하도록 설정가능하다. 이는 Kafka가 죽었을 때도 마찬가지이다. 카프카 클러스터 내 특정 데이터 큐(topic-partition)을 도맡아(leader partition) 처리하는 카프카 서버(broker)가 죽어도 다른 카프카 서버 내 싱크된 큐(follow partition)가 대신해서 처리하도록 대장 카프카 서버(controller broker)가 장애를 대응해준다. 이 대장 카프카 서버는 zookeeper에 의해 선정/관리된다.

  2. 로깅에 대한 완전한 비동기 지원

    원래는 로깅을 DB 에 INSERT 하고 그 결과까지 기다려야 했다면, Kafka 는 전달만 하고 결과까지 기다리지 않아도 된다. Consumer 이 알아서 이벤트를 꺼내쓰는 형태인 Kafka 는 로깅에 그야말로 완전한 비동기를 지원한다. 물론, 이벤트의 멱등성을 고려해야한다(이벤트 ID 를 따로 PK 로 적어준다던가 등등).

  3. (추가)확장성 용이

    카프카는 ConsumerGroup이 데이터(topic)와 매칭되어 데이터를 받을 수 있도록 도와준다. ConsumerGroup은 쉽게 생각해서 같은 기능의 서버(Consumer)를 수평적으로 확장해놓은 집합이라고 생각하면 된다. 그리고 이러한 ConsumerGroup내 서버(Consumer)들 중 하나가 죽으면, 그 서버에서 처리하던 특정 데이터(topic-partition)들을 다른 서버가 이어받아 처리하도록 도와준다.

즉, 데이터의 백본역할을 수행하는 Kafka를 통해 장애대응과 부가기능 비동기 수행에서 이점을 가져갈 수 있다.

4. 어떤식으로 Kafka 를 implement해야할까?

일단 Kafka의 topic은 stomp처럼 많이 생성할 수 없다. 이는 broker수만큼 topic의 replica를 감당해야하며, 모든 topic들의 partition(ISR-InSync-Replica)들은 follow partition들과 sync과정을 가지기 때문에 각기 broker에 걸리는 부하는 (총 broker개수)(topic개수partition개수) 이상이다. 즉, topic 개수에 limit에 존재한다. 무작정 chat.room.10 이런식으로 채팅방 별 topic을 설정할 수 없다는 것이다. 따라서 클라이언트가 직접 publish/consume할 수 없다.

Is There a Limit on the Number of Topics in a Kafka Instance?

5. 이상적인 kafka 통신방법은 무엇일까?

먼저 서버의 위치부터 정리해보자

  • client : port:8080
  • kafka 서버 : port:9092, port:9093
  • chat Front Server : port:8081 - (기능 : 세션/인증관리/웹소켓 관리, 에러처리) - Confluent Rest Proxy와 같은 역할

    Confluent Rest Proxy : 카프카를 사용하지 못하는 클라이언트 환경을 고려해 POST는 Producer로, GET은 Consumer로 HTTP를 Kafka와 매칭시켜주는 기술 or 플랫폼

  • chat Backend Server : port:8083, port:8084 - (기능 : DB관리, restapiServer, 비즈니스로직수행) consumerGroup:groupId=chat

Kafka 통신순서 예시(로컬에서의 채팅방 입장 및 채팅전송)

  • 채팅방 입장
    1. 클라이언트는 chat Front Server에 GET localhost:8081/chat/{roomId} 요청. 클라이언트는 비동기로 해당 요청을 실행하며, webclient의 .subcribe()으로 non-blocking 실행한다. (Asnyc-NonBlocking은 아래 참조)

      img reference : Blocking-NonBlocking-Synchronous-Asynchronous

    2. localhost:8081은 이를 받고 kakao.chat.chatRoom.REQUEST 토픽으로 localhost:9092에 메세지를 put한다. { “userId”:”HwangboGyumin”, “roomId”:10 }
    3. kakao.chat.chatRoom.REQUEST 토픽 내 실제 메세지가 삽입된 leader partition을 구독중인 Consumer localhost:8083 은 메세지를 소비한다.
    4. localhost:8083 은 로직 수행하여 DB에서 이전 채팅목록을 kakao.chat.chatRoom.RESPONSE 토픽으로 localhost:9092에 메세지를 put한다.
      {
        "endpoint":"ws://localhost:8081/stomp/chat",
        "chatRecord":{
         "userId":"HwangboGyumin",
         "roomId":10,
         "chatMessage":{
             "userId":"A",
             "roomId":"10",
             "meesage":"안녕?",
             "createdAt":"20221213 13:10 +09:00 UST"
         },{
              "userId":"B",
              ...
         }
        }
      }
      
  1. localhost:8081은 해당 메세지를 소비하여 클라이언트에 이전채팅목록 반환
  • 채팅전송
    1. 클라이언트는 이전 채팅방 접속 시 반환받은 “endpoint”:ws://localhost:8081/stomp/chat주소에 웹소켓을 연결한다.
    2. 연결성공 이후, POST localhost:8081/chat/{roomId}로 chatMessage를 전송한다.
    3. localhost:8081은 이를 수신받고 먼저 localhost:9092kakao.chat.chatMessage.REQUEST 토픽으로 전달한다.
    4. kakao.chat.chatMessage.REQUEST 토픽 내 실제 메세지가 삽입된 leader partition을 구독중인 Consumer localhost:8083 은 메세지를 소비한다.
    5. localhost:8083은 로직 수행하여 DB에 chatMessage 저장 및 kakao.chat.chatMessage.RESPONSE에 성공여부 메세지를 put한다.
    6. localhost:8081kakao.chat.chatMessage.RESPONSE토픽에 저장된 메세지를 소비하여 chatMessage DB저장 성공 여부를 판단한다.
    7. 제대로 저장되었다면, localhost:8081에서 stomp.send(topic:/pub/chat/room/{roomId})로 chatMessage를 publish한다.
    8. 클라이언트는 아래와 같이 해당 토픽을 웹소켓을 통해 수신받고 있기에 화면에 채팅이 표시된다.
stomp.subscribe("/sub/chat/room/" + roomId, function (chatMessage) { ... }

6. 현재의 프로젝트에 Kafka를 연동함으로써 예상되는 장점요약

  • 서버 수평확장에 있어 DB Sync 설정가능(Kafka Connect의 CDC)

    Kafka Connect : 트랜젝션 로그들로 sync하기에 ACID 보장가능

  • 서버 다운 시 대처가능

    kafka의 zooKeeper에서 변화감지 및 Consumer 변경 가능함으로써 서버다운대처가능

  • 부가기능들 자유롭게 수행가능

    메세지의 실시간 처리가 필요없는 부가기능들 = 응답 안기다려도 되는 기능

    ex) 어뷰저 관측, 로그저장

(Appendix) 추가할 작업

  • Chatting 메세지를 저장하는 Repository 추가

    고려할 점

    1. 사용자가 마지막에 읽은 채팅의 위치를 저장하는 칼럼 추가
    2. 지금 채팅방에 입장할때마다 입장메세지가 표시된다. 이것을 삭제하고 입장메세지의 토픽인 /pub/chat/enter의 핸들러를 거칠때마다 마지막 읽은 메세지의 위치 표시(반환)
    3. 카카오의 톡서랍처럼 나한테 중요한 메세지들을 채팅방,채팅메세지,시간 이렇게 저장할 수 있도록 중요메세지 저장 테이블 추가
  • Room/Participant 제거기능 추가
  • User가 삭제되었을 때, 연관된 데이터 삭제하도록 데이터 흐름 관찰(JPA실제 쿼리문 관찰)
  • Kafka 설정
    • UserServer/FriendServer/ChattingServer 전부 따로 떼서 Kafka를 통해 서로 메세지를 주고받도록 설정

      고려할 점

      • Partition + Broker 추가
      • Docker-compose로 Kafka/DB/Spring 구동 편리성 도모
      • Spring 서버 다운 시, 대책마련
      • 메시지가 소비될 떄, 멱등성 고려해야함
      • ElasticSearch LoggingServer 추가

Reference