Consumer의 동작 방식

Consumer는 메시지를 가져오기 위해 Partition에 연속적으로 Polling 한다. poll(100) 는 100ms 마다 메세지를 가져온다는 뜻이다. 그리고 가져온 위치를 나타내는 offset 정보를 __consumer_offsets 라는 internal Topic에 저장하여 관리한다.

image

동일한 group.id로 구성된 모든 Consumer들은 하나의 Consumer Group을 형성하고, 동일한 Topic에서 consume하는 여러 Consumer Group이 있을 수 있다. 각 consumer group은 독립적으로 동작하기 때문이다.

image


ConsumerPartition Assignment

이번에는 consumer 가 partition 개수에 따라서 어떤식으로 할당이 되는지 알아본다.

Partition을 Consumer에게 Assign(할당) 할 때,

  • 하나의 Partition은 지정된 Consumer Group내의 하나의 Consumer만 사용
  • 동일한 Key를 가진 메시지는 동일한 Consumer가 사용 (Partition수를 변경하지 않는다는 가정)
  • Consumer의 설정 파라미터중에서 partition.assignment.strategy로 할당 방식 조정
  • Consumer Group은 Group Coordinator라는 프로세스에 의해 관리됨

결론부터 말하면 Group Coordinator (=하나의 Broker)와 Group Leader(=하나의 Consumer)가 상호작용하면서 comsumer를 특정 partition에 할당해준다.

image

① Consumer 등록 및 Group Coordinator 선택

image

kafka cluster 내에 6개의 partition을 가지고 있는 topic 있다고 하자. Consumer group 에는 0~6번 까지 총 7개의 consumer 들이 있다. 각 consumer 들은 동일한 group.id 로 kafka cluster 안에 있는 broker 에 접근을 하는데, 이 때 kafka는 이 broker에 접속하는 이러한 consumer 들이 있구나라고 하면서 하나의 comsumer group을 생성한다.

이제 consumer group 안에 있는 consumer 들이 offset을 기록할 leader partition을 정해준다. hash(group.id) % offsets.topic.num.partitions 수식을 사용하여 group.id가 저장될__consumer_offsets의 Partition을 결정한다. 이 알고리즘으로 broker 102가 group coordinator가 되었다고 가정하자.

② JoinGroup 요청 순서에 따라 Consumer 나열

image

Group coordinator 가 된 broker 102는 consumer 카탈로그를 생성하기 전에 consumer들로부터 join group 요청을 받는다. consumer group에 있는 consumer들은 broker에 붙을 때 joingroup이라는 request를 날리는데, group.initial.rebalance.ms (default 3초) 동안 들어오는 joingroup request를 날린 consumer들을 같은 그룹으로 생각한다.

이때 group coordinator는 joingroup request를 날린 순서를 consumer group에 던져주는데, 가장 먼저 요청을 날린 consumer 한테 그 정보를 던져준다. Joingroup 요청을 가장 먼저 보낸 consumer가 group leader가 된다. 여기서 group leader는 consumer 0번이라고 하자.

③ Group Leader 결정 및 Partition 할당

image

group leader와 group coordinator가 서로 상호작용을 하면서 어떤 식으로 할당할 것인지 정해진다. Group Leader는 구성된 partition.assignment.strategy를 사용하여 각 Consumer에게 Partition을 할당한다. partition.assignment.strategy는 consumer 파라미터이다.

consumer 보다 partition이 많은 경우에는 한 개의 consumer 에게 여러 partition이 할당되는데, 아래와 같이 consumer의 개수가 더 많으면 할당받지 못하는 partition이 생긴다.

④ Consumer → Partition 매핑 정보를 Group Coordinator에게 전송

image

이렇게 각 consumer를 partition에 매핑한 정보는 다시 group coordinator에게 보내져, 이렇게 할당을 할게 라고 알려준다.

⑤ 각 Consumer에게 할당된 Partition 정보를 보냄

image

그러면 broker 102는 매핑 정보를 메모리 캐시하고, zookeeper에게 보내는 작업을 해주고, 최종적으로 이렇게 할당하면 되겠다 라고 응답을 consumer group에 있는 모든 consumer들에게 다 보내준다. 이제 각 consumer는 할당받은 partition에서 데이터를 받아온다.

왜 Group Coordinator(Broker)가 직접 Partition을 할당하지 않는가?

Kafka 는 가능한한 많은 계산을 클라이언트(producer, consumer)가 수행하도록 하여, Broker의 부담을 줄인다. 그 이유는 많은 Consumer Group과 Consumer들이 있고 Broker 혼자서 Rebalance를 위한 계산을 한다고 생각해보면 Broker에 엄청난 부담이 될 것이다. 따라서 이러한 계산을 Broker가 아닌 클라이언트에게 오프로드(Offload) 하는 것이다.


Consumer Rebalancing Trigger

Consumer Rebalancing은 consumer group에 있는 어떤 consumer 가 나 group에 join 할래라는 joinrequest를 보내면 바로 트리거가 시작된다. 이외에 rebalancing 이 일어나는 상황은 다음과 같다.

  • Consumer가 Consumer Group에서 탈퇴 (consumer 개수 감소)
  • 신규 Consumer가 Consumer Group에 합류 (consumer 개수 증가)
  • Consumer가 Topic 구독을 변경
  • Consumer Group가 Topic 메타데이터의 변경사항을 인지 (예:Partition증가)

Consumer Rebalabncing Process

  1. Group Coordinator는 heartbeats의 플래그를 사용하여 Consumer에게 Rebalance 신호를 보냄
  2. 모든 Consumer가 일시중지하고 Offset을 Commit
  3. Consumer는 Consumer Group의 새로운”Generation”에 다시 합류
  4. Partition 재할당
  5. Consumer는 새 Partition에서 다시 Consume을 시작

2번 에서 Consumer Rebalancing시 Consumer들은 메시지를 Consume 하지 못하기 때문에 불필요 Rebalancing은 반드시 피해야 한다.

Consumer Heartbeats

Broker 가 consumer에 대한 장애를 인지하기 위해 사용하는 것이 consumer heartbeat이다. Consumer poll()과 별도로 백그라운드 Thread에서 Heartbeats를 broker로 보낸다. 나 살아있다는 것을 주기적으로 알리는 것이다. 이 주기를 heartbeat.interval.ms (기본 3초)로 설정한다.

이 때 session.timeout.ms (기본 10초) 라는 시간 동안 heartbeat가 수신되지 않으면 consumer가 죽은 것으로 인지하고 consumer group에서 삭제된다.

poll()도 heartbeat와 상관없이 주기적으로 호출이 되어야 한다. max.poll.interval.ms (기본 5분) 에 한번씩 호출한다.

과도한 rebalancing을 피하는 방법

  1. Consumer Group 멤버고정
    • Group의 각 Consumer에게 고유한 group.instance.id를 할당
      • 같은 group.instance.id을 가진 consumer가 다시 join 하면 rebalancing 하지 않음
    • Consumer는 LeaveGroupRequest를 사용하지 않아야 함
    • Rejoin(재가입)은 알려진 group.instance.id에 대한Rebalance를 trigger 하지 않음
  2. session.timeout.ms튜닝
    • heartbeat.interval.ms를 session.timeout.ms의 1/3로 설정
    • group.min.session.timeout.ms (Default: 6 seconds) 와 group.max.session.timeout.ms (Default: 5 minutes)의 사이값으로 설정
    • 장점: Consumer가 Rejoin(재가입)할 수 있는 더 많은 시간을 제공
    • 단점: Consumer 장애를 감지하는데 시간이 더 오래걸림
  3. max.poll.interval.ms튜닝
    • Consumer에게 poll()한 데이터를 처리할 수 있는 충분한 시간 제공
    • 너무 크게 하면 안됨