Producer acks

acks는 Producer가 kafka에게 메세지를 보냈을 때 kafka가 메세지를 잘 받았는지 확인하기 위해 사용하는 producer parameter이다. 다시말해 acks는 요청이 성공할때를 정의하는데 사용되는 Producer Parameter 이다.

acks=0 은 producer가 kafka에게 메세지를 보내면 끝, 메시지 손실이 다소 있더라도 빠르게 메시지를 보내야 하는 경우에 사용한다.

image

acks=1은 default값으로, Leader가 메시지를 수신하면 잘 받았다고 producer에게 ack를 보낸다. 하지만 Leader가 Producer에게 ACK를 보낸 후, Follower가 복제하기 전에 Leader에 장애가 발생하면 메시지가 손실된다. 그래서 “At most once(최대한번)” 전송을 보장한다고 말한다.

image

acks=-1 or acks=all은 메시지가 모든 Replica까지 Commit되면 ack를 보낸다. Leader가 죽어도 데이터가 다 복제되었다는 확증을 할 수 있다. 그러나 acks=1보다 acks를 받는 시간이 더 길고, producer에 retry 옵션이 적용되었고 broker는 acks를 못 보내는 특정 케이스에서는 데이터가 쌓이는 문제가 있다.그렇기 때문에 “At least once(최소한번)” 전송을 보장한다.

image

Producer retry

Producer 재시도(retry)는 네트워크 또는 시스템의 일시적인 오류를 보완하기 위해 사용한다.

image

보통 delivery.timeout.ms 조정으로 재시도 동작을 제어한다. 참고로 acks=0 에서는 retry 옵셥이 무의미하다.

Producer Batch 처리

Producer에서는 producer record를 만들고 send()로 메세지를 보낸다. 그러면 serializer -> Partitioner -> Compress -> RecordAccumulator -> ProduceRequest 오브젝트 형태에 담아 그걸 broker 쪽으로 보낸다. 이 때 메세지를 한 번에 모아서 보내는 producer 옵션이 있다. linger.ms 옵션과 batch.size 옵션이다.

  • liner.ms
    • default : 0, 즉시보냄
    • 메시지가 함께 Batch 처리될때까지 대기시간
    • 대기시간 동안 모인 메세지들만 한번에 보내는 방법
    • 일반적으로 linger.ms=100 으로 설정
  • batch.size
    • default : 16 KB
    • 보내기전 Batch의 최대 크기
    • 내가 정한 메세지의 크기가 도달할 때 까지 계속 기다렸다가 해당 크기가 되면 한번에 보내는 방법
    • 일반적으로 batch.size=1000000 으로 설정

한 번에 메세지를 보내는 것을 batch 처리라고 하는데, batch 처리는 broker 가 처리하는 작업이 줄어들기 때문에 더 나은 처리량을 제공한다.

Producer Delivery Timeout

Producer는 메세지를 send()하고 메세지를 저장할 버퍼를 할당받을 때까지 대기하는 시간이 있다. 그것을 max.block.ms 로 제어한다. send()가 되면 linger.ms 옵션으로 메세지가 batch 처리될 때까지의 대기 시간을 지정한다. 그리고 메세지를 보내면 kafka에 갔다가 메세지가 잘 들어갔다는 응답이 돌아올 때 까지 기다리는 시간을 request.timeout.ms 로 설정한다. 메세지가 제대로 들어가지 않았을 때는 retry.backoff.ms 옵션으로 retry 사이의 대기시간을 설정한다. batch를 만들 때 부터 메세지를 잘 받았다고 응답을 받는 것 까지를 delivery.timeout.ms 로 제어한다.

image

Producer message send 순서 보장

Producer가 메세지를 보낼 때 여러개의 request 가 동시에 broker로 날아가는 경우가 있다. 이때 하나의 produer 하나의 broker에 동시에 보낼 수 있는 최대한의 메세지 개수를 max.in.flight.requests.per.connection 옵션으로 설정한다. default 값은 5이고, 5로 하면 한번에 5개의 batch가 날아갈 수 있다.

image

하지만 batch0을 보내는 데 실패했다면, batch1이 batch0보다 먼저 commit log에 추가되어 메세지 전송 순서가 달라진다. 이를 방지하기 위해 메세지 순서를 보장하는 옵션이 있는데 enalble.idempotence=True로 하면 된다. enable.idempotence를 사용하면, 하나의 Batch가 실패하면, 같은 Partition으로 들어오는 후속 Batch들은 OutOfOrderSequenceException 에러를 보내면서 같이 메세지 전송을 실패한다. 그러고 다시 batch 0 부터 retry를 하게 된다.

Page Cache 와Flush

메세지가 broker안의 partition에 전송되고 disk 에 저장되는 과정은 다음과 같다.

image

Partition의 데이터를 바로 disk에 저장하지 않는다. 그럼 어떻게 disk에 저장될까? Partition은 Log Segment file로 구성된다고 했고, broker는 성능의 향상을 위해 이 Log Segment를 OS Page Cache에 기록한다. 그게 나중에 flush 해서 disk로 들어간다. 로그 파일에 저장된 메시지의 데이터 형식은 Broker가 Producer로부터 받은 것이고, Consumer에게 보내는것과 정확히 동일하기 때문에 Zero-Copy가 가능하다. Zero-copy 전송은 데이터가 CPU 개입없이 Page Cache와 Network Buffer사이에서 직접 전송되는 것을 의미한다. Page Cache는 Broker가 완전히 종료되었거나, OS background “Flusher Thread” 가 동작할 때 디스크로 Flush된다.