Log Cleanup Policy

Log는 consume 되어도 삭제되지 않기 때문에 log 파일의 관리가 필요하다. 삭제되면 서로 다른 시간에 서로 다른 consumer 들이 데이터를 consume 하지 못한다. 하지만 새로운 데이터는 계속 들어오고, 디스크는 한계가 있기 때문에 언젠가는 과거 데이터를 지워야한다. 따라서 broker나 topic 단위로 데이터를 지우는 작업을 진행한다.

  • log.cleanup.policy 파라미터
    • delete
    • compact
    • delete + compact

log.cleanup.policy로 지워지는 데이터들은 현재 producer에게 메세지를 받아서 write 하고 있지 않는 파일이 대상이다. 즉, 현재 active segment의 log는 cleanup의 대상이 아니다.


Delete Cleanup Policy

말 그대로 과거 데이터를 삭제하는 작업이다. log cleaner thread 가 동작하고 있다가 확인한 다음에 데이터를 삭제해준다.

  • log.cleanup.policy 파라미터: delete로 세팅
  • log.retention.ms : log 보관 주기 (기본값 : 7 일)
    • segment 파일에 저장된 가장 최신의 메세지가 log.retention.ms 보다 오래되었으면 해당 segment를 삭제
  • log.retention.check.interval.ms : log cleaner thread가 log segment가 보관 주기가 넘었는지 안넘었는지 체크하는 주기 (기본값 : 5 분)

Topic의 메세지를 모두 삭제

하지만 log.retention.ms를 넘지않았지만 급하게 데이터를 지우고 싶을 때가 있다. 운영환경에서는 권장하지 않지만 topic의 메세지를 모두 삭제할 수는 있다. 주의할 것은 log file 자체를 직접 삭제하면 안된다.

  1. producer와 consumer를 shutdown (메세지를 계속 보내면 안되니깐)
  2. 지우고 싶은 topic의 retetion.ms = 0 으로 세팅
    kafka-config.sh --zookeeper ${zookeeper ip address} --alter --entity-name topics --entity-name topic_name --add-config retention.ms=0
    
  3. cleanup thread 가 동작할 동안 대기 (cleanup thread 는 기본적으로 5분 마다 동작)
  4. 메세지가 삭제됨
  5. 메세지 삭제 확인 후, 원래 설정으로 원복
    kafka-config.sh --zookeeper ${zookeeper ip address} --alter --entity-name topics --entity-name topic_name --delete-config retention.ms
    

Compact Cleanup Policy

compact cleanup policy를 사용하기 위해서는 메세지에 반드시 key 값이 있어야 한다. Compact Cleanup Policy는 주어진 key에 최신 value만 유지하도록 압축해 주기 때문이다. 동일한 key를 가지는 모든 메세지는 동일한 partition으로 전송되는데, compact 정책은 partition 별로 특정 key의 최신 value만 유지하면서 압축을 해준다. 그렇기 때문에 중간에 partition의 개수를 줄이거나 늘이는 경우 동일한 key를 가진 메세지가 다른 partition에 들어갈 수 있는데, 이러한 경우는 동일한 key를 가진 여러 메세지가 여전히 존재할 수 있다. 중복 제거용 기술이 아니라는 것을 기억해야 한다.

image

로그 압축을 사용하지 않으면 Consumer는 항상 전체 로그를 읽고 각 Key에 대한 가장 최신 상태에 도달할 수 있지만, 로그 압축을 사용하면 오래된 데이터를 읽지 않기 때문에 Consumer가 최종 상태에 더 빨리 도달할 수 있다. __consumer_offsets 라는 internal Topic는 압축기술이 적용되어 있는데 어떤 consumer group 이 어디 까지 데이터를 읽어갔는지는 partition 별로 맨 마지막 것만 가지고 있으면 ㅣ되기 때문이다. 따라서 Compact Cleanup Policy는 시스템 오류 후에 상태를 빠르게 복원하는데 유용한 방법이다.

Log Compaction 동작 원리

Log Compaction 동작 원리를 살펴보자. 아래 그림에서 이전에 cleanup을 진행해서 701번 바로 전 segment 까지 cleanup이 되었다라고 가정하자. cleaner point의 전을 tail, 후를 head라고 한다. head에서 offset map 정보를 뽑아내고 tail의 offset map과 비교한다. k1이라는 key는 head의 offset map에 없기 때문에 유지를 하고, k8이라는 key는 head의 offset map에 존재하기 때문에 tail에서 지워버린다. 이렇게 계속 비교하면서 진행하고 다 진행되었으면 cleaner point의 위치를 변경한다.

image

  • log.cleaner.min.cleanable.ratio (기본값 : 0.5)
    • Head 영역 데이터가 Tail 영역보다 크면(기본값 50%), Cleaner 시작
  • log.cleaner.io.max.bytes.per.second (기본값 : 무제한)
    • Log Cleaner의 Read/Write 의 처리량을 제한하여 시스템 리소스 보호 가능
  • 동일한 Key를 갖는 메시지가 매우 많은 경우, 더 빠른 정리를 위해서 아래의 파라미터를 증가시켜야 함
    • log.cleaner.threads (기본값 : 1)
    • log.cleaner.dedupe.buffer.size (기본값 : 134,217,728)

Tombstone Message

원래 kafka는 메세지를 지우지 않는 시스템인데, compaction을 실행하면 데이터가 지워진다. 이 때 tombstone message를 같이 주면 특정 key를 가진 데이터를 모두 삭제할 수 있다.

Compaction 사용시에 K1를 Key로 가지는 메시지를 지우려면, 동일한 Key를 가지는 메시지에 null value를 가지는 메세제를 Topic으로 보내면 된다. 그러면 최신 데이터가 null이기 때문에 이건 의미 없는 데이터라는 판단을 하고 해당 key를 다 지워버린다. 이때 해당 메시지가 지워지기 까지 (기본 1 day) 시간이 있으므로, 그 전까지는 해당 메시지를 consume할 수 있다. 메시지를 지우기 전 보관 기간 (기본 1 day)은 log.cleaner.delete.retention.ms 로 조정한다.