Skip to main content Link Menu Expand (external link) Document Search Copy Copied

created at 2022-12-19

추가내용

문제점2. 메모리상에서 유저데이터의 복제 해결 방법

이전 포스팅[실시간 채팅방 구현(2)]에서 문제점2인 메모리상 유저데이터 복제가 일어난다고 기술했다. 이를 해결하기 위해 sticky session과 같이 kafka 파티셔닝을 진행한다고 하였다. 상세내용은 아래의 코드를 통해 보이겠다.

KafkaProducerController

@Slf4j
@RestController
@RequestMapping("/api")
public class KafkaProducerController {
    // kafka producer를 위한 KafkaTemplate를 지정한다.
    private final KafkaTemplate<String, Object> kafkaProducerTemplate;

    @Value("${kafka.topic-login-request}")
    public String TOPIC_LOGIN_REQUEST;

    @Value("${kafka.topic-login-response}")
    public String TOPIC_LOGIN_RESPONSE;

    ...

    // 임시로 PostMapping하였다.
    // 역할1. userId와 userPw를 담은 RequestLoginDTO가 도착하면 이를 kafka의 특정 파티션으로 전송
    @PostMapping("login")
    public ResponseEntity<?> produceMessageWithRequestLoginDTO(@RequestBody RequestLoginDTO requestLoginDTO) {

        // 이부분이 중요하다!!!!
        // userId를 Kafka의 토픽에 key로 던져주면서 특정 파티션에 들어가도록 설정한다
        // (던져주면 partition = HASH(useriD) mod (파티션개수) 로 수행할듯)
        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_LOGIN, requestLoginDTO.getUserId(), requestLoginDTO);

        // callback
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Unable to send message: {}", ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("Sent message with key: {}, offset: {}, partition: {}", requestLoginDTO.getUserId(), result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
            }
        });
        return ResponseEntity.ok(requestLoginDTO);
    }
}

MessageListener

@Slf4j
@Component
public class MessageListener {

    ...

    // 로그인
    @KafkaListener(topics = "${kafka.topic-login-request}", containerFactory = "loginKafkaListenerContainerFactory")
    public void listenLogin(RequestLoginDTO loginDTO) {
        log.info("Receive [RequestLoginDTO] Message with userID={},userPw={}", loginDTO.getUserId(), loginDTO.getUserPw());

        Optional<User> user = userService.matchUserIdAndUserPw(loginDTO.getUserId(), loginDTO.getUserPw());

        ResponseLoginDTO responseLoginDTO = new ResponseLoginDTO();
        responseLoginDTO.setRequestUserID(loginDTO.getUserId());

        if (user.isPresent()) {
            responseLoginDTO.setIsAccept(true);
            responseLoginDTO.setUser(user.get());
        } else {
            responseLoginDTO.setIsAccept(false);
            responseLoginDTO.setUser(new User());
        }

        // Kafka 메세지 전송/비동기 처리를 위한 ListenableFuture 사용
        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_LOGIN_RESPONSE, loginDTO.getUserId(), responseLoginDTO);

        // 메시지 비동기 callback 처리
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Fail to send message to broker: {}", ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("Send message with offset: {}, partition: {}", result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
            }
        });
    }
}

KafkaTopicConfig

// 역할1. Kafka 토픽 할당
@Configuration
public class KafkaTopicConfig {
    @Autowired
    private KafkaAdmin kafkaAdmin;

    @Value("${kafka.topic-login-request}")
    public String TOPIC_LOGIN_REQUEST;

    @Value("${kafka.topic-login-response}")
    public String TOPIC_LOGIN_RESPONSE;

    // 계산 잘 해야한다. Partition 개수 >= Group내 Conusmer 개수.
    // 생성하고자 하는 Conumser=2 Partition은 2이기에, 각각 conusmer에게 leader-partition 매칭가능
    private NewTopic generateTopic(String topicName) {
      return TopicBuilder.name(topicName)
        .partitions(2) // 파티션 할당 개수
        .replicas(3) // broker 3대에 할당
        .build(); // 즉, 토픽은 총 2개의 leader-partition, 2개의 follow-partition 보유
    }

    @PostConstruct
    public void init() {
      kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGIN_REQUEST));
      kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGIN_RESPONSE));
    }
}

KafkaProducerConfig

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;

    private ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        // Kafka Broker 엔드 포인트 할당(8097, 8098, 8099)
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
        // Kafka에 전송될 키는 스트링으로 설정
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Kafka에 전송될 값을 직렬화 하는 방법을 지정
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaProducerTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

KafkaConsumerConfig

// 역할1. Kafka연결
// 역할2. 메세지를 받아서 우리가 원하는 타입인 RequestLoginDTO로 변환
@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServer;

    ...

    // login consumer 객체 변환
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, RequestLoginDTO> loginKafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, RequestLoginDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(loginConsumerFactory("chatServerGroup"));
        return factory;
    }

    // login consumer 객체 변환
    public ConsumerFactory<String, RequestLoginDTO> loginConsumerFactory(String groupId) {

        // 앞서 producer이 json직렬화로 보냈으니, 마찬가지로 jsonDeserialize
        JsonDeserializer<RequestLoginDTO> deserializer = new JsonDeserializer<>(RequestLoginDTO.class);
        deserializer.setRemoveTypeHeaders(false);
        // 모든 패키지 신뢰
        deserializer.addTrustedPackages("*");
        deserializer.setUseTypeMapperForKey(true);

        // 객체변환 설정
        ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
                .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
                .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                .put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
                // 역직렬화 로직
                .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
                .build();

        return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    }
}

유저의 ID는 모든 서비스에 필요한 중요한 key이며 유니크하다. kafkaProducerTemplate.send(.., key=userId, ..)로 front에서 kafka의 특정 파티션으로 들어가도록 하고, 해당 파티션은 backend 서버 중 하나가 계속(만약 계속 살아있다면) 읽게된다면 메모리의 중복사용이 해결된다.

Kafka Test

참고로 Kafka브로커는 3대(port:29092, 39092, 49092)를 설정하였고 아래의 docker compose를 통해 실행하였다.

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.2.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka1:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka1
    ports:
      - "8097:8097"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:8097,INTERNAL://kafka1:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

  kafka2:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka2
    ports:
      - "8098:8098"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:8098,INTERNAL://kafka2:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

  kafka3:
    image: confluentinc/cp-kafka:7.2.1
    container_name: kafka3
    ports:
      - "8099:8099"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:8099,INTERNAL://kafka3:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

위의 docker compose를 실행하게 되면 아래와 같이 컨테이너 생성된다.

gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker ps
CONTAINER ID   IMAGE                             COMMAND                  CREATED          STATUS          PORTS                              NAMES
9e0fd3e5a419   confluentinc/cp-kafka:7.2.1       "/etc/confluent/dock…"   10 minutes ago   Up 10 minutes   0.0.0.0:8098->8098/tcp, 9092/tcp   kafka2
b24b4c68887b   confluentinc/cp-kafka:7.2.1       "/etc/confluent/dock…"   10 minutes ago   Up 10 minutes   0.0.0.0:8099->8099/tcp, 9092/tcp   kafka3
7dfabb292ff4   confluentinc/cp-kafka:7.2.1       "/etc/confluent/dock…"   10 minutes ago   Up 10 minutes   0.0.0.0:8097->8097/tcp, 9092/tcp   kafka1
02adb2b2386d   confluentinc/cp-zookeeper:7.2.1   "/etc/confluent/dock…"   10 minutes ago   Up 10 minutes   2181/tcp, 2888/tcp, 3888/tcp       zookeeper

이후, 아래와 같이 backend에서 login-response로 전달했던 메세지를 확인해볼 수 있다. POST /login -> producer -> kafka -> consumer 이 부분

# kafka3 브로커 접속
gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker exec -it kafka3 /bin/bash

# login-response에 쌓인 메세지 확인
[appuser@b24b4c68887b ~]$ kafka-console-consumer --bootstrap-server localhost:9092 --topic login-response --from-beginning
# login fail
{"requestUserID":"aa","isAccept":false,"user":{"userId":null,"userPw":null,"email":null,"userName":null,"userStatus":null,"joinDate":null,"loginDate":null,"logoutDate":null}}
# login success
{"requestUserID":"a","isAccept":true,"user":{"userId":"a","userPw":"1234","email":"a@naver.com","userName":"user_A","userStatus":"안녕하세요!","joinDate":[2022,12,17],"loginDate":[2022,12,17],"logoutDate":[2022,12,17]}}

추가적인 테스트

직접 토픽 생성부터 pub/sub 하는 테스트를 추가적으로 진행해보았다.

# kafka2 브로커 접속
gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker exec -it kafka2 /bin/bash

# 직접 토픽생성
[appuser@9e0fd3e5a419 ~]$ kafka-topics --bootstrap-server localhost:9092 --create --topic randomTopic2 --partitions 3 --replication-factor 3
Created topic randomTopic2.

# 생성된 토픽확인
[appuser@9e0fd3e5a419 ~]$ kafka-topics --describe --topic randomTopic2 --bootstrap-server kafka1:9092
Topic: randomTopic2	TopicId: BL7DT0u4SuO_flLPKHbutg	PartitionCount: 3	ReplicationFactor: 3	Configs:
	Topic: randomTopic2	Partition: 0	Leader: 3	Replicas: 3,2,1	Isr: 3,1,2
	Topic: randomTopic2	Partition: 1	Leader: 1	Replicas: 1,3,2	Isr: 3,2,1
	Topic: randomTopic2	Partition: 2	Leader: 2	Replicas: 2,1,3	Isr: 3,1,2

# 직접 producing
[appuser@9e0fd3e5a419 ~]$ kafka-console-producer --bootstrap-server localhost:9092 --topic randomTopic
>Hi
>AA

# 다른 터미널에서 kafka3 브로커 접속
gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker exec -it kafka3 /bin/bash

# 직접 consuming
[appuser@b24b4c68887b ~]$ kafka-console-consumer --bootstrap-server localhost:9092 --topic randomTopic --from-beginning
Hi # 이와같이 받아볼 수 있다.
AA