Delivery Semantics
Producer가 메세지를 전송하는 방식은 여러가지가 있다.
- At-Most-Once Semantics(최대한번)
- 확인 시간이 초과되거나 오류가 반환될때 Producer가 재시도 하지 않으면, 메시지가 Kafka Topic에 기록되지 않아 Consumer 에게 전달되지 않을 수 있음
- 중복 가능성을 피하기 위해 때때로 메시지가 전달되지 않을 수 있음을 허용
- At-Least-Once Semantics(최소한번)
- Producer가 Kafka Broker로부터 ack를 수신하고 acks=all 이면 메시지가 Kafka Topic에 최소 한번 작성되었음을 의미함
- 그러나 ack가 시간 초과되거나 오류를 수신하면 메시지가 Kafka Topic에 기록되지 않았다고 가정하 고메시지 전송을 다시 시도할 수 있음
- Broker가 ack를 보내기 직전에 실패했지만 메시지가 Kafka Topic에 성공적으로 기록된 후에 이 재시도를 수행하면 메시지가 두 번 기록되어 최종 Consumer에게 두번 이상 전달 되어 중복 작업과 같은 잘못된 결과로 이어질 수 있음
- Exactly-Once Semantics(정확히한번)
- Producer가 메시지 전송을 다시 시도하더라도 메시지가 최종 Consumer에게 정확히 한 번 전달됨
- 메시징 시스템 자체와 메시지를 생성하고 소비하는 애플리케이션 간의 협력이 반드시 필요함
- 예를들어, 메시지를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 메시지를 다시 수신하게 됨
Exactly-Once Semantics (EOS)
EOS는 중복 메세지로 인한 중복 처리를 방지하기 위해 사용된다. 데이터가 정확히한번 처리되도록 보장해야 하는 실시간 미션 크리티컬 스트리밍 Application이다. EOS를 하기 위해서는 두 가지 옵션을 사용해야 한다.
- Idempotent Producer
- 클라이언트(Idempotent Producer)에서 생성되는 중복 메시지 방지
- Producer의 파라미터중 enable.idempotence 를 true 로 설정해야 함
- Producer가 Retry(재시도)를 하더라도, 메시지 중복을 방지함
- Transaction
- 하나의 트랜잭션 내의 모든 메시지가 모두 Write 되던지, 전혀 Write 되지 않게 하던지 (Atomic Message) 설정
- 각 Producer에 고유한 transactional.id를 설정
- Producer를 Transaction API를 사용하여 개발되어 있어야 함
- Consumer에서 isolation.level을 read_committed로 설정 (transaction 은 commit 하기 전에 들어와 있는 메세지를 consumer 가 가져가서 쓰면 안되기 때문)
EOS는 java client 에서만 지원한다.
- Transaction Coordinator 사용
- 특별한Transaction Log를관리하는Broker Thread
- 일련의ID 번호(Producer ID, Sequence Number, Transaction ID)를할당하고 클라이언트가이정보를메시지Header에포함하여메시지를고유하게식별
- Sequence Number는Broker가중복된메시지를skip할수있게함
Idempotent Producer 메시지 전송 프로세스
메세지는 sequence number와 고유한 producer ID 를 가지고 전송이 된다. 이렇게 보내진 데이터는 broker의 메모리에 map 정보가 저장된다. 이 map은 *.snapshot
파일에 일정 주기로 저장된다.
만약에 broker가 ack를 보내지 못한 경우에는, producer가 ack를 받지 못했기 때문에 동인한 메세지에 대한 재시도를 수행한다. 이때 enable.idempotence=true로 설정하지 않았다면, broker는 메세지를 중복으로 받지 못한다. 그렇기 떄문에 broker는 메세지를 받지 못하고 duplicate 에러를 보낸다.
Transaction
Transaction을 구현하기 위해서 몇 가지 새로운 개념들이 필요하다.
- Transaction Coordinator
- 각 producer에는 transaction coordinator가 할당되어 PID 할당 및 transaction 를 관리하는 데 필요한 모든 로직을 수행한다.
- Transaction log
- internal kafka topic으로 consumer offset topic과 유사하게 모든 transaction의 영구적인 record를 저장하는 transaction coordinator의 상태 저장소이다.
- Transanctionalid
- producer를 식별하기 위해 사용되는 고유 번호
- 동일한 transactionid를 가진 서로 다른 producer 들은 이전에 만들어진 transaction을 다시 시작하거나 중단 가능
Transaction parameters
Transaction 처리 프로세스
특정 topic에서 데이터를 가져와서 그 데이터를 기반으로 다시 다른 topic으로 데이터를 전송하는 하나의 transaction의 처리 프로세스를 살펴보자.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
1. Transaction Coordinator 찾기
producer가 initTransactions()를 호출하여 내부적으로 FindCoordinatorRequest를 broker에게 보내서 producer 역할을 할 애플리케이션과 매핑이 될 transaction coordinator 의 위치를 찾고, transaction coordinator는 애플리케이션을 위한 PID를 생성한다.
2. Producer ID 얻기
그 다음으로는 producer가 transaction coordinator 한테 initPidRequest를 보내서 producer에 할당될 PID를 가져온다. 그러면 transaction coordinator는 transaction log에 transaction id 와 매핑되어 있는 producer 정보를 기록하여 transaction 이 실해될 준비를 한다.
3. Transaction 시작
producer가 beginTransactions()를 호출하여 새로운 transaction의 시작을 알린다. 시작을 알리게 되면 애플리케이션 쪽에서 transaction coordinator 쪽으로 호출이 일어난다. 하지만 실제로 첫번째 레코드가 전송되기 전까지는 transaction이 시작하지는 않는다.
4. AddPartitionsToTxnRequest, ProduceRequest, AddOffsetCommitsToTxnRequest, TxnOffsetCommitRequest
4-1. 이제 애플리케이션이 producer.send() 메소드로 topic으로 데이터를 보내기 전에 어떤 partition으로 데이터를 보낼 것인지에 대한 정보가 자동으로 transaction coordinator에게 전달된다.
4-1a. 이게 transaction log에 기록된다. 어떤 user topic 쪽으로 보낼지, 어떤 partition으로 보낼지, 그게 PID와 어떻게 매핑되어 있다 라는 정보를 log에 기록하는 것이다. 그리고 이때부터 타이머를 시작한다.
4-2. 이제 producer는 broker에게 메세지를 보낸다.
4-2a. 요청을 받은 브로커는 topic에 데이터를 PID와 함께 write한다.
4-3. producer가 sendOffsetsToTransaction()을 호출하면 transaction coordiantor로 요청이 보내진다.
4-3a. transaction coordinator는 __consumer_offsets
topic 에서 어떤 topic의 partition이 이 transaction과 연결이 되어 있는지 자동으로 추론해서 transaction log에 기록한다.
4-4. producer가 내부적으로 TxnOffsetCommitRequest를 consumer coordinator에게 보낸다.
4-4a. 그리고 consumer offset 쪽으로 해당되는 offset에 대한 정보들을 write 한다. 아직 commit 하기 전이기 때문에 외부에서는 이 데이터를 볼 수 없다.
5. EndTxnRequest, WriteTxnMarkerRequest, Writing the final Commit or Abort Message
5-1. producer가 transaction을 완료하기 위해서 commitTransaction() 또는 abortTransaction()을 호출하는데, commit을 호출하면 producer가 자동으로 transaction coordinator에게 요청을 전달한다.
5-1a. 그러면 transaction coordinator가 transaction log로 prepare라는 record를 기록한다.
5-2. 이제 실제로 transaction coordinator가 commit을 실행한다. user topic 쪽으로 commit을 실행한다.
5-2a. 그리고 consumer offset 관련된 것도 commit을 해서 commit을 완료한다.
5-3. 다 완료되면 정말로 commit 이 되었고, 모든 transaction이 끝났다는 정보를 transaction log에 committed 라고 기록하면서 전체 하나의 transaction을 완료시킨다.