Kafka Connect는 별도의 producer와 consumer의 개발 없이 Kafka를 통해 Data source/destination 간 메세지 송수신을 가능하도록 해주는 솔루션이다. Kafka connect plugin을 설치하고 설정을 변경해주면 손쉽게 사용할 수 있다.

image

예시로 위 그림과 같은 아키텍처를 구성해본다. Source Connector는 producer 역할을 하고, Sink Connector는 consumer의 역할을 한다. Source Connector는 Debezium이라는 플러그인을 사용하는데, Debezium은 DB의 변경사항을 캡쳐해서 kafka broker로 전송하게 하는 플러그인이다. Debezium은 MySQL의 커밋사항을 기록하는 bin log를 읽어서 변경사항을 메세지로 kafka broker한테 전달한다. Sink Connector는 S3 를 사용하고, S3는 로컬 스택을 활용한다. 로컬 스택은 AWS S3의 클라우드 스택을 로컬에서 테스트 할 수 있게 지원하는 서비스이다.

https://www.confluent.io/hub/ 에 들어가서 원하는 플러그인을 받을 수 있다. Debezium MySQL CDC Source Connector 와 Amazon S3 Sink Connector를 사용한다. CDC는 chane data capture의 약자이다.

image

image

yaml 파일은 아래와 같으며 실행하기 전에 /etc/hosts/를 아래와 같이 변경해준다.

image

version: '3'
services:
  zookeeper-1:
    hostname: zookeeper1
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: 12181
      ZOOKEEPER_DATA_DIR: /zookeeper/data
      ZOOKEEPER_SERVERS: zookeeper1:22888:23888;zookeeper2:32888:33888;zookeeper3:42888:43888
    ports:
      - 12181:12181
      - 22888:22888
      - 23888:23888
    volumes:
      - ./zookeeper/data/1:/zookeeper/data

  zookeeper-2:
    hostname: zookeeper2
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_SERVER_ID: 2
      ZOOKEEPER_CLIENT_PORT: 22181
      ZOOKEEPER_DATA_DIR: /zookeeper/data
      ZOOKEEPER_SERVERS: zookeeper1:22888:23888;zookeeper2:32888:33888;zookeeper3:42888:43888
    ports:
      - 22181:22181
      - 32888:32888
      - 33888:33888
    volumes:
      - ./zookeeper/data/2:/zookeeper/data

  zookeeper-3:
    hostname: zookeeper3
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_SERVER_ID: 3
      ZOOKEEPER_CLIENT_PORT: 32181
      ZOOKEEPER_DATA_DIR: /zookeeper/data
      ZOOKEEPER_SERVERS: zookeeper1:22888:23888;zookeeper2:32888:33888;zookeeper3:42888:43888
    ports:
      - 32181:32181
      - 42888:42888
      - 43888:43888
    volumes:
      - ./zookeeper/data/3:/zookeeper/data

  kafka-1:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka1
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181,zookeeper2:22181,zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
      KAFKA_LOG_DIRS: /kafka
    ports:
      - 19092:19092
    volumes:
      - ./kafka/logs/1:/kafka

  kafka-2:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka2
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181,zookeeper2:22181,zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:29092
      KAFKA_LOG_DIRS: /kafka
    ports:
      - 29092:29092
    volumes:
      - ./kafka/logs/2:/kafka

  kafka-3:
    image: confluentinc/cp-kafka:6.2.0
    hostname: kafka3
    depends_on:
      - zookeeper-1
      - zookeeper-2
      - zookeeper-3
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181,zookeeper2:22181,zookeeper3:32181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:39092
      KAFKA_LOG_DIRS: /kafka
    ports:
      - 39092:39092
    volumes:
      - ./kafka/logs/3:/kafka


  connect-1:
    hostname: connect1
    image: confluentinc/cp-kafka-connect:6.2.0
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:29092,kafka3:39092  # kafka cluster의 broker를 검색하는데 사용
      CONNECT_REST_ADVERTISED_HOST_NAME: connect1
      CONNECT_GROUP_ID: default-connect-group  # connect worker와 join할 connect cluster의 그룹명
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter  # json으로 설정
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_TOPIC: config  # satndalone은 이 정보를 로컬에 저장, destributed모드는 별도의 kafka topic에 저장
      CONNECT_OFFSET_STORAGE_TOPIC: offset
      CONNECT_STATUS_STORAGE_TOPIC: status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_REST_PORT: 8083  # connector가 서비스할 포트 지정
    ports:
      - 18083:8083  # 외부에서 18083으로 접근
    volumes:
      - ./connectors/1:/usr/share/confluent-hub-components
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt confluentinc/kafka-connect-s3:10.0.3  # 플러그인 설치
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.0
        /etc/confluent/docker/run &
        sleep infinity
        
  localstack-1:
    hostname: localstack1
    image: localstack/localstack:latest
    environment:
      AWS_DEFAULT_REGION: us-east-2
      EDGE_PORT: 4566
      SERVICES: s3
      AWS_ACCESS_KEY_ID: test
      AWS_SECRET_ACCESS_KEY: test
    ports:
      - 4566:4566
    volumes:
      - ./localstack:/tmp/localstack

  mysql-1:
    hostname: mysql1
    image: mysql/mysql-server:5.7
    ports:
      - 3306:3306
    environment:
      MYSQL_USER: root
      MYSQL_ROOT_HOST: "%%"
      MYSQL_DATABASE: mydb
      MYSQL_ROOT_PASSWORD: passwd
    command: mysqld  # debizium이 binlog를 사용한다고 했는데, binlog사용을 위한 설정
      --server-id=1234
      --max-binlog-size=4096
      --binlog-format=ROW
      --log-bin=bin-log
      --sync-binlog=1
      --binlog-rows-query-log-events=ON
    volumes:
      - ./mysql:/var/lib/mysql

docker compose를 실행하면서 몇가지 사전작업을 해준다. https://sequelpro.com 에서 sequel pro 설치를 한다. sequel pro는 MySQL에 쿼리를 날려주는 툴이다. mysql-1의 environment 내용을 적어주면 된다.

image

image

aws cli를 설치한다. https://docs.aws.amazon.com/ko_kr/cli/latest/userguide/install-cliv2-mac.html#cliv2-mac-install-cmd

# aws cli 설치
$ curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
$ unzip awscliv2.zip
$ sudo ./aws/install

image

설치후에 aws configure로 config를 변경한다.

# aws cli 설정
$ aws configure
$ aws s3 --endpoint-url=http://localhost:4566 ls

image

# bucket 생성
$ aws s3api create-bucket --bucket mybucket --endpoint-url=http://localhost:4566
# bucket 생성 확인
$ aws s3 --endpoint-url=http://localhost:4566 ls  

image

# object 업로드
$ vim hello.txt 
$ aws s3api put-object --bucket mybucket --body hello.txt --key hello --endpoint-url=http://localhost:4566

image

# bucket 내 object list up
$ aws s3api list-objects --endpoint-url=http://localhost:4566 --bucket mybucket

image

# object 다운로드
$ aws s3api get-object --endpoint-url=http://localhost:4566 --bucket mybucket --key hello output.txt

image

mysql connector 등록할때 {myip} 를 각자 ip에 맞게 수정해서 실행한다.

# mysql connector 등록
$ curl -v -XPOST -H'Accept:application/json' -H'Content-Type:application/json' http://connect1:18083/connectors \
  -d '
{
    "name": "mysql-source-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector", # connector 구현체
        "database.hostname": "${myip}",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "passwd",
        "database.server.id": "1234",
        "database.server.name": "mysql-1",
        "database.include.list": "mydb",  # 어떤 DB에서 변경사항이 발생했을때 이벤트로 처리할 것인지 지정
        "database.history.kafka.bootstrap.servers": "${myip}:19092, ${myip}:29092, ${myip}:39092",
        "database.history.kafka.topic": "kafka-student-changes",
        "include.schema.changes": "true",  # 스키마 변경에 대해서도 이벤트 발행
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false"

    }
}'

# Location 헤더 접근 해서 설정 확인
$ curl -v <Location>

image

이제 sink connector를 등록한다. 마찬가지로 {myip} 를 각자 ip에 맞게 수정해서 실행한다.

# s3 connector 등록
curl -v -XPOST -H'Accept:application/json' -H'Content-Type:application/json' http://connect1:18083/connectors \
  -d '{
    "name": "s3-sink-connector",
    "config": {
      "topics": "mysql-1.mydb.kafka",  # 서버명.DB명.테이블명
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "flush.size": 1,
      "s3.bucket.name": "mybucket",
      "s3.region": "us-east-2",
      "s3.part.size": "5242880",
      "s3.proxy.url": "http://${myip}:4566",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "aws.access.key.id": "test",
      "aws.secret.access.key": "test",
      "topics.dir": "topicsdir"
    }
  }'
  
# Location 헤더 접근 해서 설정 확인
$ curl -v <Location>

image

# Cluster status
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083

# connectors 상세정보 확인
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083/connectors
$ curl -v -XGET -H'Accept: application/json' 'http://connect1:18083/connectors?expand=status'
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/config
$ curl -v -XGET -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/status
# connector 멈추기
$ curl -v -XPUT -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/pause
$ curl -v -XPUT -H'Accept: application/json' http://connect1:18083/connectors/mysql-source-connector/resume

image

image

image

이제 sequelpro에서 쿼리를 날리면서 테이블을 만들어본다.

# mysql queries
/* CREATE TABLE kafka (
    student_no int(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name char(10) NOT NULL,
    phone_no char(20)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO kafka(name, phone_no) VALUES('Sam', '01012345768');
INSERT INTO kafka(name, phone_no) VALUES('Mary', '01022445768');
INSERT INTO kafka(name, phone_no) VALUES('Tom', '0212342132');
INSERT INTO kafka(name, phone_no) VALUES('Susan', '021234423');
INSERT INTO kafka(name, phone_no) VALUES('Joe', '01073219284');

SELECT * FROM kafka;

UPDATE kafka SET phone_no='01077778888' where name='Sam';
*/

image

데이터가 잘 들어왓는지 확인한다.

$ aws s3api list-objects --endpoint-url=http://localhost:4566 --bucket mybucket
$ aws s3api get-object --endpoint-url=http://localhost:4566 --bucket mybucket --key <key> output.txt

image

아까 넣었던 데이터가 json 형식으로 s3에 저장된것을 볼 수 있다.

image