- spring data redis 에 아직 xAckDel 이 반영이 안되어있어서 유틸 만들어서 추가하였습니다.
fun StringRedisTemplate.xAckDel(
topic: String,
groupName: String,
messageIds: List<String>
): Map<String, XAckDelResult> {
val args = mutableListOf<ByteArray>()
args += topic.toByteArray()
args += groupName.toByteArray()
args += "DELREF".toByteArray()
args += "IDS".toByteArray()
args += messageIds.size.toString().toByteArray()
messageIds.forEach { args += it.toByteArray() }
val raw = this.execute { connection ->
connection.execute("XACKDEL", *args.toTypedArray())
}
val results = when (raw) {
is List<*> -> raw.map {
when (it) {
is Long -> XAckDelResult.from(it.toInt())
is ByteArray -> XAckDelResult.from(String(it).toInt())
else -> XAckDelResult.NOT_EXISTS
}
}
else -> List(messageIds.size) { XAckDelResult.NOT_EXISTS }
}
return messageIds
.zip(results)
.associate { (id, result) ->
id to result
}
}
그래도 결국 extension 함수로 만들어도 lettuce 에서 응답파싱할 때 XACKDEL 응답을 뭘로 파싱해야하는지 몰라서 에러가 납니다. 그래서 기존과 동일하게 파이프라인으로 xack + xdel 을 호출하는게 현재로선 최선인 것 같습니다!
아직 (spring-data-redis 4.1)[https://docs.spring.io/spring-data-redis/reference/4.1-SNAPSHOT/api/java/org/springframework/data/redis/connection/ReactiveClusterStreamCommands.html] 에서 XACKDEL 명령어 지원이 없어서 추후 반영되면 그 때 사용하겠습니다.
번외로 docker 에서 Grafana 를 실행중이라 redis cluster 연결 자체가 어려웠습니다. 물론 aws ec2 에 고정 ip 할당받고 openVPN 깔고 설정 한 뒤 vpn 터널링하면 가능합니다. 재직중이였던 회사에서 썼던 방법이기도 했지만 비용 문제로 redis cluster 와 마찬가지로 로컬에 grafana 를 띄워서 redis cluster 에 연결하는 방식을 택했습니다. 한정된 자원 안에서 여러가지를 시도해보는거죠.
Grafana Redis Cluster 대시보드

원래라면 aws redis cluster 쓰지만 grafana 에서 한 눈에 볼 수 있도록 설정했습니다. stream 메세지 처리량 관측은 작업중입니다. redis exporter 로 주기적으로 redis 관련 메트릭을 prometheus 로 쏘아주고 grafana 로 시각화했습니다. 이 대시보드로 특정 노드에 단일 stream 을 할당하고 부하가 걸리는 것을 관측할 예정입니다. 이후 샤딩을 통해 부하분산을 시도할려고 합니다.
메트릭 테스트


Grafana 에서 쓸 수 있도록 group 별 json 묶어서 개별 row 출력하는 LUA
현재 시간 호출 후에 경과시간 계산
EVAL "
local stream = KEYS[1]
-- 스트림 최신 ID 시간 추출
local sinfo = redis.call('XINFO','STREAM',stream)
local last_generated
for i=1,#sinfo,2 do
if sinfo[i] == 'last-generated-id' then
last_generated = sinfo[i+1]
break
end
end
local gen_ms = tonumber(string.match(last_generated,'(%d+)%-%d+'))
-- 이걸로 시간계산
local function format_duration_ms(ms)
if ms < 1000 then
return string.format('%dms', ms)
end
local sec = math.floor(ms / 1000)
local rem_ms = ms % 1000
if sec < 60 then
return string.format('%ds %dms', sec, rem_ms)
elseif sec < 3600 then
return string.format('%dm %ds',
math.floor(sec / 60),
sec % 60
)
else
return string.format('%dh %dm %ds',
math.floor(sec / 3600),
math.floor((sec % 3600) / 60),
sec % 60
)
end
end
local groups = redis.call('XINFO','GROUPS',stream)
local result = {}
for _,g in ipairs(groups) do
local m = {}
for i=1,#g,2 do m[g[i]] = g[i+1] end
local entry = { group = m['name'] }
local last = m['last-delivered-id']
if last and last ~= '0-0' then
local del_ms = tonumber(string.match(last,'(%d+)%-%d+'))
local lag_ms = math.max(0, gen_ms - del_ms)
entry.last_delivered_id = last
entry.lag = format_duration_ms(lag_ms)
entry.lag_ms = lag_ms
else
entry.last_delivered_id = nil
entry.lag = 'never'
entry.lag_ms = -1
end
table.insert(result, cjson.encode(entry))
end
return result
" 1 summary:1
이렇게 읽어오고 Grafana 에서 json 파싱해서 row 별로 출력하면 됩니다. 그러면 결과물은!
