본문 바로가기
Kafka

Kafka-KIP848에 대하여 및 테스트(KRaft)

by ms727 2025. 4. 16.

개요

회사에서 Kafka 클러스터링 구성을 진행하다보니 KIP-848에 대해서 알게 되었다.

나름 Kafka동작방식에 있어서 획기적인 아이디어를 적용하였으니 한 번 알아보면 좋을 것 같아서 좀 찾아보았다.

KIP848

  • KIP-848은 Kafka의 새로운 Consumer Rebalance Protocol을 도입

기존 리밸런싱 방식의 문제점

1.모든 Consumer들이 Coordinator에게 Join 요청

  1. Coordinator가 모든 Consumer의 metadata 수집
  2. 리더 Consumer(보통 첫 번째 consumer)가 전체 assignment를 계산
  3. Coordinator가 각 Consumer에게 할당
  • 모든 Consumer가 Join 완료되어야만 리밸런스가 가능
  • 느린 Consumer 하나 때문에 전체 Group이 지연됨
  • Rebalance 중에는 메시지 처리 중단
  • Consumer 수가 많을수록 느려짐

메세지 처리중단은 클러스터를 구성하는데 나름 크리티컬한 문제가 될 수 있다고 생각하여 KIP848를 적용하는게 낫다고 생각했다. 적용방식도 크게 어렵지는 않다.

KIP848의 이점

KIP848은 리밸런싱을 서버가 담당하여 처리하게 되는데 이로인해서 다음과 같은 이점을 얻을 수 있다.

항목 설명
group.protocol=consumer 새로운 리밸런스 방식을 활성화하는 설정
Server-side assignment 기존 Consumer 리더가 아닌 Broker가 직접 파티션 할당
Epoch 기반 상태 관리 Partition 할당 정보를 Epoch으로 버전 관리
Partial Join 지원 느린 Consumer를 기다릴 필요 없이 리밸런스 가능
Poll() 병렬 처리 가능 일부 Consumer가 늦게 Join해도 다른 Consumer는 메시지를 처리 가능

조건

KIP848은 kafka client가 3.5버전 이상이여야 적용이 가능하며 Spring-kafka의존성은 3.1버전 이상에서 적용가능하다.

kafka Docker Compose 작성

먼저 kafka를 띄워야하기에 Kafka Docker compose를 작성하도록 한다.

services:
  broker:
    image: bitnami/kafka:3.7.0
    hostname: broker
    container_name: broker
    ports:
      - "7092:9092"
      - "10000:9094"
    environment:
      # Kraft Settings
      KAFKA_CFG_NODE_ID: 0
      KAFKA_KRAFT_CLUSTER_ID: HsDBs9l6UUmQq7Y5E6bNlw
      KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@localhost:9093
      KAFKA_CFG_PROCESS_ROLES: controller,broker

      # Listeners
      KAFKA_CFG_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://0.0.0.0:9092, EXTERNAL://:9094, CONTROLLER://:9093
      KAFKA_CFG_ADVERTISED_LISTENERS: INTERNAL://broker:29092, PLAINTEXT://broker:9092, EXTERNAL://127.0.0.1:10000
      KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT,INTERNAL:PLAINTEXT
      KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_CFG_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: 0

      # Clustering
      KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CFG_DEFAULT_REPLICATION_FACTOR: 1

      # KIP-848
      KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "classic,consumer"
      KAFKA_CFG_TRANSACTION_PARTITION_VERIFICATION_ENABLE: "false"

Kafka Producer 설정

@Component
public class ProducerRunner implements ApplicationRunner {

    private final KafkaTemplate<String, String> template;

    ProducerRunner(KafkaTemplate<String, String> template) {
        this.template = template;
    }

    @Bean
    public NewTopic myTopic() {
        return new NewTopic("KIP848-topic", 5, (short) 1); // 파티션 5개, replication factor 1
    }

    @Override
    public void run(ApplicationArguments args) {
        for (int i = 0; i < 20; i++) {
            String key = String.valueOf(i % 5); // 파티션 균등 분산
            String message = (i == 3 || i == 10 || i == 17) ? "slow-message-" + i : "message-" + i;
            template.send("KIP848-topic", key, message);
        }

    }
}

//properties
spring.kafka.bootstrap-servers=localhost:10000

파티션을 5개로 나누고 새로운 토픽을 만든다.

Consumer 설정


@Component
@KafkaListener(groupId = "rebalance-consumer", topics = "KIP848-topic")
public class KafkaConsumerService {

    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    @KafkaHandler
    public void consume(String message) {
        String threadName = Thread.currentThread().getName();
        log.info("[{}][{}] START - message: {}", threadName, LocalDateTime.now(), message);
        if (message.contains("slow")) {
            try {
                Thread.sleep(5000); // 일부 Consumer는 일부러 느리게 동작
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        log.info("[{}][{}] END   - message: {}", threadName, LocalDateTime.now(), message);

    }
}

//application-consumer1.properties
spring.kafka.consumer.properties.group.protocol=consumer
spring.kafka.consumer.group-id=rebalance-consumer
spring.kafka.consumer.client-id=consumer1

spring.kafka.bootstrap-servers=localhost:10000

spring.kafka.consumer.properties.group.protocol=consumer 이 문법이 있어야 KIP848를 적용할 수 있고,

실제 구동을 하게 되면

이런 문구가 뜨게 된다. 이러면 잘 적용된 것이다.

사실 KIP-848에 대한 테스트를 완벽하게 진행하고 싶었지만

리밸런싱 타이밍을 정확히 맞추기는 어려워 완벽한 테스트를 진행하기 쉽지 않았다.

컨슈머가 10대 정도는 있고 대규모 처리시에 리밸런싱에 대한 관측이 쉬울 것 같으므로 실무에 적용하고 이에 따른 모니터링 결과가 있으면 추가로 적도록 하겠다.