Skip to main content Link Menu Expand (external link) Document Search Copy Copied

메인로직에서 CDL 분리시키고 해당 큐에선 좀 더 retry 해본 뒤 느리게 천천히 소비되도록 설정하였습니다.

또한 /actuator/prometheus 에 값 갱신 업데이트 및 scrap 해상도를 임시로 1초로 설정해서 테스트에 용이하도록 하였습니다.

또한 consumer 제거 이전 내부 pending 된 메세지들을 pagination 으로 다른 살아있는 consumer 에 할당해준 뒤 제거하도록 아래와 같이 구성하였습니다. 이러면 consumer 제거할 때 관련된 pending 정보들이 초기화되긴 하지만(몇 번 retry 했는지가 1로 설정됨) 메세지 유실이 없으므로 안전하게 consumer 들을 정리해줄 수 있습니다!

전체 코드는 repo 에 있습니다.

@Scheduled(fixedDelay = 1000, initialDelay = 1000)
fun reprocessPendingMessages() {
    // monitoringStreams 순회
    // -> reprocessStreamPendingMessages 호출
}

private fun reprocessStreamPendingMessages(streamKey: String, minIdleTime: Long) {
    // XINFO GROUPS 조회
    // 각 group에 대해
    // -> handlePendings 호출
}

@Scheduled(fixedDelay = IDLE_CONSUMER_THRESHOLD_MS, initialDelay = IDLE_CONSUMER_THRESHOLD_MS)
fun removeIdleActiveConsumers() {
    // allStream 순회
    // -> removeStreamIdleConsumers 호출
}

private fun removeStreamIdleConsumers(streamKey: String) {
    // XINFO GROUPS 조회
    // 각 group에 대해
    // -> removeGroupIdleConsumers 호출
}

private fun removeGroupIdleConsumers(streamKey: String, groupName: String) {
    // XINFO CONSUMERS 조회
    // idle / active consumer 분리
    // idle consumer에 대해
    // -> reassignPendingMessages 호출
    // -> removeConsumer 호출
}

private fun reassignPendingMessages(
    streamKey: String,
    groupName: String,
    idleConsumerName: String,
    activeConsumerNames: List<String>
) {
    // XPENDING cursor 기반 페이지네이션
    // 각 message에 대해
    // -> XCLAIM으로 active consumer에 재할당
}

private fun removeConsumer(streamKey: String, groupName: String, consumerName: String): Boolean {
    // XGROUP DELCONSUMER 호출
}

private fun handlePendings(
    streamKey: String,
    groupName: String,
    batchSize: Long,
    minIdleTime: Long
) {
    // XAUTOCLAIM 호출
    // 각 message에 대해
    // -> pending 정보 조회
    // -> 재시도 초과 시 send(DLQ) 또는 ackDel
}