Skip to main content Link Menu Expand (external link) Document Search Copy Copied

서론

저희 팀은 유저 주문 시 발생하는 알림 전송 로직을 기존에는 주문 처리 흐름 안에 직접 포함해 운영하고 있었습니다. 이 알림 로직은 PUSH → SMS → 알림톡 순서로 failover 되며, 각 단계가 순차적으로 처리되는 구조입니다. 문제는 이 모든 과정이 동기적이고 블로킹된다는 것이죠. 예를 들어 외부 서비스의 타임아웃이 각 단계마다 5초라면, 최악의 경우 주문 요청이 최대 15초간 지연될 수 있습니다. 즉, 유저가 주문을 눌렀을 때, 실제 주문 완료까지 15초를 기다려야 하는 상황이 발생합니다.

그래서 저희는 알림 전송과 같은 시간이 오래 걸리는 작업을 메인 로직에서 분리하여 비동기적으로 처리하기로 결정했고, 이를 위해 메시지 큐를 도입하기로 했습니다.

처음에는 Amazon ActiveMQ를 고려했습니다. AWS Client VPN 연동이나 IAM 기반 인증 등 운영 측면에서 편리합니다. 하지만 비용을 확인해보니, 싱글 브로커 기준으로 월 약 100만원 수준이었습니다. 또한, 이번에 분리하려는 노티 시스템 +a는 디스크 기반의 영구 저장이 필요하지 않고 당장 수평 확장까지 고려할 상황도 아니었기에 과한 선택이라는 판단이 들었습니다. 개발 리소스도 더 필요하구요.

a

그래서 “기존 사용하던 redis 에 신규기능인 stream 으로 mq 구현해서 사용해보자” 로 방향을 틀었고 아래는 설계 포인트입니다.

  1. 이벤트 리밸런싱
  2. 이벤트 균등분배
  3. DLQ(Dead Letter Queue)
  4. consumer 관리

하나씩 볼까요?

0. 요약 먼저! 기술적 문제와 해결 방법

먼저 abstract 부터 보시죠. “그래서 어떤 문제가 있었고 어떻게 해결했는데?” 를 시간순서대로 요약 및 표로 나타내었습니다.

문제 순서문제 발견방향 설정수정 방법해결된 문제
1알림 로직이 동기 블로킹되어 주문 응답 지연 (최대 15초)알림을 메인 로직에서 분리하여 비동기 처리Amazon MQ 나 Redis Stream 기반 MQ 도입주문 속도 개선
2Amazon MQ는 비용 과다 및 과한 기능Redis Stream으로 대체기존 Redis 의 Stream 기능으로 MQ 직접 구축비용 절감, 간단한 수준의 운영 가능
3죽은 consumer가 PEL 메시지 방치timeout 메시지를 다른 consumer가 회수XAUTOCLAIM, XCLAIM, 주기적 PEL 조회장애 복구 가능, 메시지 유실 방지
4실패 이벤트들이 특정 consumer에만 분배실패 이벤트 균등 분배min(hash(eventId + consumerId)) 기반 할당, 락 처리실패 이벤트 균등 분배
5consumer 추가 시 중복 처리 발생 가능처리 전 mutex 설정해서 선점Redis Lock으로 이벤트 단독 소비 보장중복 소비 방지, 데이터 정합성 유지
6실패 메시지 무한 재시도실패 횟수 기준 DLQ 분기횟수 초과 시 DLQ로 이동, 로깅시스템 안정성 향상, abnormal한 이벤트 격리
7consumer 상태 추적 불가consumer 생존 여부 직접 관리key + TTL, Set 기반 Heartbeat 구현실시간 consumer 관리, 리밸런싱 기준(consumerId) 확보
8Stream이 메모리 기반이라 용량 관리에 민감오래된 메시지 삭제 처리XDEL로 과거 이벤트 순차 제거메모리 누수 방지

1. 이벤트 리밸런싱

Redis Stream에서 consumer가 XREADGROUP 명령을 통해 이벤트를 소비하면, 해당 이벤트는 PEL(Pending Entry List) 에 등록됩니다. 이 상태는 “메시지를 가져갔지만 아직 처리 완료(ACK)하지 않았다”는 의미입니다. 만약 consumer가 메시지를 처리하는 도중에 1) 죽거나 2) 예외가 발생해 ACK를 보내지 않으면, 해당 메시지는 계속 PEL에 남아 있게 됩니다. 이 때 일정 시간(idle_time)이 지난 후에도 ACK되지 않은 메시지는, 다른 살아있는 consumer가 XAUTOCLAIM 또는 XCLAIM 명령을 통해 가져가서 재소비 해야합니다. 이렇게 메시지를 다른 곳에서 이전하여 처리는 과정을 “이벤트 리밸런싱” 이라고 부릅니다. 그러면 정의 이후 구현은 간단하죠!

consumer 들이 주기적으로 PEL 를 조회하여 처리실패 된 이벤트를 가져와서 재처리하도록 구현하면됩니다.

추가로 메모리 기반 한정된 용량이므로 xdel 로 일정이상 큐에 쌓이면 옛날 이벤트 순으로 제거해야합니다.

2. 실패 이벤트 균등분배

저는 consumer들이 1초 주기 스케줄러로 XPENDING 로 리스트(ACK 가 오지않은 이벤트들)를 반환하여 소비시점으로부터 오랜 시간이 지난 이벤트들을 실패 이벤트로 규정 및 처리하도록 구현하였습니다. 하지만 이 방법에는 문제가 있습니다. consumer-1,2,3 이 있을 때 한번 스케줄러가 시작되면 변수없이 linear 하게 순서가 정해진다는 것입니다. consumer-1 가 먼저 실행되면 항상 consumer-1 은 첫 번째로 실행된다는 거죠. 대부분의 경우 문제없지만 아래의 경우 문제가 발생합니다.

  • consumer-1: 항상 1초 타이밍에 정확히 요청
  • consumer-2: 1.1초에 요청
  • consumer-3: 1.2초에 요청

어떤 문제가 있는지 보기 편하게 좀 더 편하게 도식화해볼게요.

a

초록색 consumer의 지분이 매우 많은게 눈에 보이죠? 이벤트가 일정하게 수신된다는 가정하에 consumer-1은 80% 실패 이벤트를 맡아서 처리하고 나머지는 각각 10%씩 맡습니다. 그리고 이는 consumer 간 최초 시작 시간이 가까울 수록 더 크게 불균형해집니다. 운이 나쁘면 consumer-1 이 99.99…% 의 실패 이벤트를 처리할 수 있다는 것이죠.

이 문제를 두 가지 방법으로 풀 수 있습니다.

  1. consumer 별 (1 ~ 1.x초) 랜덤한 시간을 주기로 가지도록.
    • 간편하게 처리할 수 있습니다.
    • 완전균등분배는 어렵습니다.
  2. min(hash(event-id + consumer-id)) 인 consumer 가 해당 이벤트 처리
    • XINFO GROUPS 를 통해 특정 Consumer Group의 last-delivered-id를 얻고, 해당 ID 이후의 메시지 ID 목록을 쿼리.
    • consumer 들은 현재 살아있는 consumer-id 리스트를 받아와서 “그래서 이 이벤트는 누가 처리할건데?”를 위의 식으로 알 수 있어야합니다.
    • 이벤트 중복 consume 을 막아야합니다.
    • 모든 이벤트가 균등하게 분산됩니다.

간단하게 2의 순서를 나타내면 아래와 같습니다.

  • 이벤트 재처리: XPENDING → aliveConsumerIds.minByOrNull { hash(eventId + it) } == myConsumerId → XREADGROUP → XACK

재처리가 아닌 일반 소비는 아래처럼 설정하면 XREADGROUP → 수신 → XREADGROUP → … 반복하여 균일한 이벤트 소비가 가능합니다.

private fun initListener() {
    val listenerContainer = StreamMessageListenerContainer.create(
        redisTemplate.connectionFactory,
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
            .targetType(String::class.java)
            .pollTimeout(Duration.ofMillis(streamConfig.pollTimeoutMs))
            .build(),
    )
    listenerContainer.receive(
        Consumer.from(streamConfig.consumerGroupName, PodContext.id),
        StreamOffset.create(streamConfig.streamKey, ReadOffset.lastConsumed()),
        listener,
    )

    listenerContainer.start()
}

그리고 이벤트 재처리 시 중복처리방지가 있어야합니다. 새로운 consumer 가 등장할 경우 이벤트를 중복해서 처리할 수 있기 때문입니다.

예시를 들어볼까요? consumer-1이 XPENDING 으로 실패한 이벤트들을 가져옵니다. 그리고 aliveConsumerIds.minByOrNull { hash(eventId + it) } == myConsumerId 조건을 만족해 event-a를 처리하게 됐습니다. 그런데 이벤트를 처리하는 도중, consumer-2가 새로 등록되면서 aliveConsumerIds에 추가되고 동일한 로직을 수행하면, 같은 event-a에 대해 본인이 처리담당이라고 판단하게 될 수 있습니다.

즉, 새로운 consumer가 추가될 경우 중복처리될 가능성이 있기때문에 consumer-1 은 이벤트를 실제로 소비하기이전 lock 을 걸고 소비해서 중복소비를 제한해야합니다.

3. DLQ

PEL 에서는 이벤트가 몇 번 소비되었는지 함께 저장하고 있습니다. 만약 소비 횟수가 threshold 를 넘어가도 계속해서 놔두면 무한대로 소비되기떄문에 비정상 이벤트를 따로 빼서 처리를 해줘야겠죠! 이를 일단 ack 날리고 DLQ 에 따로 빼서 수동보정을 하든, 로깅을 따로 영구적으로 하든 처리를 해야합니다.

4. consumer 관리

Redis Stream 은 Kafka 처럼 consumer 상태 추적기능을 제공하지 않습니다. 그리고 저는 consumer들에게 이벤트 균등분배를 위해 동기화된 consumer 리스트가 필요하죠. 두 가지 방식으로 동기화시킬 수 있습니다.

  1. XINFO CONSUMERS 로 consumer 리스트 관리
  2. 직접 리스트 관리 & 동기화

Redis stream 에는 XINFO CONSUMERS 명령어로 그룹 내 consumer 들을 확인할 수 있습니다. 하지만 해당 기능으로 consumer 관리하기 별로 좋지 않습니다. 이유가 무엇일까요?

XINFO CONSUMERS 로 consumer 을 가져와서 죽은 consumer 을 처리할 때 판단방식은 idle_time 이 일정주기 이상인 경우를 죽은 노드라고 판단 후 제거하게됩니다. 문제는 이벤트 자체가 없을 경우에도 idle_time이 증가한다는 것입니다. 그래서 단지 이벤트가 없을 뿐인데 멀쩡한 consumer를 제거해버립니다.

그래서 저는 따로 관리를 하였습니다. consumer 들은 일정 주기로 자신의 id 를 key + ttl 로 등록 및 set 에 추가하도록 말이죠. 그리고 set 에 sadd 합니다. 그래서 set 보고 key 확인 후 ttl 만료되서 사라졌으면 set 에서 제거하고 key 존재하면 살아있는 노드라고 판단하는거죠. set 은 내부 value 에 ttl 설정이 안되서 key 로 ttl 설정을 따로 해주었습니다. key 를 {streamKey}:alive:nodes:* 와일드카드로 검색할 순 있긴 한데 아시다시피 인덱싱 되어있지 않아서 풀스캔을 해버리겠죠? 그래서 set 을 따로 두었습니다. 또한 주기적으로 동기화 시켜주는 스케줄러 만들고 추가로 XINFO 내 consumer 이랑도 동기화시켜 주어야합니다. consumer 가 무한정 쌓이기 때문이죠.

그래서 실패 이벤트 재처리 종합 플로우는 아래와 같습니다(머메이드로 작성해봤는데 되게 편하네요).

flow

5. 응용

이것저것 만져보니 응용도 가능했습니다. 각 Pod에서 자신과 연결된 WebSocket 세션에 대해 일괄적으로 이벤트를 전송하는 기능도 구현할 수 있었습니다.

a

client-A1 이 요청을 전송하면 서버가 처리하여 stream 으로 이벤트를 전송합니다. 이후 해당 stream 을 개별 중복 소비해서 처리하도록 하면! 각각의 서버들은 자신과 연결된 웹소켓에 메세지를 뿌릴 수 있게 됩니다.

저희는 다중 클라이언트에게 웹소켓을 통한 동일 메세지 전송 지원을 위해 위와 같은 방식으로 구현하였습니다.

후기

프레임워크나 외부 도구에 의존하지 않고, raw하게 구조를 설계하고 자동화하며, 문제를 하나씩 뜯어보며 해결하는 과정이 제가 선호하는 방식이었고(너무 낮은 레벨만 아니라면) 덕분에 정말 몰입해서 작업할 수 있었습니다.

물론 Redis는 Kafka처럼 파티션이나 디스크 기반의 안정적인 메시지 보관 기능이 없고, 리밸런싱이나 consumer 추적도 직접 구현해야 했습니다. 휘발성이 강하다는 단점도 있죠. 그렇지만 알림이나 이벤트 전송처럼 비교적 가벼운 메시지 처리에는 Redis Stream이 훨씬 가볍고 빠르게 대응할 수 있습니다. 무엇보다도, 직접 설계하고 구현하는 과정을 통해 시스템이 어떻게 작동해야 하는지에 대해 더 깊이 이해할 수 있었던 좋은 기회였습니다.

정리하자면, 이 프로젝트를 통해 한정된 자원 안에서 “메시징 시스템이 어떻게 작동해야 하는가?” 를 구조적으로 고민해볼 수 있어서 좋은 경험이였습니다.