Skip to main content Link Menu Expand (external link) Document Search Copy Copied

created at 2023-03-06

INDEX

  1. 서론
  2. Multi Thread with Custom Exception, Not using @Async!
    1. 멀티스레딩 결과
      1. 정상적인 api 요청 결과
      2. db-유니크 키 violation 에러 핸들링 결과
    2. 신나는 코딩!
      1. 스레드 풀 및 TaskExecutor 설정해서 Bean에 올리기
      2. IoC 설정
      3. UserService 의 save 메소드 CompletableFuture 반환하기
      4. UserRepositoryJDBC 의 saveAll 메소드 CompletableFuture 반환하기 및 네이티브 쿼리
      5. UserController 의 non-blocking 호출
  3. [로깅목적 멀티스레딩] Multi Thread with Custom Exception, But only for logging
    1. 기존 Single Thread with Custom Exception
    2. CustomAsyncExceptionHandler 추가
    3. AsyncConfig 변경
    4. UserService의 save 메소드 수정

0. 서론

저는 최근에 계속 Chat Server 을 최적화하는 과정을 수행하고 있는데요. 비동기로 바꾸고 별로 문제없는 줄 알았지만 JPA 에러를 핸들링하는 CustomException 이 동작하지 않았습니다ㅜㅜ.

사실 당연한게, 단순히 @Async 어노테이션을 사용하여 JPA-DB 에러를 핸들링한다면, 해당 스레드는 그냥 에러를 반환하지 않고 끝나버리게 됩니다. 즉, 에러 자체를 메인 스레드가 반환받지 못한다는 것이죠!

그래서 저는 애초에 아래의 문제점이 발생하지 않도록, 스레드와 쿼리들을 완벽히 컨트롤하고 싶었습니다.

  • JPA 쿼리 최적화 문제 : 네이티브 쿼리로 상세하게 쿼리들을 Optimizing 할 수 없음
  • Batch 성능 문제 : JPA 배치성능 «< JDBC 배치성능
  • Multi threading 문제 : JPA 호환이 어려움(아래에서 설명)
  • JPA 를 멀티 스레드로 사용 시 문제점

JPA repository 는 @Async 어노테이션을 지원하긴 합니다. 하지만, 오직 새로운 메소드에만 할당할 수 있어요! JPA 기본제공 메소드(ex. save, saveAll, find, etc.)들은 퓨쳐객체를 반환하는 것이 아니기에, 멀티 스레딩 환경에서는 사용할 수 없습니다.

  • 그러면 어떻게 해야될까요?
  1. 기존에 JPA가 제공하던 메소드들을 직접 네이티브 쿼리로 짜야되요!(i.e. INSERT 문은 JPA 가 기존 save, saveAll 메소드를 통해서만 제공하기 떄문에, INSERT 문에 멀티 스레딩을 적용하기 위해서는 네이티브 쿼리로 직접 작성해야합니다.)
  2. Repository/Service 의 메소드 반환 타입을 CompletableFuture 객체로 설정합니다.
    • CompletableFuture란?

      Future 객체에 CallBack 을 더한 객체입니다. 따라서 새로운 스레드로 task를 전송한 뒤, Non-blocking 으로 hooking 할 수 있습니다.

  3. CustomException 을 추가적으로 핸들링하도록 설정합니다.(기존에는 GlobalExceptionHandler 로 AOP 를 작성했지만, 따로 떼줄거에요)

1. Multi Thread with Custom Exception, Not using @Async!

지금부터 @Async를 전부 빼버리고 직접 스레드와 쿼리들을 컨트롤 할거에요. 그리고 Spring Bean 의 IoC를 활용할거에요.

먼저 결과부터 확인해 볼까요?

1-1 멀티스레딩 결과

1-1-1. 정상적인 api 요청 결과

img

1-1-2. db-유니크 키 violation 에러 핸들링 결과

img

일단 요청은 2개의 메소드(1. Kafka에 메세지 전송, 2. DB에 저장요청)를 비동기 Non-blocking 으로 설계했습니다. 정상 상황일 때는 각자 정상적으로 동작하는 것을 확인할 수 있어요.

이제 일부러 유니크 키 violation을 유발하는 api 요청을 보내보았습니다. userRepository에서 ConcurrentException을 볼 수 있죠. 이게, 실제 Exception인 DuplicateKeyException을 감싸고 있는 exception 인데요. 즉 중첩된 Exception이죠. 이 부분을 unwrap해서 DuplicateKeyExceptionCustomException 으로 변환해주었어요. 그리고 db-thread-4 스레드는 서비스와 컨트롤러를 거슬러 올라가서 마지막으로 클라이언트에게 에러를 반환해주게 됩니다!

이제 코드를 확인해 볼까요?

1-2 신나는 코딩!

1-2-1. 스레드 풀 및 TaskExecutor 설정해서 Bean에 올리기

먼저 할당하고자 하는 스레드 개수와 TaskExecutor가 멀티스레딩에서는 필수입니다. 기본적으로는 Java는 ForkJoinPool.commonPool을 사용할 텐데요. 저는 서비스와 데이터베이스에 각각 스레드 풀을 할당하고 싶었습니다. 아래와 같이 빈으로 각각의 Executor 을 빈에 올려주세요.

@Configuration
@Slf4j
public class AsyncConfig  {

    @Bean(name = "taskExecutorForService")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
        t.setCorePoolSize(10);
        t.setMaxPoolSize(100);
        t.setQueueCapacity(10);
        t.setThreadNamePrefix("service-thread-");

        t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        t.setWaitForTasksToCompleteOnShutdown(true);
        t.setAwaitTerminationSeconds(60);

        t.initialize();
        return t;
    }

    @Bean(name = "taskExecutorForDB")
    public Executor AsyncExecutorForDB() {
        ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
        t.setCorePoolSize(10);
        t.setMaxPoolSize(100);
        t.setQueueCapacity(10);
        t.setThreadNamePrefix("db-thread-");

        t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        t.setWaitForTasksToCompleteOnShutdown(true);
        t.setAwaitTerminationSeconds(60);

        t.initialize();
        return t;
    }
}

1-2-2. IoC 설정

아래와 같이 UserService 서비스에 @Qualifier 어노테이션을 통해 빈에 등록된 taskExecutorForService Executor을 주입시킬거에요.

@Slf4j
@Configuration
public class JpaConfig {

    private final EntityManager em;
    private final UserRepository userRepository;
    private final KafkaTemplate<String, Object> kafkaProducerTemplate;
    private final UserRepositoryJDBC userRepositoryJDBC;
    private final JdbcTemplate jdbcTemplate;
    private final Executor serviceExecutor;

    public JpaConfig(EntityManager em,
                     UserRepository userRepository,
                     KafkaTemplate<String, Object> kafkaProducerTemplate,
                     UserRepositoryJDBC userRepositoryJDBC,
                     JdbcTemplate jdbcTemplate,
                     @Qualifier("taskExecutorForService") Executor serviceExecutor) {
        this.em = em;
        this.userRepository = userRepository;
        this.kafkaProducerTemplate = kafkaProducerTemplate;
        this.userRepositoryJDBC = userRepositoryJDBC;
        this.jdbcTemplate = jdbcTemplate;
        this.serviceExecutor = serviceExecutor;
    }
    
    @Bean
    public UserService userService() {
        return new UserServiceImpl(userRepository, userRepositoryJDBC, kafkaProducerTemplate, serviceExecutor);
    }
}

1-2-3. UserService 의 save 메소드 CompletableFuture 반환하기

컨트롤러에서는 유저를 저장하기 위해서 서비스의 save 메소드를 호출하는데요. 아래와 같이 CompletableFuture을 와일드카드티입으로 반환하도록 설정합니다. 테스트하기 쉽게 log.info() 들은 남겨둘게요 :)

ex는 주입받은 Executor(serviceExecutor) 입니다!

@Slf4j
@Service
@Transactional
public class UserServiceImpl extends KafkaTopicConst implements UserService {
    ...
    @Override
    public CompletableFuture<?> save(User user) throws CustomException {
        log.info("서비스:Save 진입");
        return CompletableFuture.supplyAsync(() -> {
            log.info("서비스:Future 진입");
            return Arrays.asList(user);
        }, ex).thenCompose(u -> {
            log.info("서비스:리포지토리 접근 시작");
            return userRepositoryJDBC.saveAll(u);
        }).exceptionally(e -> {
            log.info("서비스:에러={}", e.getMessage());
            if (e.getCause() instanceof CustomException) {
                CustomException ex = (CustomException) e.getCause(); // 정의된 CustomException이라면 타입 캐스팅
                throw ex;
            } else {
                throw new RuntimeException(); // 처음보는 에러라면, RuntimeException 반환
            }
        }).thenRun(() -> {
            log.info("서비스:끝");
        });
    }
    ...
}

1-2-4. UserRepositoryJDBC 의 saveAll 메소드 CompletableFuture 반환하기

얘는 따로 @Qualifier("taskExecutorForDB")로 빈에서 Executor 을 주입받았습니다. CompletableFuture.runAsync(()->{ … }, [주입받은 Executor])

이후에 Batch 를 적용시키기 위해서 saveAll 로 메소드 네임을 적었습니다.

@Slf4j
@Repository
public class UserRepositoryJDBC {
    private final JdbcTemplate jdbcTemplate;
    private final Executor databaseExecutor;

    public UserRepositoryJDBC(JdbcTemplate jdbcTemplate, @Qualifier("taskExecutorForDB") Executor databaseExecutor) {
        this.jdbcTemplate = jdbcTemplate;
        this.databaseExecutor = databaseExecutor;
    }

    @Transactional
    public CompletableFuture<?> saveAll(List<User> users) throws CustomException {
        log.info("리포지토리:진입");
        return CompletableFuture.runAsync(()->{
            log.info("리포지토리:쿼리시작");
            String sql = "INSERT INTO user_table (user_id, email, join_date, login_date, logout_date, user_name, user_pw) " +
                    "VALUES (?, ?, ?, ?, ?, ?, ?) ";

            jdbcTemplate.batchUpdate(sql,
                    users,
                    users.size(),
                    (PreparedStatement ps, User user) -> {
                        ps.setString(1, user.getUserId());
                        ps.setString(2, user.getEmail());
                        ps.setObject(3, user.getJoinDate());
                        ps.setObject(4, user.getLoginDate());
                        ps.setObject(5, user.getLogoutDate());
                        ps.setString(6,user.getUserName());
                        ps.setString(7,user.getUserPw());
                    });

        },databaseExecutor).exceptionally(e->{
            log.info("리포지토리:에러={}",e.getClass());
            if (e.getCause().getClass() == DuplicateKeyException.class){
                log.info("리포지토리:에러반환={}",DUPLICATE_RESOURCE);
                throw new CustomException(DUPLICATE_RESOURCE);
            }
            throw new RuntimeException();
        });
    }
}

1-2-5. UserController 의 non-blocking 호출

@Slf4j
@RestController
@RequestMapping("/auth")
@RequiredArgsConstructor
public class UserController extends KafkaTopicConst {
    private final UserService userService;
    private final KafkaTemplate<String, Object> kafkaProducerTemplate;
    ...
    @PostMapping("/user")
    public DeferredResult<ResponseEntity<?>> addUser(@RequestBody RequestAddUserDTO request){

        DeferredResult<ResponseEntity<?>> dr = new DeferredResult<>();

        User user = new User(
                request.getUserId(),
                request.getUserPw(),
                request.getEmail(),
                request.getUserName(),
                LocalDate.now(),
                LocalDate.now(),
                LocalDate.now()
        );

        // 유저 서비스를 통해 유저 저장
        saveUserHandler(dr, user);
        
        // 카프카 메세지 전송 (유저 저장에 실패해도 전송)
        sendToKafkaWithKey(TOPIC_USER_CHANGE, new RequestUserChange(user.getUserId(), user.getUserName(), "", "INSERT"), user.getUserId());
        
        return dr;
    }
    
    private void saveUserHandler(DeferredResult<ResponseEntity<?>> dr, User user) {
        log.info("컨트롤러:유저 저장을 위한 서비스 접근 시작");
        CompletableFuture
                .runAsync(() -> {
                    log.info("컨트롤러:Future 진입");
                }).thenCompose(s -> {
                    log.info("컨트롤러:서비스 진입 시작");
                    return userService.save(user);
                }).thenAccept(s1 -> {
                    log.info("컨트롤러:서비스 정상종료");
                    dr.setResult(ResponseEntity.ok("success"));
                }).exceptionally(e -> {
                    log.info("컨트롤러:에러={}", e.getMessage());
                    if (e.getCause() instanceof CustomException) {
                        dr.setResult(ErrorResponse.toResponseEntity(((CustomException) e.getCause()).getErrorCode()));
                    } else {
                        dr.setResult(ResponseEntity.badRequest().body("default bad request response"));
                    }
                    return null;
                }).thenRun(() -> {
                    log.info("컨트롤러:반환=DeferredResults {}", dr.getResult());
                });
    }

    private CompletableFuture<?> sendToKafkaWithKey(String topic,Object req, String key) {
        log.info("컨트롤러:카프카 전송 시작");
        ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(topic,key, req);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("메세지 전송 실패={}", ex.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> result) {
                log.info("메세지 전송 성공 topic={}, key={}, offset={}, partition={}",topic, key, result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
            }
        });

        return toCF(future);
    }

    <T> CompletableFuture<T> toCF(ListenableFuture<T> lf){
        CompletableFuture<T> cf = new CompletableFuture<T>();
        lf.addCallback(s-> cf.complete(s), e-> cf.completeExceptionally(e));
        return cf;
    }
    ...
    
}
  1. 서버는 /user 요청이 들어오면, addUser에서 saveUserHandler를 호출하게 됩니다. 또한 sendToKafkaWithKey 도 호출합니다.
  2. 먼저 saveUserHandler는 ForkJoinPool.commonPool 의 스레드에서 실행되며, userService의 save를 호출합니다.
  3. 만약 thenCompose 이후 then...Async를 수행한다면, 새로운 스레드로 실행되며 그렇지 않은 경우에는 userService의 스레드가 그대로 실행합니다. 저같은 경우는 userService의 스레드가 그대로 실행하게 되겠네요.
  4. 유저가 정상적으로 저장된다면, Response.ok를 DeferredResult 객체에 넣어줍니다.
  5. 만약 중간에 에러가 발생할 경우 따로 정의한 ErrorResponse.toResponseEntity({예외설명})를 반환하게 됩니다.
  6. 미리 정의한 에러타입이 아니라면 Response.badRequest를 반환하게 됩니다.
  • 동시에 실행되는 sendToKafkaWithKey를 이제 볼까요?
  1. Spring KafkaTemplate 에서 제공하는 성공반환여부 객체는 CompletableFuture 객체가 아닌 ListenableFuture 객체입니다. 따라서 우리는 이후 thenCompose로 넘길 것을 대비해서 객체를 toCF() 를 통해 변환시켜줄거에요(feat. 토비님). sendToKafkaWithKey 객체는 이제 메세지 전송 성공 여부를 별도의 스레드로 실행하게 됩니다!
  • 에러를 로깅만 할 것이라면, Spring 5.3.x 에서 지원하는 아래의 방법 사용할 수 있습니다.

2. [로깅목적] Multi Thread with Custom Exception, But only for logging

AsyncUncaughtExceptionHandler, AsyncConfigurerSupport를 통해서 @Async 어노테이션을 로깅 목적으로 사용할 수 있습니다.(메인 스레드로의 반환 X)

2-1. 기존 Single Thread with Custom Exception

@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler extends ResponseEntityExceptionHandler {

    @ExceptionHandler(value = { ConstraintViolationException.class, DataIntegrityViolationException.class})
    protected ResponseEntity<ErrorResponse> handleDataException() {
        log.error("handleDataException throw Exception : {}", DUPLICATE_RESOURCE);
        return ErrorResponse.toResponseEntity(DUPLICATE_RESOURCE);
    }

    @ExceptionHandler(value = { CustomException.class })
    protected ResponseEntity<ErrorResponse> handleCustomException(CustomException e) {
        log.error("handleCustomException throw CustomException : {}", e.getErrorCode());
        return ErrorResponse.toResponseEntity(e.getErrorCode());
    }
}

위의 코드는 글로벌하게 Controller에서 Exception을 throw해서 해당 클래스가 CustomException, ConstraintViolationException, DataIntegrityViolationException 일 때 ResponseEntity를 클라이언트에 반환하도록 하는 RestControllerAdvice 입니다.

저는 해당 코드를 통해서 Service 에서 발생한 Exception들을 핸들링하는데요. 문제는 @ExceptionHandler는 오직 동기 예외처리만 한다는 것이었습니다. 따라서 아래와 같이 코드를 수정했는데요. 다만 AsyncUncaughtExceptionHandler은 호출 스레드에게 return을 할 수 없다는 단점이 존재합니다. 따라서 아래는 단순히 로깅을 목적으로 작성해야만 합니다.

2-2. CustomAsyncExceptionHandler 추가

public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... obj) {
        System.out.println("Exception message - " + ex.toString() + ex.getClass().getName());
        System.out.println("Method name - " + method.getName());
        for (Object param : obj) {
            System.out.println("Parameter value - " + param.getClass().getName());
        }
    }
}

2-3. AsyncConfig 변경

@Configuration
@Slf4j
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {

    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
        t.setCorePoolSize(10);
        t.setMaxPoolSize(100);
        t.setQueueCapacity(10);
        t.setThreadNamePrefix("auth-thread-");

        t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        t.setWaitForTasksToCompleteOnShutdown(true);
        t.setAwaitTerminationSeconds(60);

        t.initialize();
        return t;
    }
    
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }

}

2-4. UserService의 save 메소드 수정

@Async
@Service
@Transactional
public class UserServiceImpl implements UserService {
    ...
    @Override
    public User save(User user) throws CustomException {
        Optional<User> findUser = userRepository.findById(user.getUserId());

        if (findUser.isPresent()) {
            throw new CustomException(DUPLICATE_RESOURCE);
        } else {
            User save = userRepository.save(user);
            return save;
        }
    }
}

편하게 로깅목적으로만 사용할 것이라면, Spring 이 지원해주는 메소드를 상속받아서 사용하는게 좋겠죠? 단점은 여러 스레드를 사용하는데 제대로 활용할 수가 없죠. 이유는 스레드 간 반환을 설정할 수 없거든요!

3. 결론

결론적으로 이 기나긴 변환과정은 Java 와 Spring 자체에 대한 이해도를 키우는데 도움이 되었습니다. 이제부터는 JPA를 전부 네이티브 쿼리로 바꾸는 작업과 JDBC Batch 작업이 남았네요!