- spring data redis 에 아직 xAckDel 이 반영이 안되어있어서 유틸 만들어서 추가하였습니다.
fun StringRedisTemplate.xAckDel(
topic: String,
groupName: String,
messageIds: List<String>
): Map<String, XAckDelResult> {
val args = mutableListOf<ByteArray>()
args += topic.toByteArray()
args += groupName.toByteArray()
args += "DELREF".toByteArray()
args += "IDS".toByteArray()
args += messageIds.size.toString().toByteArray()
messageIds.forEach { args += it.toByteArray() }
val raw = this.execute { connection ->
connection.execute("XACKDEL", *args.toTypedArray())
}
val results = when (raw) {
is List<*> -> raw.map {
when (it) {
is Long -> XAckDelResult.from(it.toInt())
is ByteArray -> XAckDelResult.from(String(it).toInt())
else -> XAckDelResult.NOT_EXISTS
}
}
else -> List(messageIds.size) { XAckDelResult.NOT_EXISTS }
}
return messageIds
.zip(results)
.associate { (id, result) ->
id to result
}
}
그래도 결국 extension 함수로 만들어도 lettuce 에서 응답파싱할 때 XACKDEL 응답을 뭘로 파싱해야하는지 몰라서 에러가 납니다. 그래서 기존과 동일하게 파이프라인으로 xack + xdel 을 호출하는게 현재로선 최선인 것 같습니다!
아직 (spring-data-redis 4.1)[https://docs.spring.io/spring-data-redis/reference/4.1-SNAPSHOT/api/java/org/springframework/data/redis/connection/ReactiveClusterStreamCommands.html] 에서 XACKDEL 명령어 지원이 없어서 추후 반영되면 그 때 사용하겠습니다.
번외로 docker 에서 Grafana 를 실행중이라 redis cluster 연결 자체가 어려웠습니다. 물론 aws ec2 에 고정 ip 할당받고 openVPN 깔고 설정 한 뒤 vpn 터널링하면 가능합니다. 재직중이였던 회사에서 썼던 방법이기도 했지만 비용 문제로 redis cluster 와 마찬가지로 로컬에 grafana 를 띄워서 redis cluster 에 연결하는 방식을 택했습니다. 한정된 자원 안에서 여러가지를 시도해보는거죠.
Grafana Redis Cluster 대시보드

원래라면 aws redis cluster 쓰지만 grafana 에서 한 눈에 볼 수 있도록 설정했습니다. stream 메세지 처리량 관측은 작업중입니다. redis exporter 로 주기적으로 redis 관련 메트릭을 prometheus 로 쏘아주고 grafana 로 시각화했습니다. 이 대시보드로 특정 노드에 단일 stream 을 할당하고 부하가 걸리는 것을 관측할 예정입니다. 이후 샤딩을 통해 부하분산을 시도할려고 합니다.