Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Intro

기존에 저희는 Mysql 를 main db 로, 멀티프렌차이즈를 목표로 운영하다보니 여러 다양한 스키마들이 필요했고 이를 위한 Mongodb 를 secondary db 로 운영하고 있었습니다. 그리고 신규 피처로 대량의 회원추출을 진행하기 위해 mysql slave 를 보는 것이 아닌, cdc 를 통해서 Mongodb 에 회원정보를 upsert 하는 작업을 진행하고 있습니다. 이 과정에서 여러가지 조건들의 테이블을 단일 docs 로 합치게 된다면 mysql 에서 처럼 join 굳이 안해도 되기때문에 빠르게 조회가 가능해 집니다.

그래서 Debezium server 을 사용하여 Mysql binlog 를 cdc 하고, 이를 Redis Stream 으로 보내고, 별도 sink pod 에서 Redis Stream 해당 토픽 읽고 Mongodb 에 upsert 하는 구조로 진행하고 있습니다.

OLAP 에는 https://medium.com/@simeon.emanuilov/mongodb-vs-clickhouse-for-olap-1e430a40ade8 ClickHouse 가 Big Data Query 에 좀 더 좋다고는 하네요. 예로 createdAt 같은 칼럼을 상대로 10월 데이터 조회하면 ClickHouse 가 더 빠르다네요.

이 과정을 정리되지 않은 기록으로 우선 남깁니다.

Debezium Server CDC

  • kafka 없이 일반 debezium server 로 진행
    • reference: https://debezium.io/documentation/reference/3.2
    • https://debezium.io/documentation/reference/stable/operations/debezium-server.html

      공식문서에서 보면 debezium.sink.type 설정이 aws 키네시스밖에 없어서 다른거 확인 중… stream 을 꼭 써야하나? kafka 말고 redis stream 도 지원하긴 하는데 source/sink 가 stream -> mongodb 있는지 봐야함.

  • kafka stream → mongodb sink connector 은 있고 redis stream → mongodb sink connector 은 없음… 별도의 pod 를 띄워서 redis stream key 읽고 mongodb 에 upsert 하는 로직을 작성해야할 것 같다.
  • https://tech.kakaopay.com/post/kakaopaysec-mongodb-cdc/#4-consumer-group-생성-및-offset-설정 카카오페이에서는 kafka sink connector 사용함. 우리는 redis stream 쓰기때문에 별도로 만들어야함.

  • debezium server → redis stream 설정값 공식 reference
    • https://debezium.io/documentation/reference/2.7/operations/debezium-server.html#_redis_stream
  • django 통해서 postgresql → mongodb 직접 sink 한 오픈소스. 공식으로 지원하는게 아니라 그냥 참고용. https://github.com/maqboolthoufeeq/cdc_debezium
    • https://dev.to/maqboolthoufeeq/change-data-capture-cdc-with-debezium-server-no-kafka-django-postgres-mongodb-example-m0h

Debezium Server

  • Mysql → Debezium Server → Stream 의 프로세스 소개
    1. Snapshots
    • Debezium Server 가 동작할 때 현재 SQL server 상태의 스냅샷을 찍음. default가 INITIAL
      • recovery, custom, initial_only, etc.
    • 그래서 어느 테이블을 Capture 할거야? 정할 수 있음. table.include.list 파라미터 값으로
  • insert sql 테스트. insert 후 1 증가 확인
    • ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli –json XREVRANGE mysql_sink.gyumin.users + - COUNT 1
      | jq -r ‘.[0][1][1] | fromjson | .payload.after.id’ 30001 ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli –json XREVRANGE mysql_sink.gyumin.users + - COUNT 1
      | jq -r ‘.[0][1][1] | fromjson | .payload.after.id’ 30002
  • 페이로드 확인
    • payload(insert 는 before 없고 update 는 before 존재. + db & table 묶고 분기하여 mongodb upsert 하면 될 듯)
    • payload

        ...
        "payload": {
            "before": null,
            "after": {
              "id": 30002,
              "name": "테스트용 유저",
              // ...
            },
            "source": {
              "version": "3.2.0.Final",
              "connector": "mysql",
              "name": "mysql_sink",
              "ts_ms": 1755138551000,
              "snapshot": "false", // 이거는 initial 단계에서 추가된  아니라 이후 cdc 한거임.
              "db": "gyumin",
              "sequence": null,
              "ts_us": 1755138551000000,
              "ts_ns": 1755138551000000000,
              "table": "users",
              "server_id": 1,
              "gtid": null,
              "file": "binlog.000002",
              "pos": 7114433,
              "row": 0,
              "thread": 385,
              "query": null
            },
            "transaction": null,
            "op": "c", // c(insert). u(update), d(delete), r(read), t(truncate)
            // 초기에 db 읽어와서 redis stream 로드시킬  r  들어감.
            "ts_ms": 1755138551799,
            "ts_us": 1755138551799549,
            "ts_ns": 1755138551799549000
          }
      
  • source binload 세팅

      // source binload 세팅
      "source": {
        "version": "3.2.0.Final",         // 2025.08.17  stable  최신 버전임. 
        "connector": "mysql",              
        "name": "mysql_sink",              
        "ts_ms": 1755138551000,            // 소스 이벤트 시각(ms). MySQL의 binlog 이벤트 타임스탬프 기반
        "snapshot": "false",               // 스냅샷 여부: true(기존 db  있던 rows 읽어올 때), false(실시간 cdc)
        "db": "gyumin",                  
        "sequence": null,                  
        "ts_us": 1755138551000000,         // binlog 해당 이벤트 시간
        "ts_ns": 1755138551000000000,      
        "table": "users",                  // cdc  테이블
        "server_id": 1,                    // mysql server id. 이거 값을 config 에서 우리가 설정가능
        "gtid": null,                      // GTID 모드 사용  해당 트랜잭션의 GTID. 비활성화면 null
        "file": "binlog.000002",           // 이벤트를 담고 있는 binlog 파일명
        "pos": 7114433,                    //  binlog 파일  오프셋. debezium server 재시작하면 여기서부터 다시 cdc 함.
        "row": 0,                          // insert values 2개면 row 0, 1  나뉨.
        "thread": 385,                     // mysql db 세션의 id
        "query": null                      // ddl 쿼리. dml 에서는 null  표시함. 이거는 insert 한거라 null
      }
    
  • update 시 페이로드 체크.

    전체 row 변경점이 기록되기때문에 upsert 치기 좋긴 함. 변경된 부분만 binlog 에 기록되는게 아닌. mysql 에선 SHOW VARIABLES LIKE 'binlog_row_image'; 이거 FULL 이면 전/후 row 변경점을 다 기록함. postgresql 에서는 wal_level=logical 이랑 비슷함.

      "payload": {
        "before": {
          "id": 30000,
          "name": "테스트용",
          // ...
        },
        "after": {
          "id": 30000,
          "name": "테스트용 변경",
          // ...
        }
    
  • binlog 오프셋 체크 HGETALL metadata:debezium:offsets

    { "ts_sec": 1755155084, "file": "binlog.000002", "pos": 7117146}

설정 reference : https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/

# SINK SETTINTGS
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
debezium.sink.redis.db.index=0

# SOURCE SETTINGS
# reference=https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.server.id=111
debezium.source.database.dbname=gyumin
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
# debezium.source.databbase.password=
debezium.source.snapshot.mode=initial
# <DB_NAME>.<TABLE_NAME> 으로 테이블 지정함. 그리고 아래 라인 regex 적용으로 ',' separate 시키기때문에 해당 라인 옆에 주석 추가하면 미동작.
debezium.source.table.include.list=gyumin.orders,gyumin.users

# 이건 column.include 옵션으로 칼럼단위 cdc 할 때 사용. 지금은 table 단위 전체 cdc 보기때문에 해당 설정값 미필요.
debezium.source.skip.messages.without.change=true
# binlogs n초 주기로 읽어와서 redis stream 에 밀어넣음. 0으로 설정하면 오자마자 넣는거
debezium.source.offset.flush.interval.ms=1000
# 어디까지 binlog sink 되었는지는 redis 에 저장하도록 함
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
# <prefix>.<dbName>.<tableName> key 로 stream 에 쌓임
debezium.source.topic.prefix=cdc
# cdc 할려는 테이블의 ddl 이력을 저장함
debezium.source.schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory

Mysql datetime to millis 알고있어야함.

reference : https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types DATETIME with a value of 2018-06-20 06:37:03 becomes 1529476623000.

정상동작 확인되었으니 해당 건 도커라이징 후 외부 환경변수 secret 세팅과 pod 로 로드하는 cicd 파이프라인 및 모니터링 설정 필요!

ETL

엔티티 만들고 변환시키는 로직 추가중.