redis 8.2 버전에선 lag 데이터를 같이 전달해주지만, 최신의 spring data redis 에선(3.5.x or 4.0.x) lag 정보를 파싱해주지 않아서 직접 lua script 로 lag 정보를 가져온 뒤 파싱해주어야합니다.
spring data redis 4.0.x lag 파싱 미지원에 대한 대응으로 lua script 실행 및 직접 json 파싱
127.0.0.1:9001> XINFO GROUPS summary:1
-> Redirected to slot [8259] located at 127.0.0.1:9002
1) 1) "name"
2) "summary-consumer-group"
3) "consumers"
4) (integer) 35
5) "pending"
6) (integer) 4000
7) "last-delivered-id"
8) "1767094106027-0"
9) "entries-read"
10) (integer) 6061
11) "lag"
12) (integer) 0
redis 8.2 버전에선 lag 데이터를 같이 전달해주지만, 최신의 spring data redis 에선(3.5.x or 4.0.x) lag 정보를 파싱해주지 않아서 직접 lua script 로 lag 정보를 가져온 뒤 파싱해주어야합니다.
local stream = KEYS[1]
local groups = redis.call('XINFO','GROUPS',stream)
local result = {}
for _, group in ipairs(groups) do
local groupInfo = {}
-- group은 [key1, value1, key2, value2, ...] 구조로 반환받아서
-- i를 2씩 증가시키며 key/value 쌍으로 변환하면 됩니다.
for i = 1, #group, 2 do
local key = group[i]
local value = group[i + 1]
groupInfo[key] = value
end
table.insert(result, {
name = groupInfo['name'],
lag = groupInfo['lag'],
pending = groupInfo['pending'],
consumers = groupInfo['consumers']
})
end
return cjson.encode(result)
이제 info groups 의 lag 정보도 같이 API 에 담아서 전달해서 보면 아래와 같이 group 정보 뿐만 아니라 lag, pending 도 같이 확인할 수 있습니다.

모니터링 하면서 알아냈는데 redis stream 은 consumer 자동정리를 지원하지 않습니다. 그래서 consumer 와 연결 끊어졌을 때 주기적으로 정리해주는 로직이 필요합니다. 이 부분도 추후에 개선해봐야겠네요.
그리고 이제는 프로메테우스에서 주기적으로 해당 API를 찌르면서 lag, pending 등의 정보들을 시계열로 보여줄 수 있도록 해야겠습니다! 백엔드만 하다가 대시보드 작업하니까 완전 새롭고 재밌어서 좋네요.
그리고 최신순으로 볼 수 있도록 xRange -> xRevRange 로 API 변경하였습니다.
그리고 10초마다 spring actuator 의 prometheus endpoint 에서 메트릭 수집하도록 스케줄러로 데이터 갱신 작업을 추가했고 이를 grafana 로 시각화 했더니 아래와 같이 pending, lag, consuemr 등을 한 눈에 볼 수 있었습니다.
