Intro
기존에 저희는 Mysql 를 main db 로, 멀티프렌차이즈를 목표로 운영하다보니 여러 다양한 스키마들이 필요했고 이를 위한 Mongodb 를 secondary db 로 운영하고 있었습니다. 그리고 신규 피처로 대량의 회원추출을 진행하기 위해 mysql slave 를 보는 것이 아닌, cdc 를 통해서 Mongodb 에 회원정보를 upsert 하는 작업을 진행하고 있습니다. 이 과정에서 여러가지 조건들의 테이블을 단일 docs 로 합치게 된다면 mysql 에서 처럼 join 굳이 안해도 되기때문에 빠르게 조회가 가능해 집니다.
그래서 Debezium server 을 사용하여 Mysql binlog 를 cdc 하고, 이를 Redis Stream 으로 보내고, 별도 sink pod 에서 Redis Stream 해당 토픽 읽고 Mongodb 에 upsert 하는 구조로 진행하고 있습니다.
OLAP 에는 https://medium.com/@simeon.emanuilov/mongodb-vs-clickhouse-for-olap-1e430a40ade8 ClickHouse 가 Big Data Query 에 좀 더 좋다고는 하네요. 예로 createdAt 같은 칼럼을 상대로 10월 데이터 조회하면 ClickHouse 가 더 빠르다네요. |
이 과정을 정리되지 않은 기록으로 우선 남깁니다.
Debezium Server CDC
- kafka 없이 일반 debezium server 로 진행
- reference: https://debezium.io/documentation/reference/3.2
- https://debezium.io/documentation/reference/stable/operations/debezium-server.html
공식문서에서 보면 debezium.sink.type 설정이 aws 키네시스밖에 없어서 다른거 확인 중… stream 을 꼭 써야하나? kafka 말고 redis stream 도 지원하긴 하는데 source/sink 가 stream -> mongodb 있는지 봐야함.
- kafka stream → mongodb sink connector 은 있고 redis stream → mongodb sink connector 은 없음… 별도의 pod 를 띄워서 redis stream key 읽고 mongodb 에 upsert 하는 로직을 작성해야할 것 같다.
https://tech.kakaopay.com/post/kakaopaysec-mongodb-cdc/#4-consumer-group-생성-및-offset-설정 카카오페이에서는 kafka sink connector 사용함. 우리는 redis stream 쓰기때문에 별도로 만들어야함.
- debezium server → redis stream 설정값 공식 reference
- https://debezium.io/documentation/reference/2.7/operations/debezium-server.html#_redis_stream
- django 통해서 postgresql → mongodb 직접 sink 한 오픈소스. 공식으로 지원하는게 아니라 그냥 참고용. https://github.com/maqboolthoufeeq/cdc_debezium
- https://dev.to/maqboolthoufeeq/change-data-capture-cdc-with-debezium-server-no-kafka-django-postgres-mongodb-example-m0h
Debezium Server
- Mysql → Debezium Server → Stream 의 프로세스 소개
- Snapshots
- Debezium Server 가 동작할 때 현재 SQL server 상태의 스냅샷을 찍음. default가 INITIAL
- recovery, custom, initial_only, etc.
- 그래서 어느 테이블을 Capture 할거야? 정할 수 있음.
table.include.list
파라미터 값으로
- insert sql 테스트. insert 후 1 증가 확인
- ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli –json XREVRANGE mysql_sink.gyumin.users + - COUNT 1
| jq -r ‘.[0][1][1] | fromjson | .payload.after.id’ 30001 ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli –json XREVRANGE mysql_sink.gyumin.users + - COUNT 1
| jq -r ‘.[0][1][1] | fromjson | .payload.after.id’ 30002
- ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli –json XREVRANGE mysql_sink.gyumin.users + - COUNT 1
- 페이로드 확인
- payload(insert 는 before 없고 update 는 before 존재. + db & table 묶고 분기하여 mongodb upsert 하면 될 듯)
payload
... "payload": { "before": null, "after": { "id": 30002, "name": "테스트용 유저", // ... }, "source": { "version": "3.2.0.Final", "connector": "mysql", "name": "mysql_sink", "ts_ms": 1755138551000, "snapshot": "false", // 이거는 initial 단계에서 추가된 게 아니라 이후 cdc 한거임. "db": "gyumin", "sequence": null, "ts_us": 1755138551000000, "ts_ns": 1755138551000000000, "table": "users", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 7114433, "row": 0, "thread": 385, "query": null }, "transaction": null, "op": "c", // c(insert). u(update), d(delete), r(read), t(truncate) // 초기에 db 읽어와서 redis stream 로드시킬 떄 r 로 들어감. "ts_ms": 1755138551799, "ts_us": 1755138551799549, "ts_ns": 1755138551799549000 }
source binload 세팅
// source binload 세팅 "source": { "version": "3.2.0.Final", // 2025.08.17 에 stable 한 최신 버전임. "connector": "mysql", "name": "mysql_sink", "ts_ms": 1755138551000, // 소스 이벤트 시각(ms). MySQL의 binlog 이벤트 타임스탬프 기반 "snapshot": "false", // 스냅샷 여부: true(기존 db 에 있던 rows 읽어올 때), false(실시간 cdc) "db": "gyumin", "sequence": null, "ts_us": 1755138551000000, // binlog 해당 이벤트 시간 "ts_ns": 1755138551000000000, "table": "users", // cdc 한 테이블 "server_id": 1, // mysql server id. 이거 값을 config 에서 우리가 설정가능 "gtid": null, // GTID 모드 사용 시 해당 트랜잭션의 GTID. 비활성화면 null "file": "binlog.000002", // 이벤트를 담고 있는 binlog 파일명 "pos": 7114433, // 위 binlog 파일 내 오프셋. debezium server 재시작하면 여기서부터 다시 cdc 함. "row": 0, // insert values 2개면 row 0, 1 로 나뉨. "thread": 385, // mysql db 세션의 id "query": null // ddl 쿼리. dml 에서는 null 로 표시함. 이거는 insert 한거라 null }
update 시 페이로드 체크.
전체 row 변경점이 기록되기때문에 upsert 치기 좋긴 함. 변경된 부분만 binlog 에 기록되는게 아닌. mysql 에선
SHOW VARIABLES LIKE 'binlog_row_image';
이거 FULL 이면 전/후 row 변경점을 다 기록함. postgresql 에서는 wal_level=logical 이랑 비슷함."payload": { "before": { "id": 30000, "name": "테스트용", // ... }, "after": { "id": 30000, "name": "테스트용 변경", // ... }
binlog 오프셋 체크
HGETALL metadata:debezium:offsets
{ "ts_sec": 1755155084, "file": "binlog.000002", "pos": 7117146}
설정 reference : https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/
# SINK SETTINTGS
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
debezium.sink.redis.db.index=0
# SOURCE SETTINGS
# reference=https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.server.id=111
debezium.source.database.dbname=gyumin
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
# debezium.source.databbase.password=
debezium.source.snapshot.mode=initial
# <DB_NAME>.<TABLE_NAME> 으로 테이블 지정함. 그리고 아래 라인 regex 적용으로 ',' separate 시키기때문에 해당 라인 옆에 주석 추가하면 미동작.
debezium.source.table.include.list=gyumin.orders,gyumin.users
# 이건 column.include 옵션으로 칼럼단위 cdc 할 때 사용. 지금은 table 단위 전체 cdc 보기때문에 해당 설정값 미필요.
debezium.source.skip.messages.without.change=true
# binlogs n초 주기로 읽어와서 redis stream 에 밀어넣음. 0으로 설정하면 오자마자 넣는거
debezium.source.offset.flush.interval.ms=1000
# 어디까지 binlog sink 되었는지는 redis 에 저장하도록 함
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
# <prefix>.<dbName>.<tableName> key 로 stream 에 쌓임
debezium.source.topic.prefix=cdc
# cdc 할려는 테이블의 ddl 이력을 저장함
debezium.source.schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory
Mysql datetime to millis 알고있어야함.
reference : https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
DATETIME
with a value of2018-06-20 06:37:03
becomes1529476623000
.
정상동작 확인되었으니 해당 건 도커라이징 후 외부 환경변수 secret 세팅과 pod 로 로드하는 cicd 파이프라인 및 모니터링 설정 필요!
ETL
엔티티 만들고 변환시키는 로직 추가중.