created at 2023-04-17
1. Saga Choreography Architecture
v5 버전인 현재 구현진행/완료 중인 아키텍처입니다.
2. Saga Choreography 이벤트 성공값 클라이언트 반환 feat. SSE
저는 신규유저 추가 기능을 Saga의 Choreography 로 구현하였는데요. 여기서 기능적으로 힘들었던 부분이 있었습니다. 바로 이벤트들이 모두 처리되었을 때, 클라이언트에게 그 결과값을 반환하는 부분이였어요. 그래서 저는 위의 그림에서도 설명하였던 Server-Sent-Event 와 WebFlux 로 이를 해결하였습니다. 먼저 결과부터 볼까요?
2.1 Saga Choreography 이벤트 성공값 클라이언트 반환 결과
userStatus/chatStatus/customerStatus 이 세 가지가 Kafka MQ 백본망을 통해 처리됩니다. 그리고 SSE 로 stream 형식으로 결과들이 하나씩 전송됩니다.
2.2 하나의 클라이언트 당 하나의 Sinks 생성
저는 Netty 로 웹서버를 구축했는데요. Netty는 Spring-WebFlux를 지원합니다. 그리고 WebFlux 는 멀티 스레드로부터 하나의 채널을 구축하도록 도와주는 Sinks
객체를 지원합니다.
우리가 해결해야 하는 문제는 여러 스레드로부터 하나의 채널을 통해 지속적으로 클라이언트가 결과값을 반환받도록 하는것이죠? 이 채널을 Sinks 객체로 설정한다면 해결할 수 있습니다!
2.2.1 Concurrent Sinks Map 생성
public static ConcurrentHashMap<String, Sinks.Many<Object>> sinkMap = new ConcurrentHashMap<>();
여러 스레드의 접근으로부터 lock 해주는 ConcurrentHashMap
을 사용합니다. Key 값으로는 userId를 사용할 것이며, Value 로는 하나의 채널 즉 Sinks 객체를 할당해줍니다. 이는 @Configuration 에다가 static 으로 설정하였습니다.
2.2.2 Kafka Listener Thread 에서 Sinks 꺼내서 Flux streaming
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageListener {
private final UserCommandQueryService userService;
private final KafkaTemplate<String, Object> kafkaProducerTemplate;
// concurrency를 partition 개수에 맞추어 설정하는 것이 중요합니다!
// 저는 partition 개수를 5로 설정해서 concurrency 파라미터에 "5"로 설정해주어야 합니다.
@KafkaListener(topics = KafkaTopic.userRes, containerFactory = "userKafkaListenerContainerFactory", concurrency = KafkaTopicPartition.userRes)
public void listenUser(UserResponseEvent req) {
log.info("메세지 도착 = {}", req.getServiceName());
// 이벤트 Status 를 Trancation 테이블에 업데이트 시켜줍니다.
userService.updateStatus(req).exceptionally(e->{
// 이벤트 응답들을 처리할 때 에러가 난다면, sinkMap 에서 Sinks 를 가져와서 tryEmitError()를 통해
// 에러 emit + onComplete 를 클라이언트에게 전송합니다.
AsyncConfig.sinkMap.get(req.getUserId()).tryEmitError(e);
return null;
});
}
private void sendToKafka(String topic,Object req) {
kafkaProducerTemplate.send(topic, req).thenAccept((SendResult<String, Object> result)->{
log.debug("메세지 전송 성공 topic={}, offset={}, partition={}",topic, result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
}).exceptionally(e->{
log.error("메세지 전송 실패={}", e.getMessage());
return null;
});
}
}
2.2.3 Service Thread 에서 Sinks 꺼내서 Flux streaming
public class userServiceImpl implements userService{
...
@Override
@Async
@Transactional
public CompletableFuture<UserTransaction> updateStatus(UserResponseEvent event) {
Optional<UserTransaction> tx = userTransactionRepository.findByEventId(event.getEventId());
if (tx.isPresent()){
UserTransaction ut = tx.get();
// 서비스 이름을 통해 메세지 Status 구분
switch (event.getServiceName()){
case ServiceNames.chat -> {
ut.setChatStatus(event.getUserResponseStatus());
break;
}
case ServiceNames.customer -> {
ut.setCustomerStatus(event.getUserResponseStatus());
break;
}
}
String chatStatus = ut.getChatStatus();
String customerStatus = ut.getCustomerStatus();
String userStatus = ut.getUserStatus();
// 둘 다 SUCCESS 일 경우,
if (chatStatus.equals(UserResponseStatus.USER_SUCCES.name())
&& customerStatus.equals(UserResponseStatus.USER_SUCCES.name())){
// 유저 Status 에 따라 완료/미완료
if (userStatus.equals(UserStatus.USER_INSERT.name())){
// 유저 저장
Optional<User> findUser = userRepository.findById(ut.getUserId());
if (!findUser.isPresent()){
User user = new User(
ut.getUserId(),
ut.getUserPw(),
ut.getEmail(),
ut.getUserName(),
LocalDateTime.now(),
LocalDateTime.now(),
LocalDateTime.now(),
ut.getRole()
);
userRepository.save(user);
}else{
// 유저가 이미 존재할 때
ut.setUserStatus(UserStatus.USER_INSERT_FAIL.name());
// CompletableFuture Fail 처리
return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.CONFLICT, "동일한 사용자가 존재합니다"));
}
// 이벤트 Transaction UPDATE
ut.setUserStatus(UserStatus.USER_INSERT_COMPLETE.name());
// 결과 SSE 클라이언트 반환
AsyncConfig.sinkMap.get(event.getUserId()).tryEmitNext(ut);
AsyncConfig.sinkMap.get(event.getUserId()).tryEmitComplete();
} else if (userStatus.equals(UserStatus.USER_DELETE.name())) {
userRepository.deleteById(ut.getUserId());
// 이벤트 Transaction UPDATE
ut.setUserStatus(UserStatus.USER_DELETE_COMPLETE.name());
// 결과 SSE 클라이언트 반환
AsyncConfig.sinkMap.get(event.getUserId()).tryEmitNext(ut);
AsyncConfig.sinkMap.get(event.getUserId()).tryEmitComplete();
}
} else if (chatStatus.equals(UserResponseStatus.USER_FAIL.name())
|| customerStatus.equals(UserResponseStatus.USER_FAIL.name())) {
// chatService/customerService 로 부터 응답받은 이벤트에 FAIL 이 있을 경우
// Transaction FAIL 처리
ut.setUserStatus(UserStatus.USER_INSERT_FAIL.name());
return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.CONFLICT, "동일한 사용자가 존재합니다"));
} else {
// 중간 결과값들 계속 클라이언트에게 전송
AsyncConfig.sinkMap.get(event.getUserId()).tryEmitNext(ut);
}
}else{
// 이벤트 트랜젝션이 없을 때,
return CompletableFuture.failedFuture(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "없는 트랜젝션 입니다"));
}
return CompletableFuture.completedFuture(tx.get());
}
...
}
2.2.4 Executor Threads(Netty) 에서 Sinks 꺼내서 Flux streaming
public class UserController {
...
@PostMapping(value = "/user", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<?> saveUser(@RequestBody RequestUser req) throws InterruptedException {
// saga choreograhpy tx 관리 id;
UUID eventId = UUID.randomUUID();
// SinkMap 에 클라이언트 Sinks 저장
AsyncConfig.sinkMap.put(req.getUserId(), Sinks.many().multicast().onBackpressureBuffer());
UserEvent userEvent = new UserEvent(
eventId,
UserStatus.USER_INSERT,
req.getUserId()
);
// 이벤트 Publishing (만약 MQ가 닫혀있으면 exception)
userCommandQueryService
.newUserEvent(req, eventId, userEvent)
.exceptionally(e -> {
if (e.getCause() instanceof CustomException) {
CustomException e2 = ((CustomException) e.getCause());
AsyncConfig.sinkMap.get(req.getUserId()).tryEmitError(new ResponseStatusException(e2.getErrorCode().getHttpStatus(), e2.getErrorCode().getDetail()));
} else {
AsyncConfig.sinkMap.get(req.getUserId()).tryEmitError(new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, e.getMessage()));
}
return null;
});
return AsyncConfig.sinkMap.get(req.getUserId()).asFlux().log();
}
}
문제점이 몇 개 보이는데요. 1) 클라이언트의 매 요청마다 새롭게 Sinks 를 업데이트하게 된다는 문제가 하나 있을 것 같습니다. 그리고 해당 2) ConcurrentHashMap 의 사이즈를 고려해야 한다는 점 또한 문제가 될 것 같아요. 이 부분은 고민해봐야 할 것 같습니다.