Producer
Kafka producer는 메시지를 생산(Produce)해서 Kafka의 Topic으로 메시지를 보내는 애플리케이션이다.
Record(Message = Record = Event = Data) 구조
메세지는 크게 header, key, value로 구성되어 있다. Header는 kafka topic, partition 등의 kafka 관련 정보들을 담고 있고, 우리가 실제로 보내려고 하는 데이터는 key와 value 안에 넣어서 보내진다. key와 value는 Avro나 json, protobuf 등 다양한 형태가 가능하다.
Serializer/Deserializer
kafka는 데이터를 byte array로만 저장한다. 그래서 우리는 kafka로 메세지를 보낼 때 byte array 로 변환해서 보내야하고, 받을 때도 마찬가지로 byte array를 받아 직접 변환해서 사용해야 한다. producer의 데이터 변환기는 serializer, consumer의 데이터 변환기는 deserializer이다.
따라서 producer는 JSON이나 string 등으로 받은 데이터를 record 형태로 만들고, 이를 key와 value에 넣은 다음에 send 한다. 그러면 producer 안에 있는 subscribe 라이브러리에서 직렬화(serializer)가 이루어 진 후 byte array 로 변환이 된다. Byte array 로 변환된 데이터는 이제 kafka 로 보내진다.
Consumer는 이제 kafka엣 있는 데이터를 가져갈 때, byte array 형태의 데이터를 다시 원본 형태로 역직렬화(deserializer)하여 전달한다.
Producing to Kafka
우리가 실제로 kafka 에 데이터를 producing 하는 방법은 간단하다. 먼저 메세지 레코드를 만들고 send() 함수로 우리가 만든 serializer를 호출하면, 데이터가 직렬화된 후 byte array로 변환이 된다. 그 다음 과정들은 kafka 내부 라이브러리가 알아서 진행해준다.
Partitioner는 이 데이터를 어느 parition으로 보낼 것인지를 정해준다. 데이터는 compress 압축 옵션을 통과하여 recordaccumulator에 보내져서 배치 형태 또는 그냥 쌩으로 kafka에 전송된다. Kafka는 데이터를 받으면 잘 받았다라는 응답을 주고, 실패했을 때는 재시도 옵션에 따라 데이터를 다시 전송한다.
Partitioner
Partitioner는 메세지를 topic의 어떤 partition으로 보낼지를 결정하는 역할을 한다고 했다. Partitioner는 우리가 보내는 메세지의 키값을 가지고 hash 값(숫자)을 만들고, 파티션의 개수로 나눈 나머지를 구해서 몇 번 partition으로 보낼지 결정한다. 그렇기 때문에 키값을 반드시 정해줘야 한다.
만약에 key값이 null일 때는 어떻게 분산이 될까? Kafka 2.4 이전의 DefaultPartitioner는 Round Robin 정책으로 동작하여 1,2,3,… 순서대로 들어갔다. 하지만 이는 batch 로 데이터를 보낼 때는 불가능하다. 그래서 Kafka 2.4이후의 DefaultPartitioner는 Sticky 정책으로 동작하여 하나의 Batch가 닫힐 때 까지 하나의 partition에게 record를 보내고 랜덤으로 다른 Partition을 선택해서 다시 하나의 배치가 끝날 때까지 해당 partition에 레코드를 넣는다.
왼쪽은 6개의 레코드를 6개의 각기 다른 patition으로 날리는 반면에, 오른쪽은 6개의 메세지를 보낼 때 3개씩 묶어서 2번의 request만 날리면 된다. partioner는 별도로 개발해서 세팅할 수 있다.
Consumer
Consumer는 kafka topic의 commit log(=partition)에서 데이터를 가져와서 활용하는 애플리케이션이다. 각 consumer들은 각각의 고유의 속도로 commit log에서 데이터를 순서대로 read(poll)를 한다. 서로 다른 그룹에 속하는 consumer 들은 아무런 관련이 없고, 하나의 partition에서 동시에 데이터를 가져올 수 있다.
Consumer offset
Consumer offset은 Consumer가 자동이나 수동으로 읽은 데이터의 위치를 표시하여 데이터를 중복으로 읽어가는 것을 방지한다.
만약에 consumer group A 에 있는 consumer 가 partition0 에 있는 2번째 데이터를 가져가서 사용을 했다면, 그 다음 읽어갈 위치인 3번째 offset의 위치를 ‘__consumer_offsets’라는 internal topic에 저장한다. GroupA:MyTopic:P0:3
는 나의 그룹은 GroupA, MyTopic 이라는 topic에서 데이터를 읽었고, 읽어간 partition의 번호는 0번, 다음번 읽어갈 데이터의 위치는 3번이다 라는 뜻이다.
partition 여러개에서 하나의 consumer가 데이터를 가져올 때는, 그 consumer가 모든 partition에 있는 데이터를 가져간다. Consumer는 각 Partition에서의 Consumer Offset을 별도로 기록한다.
consumer group
동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성한다. Consumer Group의 Consumer들은 작업량을 어느정도 균등하게 분할한다.
위의 그림처럼 여려개의 partition과 여러개의 consumer들이 있을 때는, 하나의 Partition은 항상 Consumer Group내의 하나의Consumer에 의해서만 사용된다. 하지만 하나의 consumer는 여러 개의 patition을 사용할 수 있다. 즉, partition:consumer=N:1인 것이다.
여러개의 consumer group이 있는 경우도 마찬가지이다. Consumer group들은 서로간의 상관관계가 없기 때문에 각자 행동한다. 즉, 동일한 topic 에서 consume 하는 여러개의 consumer group이 있을 수 있다는 말이다.
Message Ordering(순서)
메세지를 kafka에 producing 할 때 partitioner가 메세지의 key 값을 가지고 자동으로 어느 partition에 메세지를 보낼지 정해준다고 했다. 만약에 partition이 1개라면 모든 메세지가 partition 하나에 들어가기 때문에 전체 메세지에 대해 순서가 보장 가능하다. 하지만 처리량이 저하된다는 단점이 있다. partition이 여러개일 경우에는 전체 메세지에 대해서 어떤 메세지가 먼저 들어왔는지는 알 수가 없다. 그 이유는 partition 별로 각각 동작하기 때문이다.
그렇다고 순서를 보장하기 위해서 partition을 1개만 사용한다? kafka는 병렬처리를 위한 시스템인데 이렇게 사용하면 당연히 kafka를 쓰는 의미가 사라진다. 그리고 parition을 1개로 구성해서 모든 메세지에 대해 전체 순서를 보장해야 하는 경우가 많을까? 거의 없다. 대부분의 경우, Key로 구분할 수 있는 메시지들의 순서 보장이 필요한 경우가 많다. 예를 들어 각 사용자들의 log를 분석하는 경우를 생각하면 된다. 굳이 서로 다른 사용자의 log를 순서대로 처리할 필요는 없는 것이다.
다시말해서 동일한 Key를 가진 메시지를 동일한 Partition에 전달하게 하여 Key레벨의 순서를 보장하도록 사용하여 2개 이상의 Partition을 사용해 처리량 증가 시키면 된다. 간단하게 같은 key 값을 주기만 하면 partitioner는 해시 알고리즘을 통해서 같은 parition으로 분배해 줄 것이다. 그러면 consumer도 특정 key를 가진 데이터에 대해서 순서대로 데이터를 가지고 갈 수 있다. 주의할 점은 운영중에 Partition 개수를 변경하면 partitioner의 알고리즘 때문에 순서가 보장 불가하다.
Cardinality
Cardinality는 특정 데이터 집합에서 유니크(Unique)한 값의 개수가 얼마나 많냐에 대한 분포를 나타낸다. 아래 그림처럼 partition에 메세지가 고르게 들어가지 않아 consumer 들간의 작업량의 차이가 큰 경우가 있다. 따라서 key 값은 레코드르르 partition 전체에 고르게 분배하도록 만들어야 한다.
Consumer Failure
사실 consumer의 개수와 partition의 개수가 동일한 경우가 가장 이상적이다. 이 때 하나의 consumer가 장애가 난다면, 장애가 난 consumer가 담당하고 있던 partition을 다른 consumer 가 가져갈 수 있도록 rebalancing이 일어난다. 이때도 여전히 하나의 parition은 consumer group 내의 하나의 consumer에 의해서만 사용된다.