Kafka Connector란?

Apache Kafka Connect는 kafka 주변을 이루고 있는 여러 개의 echo 시스템 중에 하나이다. 시스템 간 메세지를 전송해야 할 때, 아래 그림같이 MySQL에서 데이터를 뽑아서 kafka로 집어넣는 producer application을 만들고, kafka 에 있는 데이터를 뽑아서 AWS S3로 보내는 consumer application을 만들어야 한다. 그런데 이렇게 시스템 간의 메세지 전송이 필요할 때마다 apllication을 개발해야 할까?

image

이러한 상황에서 누군가가 이전에 만들어 놓은 애플리케이션이 있다면 아주 편할 것이다. 그것이 바로 kafka connect이다. kafka connect를 사용하면 애플리케이션의 개발이 불필요하다. 아래 그림처럼 MySQL용 source connector가 있고 AWS S3용 sink connector가 다 만들어져 있다. 데이터가 있는 데이터 소스에서 데이터를 뽑은 다음에 kafka로 전송해 주는 connector가 source connector이고, kafka에 들어 있는 데이터를 consuming 해서 해당 시스템으로 보내주는 connector를 sink connector 라고 한다. 즉, 잘 알려져 있는 시스템들용으로 이미 connector들이 다 만들어져 있기 때문에 그걸 가져다가 그대로 사용하면 된다.

image

connector들은 confluent hub 웹사이트에서 찾을 수 있다. https://www.confluent.io/hub

Kafa Connect란?

Kafka Connect는 Apache Kafka 안팎으로 데이터를 스트리밍(전송)하기 위한 Framework이다. Kafka Connect는 다른 데이터 시스템을 Kafka와 통합하는 과정을 표준화한 Framework 통합을 위한 Connector 개발, 배포, 관리를 단순화 시켜놓은 framework이다.

  • Connectors ‒ Task를 관리하여 데이터 스트리밍을 조정하는 Plugin(jar), Java Class/Instance
  • Tasks ‒ Kafka에서 또는 Kafka에서 데이터를 전송하는 방법의 구현체(Java Class/Instance)
  • Workers ‒ Connector 및 Task를 실행하는 실행 중인 프로세스(Process)
  • Converters ‒ Connect와 데이터를 보내거나 받는 시스템 간에 데이터를 변환하는 데 사용되는 Components(Java Class)
    • json converter, spring converter 등 존재
  • Transforms ‒ Connector 에 의해 생성되거나 Connector로 전송되는 각 메시지를 변경하는 간단한 Components(Java Class)
  • Dead Letter Queue ‒ Connect에서 Connector 오류를 처리하는 방법

Connect Architecture

Connector Worker Node는 VM이나 물리서버를 의미한다. 이 위에 connector worker software를 설치하고 프로세스를 띄우면 connector worker process를 만들 수 있다. 여기서 source connector를 배포하고 task를 실행 시키면 데이터를 뽑아서 kafka의 특정 topic으로 넣어줄 수 있다. 반대로 kafka에서 데이터를 뽑아서 sink connector를 이용하여 다른 시스템으로 데이터를 전송할 수 있다.

image

connector worker process는 kafka와 어떻게 접속하는가? connector worker process도 기동을 할 때 bootstrap server정보를 입력하게 되어 있는데, 그 정보를 활용하여 kafka server에 접속한다.

Connector worker 는 Standalone 또는 Distributed Workers 방식을 사용할 수 있다. 아래 그림의 왼쪽 처럼 stanalone은 single process로 도는 것으로 확장이 불가하고, connector worker process가 관련된 정보를 모두 local disk에 쓰는 특징이 있다. 아래 그림의 오른쪽 처럼 Distributed workers 방식은 확장도 가능하고 하나의 노드가 죽었을 때 다른 쪽으로 node 나 task를 옮길 수 있다. 그리고 데이터를 다른 시스템에서 가지고 올 때 언제 것을 가지고 왓는지에 대한 정보를 standalone 방식은 local disk에 저장하고, distributed 방식은 kafka의 별도의 topic에 저장한다.

image

connector cluster도 여러개를 만들 수 있다. Connect Worker 파라미터중 group.id 를 다르게 하여 Cluster를 구분한다.

image


Connector 배포

Connector 배포 전에는 connect worker process 안에 아무것도 없는 상태이다.

image

아무것도 없는 상태에서 아래와 같은 config를 통해 connector를 배포하면 Connect Worker에 Connector Instance 및 Task가 생성된다. task안에는 MySQL에서 어떻게 데이터를 가지고 올 것인가에 대한 로직들이 포함되어 있다. Task class가 기동이 되면 source system에서 native data를 가지고 오게 되고 connect 내에서 쓰는 record로 변환을 해준다.

"config": {
  [...]
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url": "jdbc:postgresql://postgresql_ip:5432/",
  "topics": ”sample.orders",
}

image

Converter

이제 key와 value에 대한 converter를 설정해준다. 말그대로 native data를 가져와서 connect record로 만들었는데 kafka에 넣기 위해서는 byte array로 변환해야 한다. 그 때 사용하는것이 converter이다. key converter와 value converter가 존재한다. converter는 기본적으로 schema list를 연동하게 만들어져있다.

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

image

Single Message Transform(SMT)

SMT는 단건 메세지별 데이터 변환 기능이다. Task와 Converter 사이에서 데이터 변환이 필요한 경우 사용한다. 20개 정도의 pre-defined SMT를 제공하고 있는데 그 중 대표적인 것은 다음과 같다.

  • Cast: 필드 또는 전체 Key 또는 Value를 특정 유형으로 캐스트(타입 변환) (예: 정수 필드를 더 작은 너비로 강제 적용)
  • Drop: 레코드에서 Key 또는 Value를 삭제하고 null로 설정
  • InsertField: 레코드 메타데이터 또는 구성된 Static Value의 속성을 사용하여 필드를 삽입
  • MaskField: 필드 유형에 대해 유효한 null 값으로 지정된 필드를 마스킹
  • ReplaceField: 필드를 필터링하거나 이름을 변경

여러개의 SMT들을 연결해서 사용할 수도 있다. SMT를 설정하기 위해서는 config 파일을 아래와 같이 변경하면 된다. addTimeToTopic,labelBar 두개의 SMT를 사용하겠다는 뜻이고, transform 마다 원하는 input parameter가 다르기 때문에 그것들을 하나씩 설정해준다.

"config": {
  [...]
  "transforms": "addTimeToTopic,labelBar",
  "transforms.addTimeToTopic.type": "org.apache.kafka.connect.transforms.TimestampRouter",
  "transforms.addTimeToTopic.topic.format": "${topic}-${timestamp}",
  "transforms.addTimeToTopic.timestamp.format": "YYYYMMDD",
  "transforms.labelBar.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.labelBar.renames": "delivery_addr:shipping_addr",
}

이렇게 설정해서 적용하면 connect record를 설정된 SMT의 순서대로 변환해준다.

image

Sink Connector의 Data Flow

Sink Connector는 Source connector 와 방법은 같은데 반대로 뒤집혀진 모양이다. kafka에서 데이터를 가져오면 converter가 먼저 데이터를 convert하고, transform이 동작하고 마지막으로 task가 그 데이터를 native 데이터로 변환해서 DB에 넣어준다.

image