Skip to main content Link Menu Expand (external link) Document Search Copy Copied

redis 8.2 버전에선 lag 데이터를 같이 전달해주지만, 최신의 spring data redis 에선(3.5.x or 4.0.x) lag 정보를 파싱해주지 않아서 직접 lua script 로 lag 정보를 가져온 뒤 파싱해주어야합니다.

spring data redis 4.0.x lag 파싱 미지원에 대한 대응으로 lua script 실행 및 직접 json 파싱

127.0.0.1:9001> XINFO GROUPS summary:1
-> Redirected to slot [8259] located at 127.0.0.1:9002
1)  1) "name"
    2) "summary-consumer-group"
    3) "consumers"
    4) (integer) 35
    5) "pending"
    6) (integer) 4000
    7) "last-delivered-id"
    8) "1767094106027-0"
    9) "entries-read"
   10) (integer) 6061
   11) "lag"
   12) (integer) 0

redis 8.2 버전에선 lag 데이터를 같이 전달해주지만, 최신의 spring data redis 에선(3.5.x or 4.0.x) lag 정보를 파싱해주지 않아서 직접 lua script 로 lag 정보를 가져온 뒤 파싱해주어야합니다.

local stream = KEYS[1]
local groups = redis.call('XINFO','GROUPS',stream)
local result = {}
for _, group in ipairs(groups) do
    local groupInfo = {}
    
    -- group은 [key1, value1, key2, value2, ...] 구조로 반환받아서
    -- i를 2씩 증가시키며 key/value 쌍으로 변환하면 됩니다.
    for i = 1, #group, 2 do 
    local key = group[i]
    local value = group[i + 1]
    groupInfo[key] = value
    end
    table.insert(result, {
    name = groupInfo['name'],
    lag = groupInfo['lag'],
    pending = groupInfo['pending'],
    consumers = groupInfo['consumers']
    })
end
return cjson.encode(result)

이제 info groups 의 lag 정보도 같이 API 에 담아서 전달해서 보면 아래와 같이 group 정보 뿐만 아니라 lag, pending 도 같이 확인할 수 있습니다.

모니터링 하면서 알아냈는데 redis stream 은 consumer 자동정리를 지원하지 않습니다. 그래서 consumer 와 연결 끊어졌을 때 주기적으로 정리해주는 로직이 필요합니다. 이 부분도 추후에 개선해봐야겠네요.

그리고 이제는 프로메테우스에서 주기적으로 해당 API를 찌르면서 lag, pending 등의 정보들을 시계열로 보여줄 수 있도록 해야겠습니다! 백엔드만 하다가 대시보드 작업하니까 완전 새롭고 재밌어서 좋네요.

그리고 최신순으로 볼 수 있도록 xRange -> xRevRange 로 API 변경하였습니다.

그리고 10초마다 spring actuator 의 prometheus endpoint 에서 메트릭 수집하도록 스케줄러로 데이터 갱신 작업을 추가했고 이를 grafana 로 시각화 했더니 아래와 같이 pending, lag, consuemr 등을 한 눈에 볼 수 있었습니다.