created at 2022-12-19
추가내용
문제점2. 메모리상에서 유저데이터의 복제 해결 방법
이전 포스팅[실시간 채팅방 구현(2)]에서 문제점2인 메모리상 유저데이터 복제가 일어난다고 기술했다. 이를 해결하기 위해 sticky session과 같이 kafka 파티셔닝을 진행한다고 하였다. 상세내용은 아래의 코드를 통해 보이겠다.
KafkaProducerController
@Slf4j
@RestController
@RequestMapping("/api")
public class KafkaProducerController {
// kafka producer를 위한 KafkaTemplate를 지정한다.
private final KafkaTemplate<String, Object> kafkaProducerTemplate;
@Value("${kafka.topic-login-request}")
public String TOPIC_LOGIN_REQUEST;
@Value("${kafka.topic-login-response}")
public String TOPIC_LOGIN_RESPONSE;
...
// 임시로 PostMapping하였다.
// 역할1. userId와 userPw를 담은 RequestLoginDTO가 도착하면 이를 kafka의 특정 파티션으로 전송
@PostMapping("login")
public ResponseEntity<?> produceMessageWithRequestLoginDTO(@RequestBody RequestLoginDTO requestLoginDTO) {
// 이부분이 중요하다!!!!
// userId를 Kafka의 토픽에 key로 던져주면서 특정 파티션에 들어가도록 설정한다
// (던져주면 partition = HASH(useriD) mod (파티션개수) 로 수행할듯)
ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_LOGIN, requestLoginDTO.getUserId(), requestLoginDTO);
// callback
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.error("Unable to send message: {}", ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("Sent message with key: {}, offset: {}, partition: {}", requestLoginDTO.getUserId(), result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
}
});
return ResponseEntity.ok(requestLoginDTO);
}
}
MessageListener
@Slf4j
@Component
public class MessageListener {
...
// 로그인
@KafkaListener(topics = "${kafka.topic-login-request}", containerFactory = "loginKafkaListenerContainerFactory")
public void listenLogin(RequestLoginDTO loginDTO) {
log.info("Receive [RequestLoginDTO] Message with userID={},userPw={}", loginDTO.getUserId(), loginDTO.getUserPw());
Optional<User> user = userService.matchUserIdAndUserPw(loginDTO.getUserId(), loginDTO.getUserPw());
ResponseLoginDTO responseLoginDTO = new ResponseLoginDTO();
responseLoginDTO.setRequestUserID(loginDTO.getUserId());
if (user.isPresent()) {
responseLoginDTO.setIsAccept(true);
responseLoginDTO.setUser(user.get());
} else {
responseLoginDTO.setIsAccept(false);
responseLoginDTO.setUser(new User());
}
// Kafka 메세지 전송/비동기 처리를 위한 ListenableFuture 사용
ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(TOPIC_LOGIN_RESPONSE, loginDTO.getUserId(), responseLoginDTO);
// 메시지 비동기 callback 처리
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.error("Fail to send message to broker: {}", ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("Send message with offset: {}, partition: {}", result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
}
});
}
}
KafkaTopicConfig
// 역할1. Kafka 토픽 할당
@Configuration
public class KafkaTopicConfig {
@Autowired
private KafkaAdmin kafkaAdmin;
@Value("${kafka.topic-login-request}")
public String TOPIC_LOGIN_REQUEST;
@Value("${kafka.topic-login-response}")
public String TOPIC_LOGIN_RESPONSE;
// 계산 잘 해야한다. Partition 개수 >= Group내 Conusmer 개수.
// 생성하고자 하는 Conumser=2 Partition은 2이기에, 각각 conusmer에게 leader-partition 매칭가능
private NewTopic generateTopic(String topicName) {
return TopicBuilder.name(topicName)
.partitions(2) // 파티션 할당 개수
.replicas(3) // broker 3대에 할당
.build(); // 즉, 토픽은 총 2개의 leader-partition, 2개의 follow-partition 보유
}
@PostConstruct
public void init() {
kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGIN_REQUEST));
kafkaAdmin.createOrModifyTopics(generateTopic(TOPIC_LOGIN_RESPONSE));
}
}
KafkaProducerConfig
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
private ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
// Kafka Broker 엔드 포인트 할당(8097, 8098, 8099)
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
// Kafka에 전송될 키는 스트링으로 설정
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Kafka에 전송될 값을 직렬화 하는 방법을 지정
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaProducerTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
KafkaConsumerConfig
// 역할1. Kafka연결
// 역할2. 메세지를 받아서 우리가 원하는 타입인 RequestLoginDTO로 변환
@Slf4j
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
...
// login consumer 객체 변환
@Bean
public ConcurrentKafkaListenerContainerFactory<String, RequestLoginDTO> loginKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, RequestLoginDTO> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(loginConsumerFactory("chatServerGroup"));
return factory;
}
// login consumer 객체 변환
public ConsumerFactory<String, RequestLoginDTO> loginConsumerFactory(String groupId) {
// 앞서 producer이 json직렬화로 보냈으니, 마찬가지로 jsonDeserialize
JsonDeserializer<RequestLoginDTO> deserializer = new JsonDeserializer<>(RequestLoginDTO.class);
deserializer.setRemoveTypeHeaders(false);
// 모든 패키지 신뢰
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
// 객체변환 설정
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer)
.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
// 역직렬화 로직
.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
.build();
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
}
}
유저의 ID는 모든 서비스에 필요한 중요한 key이며 유니크하다. kafkaProducerTemplate.send(.., key=userId, ..)
로 front에서 kafka의 특정 파티션으로 들어가도록 하고, 해당 파티션은 backend 서버 중 하나가 계속(만약 계속 살아있다면) 읽게된다면 메모리의 중복사용이 해결된다.
Kafka Test
참고로 Kafka브로커는 3대(port:29092, 39092, 49092)를 설정하였고 아래의 docker compose를 통해 실행하였다.
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.2.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka1:
image: confluentinc/cp-kafka:7.2.1
container_name: kafka1
ports:
- "8097:8097"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:8097,INTERNAL://kafka1:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
kafka2:
image: confluentinc/cp-kafka:7.2.1
container_name: kafka2
ports:
- "8098:8098"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:8098,INTERNAL://kafka2:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
kafka3:
image: confluentinc/cp-kafka:7.2.1
container_name: kafka3
ports:
- "8099:8099"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: EXTERNAL://localhost:8099,INTERNAL://kafka3:9092
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
위의 docker compose를 실행하게 되면 아래와 같이 컨테이너 생성된다.
gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
9e0fd3e5a419 confluentinc/cp-kafka:7.2.1 "/etc/confluent/dock…" 10 minutes ago Up 10 minutes 0.0.0.0:8098->8098/tcp, 9092/tcp kafka2
b24b4c68887b confluentinc/cp-kafka:7.2.1 "/etc/confluent/dock…" 10 minutes ago Up 10 minutes 0.0.0.0:8099->8099/tcp, 9092/tcp kafka3
7dfabb292ff4 confluentinc/cp-kafka:7.2.1 "/etc/confluent/dock…" 10 minutes ago Up 10 minutes 0.0.0.0:8097->8097/tcp, 9092/tcp kafka1
02adb2b2386d confluentinc/cp-zookeeper:7.2.1 "/etc/confluent/dock…" 10 minutes ago Up 10 minutes 2181/tcp, 2888/tcp, 3888/tcp zookeeper
이후, 아래와 같이 backend에서 login-response로 전달했던 메세지를 확인해볼 수 있다. POST /login -> producer -> kafka -> consumer 이 부분
# kafka3 브로커 접속
gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker exec -it kafka3 /bin/bash
# login-response에 쌓인 메세지 확인
[appuser@b24b4c68887b ~]$ kafka-console-consumer --bootstrap-server localhost:9092 --topic login-response --from-beginning
# login fail
{"requestUserID":"aa","isAccept":false,"user":{"userId":null,"userPw":null,"email":null,"userName":null,"userStatus":null,"joinDate":null,"loginDate":null,"logoutDate":null}}
# login success
{"requestUserID":"a","isAccept":true,"user":{"userId":"a","userPw":"1234","email":"a@naver.com","userName":"user_A","userStatus":"안녕하세요!","joinDate":[2022,12,17],"loginDate":[2022,12,17],"logoutDate":[2022,12,17]}}
추가적인 테스트
직접 토픽 생성부터 pub/sub 하는 테스트를 추가적으로 진행해보았다.
# kafka2 브로커 접속
gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker exec -it kafka2 /bin/bash
# 직접 토픽생성
[appuser@9e0fd3e5a419 ~]$ kafka-topics --bootstrap-server localhost:9092 --create --topic randomTopic2 --partitions 3 --replication-factor 3
Created topic randomTopic2.
# 생성된 토픽확인
[appuser@9e0fd3e5a419 ~]$ kafka-topics --describe --topic randomTopic2 --bootstrap-server kafka1:9092
Topic: randomTopic2 TopicId: BL7DT0u4SuO_flLPKHbutg PartitionCount: 3 ReplicationFactor: 3 Configs:
Topic: randomTopic2 Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,1,2
Topic: randomTopic2 Partition: 1 Leader: 1 Replicas: 1,3,2 Isr: 3,2,1
Topic: randomTopic2 Partition: 2 Leader: 2 Replicas: 2,1,3 Isr: 3,1,2
# 직접 producing
[appuser@9e0fd3e5a419 ~]$ kafka-console-producer --bootstrap-server localhost:9092 --topic randomTopic
>Hi
>AA
# 다른 터미널에서 kafka3 브로커 접속
gyuminhwangbo@Gyuminui-MacBookPro ghkdqhrbals.github.io % docker exec -it kafka3 /bin/bash
# 직접 consuming
[appuser@b24b4c68887b ~]$ kafka-console-consumer --bootstrap-server localhost:9092 --topic randomTopic --from-beginning
Hi # 이와같이 받아볼 수 있다.
AA