Table of contents
Intro
기존에 저희는 Mysql 를 main db 로, 멀티프렌차이즈를 목표로 운영하다보니 여러 다양한 스키마들이 필요했고 이를 위한 Mongodb 를 secondary db 로 운영하고 있었습니다. 그리고 신규 피처로 대량의 회원추출을 진행하기 위해 mysql slave 를 보는 것이 아닌, cdc 를 통해서 Mongodb 에 회원정보를 upsert 하는 작업을 진행하고 있습니다. 이 과정에서 여러가지 조건들의 테이블을 단일 docs 로 합치게 된다면 mysql 에서 처럼 join 굳이 안해도 되기때문에 빠르게 조회가 가능해 집니다.
그래서 Debezium server 을 사용하여 Mysql binlog 를 cdc 하고, 이를 Redis Stream 으로 보내고, 별도 sink pod 에서 Redis Stream 해당 토픽 읽고 Mongodb 에 upsert 하는 구조로 진행하고 있습니다.
OLAP 에는 https://medium.com/@simeon.emanuilov/mongodb-vs-clickhouse-for-olap-1e430a40ade8 ClickHouse 가 Big Data Query 에 좀 더 좋다고는 하네요. 예로 createdAt 같은 칼럼을 상대로 10월 데이터 조회하면 ClickHouse 가 더 빠르다고합니다.
이 과정을 정리되지 않은 기록으로 우선 남깁니다.
Debezium Server CDC
- kafka 없이 일반 debezium server 로 진행
- reference: https://debezium.io/documentation/reference/3.2
- https://debezium.io/documentation/reference/stable/operations/debezium-server.html
공식문서에서 보면 debezium.sink.type 설정이 aws 키네시스밖에 없어서 다른거 확인 중… stream 을 꼭 써야하나? kafka 말고 redis stream 도 지원하긴 하는데 source/sink 가 stream -> mongodb 있는지 봐야함.
- kafka stream → mongodb sink connector 은 있고 redis stream → mongodb sink connector 은 없음… 별도의 pod 를 띄워서 redis stream key 읽고 mongodb 에 upsert 하는 로직을 작성해야할 것 같다.
https://tech.kakaopay.com/post/kakaopaysec-mongodb-cdc/#4-consumer-group-생성-및-offset-설정 카카오페이에서는 kafka sink connector 사용함. 우리는 redis stream 쓰기때문에 별도로 만들어야함.
- debezium server → redis stream 설정값 공식 reference
- https://debezium.io/documentation/reference/2.7/operations/debezium-server.html#_redis_stream
- django 통해서 postgresql → mongodb 직접 sink 한 오픈소스. 공식으로 지원하는게 아니라 그냥 참고용. https://github.com/maqboolthoufeeq/cdc_debezium
- https://dev.to/maqboolthoufeeq/change-data-capture-cdc-with-debezium-server-no-kafka-django-postgres-mongodb-example-m0h
- Mysql → Debezium Server → Redis Stream 의 프로세스 소개
- Snapshots
- Debezium Server 가 동작할 때 현재 SQL server 상태의 스냅샷을 찍음. default가 INITIAL
- recovery, custom, initial_only, etc.
- 그래서 어느 테이블을 Capture 할거야? 정할 수 있음.
table.include.list
파라미터 값으로
- insert sql 테스트. insert 후 1 증가 확인
ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli --json XREVRANGE mysql_sink.gyumin.users + - COUNT 1 | jq -r '.[0][1][1] | fromjson | .payload.after.id'
30001ghkdqhrbals@ip-172-30-1-90 debezium-server % redis-cli --json XREVRANGE mysql_sink.gyumin.users + - COUNT 1 | jq -r '.[0][1][1] | fromjson | .payload.after.id'
30002
- 페이로드 확인
- payload(insert 는 before 없고 update 는 before 존재. + db & table 묶고 분기하여 mongodb upsert 하면 될 듯)
payload
... "payload": { "before": null, "after": { "id": 30002, "name": "테스트용 유저", // ... }, "source": { "version": "3.2.0.Final", "connector": "mysql", "name": "mysql_sink", "ts_ms": 1755138551000, "snapshot": "false", // 이거는 initial 단계에서 추가된 게 아니라 이후 cdc 한거임. "db": "gyumin", "sequence": null, "ts_us": 1755138551000000, "ts_ns": 1755138551000000000, "table": "users", "server_id": 1, "gtid": null, "file": "binlog.000002", "pos": 7114433, "row": 0, "thread": 385, "query": null }, "transaction": null, "op": "c", // c(insert). u(update), d(delete), r(read), t(truncate) // 초기에 db 읽어와서 redis stream 로드시킬 떄 r 로 들어감. "ts_ms": 1755138551799, "ts_us": 1755138551799549, "ts_ns": 1755138551799549000 }
source binload 세팅
// source binload 세팅 "source": { "version": "3.2.0.Final", // 2025.08.17 에 stable 한 최신 버전임. "connector": "mysql", "name": "mysql_sink", "ts_ms": 1755138551000, // 소스 이벤트 시각(ms). MySQL의 binlog 이벤트 타임스탬프 기반 "snapshot": "false", // 스냅샷 여부: true(기존 db 에 있던 rows 읽어올 때), false(실시간 cdc) "db": "gyumin", "sequence": null, "ts_us": 1755138551000000, // binlog 해당 이벤트 시간 "ts_ns": 1755138551000000000, "table": "users", // cdc 한 테이블 "server_id": 1, // mysql server id. 이거 값을 config 에서 우리가 설정가능 "gtid": null, // GTID 모드 사용 시 해당 트랜잭션의 GTID. 비활성화면 null "file": "binlog.000002", // 이벤트를 담고 있는 binlog 파일명 "pos": 7114433, // 위 binlog 파일 내 오프셋. debezium server 재시작하면 여기서부터 다시 cdc 함. "row": 0, // insert values 2개면 row 0, 1 로 나뉨. "thread": 385, // mysql db 세션의 id "query": null // ddl 쿼리. dml 에서는 null 로 표시함. 이거는 insert 한거라 null }
update 시 페이로드 체크.
전체 row 변경점이 기록되기때문에 upsert 치기 좋긴 함. 변경된 부분만 binlog 에 기록되는게 아닌. mysql 에선
SHOW VARIABLES LIKE 'binlog_row_image';
이거 FULL 이면 전/후 row 변경점을 다 기록함. postgresql 에서는 wal_level=logical 이랑 비슷함."payload": { "before": { "id": 30000, "name": "테스트용", // ... }, "after": { "id": 30000, "name": "테스트용 변경", // ... }
binlog 오프셋 체크
HGETALL metadata:debezium:offsets
{ "ts_sec": 1755155084, "file": "binlog.000002", "pos": 7117146}
설정 reference : https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/
# SINK SETTINTGS
debezium.sink.type=redis
debezium.sink.redis.address=localhost:6379
debezium.sink.redis.db.index=0
# SOURCE SETTINGS
# reference=https://redis.io/docs/latest/integrate/write-behind/reference/debezium/mysql/
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.database.server.id=111
debezium.source.database.dbname=gyumin
debezium.source.database.hostname=localhost
debezium.source.database.port=3306
debezium.source.database.user=root
# debezium.source.databbase.password=
debezium.source.snapshot.mode=initial
# <DB_NAME>.<TABLE_NAME> 으로 테이블 지정함. 그리고 아래 라인 regex 적용으로 ',' separate 시키기때문에 해당 라인 옆에 주석 추가하면 미동작.
debezium.source.table.include.list=gyumin.orders,gyumin.users
# 이건 column.include 옵션으로 칼럼단위 cdc 할 때 사용. 지금은 table 단위 전체 cdc 보기때문에 해당 설정값 미필요.
debezium.source.skip.messages.without.change=true
# binlogs n초 주기로 읽어와서 redis stream 에 밀어넣음. 0으로 설정하면 오자마자 넣는거
debezium.source.offset.flush.interval.ms=1000
# 어디까지 binlog sink 되었는지는 redis 에 저장하도록 함
debezium.source.offset.storage=io.debezium.storage.redis.offset.RedisOffsetBackingStore
# <prefix>.<dbName>.<tableName> key 로 stream 에 쌓임
debezium.source.topic.prefix=cdc
# cdc 할려는 테이블의 ddl 이력을 저장함
debezium.source.schema.history.internal=io.debezium.storage.redis.history.RedisSchemaHistory
증분 스냅샷
- 이건 cdc 할 테이블 나중에 추가할 떄 사용. 해당 테이블만 초기 rows 읽고 initialize 할 때 사용한다.
- https://debezium.io/blog/2021/10/07/incremental-snapshots/
- https://debezium.io/documentation/reference/stable/connectors/mysql.html#debezium-mysql-incremental-snapshots
- 트릭을 이용하는건데(공식에서 이렇게 말함) 아무 테이블 하나 만들고
.<table> 을 table.list 추가한 뒤 실행. 해당 테이블에 insert 쿼리 아래와 같이 전송하면 debezium 이 이 테이블 읽고있다가 스냅샷 쿼리 인지 후 테이블들을 청크단위 스냅샷 진행한다.
# 증분 스냅샷 명령용 테이블
CREATE TABLE db_schema.dbz_signal (
id VARCHAR(64) PRIMARY KEY,
type VARCHAR(32) NOT NULL,
data VARCHAR(2048) NULL
);
# debezium server property 에 위의 테이블 캡처하도록 설정. + 추가할 테이블도 바라보게. e.g. new_schema.users
# new_schema.users 이러이러한 테이블 스냅샷 가져와서 initialize 해라라고 명령하는 쿼리
INSERT INTO db_schema.dbz_signal (id, type, data)
VALUES (
'signal-1',
'execute-snapshot',
'{"data-collections":["new_schema.users"],"type":"incremental"}'
;
이러면!! 스트림이 생성되고
XLEN mysql_sink.new_schema.users
로 크기 및 실제 페이로드를 확인해보면 new_schema 에 들어있던 rows 들이 전부 스트림에 로드된 것을 확인할 수 있다.
Mysql datetime to millis 알고있어야함.
reference : https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-temporal-types
DATETIME
with a value of2018-06-20 06:37:03
becomes1529476623000
.
정상동작 확인되었으니 해당 건 도커라이징 후 외부 환경변수 secret 세팅과 pod 로 로드하는 cicd 파이프라인 및 모니터링 설정 필요!
CDC 안정화
- redis commandTimeout > redis XGROUPREAD pollTimeout 설정해줘야 됨. pollTimeout 이 XGROUPREAD 로 읽을 때 command 대기시간 설정하는건데 이 값은 redis connection 자체관리 commandTimeout 값보다 작아야한다.
- 싱크실패 dbms row 재처리 로직 필요
- 진짜 문제는 만약 주문테이블 싱크할 때 하나가 실패했다고 가정한다면 꽤 까다롭다. PEL 에 들어있는 해당 주문을 다시 싱크 해주어야하는데 딱 이때 마침 해당 주문이 업데이트되면 PEL 재처리할 떄 덮어씌워진다. A –> B –> C 되어야하는데, A -x-> B 가 실패하고, 그 사이에 C 가 들어오면 A -> C 로 되어버린다. 그래서 PEL 재처리할 때 해당 row 가 업데이트된게 있는지 체크하는 로직이 필요하다. 만약 해당 row 가 업데이트 되었다면 재처리할 떄 무시한다(이미 최신의 값으로 업데이트되었기 때문). 이떄 row delete 된 경우를 생각해야한다. row 가 삭제되면 언제 삭제되었는지 시간을 따로 남겨서 재처리 메세지 시간이랑 비교해서 우선권이 있는 메세지를 적용해야하는데 이 부분은 우리 기본정책이 소프트 delete 라 크게 신경안써도 될 듯! 종합하면 row 업데이트 시간을 보고 재처리 메세지 시간보다 최신이면 재처리 무시, 아니면 재처리 진행한다.
구현한 Sink (CDC + ETL) 데이터 적재 모듈 관련 내용정리
DB(MySQL) 변경을 Debezium CDC 로 Redis stream 적재 및 stream key 구독/수신하여 MongoDB 도메인 문서로 적재하는 모듈에 대한 설명.
테이블 단위로 커넥터를 추가해 mysql 테이블을 mongodb 도메인 문서로 매핑/적재하는 구조.
전체 흐름
CDC, ETL 모듈 배포
git actions 기존 프렌차이즈 별 수동배포에 공통모듈 배포하는 방식 추가 후 개별 모듈(debezium, sink module) CI/CD 파이프라인 구축. 주요 파라미터(db, redis, mongo, quarkus params, etc.) 는 deploy.yaml 에 envset 으로 linux 로컬 export env 가 치환되어 주입하도록 설정.
Debezium Server CDC
Debezium Server 가 MySQL binlog 를 읽어 변경 이벤트를 JSON payload로 redis stream 에 전송
ETL 모듈
1) Row 데이터 클래스 정의
- 위치: order-hub-sink/src/main/kotlin/…/connector/
/ - Debezium after/before 스냅샷을 그대로 담는 용도
- 가능한 null 허용으로 선언(ddl 칼럼 nullable 로 변경될 수도 있어서)
2) Mongo 도메인 문서 정의(E : SinkClass)
- 위치: order-hub/src/main/kotlin/…/domain/sink/
/ - sinkInfo(mysqlId, franchiseCode, sinkedAt) 포함
- 정규화 풀려면 upsert, toEntity 구조 리모델링
3) 싱크 커넥터 구현
- @Component class XxxSinkConnector(mongoTemplate: MongoTemplate)
- SinkConnector<{Row}, {Entity}> 상속(이 때 Row 는 mysql 테이블 읽는 중간 클래스, Entity 는 mongodb @Document)
- typeRef: TypeReference<DebeziumMessage
>() 구현
- mysqlTableName 과 collectionName 지정
- toEntity(row, franchise)에서 매핑 작성
- keyQuery(row, franchise)에서 upsert/remove 키 작성
참고: 단순 테이블은 findAndReplace(upsert)로 충분. 문서 내부 별도로 비정규화한 아이템 배열을 수정해야 하는 경우(예: order_items)에는 안전한 필드 단위 .set 사용을 고려
- SinkConnector<T, E : SinkClass>
- handle(): DebeziumMessage 파싱, op(c/r/u/d) 에 따라 upsert/remove 수행
- toEntity(row, franchise): Row -> Mongo 도메인 문서 변환
- keyQuery(row, franchise): upsert/remove 키 쿼리 작성(보통 sinkInfo.mysqlId + sinkInfo.franchiseCode)
- mysqlTableName: 담당 MySQL 테이블명 (ex. “orders”, “order_items”)
- collectionName: 대상 Mongo 컬렉션명 (ex. “sink_orders”)
- SinkConnectorFinder: mysqlTableName 기준으로 적절한 커넥터 선택
4) 라우팅 등록
- @Component 만 달면 SinkConnectorFinder 가 자동 수집하여 mysqlTableName 으로 라우팅
5) 테스트 작성
- Kotest StringSpec + Testcontainers(MongoDB)
- DebeziumMessage 래퍼를 직접 생성해 c/u/d 케이스를 시뮬레이션
- 멀티 프랜차이즈(FranchiseCode) 격리, 배열 추가/갱신/삭제, Null/극단값 등 엣지 케이스 검증
6) Order Hub Sink 모듈이 SinkBindingsConfig 에서 설정한 redis stream 구독, 메시지 수신 7) Debezium JSON을 파싱하여 테이블별 SinkConnector 로 라우팅 8) 각 커넥터가 Row(테이블 스키마) -> 도메인 Sink 문서로 매핑하고 MongoDB에 upsert/remove 수행 9) franchiseCode + mysqlId 를 문서의 sinkInfo 에 보관. 동일한 mysqlId 라도 가맹별로 격리 저장되어있음.
하다보니 생긴 중요한 이슈 및 팁
- redis stream 에 메시지 적재하는 속도보다 sink 처리 속도가 느려서 메모리 부족으로 OOM 발생함. 그래서 sink 시 binlog -> redis stream 적재 유량제어 할 수 있는 옵션을 사용하면 된다. 이거 설정할 땐
redis-cli CONFIG SET maxmemory 4096mb
로 redis 서버의 max memory limit 먼저 1차 설정해주고 난 뒤 적용하니까 잘됨. 그냥 기존 0 (unlimited) 상태에서 이 옵션만 주면 제대로 적용이 안됨. (관련 공식 레퍼런스나 올라온 이슈 찾아봐도 없어서 내 추측으로는 redis maxmemory unlimited 0 상태에서는 debezium server limit 옵션이 무시되는 것 같음.) maxmemory-policy 는 noeviction 으로 설정해두었음.debezium.sink.redis.memory.limit.mb=512
(512MB 로 제한)debezium.sink.redis.memory.threshold.percentage=85
(85% 도달 시점부터 유량제어 시작)
- debezium.sink.redis.memory.limit.mb=${DBZ_REDIS_MEMORY_LIMIT_MB:512}
- debezium.sink.redis.memory.threshold.percentage=85 reference
아래보면 설정이전에는 메모리 4G 찍고 key 제거. 설정이후 512 / 85% 로 제한되니까 메모리 400 대 언더로 유지되는거 확인.
- 어떤 stream 을 읽을 지 미리 하드코딩해서 세팅해놓기보단 특정 prefix( mysql_sink.* ) 로 시작하는 stream 을 동적으로 연결해서 읽도록 설정하는게 운영에 훨씬 유리함. 그래서 스케줄러로 매 초마다 stream key 목록을 조회해서 읽도록 설정. 또 stream read 커넥션 맺을 필요 없으니까 인메모리 리스트로 관리.
- stream 에 적재된 mysql 데이터를 읽는건 기존에 벌크로 읽고 있었지만 결국 읽어와서 mongodb upsert 를 하나씩 히고있었고 꽤 병목이였음. 단건처리 물론 속도는 n ms 이내 이지만 이게 누적되면 꽤 커지기 때문에 결국 병목이였음. 또한 개별 메세지 ack, delete, pending 관리또한 오버헤드라 전체적으로 하나의 청크단위처리가 필요했다. stream -> 100 read -> buffer 적재 -> 100개 차면 bullk mongodb flush() & redis bulk ack, delete. 하도록 수행. 100개 다 안차도 1초에 한번씩 flush() 하도록 스케줄러 추가. 이러니까 훨씬 안정적이고 빠름. 기존엔 단건 메세지 처리에 1.3 ~ 1.5 ms 소요됬는데 약 100개처리에 평균 27ms 정도니까 약 0.27 ms 거의 5배 빨라짐. 목표는 네트워크 라운드 트립 비용 줄었기였고 결과는 예상한 대로 나왔다. 아래는 AOP 붙여서 bulkHandle() 메서드 실행시간 찍은 로그.
2025-09-26T02:48:01.890+09:00 INFO 96441 --- [fooddash-domain] [ virtual-246] [\] c.f.o.connector.SinkConnector : bulkHandle called with 100 records for table menus
SinkConnector.bulkHandle(..) 실행 시간: 26.672 ms
2025-09-26T02:48:01.926+09:00 INFO 96441 --- [fooddash-domain] [MessageBroker-2] [\] c.f.o.connector.SinkConnector : bulkHandle called with 12 records for table menus
SinkConnector.bulkHandle(..) 실행 시간: 4.947 ms
2025-09-26T02:48:01.984+09:00 INFO 96441 --- [fooddash-domain] [ virtual-246] [\] c.f.o.connector.SinkConnector : bulkHandle called with 100 records for table menus
SinkConnector.bulkHandle(..) 실행 시간: 26.582 ms
2025-09-26T02:48:02.027+09:00 INFO 96441 --- [fooddash-domain] [MessageBroker-8] [\] c.f.o.connector.SinkConnector : bulkHandle called with 25 records for table menus
SinkConnector.bulkHandle(..) 실행 시간: 7.496 ms
2025-09-26T02:48:02.079+09:00 INFO 96441 --- [fooddash-domain] [ virtual-246] [\] c.f.o.connector.SinkConnector : bulkHandle called with 100 records for table menus
SinkConnector.bulkHandle(..) 실행 시간: 27.653 ms
2025-09-26T02:48:02.127+09:00 INFO 96441 --- [fooddash-domain] [MessageBroker-8] [\] c.f.o.connector.SinkConnector : bulkHandle called with 1 records for table menus
SinkConnector.bulkHandle(..) 실행 시간: 1.658 ms
- 종합하면 메모리 기반이기때문에 데이터 유실이 있을 수 있음. 유실 최소화하기위해 noeviction 정책 설정 및 mysql->redis 유량제어 설정. history 및 offset redis 에 기록되어 저장되기때문에 debezium server 재시작시점부터 다시 cdc 가능. 진짜 만약에 이 binlog 어디까지 읽었는지 offset 마저 날라가도 처음부터 스냅샷해도 됨. redis -> mongodb 에 멱등성이 보장되기 때문에.(만약 sink 된 문서의 수정이 있다면 수정됨=true 로 해놓고 ETL 모듈에서 upsert 할 때 true 면 업데이트 안하도록 처리하든 아니면 따로 read 전용 sink 문서로 관리하든 데이터 운영정책에 맞게 처리하면 됨.)
Appendix
새로운 테이블 추가 시 체크리스트
- Row 데이터 클래스 생성 (snake_case, null 허용, BigDecimal/OffsetDateTime 등)
- 도메인 문서(SinkClass 를 상속받는) 생성
싱크커넥터 구현 및 SinkBindingsConfig 에 @Bean 등록이거 다이나믹하게 sink_mysql.* prefix 로 시작하는 stream 을 읽도록 변경해서 유지보수 편하게 해놓음.- 비정규화 필요 시 OrderItem 참조(개별 upsert, toEntity).
- Kotest + Testcontainers 테스트 작성 (c/u/d, 멀티 프랜차이즈, 엣지 케이스)
- Debezium 스냅샷/시그널 전략 검토.(해당 테이블 기존 row 스냅샷 need? or not)
- 만약 need 하면 위 증분 스냅샷 전략 참고
- Debezium server application.properties 알아놓으면 좋은 규칙들
- line 끝에 주석달면 Regex 오류남.
- big decimal 은 debezium.source.decimal.handling.mode=string 으로 설정해야지 정확히 읽어옴.
- setup each table as
. - Debezium Server 설정에 맞춰 execute-snapshot 등 시그널을 사용해 특정 테이블 스냅샷 수행 가능
datetime debezium 내 변환
debezium server 은 mysql 시간 칼럼 타입에 따라 micro, milli, second 등 다르게 로딩함.
팀 내에서 datetime(2), datetime, timestamp 등 다양한 시간 타입을 쓰고있어서 debezium adaptive_seconds 모드로 변환적재했을 때 이게 micro, milli, second 중 뭔지 모르는 문제가 있음. 물론 기존 ddl 칼럼 타입을 보면 이게 뭔지 알 수 있지만 귀찮음. 2280 년까지는(정확히는 2286-11-20 17:46:40 UTC) 문제없는 방법이 있는데 그게 10자리 이하는 second, 11~13자리 milli, 14~16자리 micro 로 구분하는 방법임. 1970년 이전은 마이너스 표긴데 이것도 변환 가능함. 얘도 경계값이 초 단위가 1~10자리 적용되는 범위가 1653(1653-03-05 23:46:40 UTC)년도인데 사실 정말 과거의 정보를 가지고있어야한다는 요구사항이 없다면(db 에 이 값을 저장하고싶다가 아니면) 마이너스도 마찬가지로 abs 해서 적용하면 됨.
근데 정말 우리는 2280년 이후 값을 쓰지않나? 다시 보니까
9999년 12월 31일
값을 쓰고 있음. 그래서 그냥time.precision.mode=connect
로 설정하고 모든 타임 칼럼 타입 datetime(m) 으로 다 통일한 뒤 debezium 이 milliseconds 로 일괄설정하도록 유도하는게 좋을듯. 그러면 그냥 Long 타입 오면 생각안하고 milliseconds 판단 후 변환해주면 됨. (참고로 postgresql 일괄 microsecond 설정같은거 가능함. mysql 은 노가다 ㅜ)
본 모듈에 정의한 Rules
- sinkedAt 은 binlog event 발생시점이며, 본 모듈에서는 기존 문서의 sinkedAt 보다 빠른 sinkedAt 이 도착하면 그대로 sink 함. 이유는 select 쿼리 네트워크 비용을 줄이기 위함.
- sink 실패건은 1회에 한해 재시도함. 이후 성공여부 관계없이 pending & stream 에서 제거. 이후 정확한 데이터가 필요할 시 재처리 전략 변경 필요.
- sink 성공하면 stream 에서 제거하여 메모리 확보함.
- ex) Order 문서에 OrderItem 을 포함하도록 비정규화 시 Order 보다 OrderItem 이 먼저 생길 수 있음. 이를 고려해야함.
- Row 데이터 클래스는 최대한 null 허용으로 선언하여 이후 RDBMS DDL 변경에 유연하게 대응.