Replication이란?
Broker에 장애가 발생한다면 장애가 발생한 Broker의 Partition들은 모두 사용할 수 없게 된다.
그럼 간단하게 생각해서 다른 broker에 새로운 partition을 생성해주면 되지 않을까? 하지만 partition을 새로 생성해 준것이기 때문에 지금까지 쌓아놓은 데이터까지 옮겨지지 않는 문제가 있다.
이를 방지하기 위해 replication이라는 것이 등장했다. Partition을 새롭게 생성하는 것이 아니라 복제(replication) 하여 다른 broker 상에 복제물 (replica)를 만들어 놓고 장애를 대비한다. producer로 부터 새로운 데이터가 들어오면 바로 다른 broker 에 있는 replica에 복사해서 똑같은 데이터를 만들어 놓는다. 이렇게 하면 과거의 데이터도 가지고 있으면서 새로운 데이터도 받을 수 있다.
원래 원본을 가지고 있는 partition을 Leader partition이라고 하고, 복제를 해가고 있는 partition을 follower partition이라고 한다. Producer는 Leader에만 Write하고, Consumer는 Leader로부터만 Read한다. Follower는 Broker장애시 안정성을 제공하기 위해서만 존재하는 것이고, 서비스를 하기 위해 존재하는 것이 아니다. 서비스는 leader를 가지고만 진행한다. Follower는 Leader의 Commit Log에서 데이터를 가져오기 요청(Fetch Request)으로 복제한다. 즉, leader가 알아서 복사본을 주는 것이 아니고, follwer들이 가져간다고 말을하고 가져가는 형태이다.
Broker에 장애가 발생한다면?
Leader partition에 장애가 발생한다면 kafka cluster가 follwer partition 중에서 하나를 골라 새로운 leader로 선정한다.(선정하는 과정은 매우 복잡) 그리고 producer는 바뀐 leader 쪽으로 데이터를 보낸다. consumer도 마찬가지고 바뀐 leader에서 데이터를 가져간다. 그리고 follwer는 새로운 leader에서 데이터를 복제해간다.
Leader partition 자동 분산
만약에 하나의 broker에만 leader parition들이 몰려있다면, Producer는 Leader에만 Write하고 Consumer는 Leader로부터만 Read하기 때문에 모든 producer들과 cosumer들은 해당 broker에만 몰릴것이고, 그렇게 하면 과부하가 일어나게 될것이다.
partition leader를 자동으로 분산시키는 옵션이 있다. auto.leader.rebalance.enable
을 enable로 하면 된다. 또 다른 옵션으로는 leader.imbalance.check.interval.seconds : 기본값300 (300초 마다 leader의 불균형이 있는지 체크), leader.imbalance.per.broker.percentage : 기본값10 (다른 broker 보다 leader를 10% 이상 가져가면 불균형) 가 있다.
Rack Awareness
Rack 간 broker들을 분산하여 Rack 장애를 대비할 수 있는데, 이 경우에 broker가 다른 rack에 있다고 알려줘야 한다. 이 때 쓰는 것이 broker.rack
옵션이다. 복제본(Replica-Leader/Follower)은 최대한 Rack간에 균형을 유지하여 Rack 장애를 대비한다. Rack awareness로 topic을 만들면 그 때 broker.rack 옵션을 보고 분산을 시키게 된다.
In-Sync Replicas (ISR)
In-Sync Replicas(ISR)는 High Water Mark라고 하는 지점까지 동일한 Replicas (Leader와 Follower 모두)의 목록이다. follwer 중에서 high water mark 까지 잘 복사해서 가지고 있는 애는 ISR이라고 하고, Leader에 장애가 발생하면 ISR 중에서 새 Leader를 선출한다.
replica.lag.max.messages
High Water Mark 지점까지 문제없이 잘 복사하고 있는지는 replica.lag.max.messages
라는 옵션으로 판단할 수 있다.
Leader의 맨 끝은 LOG-END-OFFSET라고 하는데, 만약 replica.lag.max.messages=4
라고 하면 LOG-END-OFFSET과 replica가 복사한 데이터의 마지막 offset 위치가 4개 미만이면 해당 follwer partition을 ISR 이라고 한다. 반대로 leader를 잘 복사하지 못한 follwer는 Out-of-Sync Follower(OSR) 이라고 한다.
replica.lag.time.max.ms
메시지 유입량이 갑자기 늘어날 경우 replica.lag.max.messages로 ISR을 판단하면 다음과 같은 문제가 발생한다. 메시지 유입량이 갑자기 늘어날 경우(예,초당10 msg/sec), 지연으로 판단하고 모든 replica들을 OSR(Outof-Sync Replica)로 상태를 변경시키게 된다. 실제 Follower는 정상적으로 동작하고 있었지만 잠깐의 지연이 발생했을 뿐인데, replica.lag.max.messages 옵션을 이용하면 OSR로 판단하게 되는 문제가 발생한다는 것이다. 이렇게 되면 운영중에 불필요한 error(데이터를 보내는데 충분한 ISR 가 없다는 에러)가 발생하게 되고, 그로 인한 불필요한 producer retry가 유발된다.
그래서 replica.lag.time.max.ms으로 ISR을 판단한다. 이는 Follower가 Leader로 Fetch 요청을 보내는Interval을 체크한다. replica.lag.time.max.ms = 10000이라면 Follower가 Leader로 Fetch 요청을 10000 ms내에만 요청하면 정상으로 판단한다. Confluent 에서는replica.lag.time.max.ms옵션만 제공한다.
ISR 관리
ISR은 leader partition이 떠있는 broker가 관리한다. broker는 replica.lag.time.max.ms이내에 Follower가 fetch하지 않으면 ISR에서 제거하고 zookeeper에게 ISR 정보를 보낸다. Zookeeper는 다시 controller 라고 하는 broker에 ISR 정보를 보낸다.
kafka cluster 내의 broker 중 하나가 controller가 된다. Controller는 zookeeper를 통해 broker의 상태를 실시간으로 모니터링하고, zookeeper에서 받은 leader와 replica 정보를 cluster 내의 다른 broker들에게 전달한다. 다른 broker 들에게 정보를 전달하는 이유는 동일한 정보를 모든 broker들에게 캐시해서 더 빠르게 엑세스 할 수 있도록 한다. 또한, Controller는 leader가 속한 broker 가 죽으면 leader를 다시 선출해준다. 이러한 역할을 하는 controller가 죽으면 zookeeper가 다른 broker 중에서 controller를 다시 고른다.
Consumer 관련 Position
- Last Committed Offset(Current Offset) : Consumer가 최종 Commit한 Offset
- consumer가 여기까지 가져갔다는 표시
- Current Position : Consumer가 읽어간 위치 (처리중, Commit 전)
- batch로 메세지를 가져가는 경우 최종으로 읽어간 위치
- High Water Mark(Committed) : ISR(Leader-Follower)간에 복제된 Offset
- Log End Offset : Producer가 메시지를 보내서 저장된, 로그의 맨 끝 Offset
Committed란?
commited의 의미는 ISR 목록의 모든 Replicas가 메시지를 받은 상태이다. OSR은 신경쓰지 않는다. 아래 그림에서는 ISR의 모든 replica가 5번까지 메세지를 받았기 때문에 5번의 위치가 fully-replicated commited가 된다. consumer는 commited 된 메세지만 읽을 수 있다. 그리고 leader는 메세지를 commit할 시기를 결정해준다. committed 메세지는 모든 follwer에서 동일한 offset을 갖도록 보장한다. 이 말은 아래 그림에서 5번이 committed 위치라고 했는데, OSR도 언젠가는 5번까지 따라잡아서 5번 메세지를 가질 것이라는 말이다. 즉, 5번까지는 잘 복제된다는것을 보장한다는 뜻이고, 어떤 replica가 leader 인지에 관계없이 모든 consumer는 5번 데이터 까지 볼 수 있다는 뜻이다. Broker가 다시 시작할 때 Committed 메시지 목록을 유지하도록 하기 위해, Broker의 모든 Partition에 대한 마지막 Committed Offset replicationoffset-checkpoint라는 파일에 기록된다.
Replicas 동기화
- High Water Mark
- 가장 최근에 Committed 메시지의 Offset 추적
-
replication-offset-checkpoint 파일에 체크포인트를 기록
- Leader Epoch
- 새 Leader가 선출된 시점을 Offset으로 표시 (새로운 leader가 선출되면 epoch0 에서 epoch1로 바뀜)
- Broker 복구중에 메시지를 체크포인트로 자른 다음 현재 Leader를 따르기 위해 사용됨
- Controller가 새 Leader를 선택하면 Leader Epoch를 업데이트하고 해당 정보를 ISR 목록의 모든 구성원에게 보냄
- leader-epoch-checkpoint 파일에 체크포인트를 기록
Message Commit 과정
Follwer는 leader로부터 fetch만 수행한다고 했다. Broker 내에는 fetcher thread 라는 것이 존재한다. fetcher thread는 leader에서 계속해서 데이터를 가져오는 역할을 한다.
- Offset 5 까지 복제가 완료되어 있는 상황에서, Producer가 메시지를 보내면 Leader가 offset 6에 새 메시지를 추가
- 각 Follower들의 Fetcher Thread가 독립적으로 fetch를 수행하고, 가져온 메시지를 offset 6에 Write
- 각 Follower들의 Fetcher Thread 가 독립적으로 다시 fetch를 수행하고 새로운 데이터가 없기 때문에 null을 받음. Leader는 follwer들이 6번을 잘 받았기 때문에 7번을 요구한다고 생각하고 High Water Mark를 6번으로 이동
- 각 Follower들의 Fetcher Thread가 독립적으로 다시 fetch를 수행하고 High Water Mark를 받아서 follwer들의 high water mark를 이동시킴