created at 2023-01-04
INDEX
- DB sync 시 고려할점
- 어떻게 두 개의 master DB를 sync 해야할까?
- Kafka connector란?
- Kafka Source connector setting
- postgres wal_level 설정
- debezium connector 설정(source)
- connector kafka 등록(schema configuration)
- DB에 값 추가했을 때 실제로 Kafka로 흘러가는지 확인
- Kafka Sink connector setting
- postgres-kafka-source-connector 컨테이너 실행
- jdbc-connector 설치 및 삽입
- sink-connector configuration
- uni-directional DB sink 결과
현재 단방향까지 구현된 버전은 v3.1.0이며, 해당 코드의 다양한 버전은 아래의 깃허브에 존재한다.
앞서 연동에 추가할 부분이 있다. 채팅서비스를 두개로 실행하는데, 문제는 DB가 서로 독립이라는 점이다. 따라서 두 개중, 어느 DB가 INSERT/ALTER 등이 된다면 다른 DB도 같은 트랜젝션을 수행해야한다. 즉, 분산 DB이면서 서로 sync되도록 해야된다.
이렇게 DB를 따로 뗀 이유는 수평확장시키기 좋기 때문이다. 이러한 두 개의 DB 모두 master DB로 수행된다.
1. DB sync 시 고려할점
이 부분에서 고려할 점은 다음과 같다.
- 백업 DB처럼 단반향 sync가 아닌 양방향 sync 를 해야하기 때문에 서로 맞물리는 무한루프를 조심해야한다.
보통 Source/Target DB를 정하고 CDC(Change Data Capture)후 Sync를 하는경우가 대부분이다. 양방향 sync는 좀더 까다로운것 같다. 아래는 양방향에 있어 발생가능한 이슈 및 해결방법이다.
Second, you’ll need to make sure to not propagate the same data change forth and back in an infinite loop. One way of doing so could for instance be an SMT(단일 메시지 변환) which you apply to both sources and which adds a Kafka Connect header property representing the “origin” of a given change. In your sink connector, you’d then add that origin as an additional column to your data as you update it. The source connector on that side would then have to be set up (e.g. again using an SMT) to ignore all the changes which originate from replication, as opposed to actual data changes e.g. done by a business application.
Issue from https://groups.google.com/g/debezium/c/YS22DAgFXSc
1-1. 여기서 SMT란?
Single Message Transforms (SMTs) is a Kafka API that provides a simple interface for manipulating records as they flow through both the source and sink side of your data pipeline.
reference https://camel.apache.org/camel-kafka-connector/3.18.x/reference/transformers/index.html
위의 레퍼런스를 번역하자면 SMTs는 source connector
or sink connector
에서 레코드의 칼럼명이나 value 등을 변경시켜서 전달해주는 kafka 편의기능 rest api 이다. 아래는 다양한 변환방법이다.
Some common uses for transforms are:
- Renaming fields( 칼럼명 재정의 )
- Masking values( 특정 칼럼의 value를 valid null로 만듬 ex) {value} to 0 or “” or false )
- Routing records to topics based on a value( cloud 에서는 안됨 )
- Converting or inserting timestamps into the record
- Manipulating keys, like setting a key from a field’s value( 카프카는 key를 통해 원하는 메세지를 가져올 수 있다. 이 방법은 특정 칼럼의 value를 key로 변환해주는 방법이다 )
2. 어떻게 두 개의 master DB를 sync 해야할까?
필자는 Kafka connector을 이용하여 싱크를 맞추려한다. 그러기 위해서는 Kafka connector에 대한 이해가 바탕이 되어야한다.
2-1. Kafka connector란?
Kafka connector의 기본적인 플로우는 다음과 같다.
- RDB에 INSERT/UPDATE/ALTER 등 변경되는 트랜젝션이 실행되고 TXlog에 기록된다
- Kafka Connector은 이를 읽고(CDC) Kafka의 Topic에 삽입한다
이 때 이 connector을 우리는 source conenctor이라고 부른다. 그리고 Kafka ---> DB
를 연결시켜주는 connector은 sink connector이라고 부른다.
일단 먼저 Kafka source connector을 설정해보고, kafdrop으로 실제 CDC되는지 관찰해보자
2-2. Kafka Source connector setting
Kafka soruce connector은 아래의 4가지 과정을 거쳐 설정 및 확인할 수 있다.
- postgres wal_level 설정
- debezium connector 설정
- connector kafka 등록
- DB에 값 추가했을 떄 실제로 Kafka로 흘러가는지 확인
2-2-1. postgres wal_level 설정
- 먼저 wal-level이 무엇일까?
WAL은 Write-Ahead Logging의 약자로 트랜젝션로그에 어떤식으로 변경된 사항을 저장할 지 정하는 설정이다.
WAL은 크게 Logical과 Replica가 존재한다.
- Logical level : 레코드 값이 변경되면, 변경된 레코드 전체가 저장된다.
lsn | xid | data -----------+-----+------------------------------------------------------------------------ 0/703F738 | 583 | BEGIN 583 0/703F738 | 583 | table public.mytab: INSERT: id[integer]:3 name[character varying]:'t3' 0/703F838 | 583 | table public.mytab: INSERT: id[integer]:4 name[character varying]:'t4' 0/703F8A8 | 583 | COMMIT 583 0/703F8E0 | 584 | BEGIN 584 0/703F8E0 | 584 | table public.mytab: DELETE: (no-tuple-data) 0/703F948 | 584 | COMMIT 584
statement + row 기반으로 저장된다고 볼 수 있다.
- Replica level : 레코드 값이 변경되면, 레코드 내 변경된 값 부분만 트랜젝션 로그에 저장한다.
그리고 이러한 WAL level은 postgres에서는 기본적으로 replica로 설정되어있다(9.4버전 이후부터 logcal을 지원한다). 이 replica level은 debezium kafka connector에서는 지원하지 않는다. 즉, 레코드 전체값이 적혀있는 트랜젝션 로그(logical level)를 보고 CDC하도록 설정되어있다. 따라서 우리는 이 default replica level을 logical로 아래와같이 바꿔줘야한다.
chatting-db-1:
container_name: chatting-db-1
image: postgres:12-alpine
environment:
- POSTGRES_PASSWORD=password
- POSTGRES_USER=postgres
- POSTGRES_DB=chat1
expose:
- "5433" # Publishes 5433 to other containers but NOT to host machine
ports:
- "5433:5433"
volumes:
- ./backups:/home/backups
command: -c wal_level=logical -p 5433 # logical로 변경
restart: always
chatting-db-2:
container_name: chatting-db-2
image: postgres:12-alpine
environment:
- POSTGRES_PASSWORD=password
- POSTGRES_USER=postgres
- POSTGRES_DB=chat2
expose:
- "5434" # Publishes 5433 to other containers but NOT to host machine
ports:
- "5434:5434"
volumes:
- ./backups:/home/backups
command: -c wal_level=logical -p 5434 # logical로 변경
restart: always
2-2-2. debezium connector 설정
이제 DB설정은 끝났고, DB의 트랜젝션 로그의 변경사항을 관찰(CDC)하고 Kafka 토픽에 삽입해주는 connector을 컨테이너로 아래와 같이 띄울 것이다.
# -------- postgres -> kafka source connector --------
kafka-source-connector:
image: debezium/connect:1.9
container_name: postgres-kafka-source-connector
ports:
- 8083:8083
environment:
CONFIG_STORAGE_TOPIC: my_connect_configs
OFFSET_STORAGE_TOPIC: my_connect_offsets
STATUS_STORAGE_TOPIC: my_connect_statuses
BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
depends_on:
- kafka1
- kafka2
- kafka3
- zookeeper
- chatting-db-2
2-2-3. connector kafka 등록
이렇게 DB, Connector-Kafka 을 띄웠다면 이제는 서로 연결해주어야할 차례이다. 우리는 Kafka connector가 지원하는 restapi를 통해 연결시켜줄 수 있다.
POST http://localhost:8083/connectors
{
"name": "source-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "chatting-db-2",
"database.port": "5434",
"database.user": "postgres",
"database.password": "password",
"database.dbname" : "chat2",
"database.server.name": "dbserver5434",
"transforms": "unwrap,addTopicPrefix",
# message의 schema를 after 필드로만 설정
# 이 부분을 설정하지 않는다면 source와 sink connector의 schema가 일치하지 않는다.
# 즉, kafka로 메세지가 흘러갈 순 있지만 kafka에서 db로 sink가 진행되지 않을것이다.
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1"
}
}
2-2-4. DB에 값 추가했을 때 실제로 Kafka로 흘러가는지 확인
이제는 실제로 확인해 볼 차례이다. 우리는 다음과 같이 확인해볼것이다. This is final process.
POST to server
POST http://localhost:8080/chat/user { "userId":"aa", "userName":"황보규민" }
In docker container log
chatting-server-2 | 2023-01-05 05:55:07.829 INFO 1 --- [ad | producer-1] chatting.chat.web.ChatController : 메세지 전송 성공 topic=log-user-add, offset=0, partition=2
See kafka with Kafdrop
아래는 발생한 에러와 해결한 방법에 대해 정리했다.
이슈
Connector configuration is invalid and contains the following 1 error(s) Error while validating connector config: Connection to localhost:5434 refused
이슈 해결방법 정리 : https://github.com/ghkdqhrbals/spring-chatting-server/issues/1
2-3. Kafka Sink connector setting
자! 이제 DB->Kafka는 완료되었으니, Kafka->DB로 Sink connector을 구축해야한다. 다음의 동영상을 참고하자.
위의 동영상은 confluent의 cloud로 connector을 설정하는 방법이다. 하지만 필자는 굳이 cloud로 설정할 필요없다고 생각했다. 따라서 source connector에서 사용하던 debezium connector 컨테이너에 jdbc-connector만 추가해서 사용하는 방법으로 가기로 하였다.
2-3-1. postgres-kafka-source-connector 컨테이너 실행
kafka-source-connector:
image: debezium/connect:1.9
container_name: postgres-kafka-source-connector
ports:
- 8083:8083
environment:
CONFIG_STORAGE_TOPIC: __pg.source.config.storage
OFFSET_STORAGE_TOPIC: __pg.source.offset.storage
STATUS_STORAGE_TOPIC: __pg.source.status.storage
PLUGIN_PATH: /kafka/connect # connector 플러그인 저장소 위치
BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
depends_on:
- kafka1
- kafka2
- kafka3
- zookeeper
- chatting-db-1
- chatting-db-2
2-3-2. jdbc-connector 설치 및 삽입
아래와 같이 shell script를 작성해서 실행한다.
해당 shell script를 실행하기 전에 먼저
postgres-kafka-source-connector
컨테이너가 필요하다.
#!/bin/bash
echo "(step-1) confluent-hub cli 다운로드 및 압축해제"
curl -LO http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
mkdir confluent-etcs | tar -xvzf confluent-hub-client-latest.tar.gz -C confluent-etcs
#
echo "(step-2) confluent-hub 환경변수 설정"
echo $(pwd)
export CONFLUENT_HOME=$(pwd)/confluent-etcs
export PATH=$PATH:$CONFLUENT_HOME/bin
#
echo "(step-3) cli를 통한 jdbc connector 다운로드"
$CONFLUENT_HOME/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.6.0 --component-dir $(pwd)/confluent-etcs
#
echo "(step-4) debezium connector 컨테이너의 connector리스트에 삽입"
docker cp $(pwd)/confluent-etcs/confluentinc-kafka-connect-jdbc postgres-kafka-source-connector:/kafka/connect
echo "(step-5) debezium connector 컨테이너 재시작으로 loading new connector"
docker restart postgres-kafka-source-connector
이렇게 설치를 끝내고 GET http://localhost:8083/connector-plugins
을 전송하면 아래와 같이 정상적으로 JdbcSink/SourceConnector와 연동된 것을 확인할 수 있다.
[
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.6.0"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.6.0"
},
...
{
"class": "io.debezium.connector.postgresql.PostgresConnector",
"type": "source",
"version": "1.9.7.Final"
},
...
]
2-3-3. sink-connector configuration
POST http://localhost:8083/connectors
{
"name": "sink-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"task.max" : 1,
"topics": "dbserver5434.public.user_table",
"connection.url": "jdbc:postgresql://chatting-db-1:5433/chat1",
"connection.user":"postgres",
"connection.password":"password",
# table/column 자동생성 방지
# 두개의 테이블이 이미 동일함
"auto.create": "false",
"auto.evolve": "false",
"delete.enabled": "true",
"insert.mode": "upsert",
"pk.mode": "record_key",
"tombstones.on.delete": "true",
# schema일치 확인 및 payload 추출 과정
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"transforms": "unwrap,addTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.addTopicPrefix.regex":"(.*)",
"transforms.addTopicPrefix.replacement":"$1",
# 해당 테이블에 new row 삽입
"table.name.format":"user_table",
# 몇 개의 메세지를 읽고 sink할 것인지
"batch.size": "1"
}
}
최종적으로 Kafka에서 dbserver5434.public.user_table
토픽에 대한 schema는 아래와 같이 설정된다.
{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "user_id"
},
{
"type": "string",
"optional": true,
"field": "user_name"
},
{
"type": "string",
"optional": true,
"field": "user_status"
}
],
"optional": false,
"name": "dbserver5434.public.user_table.Value"
},
"payload": {
"user_id": "a",
"user_name": "a",
"user_status": "a"
}
}
2-4. uni-directional DB sink 결과
일단 단방향 sink 설정은 이걸로 끝이 났다. 한번 확인해보자.
먼저 kafka-connector 컨테이너 실행 이후 install-jdbc-connector.sh 실행
gyuminhwangbo@Gyuminui-MacBookPro spring-chatting-server % sh ./install-jdbc-connector.sh
(step-1) confluent-hub cli 다운로드 및 압축해제
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 7584k 100 7584k 0 0 1131k 0 0:00:06 0:00:06 --:--:-- 1587k
mkdir: confluent-etcs: File exists
x share/doc/confluent-hub-client/notices/
x share/doc/confluent-hub-client/licenses/
x share/java/confluent-hub-client/jakarta.ws.rs-api-2.1.6.jar
...
(step-2) confluent-hub 환경변수 설정
/Users/gyuminhwangbo/study/spring-chatting-server
(step-3) cli를 통한 jdbc connector 다운로드
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Confluent Community License
https://www.confluent.io/confluent-community-license
Downloading component Kafka Connect JDBC 10.6.0, provided by Confluent, Inc. from Confluent Hub and installing into /Users/gyuminhwangbo/study/spring-chatting-server/confluent-etcs
Implicit confirmation of the question: Do you want to uninstall existing version 10.6.0?
...
(step-4) debezium connector 컨테이너의 connector리스트에 삽입
(step-5) debezium connector 컨테이너 재시작으로 loading new connector
postgres-kafka-source-connector
그리고 connector configuration upload를 하였다.
아래는 DB:5434과 연결된 chatServer의 api gateway(nginx)에 user_table의 insert api를 날렸을 때 터미널 상황이다.
# (1) kafka-connector가 postgresDB:5434의 Txlog에서 변경사항 감지(CDC)
postgres-kafka-source-connector | 2023-01-08 07:19:07,589 INFO Postgres|dbserver5434|streaming First LSN 'LSN{0/168BF58}' received [io.debezium.connector.postgresql.connection.WalPositionLocator]
postgres-kafka-source-connector | 2023-01-08 07:19:07,589 INFO Postgres|dbserver5434|streaming WAL resume position 'LSN{0/168BF58}' discovered [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
postgres-kafka-source-connector | 2023-01-08 07:19:07,593 INFO Postgres|dbserver5434|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
postgres-kafka-source-connector | 2023-01-08 07:19:07,600 INFO Postgres|dbserver5434|streaming Connection gracefully closed [io.debezium.jdbc.JdbcConnection]
# (Additional) POST 반환값
nginx | 192.168.240.1 - - [08/Jan/2023:07:19:07 +0000] "POST /chat/user HTTP/1.1" 200 86 "-" "PostmanRuntime/7.29.2" "-"
# (2) PgOutput - postgres가 기본적으로 제공하는 replica설정을 앞서 우리가
# logical로 바꿧었다. 그리고 logical로 저장된 TXlog들을 디코딩해서 kafka에 밀어넣기 위해
# PgOutput이라는 모듈을 통해 logicalTX----(decoding-PgOutput)---->kafka를 수행한다.
postgres-kafka-source-connector | 2023-01-08 07:19:07,614 INFO Postgres|dbserver5434|streaming Initializing PgOutput logical decoder publication [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
chatting-db-2 | 2023-01-08 07:19:07.620 UTC [82] LOG: starting logical decoding for slot "debezium"
chatting-db-2 | 2023-01-08 07:19:07.620 UTC [82] DETAIL: Streaming transactions committing after 0/168BDD8, reading WAL from 0/168BDD8.
chatting-db-2 | 2023-01-08 07:19:07.620 UTC [82] STATEMENT: START_REPLICATION SLOT "debezium" LOGICAL 0/168BDD8 ("proto_version" '1', "publication_names" 'dbz_publication')
chatting-db-2 | 2023-01-08 07:19:07.620 UTC [82] LOG: logical decoding found consistent point at 0/168BDD8
chatting-db-2 | 2023-01-08 07:19:07.620 UTC [82] DETAIL: There are no running transactions.
chatting-db-2 | 2023-01-08 07:19:07.620 UTC [82] STATEMENT: START_REPLICATION SLOT "debezium" LOGICAL 0/168BDD8 ("proto_version" '1', "publication_names" 'dbz_publication')
postgres-kafka-source-connector | 2023-01-08 07:19:07,636 INFO Postgres|dbserver5434|streaming Requested thread factory for connector PostgresConnector, id = dbserver5434 named = keep-alive [io.debezium.util.Threads]
postgres-kafka-source-connector | 2023-01-08 07:19:07,637 INFO Postgres|dbserver5434|streaming Creating thread debezium-postgresconnector-dbserver5434-keep-alive [io.debezium.util.Threads]
postgres-kafka-source-connector | 2023-01-08 07:19:07,638 INFO Postgres|dbserver5434|streaming Processing messages [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
postgres-kafka-source-connector | 2023-01-08 07:19:07,654 INFO Postgres|dbserver5434|streaming Message with LSN 'LSN{0/168BF58}' arrived, switching off the filtering [io.debezium.connector.postgresql.connection.WalPositionLocator]
postgres-kafka-source-connector | 2023-01-08 07:19:08,264 INFO || 1 records sent during previous 00:01:18.579, last recorded offset: {transaction_id=null, lsn_proc=23641944, lsn=23641944, txId=501, ts_usec=1673162347279067} [io.debezium.connector.common.BaseSourceTask]
# (Additional) 이건 그냥 별개로 connector 안거치고 바로 kafka에 삽입하는 별도의 pipeline.
chatting-server-2 | 2023-01-08 07:19:07.806 INFO 1 --- [ad | producer-1] chatting.chat.web.ChatController : 메세지 전송 성공 topic=log-user-add, offset=0, partition=1
# (3) JDBC-Sink connector가 kafka-topic의 소비된 message의 last offset을 확인하고,
# 신규 데이터 발견, postgresql에 대한 dialect를 만들어서 쿼리를 실행시키는 과정
postgres-kafka-source-connector | 2023-01-08 07:19:08,287 INFO || [Producer clientId=connector-producer-source-connector-0] Resetting the last seen epoch of partition dbserver5434.public.user_table-0 to 0 since the associated topicId changed from null to oCJqYdENQ1C2cPZeNEhtsw [org.apache.kafka.clients.Metadata]
postgres-kafka-source-connector | 2023-01-08 07:19:08,313 INFO || Attempting to open connection #1 to PostgreSql [io.confluent.connect.jdbc.util.CachedConnectionProvider]
postgres-kafka-source-connector | 2023-01-08 07:19:08,412 INFO || Maximum table name length for database is 63 bytes [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,412 INFO || JdbcDbWriter Connected [io.confluent.connect.jdbc.sink.JdbcDbWriter]
postgres-kafka-source-connector | 2023-01-08 07:19:08,430 INFO || Checking PostgreSql dialect for existence of TABLE "user_table" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,440 INFO || Using PostgreSql dialect TABLE "user_table" present [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,456 INFO || Checking PostgreSql dialect for type of TABLE "user_table" [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,460 INFO || Setting metadata for table "user_table" to Table{name='"user_table"', type=TABLE columns=[Column{'user_name', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'user_id', isPrimaryKey=true, allowsNull=false, sqlType=varchar}, Column{'user_status', isPrimaryKey=false, allowsNull=true, sqlType=varchar}]} [io.confluent.connect.jdbc.util.TableDefinitions]
드디어! 단방향 설정이 끝났다. 이제는 양방향이 남았다. 포스팅이 너무 길어져서 양방향 설계는 다음 포스팅에서 설계하겠다. 아래는 고려하는 양방향 DB sync의 아키텍처이다.
Reference
- https://www.confluent.io/blog/sync-databases-and-remove-silos-with-kafka-cdc/
- One way DB sync
- Bi-directional DB sync
- https://medium.com/event-driven-utopia/configuring-debezium-to-capture-postgresql-changes-with-docker-compose-224742ca5372
- https://stackoverflow.com/questions/59978213/debezium-could-not-access-file-decoderbufs-using-postgres-11-with-default-plug
- source connector configuration 문법 with debezium Postgres connector
- Debezium을 이용 source/sink connector 설정