Kafka Connect는 별도의 producer와 consumer의 개발 없이 Kafka를 통해 Data source/destination 간 메세지 송수신을 가능하도록 해주는 솔루션이다. Kafka connect plugin을 설치하고 설정을 변경해주면 손쉽게 사용할 수 있다.
예시로 위 그림과 같은 아키텍처를 구성해본다. Source Connector는 producer 역할을 하고, Sink Connector는 consumer의 역할을 한다. Source Connector는 Debezium이라는 플러그인을 사용하는데, Debezium은 DB의 변경사항을 캡쳐해서 kafka broker로 전송하게 하는 플러그인이다. Debezium은 MySQL의 커밋사항을 기록하는 bin log를 읽어서 변경사항을 메세지로 kafka broker한테 전달한다. Sink Connector는 S3 를 사용하고, S3는 로컬 스택을 활용한다. 로컬 스택은 AWS S3의 클라우드 스택을 로컬에서 테스트 할 수 있게 지원하는 서비스이다.
https://www.confluent.io/hub/ 에 들어가서 원하는 플러그인을 받을 수 있다. Debezium MySQL CDC Source Connector 와 Amazon S3 Sink Connector를 사용한다. CDC는 chane data capture의 약자이다.
yaml 파일은 아래와 같으며 실행하기 전에 /etc/hosts/를 아래와 같이 변경해준다.
version: '3'
services:
zookeeper-1:
hostname: zookeeper1
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 12181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ZOOKEEPER_SERVERS: zookeeper1:22888:23888;zookeeper2:32888:33888;zookeeper3:42888:43888
ports:
- 12181:12181
- 22888:22888
- 23888:23888
volumes:
- ./zookeeper/data/1:/zookeeper/data
zookeeper-2:
hostname: zookeeper2
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 2
ZOOKEEPER_CLIENT_PORT: 22181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ZOOKEEPER_SERVERS: zookeeper1:22888:23888;zookeeper2:32888:33888;zookeeper3:42888:43888
ports:
- 22181:22181
- 32888:32888
- 33888:33888
volumes:
- ./zookeeper/data/2:/zookeeper/data
zookeeper-3:
hostname: zookeeper3
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 3
ZOOKEEPER_CLIENT_PORT: 32181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ZOOKEEPER_SERVERS: zookeeper1:22888:23888;zookeeper2:32888:33888;zookeeper3:42888:43888
ports:
- 32181:32181
- 42888:42888
- 43888:43888
volumes:
- ./zookeeper/data/3:/zookeeper/data
kafka-1:
image: confluentinc/cp-kafka:6.2.0
hostname: kafka1
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181,zookeeper2:22181,zookeeper3:32181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
KAFKA_LOG_DIRS: /kafka
ports:
- 19092:19092
volumes:
- ./kafka/logs/1:/kafka
kafka-2:
image: confluentinc/cp-kafka:6.2.0
hostname: kafka2
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181,zookeeper2:22181,zookeeper3:32181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29092
KAFKA_LOG_DIRS: /kafka
ports:
- 29092:29092
volumes:
- ./kafka/logs/2:/kafka
kafka-3:
image: confluentinc/cp-kafka:6.2.0
hostname: kafka3
depends_on:
- zookeeper-1
- zookeeper-2
- zookeeper-3
environment:
KAFKA_BROKER_ID: 3
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181,zookeeper2:22181,zookeeper3:32181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:39092
KAFKA_LOG_DIRS: /kafka
ports:
- 39092:39092
volumes:
- ./kafka/logs/3:/kafka
connect-1:
hostname: connect1
image: confluentinc/cp-kafka-connect:6.2.0
depends_on:
- kafka-1
- kafka-2
- kafka-3
environment:
CONNECT_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:29092,kafka3:39092 # kafka cluster의 broker를 검색하는데 사용
CONNECT_REST_ADVERTISED_HOST_NAME: connect1
CONNECT_GROUP_ID: default-connect-group # connect worker와 join할 connect cluster의 그룹명
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter # json으로 설정
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_CONFIG_STORAGE_TOPIC: config # satndalone은 이 정보를 로컬에 저장, destributed모드는 별도의 kafka topic에 저장
CONNECT_OFFSET_STORAGE_TOPIC: offset
CONNECT_STATUS_STORAGE_TOPIC: status
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
CONNECT_REST_PORT: 8083 # connector가 서비스할 포트 지정
ports:
- 18083:8083 # 외부에서 18083으로 접근
volumes:
- ./connectors/1:/usr/share/confluent-hub-components
command:
- bash
- -c
- |
confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.0.3 # 플러그인 설치
confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
/etc/confluent/docker/run &
sleep infinity
localstack-1:
hostname: localstack1
image: localstack/localstack:latest
environment:
AWS_DEFAULT_REGION: us-east-2
EDGE_PORT: 4566
SERVICES: s3
AWS_ACCESS_KEY_ID: test
AWS_SECRET_ACCESS_KEY: test
ports:
- 4566:4566
volumes:
- ./localstack:/tmp/localstack
mysql-1:
hostname: mysql1
image: mysql/mysql-server:5.7
ports:
- 3306:3306
environment:
MYSQL_USER: root
MYSQL_ROOT_HOST: "%%"
MYSQL_DATABASE: mydb
MYSQL_ROOT_PASSWORD: passwd
command: mysqld # debizium이 binlog를 사용한다고 했는데, binlog사용을 위한 설정
--server-id=1234
--max-binlog-size=4096
--binlog-format=ROW
--log-bin=bin-log
--sync-binlog=1
--binlog-rows-query-log-events=ON
volumes:
- ./mysql:/var/lib/mysql
docker compose를 실행하면서 몇가지 사전작업을 해준다. https://sequelpro.com 에서 sequel pro 설치를 한다. sequel pro는 MySQL에 쿼리를 날려주는 툴이다. mysql-1의 environment 내용을 적어주면 된다.
aws cli를 설치한다. https://docs.aws.amazon.com/ko_kr/cli/latest/userguide/install-cliv2-mac.html#cliv2-mac-install-cmd
# aws cli 설치
$ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
$ unzip awscliv2.zip
$ sudo ./aws/install
설치후에 aws configure로 config를 변경한다.
# aws cli 설정
$ aws configure
$ aws s3 --endpoint-url=http://localhost:4566 ls
# bucket 생성
$ aws s3api create-bucket --bucket mybucket --endpoint-url=http://localhost:4566
# bucket 생성 확인
$ aws s3 --endpoint-url=http://localhost:4566 ls
# object 업로드
$ vim hello.txt
$ aws s3api put-object --bucket mybucket --body hello.txt --key hello --endpoint-url=http://localhost:4566
# bucket 내 object list up
$ aws s3api list-objects --endpoint-url=http://localhost:4566 --bucket mybucket
# object 다운로드
$ aws s3api get-object --endpoint-url=http://localhost:4566 --bucket mybucket --key hello output.txt
mysql connector 등록할때 {myip} 를 각자 ip에 맞게 수정해서 실행한다.
# mysql connector 등록
$ curl -v -XPOST -H'Accept:application/json' -H'Content-Type:application/json' http://connect1:18083/connectors \
-d '
{
"name": "mysql-source-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector", # connector 구현체
"database.hostname": "${myip}",
"database.port": "3306",
"database.user": "root",
"database.password": "passwd",
"database.server.id": "1234",
"database.server.name": "mysql-1",
"database.include.list": "mydb", # 어떤 DB에서 변경사항이 발생했을때 이벤트로 처리할 것인지 지정
"database.history.kafka.bootstrap.servers": "${myip}:19092, ${myip}:29092, ${myip}:39092",
"database.history.kafka.topic": "kafka-student-changes",
"include.schema.changes": "true", # 스키마 변경에 대해서도 이벤트 발행
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}'
# Location 헤더 접근 해서 설정 확인
$ curl -v <Location>
이제 sink connector를 등록한다. 마찬가지로 {myip} 를 각자 ip에 맞게 수정해서 실행한다.
# s3 connector 등록
curl -v -XPOST -H'Accept:application/json' -H'Content-Type:application/json' http://connect1:18083/connectors \
-d '{
"name": "s3-sink-connector",
"config": {
"topics": "mysql-1.mydb.kafka", # 서버명.DB명.테이블명
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"flush.size": 1,
"s3.bucket.name": "mybucket",
"s3.region": "us-east-2",
"s3.part.size": "5242880",
"s3.proxy.url": "http://${myip}:4566",
"format.class": "io.confluent.connect.s3.format.json.JsonFormat",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"aws.access.key.id": "test",
"aws.secret.access.key": "test",
"topics.dir": "topicsdir"
}
}'
# Location 헤더 접근 해서 설정 확인
$ curl -v <Location>
# Cluster status
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083
# connectors 상세정보 확인
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083/connectors
$ curl -v -XGET -H'Accept: application/json' 'http://connect1:18083/connectors?expand=status'
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/config
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/status
# connector 멈추기
$ curl -v -XPUT -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/pause
$ curl -v -XPUT -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/resume
이제 sequelpro에서 쿼리를 날리면서 테이블을 만들어본다.
# mysql queries
/* CREATE TABLE kafka (
student_no int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
name char(10) NOT NULL,
phone_no char(20)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO kafka(name, phone_no) VALUES('Sam', '01012345768');
INSERT INTO kafka(name, phone_no) VALUES('Mary', '01022445768');
INSERT INTO kafka(name, phone_no) VALUES('Tom', '0212342132');
INSERT INTO kafka(name, phone_no) VALUES('Susan', '021234423');
INSERT INTO kafka(name, phone_no) VALUES('Joe', '01073219284');
SELECT * FROM kafka;
UPDATE kafka SET phone_no='01077778888' where name='Sam';
*/
데이터가 잘 들어왓는지 확인한다.
$ aws s3api list-objects --endpoint-url=http://localhost:4566 --bucket mybucket
$ aws s3api get-object --endpoint-url=http://localhost:4566 --bucket mybucket --key <key> output.txt
아까 넣었던 데이터가 json 형식으로 s3에 저장된것을 볼 수 있다.