Skip to main content Link Menu Expand (external link) Document Search Copy Copied

Intro

기존에 저희는 Mysql 를 main db 로, 멀티프렌차이즈를 목표로 운영하다보니 여러 다양한 스키마들이 필요했고 이를 위한 Mongodb 를 secondary db 로 운영하고 있었습니다. 그리고 신규 피처로 대량의 회원추출을 진행하기 위해 mysql slave 를 보는 것이 아닌, cdc 를 통해서 Mongodb 에 회원정보를 upsert 하는 작업을 진행하고 있습니다. 이 과정에서 여러가지 조건들의 테이블을 단일 docs 로 합치게 된다면 mysql 에서 처럼 join 굳이 안해도 되기때문에 빠르게 조회가 가능해 집니다.

그래서 Debezium server 을 사용하여 Mysql binlog 를 cdc 하고, 이를 Redis Stream 으로 보내고, 별도 sink pod 에서 Redis Stream 해당 토픽 읽고 Mongodb 에 upsert 하는 구조로 진행하고 있습니다.

OLAP 에는 https://medium.com/@simeon.emanuilov/mongodb-vs-clickhouse-for-olap-1e430a40ade8 ClickHouse 가 Big Data Query 에 좀 더 좋다고는 하네요. 예로 createdAt 같은 칼럼을 상대로 10월 데이터 조회하면 ClickHouse 가 더 빠르다네요.

이 과정을 정리되지 않은 기록으로 우선 남깁니다.

Debezium Server CDC

  • kafka 없이 일반 debezium server 로 진행
    • reference: https://debezium.io/documentation/reference/3.2
    • https://debezium.io/documentation/reference/stable/operations/debezium-server.html

      공식문서에서 보면 debezium.sink.type 설정이 aws 키네시스밖에 없어서 다른거 확인 중… stream 을 꼭 써야하나? kafka 말고 redis stream 도 지원하긴 하는데 source/sink 가 stream -> mongodb 있는지 봐야함.

  • kafka stream → mongodb sink connector 은 있고 redis stream → mongodb sink connector 은 없음… 별도의 pod 를 띄워서 redis stream key 읽고 mongodb 에 upsert 하는 로직을 작성해야할 것 같다.
  • https://tech.kakaopay.com/post/kakaopaysec-mongodb-cdc/#4-consumer-group-생성-및-offset-설정 카카오페이에서는 kafka sink connector 사용함. 우리는 redis stream 쓰기때문에 별도로 만들어야함.

  • debezium server → redis stream 설정값 공식 reference
    • https://debezium.io/documentation/reference/2.7/operations/debezium-server.html#_redis_stream
  • django 통해서 postgresql → mongodb 직접 sink 한 오픈소스. 공식으로 지원하는게 아니라 그냥 참고용. https://github.com/maqboolthoufeeq/cdc_debezium
    • https://dev.to/maqboolthoufeeq/change-data-capture-cdc-with-debezium-server-no-kafka-django-postgres-mongodb-example-m0h

Debezium Server

  • Mysql → Debezium Server → Stream 의 프로세스 소개
    1. Snapshots
    • Debezium Server 가 동작할 때 현재 SQL server 상태의 스냅샷을 찍음. default가 INITIAL
      • recovery, custom, initial_only, etc.
    • 그래서 어느 테이블을 Capture 할거야? 정할 수 있음. table.include.list 파라미터 값으로
  • insert sql 테스트. insert 후 1 증가 확인
    • ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli –json XREVRANGE mysql_sink.gyumin.users + - COUNT 1
      | jq -r ‘.[0][1][1] | fromjson | .payload.after.id’ 30001 ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli –json XREVRANGE mysql_sink.gyumin.users + - COUNT 1
      | jq -r ‘.[0][1][1] | fromjson | .payload.after.id’ 30002
  • 페이로드 확인
    • payload(insert 는 before 없고 update 는 before 존재. + db & table 묶고 분기하여 mongodb upsert 하면 될 듯)
    • payload

        ...
        "payload": {
            "before": null,
            "after": {
              "id": 30002,
              "name": "테스트용 유저",
              // ...
            },
            "source": {
              "version": "3.2.0.Final",
              "connector": "mysql",
              "name": "mysql_sink",
              "ts_ms": 1755138551000,
              "snapshot": "false", // 이거는 initial 단계에서 추가된  아니라 이후 cdc 한거임.
              "db": "gyumin",
              "sequence": null,
              "ts_us": 1755138551000000,
              "ts_ns": 1755138551000000000,
              "table": "users",
              "server_id": 1,
              "gtid": null,
              "file": "binlog.000002",
              "pos": 7114433,
              "row": 0,
              "thread": 385,
              "query": null
            },
            "transaction": null,
            "op": "c", // c(insert). u(update), d(delete), r(read), t(truncate)
            // 초기에 db 읽어와서 redis stream 로드시킬  r  들어감.
            "ts_ms": 1755138551799,
            "ts_us": 1755138551799549,
            "ts_ns": 1755138551799549000
          }
      
  • source binload 세팅

      // source binload 세팅
      "source": {
        "version": "3.2.0.Final",         // 2025.08.17  stable  최신 버전임. 
        "connector": "mysql",              
        "name": "mysql_sink",              
        "ts_ms": 1755138551000,            // 소스 이벤트 시각(ms). MySQL의 binlog 이벤트 타임스탬프 기반
        "snapshot": "false",               // 스냅샷 여부: true(기존 db  있던 rows 읽어올 때), false(실시간 cdc)
        "db": "gyumin",                  
        "sequence": null,                  
        "ts_us": 1755138551000000,         // binlog 해당 이벤트 시간
        "ts_ns": 1755138551000000000,      
        "table": "users",                  // cdc  테이블
        "server_id": 1,                    // mysql server id. 이거 값을 config 에서 우리가 설정가능
        "gtid": null,                      // GTID 모드 사용  해당 트랜잭션의 GTID. 비활성화면 null
        "file": "binlog.000002",           // 이벤트를 담고 있는 binlog 파일명
        "pos": 7114433,                    //  binlog 파일  오프셋. debezium server 재시작하면 여기서부터 다시 cdc 함.
        "row": 0,                          // insert values 2개면 row 0, 1  나뉨.
        "thread": 385,                     // mysql db 세션의 id
        "query": null                      // ddl 쿼리. dml 에서는 null  표시함. 이거는 insert 한거라 null
      }
    
  • update 시 페이로드 체크.

    전체 row 변경점이 기록되기때문에 upsert 치기 좋긴 함. 변경된 부분만 binlog 에 기록되는게 아닌. mysql 에선 SHOW VARIABLES LIKE 'binlog_row_image'; 이거 FULL 이면 전/후 row 변경점을 다 기록함. postgresql 에서는 wal_level=logical 이랑 비슷함.

      "payload": {
        "before": {
          "id": 30000,
          "name": "테스트용",
          // ...
        },
        "after": {
          "id": 30000,
          "name": "테스트용 변경",
          // ...
        }
    
  • binlog 오프셋 체크 HGETALL metadata:debezium:offsets

    { "ts_sec": 1755155084, "file": "binlog.000002", "pos": 7117146}

설정 reference : https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/

# SINK SETTINTGS
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
debezium.sink.redis.db.index=0

# SOURCE SETTINGS
# reference=https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.server.id=111
debezium.source.database.dbname=gyumin
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
# debezium.source.databbase.password=
debezium.source.snapshot.mode=initial
# <DB_NAME>.<TABLE_NAME> 으로 테이블 지정함. 그리고 아래 라인 regex 적용으로 ',' separate 시키기때문에 해당 라인 옆에 주석 추가하면 미동작.
debezium.source.table.include.list=gyumin.orders,gyumin.users

# 이건 column.include 옵션으로 칼럼단위 cdc 할 때 사용. 지금은 table 단위 전체 cdc 보기때문에 해당 설정값 미필요.
debezium.source.skip.messages.without.change=true
# binlogs n초 주기로 읽어와서 redis stream 에 밀어넣음. 0으로 설정하면 오자마자 넣는거
debezium.source.offset.flush.interval.ms=1000
# 어디까지 binlog sink 되었는지는 redis 에 저장하도록 함
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
# <prefix>.<dbName>.<tableName> key 로 stream 에 쌓임
debezium.source.topic.prefix=cdc
# cdc 할려는 테이블의 ddl 이력을 저장함
debezium.source.schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory

증분 스냅샷

  • 이건 cdc 할 테이블 나중에 추가할 떄 사용. 해당 테이블만 초기 rows 읽고 initialize 할 때 사용한다.
    • https://debezium.io/blog/2021/10/07/incremental-snapshots/
    • https://debezium.io/documentation/reference/stable/connectors/mysql.html#debezium-mysql-incremental-snapshots
    • 트릭을 이용하는건데(공식에서 이렇게 말함) 아무 테이블 하나 만들고 .<table> 을 table.list 추가한 뒤 실행. 해당 테이블에 insert 쿼리 아래와 같이 전송하면 debezium 이 이 테이블 읽고있다가 스냅샷 쿼리 인지 후 테이블들을 청크단위 스냅샷 진행한다.

reference

# 증분 스냅샷 명령용 테이블
CREATE TABLE db_schema.dbz_signal (
  id    VARCHAR(64) PRIMARY KEY,
  type  VARCHAR(32) NOT NULL,
  data  VARCHAR(2048) NULL
);

# debezium server property  위의 테이블 캡처하도록 설정. + 추가할 테이블도 바라보게. e.g. new_schema.users 

# new_schema.users 이러이러한 테이블 스냅샷 가져와서 initialize 해라라고 명령하는 쿼리
INSERT INTO db_schema.dbz_signal (id, type, data)
VALUES (
   'signal-1',
   'execute-snapshot',
   '{"data-collections":["new_schema.users"],"type":"incremental"}'
;

이러면!! 스트림이 생성되고

XLEN mysql_sink.new_schema.users 로 크기 및 실제 페이로드를 확인해보면 new_schema 에 들어있던 rows 들이 전부 스트림에 로드된 것을 확인할 수 있다.

Mysql datetime to millis 알고있어야함.

reference : https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types DATETIME with a value of 2018-06-20 06:37:03 becomes 1529476623000.

정상동작 확인되었으니 해당 건 도커라이징 후 외부 환경변수 secret 세팅과 pod 로 로드하는 cicd 파이프라인 및 모니터링 설정 필요!

CDC 안정화

  • redis commandTimeout > redis XGROUPREAD pollTimeout 설정해줘야 됨. pollTimeout 이 XGROUPREAD 로 읽을 때 command 대기시간 설정하는건데 이 값은 redis connection 자체관리 commandTimeout 값보다 작아야한다.
  • 싱크실패 dbms row 재처리 로직 필요
    • 진짜 문제는 만약 주문테이블 싱크할 때 하나가 실패했다고 가정한다면 꽤 까다롭다. PEL 에 들어있는 해당 주문을 다시 싱크 해주어야하는데 딱 이때 마침 해당 주문이 업데이트되면 PEL 재처리할 떄 덮어씌워진다. A –> B –> C 되어야하는데, A -x-> B 가 실패하고, 그 사이에 C 가 들어오면 A -> C 로 되어버린다. 그래서 PEL 재처리할 때 해당 row 가 업데이트된게 있는지 체크하는 로직이 필요하다. 만약 해당 row 가 업데이트 되었다면 재처리할 떄 무시한다(이미 최신의 값으로 업데이트되었기 때문). 이떄 row delete 된 경우를 생각해야한다. row 가 삭제되면 언제 삭제되었는지 시간을 따로 남겨서 재처리 메세지 시간이랑 비교해서 우선권이 있는 메세지를 적용해야하는데 이 부분은 우리 기본정책이 소프트 delete 라 크게 신경안써도 될 듯! 종합하면 row 업데이트 시간을 보고 재처리 메세지 시간보다 최신이면 재처리 무시, 아니면 재처리 진행한다.

구현한 Sink (CDC + ETL) 데이터 적재 모듈 관련 내용정리

DB(MySQL) 변경을 Debezium CDC 로 Redis stream 적재 및 stream key 구독/수신하여 MongoDB 도메인 문서로 적재하는 모듈에 대한 설명.

테이블 단위로 커넥터를 추가해 mysql 테이블을 mongodb 도메인 문서로 매핑/적재하는 구조.

전체 흐름

1. CDC, ETL 모듈 배포

git actions 기존 프렌차이즈 별 수동배포에 공통모듈 배포하는 방식 추가 후 개별 모듈(debezium, sink module) CI/CD 파이프라인 구축. 주요 파라미터(db, redis, mongo, quarkus params, etc.) 는 deploy.yaml 에 envset 으로 linux 로컬 export env 가 치환되어 주입하도록 설정.

data

2. Debezium Server CDC

Debezium Server 가 MySQL binlog 를 읽어 변경 이벤트를 JSON payload로 redis stream 에 전송

3. ETL 모듈

1) Row 데이터 클래스 정의

  • 위치: order-hub-sink/src/main/kotlin/…/connector//
  • Debezium after/before 스냅샷을 그대로 담는 용도
  • 가능한 null 허용으로 선언(ddl 칼럼 nullable 로 변경될 수도 있어서)

2) Mongo 도메인 문서 정의(E : SinkClass)

  • 위치: order-hub/src/main/kotlin/…/domain/sink//
  • sinkInfo(mysqlId, franchiseCode, sinkedAt) 포함
  • 정규화 풀려면 upsert, toEntity 구조 리모델링

3) 싱크 커넥터 구현

  • @Component class XxxSinkConnector(mongoTemplate: MongoTemplate)
  • SinkConnector<{Row}, {Entity}> 상속(이 때 Row 는 mysql 테이블 읽는 중간 클래스, Entity 는 mongodb @Document)
  • typeRef: TypeReference<DebeziumMessage>() 구현
  • mysqlTableName 과 collectionName 지정
  • toEntity(row, franchise)에서 매핑 작성
  • keyQuery(row, franchise)에서 upsert/remove 키 작성
  • 참고: 단순 테이블은 findAndReplace(upsert)로 충분. 문서 내부 별도로 비정규화한 아이템 배열을 수정해야 하는 경우(예: order_items)에는 안전한 필드 단위 .set 사용을 고려

  • SinkConnector<T, E : SinkClass>
    • handle(): DebeziumMessage 파싱, op(c/r/u/d) 에 따라 upsert/remove 수행
    • toEntity(row, franchise): Row -> Mongo 도메인 문서 변환
    • keyQuery(row, franchise): upsert/remove 키 쿼리 작성(보통 sinkInfo.mysqlId + sinkInfo.franchiseCode)
    • mysqlTableName: 담당 MySQL 테이블명 (ex. “orders”, “order_items”)
    • collectionName: 대상 Mongo 컬렉션명 (ex. “sink_orders”)
  • SinkConnectorFinder: mysqlTableName 기준으로 적절한 커넥터 선택

4) 라우팅 등록

  • @Component 만 달면 SinkConnectorFinder 가 자동 수집하여 mysqlTableName 으로 라우팅

5) 테스트 작성

  • Kotest StringSpec + Testcontainers(MongoDB)
  • DebeziumMessage 래퍼를 직접 생성해 c/u/d 케이스를 시뮬레이션
  • 멀티 프랜차이즈(FranchiseCode) 격리, 배열 추가/갱신/삭제, Null/극단값 등 엣지 케이스 검증

6) Order Hub Sink 모듈이 SinkBindingsConfig 에서 설정한 redis stream 구독, 메시지 수신 7) Debezium JSON을 파싱하여 테이블별 SinkConnector 로 라우팅 8) 각 커넥터가 Row(테이블 스키마) -> 도메인 Sink 문서로 매핑하고 MongoDB에 upsert/remove 수행 9) franchiseCode + mysqlId 를 문서의 sinkInfo 에 보관. 동일한 mysqlId 라도 가맹별로 격리 저장되어있음.

Appendix 4. 새로운 테이블 추가 시 체크리스트

  • Row 데이터 클래스 생성 (snake_case, null 허용, BigDecimal/OffsetDateTime 등)
  • 도메인 문서(SinkClass 를 상속받는) 생성
  • 싱크커넥터 구현 및 SinkBindingsConfig 에 @Bean 등록
  • 비정규화 필요 시 OrderItem 참조(개별 upsert, toEntity).
  • Kotest + Testcontainers 테스트 작성 (c/u/d, 멀티 프랜차이즈, 엣지 케이스)
  • Debezium 스냅샷/시그널 전략 검토.(해당 테이블 기존 row 스냅샷 need? or not)
    • 만약 need 하면 위 증분 스냅샷 전략 참고
  • applicaiton.properties 알아놓으면 좋은 규칙들
    • line 끝에 주석달면 Regex 오류남.
    • big decimal 은 debezium.source.decimal.handling.mode=string 으로 설정해야지 정확히 읽어옴.
    • setup each table as .
    • Debezium Server 설정에 맞춰 execute-snapshot 등 시그널을 사용해 특정 테이블 스냅샷 수행 가능

본 모듈에 정의한 Rules

  • sinkedAt 은 binlog event 발생시점이며, 본 모듈에서는 기존 문서의 sinkedAt 보다 빠른 sinkedAt 이 도착하면 그대로 sink 함. 이유는 select 쿼리 네트워크 비용을 줄이기 위함.
  • sink 실패건은 1회에 한해 재시도함. 이후 성공여부 관계없이 pending & stream 에서 제거. 이후 정확한 데이터가 필요할 시 재처리 전략 변경 필요.ㄴ
  • sink 성공하면 stream 에서 제거하여 메모리 확보함.
  • ex) Order 문서에 OrderItem 을 포함하도록 비정규화 시 Order 보다 OrderItem 이 먼저 생길 수 있음. 이를 고려해야함.
  • Row 데이터 클래스는 최대한 null 허용으로 선언하여 이후 RDBMS DDL 변경에 유연하게 대응.
  • db.menus.createIndex({ “sinkInfo.mysqlId”: 1 })
  • db.sink_orders.createIndex({ “sinkInfo.mysqlId”: 1 })
  • db.order_coupons.createIndex({ “sinkInfo.mysqlId”: 1 })
  • db.users.createIndex({ “sinkInfo.mysqlId”: 1 })
  • db.user_terms.createIndex({ “sinkInfo.mysqlId”: 1 })