Skip to main content Link Menu Expand (external link) Document Search Copy Copied

created at 2022-12-21

진행상황

Backend Server 완료

아래는 Kafka Backend Server 세팅 클래스 및 기능들이다.

  1. MessageListener : 실제 메세지 내부 로직 수행
  2. KafkaTopicConst : 토픽을 application-properties에서 로드
  3. KafkaTopicConfig : 토픽 등록
  4. KafkaConsumerConfig : 메세지 수신 그룹 및 타입 설정
  5. KafkaProducerConfig : 메세지 발신 설정
  6. KafkaAdminConfig : 카프카 연결

테스트(Login)

  1. 메세지 전송 img

  2. 메세지 수신

     # 직접 메세지 소비
     [appuser@1ec915a5c4a5 ~]$ kafka-console-consumer --bootstrap-server localhost:9092 --topic login-response
    
     {"userId":"a","isSuccess":true,"errorMessage":"","user":{"userId":"a","userPw":"1234","email":"a@naver.com","userName":"user_A","userStatus":"바뀐 status message","joinDate":[2022,12,17],"loginDate":[2022,12,22],"logoutDate":[2022,12,17]}}
    

Kafka 코드

@Slf4j
@Component
public class MessageListener extends KafkaTopicConst{
    private final KafkaTemplate<String, Object> kafkaProducerTemplate;
    private final UserService userService;
    private final FriendService friendService;
    private final RoomService roomService;
    private final ChatService chatService;

    public MessageListener(KafkaTemplate<String, Object> kafkaProducerTemplate, UserService userService, FriendService friendService, RoomService roomService, ChatService chatService) {
        this.kafkaProducerTemplate = kafkaProducerTemplate;
        this.userService = userService;
        this.friendService = friendService;
        this.roomService = roomService;
        this.chatService = chatService;
    }

    // 로그인 요청
    @KafkaListener(topics = "${kafka.topic-login-request}", containerFactory = "loginKafkaListenerContainerFactory")
    public void listenLogin(RequestLoginDTO req) {
        log.info("Receive [RequestLoginDTO] Message with userId={},userPw={}",req.getUserId(),req.getUserPw());

        ResponseLoginDTO resp = userService.login(req.getUserId(), req.getUserPw());

        // Kafka 메세지 전송/비동기 처리를 위한 ListenableFuture 사용
        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_LOGIN_RESPONSE, req.getUserId(),resp);
        listenFuture(future); // 메시지 비동기 callback 처리
    }

    // 로그아웃 요청
    @KafkaListener(topics = "${kafka.topic-logout-request}", containerFactory = "logoutKafkaListenerContainerFactory")
    public void listenLogin(@Payload String userId) {
        ResponseLogoutDTO resp = userService.logout(userId);

        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_LOGOUT_RESPONSE, userId, resp);
        listenFuture(future);
    }

    // 유저 참여 채팅방 목록 조회
    @KafkaListener(topics = "${kafka.topic-user-search-room-request}", containerFactory = "userRoomKafkaListenerContainerFactory")
    public void rooms(RequestUserRoomDTO req){
        ResponseUserRoomDTO resp = userService.findAllMyRooms(req.getUserId());

        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_USER_SEARCH_ROOM_RESPONSE, req.getUserId(),resp);
        listenFuture(future);
    }

    // 유저의 친구목록 조회
    @KafkaListener(topics = "${kafka.topic-user-search-friend-request}", containerFactory = "userSearchFriendKafkaListenerContainerFactory")
    public void findFriend(RequestUserFriendDTO req){
        String userId = req.getUserId();

        ResponseUserFriendDTO resp = new ResponseUserFriendDTO(userId);

        Optional<User> findUser = userService.findById(userId);

        if (!findUser.isPresent()){
            resp.setStat(userId + "유저가 존재하지 않습니다");
        }else{
            List<Friend> friends = friendService.findAllByUserId(userId);
            resp.setIsSuccess(true);
            resp.setFriend(friends);
        }
        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_USER_SEARCH_FRIEND_RESPONSE, req.getUserId(),resp);
        listenFuture(future);
    }

    // 유저 저장
    @KafkaListener(topics = "${kafka.topic-user-add-request}", containerFactory = "userAddKafkaListenerContainerFactory")
    public void addUser(RequestAddUserDTO req){
        User user = new User(
                req.getUserId(),
                req.getUserPw(),
                req.getEmail(),
                req.getUserName(),
                "",
                LocalDate.now(),
                LocalDate.now(),
                LocalDate.now()
        );

        ResponseAddUserDTO resp = userService.save(user);

        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_USER_ADD_RESPONSE, req.getUserId(),resp);
        listenFuture(future);
    }

    // 유저 상태메세지 변경
    @KafkaListener(topics = "${kafka.topic-user-status-change-request}", containerFactory = "userChangeStatusKafkaListenerContainerFactory")
    public void changeUserStatus(RequestChangeUserStatusDTO req){
        ResponseChangeUserStatusDTO resp = userService.updateUserStatus(req);

        ListenableFuture<SendResult<String, Object>> future =
                kafkaProducerTemplate.send(TOPIC_USER_STATUS_CHANGE_RESPONSE, req.getUserId(),resp);
        listenFuture(future);
    }

    //채팅방 개설
    @KafkaListener(topics = "${kafka.topic-user-add-room-request}", containerFactory = "userAddRoomKafkaListenerContainerFactory")
    public void createRoomForm(RequestAddUserRoomDTO req){
        ResponseAddUserRoomDTO resp = userService.makeRoomWithFriends(req);

        ListenableFuture<SendResult<String, Object>> future =
                kafkaProducerTemplate.send(TOPIC_USER_ADD_ROOM_RESPONSE, req.getUserId(),resp);
        listenFuture(future);
    }

    // 채팅 저장
    @KafkaListener(topics = "${kafka.topic-user-add-chat-request}", containerFactory = "userAddChatKafkaListenerContainerFactory")
    public void chattingRoom(RequestAddChatMessageDTO req) {

        Optional<Room> room = roomService.findByRoomId(req.getRoomId());
        Optional<User> user = userService.findById(req.getWriterId());

        ResponseAddChatMessageDTO resp = new ResponseAddChatMessageDTO(req.getRoomId(),req.getWriterId());

        if (!room.isPresent()){
            resp.setErrorMessage("채팅방이 존재하지 않습니다");
        }
        if(!user.isPresent()){
            resp.setErrorMessage("유저가 존재하지 않습니다");
        }

        Chatting chat = createChatting(user.get(),room.get(),req.getMessage());

        resp = chatService.save(chat);

        ListenableFuture<SendResult<String, Object>> future =
                kafkaProducerTemplate.send(TOPIC_USER_ADD_CHAT_RESPONSE, req.getWriterId(),resp);
        listenFuture(future);
    }

    // 친구 저장
    @KafkaListener(topics = "${kafka.topic-user-add-friend-request}", containerFactory = "userAddFriendKafkaListenerContainerFactory")
    public void addFriend(RequestAddFriendDTO req){
        List<String> friendIds = req.getFriendId();
        ResponseAddFriendDTO resp = new ResponseAddFriendDTO(req.getUserId());

        for(String friendId : friendIds){
            Optional<User> findId = userService.findById(friendId);
            if (!findId.isPresent()){
                resp.setErrorMessage("유저가 존재하지 않습니다");
                break;
            }
            friendService.save(req.getUserId(), friendId);
        }

        ListenableFuture<SendResult<String, Object>> future =
                kafkaProducerTemplate.send(TOPIC_USER_ADD_FRIEND_RESPONSE, req.getUserId(),resp);
        listenFuture(future);
    }


    // utils
    private Chatting createChatting(User user,Room room, String message){
        Chatting chat = new Chatting();
        chat.setRoom(room);
        chat.setSendUser(user);
        chat.setCreatedDate(ZonedDateTime.now().toLocalDate());
        chat.setCreatedTime(ZonedDateTime.now().toLocalTime());
        chat.setMessage(message);
        return chat;
    }

    private void listenFuture(ListenableFuture<SendResult<String, Object>> future) {
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("메세지 전송 실패 : {}", ex.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("메세지 전송 성공 topic: {}, offset: {}, partition: {}",result.getRecordMetadata().topic() ,result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
            }
        });
    }
}
@Component
public class KafkaTopicConst {
    @Value("${kafka.topic-login-request}")
    public String TOPIC_LOGIN_REQUEST;
    @Value("${kafka.topic-login-response}")
    public String TOPIC_LOGIN_RESPONSE;

    @Value("${kafka.topic-logout-request}")
    public String TOPIC_LOGOUT_REQUEST;
    @Value("${kafka.topic-logout-response}")
    public String TOPIC_LOGOUT_RESPONSE;

    @Value("${kafka.topic-user-search-room-request}")
    public String TOPIC_USER_ROOM_REQUEST;
    @Value("${kafka.topic-user-search-room-response}")
    public String TOPIC_USER_ROOM_RESPONSE;

    @Value("${kafka.topic-user-search-room-request}")
    public String TOPIC_USER_SEARCH_ROOM_REQUEST;
    @Value("${kafka.topic-user-search-room-response}")
    public String TOPIC_USER_SEARCH_ROOM_RESPONSE;

    @Value("${kafka.topic-user-search-friend-request}")
    public String TOPIC_USER_SEARCH_FRIEND_REQUEST;
    @Value("${kafka.topic-user-search-friend-response}")
    public String TOPIC_USER_SEARCH_FRIEND_RESPONSE;

    @Value("${kafka.topic-user-add-request}")
    public String TOPIC_USER_ADD_REQUEST;
    @Value("${kafka.topic-user-add-response}")
    public String TOPIC_USER_ADD_RESPONSE;

    @Value("${kafka.topic-user-status-change-request}")
    public String TOPIC_USER_STATUS_CHANGE_REQUEST;
    @Value("${kafka.topic-user-status-change-response}")
    public String TOPIC_USER_STATUS_CHANGE_RESPONSE;

    @Value("${kafka.topic-user-add-room-request}")
    public String TOPIC_USER_ADD_ROOM_REQUEST;
    @Value("${kafka.topic-user-add-room-response}")
    public String TOPIC_USER_ADD_ROOM_RESPONSE;
    @Value("${kafka.topic-user-add-chat-request}")
    public String TOPIC_USER_ADD_CHAT_REQUEST;
    @Value("${kafka.topic-user-add-chat-response}")
    public String TOPIC_USER_ADD_CHAT_RESPONSE;

    @Value("${kafka.topic-user-add-friend-request}")
    public String TOPIC_USER_ADD_FRIEND_REQUEST;
    @Value("${kafka.topic-user-add-friend-response}")
    public String TOPIC_USER_ADD_FRIEND_RESPONSE;
}
@Configuration
public class KafkaTopicConfig extends KafkaTopicConst {
    @Autowired
    private KafkaAdmin kafkaAdmin;

    // 계산 잘 해야한다. Partition 개수 >= Group내 Conusmer 개수
    // 생성하고자 하는 Conumser=2 Partition은 2이기에, 각각 conusmer에게 leader-partition 매칭가능
    private NewTopic generateTopic(String topicName,int partitionNum, int brokerNum) {
        return TopicBuilder.name(topicName)
                .partitions(partitionNum) // 할당하고자 하는 파티션 개수
                .replicas(brokerNum) // replica sync를 위한 broker 개수
                .build(); // 토픽은 총 2개의 leader-partition, 4개의 follow-partition 로 설정할 것임
    }

    @PostConstruct
    public void init() {
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGIN_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGIN_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGOUT_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGOUT_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ROOM_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ROOM_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_SEARCH_ROOM_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_SEARCH_ROOM_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_SEARCH_FRIEND_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_SEARCH_FRIEND_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_REQUEST,5,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_RESPONSE,5,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_STATUS_CHANGE_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_STATUS_CHANGE_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_ROOM_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_ROOM_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_CHAT_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_CHAT_RESPONSE,2,3));

        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_FRIEND_REQUEST,2,3));
        kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_USER_ADD_FRIEND_RESPONSE,2,3));
    }
}
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;

    private ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaProducerTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}
@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;

    // 로그인 요청
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestLoginDTO> loginKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestLoginDTO.class);
    }
    // 로그아웃
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> logoutKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",String.class);
    }
    // 유저 참여 채팅방 목록 조회
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestUserRoomDTO> userRoomKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestUserRoomDTO.class);
    }
    // 유저 친구목록 조회
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestUserFriendDTO> userSearchFriendKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestUserFriendDTO.class);
    }
    // 유저 저장
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestAddUserDTO> userAddKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestAddUserDTO.class);
    }
    // 유저 상태메세지 변경
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestChangeUserStatusDTO> userChangeStatusKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestChangeUserStatusDTO.class);
    }
    // 채팅방 개설
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestAddUserRoomDTO> userAddRoomKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestAddUserRoomDTO.class);
    }
    // 채팅 저장
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestAddChatMessageDTO> userAddChatKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestAddChatMessageDTO.class);
    }
    // 유저 친구 저장
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestAddFriendDTO> userAddFriendKafkaListenerContainerFactory() {
        return getContainerFactory("defaultGroup",RequestAddFriendDTO.class);
    }

    /**
     * 유틸 목록 - 제네릭 클래스로 여러메소드에서 중복사용가능하도록 유틸화하였다.
     * --Methods--
     * getContainerFactory()
     * getKafkaConsumerFactory()
     * setConfig()
     * setDeserializer()
     */

    private <T> ConcurrentKafkaListenerContainerFactory<String, T> getContainerFactory(String groupId, Class<T> classType) {
        ConcurrentKafkaListenerContainerFactory<String, T> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(getKafkaConsumerFactory(groupId, classType));
        return factory;
    }

    private <T> DefaultKafkaConsumerFactory<String, T> getKafkaConsumerFactory(String groupId,Class<T> classType) {
        JsonDeserializer<T> deserializer = setDeserializer(classType);
        return new DefaultKafkaConsumerFactory<>(setConfig(groupId, deserializer), new StringDeserializer(), deserializer);
    }

    private <T> ImmutableMap<String, Object> setConfig(String groupId, JsonDeserializer<T> deserializer) {
        ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
                .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
                .build();
        return config;
    }

    private <T> JsonDeserializer<T> setDeserializer(Class<T> classType) {
        JsonDeserializer<T> deserializer = new JsonDeserializer<>(classType);
        deserializer.setRemoveTypeHeaders(false);
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);
        return deserializer;
    }
}
@Configuration
public class KafkaAdminConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);

        return new KafkaAdmin(configs);
    }
}

이 밖에 많은수의 데이터 발신/수신 포멧들과 서비스들이 연계되어 사용된다.

Frontend Server 진행중

Backend 개발 이후 세션과 페이지를 관리하는 Frontend Server을 개발중에 있다. 이 과정에서 고려할 몇가지를 추가하려 한다.

  • Server-Sent Event로 진행

이전 계획은 Frontend Server에서 webClient로 HTTP 비동기-nonBlocking 수신하려하였지만, backend에서 HTTP를 통하지 않고 직접 토픽 내 메세지 수신으로 바꾸었기에 수정이 필요하다.

수정 이후 예상되는 전체 flow는 다음과 같다.

수정된 Flow(로그인 예시)

  1. 클라이언트 브라우저에서 GET /login FrontServer에 전송.
  2. Front Server에서 SSE 모델 전송 및 유저인증 메시지 Kafka에 login-request 토픽으로 전달
  3. 클라이언트 브라우저에서 SSE 이벤트 수신 이전 로그인 로딩창 확인
  4. Backend Server에서 Kafka메세지 수신 및 인증로직 수행 이후 login-response 토픽에 전달
  5. Front Server는 이를 수신 및 SSE 이벤트 클라이언트 브라우저에 전달
  6. 클라이언트는 비로소 로그인 성공화면 redirect