Skip to main content Link Menu Expand (external link) Document Search Copy Copied

created at 2023-01-04

INDEX

  1. DB sync 시 고려할점
  2. 어떻게 두 개의 master DB를 sync 해야할까?
    1. Kafka connector란?
    2. Kafka Source connector setting
      1. postgres wal_level 설정
      2. debezium connector 설정(source)
      3. connector kafka 등록(schema configuration)
      4. DB에 값 추가했을 때 실제로 Kafka로 흘러가는지 확인
    3. Kafka Sink connector setting
      1. postgres-kafka-source-connector 컨테이너 실행
      2. jdbc-connector 설치 및 삽입
      3. sink-connector configuration
    4. uni-directional DB sink 결과

현재 단방향까지 구현된 버전은 v3.1.0이며, 해당 코드의 다양한 버전은 아래의 깃허브에 존재한다.

앞서 연동에 추가할 부분이 있다. 채팅서비스를 두개로 실행하는데, 문제는 DB가 서로 독립이라는 점이다. 따라서 두 개중, 어느 DB가 INSERT/ALTER 등이 된다면 다른 DB도 같은 트랜젝션을 수행해야한다. 즉, 분산 DB이면서 서로 sync되도록 해야된다.

이렇게 DB를 따로 뗀 이유는 수평확장시키기 좋기 때문이다. 이러한 두 개의 DB 모두 master DB로 수행된다.

1. DB sync 시 고려할점

이 부분에서 고려할 점은 다음과 같다.

  • 백업 DB처럼 단반향 sync가 아닌 양방향 sync 를 해야하기 때문에 서로 맞물리는 무한루프를 조심해야한다.

    보통 Source/Target DB를 정하고 CDC(Change Data Capture)후 Sync를 하는경우가 대부분이다. 양방향 sync는 좀더 까다로운것 같다. 아래는 양방향에 있어 발생가능한 이슈 및 해결방법이다.

    Second, you’ll need to make sure to not propagate the same data change forth and back in an infinite loop. One way of doing so could for instance be an SMT(단일 메시지 변환) which you apply to both sources and which adds a Kafka Connect header property representing the “origin” of a given change. In your sink connector, you’d then add that origin as an additional column to your data as you update it. The source connector on that side would then have to be set up (e.g. again using an SMT) to ignore all the changes which originate from replication, as opposed to actual data changes e.g. done by a business application.

    Issue from https://groups.google.com/g/debezium/c/YS22DAgFXSc

1-1. 여기서 SMT란?

Single Message Transforms (SMTs) is a Kafka API that provides a simple interface for manipulating records as they flow through both the source and sink side of your data pipeline.

reference https://camel.apache.org/camel-kafka-connector/3.18.x/reference/transformers/index.html

위의 레퍼런스를 번역하자면 SMTs는 source connector or sink connector에서 레코드의 칼럼명이나 value 등을 변경시켜서 전달해주는 kafka 편의기능 rest api 이다. 아래는 다양한 변환방법이다.

Some common uses for transforms are:

reference https://www.confluent.io/blog/kafka-connect-single-message-transformation-tutorial-with-examples/?_ga=2.130915337.76772118.1672804235-1001218784.1670749352&_gac=1.191662808.1671423652.CjwKCAiAkfucBhBBEiwAFjbkr7Bq_5Npm8yLue-N4DKIv4hpPc44IdpcBYN3ITQzeAAdIkGX2Y5wJRoCBYIQAvD_BwE

2. 어떻게 두 개의 master DB를 sync 해야할까?

필자는 Kafka connector을 이용하여 싱크를 맞추려한다. 그러기 위해서는 Kafka connector에 대한 이해가 바탕이 되어야한다.

2-1. Kafka connector란?

Kafka connector의 기본적인 플로우는 다음과 같다. img

  1. RDB에 INSERT/UPDATE/ALTER 등 변경되는 트랜젝션이 실행되고 TXlog에 기록된다
  2. Kafka Connector은 이를 읽고(CDC) Kafka의 Topic에 삽입한다

이 때 이 connector을 우리는 source conenctor이라고 부른다. 그리고 Kafka ---> DB를 연결시켜주는 connector은 sink connector이라고 부른다.

일단 먼저 Kafka source connector을 설정해보고, kafdrop으로 실제 CDC되는지 관찰해보자

2-2. Kafka Source connector setting

Kafka soruce connector은 아래의 4가지 과정을 거쳐 설정 및 확인할 수 있다.

  1. postgres wal_level 설정
  2. debezium connector 설정
  3. connector kafka 등록
  4. DB에 값 추가했을 떄 실제로 Kafka로 흘러가는지 확인

2-2-1. postgres wal_level 설정

  • 먼저 wal-level이 무엇일까?

WAL은 Write-Ahead Logging의 약자로 트랜젝션로그에 어떤식으로 변경된 사항을 저장할 지 정하는 설정이다.

WAL은 크게 Logical과 Replica가 존재한다.

  • Logical level : 레코드 값이 변경되면, 변경된 레코드 전체가 저장된다.

    예시 wal_level = logical

       lsn    | xid |                                  data
    -----------+-----+------------------------------------------------------------------------
    0/703F738 | 583 | BEGIN 583
    0/703F738 | 583 | table public.mytab: INSERT: id[integer]:3 name[character varying]:'t3'
    0/703F838 | 583 | table public.mytab: INSERT: id[integer]:4 name[character varying]:'t4'
    0/703F8A8 | 583 | COMMIT 583
    0/703F8E0 | 584 | BEGIN 584
    0/703F8E0 | 584 | table public.mytab: DELETE: (no-tuple-data)
    0/703F948 | 584 | COMMIT 584
    

    statement + row 기반으로 저장된다고 볼 수 있다.

  • Replica level : 레코드 값이 변경되면, 레코드 내 변경된 값 부분만 트랜젝션 로그에 저장한다.

그리고 이러한 WAL level은 postgres에서는 기본적으로 replica로 설정되어있다(9.4버전 이후부터 logcal을 지원한다). 이 replica level은 debezium kafka connector에서는 지원하지 않는다. 즉, 레코드 전체값이 적혀있는 트랜젝션 로그(logical level)를 보고 CDC하도록 설정되어있다. 따라서 우리는 이 default replica level을 logical로 아래와같이 바꿔줘야한다.

  chatting-db-1:
    container_name: chatting-db-1
    image: postgres:12-alpine
    environment:
      - POSTGRES_PASSWORD=password
      - POSTGRES_USER=postgres
      - POSTGRES_DB=chat1
    expose:
      - "5433" # Publishes 5433 to other containers but NOT to host machine
    ports:
      - "5433:5433"
    volumes:
      - ./backups:/home/backups
    command: -c wal_level=logical -p 5433 # logical로 변경
    restart: always

  chatting-db-2:
    container_name: chatting-db-2
    image: postgres:12-alpine
    environment:
      - POSTGRES_PASSWORD=password
      - POSTGRES_USER=postgres
      - POSTGRES_DB=chat2
    expose:
      - "5434" # Publishes 5433 to other containers but NOT to host machine
    ports:
      - "5434:5434"
    volumes:
      - ./backups:/home/backups
    command: -c wal_level=logical -p 5434 # logical로 변경
    restart: always

2-2-2. debezium connector 설정

이제 DB설정은 끝났고, DB의 트랜젝션 로그의 변경사항을 관찰(CDC)하고 Kafka 토픽에 삽입해주는 connector을 컨테이너로 아래와 같이 띄울 것이다.

  # -------- postgres -> kafka source connector --------
  kafka-source-connector:
    image: debezium/connect:1.9
    container_name: postgres-kafka-source-connector
    ports:
      - 8083:8083
    environment:
      CONFIG_STORAGE_TOPIC: my_connect_configs
      OFFSET_STORAGE_TOPIC: my_connect_offsets
      STATUS_STORAGE_TOPIC: my_connect_statuses
      BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
    depends_on:
      - kafka1
      - kafka2
      - kafka3
      - zookeeper
      - chatting-db-2

2-2-3. connector kafka 등록

이렇게 DB, Connector-Kafka 을 띄웠다면 이제는 서로 연결해주어야할 차례이다. 우리는 Kafka connector가 지원하는 restapi를 통해 연결시켜줄 수 있다.

POST http://localhost:8083/connectors
{
    "name": "source-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "chatting-db-2",
        "database.port": "5434",
        "database.user": "postgres",
        "database.password": "password",
        "database.dbname" : "chat2",
        "database.server.name": "dbserver5434",
        "transforms": "unwrap,addTopicPrefix",

        # message의 schema를 after 필드로만 설정
        # 이 부분을 설정하지 않는다면 source와 sink connector의 schema가 일치하지 않는다.
        # 즉, kafka로 메세지가 흘러갈 순 있지만 kafka에서 db로 sink가 진행되지 않을것이다.
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.addTopicPrefix.regex":"(.*)",
        "transforms.addTopicPrefix.replacement":"$1"
    }
}

2-2-4. DB에 값 추가했을 때 실제로 Kafka로 흘러가는지 확인

이제는 실제로 확인해 볼 차례이다. 우리는 다음과 같이 확인해볼것이다. This is final process.

  1. POST to server

     POST http://localhost:8080/chat/user
     {
       "userId":"aa",
       "userName":"황보규민"
     }
    
  2. In docker container log

     chatting-server-2         | 2023-01-05 05:55:07.829  INFO 1 --- [ad | producer-1] chatting.chat.web.ChatController         : 메세지 전송 성공 topic=log-user-add, offset=0, partition=2
    
  3. See kafka with Kafdrop

    img

    img

아래는 발생한 에러와 해결한 방법에 대해 정리했다.

2-3. Kafka Sink connector setting

자! 이제 DB->Kafka는 완료되었으니, Kafka->DB로 Sink connector을 구축해야한다. 다음의 동영상을 참고하자.

https://youtu.be/2bPx3hfKX04

위의 동영상은 confluent의 cloud로 connector을 설정하는 방법이다. 하지만 필자는 굳이 cloud로 설정할 필요없다고 생각했다. 따라서 source connector에서 사용하던 debezium connector 컨테이너에 jdbc-connector만 추가해서 사용하는 방법으로 가기로 하였다.

2-3-1. postgres-kafka-source-connector 컨테이너 실행

  kafka-source-connector:
    image: debezium/connect:1.9
    container_name: postgres-kafka-source-connector
    ports:
      - 8083:8083
    environment:
      CONFIG_STORAGE_TOPIC: __pg.source.config.storage
      OFFSET_STORAGE_TOPIC: __pg.source.offset.storage
      STATUS_STORAGE_TOPIC: __pg.source.status.storage
      PLUGIN_PATH: /kafka/connect # connector 플러그인 저장소 위치
      BOOTSTRAP_SERVERS: kafka1:9092,kafka2:9092,kafka3:9092
    depends_on:
      - kafka1
      - kafka2
      - kafka3
      - zookeeper
      - chatting-db-1
      - chatting-db-2

2-3-2. jdbc-connector 설치 및 삽입

아래와 같이 shell script를 작성해서 실행한다.

해당 shell script를 실행하기 전에 먼저 postgres-kafka-source-connector 컨테이너가 필요하다.

#!/bin/bash

echo "(step-1) confluent-hub cli 다운로드 및 압축해제"
curl -LO http://client.hub.confluent.io/confluent-hub-client-latest.tar.gz
mkdir confluent-etcs | tar -xvzf confluent-hub-client-latest.tar.gz -C confluent-etcs
#
echo "(step-2) confluent-hub 환경변수 설정"
echo $(pwd)
export CONFLUENT_HOME=$(pwd)/confluent-etcs
export PATH=$PATH:$CONFLUENT_HOME/bin
#
echo "(step-3) cli를 통한 jdbc connector 다운로드"
$CONFLUENT_HOME/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.6.0 --component-dir $(pwd)/confluent-etcs
#
echo "(step-4) debezium connector 컨테이너의 connector리스트에 삽입"
docker cp $(pwd)/confluent-etcs/confluentinc-kafka-connect-jdbc postgres-kafka-source-connector:/kafka/connect

echo "(step-5) debezium connector 컨테이너 재시작으로 loading new connector"
docker restart postgres-kafka-source-connector

이렇게 설치를 끝내고 GET http://localhost:8083/connector-plugins 을 전송하면 아래와 같이 정상적으로 JdbcSink/SourceConnector와 연동된 것을 확인할 수 있다.

[
    {
        "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "type": "sink",
        "version": "10.6.0"
    },
    {
        "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "type": "source",
        "version": "10.6.0"
    },
    ...
    {
        "class": "io.debezium.connector.postgresql.PostgresConnector",
        "type": "source",
        "version": "1.9.7.Final"
    },
    ...
]

2-3-3. sink-connector configuration

POST http://localhost:8083/connectors
{
    "name": "sink-connector",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "task.max" : 1,
        "topics": "dbserver5434.public.user_table",

        "connection.url": "jdbc:postgresql://chatting-db-1:5433/chat1",
        "connection.user":"postgres",
        "connection.password":"password",

        # table/column 자동생성 방지
        # 두개의 테이블이 이미 동일함
        "auto.create": "false",
        "auto.evolve": "false",
        "delete.enabled": "true",
        "insert.mode": "upsert",
        "pk.mode": "record_key",
        "tombstones.on.delete": "true",

        # schema일치 확인 및 payload 추출 과정
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "transforms": "unwrap,addTopicPrefix",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.addTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.addTopicPrefix.regex":"(.*)",
        "transforms.addTopicPrefix.replacement":"$1",

        # 해당 테이블에 new row 삽입
        "table.name.format":"user_table",

        # 몇 개의 메세지를 읽고 sink할 것인지
        "batch.size": "1"
    }
}

최종적으로 Kafka에서 dbserver5434.public.user_table 토픽에 대한 schema는 아래와 같이 설정된다.

{
   "schema": {
      "type": "struct",
      "fields": [
         {
            "type": "string",
            "optional": false,
            "field": "user_id"
         },
         {
            "type": "string",
            "optional": true,
            "field": "user_name"
         },
         {
            "type": "string",
            "optional": true,
            "field": "user_status"
         }
      ],
      "optional": false,
      "name": "dbserver5434.public.user_table.Value"
   },
   "payload": {
      "user_id": "a",
      "user_name": "a",
      "user_status": "a"
   }
}

2-4. uni-directional DB sink 결과

일단 단방향 sink 설정은 이걸로 끝이 났다. 한번 확인해보자.

먼저 kafka-connector 컨테이너 실행 이후 install-jdbc-connector.sh 실행

gyuminhwangbo@Gyuminui-MacBookPro spring-chatting-server % sh ./install-jdbc-connector.sh

(step-1) confluent-hub cli 다운로드 및 압축해제
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 7584k  100 7584k    0     0  1131k      0  0:00:06  0:00:06 --:--:-- 1587k
mkdir: confluent-etcs: File exists
x share/doc/confluent-hub-client/notices/
x share/doc/confluent-hub-client/licenses/
x share/java/confluent-hub-client/jakarta.ws.rs-api-2.1.6.jar
...

(step-2) confluent-hub 환경변수 설정
/Users/gyuminhwangbo/study/spring-chatting-server

(step-3) cli를 통한 jdbc connector 다운로드
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Confluent Community License
https://www.confluent.io/confluent-community-license
Downloading component Kafka Connect JDBC 10.6.0, provided by Confluent, Inc. from Confluent Hub and installing into /Users/gyuminhwangbo/study/spring-chatting-server/confluent-etcs
Implicit confirmation of the question: Do you want to uninstall existing version 10.6.0?
...

(step-4) debezium connector 컨테이너의 connector리스트에 삽입

(step-5) debezium connector 컨테이너 재시작으로 loading new connector
postgres-kafka-source-connector

그리고 connector configuration upload를 하였다.

아래는 DB:5434과 연결된 chatServer의 api gateway(nginx)에 user_table의 insert api를 날렸을 때 터미널 상황이다.

# (1) kafka-connector가 postgresDB:5434의 Txlog에서 변경사항 감지(CDC)
postgres-kafka-source-connector | 2023-01-08 07:19:07,589 INFO   Postgres|dbserver5434|streaming  First LSN 'LSN{0/168BF58}' received   [io.debezium.connector.postgresql.connection.WalPositionLocator]
postgres-kafka-source-connector | 2023-01-08 07:19:07,589 INFO   Postgres|dbserver5434|streaming  WAL resume position 'LSN{0/168BF58}' discovered   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
postgres-kafka-source-connector | 2023-01-08 07:19:07,593 INFO   Postgres|dbserver5434|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]
postgres-kafka-source-connector | 2023-01-08 07:19:07,600 INFO   Postgres|dbserver5434|streaming  Connection gracefully closed   [io.debezium.jdbc.JdbcConnection]

# (Additional) POST 반환값
nginx                     | 192.168.240.1 - - [08/Jan/2023:07:19:07 +0000] "POST /chat/user HTTP/1.1" 200 86 "-" "PostmanRuntime/7.29.2" "-"

# (2) PgOutput - postgres가 기본적으로 제공하는 replica설정을 앞서 우리가
# logical로 바꿧었다. 그리고 logical로 저장된 TXlog들을 디코딩해서 kafka에 밀어넣기 위해
# PgOutput이라는 모듈을 통해 logicalTX----(decoding-PgOutput)---->kafka를 수행한다.
postgres-kafka-source-connector | 2023-01-08 07:19:07,614 INFO   Postgres|dbserver5434|streaming  Initializing PgOutput logical decoder publication   [io.debezium.connector.postgresql.connection.PostgresReplicationConnection]
chatting-db-2             | 2023-01-08 07:19:07.620 UTC [82] LOG:  starting logical decoding for slot "debezium"
chatting-db-2             | 2023-01-08 07:19:07.620 UTC [82] DETAIL:  Streaming transactions committing after 0/168BDD8, reading WAL from 0/168BDD8.
chatting-db-2             | 2023-01-08 07:19:07.620 UTC [82] STATEMENT:  START_REPLICATION SLOT "debezium" LOGICAL 0/168BDD8 ("proto_version" '1', "publication_names" 'dbz_publication')
chatting-db-2             | 2023-01-08 07:19:07.620 UTC [82] LOG:  logical decoding found consistent point at 0/168BDD8
chatting-db-2             | 2023-01-08 07:19:07.620 UTC [82] DETAIL:  There are no running transactions.
chatting-db-2             | 2023-01-08 07:19:07.620 UTC [82] STATEMENT:  START_REPLICATION SLOT "debezium" LOGICAL 0/168BDD8 ("proto_version" '1', "publication_names" 'dbz_publication')
postgres-kafka-source-connector | 2023-01-08 07:19:07,636 INFO   Postgres|dbserver5434|streaming  Requested thread factory for connector PostgresConnector, id = dbserver5434 named = keep-alive   [io.debezium.util.Threads]
postgres-kafka-source-connector | 2023-01-08 07:19:07,637 INFO   Postgres|dbserver5434|streaming  Creating thread debezium-postgresconnector-dbserver5434-keep-alive   [io.debezium.util.Threads]
postgres-kafka-source-connector | 2023-01-08 07:19:07,638 INFO   Postgres|dbserver5434|streaming  Processing messages   [io.debezium.connector.postgresql.PostgresStreamingChangeEventSource]
postgres-kafka-source-connector | 2023-01-08 07:19:07,654 INFO   Postgres|dbserver5434|streaming  Message with LSN 'LSN{0/168BF58}' arrived, switching off the filtering   [io.debezium.connector.postgresql.connection.WalPositionLocator]
postgres-kafka-source-connector | 2023-01-08 07:19:08,264 INFO   ||  1 records sent during previous 00:01:18.579, last recorded offset: {transaction_id=null, lsn_proc=23641944, lsn=23641944, txId=501, ts_usec=1673162347279067}   [io.debezium.connector.common.BaseSourceTask]

# (Additional) 이건 그냥 별개로 connector 안거치고 바로 kafka에 삽입하는 별도의 pipeline.
chatting-server-2         | 2023-01-08 07:19:07.806  INFO 1 --- [ad | producer-1] chatting.chat.web.ChatController         : 메세지 전송 성공 topic=log-user-add, offset=0, partition=1

# (3) JDBC-Sink connector가 kafka-topic의 소비된 message의 last offset을 확인하고,
# 신규 데이터 발견, postgresql에 대한 dialect를 만들어서 쿼리를 실행시키는 과정
postgres-kafka-source-connector | 2023-01-08 07:19:08,287 INFO   ||  [Producer clientId=connector-producer-source-connector-0] Resetting the last seen epoch of partition dbserver5434.public.user_table-0 to 0 since the associated topicId changed from null to oCJqYdENQ1C2cPZeNEhtsw   [org.apache.kafka.clients.Metadata]
postgres-kafka-source-connector | 2023-01-08 07:19:08,313 INFO   ||  Attempting to open connection #1 to PostgreSql   [io.confluent.connect.jdbc.util.CachedConnectionProvider]
postgres-kafka-source-connector | 2023-01-08 07:19:08,412 INFO   ||  Maximum table name length for database is 63 bytes   [io.confluent.connect.jdbc.dialect.PostgreSqlDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,412 INFO   ||  JdbcDbWriter Connected   [io.confluent.connect.jdbc.sink.JdbcDbWriter]
postgres-kafka-source-connector | 2023-01-08 07:19:08,430 INFO   ||  Checking PostgreSql dialect for existence of TABLE "user_table"   [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,440 INFO   ||  Using PostgreSql dialect TABLE "user_table" present   [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,456 INFO   ||  Checking PostgreSql dialect for type of TABLE "user_table"   [io.confluent.connect.jdbc.dialect.GenericDatabaseDialect]
postgres-kafka-source-connector | 2023-01-08 07:19:08,460 INFO   ||  Setting metadata for table "user_table" to Table{name='"user_table"', type=TABLE columns=[Column{'user_name', isPrimaryKey=false, allowsNull=true, sqlType=varchar}, Column{'user_id', isPrimaryKey=true, allowsNull=false, sqlType=varchar}, Column{'user_status', isPrimaryKey=false, allowsNull=true, sqlType=varchar}]}   [io.confluent.connect.jdbc.util.TableDefinitions]

드디어! 단방향 설정이 끝났다. 이제는 양방향이 남았다. 포스팅이 너무 길어져서 양방향 설계는 다음 포스팅에서 설계하겠다. 아래는 고려하는 양방향 DB sync의 아키텍처이다.

img

Reference