created at 2023-03-06
INDEX
- 서론
- Multi Thread with Custom Exception, Not using @Async!
- 멀티스레딩 결과
- 정상적인 api 요청 결과
- db-유니크 키 violation 에러 핸들링 결과
- 신나는 코딩!
- 스레드 풀 및 TaskExecutor 설정해서 Bean에 올리기
- IoC 설정
- UserService 의 save 메소드 CompletableFuture 반환하기
- UserRepositoryJDBC 의 saveAll 메소드 CompletableFuture 반환하기 및 네이티브 쿼리
- UserController 의 non-blocking 호출
- 멀티스레딩 결과
- [로깅목적 멀티스레딩] Multi Thread with Custom Exception, But only for logging
- 기존 Single Thread with Custom Exception
- CustomAsyncExceptionHandler 추가
- AsyncConfig 변경
- UserService의 save 메소드 수정
0. 서론
저는 최근에 계속 Chat Server 을 최적화하는 과정을 수행하고 있는데요. 비동기로 바꾸고 별로 문제없는 줄 알았지만 JPA 에러를 핸들링하는 CustomException 이 동작하지 않았습니다ㅜㅜ.
사실 당연한게, 단순히 @Async 어노테이션을 사용하여 JPA-DB 에러를 핸들링한다면, 해당 스레드는 그냥 에러를 반환하지 않고 끝나버리게 됩니다. 즉, 에러 자체를 메인 스레드가 반환받지 못한다는 것이죠!
그래서 저는 애초에 아래의 문제점이 발생하지 않도록, 스레드와 쿼리들을 완벽히 컨트롤하고 싶었습니다.
- JPA 쿼리 최적화 문제 : 네이티브 쿼리로 상세하게 쿼리들을 Optimizing 할 수 없음
- Batch 성능 문제 : JPA 배치성능 «< JDBC 배치성능
- Multi threading 문제 : JPA 호환이 어려움(아래에서 설명)
- JPA 를 멀티 스레드로 사용 시 문제점
JPA repository 는 @Async 어노테이션을 지원하긴 합니다. 하지만, 오직 새로운 메소드에만 할당할 수 있어요! JPA 기본제공 메소드(ex. save, saveAll, find, etc.)들은 퓨쳐객체를 반환하는 것이 아니기에, 멀티 스레딩 환경에서는 사용할 수 없습니다.
- 그러면 어떻게 해야될까요?
- 기존에 JPA가 제공하던 메소드들을 직접 네이티브 쿼리로 짜야되요!(i.e. INSERT 문은 JPA 가 기존 save, saveAll 메소드를 통해서만 제공하기 떄문에, INSERT 문에 멀티 스레딩을 적용하기 위해서는 네이티브 쿼리로 직접 작성해야합니다.)
- Repository/Service 의 메소드 반환 타입을 CompletableFuture 객체로 설정합니다.
CompletableFuture란?
Future 객체에 CallBack 을 더한 객체입니다. 따라서 새로운 스레드로 task를 전송한 뒤, Non-blocking 으로 hooking 할 수 있습니다.
- CustomException 을 추가적으로 핸들링하도록 설정합니다.(기존에는 GlobalExceptionHandler 로 AOP 를 작성했지만, 따로 떼줄거에요)
1. Multi Thread with Custom Exception, Not using @Async!
지금부터 @Async
를 전부 빼버리고 직접 스레드와 쿼리들을 컨트롤 할거에요. 그리고 Spring Bean 의 IoC를 활용할거에요.
먼저 결과부터 확인해 볼까요?
1-1 멀티스레딩 결과
1-1-1. 정상적인 api 요청 결과
1-1-2. db-유니크 키 violation 에러 핸들링 결과
일단 요청은 2개의 메소드(1. Kafka에 메세지 전송, 2. DB에 저장요청)를 비동기 Non-blocking 으로 설계했습니다. 정상 상황일 때는 각자 정상적으로 동작하는 것을 확인할 수 있어요.
이제 일부러 유니크 키 violation을 유발하는 api 요청을 보내보았습니다. userRepository에서 ConcurrentException을 볼 수 있죠. 이게, 실제 Exception인 DuplicateKeyException
을 감싸고 있는 exception 인데요. 즉 중첩된 Exception이죠. 이 부분을 unwrap해서 DuplicateKeyException
을 CustomException
으로 변환해주었어요. 그리고 db-thread-4
스레드는 서비스와 컨트롤러를 거슬러 올라가서 마지막으로 클라이언트에게 에러를 반환해주게 됩니다!
이제 코드를 확인해 볼까요?
1-2 신나는 코딩!
1-2-1. 스레드 풀 및 TaskExecutor 설정해서 Bean에 올리기
먼저 할당하고자 하는 스레드 개수와 TaskExecutor가 멀티스레딩에서는 필수입니다. 기본적으로는 Java는 ForkJoinPool.commonPool을 사용할 텐데요. 저는 서비스와 데이터베이스에 각각 스레드 풀을 할당하고 싶었습니다. 아래와 같이 빈으로 각각의 Executor 을 빈에 올려주세요.
@Configuration
@Slf4j
public class AsyncConfig {
@Bean(name = "taskExecutorForService")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
t.setCorePoolSize(10);
t.setMaxPoolSize(100);
t.setQueueCapacity(10);
t.setThreadNamePrefix("service-thread-");
t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
t.setWaitForTasksToCompleteOnShutdown(true);
t.setAwaitTerminationSeconds(60);
t.initialize();
return t;
}
@Bean(name = "taskExecutorForDB")
public Executor AsyncExecutorForDB() {
ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
t.setCorePoolSize(10);
t.setMaxPoolSize(100);
t.setQueueCapacity(10);
t.setThreadNamePrefix("db-thread-");
t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
t.setWaitForTasksToCompleteOnShutdown(true);
t.setAwaitTerminationSeconds(60);
t.initialize();
return t;
}
}
1-2-2. IoC 설정
아래와 같이 UserService 서비스에 @Qualifier
어노테이션을 통해 빈에 등록된 taskExecutorForService
Executor을 주입시킬거에요.
@Slf4j
@Configuration
public class JpaConfig {
private final EntityManager em;
private final UserRepository userRepository;
private final KafkaTemplate<String, Object> kafkaProducerTemplate;
private final UserRepositoryJDBC userRepositoryJDBC;
private final JdbcTemplate jdbcTemplate;
private final Executor serviceExecutor;
public JpaConfig(EntityManager em,
UserRepository userRepository,
KafkaTemplate<String, Object> kafkaProducerTemplate,
UserRepositoryJDBC userRepositoryJDBC,
JdbcTemplate jdbcTemplate,
@Qualifier("taskExecutorForService") Executor serviceExecutor) {
this.em = em;
this.userRepository = userRepository;
this.kafkaProducerTemplate = kafkaProducerTemplate;
this.userRepositoryJDBC = userRepositoryJDBC;
this.jdbcTemplate = jdbcTemplate;
this.serviceExecutor = serviceExecutor;
}
@Bean
public UserService userService() {
return new UserServiceImpl(userRepository, userRepositoryJDBC, kafkaProducerTemplate, serviceExecutor);
}
}
1-2-3. UserService 의 save 메소드 CompletableFuture 반환하기
컨트롤러에서는 유저를 저장하기 위해서 서비스의 save 메소드를 호출하는데요. 아래와 같이 CompletableFuture을 와일드카드티입으로 반환하도록 설정합니다. 테스트하기 쉽게 log.info()
들은 남겨둘게요 :)
ex는 주입받은 Executor(serviceExecutor) 입니다!
@Slf4j
@Service
@Transactional
public class UserServiceImpl extends KafkaTopicConst implements UserService {
...
@Override
public CompletableFuture<?> save(User user) throws CustomException {
log.info("서비스:Save 진입");
return CompletableFuture.supplyAsync(() -> {
log.info("서비스:Future 진입");
return Arrays.asList(user);
}, ex).thenCompose(u -> {
log.info("서비스:리포지토리 접근 시작");
return userRepositoryJDBC.saveAll(u);
}).exceptionally(e -> {
log.info("서비스:에러={}", e.getMessage());
if (e.getCause() instanceof CustomException) {
CustomException ex = (CustomException) e.getCause(); // 정의된 CustomException이라면 타입 캐스팅
throw ex;
} else {
throw new RuntimeException(); // 처음보는 에러라면, RuntimeException 반환
}
}).thenRun(() -> {
log.info("서비스:끝");
});
}
...
}
1-2-4. UserRepositoryJDBC 의 saveAll 메소드 CompletableFuture 반환하기
얘는 따로 @Qualifier("taskExecutorForDB")
로 빈에서 Executor 을 주입받았습니다. CompletableFuture.runAsync(()->{ … }, [주입받은 Executor])
이후에 Batch 를 적용시키기 위해서 saveAll 로 메소드 네임을 적었습니다.
@Slf4j
@Repository
public class UserRepositoryJDBC {
private final JdbcTemplate jdbcTemplate;
private final Executor databaseExecutor;
public UserRepositoryJDBC(JdbcTemplate jdbcTemplate, @Qualifier("taskExecutorForDB") Executor databaseExecutor) {
this.jdbcTemplate = jdbcTemplate;
this.databaseExecutor = databaseExecutor;
}
@Transactional
public CompletableFuture<?> saveAll(List<User> users) throws CustomException {
log.info("리포지토리:진입");
return CompletableFuture.runAsync(()->{
log.info("리포지토리:쿼리시작");
String sql = "INSERT INTO user_table (user_id, email, join_date, login_date, logout_date, user_name, user_pw) " +
"VALUES (?, ?, ?, ?, ?, ?, ?) ";
jdbcTemplate.batchUpdate(sql,
users,
users.size(),
(PreparedStatement ps, User user) -> {
ps.setString(1, user.getUserId());
ps.setString(2, user.getEmail());
ps.setObject(3, user.getJoinDate());
ps.setObject(4, user.getLoginDate());
ps.setObject(5, user.getLogoutDate());
ps.setString(6,user.getUserName());
ps.setString(7,user.getUserPw());
});
},databaseExecutor).exceptionally(e->{
log.info("리포지토리:에러={}",e.getClass());
if (e.getCause().getClass() == DuplicateKeyException.class){
log.info("리포지토리:에러반환={}",DUPLICATE_RESOURCE);
throw new CustomException(DUPLICATE_RESOURCE);
}
throw new RuntimeException();
});
}
}
1-2-5. UserController 의 non-blocking 호출
@Slf4j
@RestController
@RequestMapping("/auth")
@RequiredArgsConstructor
public class UserController extends KafkaTopicConst {
private final UserService userService;
private final KafkaTemplate<String, Object> kafkaProducerTemplate;
...
@PostMapping("/user")
public DeferredResult<ResponseEntity<?>> addUser(@RequestBody RequestAddUserDTO request){
DeferredResult<ResponseEntity<?>> dr = new DeferredResult<>();
User user = new User(
request.getUserId(),
request.getUserPw(),
request.getEmail(),
request.getUserName(),
LocalDate.now(),
LocalDate.now(),
LocalDate.now()
);
// 유저 서비스를 통해 유저 저장
saveUserHandler(dr, user);
// 카프카 메세지 전송 (유저 저장에 실패해도 전송)
sendToKafkaWithKey(TOPIC_USER_CHANGE, new RequestUserChange(user.getUserId(), user.getUserName(), "", "INSERT"), user.getUserId());
return dr;
}
private void saveUserHandler(DeferredResult<ResponseEntity<?>> dr, User user) {
log.info("컨트롤러:유저 저장을 위한 서비스 접근 시작");
CompletableFuture
.runAsync(() -> {
log.info("컨트롤러:Future 진입");
}).thenCompose(s -> {
log.info("컨트롤러:서비스 진입 시작");
return userService.save(user);
}).thenAccept(s1 -> {
log.info("컨트롤러:서비스 정상종료");
dr.setResult(ResponseEntity.ok("success"));
}).exceptionally(e -> {
log.info("컨트롤러:에러={}", e.getMessage());
if (e.getCause() instanceof CustomException) {
dr.setResult(ErrorResponse.toResponseEntity(((CustomException) e.getCause()).getErrorCode()));
} else {
dr.setResult(ResponseEntity.badRequest().body("default bad request response"));
}
return null;
}).thenRun(() -> {
log.info("컨트롤러:반환=DeferredResults {}", dr.getResult());
});
}
private CompletableFuture<?> sendToKafkaWithKey(String topic,Object req, String key) {
log.info("컨트롤러:카프카 전송 시작");
ListenableFuture<SendResult<String, Object>> future = kafkaProducerTemplate.send(topic,key, req);
future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.error("메세지 전송 실패={}", ex.getMessage());
}
@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("메세지 전송 성공 topic={}, key={}, offset={}, partition={}",topic, key, result.getRecordMetadata().offset(), result.getRecordMetadata().partition());
}
});
return toCF(future);
}
<T> CompletableFuture<T> toCF(ListenableFuture<T> lf){
CompletableFuture<T> cf = new CompletableFuture<T>();
lf.addCallback(s-> cf.complete(s), e-> cf.completeExceptionally(e));
return cf;
}
...
}
- 서버는
/user
요청이 들어오면, addUser에서saveUserHandler
를 호출하게 됩니다. 또한sendToKafkaWithKey
도 호출합니다. - 먼저
saveUserHandler
는 ForkJoinPool.commonPool 의 스레드에서 실행되며, userService의 save를 호출합니다. - 만약 thenCompose 이후
then...Async
를 수행한다면, 새로운 스레드로 실행되며 그렇지 않은 경우에는 userService의 스레드가 그대로 실행합니다. 저같은 경우는 userService의 스레드가 그대로 실행하게 되겠네요. - 유저가 정상적으로 저장된다면, Response.ok를 DeferredResult 객체에 넣어줍니다.
- 만약 중간에 에러가 발생할 경우 따로 정의한
ErrorResponse.toResponseEntity({예외설명})
를 반환하게 됩니다. - 미리 정의한 에러타입이 아니라면 Response.badRequest를 반환하게 됩니다.
- 동시에 실행되는
sendToKafkaWithKey
를 이제 볼까요?
- Spring KafkaTemplate 에서 제공하는 성공반환여부 객체는 CompletableFuture 객체가 아닌 ListenableFuture 객체입니다. 따라서 우리는 이후 thenCompose로 넘길 것을 대비해서 객체를
toCF()
를 통해 변환시켜줄거에요(feat. 토비님).sendToKafkaWithKey
객체는 이제 메세지 전송 성공 여부를 별도의 스레드로 실행하게 됩니다!
- 에러를 로깅만 할 것이라면, Spring 5.3.x 에서 지원하는 아래의 방법 사용할 수 있습니다.
2. [로깅목적] Multi Thread with Custom Exception, But only for logging
AsyncUncaughtExceptionHandler
, AsyncConfigurerSupport
를 통해서 @Async 어노테이션을 로깅 목적으로 사용할 수 있습니다.(메인 스레드로의 반환 X)
2-1. 기존 Single Thread with Custom Exception
@Slf4j
@RestControllerAdvice
public class GlobalExceptionHandler extends ResponseEntityExceptionHandler {
@ExceptionHandler(value = { ConstraintViolationException.class, DataIntegrityViolationException.class})
protected ResponseEntity<ErrorResponse> handleDataException() {
log.error("handleDataException throw Exception : {}", DUPLICATE_RESOURCE);
return ErrorResponse.toResponseEntity(DUPLICATE_RESOURCE);
}
@ExceptionHandler(value = { CustomException.class })
protected ResponseEntity<ErrorResponse> handleCustomException(CustomException e) {
log.error("handleCustomException throw CustomException : {}", e.getErrorCode());
return ErrorResponse.toResponseEntity(e.getErrorCode());
}
}
위의 코드는 글로벌하게 Controller에서 Exception을 throw해서 해당 클래스가 CustomException
, ConstraintViolationException
, DataIntegrityViolationException
일 때 ResponseEntity를 클라이언트에 반환하도록 하는 RestControllerAdvice 입니다.
저는 해당 코드를 통해서 Service 에서 발생한 Exception들을 핸들링하는데요. 문제는 @ExceptionHandler
는 오직 동기 예외처리만 한다는 것이었습니다. 따라서 아래와 같이 코드를 수정했는데요. 다만 AsyncUncaughtExceptionHandler
은 호출 스레드에게 return을 할 수 없다는 단점이 존재합니다. 따라서 아래는 단순히 로깅을 목적으로 작성해야만 합니다.
2-2. CustomAsyncExceptionHandler 추가
public class CustomAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... obj) {
System.out.println("Exception message - " + ex.toString() + ex.getClass().getName());
System.out.println("Method name - " + method.getName());
for (Object param : obj) {
System.out.println("Parameter value - " + param.getClass().getName());
}
}
}
2-3. AsyncConfig 변경
@Configuration
@Slf4j
@EnableAsync
public class AsyncConfig extends AsyncConfigurerSupport {
@Override
@Bean(name = "taskExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor t = new ThreadPoolTaskExecutor();
t.setCorePoolSize(10);
t.setMaxPoolSize(100);
t.setQueueCapacity(10);
t.setThreadNamePrefix("auth-thread-");
t.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
t.setWaitForTasksToCompleteOnShutdown(true);
t.setAwaitTerminationSeconds(60);
t.initialize();
return t;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new CustomAsyncExceptionHandler();
}
}
2-4. UserService의 save 메소드 수정
@Async
@Service
@Transactional
public class UserServiceImpl implements UserService {
...
@Override
public User save(User user) throws CustomException {
Optional<User> findUser = userRepository.findById(user.getUserId());
if (findUser.isPresent()) {
throw new CustomException(DUPLICATE_RESOURCE);
} else {
User save = userRepository.save(user);
return save;
}
}
}
편하게 로깅목적으로만 사용할 것이라면, Spring 이 지원해주는 메소드를 상속받아서 사용하는게 좋겠죠? 단점은 여러 스레드를 사용하는데 제대로 활용할 수가 없죠. 이유는 스레드 간 반환을 설정할 수 없거든요!
3. 결론
결론적으로 이 기나긴 변환과정은 Java 와 Spring 자체에 대한 이해도를 키우는데 도움이 되었습니다. 이제부터는 JPA를 전부 네이티브 쿼리로 바꾸는 작업과 JDBC Batch 작업이 남았네요!