Skip to main content Link Menu Expand (external link) Document Search Copy Copied
  • spring data redis 에 아직 xAckDel 이 반영이 안되어있어서 유틸 만들어서 추가하였습니다.
fun StringRedisTemplate.xAckDel(
    topic: String,
    groupName: String,
    messageIds: List<String>
): Map<String, XAckDelResult> {

    val args = mutableListOf<ByteArray>()
    args += topic.toByteArray()
    args += groupName.toByteArray()
    args += "DELREF".toByteArray()
    args += "IDS".toByteArray()
    args += messageIds.size.toString().toByteArray()
    messageIds.forEach { args += it.toByteArray() }

    val raw = this.execute { connection ->
        connection.execute("XACKDEL", *args.toTypedArray())
    }

    val results = when (raw) {
        is List<*> -> raw.map {
            when (it) {
                is Long -> XAckDelResult.from(it.toInt())
                is ByteArray -> XAckDelResult.from(String(it).toInt())
                else -> XAckDelResult.NOT_EXISTS
            }
        }
        else -> List(messageIds.size) { XAckDelResult.NOT_EXISTS }
    }

    return messageIds
        .zip(results)
        .associate { (id, result) ->
            id to result
        }
}

그래도 결국 extension 함수로 만들어도 lettuce 에서 응답파싱할 때 XACKDEL 응답을 뭘로 파싱해야하는지 몰라서 에러가 납니다. 그래서 기존과 동일하게 파이프라인으로 xack + xdel 을 호출하는게 현재로선 최선인 것 같습니다!

아직 (spring-data-redis 4.1)[https://docs.spring.io/spring-data-redis/reference/4.1-SNAPSHOT/api/java/org/springframework/data/redis/connection/ReactiveClusterStreamCommands.html] 에서 XACKDEL 명령어 지원이 없어서 추후 반영되면 그 때 사용하겠습니다.

번외로 docker 에서 Grafana 를 실행중이라 redis cluster 연결 자체가 어려웠습니다. 물론 aws ec2 에 고정 ip 할당받고 openVPN 깔고 설정 한 뒤 vpn 터널링하면 가능합니다. 재직중이였던 회사에서 썼던 방법이기도 했지만 비용 문제로 redis cluster 와 마찬가지로 로컬에 grafana 를 띄워서 redis cluster 에 연결하는 방식을 택했습니다. 한정된 자원 안에서 여러가지를 시도해보는거죠.

Grafana Redis Cluster 대시보드

123

원래라면 aws redis cluster 쓰지만 grafana 에서 한 눈에 볼 수 있도록 설정했습니다. stream 메세지 처리량 관측은 작업중입니다. redis exporter 로 주기적으로 redis 관련 메트릭을 prometheus 로 쏘아주고 grafana 로 시각화했습니다. 이 대시보드로 특정 노드에 단일 stream 을 할당하고 부하가 걸리는 것을 관측할 예정입니다. 이후 샤딩을 통해 부하분산을 시도할려고 합니다.