Rest Proxy
Rest Proxy는 kafka client를 사용하지 않고도 Restful API를 통해 kafka의 일부 기능들을 사용할 수 있도록 구현된 서비스이다. topic을 만들거나 메세지 producing이나 comsuming도 가능하며, curl로 테스트나 디버깅도 할 수 있다. 또한 이미 HTTP로 운영하던 서비스는 쉽게 통합이 가능하다. 하지만 kafka broker와 별도 어플리케이션으로 구동되기 때문에 관리를 더 많이 해야하고, 일번적으로 kafka client 보다 성능적으로 좋지 못하다는 단점이 있다. 따라서 kafka client를 도입하지 못하는 상황이거나 kafka client가 제공하지 않는 언어를 사용하거나 빠른 결과 확인을 해야 할 때 많이 사용한다. Rest Proxy는 confluent community 라이센스를 가지고 있다.
Schema Registry
MSA에서는 kafka 미들웨어의 도입을 통해 서비스가 서비스를 호출하는 방식이 아닌 kafka를 통해 메세지를 송수신 함으로써 비동기 처리를 가능하게 한다. 이때 producer와 consumer가 서로 누군지 모르는 상태에서 작동한다. 하지만 필드가 삭제/추가 되거나 데이터의 형이 변경될 수도 있기 때문에 메세지의 스키마는 항상 변동된다. 이때문에 producer와 consumer를 다른 사람이 관리하고 있는 상황에서 스키마의 변동에 대한 정보가 제대로 전달되지 않아 문제가 되는 경우도 있다. 이러한 문제를 해결하기 위해 메세지 스키마 유효성에 대해서 상호 보증할 수 있는 메커니즘이 필요하게 되었고, Schema Registry가 그 역할을 한다.
Schema Registry는 버저닝과 스키마 호환성이라는 것이 있다. Schema Registry는 동일 스키마에 대한 호환성 체크를 하기 위해 버전을 유지한다. 예를 들어 최초 스키마 생성 시 버전1, 그 이후에 필드의 수정이 있으면 버전2 이렇게 버전을 증가시키면서 버저닝을 한다. 호환성 타입은 아래와 같이 4가지가 있다. 참고로 Restful API를 통해 호환성 타입을 변경도 가능하다.
- Backward(기본설정): 필드삭제, optional 필드 추가 허용 - 컨슈머부터 업그레이드
- 이전 스키마를 사용하던 컨슈머가 새 스키마를 사용해 데이터를 읽을 수 있다는 보장이 없음
- Forward: 필드 추가, optional 필드 삭제 허용 - 프로듀서부터 업그레이드
- 새 스키마를 사용하는 컨슈머가 이전 스키마를 사용해 데이터를 읽을 수 있다는 보장이 없음
- Full: Backward, Forward 모두 만족함 - 순서 무관
- None: Backward, Forward 모두 만족하지 않음 - 순서 무관
Schema Registry 동작 방식은 아래와 같다. Schema Registry도 Rest proxy 처럼 kafka broker와는 별도의 어플리케이션으로 작동한다. Producer가 Schema Registry에 스키마를 전송하고, avro/json/protobuf 포맷으로 스키마 ID와 페이로드를 인코딩하여 kafka broker에게 전송한다. (Avro는 json으로 스키마를 작성해서 binary 포맷으로 serialize 시켜준다.) Consumer는 avro/json/protobuf 중 하나의 포맷으로 변환이 된 포맷을 받고, 어떤 스키마 ID를 사용하는지 추출해서 스키마를 Schema Registry 로부터 스키마를 받는다. 그 후 그 스키마로 deserialize해서 최종적으로 메세지를 받는다. Producer와 consumer는 매번 Schema Registry에 스키마를 전송하고 받는것이 아니고, 로컬 캐시에 해당 스키마가 존재하지 않을 때만 Schema Registry와 송수신을 한다.
Schema Registry 구성으로 Producer가 잘못된 메세지를 전달하더라도 스키마 전송과정에서 실패하게 될 것이고, consumer는 잘못된 메세지를 받을 우려가 없어 장애를 미연에 방지할 수 있다. 하지만 Schema Registry가 장애가 날 경우에는 심각한 문제가 일어날 수 있다는 단점이 있다.
실습
먼저 아래와 같이 /etc/hosts 파일을 수정해준다.
$ sudo vim /etc/hosts
yaml 파일은 아래와 같고, 아래 yaml 파일을 실행한다.
$ docker compose -f <yaml파일 이름> up
version: '3'
services:
zookeeper-1:
hostname: zookeeper1
image: confluentinc/cp-zookeeper:6.2.0
environment:
ZOOKEEPER_SERVER_ID: 1
ZOOKEEPER_CLIENT_PORT: 12181
ZOOKEEPER_DATA_DIR: /zookeeper/data
ports:
- 12181:12181
volumes:
- ./zookeeper/data/1:/zookeeper/data
kafka-1:
hostname: kafka1
image: confluentinc/cp-kafka:6.2.0
depends_on:
- zookeeper-1
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:12181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:19092
KAFKA_LOG_DIRS: /kafka
KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
ports:
- 19092:19092
volumes:
- ./kafka/logs/1:/kafka
schema-registry-1:
hostname: schemaregistry1
image: confluentinc/cp-schema-registry:6.2.0
depends_on:
- kafka-1
environment:
SCHEMA_REGISTRY_HOST_NAME: schemaregistry1
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092 # 연결할 kafka broker
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:18081 # schemaregistry endpoint
SCHEMA_REGISTRY_SCHEMA_COMPATIBILITY_LEVEL: backward
ports:
- 18081:18081
rest-proxy-1:
hostname: restproxy1
image: confluentinc/cp-kafka-rest:6.2.0
depends_on:
- schema-registry-1
ports:
- 18082:18082
environment:
KAFKA_REST_HOST_NAME: restproxy1
KAFKA_REST_BOOTSTRAP_SERVERS: PLAINTEXT://kafka1:19092 # rest-proxy 가 접근할 kafka broker
KAFKA_REST_LISTENERS: http://0.0.0.0:18082 # rest-proxy endpoint
KAFKA_REST_SCHEMA_REGISTRY_URL: http://schemaregistry1:18081 # avro/json/protobuf serializer와 deserializer가 접근해서 스키마 유효성 검증할 때 사용
KAFKA_REST_PRODUCER_THREADS: 3
KAFKA_REST_CONSUMER_THREADS: 3
KAFKA_REST_CONSUMER_REQUEST_TIMEOUT_MS: 5000
schema-registry 실습
이제 스키마를 생성해보자. 스키마는 kafka 통해서 오가는 메세지의 포맷을 정의한다. Schema Registry는 스키마가 시간에 따라서 변화하는 범위를 지정하는게 목적이며, 이 범위 자체를 subject 라고 한다. s1은 string, s2는 int 타입으로 정의한다.
# 스키마 생성
$ curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\"type\": \"string\"}"}' http://schemaregistry1:18081/subjects/s1/versions
$ curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\"type\": \"int\"}"}' http://schemaregistry1:18081/subjects/s2/versions
# 등록한 subject 조회
$ curl -v -XGET http://schemaregistry1:18081/subjects
# 설정한 호환성 타입 확인
$ curl -v -XGET http://schemaregistry1:18081/config
# 호환성 타입 변경
$ curl -v -XPUT --data '{"compatibility": "FULL"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' http://schemaregistry1:18081/config
# 변경한 호환성 타입 확인
$ curl -v -XGET http://schemaregistry1:18081/config
$ curl -v -XPUT --data '{"compatibility": "BACKWARD"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' http://schemaregistry1:18081/config
$ curl -v -XGET http://schemaregistry1:18081/config
# subject별 호환성 타입 설정
$ curl -v -XPUT --data '{"compatibility": "NONE"}' -H'Content-Type: application/vnd.schemaregistry.v1+json' http://schemaregistry1:18081/config/s1
# 스키마 아이디로 등록된 스키마 조회(전역)
$ curl -v -XGET http://schemaregistry1:18081/schemas/ids/1
# s1 subject 내 버전으로 스키마 조회(지역)
$ curl -v -XGET http://schemaregistry1:18081/subjects/s1/versions/1
# s1 subject 내 최신버전으로 스키마 조회
curl -v -XGET http://schemaregistry1:18081/subjects/s1/versions/latest
# 호환성 체크
$ curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\"type\": \"int\"}"}' http://schemaregistry1:18081/compatibility/subjects/s1/versions/latest
$ curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\"type\": \"string\"}"}' http://schemaregistry1:18081/compatibility/subjects/s2/versions/latest
$ curl -v -XPOST -H'Content-Type: application/vnd.schemaregistry.v1+json' --data '{"schema": "{\"type\": \"int\"}"}' http://schemaregistry1:18081/compatibility/subjects/s2/versions/latest
원래 지정되어 있던 int로 했을 때만 compatible true가 뜬다.
# 등록된 subject 삭제
$ curl -v -XDELETE http://schemaregistry1:18081/subjects/s1
$ curl -v -XDELETE http://schemaregistry1:18081/subjects/s2
Rest Proxy 실습
bank라는 topic에 메세지를 전달해보자. 여기서는 value_schema만 정의했다. 원래는 key_schema도 정해줄수 있다.
# Avro examples
$ curl -v -XPOST -H'Content-Type: application/vnd.kafka.avro.v2+json' -H'Accept: application/vnd.kafka.v2+json' --data '{"value_schema": "{\"type\": \"record\", \"name\": \"account\", \"fields\": [{\"name\": \"balance\", \"type\": \"int\"}]}", "records": [{"value": {"balance": 10000}}]}' http://restproxy1:18082/topics/bank
Avro 메세지를 소비할 수 있는 consumer를 생성한다.
# Consumer 생성
$ curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"name": "my_consumer_instance", "format": "avro", "auto.offset.reset": "earliest"}' http://restproxy1:18082/consumers/my_consumer_group
consumer 그룹에 consumer 인스턴스가 생성되고, instance_id는 아까 지정했던대로 my_consumer_instance가 나오고, base_uri는 consumer에 접근해서 구독하게 만들게 위해 필요하다.
아까 메세지를 넣어놨던 topic bank 에 대해 구독을 해본다. http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/
가 base_uri이고, 이 뒤에 uri를 축가해서 원하는 작업을 할 수 있다. 구독을 하려면 subscription을 사용하면 된다.
# Topic 구독
$ curl -v -XPOST -H'Content-Type: application/vnd.kafka.v2+json' --data '{"topics":["bank"]}' http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/subscription
소비를 할 때는 base_uri에 records를 추가해주면 된다.
# 메세지 처음부터 소비
$ curl -v -XGET -H'Accept: application/vnd.kafka.avro.v2+json' http://restproxy1:18082/consumers/my_consumer_group/instances/my_consumer_instance/records
처음에 넣었던 메세지가 출력된느 것을 볼 수 있다.