Skip to main content Link Menu Expand (external link) Document Search Copy Copied
  • spring data redis 에 아직 xAckDel 이 반영이 안되어있어서 유틸 만들어서 추가하였습니다.
fun StringRedisTemplate.xAckDel(
    topic: String,
    groupName: String,
    messageIds: List<String>
): Map<String, XAckDelResult> {

    val args = mutableListOf<ByteArray>()
    args += topic.toByteArray()
    args += groupName.toByteArray()
    args += "DELREF".toByteArray()
    args += "IDS".toByteArray()
    args += messageIds.size.toString().toByteArray()
    messageIds.forEach { args += it.toByteArray() }

    val raw = this.execute { connection ->
        connection.execute("XACKDEL", *args.toTypedArray())
    }

    val results = when (raw) {
        is List<*> -> raw.map {
            when (it) {
                is Long -> XAckDelResult.from(it.toInt())
                is ByteArray -> XAckDelResult.from(String(it).toInt())
                else -> XAckDelResult.NOT_EXISTS
            }
        }
        else -> List(messageIds.size) { XAckDelResult.NOT_EXISTS }
    }

    return messageIds
        .zip(results)
        .associate { (id, result) ->
            id to result
        }
}

그래도 결국 extension 함수로 만들어도 lettuce 에서 응답파싱할 때 XACKDEL 응답을 뭘로 파싱해야하는지 몰라서 에러가 납니다. 그래서 기존과 동일하게 파이프라인으로 xack + xdel 을 호출하는게 현재로선 최선인 것 같습니다!

아직 (spring-data-redis 4.1)[https://docs.spring.io/spring-data-redis/reference/4.1-SNAPSHOT/api/java/org/springframework/data/redis/connection/ReactiveClusterStreamCommands.html] 에서 XACKDEL 명령어 지원이 없어서 추후 반영되면 그 때 사용하겠습니다.

번외로 docker 에서 Grafana 를 실행중이라 redis cluster 연결 자체가 어려웠습니다. 물론 aws ec2 에 고정 ip 할당받고 openVPN 깔고 설정 한 뒤 vpn 터널링하면 가능합니다. 재직중이였던 회사에서 썼던 방법이기도 했지만 비용 문제로 redis cluster 와 마찬가지로 로컬에 grafana 를 띄워서 redis cluster 에 연결하는 방식을 택했습니다. 한정된 자원 안에서 여러가지를 시도해보는거죠.

Grafana Redis Cluster 대시보드

123

원래라면 aws redis cluster 쓰지만 grafana 에서 한 눈에 볼 수 있도록 설정했습니다. stream 메세지 처리량 관측은 작업중입니다. redis exporter 로 주기적으로 redis 관련 메트릭을 prometheus 로 쏘아주고 grafana 로 시각화했습니다. 이 대시보드로 특정 노드에 단일 stream 을 할당하고 부하가 걸리는 것을 관측할 예정입니다. 이후 샤딩을 통해 부하분산을 시도할려고 합니다.

메트릭 테스트

2

3

Grafana 에서 쓸 수 있도록 group 별 json 묶어서 개별 row 출력하는 LUA

현재 시간 호출 후에 경과시간 계산

EVAL "
local stream = KEYS[1]

-- 스트림 최신 ID 시간 추출
local sinfo = redis.call('XINFO','STREAM',stream)
local last_generated
for i=1,#sinfo,2 do
  if sinfo[i] == 'last-generated-id' then
    last_generated = sinfo[i+1]
    break
  end
end
local gen_ms = tonumber(string.match(last_generated,'(%d+)%-%d+'))

-- 이걸로 시간계산
local function format_duration_ms(ms)
  if ms < 1000 then
    return string.format('%dms', ms)
  end

  local sec = math.floor(ms / 1000)
  local rem_ms = ms % 1000

  if sec < 60 then
    return string.format('%ds %dms', sec, rem_ms)
  elseif sec < 3600 then
    return string.format('%dm %ds',
      math.floor(sec / 60),
      sec % 60
    )
  else
    return string.format('%dh %dm %ds',
      math.floor(sec / 3600),
      math.floor((sec % 3600) / 60),
      sec % 60
    )
  end
end

local groups = redis.call('XINFO','GROUPS',stream)
local result = {}

for _,g in ipairs(groups) do
  local m = {}
  for i=1,#g,2 do m[g[i]] = g[i+1] end

  local entry = { group = m['name'] }
  local last = m['last-delivered-id']

  if last and last ~= '0-0' then
    local del_ms = tonumber(string.match(last,'(%d+)%-%d+'))
    local lag_ms = math.max(0, gen_ms - del_ms)

    entry.last_delivered_id = last
    entry.lag = format_duration_ms(lag_ms)
    entry.lag_ms = lag_ms
  else
    entry.last_delivered_id = nil
    entry.lag = 'never'
    entry.lag_ms = -1
  end

  table.insert(result, cjson.encode(entry))
end

return result
" 1 summary:1

이렇게 읽어오고 Grafana 에서 json 파싱해서 row 별로 출력하면 됩니다. 그러면 결과물은!

1