Kafka Broker
브로커 아키텍처
브로커란?
Kafka 클러스터를 구성하는 개별 서버 노드다. 각 브로커는 파티션 데이터 저장, 클라이언트 요청 처리, 복제 참여를 담당한다.
Kafka Cluster
┌────────────────────────────────────────────────────────┐
│ │
│ Broker 1 (id:1) Broker 2 (id:2) Broker 3 (id:3)│
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ orders-P0 │ │ orders-P1 │ │ orders-P2 │ │
│ │ (Leader) │ │ (Leader) │ │ (Leader) │ │
│ │ orders-P1 │ │ orders-P2 │ │ orders-P0 │ │
│ │ (Follower) │ │ (Follower) │ │ (Follower) │ │
│ │ orders-P2 │ │ orders-P0 │ │ orders-P1 │ │
│ │ (Follower) │ │ (Follower) │ │ (Follower) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
│ │
│ Controller: Broker 1 │
└────────────────────────────────────────────────────────┘
│
ZooKeeper / KRaft 클러스터
브로커 내부 컴포넌트
┌─────────────────────────────────────────────────────┐
│ Kafka Broker │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ Network Layer│ │ API Layer │ │
│ │ (Acceptor, │ → │ (KafkaApis) │ │
│ │ Processor) │ └──────────────┘ │
│ └──────────────┘ │ │
│ ▼ │
│ ┌──────────────────────────────────────────────┐ │
│ │ Request Handler Pool │ │
│ │ (num.io.threads 스레드 풀) │ │
│ └──────────────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────┼──────────────────┐ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────┐ │
│ │ ReplicaManager│ │ GroupCoord. │ │ LogManager│ │
│ │ (복제 관리) │ │ (Consumer │ │(파티션 로그│ │
│ │ │ │ 그룹 관리) │ │ 파일 관리) │ │
│ └──────────────┘ └──────────────┘ └──────────┘ │
└─────────────────────────────────────────────────────┘
로그 세그먼트
파티션 저장 구조
각 파티션은 디스크에 여러 세그먼트 파일로 저장된다.
/kafka-logs/orders-0/ (orders 토픽, 파티션 0)
├── 00000000000000000000.log (세그먼트 파일: 메시지 본문)
├── 00000000000000000000.index (오프셋 → 파일 위치 인덱스)
├── 00000000000000000000.timeindex (타임스탬프 → 오프셋 인덱스)
├── 00000000000001048576.log (다음 세그먼트: offset 1048576부터)
├── 00000000000001048576.index
├── 00000000000001048576.timeindex
└── leader-epoch-checkpoint
파일명의 숫자는 해당 세그먼트의 시작 오프셋이다.
세그먼트 전환 (Rolling)
새 세그먼트가 생성되는 조건:
log.segment.bytes=1073741824 # 1GB 초과 시 새 세그먼트
log.roll.hours=168 # 7일 경과 시 새 세그먼트
log.roll.jitter.hours=0 # 세그먼트 전환 시간 분산 (0이면 동시 전환)
메시지 조회 방식
Consumer가 offset 150000 요청:
1. 바이너리 서치로 세그먼트 파일 특정:
00000000000000000000.log → offset 0~149999
00000000000000150000.log → offset 150000~ (이 파일)
2. .index 파일에서 offset → 파일 내 byte 위치 조회 (희소 인덱스)
offset 150000 → file position: 8192
3. .log 파일의 8192 위치부터 읽기 (Sequential I/O)
인덱스 구조 (희소 인덱스)
인덱스는 모든 메시지에 대해 생성되지 않음:
log.index.interval.bytes=4096 # 4KB마다 인덱스 항목 생성
인덱스 파일:
offset=0 → file_pos=0
offset=50 → file_pos=4096
offset=103 → file_pos=8192
...
조회: 이진 탐색으로 가장 가까운 인덱스 항목 찾은 후
로그 파일에서 순차 스캔으로 정확한 offset 찾기
데이터 보존 정책
# 시간 기반
log.retention.hours=168 # 7일 보존 (기본값)
log.retention.ms=604800000 # ms 단위 (우선순위 높음)
# 크기 기반
log.retention.bytes=1073741824 # 파티션당 최대 1GB
# 정리 방식
log.cleanup.policy=delete # 만료 세그먼트 삭제 (기본)
log.cleanup.policy=compact # 키별 마지막 값만 유지
log.cleanup.policy=delete,compact # 두 가지 병행
# 삭제되지 않는 최소 보존 시간 (compact 사용 시)
log.retention.minutes=1440 # 최소 1일 보존 후 compaction
파티션 리더 선출
정상적인 리더 선출
토픽 생성 시 리더 분산 배치:
Broker1, Broker2, Broker3 각각 균등하게 리더 할당
파티션 0: Leader=Broker1, ISR=[1,2,3]
파티션 1: Leader=Broker2, ISR=[2,3,1]
파티션 2: Leader=Broker3, ISR=[3,1,2]
리더 장애 시 선출 과정
ZooKeeper 기반 (기존):
1. Broker2(Leader)가 ZooKeeper 세션 만료 (장애)
2. Controller(Broker1)가 ZooKeeper 이벤트로 감지
3. Controller가 ISR 목록에서 새 리더 선정
ISR=[2,3,1] → Broker2 제외 → Broker3 선정
4. Controller가 ZooKeeper에 새 리더 정보 업데이트
5. Controller가 모든 브로커에게 LeaderAndIsr 요청 전송
6. Broker3이 리더 역할 시작
7. 브로커들이 Metadata 캐시 갱신
8. Producer/Consumer가 새 리더로 재연결
KRaft 기반 (Kafka 3.x+):
1. Broker2(Leader)가 Raft heartbeat 중단 (장애)
2. KRaft Controller 쿼럼이 감지 (ZooKeeper 불필요)
3. Active Controller가 새 리더 결정
4. 변경사항을 Raft 로그에 기록 (과반수 동의)
5. Broker들에게 새 리더 정보 브로드캐스트
6. 선출 완료 (ZooKeeper 대비 수십 ms 빠름)
Preferred Leader 선출
각 파티션에는 최초 지정된 Preferred Leader가 있다. 장애 후 복구 시 Preferred Leader로 다시 리더를 이전한다.
# Preferred Leader 선출 트리거
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type PREFERRED \
--all-topic-partitions
# 설정으로 자동화
auto.leader.rebalance.enable=true # 자동 Preferred Leader 복귀 (기본 true)
leader.imbalance.check.interval.seconds=300 # 5분마다 체크
leader.imbalance.per.broker.percentage=10 # 10% 이상 불균형 시 재조정
Controller
Controller의 역할
클러스터 내 단 하나의 브로커가 Controller 역할을 담당한다. 클러스터 관리의 핵심 두뇌다.
Controller 담당 업무:
1. 브로커 장애 감지 및 리더 재선출
2. 토픽/파티션 생성/삭제 처리
3. ISR 변경 사항 관리
4. 파티션 재할당 조정
5. 클러스터 Metadata 관리
Controller 선출
ZooKeeper 기반:
모든 브로커가 ZooKeeper의 /controller 경로에 임시 노드(ephemeral node) 생성 경쟁
→ 먼저 생성한 브로커가 Controller
Controller 장애 시:
ZooKeeper가 ephemeral node 삭제
→ 다른 브로커들이 /controller 노드 생성 경쟁
→ 새 Controller 선출
KRaft (Kafka Raft Metadata mode):
별도 KRaft Controller 노드 또는 브로커와 통합 모드로 실행
Raft 합의 알고리즘으로 Controller 선출 및 Metadata 관리
ZooKeeper 의존성 완전 제거
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
process.roles=broker,controller # 브로커+컨트롤러 통합 모드
KRaft
ZooKeeper vs KRaft
| 구분 | ZooKeeper 기반 | KRaft |
|---|---|---|
| Metadata 저장 | ZooKeeper 외부 시스템 | Kafka 내부 Raft 로그 |
| Controller 선출 | ZooKeeper ephemeral node | Raft 합의 |
| 확장성 | 파티션 수 수만 개 한계 | 수백만 파티션 지원 |
| 운영 복잡도 | ZooKeeper 별도 관리 | 단일 시스템 관리 |
| 장애 복구 | 수십 초 | 수 초 |
| Kafka 버전 | 3.x까지 지원, 4.0에서 제거 | 2.8+ 지원, 3.3+에서 프로덕션 |
KRaft 설정
# KRaft 모드 server.properties
process.roles=broker,controller # broker 또는 controller 또는 둘 다
node.id=1
controller.quorum.voters=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
listeners=PLAINTEXT://kafka1:9092,CONTROLLER://kafka1:9093
inter.broker.listener.name=PLAINTEXT
controller.listener.names=CONTROLLER
# Cluster ID 생성 (최초 1회)
kafka-storage.sh random-uuid
# 스토리지 초기화
kafka-storage.sh format \
--config /etc/kafka/server.properties \
--cluster-id <UUID>
브로커 장애 시 동작
장애 감지
ZooKeeper 기반:
- 브로커가 ZooKeeper에 heartbeat 전송 (zookeeper.session.timeout.ms)
- 기본 18초 내 heartbeat 없으면 장애로 판단
- ZooKeeper가 해당 브로커의 임시 노드 삭제
KRaft 기반:
- 브로커가 Controller에 heartbeat 전송
- 기본 9초(broker.session.timeout.ms) 내 없으면 장애로 판단
장애 시 순서
1. Controller가 장애 브로커 감지
2. 장애 브로커가 Leader인 파티션 목록 수집
3. 각 파티션의 ISR에서 새 Leader 선출
4. 새 Leader 정보를 모든 브로커에 전파 (LeaderAndIsr 요청)
5. ZooKeeper/KRaft에 새 Metadata 기록
6. Producer/Consumer가 Metadata 갱신 후 새 Leader로 연결
7. 장애 브로커 복구 후 ISR 재참여 (복제 따라잡기)
장애 브로커 복구 후 처리
브로커 재시작:
1. 로그 복구 (Log Recovery)
- 마지막 checkpoint 이후 로그 일관성 확인
- Leader Epoch 기반으로 불일치 로그 truncate
2. Controller에 재등록
3. 각 파티션의 Leader에게 fetch 시작
4. Leader의 LEO까지 따라잡으면 ISR 재진입
5. 필요시 Preferred Leader 복귀
파티션 증설 시 주의사항
파티션 추가 방법
# 파티션 수 늘리기 (6 → 9)
kafka-topics.sh --bootstrap-server kafka:9092 \
--alter --topic orders \
--partitions 9
주의사항:
1. 파티션 감소는 불가능 (증가만 가능)
2. 기존 메시지는 재배치되지 않음
3. 새 파티션에는 새 메시지만 들어감
키 기반 파티셔닝 영향
파티션 3개일 때:
key="order-123" → murmur2(key) % 3 = 1 → Partition 1
파티션 9개로 증가 후:
key="order-123" → murmur2(key) % 9 = 7 → Partition 7
결과: 같은 키가 다른 파티션으로 → 순서 보장 깨짐
기존 데이터(P1)와 새 데이터(P7)가 분산됨
안전한 파티션 증가 절차:
1. 영향받는 Consumer의 처리 로직이 순서에 의존하는지 확인
2. 의존하는 경우: 기존 데이터 소비 완료 후 파티션 증가
3. 의존하지 않는 경우: 즉시 증가 가능
4. 새 파티션에 기존 데이터 마이그레이션이 필요하면 별도 작업
파티션 재할당 (Partition Reassignment)
브로커를 추가했을 때 기존 파티션을 새 브로커에 재분산한다.
# 재할당 계획 생성
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--broker-list "1,2,3,4" \
--topics-to-move-json-file topics.json \
--generate
# 재할당 실행
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--execute
# 재할당 상태 확인
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--verify
재할당 시 주의:
- 대용량 데이터 이동으로 네트워크/디스크 I/O 급증
- 운영 시간 외 또는 throttle 적용 권장
throttle 적용:
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type brokers \
--entity-name 1 \
--alter \
--add-config leader.replication.throttled.rate=10485760 # 10MB/s
브로커 성능 튜닝
OS 수준 설정
# 파일 디스크립터 한도
ulimit -n 100000
echo "kafka soft nofile 100000" >> /etc/security/limits.conf
echo "kafka hard nofile 100000" >> /etc/security/limits.conf
# 가상 메모리 (Kafka는 페이지 캐시를 적극 활용)
sysctl -w vm.swappiness=1 # 스왑 거의 사용 안 함
sysctl -w vm.dirty_background_ratio=5 # 백그라운드 플러시 시작 임계값
sysctl -w vm.dirty_ratio=80 # 동기 플러시 임계값
# 네트워크 버퍼
sysctl -w net.core.rmem_max=16777216
sysctl -w net.core.wmem_max=16777216
브로커 핵심 설정
# 스레드 설정
num.network.threads=8 # 네트워크 I/O 스레드 (CPU 코어 수 기준)
num.io.threads=16 # 디스크 I/O 스레드 (num.network.threads * 2 권장)
num.replica.fetchers=4 # 복제 fetch 스레드
# 로그 설정
log.dirs=/data/kafka-logs # 여러 디스크: /data1/kafka,/data2/kafka
log.segment.bytes=1073741824
log.retention.hours=168
# 소켓 버퍼
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
JVM 설정
# kafka-server-start.sh 수정 또는 KAFKA_HEAP_OPTS 환경변수
export KAFKA_HEAP_OPTS="-Xmx6g -Xms6g"
# G1GC 설정 (대용량 힙 권장)
export KAFKA_JVM_PERFORMANCE_OPTS="-server \
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=20 \
-XX:InitiatingHeapOccupancyPercent=35 \
-XX:+ExplicitGCInvokesConcurrent \
-Djava.awt.headless=true"
모니터링 핵심 지표
브로커 헬스:
kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec # 초당 수신 바이트
kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec # 초당 송신 바이트
kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec # 초당 메시지 수
요청 처리:
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=Produce # Produce 지연
kafka.network:type=RequestMetrics,name=TotalTimeMs,request=FetchConsumer
kafka.network:type=RequestMetrics,name=RequestsPerSec
복제:
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions # 복제 지연 파티션
kafka.server:type=ReplicaManager,name=LeaderCount # 이 브로커가 리더인 파티션 수
kafka.controller:type=KafkaController,name=ActiveControllerCount # 1이면 정상
디스크:
kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs # 플러시 통계