Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Spring Boot Data Redis 3.5.6 XAUTOCLAIM 미지원으로 인한 lua 구현

https://github.com/spring-projects/spring-data-redis/issues/2316 2022년부터 xautoclaim 지원 요청이 있었지만 아직 지원하고 있지 않습니다. XAUTOCLAIM 은 메세지 재처리를 위해 PEL 내 메세지를 소비할 때 중복처리를 막기 위한 정말 중요한 기능인데 지원하지 않으니 직접 lua 스크립트를 작성하여 구현하였습니다.

local stream = KEYS[1]
local group = ARGV[1]
local min_idle_time = tonumber(ARGV[2])
local consumer = ARGV[2]

-- XAUTOCLAIM을 사용하여 idle 메시지 자동 재처리
local result = redis.call('XAUTOCLAIM', stream, group, consumer, min_idle_time, '0-0')
return result[2]

이렇게 작성해 놓고 아래와 같이 lua 스크립트랑 변수 넣어서 전송하면!

val result: String? = redisTemplate.execute { conn ->
    conn.eval(
        script.toByteArray(),
        org.springframework.data.redis.connection.ReturnType.VALUE,
        1,
        streamKey.toByteArray(),
        groupName.toByteArray(),
        "60000".toByteArray()  // 60초 이상 idle인 메시지 재처리
    ) as? String
}

알고보니 Spring Boot Data Redis 3.5.6 말고 Lettuce 6.6.0 XAUTOCLAIM 지원되고 있었음

다 구현하고 정상동작 확인 후 정말 지원하고있지않나 다시 찾아봤는데 Spring data redis 3.5.4 streamCommand() 에서는 지원하고 있지 않고 lettuce core 6.6.0 에서 지원하고있네요. connection 가져와서 autoClaim 전송하면 됩니다.

    private fun autoClaim(streamKey: String,
                        groupName: String,
                        consumerName: String,
                        count: Long,): ClaimedMessages<ByteArray, ByteArray> {
    return redisTemplate.execute { conn ->
        val native = conn.nativeConnection
        val commands = native as RedisAdvancedClusterAsyncCommands<ByteArray, ByteArray>

        val consumer = Consumer.from(groupName.toByteArray(), consumerName.toByteArray())
        val args = XAutoClaimArgs.Builder.xautoclaim(consumer, MIN_IDLE, "0-0").count(count)

        commands.xautoclaim(streamKey.toByteArray(), args).get()
    }!!

결론적으로 Lua로 우회 구현할 필요 없이, native Lettuce command를 직접 호출하는 게 더 깔끔한 선택이었습니다.

생각보다 lua script 를 자주 쓰게 되는데, 크기가 큰 lua script 를 계속 네트워크로 전송하게 되면 오버헤드가 발생할 수 있습니다. 이럴 땐 lua script 저장 실행 Redis의 Lua script 저장 후 실행하는 EVALSHA 방식을 쓰면 오버헤드를 줄일 수 있습니다. 지금 단계에서는 병목이 아니지만, 성능 이슈가 생기면 그때 적용할 예정입니다.

Pending 추가명령어로 Delivery count 체크 및 최대 재처리 횟수 초과 시 CDL 전달

claim messages 에는 delivery count 가 없습니다. pending 명령어 반환값에는 있죠. 그래서 xautoclaim 한 후 해당 id 를 pending 명령어로 다시 조회해서 delivery count 를 가져와서 일정횟수 초과하면 CDL 에 전달해서 처리하도록 구현이 필요합니다.

MAX_RETRY_COUNT = 5 로 잡고 이를 넘어가면 CDL 로 넘어가도록 구현하였습니다. summary:1 에 존재하던 악질 5개 메세지들을 summary:1:CDL 에 밀어넣고 천천히 시도하도록 설정하였습니다. 이렇게 안빼주면 안에서 계속 무한 재처리 시도되니 이렇게 따로 빼놓는 작업이 필수입니다! 그런데 CDL 스트림까지 추가되니 모니터링 해야할 게 너무 늘어나서 또 다시 정리가 필요할 것 같습니다.