데이터 싱크를 위한 MSK Connect 도입 (feat : CDC 뽀개기)

bibi_papa
펫프렌즈 기술블로그
37 min readSep 22, 2023

--

들어가기전에

반갑습니다.
펫프렌즈 인터널서비스팀 백엔드 개발자 ‘민’입니다.

저희 팀은 주문 이후 여정인 배송 및 물류 서비스를 주요 업무로 담당하고 있으며 이외에도 발주, 거래처, 물류창고 및 재고 관리 등 메인 어플리케이션 뒷단에서 눈에 띄지 않는 다양한 시스템들을 관리하고 있습니다.

이번글에서는 WMS(창고 관리 시스템) 내재화 프로젝트에 도입된 MSK Connect와 Debezium을 활용한 데이터 동기화 파이프라인 구축절차에 대해 공유드리고자 합니다.

(부제 : 민과 함께하는 CDC 뿌셔버리기~ )

안녕? 난 민이라고 해~

도입배경

펫프렌즈는 현재 모놀리식 아키텍처에서 MSA로의 전환 작업을 진행 중입니다. 이 변화의 첫 번째 단계로 이벤트 소싱 패턴을 적용하기 위해 카프카 (MSK) 사용을 결정하였고 첫 도입은 제가 입사 후 처음 참여한 WMS 내재화 프로젝트에서 시작되었습니다. (벌써 1년이..😢)

WMS 내재화 프로젝트는 외부 솔루션으로 사용되던 WMS 시스템을 회사 내부 시스템으로 이관하는 작업으로, 기존 Legacy DB에 저장되어 있던 입/출고 및 재고와 같은 데이터를 신규 DB로 이관하고 관리하는 것을 포함했습니다.

새로운 도메인이 분리되었지만, WMS는 여전히 메인 어플리케이션(펫프렌즈)에서 관리중인 상품, 거래처 정보와 같은 기준정보 데이터를 필요로 했으며, 이 데이터들을 동기화하기 위한 데이터 파이프라인이 필요했습니다.

이벤트 소싱 패턴을 위해 도입된 MSK를 적극 활용하고, 빠른 구축 및 유연한 장애대처를 위해 Kafka 기반인 Debezium을 사용하여 데이터 동기화 파이프라인을 구축하기로 결정하였습니다.

MSK Connect가 뭔데?

Apache Kafka로 데이터를 쉽게 스트리밍할 수 있도록 해주는 MSK의 기능입니다. Kafka Connect만 사용하여 CDC환경을 구축하면 상당히 많은 인프라 수작업을 필요로 하는데, MSK Connect를 사용하면 AWS Console상에서 쉽게 Kafka Connect를 구성 및 배포 할 수 있습니다.

Connector는 아래와 같이 두 가지 종류로 나뉩니다.

Source Connector

데이터를 생산하여 클러스터로 데이터를 보내는 역할을 합니다. (Producer)

Sink Connector

클러스터로 부터 데이터를 수신하여 소비하는 역할을 합니다. (Consumer)

* 이미지 출처 : confluent

Debezium은 또 뭐야?

kafka Connect를 기반으로 개발된 오픈 소스 분산 CDC도구 입니다.

자세하게는 Kafka Conncet에 설치되는 플러그인으로써 로그를 기반으로 연결된 데이터베이스의 모든 행 수준 변경사항을 추적 및 감지하여 메시지 (Payload)를 생산하는 역할을 합니다.

debezium이 지원하는 dbms 목록은 공식 홈페이지에서 확인 가능합니다.

* 이미지 출처 : debezium

서론은 여기까지.. 자 그럼 구축절차를 살펴보도록 하겠습니다.

MSK Connect를 활용하여 CDC 구축하기

전체적인 구축절차는 Amazon 공식 문서를 참고하였습니다.
내용이 워낙.. 방대하여 모든 절차에 대해 상세하게 다루지는 않겠습니다.😅 설명이 부족하다고 느껴지는 부분은 공식 문서를 참고해주시기 바랍니다. 🙏

이하 설명드릴 CDC 구축 환경은 아래와 같습니다.

Infra & Software

1. Amazon RDS (MariaDB : v 10.3)
2. Amazon MSK (kafka : v 2.7.2)
3. Amazon S3
4. Debezium-Mysql (v 1.9.5) + MSK Connect 👉 Source Connector
5. Spring Boot (v 2.3.12) 👉 Sink Connector

테스트에 사용할 테이블은 아래와 같습니다.

create table CDC_TEST
(
id bigint auto_increment primary key,
code varchar(20) not null comment '코드',
request_day date null comment '요청일',
created_at datetime null comment '등록일시',
created_user_id bigint null comment '생성자',
updated_user_id bigint null comment '수정자'
)
comment 'cdc 테스트 테이블';

#1. RDS 설정하기

Debezium은 데이터베이스의 로그파일을 기반으로 변경사항을 감지합니다.
MySql은 binary log 파일에 모든 데이터베이스 변경사항이 기록되는데 Debezium이 해당 파일을 읽을 수 있게 파라미터 그룹을 변경해야 합니다.

RDS 파라미터 그룹 설정

  • RDS > 인스턴스 > 구성 > 파라미터 그룹 > 파라미터 편집
  • binlog_format : ROW
  • 파라미터 그룹 변경후 RDS 재구동 필요

DB 계정 생성

Debezium에서 사용할 DB 계정을 생성합니다.
Debezium이 필요한 권한 및 이유는 공식 홈페이지에서 자세히 확인 가능합니다. 만약 해당하는 모든 권한을 소유한 계정이 있고, 별도 CDC 계정 생성을 원치 않는다면 생략해도 괜찮습니다.

  • 계정 생성 및 권한 부여
# 계정 생성
CREATE USER '아이디'@'%' IDENTIFIED BY '비밀번호';
# 권한 부여
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO '아이디'@'%';
# 접속 허용
GRANT EXECUTE ON *.* TO '아이디'@'%';
# 권한 적용
FLUSH PRIVILEGES;

#2. MSK Connect 설정하기

Debezium 다운로드 및 S3 업로드

  • 공식 홈페이지에서 Debezium-mysql을 다운받습니다.
  • 플러그인을 S3에 업로드합니다.
aws s3 cp debezium-mysql-1.9.5.zip s3://버킷명

MSK Connect 사용자 지정 플러그인 생성

  • MSK > MSK Connect > 사용자 지정 플러그인 > 플러그인 생성
  • 업로드한 s3 버킷에서 플러그인 파일 선택
  • 이름 입력 후 [사용자 지정 플러그인 생성] 버튼 클릭

MSK Connect 작업자 구성 생성

  • MSK > MSK Connect > 작업자 구성 > 작업자 구성 생성
  • 이름 및 작업자 구성 입력 후 [작업자 구성 생성] 버튼 클릭
  • 작업자 구성 예시
# 메시지 컨버터
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

# __amazon_msk_connect_offsets 자동생성 토픽
# offset : bin.log 오프셋 기록 토픽 파티션 및 리플리케이션 수
offset.storage.partitions=3
offset.storage.replication.factor=3

# __amazon_msk_connect_configs 자동생성 토픽
# confg : 커넥터 내부 구성정보 저장 토픽 리플리케이션 수 (파티션은 강제 1고정, 변경불가)
config.storage.replication.factor=3

#__amazon_msk_connect_status 자동생성 토픽
# status : 작업 구성상태 변경 기록 토픽 파티션 및 리플리케이션 수
status.storage.partitions=3
status.storage.replication.factor=3

#3. Source Connector 생성

사용자 플러그인 선택

  • MSK > MSK Connect > 커넥터 > 커넥터 생성
  • 업로드한 Debezium 플러그인 선택

커넥터 속성 정보 입력

  • 기본 정보 입력
  • MSK Cluster 및 인증 유형 선택
  • 커넥터 용량 설정
  • 작업자 구성 선택
  • 엑세스 권한 선택
  • ⚠️ 커넥터 구성 정보 입력 ⚠️

Debezium이 플러그인 형태로 제공되는 오픈소스이다보니, 대부분의 핵심 기능들이 Properties를 통해 제공됩니다. 대부분의 트러블 슈팅도 해당 설정값 변경을 통해 이루어지니 유의깊게 살펴봐야 하는 부분입니다! 모든 설정값에 대한 설명은 공식 페이지에서 확인 가능하오니 참고 부탁드립니다.😄

  • 커넥터 구성정보 예시
connector.class=io.debezium.connector.mysql.MySqlConnector
# Debezium mysql은 최대 1개 작업자만 지원
tasks.max=1

# DB 정보
database.hostname={DB_HOST}
database.port={DB_PORT}
database.user={DB_USER}
database.password={DB_PASS}
# MSK Connect의 고유 ID로 유일값으로 지정
database.server.id=1
# MSK Connect의 고유 이름으로 CDC 토픽의 Prefix로 사용됨
database.server.name=testdb.sink
# 변경감지할 Schema, 여러개일 경우 ','로 구분
database.include.list=TESTDB
database.ssl.mode=disabled
# 변경감지할 테이블 명, 여러개일 경우 ','로 구분, 없으면 모든 테이블 읽음, 스키마명 지정 필수
table.include.list=TESTDB.CDC_TEST

# SMT(Single Message Transformation) 설정
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.add.fields=op,table
# cdc payload중 메타정보에 해당하는 필드의 접두어 지정
transforms.unwrap.add.fields.prefix=cdc_meta_
# 기본적으로 delete 레코드는 스트림에서 제거하는데 이를 방지하기 위함
transforms.unwrap.drop.tombstones=false
transforms.unwrap.delete.handling.mode=rewrite

# decimal처리 시 기본값인 precise는 0x001D2와 같은 바이트 형태로 줌
# double로 변경시 FLOAT 64형태로 전송함
decimal.handling.mode=double

# Source Connector가 MSK에 보내는 heatbeat 주기(ms)
heartbeat.interval.ms=10000
heartbeat.topics.prefix=cdc-heartbeat

# Kafka Bootstrap 주소
database.history.kafka.bootstrap.servers={BOOTSTRAP_HOST_PORT}
# DDL문 기록 토픽
database.history.kafka.topic=testdb.sink.history
database.history.kafka.recovery.poll.interval.ms=100
database.history.kafka.query.timeout.ms=3000
database.history.kafka.recovery.attempts=4
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# 구문오류에 해당하는 ddl문 파싱에러시 skip 여부
database.history.skip.unparseable.ddl=true
# 캡쳐 대상 테이블에 대해서만 ddl문 수신 여부
database.history.store.only.captured.tables.ddl=true
# 스냅샷 생성시 락 여부 (default:minimal -> 스키마 및 메타데이터 읽는 동안만 lock)
snapshot.locking.mode=none

보안정보 입력

  • 암호화 유형 선택(Broker와 Msk Connect간 전송 데이터)

로그 전송 유형 선택

  • MSK Connect 로그 전송 채널 선택

검토 및 생성

  • 작성내용 검토 및 커넥터 생성

여기까지 하면 본격적으로 MSK Connect를 생성하기 시작합니다.

커넥터는 최초 구동시 캡쳐 테이블의 전체 row를 대상으로 스냅샷을 생성하기 때문에 Database의 크기에 따라 커넥터 생성에 소요되는 시간이 다릅니다.

참고로 커넥터 생성 로그는 로그 전송 유형 선택 단계에서 선택한 방식으로 조회 가능합니다.

#3–1. Snapshot에 대한 고찰 (깨알팁 🎁)

Q: 스냅샷을 뜬다구요!?
A: 네, 뜬다구요

Q: 스냅샷은 어디에 사용되나요?
A: 커넥터 생성 전후 데이터 변경을 추적하기 위한 기준점 역할을 합니다.

Q: 스냅샷 이후 데이터부터 변경을 추적한다는 뜻일까요?
A: 맞아요. 스냅샷 이후의 데이터 변경을 실시간으로 추적하며, 스냅샷 데이터는 최초 싱크에 활용될 수 있습니다. 이 데이터들은 별도 Payload를 생성하여 전달되며, 메시지의 Operation(DML) 타입은 ‘r’입니다.

Q: 아하, 근데 그렇게 최초 싱크를 맞추기엔.. 데이터 정합성이 걱정되네요.
A: 네, 그래서 snapshot.locking.mode와 같은 속성값을 제공하여 커넥터 생성시 Table Lock을 보장합니다. 다만 변경감지 대상 테이블들의 크기가 클수록 커넥터 생성시간이 길어지고 그만큼 Lock을 잡는시간이 길어지겠죠?

Q: 그래서 펫프렌즈는 쓴다는거에요 안쓴다는거에요?
A: 저희는 사용하지 않는답니다. 다만 데이터 싱크를 수동으로 맞추기위한 별도 Upsert 스크립트를 작성하여 최초 이관시, 그리고 커넥터 장애 시에 야무지게 사용하고 있답니다. 껄껄

Q: 그렇군요, 그래서 예시에 적어주신 snapshot.locking.mode 속성값이 none인가요?
A: 맞습니다! default값은 minimal로 스키마를 읽는동안 Lock을 잡는 설정이니 항상 주의하세요. 껄껄

Q: 너무 너무 감사합니다.! 정말 잘 알고 계시네요!
A: 별말씀을요! 지금까지 상용 DB 1시간동안 Lock잡았던 민이였습니다.

뭐하는 새X지..

#4. CDC 변경감지 테스트

#4–1. CDC Topic 목록 조회

연결된 부트스트랩 서버로 접속하여 토픽 리스트를 조회합니다.

👉 토픽 조회 명령어

kafka-topics.sh --bootstrap-server {bootstrap.server.host:port} --list

출력 결과

생성된 토픽중 __amazon_msk_connect로 시작하는 토픽들이 있습니다. 해당 토픽을은 msk connect가 변경감지를 위해 내부적으로 사용하는 토픽들이며 아래와 같은 역할을 합니다.

__amazon_msk_connect_config_커넥터명_UUID : 커넥터 내부 구성정보 저장
__amazon_msk_connect_offset_커넥터명_UUID : binlog 파일 offset 기록
__amazon_msk_connect_status_커넥터명_UUID : 작업 구성상태 변경 기록

👆 참고로 작업자 구성 입력시 해당 토픽들의 파티션 & 리플리케이션을 설정했었습니다.

나머지 토픽들은 아래와 같은 역할을 합니다.

cdc-heartbeat.testdb.sink : heartbeat 토픽
testdb.sink.history : DDL 기록 토픽
testdb.sink.TESTDB.CDC_TEST : CDC_TEST 테이블 변경감지 토픽

👆 토픽 네이밍 규칙은 위에 예시로 적어드린 커넥터 구성정보 내 코멘트를 참고바랍니다.

또 하나의 깨알팁을 드리자면, 구성값중 heartbeat.interval.ms값을 기준으로 heartbeat 토픽에 아래와 같은 메시지를 전달합니다.

UI for Apache Kafka

💡 펫프렌즈는 해당 토픽을 사용하여 커넥터 경보알림을 받고 있습니다.

#4–2. CDC 토픽 메시지 수신 및 DML 테스트

CDC_TEST 테이블의 변경감지 토픽명을 알았으니 메시지를 조회해봅니다.

👉 토픽 메시지 수신 명령어

kafka-console-consumer.sh --bootstrap-server{bootstrap.server.host:port} --from-beginning --topic testdb.sink.TESTDB.CDC_TEST | jq --color-output .

✅ 토픽 메시지 수신 결과

Q: 앵? 아직 DML문을 실행시키지 않았는데 왜 메시지가 조회될까요?

A: 커넥터 최초 구동시 읽어드린 스냅샷 데이터입니다. 같은 이유로 대용량 테이블의 토픽 메시지를 조회할 때는 — from -beginning 옵션을 제거하는걸 추천 드립니다.

💡 만약 Sink Connector가 이미 구축되어있는 상태에서 커넥터를 생성한다면 Operation type “r”에 대한 스킵처리가 필요할 수 있습니다. (스냅샷 데이터로 최초 싱크를 맞추지 않는다는 가정하에)

먼저 Insert문 테스트입니다.

👉 Insert 문 실행

#단일 insert
insert into CDC_TEST
(id, code, request_day, created_at, created_user_id, updated_user_id)
values
(100, 'cdc테스트1', '2021-10-02', now(), 1, 1);

#다중 insert
insert into CDC_TEST
(id, code, request_day, created_at, created_user_id, updated_user_id)
values
(101, 'cdc테스트2', '2021-10-02', now(), 1, 1),
(102, 'cdc테스트3', '2021-10-02', now(), 1, 1);

✅ 토픽 메시지 수신 결과

단일 Insert
다중 Insert

👉 cdc_meta_op : “c” (create)로 수신됩니다.

  • cdc_meta_op 는 커넥터 구성시 SMT설정에 의한 접두어입니다.

또한 Bulk성 Insert여도 총 2번의 메시지가 수신되는걸 확인할 수 있습니다.

주의할 점은 날짜 및 시간 타입의 경우 숫자형태로 메시지를 수신하는걸 볼 수 있는데요? 해당 숫자는 Epoch Time으로 국제표준시각(UTC) 1970-01-01 00:00:00을 기준으로 해당 날짜 및 시간까지의 밀리세컨드를 반환합니다.

때문에 Sink Connector에서 날짜 및 시간 타입에 대한 적절한 컨버팅이 필요합니다.

//Epoch DateTime convert
public static LocalDateTime convertEpochTImeMilli(Long epochTimeMilli) {
if (epochTimeMilli !=null) {
return Instant.ofEpochMilli(epochTimeMilli)
.atZone(ZoneOffset.UTC).toLocalDateTime();
}
return null;
}

//Epoch Date convert
public static LocalDate convertEpochDays(Integer epochDays) {
if (epochDays !=null) {
return LocalDate.ofEpochDay(epochDays);
}
return null;
}

다음은 Update문 테스트입니다.

👉 Update 문 실행

#단일 update
update CDC_TEST set code = '변경테스트' where id = 100;

#다중 update
update CDC_TEST set code = '변경테스트' where 1 = 1;

✅ 토픽 메시지 수신 결과

단일 Update

👉 cdc_meta_op : “u” (update)로 수신됩니다.

  • 스크린샷은 없으나 bulk성 update도 총 2번의 message가 수신됩니다.

다음은 Delete문 테스트입니다.

👉 Delete 문 실행

#단일 delete
delete from CDC_TEST where id = 100;

#다중 delete
delete from CDC_TEST where id in (101,102);

✅ 토픽 메시지 수신 결과

단일 Delete

👉 cdc_meta_op : “d” (delete)로 수신됩니다.

  • 스크린샷은 없으나 bulk성 update도 총 2번의 message가 수신됩니다.

주의할 점은 삭제 레코드 수신 후 null 레코드가 찍혀있습니다.

예시로 공유드린 커넥터 구성정보 코멘트에 적어놓았듯,
기본적으로 삭제 레코드는 스트림에서 제거됩니다.

삭제레코드 조회를 위해transfroms.unwrap.delete.handling.mode값을 rewrite로 변경하였기 때문에 삭제된 레코드가 다시 써지면서 Counsumer가 메시지를 수신하고 추가로 기존 스트림에서 제거된 삭제 메시지가 null로 수신됩니다.

때문에 Sink Connector에서 null메시지를 skip처리하는 부분이 필요하며펫프렌즈의 경우 ConsumerInterceptor를 활용하여 별도 동작없이 commit처리 하고 있습니다.

번외로 DDL문 테스트입니다.

👉 DDL문 실행

#alter
alter table CDC_TEST modify code varchar(50);

#truncate
truncate table CDC_TEST;

#create
create table CDC_TEST_2 as select * from CDC_TEST;

✅ 토픽 메시지 수신 결과

⁉️ DDL문의 경우 테이블 변경 감지 토픽에 메시지가 수신되지 않습니다.

단, 커넥터 구성에 명시한 {database.history.kafka.topic} 토픽에서
확인 가능합니다. 스크린샷은 생략합니다.

#5. Sink Connector 구축 (Spring boot)

CDC 메시지가 정상적으로 수신되는걸 확인하였으니, 해당 메시지를 소비처리할 Consumer를 작성합니다.

본 글은 CDC에 대한 내용만 다루며, Spring Boot Kafka Consumer구축에 대한 내용은 다루지 않습니다. 만약 Consumer 환경이 구축되어 있지 않다면, 다음 Spring.io 공식문서를 참고해주세요.

#5–1. Domain, Repository

package kr.co.test.cdc.worker.cdcsample.domain;

import lombok.AccessLevel;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

import java.time.LocalDate;
import java.time.LocalDateTime;

@Getter
@Entity
@Table(name = "CDC_TEST")
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class CdcTest {
@Id
@Column(name = "id")
private Long id;

@Column(name = "code")
private String code;

@Column(name = "request_day")
private LocalDate requestDay;

@Column(name = "created_at")
private LocalDateTime createdAt;

@Column(name = "created_user_id")
private Long createdUserId;

@Column(name = "updated_user_id")
private Long updatedUserId;

@Builder
public CdcTest(Long id, String code, LocalDate requestDay, LocalDateTime createdAt, Long createdUserId, Long updatedUserId) {
this.id = id;
this.code = code;
this.requestDay = requestDay;
this.createdAt = createdAt;
this.createdUserId = createdUserId;
this.updatedUserId = updatedUserId;
}

public void update(CdcTest other) {
this.code = other.code;
this.requestDay = other.requestDay;
this.createdAt = other.createdAt;
this.createdUserId = other.createdUserId;
this.updatedUserId = other.updatedUserId;
}

}
package kr.co.test.cdc.worker.cdcsample.repository;

import kr.co.test.cdc.worker.cdcsample.domain.CdcTest;
import org.springframework.data.jpa.repository.JpaRepository;

public interface CdcTestRepository extends JpaRepository<CdcTest,Long> {
}

#5–2. Payload DTO

package kr.co.test.cdc.worker.model;

import lombok.Getter;
import lombok.ToString;
/*
* cdc 변경감지 수신 메시지 공통 메타정보
*/
@Getter
@ToString
public class BaseSyncDto {
/* 변경감지 타입 : c,r,u,d */
private Character operation;
/* 변경감지 테이블 명 */
private String tableName;
/* 변경감지 삭제여부 */
private Boolean isDeleted;

public BaseSyncDto(Character operation, String tableName, Boolean isDeleted)
{
this.operation = operation;
this.tableName = tableName;
this.isDeleted = isDeleted;
}
}
package kr.co.test.cdc.worker.cdcsample.dto;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.Getter;
import lombok.ToString;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

import kr.co.test.cdc.worker.cdcsample.domain.CdcTest;
import kr.co.test.cdc.worker.model.BaseSyncDto;
import static kr.co.test.cdc.worker.global.util.CdcUtil.convertEpochDays;
import static kr.co.test.cdc.worker.global.util.CdcUtil.convertEpochTImeMilli;

/*
* CDC_TEST Payload DTO
*/
@Getter
@ToString(callSuper = true)
public class CdcTestSyncDto extends BaseSyncDto {

private Long id;
private String code;
@JsonFormat(pattern = "yyyy-MM-dd")
private LocalDate requestDay;
@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private LocalDateTime createdAt;
private Long createdUserId;
private Long updatedUserId;

@JsonCreator
public CdcTestSyncDto(
@JsonProperty("id") Long id,
@JsonProperty("code") String code,
@JsonProperty("request_day") Integer requestDay,
@JsonProperty("created_at") Long createdAt,
@JsonProperty("created_user_id") Long createdUserId,
@JsonProperty("updated_user_id") Long updatedUserId,
@JsonProperty("cdc_meta_op") Character operation,
@JsonProperty("cdc_meta_table") String tableName,
@JsonProperty("__deleted") Boolean isDeleted) {
super(operation, tableName, isDeleted);
this.id = id;
this.code = code;
this.requestDay = convertEpochDays(requestDay);
this.createdAt = convertEpochTImeMilli(createdAt);
this.createdUserId = createdUserId;
this.updatedUserId = updatedUserId;
}

/* 엔티티 반환 */
public CdcTest toEntity() {
return CdcTest.builder()
.id(id)
.code(code)
.requestDay(requestDay)
.createdAt(createdAt)
.createdUserId(createdUserId)
.updatedUserId(updatedUserId)
.build();
}

}

#5–3. Util

package kr.co.test.cdc.worker.global.util;

import kr.co.test.cdc.worker.model.BaseSyncDto;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
/*
* CDC 관련 유틸 클래스
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class CdcUtil {

public static final char CDC_OP_INSERT = 'c';
public static final char CDC_OP_UPDATE = 'u';
public static final char CDC_OP_DELETE = 'd';
public static final char CDC_OP_SELECT = 'r';

/* Snapshot 데이터가 아닌지 확인 */
public static boolean validateDMLType(char cdcMetaOp) {
return cdcMetaOp != CDC_OP_SELECT;
}

/*
* BaseSyncDto의 operation을 통해 DML Type을 구분할 수 있지만
* 양측 Database가 싱크되지 않은 상황에서 에러를 발생시킬 수 있어
* 별도의 DML 실행 가능여부 판별 메소드 정의
* (발생가능 에러 ex: 없는 레코드 삭제, pk중복 등)
*/
public static boolean isInsertRecord(Optional<?> optional, BaseSyncDto dto) {
return optional.isEmpty() && !dto.getIsDeleted();
}

public static boolean isUpdateRecord(Optional<?> optional, BaseSyncDto dto) {
return optional.isPresent() && !dto.getIsDeleted();
}

public static boolean isDeleteRecord(Optional<?> optional, BaseSyncDto dto) {
return optional.isPresent() && dto.getIsDeleted();
}

//Epoch DateTime convert
public static LocalDateTime convertEpochTImeMilli(Long epochTimeMilli) {
if (epochTimeMilli !=null) {
return Instant.ofEpochMilli(epochTimeMilli)
.atZone(ZoneOffset.UTC).toLocalDateTime();
}
return null;
}

//Epoch Date convert
public static LocalDate convertEpochDays(Integer epochDays) {
if (epochDays !=null) {
return LocalDate.ofEpochDay(epochDays);
}
return null;
}

}

#5–4. Service

package kr.co.test.cdc.worker.cdcsample.application;

import lombok.RequiredArgsConstructor;

import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.util.Optional;

import kr.co.test.cdc.worker.cdcsample.domain.CdcTest;
import kr.co.test.cdc.worker.cdcsample.dto.CdcTestSyncDto;
import kr.co.test.cdc.worker.cdcsample.repository.CdcTestRepository;
import static kr.co.test.cdc.worker.global.util.CdcUtil.isDeleteRecord;
import static kr.co.test.cdc.worker.global.util.CdcUtil.isInsertRecord;
import static kr.co.test.cdc.worker.global.util.CdcUtil.isUpdateRecord;

@Service
@RequiredArgsConstructor
public class CdcSyncService {
private final CdcTestRepository cdcTestRepository;

/* CDC_TEST 동기화 */
@Transactional
public void syncCdcTest(CdcTestSyncDto dto) {
final Long id = dto.getId();
Optional<CdcTest> cdcTest = cdcTestRepository.findById(id);

if (isInsertRecord(cdcTest, dto)) {
cdcTestRepository.save(dto.toEntity());
return;
}
if (isDeleteRecord(cdcTest, dto)) {
cdcTestRepository.deleteById(id);
return;
}
if (isUpdateRecord(cdcTest, dto)) {
cdcTest.get().update(dto.toEntity());
}
}

}

#5–5. Listener

package kr.co.test.cdc.worker.cdcsample.consumer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import kr.co.test.cdc.worker.cdcsample.application.CdcSyncService;
import kr.co.test.cdc.worker.cdcsample.dto.CdcTestSyncDto;
import kr.co.test.cdc.worker.global.util.CdcUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/*
* cdc 컨슈머 (cdc sample)
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class CdcConsumer {

private final ObjectMapper objectMapper;
private final CdcSyncService cdcTestService;

/* CDC_TEST 동기화 컨슈머 */
@KafkaListener(topics = "testdb.sink.TESTDB.CDC_TEST", groupId = "cdc-test-events-group")
public void onSyncCdcTest(ConsumerRecord<String, String> consumerRecord) throws JsonProcessingException {

final CdcTestSyncDto cdcTestSyncDto = objectMapper.readValue(consumerRecord.value(), CdcTestSyncDto.class);

if (CdcUtil.validateDMLType(cdcTestSyncDto.getOperation())) {
cdcTestService.syncCdcTest(cdcTestSyncDto);
}
}

}

먼길 오느라 고생하셨습니다.👏 여기까지 작성하시면 모든 준비는 완료되었습니다. 스프링부트 구동시 DML테스트에서 발행된 커밋되지 않은 메시지들이 처리됩니다.

이어서 DML 테스트를 진행하면 양쪽 데이터베이스가 동기화되는 것을 확인할 수 있습니다.

글을 마치며

CDC는 MSA와 분산 데이터베이스 환경에서 필수요소 중 하나입니다.

  • MSA로 전환하는 과정에서 Legacy Application과 병행운영
  • 조회용 데이터베이스에 데이터 복제
  • 분산 데이터베이스 간 조인 작업을 위한 테이블 동기화

이외에도 다양한 케이스에서 데이터 동기화 요건은 발생할 수 있습니다.

펫프렌즈 역시, 현재는 CQRS 패턴 적용을 위한 Command Store → Query Store간 동기화, 검색 부분 색인을 위한 통계 DB 동기화 등 MSA로 나아가는 여러 길목에서 CDC를 활용하고 있답니다.

이렇게 다양한 서비스에서 CDC를 운영하며 얻은 펫프만의 노하우나 커넥터 관련 트러블슈팅에 대한 내용을 다룰까도 생각했습니다. 그렇게 막상 글을 쓰려다보니, CDC환경을 구축하며 Reference가 생각보다 빈약해서 맘고생이 많았던 작년의 제가 떠오르더군요..😢

비록 포스팅 시기가 늦어졌지만.. 지금 이 순간, 같은이유로 고생하고 계실 누군가에게 이 글이 큰 도움이 되었으면 하는 마음으로 커넥터 구축절차라는 주제로 열심히 적어보았습니다.

스압..에 못이겨 못다한 이야기들은 꼭 다음번 포스팅에서 공유드려보도록 하겠습니다.

긴 글 읽어주셔서 감사합니다.

Reference

--

--