Producer 내부 아키텍처

전체 흐름

Application
    │
    │ producer.send(record)
    ▼
┌─────────────────────────────────────────────────────┐
│                  KafkaProducer                       │
│                                                      │
│  Serializer → Partitioner → RecordAccumulator        │
│                                    │                 │
│                              배치 누적                │
│                                    │                 │
│                             Sender Thread            │
│                             (I/O 전담)               │
└─────────────────────────────────────────────────────┘
                                    │
                             NetworkClient
                                    │
                             Kafka Broker

RecordAccumulator

Producer 스레드와 Sender 스레드 사이의 버퍼 역할을 한다. 각 TopicPartition마다 Deque<ProducerBatch>를 유지하여 메시지를 배치로 묶는다.

RecordAccumulator 내부:
┌─────────────────────────────────────────────┐
│                                             │
│  orders-P0: [batch1: msg0,msg1,msg2] [batch2: msg3] │
│  orders-P1: [batch1: msg4,msg5]             │
│  payments-P0: [batch1: msg6]                │
│                                             │
└─────────────────────────────────────────────┘
         ↑ Producer 스레드가 추가
         ↓ Sender 스레드가 가져가서 전송

주요 설정:

buffer.memory=33554432      # 전체 버퍼 메모리 (기본 32MB)
batch.size=16384            # 배치 최대 크기 (기본 16KB)
linger.ms=0                 # 배치 대기 시간 (기본 0ms, 즉시 전송)
max.block.ms=60000          # 버퍼 꽉 찼을 때 block 최대 시간

Sender 스레드

RecordAccumulator에서 배치를 가져와 Kafka Broker로 전송하는 백그라운드 I/O 스레드다.

Sender 동작 사이클:
1. RecordAccumulator에서 전송 가능한 배치 수집
2. Metadata에서 각 파티션의 Leader 브로커 확인
3. 브로커별로 요청 묶기 (같은 브로커로 가는 배치 합산)
4. NetworkClient로 비동기 전송
5. 응답 수신 후 Future 완료 처리

in-flight 요청 수 제한:
max.in.flight.requests.per.connection=5  # 브로커당 동시 미확인 요청 수

배치와 압축

배치 전략

linger.ms=0 (기본):
  메시지 도착 즉시 전송 → 지연 최소, 처리량 낮음
  단일 메시지가 하나의 네트워크 요청 = 오버헤드 큼

linger.ms=10:
  10ms 대기 후 배치로 전송 → 지연 소폭 증가, 처리량 크게 향상
  같은 시간 내 도착한 메시지들이 하나의 배치

batch.size=65536 (64KB):
  배치가 64KB 차면 즉시 전송 (linger.ms 기다리지 않음)
처리량 최적화 설정:
linger.ms=20
batch.size=131072       # 128KB
compression.type=snappy
buffer.memory=67108864  # 64MB

압축

Producer에서 압축하면 네트워크 전송량과 브로커 저장 공간을 줄일 수 있다. Broker는 압축 해제 없이 그대로 저장하고 Consumer가 해제한다.

압축 알고리즘 압축률 속도 CPU 사용 권장 상황
none 없음 - 없음 기본, 소량 데이터
gzip 높음 느림 높음 디스크 공간 최우선
snappy 중간 빠름 낮음 범용 (Google 권장)
lz4 중간 매우 빠름 낮음 처리량 최우선
zstd 높음 빠름 중간 고압축률+고성능
compression.type=snappy   # Producer 설정

파티셔닝

기본 파티셔닝 로직

// Kafka 2.4+ 기본 파티셔너: StickyPartitioner
// 키가 없는 메시지: 배치가 찰 때까지 같은 파티션에 Sticky
// 키가 있는 메시지: murmur2 해시 기반으로 파티션 결정

// 키 기반 파티셔닝 (순서 보장):
ProducerRecord<String, String> record =
    new ProducerRecord<>("orders", orderId, orderJson);
// orderId가 같으면 항상 같은 파티션 → 순서 보장

커스텀 파티셔너

public class OrderPriorityPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionCountForTopic(topic);

        // VIP 주문은 파티션 0으로 고정 (별도 Consumer가 전담 처리)
        if (value instanceof String && ((String) value).contains("\"vip\":true")) {
            return 0;
        }

        // 일반 주문은 나머지 파티션에 분산
        return (Utils.murmur2(keyBytes) & Integer.MAX_VALUE) % (numPartitions - 1) + 1;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// 설정
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,
    OrderPriorityPartitioner.class.getName());

멱등성 Producer

재시도로 인한 중복 문제

일반 Producer 재시도 시나리오:
1. Producer → Broker: msg1 전송
2. Broker: msg1 저장 성공
3. 네트워크 장애로 ack 미전달
4. Producer: timeout → 재시도
5. Producer → Broker: msg1 재전송
6. Broker: msg1 중복 저장
→ 메시지 중복!

멱등성 Producer 동작

enable.idempotence=true
# 자동으로 설정됨:
# acks=all
# max.in.flight.requests.per.connection=5 이하
# retries=Integer.MAX_VALUE
멱등성 메커니즘:
1. Producer 시작 시 PID (Producer ID) 발급받음
2. 각 메시지에 (PID, Sequence Number) 부여
3. Broker는 파티션별로 마지막 Sequence Number 추적
4. 중복 시퀀스가 오면 저장하지 않고 ack만 반환

Producer: (PID=42, Seq=5, msg1) 전송
Broker: 저장, 마지막 seq=5 기록
네트워크 장애 후 재시도: (PID=42, Seq=5, msg1) 재전송
Broker: seq=5 이미 처리됨 → 중복 무시, ack 반환
→ 중복 없이 exactly-once (파티션 내)
한계:
- 단일 파티션 내에서만 exactly-once 보장
- Producer 재시작 시 PID가 바뀌어 보장 범위 초기화
- 크로스 파티션 또는 크로스 토픽 exactly-once는 트랜잭션 필요

트랜잭션 Producer

트랜잭션이란?

여러 파티션 또는 여러 토픽에 걸쳐 원자적 쓰기를 보장한다. 모두 성공하거나 모두 실패한다.

// 트랜잭션 설정
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-producer-1");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // 트랜잭션 초기화

// 트랜잭션 사용
producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>("orders", orderId, orderJson));
    producer.send(new ProducerRecord<>("inventory-reservations", orderId, reservationJson));
    producer.send(new ProducerRecord<>("notifications", orderId, notificationJson));

    producer.commitTransaction(); // 3개 토픽 모두 원자적 커밋
} catch (Exception e) {
    producer.abortTransaction(); // 3개 토픽 모두 롤백
    throw e;
}

Consumer-Producer 트랜잭션 (Read-Process-Write)

// Kafka Streams가 내부적으로 사용하는 패턴
// Consumer에서 읽고, 처리하고, Producer로 쓰는 과정을 원자적으로

producer.beginTransaction();
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        String result = processRecord(record);
        producer.send(new ProducerRecord<>("output-topic", record.key(), result));
    }

    // Consumer Offset 커밋을 트랜잭션에 포함
    Map<TopicPartition, OffsetAndMetadata> offsets = getOffsets(records);
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());

    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

Isolation Level

트랜잭션 메시지를 Consumer에서 읽는 방법은 isolation.level 설정으로 제어한다.

# Consumer 설정
isolation.level=read_committed   # 커밋된 트랜잭션 메시지만 읽음 (기본: read_uncommitted)
read_uncommitted (기본):
  진행 중인 트랜잭션 메시지도 즉시 읽음
  → 나중에 abort되면 이미 처리한 메시지가 유효하지 않을 수 있음

read_committed:
  commitTransaction() 완료된 메시지만 읽음
  → Last Stable Offset(LSO) 이하의 메시지만 노출
  → 처리량 소폭 감소, 정확한 exactly-once 보장

Producer 행(Hang) 원인과 대응

원인 1: 버퍼 꽉 참

증상: producer.send()가 max.block.ms 동안 블로킹 후 BufferExhaustedException

원인:
  - Sender 스레드가 브로커에 전송하는 속도 < 메시지 생성 속도
  - 브로커 장애로 전송 불가 → 버퍼 지속 누적

해결:
  buffer.memory 증가 (기본 32MB → 64MB+)
  linger.ms 줄여 더 자주 전송
  브로커 장애 확인
  max.block.ms를 줄여 빠른 실패(fail-fast) 처리

원인 2: Metadata 조회 실패

증상: Producer 시작 시 또는 리더 변경 후 메시지 전송 지연/실패

원인:
  - 브로커 연결 불가 → Metadata 조회 실패
  - Leader 선출 중인 파티션에 쓰기 시도

설정:
  metadata.max.age.ms=300000      # Metadata 캐시 갱신 주기
  max.block.ms=60000              # Metadata 조회 대기 최대 시간
  request.timeout.ms=30000        # 요청 타임아웃
  reconnect.backoff.ms=50         # 재연결 시도 간격
  reconnect.backoff.max.ms=1000   # 재연결 최대 대기

원인 3: in-flight 요청 과다

증상: 처리량은 높은데 지연 증가

원인: 브로커가 요청 처리보다 느리게 응답
     → in-flight 요청 누적 → 버퍼 소진

해결:
  max.in.flight.requests.per.connection=1~5 (멱등성 활성화 시 5 이하 필수)
  acks=1로 변경 (내구성 트레이드오프)

재시도 중복 방지

재시도 설정

retries=2147483647                  # 사실상 무한 재시도
retry.backoff.ms=100                # 재시도 간격 (기본 100ms)
delivery.timeout.ms=120000          # 전체 전송 타임아웃 (기본 2분)
                                    # retries를 넘더라도 이 시간 내에만 재시도

재시도 중복 방지 전략

전략 1: 멱등성 Producer (단순, 단일 파티션)
  enable.idempotence=true
  → Broker가 시퀀스 번호로 중복 감지

전략 2: 트랜잭션 Producer (크로스 파티션/토픽)
  transactional.id=unique-id
  → 트랜잭션 단위로 원자적 처리

전략 3: 애플리케이션 레벨 멱등 키
  메시지에 고유 ID 포함 → Consumer가 중복 체크
  → Kafka 설정 무관하게 항상 사용 가능

전략 4: Outbox 패턴
  DB 트랜잭션으로 발행 보장 → Relay가 중복 없이 전송

멱등성 + 트랜잭션 조합

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 멱등성
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // 트랜잭션 (서비스 인스턴스별 고유 ID)
        String hostname = System.getenv("HOSTNAME");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-svc-" + hostname);

        // 내구성
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);

        // 배치 성능
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        return new DefaultKafkaProducerFactory<>(props);
    }
}

성능 튜닝 요약

목표 설정
처리량 최대화 linger.ms 20~50
처리량 최대화 batch.size 65536~131072
처리량 최대화 compression.type snappy / lz4
처리량 최대화 buffer.memory 64MB+
지연 최소화 linger.ms 0~5
지연 최소화 batch.size 작게
내구성 최대화 acks all
내구성 최대화 enable.idempotence true
내구성 최대화 min.insync.replicas 2
처리량 vs 지연 트레이드오프:
linger.ms=0:  지연 낮음, 처리량 낮음
linger.ms=50: 지연 높음, 처리량 높음

실무 권장:
  - 실시간 처리 필요: linger.ms=0~5
  - 대량 데이터 파이프라인: linger.ms=20~100

카테고리:

업데이트: