메인로직에서 CDL 분리시키고 해당 큐에선 좀 더 retry 해본 뒤 느리게 천천히 소비되도록 설정하였습니다.

또한 /actuator/prometheus 에 값 갱신 업데이트 및 scrap 해상도를 임시로 1초로 설정해서 테스트에 용이하도록 하였습니다.
또한 consumer 제거 이전 내부 pending 된 메세지들을 pagination 으로 다른 살아있는 consumer 에 할당해준 뒤 제거하도록 아래와 같이 구성하였습니다. 이러면 consumer 제거할 때 관련된 pending 정보들이 초기화되긴 하지만(몇 번 retry 했는지가 1로 설정됨) 메세지 유실이 없으므로 안전하게 consumer 들을 정리해줄 수 있습니다!
전체 코드는 repo 에 있습니다.
@Scheduled(fixedDelay = 1000, initialDelay = 1000)
fun reprocessPendingMessages() {
// monitoringStreams 순회
// -> reprocessStreamPendingMessages 호출
}
private fun reprocessStreamPendingMessages(streamKey: String, minIdleTime: Long) {
// XINFO GROUPS 조회
// 각 group에 대해
// -> handlePendings 호출
}
@Scheduled(fixedDelay = IDLE_CONSUMER_THRESHOLD_MS, initialDelay = IDLE_CONSUMER_THRESHOLD_MS)
fun removeIdleActiveConsumers() {
// allStream 순회
// -> removeStreamIdleConsumers 호출
}
private fun removeStreamIdleConsumers(streamKey: String) {
// XINFO GROUPS 조회
// 각 group에 대해
// -> removeGroupIdleConsumers 호출
}
private fun removeGroupIdleConsumers(streamKey: String, groupName: String) {
// XINFO CONSUMERS 조회
// idle / active consumer 분리
// idle consumer에 대해
// -> reassignPendingMessages 호출
// -> removeConsumer 호출
}
private fun reassignPendingMessages(
streamKey: String,
groupName: String,
idleConsumerName: String,
activeConsumerNames: List<String>
) {
// XPENDING cursor 기반 페이지네이션
// 각 message에 대해
// -> XCLAIM으로 active consumer에 재할당
}
private fun removeConsumer(streamKey: String, groupName: String, consumerName: String): Boolean {
// XGROUP DELCONSUMER 호출
}
private fun handlePendings(
streamKey: String,
groupName: String,
batchSize: Long,
minIdleTime: Long
) {
// XAUTOCLAIM 호출
// 각 message에 대해
// -> pending 정보 조회
// -> 재시도 초과 시 send(DLQ) 또는 ackDel
}